隐马尔科夫模型

隐马尔可夫模型

解决时序性的预测问题,我们通常会用到HMM模型(隐马尔科夫模型),但在开始介绍HMM模型之前,我们有必要先了解它的前置知识:

  • 马尔科夫链
  • 马尔科夫模型

马尔科夫链

现有状态只和上一个状态有关,而未来状态只与现有状态有关:

满足这样的一种结构,我们就称之为马尔科夫链

如果一个系统,在做不同状态之间转换的时候,当前状态只受到过的一个状态的影响,和其他状态都没有关系;换句话说,这个系统内的未来的一个状态只受到当前状态的影响,和其他状态都无关,满足这样性质的一个系统,我们就称之为是一个具有马尔科夫性的系统。

一个马尔科夫链,需要具备以下参数:

  • 初始分布
  • 转移概率和转移概率矩阵

初始分布

其中,初始分布如下:

$$
\pi_j(0) = P\{ξ_0=j\}
$$

这个公式的含义是在一个系统内,各个状态初始时的概率分布情况。其中$\pi$是概率分布的意思。举例说明:

例如:假设一个系统内只有1、2、3这三种状态,其中状态1出现的概率为0.2,状态2出现的概率是0.3,状态3出现的概率是0.5。这三个概率即初始概率分布。

转移概率和转移概率矩阵

继续上面的例子。当我们处于状态1时,下一个状态可能是状态2,可能是状态3,也可能是状态1。如果状态1变为状态2的概率是0.2,状态1变为状态3的概率是0.3,状态1变为状态1的概率是0.5:

这里的概率就是转移概率

对于某一个状态来说,具有的转移概率一共有3个,那么对于3种状态来说一共有3x3=9个转移概率。我们可以用一个3x3的矩阵来表示。这个矩阵就被称为转移概率矩阵


一个具体的例子,愚蠢的顾客

  • 某同类物品A、B、C的宣传力度不同,愚蠢的顾客在广告宣传的效应下,第一次尝试选择购买A、B、C的概率为0.2,0.4,0.4。经零售商统计,顾客的购买倾向为下表,尝试求某顾客第四次来购买各物品的概率:

在这个例子中,第一次购买A、B、C的概率0.2,0.4,0.4就是初始分布,上面的那个表就是状态转移矩阵

可观测的马尔科夫模型

上面的描述都是关于马尔科夫链的,那么什么是马尔科夫模型呢?

  • 对于一个问题而言,我们有初始分布$\pi$,转移概率矩阵A,在给定的任意一个时刻t,我们都有一个状态$q_t$,随着时间的变化,一个状态转移到另一个状态,我们便能得到一个观测序列,即为状态序列$O=[q_1,q_2,q_3,q_4,…,q_m]$。而且整个问题中一共有n个观测状态。

  • 出现这样的序列的概率为:

$$
P(O|A,\pi)=P(q_1)\prod_{t=2}^mP(q_t|q_{t-1})
$$

所以一个可观测的马尔科夫模型由一个三元组描述:$(A,\pi,n)$一般情况下简写为$(A,\pi)$。(因为观测状态的数量n可以从状态概率分布$\pi$得出)

这里的$A$就是转移概率矩阵,$\pi$就是状态初始分布,$n$就是观测状态的数量。

举个例子:

  • 有一个抽屉,抽屉里放有三种颜色的球,颜色分别为红蓝绿。某人随机的将球一个一个从抽屉中取出,球的颜色依次构成序列(C1,C2,C3,…)。如果红、蓝、绿三个状态的初始分布为$\pi=(0.5,0.2,0.3)$,转移概率矩阵:

$$
A=\begin{pmatrix}
0.4 & 0.3 & 0.3 \\
0.2 & 0.6 & 0.2 \\
0.1 & 0.1 & 0.8
\end{pmatrix}
$$

  • 那么出现颜色序列为:红,红,绿,绿 的概率是多少?

解答:

初始概率分布$\pi=(0.5,0.2,0.3)$,可以看出,初始为红色的概率为0.5。

从状态转移矩阵可以看出,各个颜色变化的概率分布:

绿
0.4 0.3 0.3
0.2 0.6 0.2
绿 0.1 0.1 0.8

可以看出,红色转移到红色的概率为0.4,红色转移到绿色的概率是0.3,绿色转移到绿色的概率是0.8。所以最终出现序列红,红,绿,绿 的概率为:

$$
P = 0.5 \times 0.4 \times 0.3 \times 0.8 = 0.048
$$

所以上面的那个公式就是将所求的序列O之中的各个状态的转移概率连乘起来得到的最终概率:

$$
P(O|A,\pi)=P(q_1)\prod_{t=2}^mP(q_t|q_{t-1})
$$

问题来了

那么我们能不能在只知道观测序列的情况下,得知初始分布和转移概率矩阵呢?

解答:

如果我们穷举了所有的观测序列,那么:

$$
\pi_i=\frac{以状态i开始的序列的数目}{序列总数}
$$

$$
p_{ij}=\frac{从状态i转移到状态j的序列的数目}{从状态i开始的序列的总数}
$$

具体实例,比如我们从抽屉中抽小球,四次的观测序列如下:

  • [红,红,红]
  • [红,红,蓝]
  • [红,蓝,红]
  • [蓝,红,红]

可以得出初始分布为:

$$
\pi={0.75, 0.25}
$$

转移概率为:

$$
p_{红蓝} = 1/3 \\
p_{红红} = 2/3 \\
p_{蓝蓝} = 0 \\
p_{蓝红} = 1
$$

隐马尔科夫模型(Hidden Markov Model)

介绍完了上面的可观测的马尔科夫模型,接下来介绍隐马尔科夫模型

隐马尔科夫模型的基本想法是:系统的状态S无法观测,但我们可以观测到某个其他和状态关联的事物,这个事物出现是伴随系统状态而出现的。

为什么会有无法观测的情况呢?举个例子:观测天空是否在下雨这个现象可以通过观测苔藓的生长情况来判断。比如下雨天,苔藓生长比较茂盛。所以我们可以通过观察苔藓来判断下雨的概率是否大。

一个隐马尔科夫模型一般包含以下参数组成:

  • 观测集合:$R=\{R_1,R_2,R_3,R_4,…,R_m\}$
    • 代表我们能观测到的状态有哪些,比如抓小球的例子中就是红蓝绿三种颜色。
  • 观测序列:$O=[o_1,o_2,o_3,o_4,…,o_l]$
    • 代表我们能观测到的具体的观测序列
  • 状态集合:$S=\{S_1,S_2,S_3,S_4,…,S_n\}$
    • 代表状态的集合,比如上面的下雨天的例子中,状态就是晴天、雨天
  • 状态序列:$Q=[q_1,q_2,q_3,q_4,…,q_l]$
    • 就是出现某些状态的序列
  • 观测概率:$P\{o_i=R_k|q_t=S_j\}=b_j(i)$,记$B=[b_j(i)]$
    • 观测概率是隐马尔科夫模型特有的,在$t$时刻的时候,出现状态$q$,观测到状态$o_i$为指定状态$R_k$的概率。

所以,隐马尔可夫模型由一个五元组来描述$(A,B,\pi,R,S)$,一般情况下,可以简化为$(A,B,\pi)$。其中$A$是状态转移矩阵,$B$是观测概率,$\pi$是初始分布。

注意

  • 不同的状态序列可以产生相同的观测序列(以不同的概率产生)
  • 状态转移是随机的,系统在一个状态中产生的观测也是随机的
  • 可观测马尔科夫模型是隐马尔科夫模型的特例:当$m=n$,如果$i=j,b_j(i)=1$否则$b_j(i)=0$。
    • 即在马尔科夫模型下,状态序列和观测序列是一样的。

三个基本问题

隐马尔可夫模型一般可以用来解决三个基本问题:

  • (1)估计:已知模型$(A,B,\pi)$,求观测序列出现的概率
    • 解决方法:前后向算法
  • (2)预测:已知模型$(A,B,\pi)$和一个观测序列,求对应的不可观测的状态序列
    • 解决方法:Viterbi算法
  • (3)学习:已知一组观测序列,求模型$(A,B,\pi)$
    • 解决方法:Baum-Welch算法

下面用一个具体的例子来了解一下这三个基本问题是如何处理的。

股市预测

  • 如果股市只有三种状态:牛市、熊市、普通
  • 而且股票只有三种趋势:涨、跌、不变
  • 如何利用隐马尔可夫模型进行股市预测?

那么如果在股市预测问题中,应用隐马尔科夫模型,来解决上面的那三个对应的基本问题,分别如下:

  • (1)已知模型,求观测到连续一周出现涨势的概率
  • (2)已知模型,观察到一周的变化情况为:涨、不变、涨、不变、跌,问股市的状态变化情况?
  • (3)观察到股市一周的变化情况为:涨、不变、涨、不变、跌,求下周一开盘时的涨跌情况?

隐马尔科夫模型的代码如下hmm.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
from __future__ import division
import math
import random

class HMM:
"""Class to implement an HMM.
Defined by:
1. Hidden state transition probability matrix T
2. Observable emission probability matrix E
3. Prior probability matrix 'priors'
4. Vocabulary of possible hidden states M ('states')
5. Vocabulary of possible observable emissions V ('emissions')
"""
def __init__(self, states, emissions):
self._states = states
self._emissions = emissions
self._T = dict()
self._E = dict()
self._priors = dict()
#print "Don't forget to set: T, E, and priors..."

def set_T(self, new_T):
tf = True
for key in new_T:
if sorted(new_T[key].keys()) != sorted(self._states):
tf = False
if sorted(new_T.keys()) != sorted(self._states):
tf = False

if tf: self._T = new_T
else:
print """Unmatched key -- check dictionary!
T => T[to state][given state]
"""

def set_E(self, new_E):
tf = True
for key in new_E:
if sorted(new_E[key].keys()) != sorted(self._states):
tf = False
if sorted(new_E.keys()) != sorted(self._emissions):
tf = False

if tf: self._E = new_E
else:
print """Unmatched key -- check dictionary!
E => E[emission][given state]
"""

def set_priors(self, new_priors):
if sorted(new_priors.keys()) == sorted(self._states):
self._priors = new_priors
else:
print """Unmatched key -- check dictionary!
priors => priors[state]
"""

# **************************************************
# Functions that take an observation sequence and an HMM
# **************************************************

def forward(O, hmm):
"""Return trellis representing p(theta_t | O_1^t).
"""
# initialize local variables
n = len(O)
f = {state: list() for state in hmm._states}
for o in O:
for state in hmm._states:
f[state].append(0)

# construct forward trellis
for state in hmm._states:
f[state][0] = hmm._priors[state] * hmm._E[O[0]][state]
for t in range(1, n):
for j in hmm._states:
for i in hmm._states:
f[j][t] += f[i][t-1] * hmm._T[j][i] * hmm._E[O[t]][j]
return f

def backward(O, hmm):
"""Return trellis representing p(O_t+1^N | theta_t == i).
"""
# initialize local variables
n = len(O)
b = {state: list() for state in hmm._states}
for o in O:
for state in hmm._states:
b[state].append(0)

# construct backward trellis
for state in hmm._states:
b[state][n-1] = 1
for t in range(n-2, -1, -1):
for i in hmm._states:
for j in hmm._states:
b[i][t] += b[j][t+1] * hmm._T[j][i] * hmm._E[O[t+1]][j]
return b

def posterior(O, hmm):
"""Return trellis representing p(theta_t | O).
Posterior probabilities would be used to find the maximum likelihood
of a state at a given time step based on the observation sequence
O. The value returned by the forward algorithm is the O_prob
value returned here.
"""
# get n
n = len(O)
# initialize forward, backward, and posterior trellises
f = forward(O, hmm)
b = backward(O, hmm)
p = {state: list() for state in hmm._states}
for o in O:
for state in hmm._states:
p[state].append(0)

# total probability of sequence O
O_prob = math.fsum([f[state][n-1] for state in hmm._states])

# build posterior trellis
for state in hmm._states:
for t in range(n):
p[state][t] = (f[state][t] * b[state][t]) / O_prob

return p

def forward_algrithm(O, hmm):
f = forward(O, hmm)
prop = 0.0
for stat in hmm._T:
prop += f[stat][len(O)-1]
return

def viterbi_path(O, hmm):
"""Return most likely hidden state path given observation sequence O.
"""
n = len(O)
u = {state: list() for state in hmm._states}
v = {state: list() for state in hmm._states}
bt = list()
for o in O:
for state in hmm._states:
for t in (u, v):
u[state].append(0)
v[state].append(str())
bt.append(str())

for state in hmm._states:
u[state][0] = hmm._priors[state] * hmm._E[O[0]][state]
# v[state][0] not of interest
for t in range(1, n):
for j in hmm._states:
for i in hmm._states:
p = u[i][t-1] * hmm._T[j][i] * hmm._E[O[t]][j]
if p > u[j][t]:
u[j][t] = p
v[j][t] = i
p = 0
for state in hmm._states:
if u[state][n-1] > p:
p = u[state][n-1]
bt[n-1] = state
for t in range(n-2, -1, -1):
bt[t] = v[bt[t+1]][t+1]

return bt

def baum_welch(O, hmm):
"""Return new hmm from one iteration of re-estimation."""
n = len(O)
f = forward(O, hmm)
b = backward(O, hmm)
p = posterior(O, hmm)
E_prime = dict()
for emission in hmm._emissions: E_prime[emission] = dict()
T_prime = dict()
for state in hmm._states: T_prime[state] = dict()
priors_prime = dict()

# construct E_prime
for state in hmm._states:
den = math.fsum([p[state][t] for t in range(n)])
for emission in hmm._emissions:
v = 0
for t in range(n):
if O[t] == emission: v += p[state][t]
E_prime[emission][state] = v / den

# construct T_prime
p_O = math.fsum([f[s][n-1] for s in hmm._states])
for given in hmm._states:
den = math.fsum([p[given][t] for t in range(n)])
for to in hmm._states:
v = 0
for t in range(1, n):
v += ( f[given][t-1] *
b[to][t] *
hmm._T[to][given] *
hmm._E[O[t]][to]
) / p_O
T_prime[to][given] = v / den

# construct priors_prime
for state in hmm._states:
priors_prime[state] = p[state][0]

new_hmm = HMM(hmm._states, hmm._emissions)
new_hmm.set_E(E_prime)
new_hmm.set_T(T_prime)
new_hmm.set_priors(priors_prime)

return new_hmm

下面我们来用这个模型解决这三个问题。

估计问题:已知模型,求观测到连续一周出现涨势的概率

已知模型如下图所示:

某股民根据经验判断当前为牛市、熊市、普通的概率分别是0.4、0.3、0.3。

这个问题中,我们的观测状态集合为:牛市、熊市、普通

我们的发射状态集合为:涨、跌、不变

初始分布$\pi$为:$(0.4, 0.3, 0.3)$

转移概率矩阵A:$
A=\begin{pmatrix}
0.6 & 0.2 & 0.2 \\
0.5 & 0.3 & 0.2 \\
0.4 & 0.1 & 0.5
\end{pmatrix}
$

观测概率矩阵B:$
A=\begin{pmatrix}
0.7 & 0.1 & 0.2 \\
0.1 & 0.6 & 0.3 \\
0.3 & 0.3 & 0.4
\end{pmatrix}
$

求连续一周出现涨势的概率,我们应该使用前后向算法forward_algrithm()。改算法接收一个hmm模型。我们在初始化hmm模型的时候,需要设置观测状态states、发射状态emissions,转移概率矩阵set_T(),发射概率矩阵set_E(),以及初始分布set_priors()

我们初始化好这些参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
 
def initTestModel():
model = hmm.HMM(['bull', 'bear', 'normal'], ['up', 'down', 'unchange'])
transition_matrix = {
'bull': {
'bull': 0.6,
'bear': 0.2,
'normal': 0.2
},
'bear': {
'bull': 0.5,
'bear': 0.3,
'normal': 0.2
},
'normal': {
'bull': 0.4,
'bear': 0.1,
'normal': 0.5
}
}
emission_matrix = {
'up': {
'bull': 0.7,
'bear': 0.1,
'normal': 0.3
},
'down': {
'bull': 0.1,
'bear': 0.6,
'normal': 0.3
},
'unchange': {
'bull': 0.2,
'bear': 0.3,
'normal': 0.4
}
}
p = {'bull':0.4, 'bear':0.3, 'normal':0.3}

model.set_T(transition_matrix)
model.set_E(emission_matrix)
model.set_priors(p)

return model

初始化模型完成之后,调用forward_algrithm()即可得出指定观测序列的预测结果:

1
2
3
4
def evaluate():
model = initTestModel()
prop = model.forward_algrithm(['up', 'up', 'up', 'up', 'up'], model)
print prop

输出结果为:

1
0.0232968298

下面的代码可以查看所有组合可能出现的概率:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def generate_weekly_list():
status = ['up','down','unchange']

i = 0
weekly_list = {}
for statu1 in status:
for statu2 in status:
for statu3 in status:
for statu4 in status:
for statu5 in status:
key = statu1 + ',' + statu2 + ',' + statu3 + ',' + statu4 + ',' + statu5
value = [statu1, statu2, statu3, statu4, statu5]
weekly_list[key] = value
return weekly_list


def evaluate():
model = initTestModel()

all_list = generate_weekly_list()
prop_list = {}
for key in all_list:
prop = hmm.forward_algrithm(all_list[key], model)
prop_list[key] = prop

sort_list = sorted(prop_list.items(), lambda x, y: cmp(x[1], y[1]), reverse=True)

for (key, value) in sort_list:
print ('%.6f:' % value) + key
预测问题:已知模型+股市变化,求股市状态

已知模型,观察到一周的变化情况为:涨、不变、涨、不变、跌,问股市的状态变化情况?

还是同样的模型:

观测状态集合为:牛市、熊市、普通

我们的发射状态集合为:涨、跌、不变

初始分布$\pi$为:$(0.4, 0.3, 0.3)$

转移概率矩阵A:$
A=\begin{pmatrix}
0.6 & 0.2 & 0.2 \\
0.5 & 0.3 & 0.2 \\
0.4 & 0.1 & 0.5
\end{pmatrix}
$

观测概率矩阵B:$
A=\begin{pmatrix}
0.7 & 0.1 & 0.2 \\
0.1 & 0.6 & 0.3 \\
0.3 & 0.3 & 0.4
\end{pmatrix}
$

代码实现如下:

1
2
3
4

def predict():
model = initTestModel()
print hmm.viterbi_path(['up', 'unchange', 'up', 'unchange', 'down'], model)

结果是:

1
['bull', 'bull', 'bull', 'bull', 'bear']

我们同样可以写出所有的组合的预测结果:

1
2
3
4
5
6
7

def predict():
model = initTestModel()
weekly_list = generate_weekly_list()
for key in weekly_list:
print key + ':'
print hmm.viterbi_path(weekly_list[key], model)
学习问题:已知一堆观测序列,求模型

某股民连续三周观测到股市的变动情况为:

  • 涨,不变,涨,跌,涨
  • 跌,涨,跌,涨,不变
  • 不变,不变,跌,涨,涨

问,下周的变化情况?

这个问题的思路为:

根据观测序列 -> 求出模型 -> 得到$A,B,\pi$ -> 预测当前状态 -> 利用转移矩阵预测下一个状态。

代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
def learn():
model = initTestModel()

o_list1 = ['up', 'unchange', 'up', 'down', 'up']
o_list2 = ['down', 'up', 'down', 'up', 'unchange']
o_list3 = ['unchange', 'unchange', 'down', 'up', 'up']

model = hmm.baum_welch(o_list1, model)
model = hmm.baum_welch(o_list2, model)
model = hmm.baum_welch(o_list3, model)

print hmm.viterbi_path(o_list3, model)

得出结果为:

1
['normal', 'bull', 'bear', 'bull', 'bull']

你也可以将转移概率矩阵打印出来:

1
print model._T

以上就是HMM模型的全部内容。

坚持原创技术分享,您的支持将鼓励我继续创作!