In [7]:
# Simulates an HMM

import numpy
import pandas

# Transition matrix a, emission matrix e, number of simulations n, starting state s
def simulate_HMM(a, e, n, s):
    
    symbols = []
    states = []
    for i in range(n):
        states.append(s)
        symbolEmitted = numpy.random.choice(list(e.T.index), 1, p = list(e.T[s]))[0]
        symbols.append(symbolEmitted)
        s = numpy.random.choice(list(a.T.columns), 1, p = list(a.T[s]))[0]
    
    return ("".join(states), "".join(symbols))

    

stateNames = ["A", "B", "C"]
symbolNames = ["1", "2", "3", "4"]

# Will transpose the matrices for easier access to rows (which are now columns)
transitionMatrix = pandas.DataFrame(numpy.matrix('0.1 0.2 0.7;0.5 0 0.5;0.2 0.4 0.4'), index=stateNames, columns=stateNames)
emissionMatrix = pandas.DataFrame(numpy.matrix('0.2 0.4 0.2 0.2; 0.1 0.1 0.8 0; 0.5 0.4 0.05 0.05'), index=stateNames, columns=symbolNames)

print(transitionMatrix)
print(emissionMatrix)


simulate_HMM(transitionMatrix, emissionMatrix, 10, "A")



     A    B    C
A  0.1  0.2  0.7
B  0.5  0.0  0.5
C  0.2  0.4  0.4
     1    2     3     4
A  0.2  0.4  0.20  0.20
B  0.1  0.1  0.80  0.00
C  0.5  0.4  0.05  0.05


('ACBACBAACC', '4312212223')

In [2]:
import math


# Finds log probability of observing sequence x
def forward(a, e, x, startingProbs = []):
    
    # Create the forward table with 1 row and for each state and x.length columns 
    F = numpy.zeros([a.shape[0], len(x)])
    
    
    # If starting probs unspecified, assume uniformity
    if not len(startingProbs):
        startingProbs = [numpy.log(1/a.shape[0]) for x in range(a.shape[0])]
    
    # Convert to log probs
    else:
        startingProbs = [numpy.log(x) for x in startingProbs]
        
        
    # Convert A and E to log space (and transpose to get easier access to their rows)
    logA = pandas.DataFrame(numpy.log(a.as_matrix().T), index=a.columns, columns=a.index)
    logE = pandas.DataFrame(numpy.log(e.as_matrix().T), index=e.columns, columns=e.index)
    stateNames = list(logA.columns)
    
    
    # Initialise first column
    for row in range(a.shape[0]):
        F[row,0] = startingProbs[row] + logE[stateNames[row]][x[0]]
        
        
    # Iterate through remaining columns
    for col in range(1,len(x)):
        for row in range(a.shape[0]):
            F[row,col] = logE[stateNames[row]][x[col]]
            
            logSumPrevCol = 0
            for prevRow in range(a.shape[0]):
                logSumPrevCol = logSumPrevCol + math.exp(F[prevRow,col-1] + logA[stateNames[prevRow]][stateNames[row]]) 
                
            F[row,col] = F[row,col] + math.log(logSumPrevCol)
        
        

    #print(F)
    
    # Terminate by returning p(x) in log space
    return numpy.log(numpy.sum(numpy.exp(F[:,len(x)-1])))
    
    

forward(transitionMatrix, emissionMatrix, "123142314231")



-16.9767023361821

In [3]:
import math


# Finds log probability of observing sequence x
def backward(a, e, x, startingProbs = []):
    
    # Create the forward table with 1 row and for each state and x.length columns 
    B = numpy.zeros([a.shape[0], len(x)])
    
    
    # If starting probs unspecified, assume uniformity
    if not len(startingProbs):
        startingProbs = [numpy.log(1/a.shape[0]) for x in range(a.shape[0])]
    
    # Convert to log probs
    else:
        startingProbs = [numpy.log(x) for x in startingProbs]
        
        
    # Convert A and E to log space
    logA = pandas.DataFrame(numpy.log(a.as_matrix().T), index=a.columns, columns=a.index)
    logE = pandas.DataFrame(numpy.log(e.as_matrix().T), index=e.columns, columns=e.index)
    stateNames = list(logA.columns)
    
    
    # Last column set to zero (ending states not modelled)

     
    
    # Iterate through remaining columns
    for col in range(len(x)-2,-1,-1):
        for row in range(a.shape[0]):
            
            for prevRow in range(a.shape[0]):
                B[row,col] = B[row,col] + math.exp(logE[stateNames[prevRow]][x[col+1]] + B[prevRow,col+1] + logA[stateNames[row]][stateNames[prevRow]]) 
                
            B[row,col] = numpy.log(B[row,col])
        
        

    #print(B)
    
    # Terminate by returning p(x) in log space
    PX = 0
    for row in range(a.shape[0]):
        PX = PX + math.exp(logE[stateNames[row]][x[0]] + B[row,0] + startingProbs[row]) 
    return numpy.log(PX)
    
    

backward(transitionMatrix, emissionMatrix, "123142314231")



-16.9767023361821