第一句子网 - 唯美句子、句子迷、好句子大全
第一句子网 > 隐马尔科夫模型HMM之Baum-Welch算法Python代码实现

隐马尔科夫模型HMM之Baum-Welch算法Python代码实现

时间:2021-08-20 09:59:19

相关推荐

隐马尔科夫模型HMM之Baum-Welch算法Python代码实现

☕️ 本文系列文章汇总:

(1)HMM开篇:基本概念和几个要素

(2)HMM计算问题:前后向算法

代码实现

(3)HMM学习问题:Baum-Welch算法

(4) HMM预测问题:维特比算法

本篇算法原理分析及公式推导请参考: HMM学习问题:Baum-Welch算法

原理解析及公式推导已在系列博客中介绍,本篇重点用python实现一下Baum-Welch算法,走起~

目录

1. 初始化一些参数

2. 定义前向算法获得α_{ij}

3. 定义后向算法获得β_{ij}

4. 根据《统计学习方法》公式10.24计算γ_{t}(i)

5. 根据《统计学习方法》公式10.26计算ξ_{t}(i, j)

6. 根据《统计学习方法》算法【10.4】定义模型训练过程

7. 整体代码

8. 实例

1. 初始化一些参数

def __init__(self, N, M, V):self.A = np.random.dirichlet(np.ones(N), size=N) # 状态转移概率矩阵self.B = np.random.dirichlet(np.ones(M), size=N) # 观测概率矩阵self.pi = np.array(np.random.dirichlet(np.ones(N), size=1))[0] # 初始状态概率矩阵self.V = V # 所有可能的观测self.N = N # 所有可能的状态长度self.M = M # 所有可能的观测长度

这里用到的`np.random.dirichlet(args, size)`是随机生成一个维度为args,size行的数组,并保证每一行之和为1

2. 定义前向算法获得α_{ij}

def forward(self):"""前向算法,Baum welch算法需要用到:param O: 已知的观测序列:return: alpha_{i}"""row, col = len(self.O), self.A.shape[0]alpha_t_plus_1 = np.zeros((row, col))obj_index = self.V.index(self.O[0])# 初值α 公式10.15alpha_t_plus_1[0][:] = self.pi * self.B[:].T[obj_index]for t, o in enumerate(self.O[1:]):t += 1# 递推 公式10.16obj_index = self.V.index(o)alpha_ji = alpha_t_plus_1[t - 1][:].T @ self.Aalpha_t_plus_1[t][:] = alpha_ji * self.B[:].T[obj_index]self.alpha = alpha_t_plus_1

3. 定义后向算法获得β_{ij}

def backward(self):"""后向算法,Baum welch算法需要用到:param O: 已知的观测序列:return: beta_{i}"""row, col = len(self.O), self.A.shape[0]betaT = np.zeros((row + 1, col))# 初值β 公式10.19betaT[0][:] = [1] * self.A.shape[0]for t, o in enumerate(self.O[::-1][1:]):t += 1# 反向递推 公式10.20obj_index = self.V.index(self.O[t - 1])beta_t = self.A * self.B[:].T[obj_index] @ betaT[t - 1][:].TbetaT[t][:] = beta_t# 由于我们这里要的是beta矩阵,不做probs的计算,所以不需要这一行,即不计算公式【10.27】# betaT[-1][:] = [self.pi[i] * self.B[i][self.V.index(self.O[0])] * betaT[-2][i] for i in range(self.A.shape[0])]# 注意这里计算后向算法时,betaT是倒着存放的,所以我们需要按照beta1,beta2,...,betaT的顺序取self.beta = betaT[:-1][::-1]

上述前向和后向算法的具体实现在上一篇博客已经给出,这里不再解释

4. 根据《统计学习方法》公式10.24计算γ_{t}(i)

def gamma(self, t, i):"""根据课本公式【10.24】计算γ:param t: 当前时间点:param i: 当前状态节点:return: γ值"""numerator = self.alpha[t][i] * self.beta[t][i]denominator = 0.for j in range(self.N):denominator += (self.alpha[t][j] * self.beta[t][j])return numerator / denominator

5. 根据《统计学习方法》公式10.26计算ξ_{t}(i, j)

def ksi(self, t, i, j):"""根据公式【10.26】计算 ξ:param t: 当前时间点:param i: 当前状态节点:param j: 同i:return:"""obj_index = self.V.index(self.O[t + 1])numerator = self.alpha[t][i] * self.A[i][j] * self.B[j][obj_index] * self.beta[t + 1][j]denominator = 0.for i in range(self.N):for j in range(self.N):denominator += self.alpha[t][i] * self.A[i][j] * self.B[j][obj_index] * self.beta[t + 1][j]return numerator / denominator

6. 根据《统计学习方法》算法【10.4】定义模型训练过程

def train(self, O, n):"""根据算法【10.4】训练模型:param O: 已知观测序列:param n: 最大迭代步长:return: 模型参数λ=(π,A,B)"""self.O = Oself.T = len(O)maxIter = 0while maxIter < n:tempA = np.zeros((self.N, self.N))tempB = np.zeros((self.N, self.M))tempPi = np.array([0.] * self.N)# 根据前向算法和后向算法得到α和βself.forward()self.backward()maxIter += 1# a_{ij},公式【10.39】for i in range(self.N):for j in range(self.N):numerator = 0.denominator = 0.for t in range(self.T - 1):numerator += self.ksi(t, i, j)denominator += self.gamma(t, i)tempA[i][j] = numerator / denominator# b_{i}{j},公式【10.40】for j in range(self.N):for k in range(self.M):numerator = 0.denominator = 0.for t in range(self.T):if self.O[t] == self.V[k]:numerator += self.gamma(t, j)denominator += self.gamma(t, j)tempB[j][k] = numerator / denominator# π_{i},公式【10.41】for i in range(self.N):tempPi[i] = self.gamma(0, i)# 更新self.A = tempAself.B = tempBself.pi = tempPireturn AttrDict(pi=self.pi,A=self.A,B=self.B)

7. 整体代码

import randomimport numpy as nprandom.seed(1) # 好像不起租用class AttrDict(dict):# 一个小trick,将结果返回成一个字典格式def __init__(self, *args, **kwargs):super(AttrDict, self).__init__(*args, **kwargs)self.__dict__ = selfclass Baum_Welch:def __init__(self, N, M, V):self.A = np.random.dirichlet(np.ones(N), size=N) # 状态转移概率矩阵self.B = np.random.dirichlet(np.ones(M), size=N) # 观测概率矩阵self.pi = np.array(np.random.dirichlet(np.ones(N), size=1))[0] # 初始状态概率矩阵self.V = V # 所有可能的观测self.N = N # 所有可能的状态长度self.M = M # 所有可能的观测长度def forward(self):"""前向算法,Baum welch算法需要用到:param O: 已知的观测序列:return: alpha_{i}"""row, col = len(self.O), self.A.shape[0]alpha_t_plus_1 = np.zeros((row, col))obj_index = self.V.index(self.O[0])# 初值α 公式10.15alpha_t_plus_1[0][:] = self.pi * self.B[:].T[obj_index]for t, o in enumerate(self.O[1:]):t += 1# 递推 公式10.16obj_index = self.V.index(o)alpha_ji = alpha_t_plus_1[t - 1][:].T @ self.Aalpha_t_plus_1[t][:] = alpha_ji * self.B[:].T[obj_index]self.alpha = alpha_t_plus_1def backward(self):"""后向算法,Baum welch算法需要用到:param O: 已知的观测序列:return: beta_{i}"""row, col = len(self.O), self.A.shape[0]betaT = np.zeros((row + 1, col))# 初值β 公式10.19betaT[0][:] = [1] * self.A.shape[0]for t, o in enumerate(self.O[::-1][1:]):t += 1# 反向递推 公式10.20obj_index = self.V.index(self.O[t - 1])beta_t = self.A * self.B[:].T[obj_index] @ betaT[t - 1][:].TbetaT[t][:] = beta_t# betaT[-1][:] = [self.pi[i] * self.B[i][self.V.index(self.O[0])] * betaT[-2][i] for i in range(self.A.shape[0])]self.beta = betaT[:-1][::-1]def gamma(self, t, i):"""根据课本公式【10.24】计算γ:param t: 当前时间点:param i: 当前状态节点:return: γ值"""numerator = self.alpha[t][i] * self.beta[t][i]denominator = 0.for j in range(self.N):denominator += (self.alpha[t][j] * self.beta[t][j])return numerator / denominatordef ksi(self, t, i, j):"""根据公式【10.26】计算 ξ:param t: 当前时间点:param i: 当前状态节点:param j: 同i:return:"""obj_index = self.V.index(self.O[t + 1])numerator = self.alpha[t][i] * self.A[i][j] * self.B[j][obj_index] * self.beta[t + 1][j]denominator = 0.for i in range(self.N):for j in range(self.N):denominator += self.alpha[t][i] * self.A[i][j] * self.B[j][obj_index] * self.beta[t + 1][j]return numerator / denominatordef train(self, O, n):"""根据算法【10.4】训练模型:param O: 已知观测序列:param n: 最大迭代步长:return: 模型参数λ=(π,A,B)"""self.O = Oself.T = len(O)maxIter = 0while maxIter < n:tempA = np.zeros((self.N, self.N))tempB = np.zeros((self.N, self.M))tempPi = np.array([0.] * self.N)# 根据前向算法和后向算法得到α和βself.forward()self.backward()maxIter += 1# a_{ij},公式【10.39】for i in range(self.N):for j in range(self.N):numerator = 0.denominator = 0.for t in range(self.T - 1):numerator += self.ksi(t, i, j)denominator += self.gamma(t, i)tempA[i][j] = numerator / denominator# b_{i}{j},公式【10.40】for j in range(self.N):for k in range(self.M):numerator = 0.denominator = 0.for t in range(self.T):if self.O[t] == self.V[k]:numerator += self.gamma(t, j)denominator += self.gamma(t, j)tempB[j][k] = numerator / denominator# π_{i},公式【10.41】for i in range(self.N):tempPi[i] = self.gamma(0, i)# 更新self.A = tempAself.B = tempBself.pi = tempPireturn AttrDict(pi=self.pi,A=self.A,B=self.B)

8. 实例

if __name__ == '__main__':bm = Baum_Welch(N=3, M=2, V=['红', '白'])O = ['红', '白', '红']res = bm.train(O, 3)print(res.pi)print(res.A)print(res.B)

我们将n设置为3轮,可以得到如下结果:

π: [0.38663305 0.61098003 0.00238692]A: [[0.00183726 0.03990598 0.95825676][0.03327835 0.93088611 0.03583554][0.00324882 0.9861656 0.01058559]]B: [[0.98736893 0.01263107][0.72787272 0.27212728][0.03464711 0.96535289]]

可以看出,每个参数的每一行之和约等于1,这是正确的。要注意,由于这里使用random随机初始化,所以每次初始化的结果都不一样。参数的初始化很影响最后的计算结果,这是个十分玄学的过程。因为原理及公式推导我已经弄明白了,所以我在编写代码的时候,基本不会再去纠结于公式内涵,基本是无脑按照公式来写的,这样会降低错误概率。

代码已经放到GitHub上了,我将会持续更新其它算法的实现。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。