import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from scipy import stats
import random
import math
from scipy.stats import beta
from scipy.stats import norm
from mpl_toolkits.mplot3d import Axes3D
from scipy.stats import multivariate_normal
import warnings
warnings.filterwarnings('ignore')
np.random.seed(42)
# pdf - 概率密度函数
stats.norm(loc=0, scale=1).pdf(0)
0.3989422804014327
x = np.linspace(-5,5,num=100)
gauss = stats.norm(loc=0, scale=1)
y = gauss.pdf(x)
plt.plot(x, y)
[<matplotlib.lines.Line2D at 0x228958c71c0>]
# cdf - 累计概率密度函数
stats.norm(loc=0, scale=1).cdf(0)
0.5
x = np.linspace(-5,5,num=100)
gauss = stats.norm(loc=0, scale=1)
y = gauss.cdf(x)
plt.plot(x, y)
[<matplotlib.lines.Line2D at 0x22896979e20>]
随机抽样
# rvs - 随机抽样
stats.norm(loc=0, scale=1).rvs(1)
array([0.49671415])
gauss = stats.norm(loc=0, scale=1)
y = gauss.rvs(1000)
sns.distplot(y)
<AxesSubplot:ylabel='Density'>
x = np.random.normal(loc=0, scale=1, size=1000)
sns.distplot(x)
<AxesSubplot:ylabel='Density'>
随机正整数采样
# 返回 [low, high) 之间的随机整数
np.random.randint(low=0, high=3, size=1)
array([2])
# 返回 [0, 3) 之间的随机整数
np.random.randint(3)
2
多项式分布
# 根据概率分布pvals,采样n个样本
np.random.multinomial(n=1, pvals=[0.1, 0.7, 0.2])
array([0, 1, 0])
均匀分布采样
np.random.uniform(0, 1)
0.6977562760734295
目的:用标准正态分布(建议分布)模拟另外一个标准正态分布(目标分布)。(一般情况下,目标分布是不好采样的分布,建议分布是比较好采样的分布,此处仅用于模拟)
目标分布:N(0, 5)
建议分布:N(上一个采样点, 2) 注:上一个采样点的位置,相当于生成当前采样点的条件
samp_num = 10000
result = []
init_x = 1
result.append(init_x)
p = lambda r:stats.norm.pdf(r, 0,5)
q = lambda v:stats.norm.rvs(loc = v,scale = 2, size = 1)
for i in range(samp_num):
# 1. 建议分布 - 采样
x_pre = result[i]
x_sample = q(x_pre) # 在上一个采样点周围,生成当前采样点
# 2. 接受概率
alpha = min(1, p(x_sample) / p(result[i]))
# 3. 判断是否接受
u = np.random.rand(1)
if u < alpha:
result.append(x_sample[0])
else:
result.append(result[i])
print("\r process: %d" % (i), end = "")
process: 9999
x = np.linspace(-20,20,100)
raw_y = stats.norm.pdf(x,0,5)
plt.plot(raw_y);
plt.title('P - distribution')
Text(0.5, 1.0, 'P - distribution')
sns.distplot(result )
plt.title('Sample distplot')
Text(0.5, 1.0, 'Sample distplot')
sns.kdeplot(result, label='Sample' )
plt.plot(x, raw_y, label='Raw' )
plt.legend()
plt.title('Sample Vs Raw')
Text(0.5, 1.0, 'Sample Vs Raw')
def mcmc(p, m=5000, n=10000, isMH=True):
'''
功能:MCMC采样
p:待采样的概率分布
m:燃烧期采样的样本数
n:燃烧期后采样的样本数
isMH:
'''
# 初始状态
x0 = np.random.randint(low=0, high=len(p), size=1)[0]
print('初始状态:', x0)
cnt_all = 0 # 采样的个数,当该数值大于m时,认为已经过了燃烧期
cnt_accepted = 0 # 被接受样本的个数
sample_all = [x0] # 采样并且被接受的样本
sample_after_burn = [] # 燃烧期过后,采样并且被接受的样本
while len(sample_after_burn) < n:
# 1. 从采样并且被接受的样本中取出最后一个,作为上一个样本
x_pre = sample_all[cnt_accepted]
# 2. 从上一个状态 x_pre,按照转移概率 A[x_pre] 进行状态转移,得到下一个状态
x_cur = np.argmax(np.random.multinomial(n=1, pvals=A[x_pre]))
cnt_all += 1
# 3. 计算接受概率
if isMH:
a = (p[x_cur]*A[x_cur][x_pre]) / (p[x_pre]*A[x_pre][x_cur]) # 计算是否满足马氏平稳条件
alpha = min(a,1)
else:
alpha = p[x_cur] * A[x_cur][X]
# 4. 生成阈值
u = np.random.uniform(0,1)
# 5. 是否接受样本
if u <= alpha:
cnt_accepted += 1
sample_all.append(x_cur)
# 已经过燃烧期
if cnt_all > m:
sample_after_burn.append(x_cur)
return sample_after_burn
# 待采样分布
p = np.array([0.4, 0.2, 0.4])
# 转移矩阵
A = np.array([[0.5, 0.25, 0.25], [0.5, 0, 0.5], [0.25, 0.25, 0.5]], dtype=np.float64)
# 采样
sample_after_burn = mcmc(p)
初始状态: 0
df_sample = pd.DataFrame(pd.Series(sample_after_burn).value_counts()).reset_index().sort_values(by='index')
df_sample.columns = ['state', 'cnt']
df_sample = df_sample.reset_index()[['state', 'cnt']]
df_sample['label'] = df_sample.state.apply(lambda x: 'state' + str(x))
df_sample
state | cnt | label | |
---|---|---|---|
0 | 0 | 3997 | state0 |
1 | 1 | 2023 | state1 |
2 | 2 | 3980 | state2 |
plt.figure(figsize=(5,5))
plt.pie(df_sample.cnt.values, labels=df_sample.label.values,labeldistance=1.1,autopct='%1.2f%%');
假设要采样的是二维正态分布$N(\mu, \Sigma)$,其中:
$$
\mu = (\mu_1, \mu_2) = (5, -1) \\
\Sigma=\left(\begin{array}{cc}
\sigma_1^2 & \rho \sigma_1 \sigma_2 \\
\rho \sigma_1 \sigma_2 & \sigma_2^2
\end{array}\right)=\left(\begin{array}{ll}
1 & 1 \\
1 & 4
\end{array}\right) \\
\rho = 0.5
$$
samplesource = multivariate_normal(mean=[5,-1], cov=[[1,0.5],[0.5,2]])
fig = plt.figure()
ax = Axes3D(fig)
X = np.arange(0.3,9,0.1)
Y = np.arange(-8,6,0.1)
X,Y = np.meshgrid(X,Y)
pos = np.empty(X.shape + (2,)) # 从x.shape=(200,200)变为(200,200,2)
pos[:, :, 0] = X
pos[:, :, 1] = Y
Z = samplesource.pdf(pos)
ax.plot_surface(X,Y,Z,rstride=1,cstride=1,cmap='rainbow')
plt.show()
fig = plt.figure()
ax = Axes3D(fig, rect=[0, 0, 1, 1], elev=10, azim=20)
ax.scatter(pos.reshape(-1, 2)[:,0], pos.reshape(-1, 2)[:,1], Z.reshape(-1,1),marker='o',c= '#00CED1')
plt.show()
def p_ygivenx(x, m1, m2, s1, s2):
'''
p(y | x)
'''
return (random.normalvariate(m2 + rho * s2 / s1 * (x - m1), math.sqrt(1 - rho ** 2) * s2))
def p_xgiveny(y, m1, m2, s1, s2):
'''
p(x | y)
'''
return (random.normalvariate(m1 + rho * s1 / s2 * (y - m2), math.sqrt(1 - rho ** 2) * s1))
samplesource = multivariate_normal(mean=[5,-1], cov=[[1,0.5],[0.5,2]])
N = 10000
x_res = []
y_res = []
z_res = []
m1 = 5
m2 = -1
s1 = 1
s2 = 2
rho = 0.5
x = np.random.uniform(0, 1)
y = np.random.uniform(0, 1)
z = samplesource.pdf([x,y])
x_res.append(x)
y_res.append(y)
z_res.append(z)
for i in range(N):
x_cur = x_res[-1]
y_cur = y_res[-1]
x = p_xgiveny(y_cur, m1, m2, s1, s2) #y给定得到x的采样
y = p_ygivenx(x_cur, m1, m2, s1, s2) #x给定得到y的采样
z = samplesource.pdf([x,y])
x_res.append(x)
y_res.append(y)
z_res.append(z)
num_bins = 50
sns.distplot(x_res, label='x')
sns.distplot(y_res, label='y')
plt.title('Histogram')
plt.legend()
plt.show()
fig = plt.figure()
ax = Axes3D(fig, rect=[0, 0, 1, 1], elev=10, azim=20)
ax.scatter(x_res, y_res, z_res,marker='o',c= '#00CED1')
plt.show()