Hello,
I have to implement value iteration and Q-iteration in Python 2.7.

This code is given:
import numpy as np
import mdp as util


def print_v_func(k, v):
    if PRINTING:
        print "k={} V={}".format(k, v)


def print_simulation_step(state_old, action, state_new, reward):
    if PRINTING:
        print "s={} a={} s'={} r={}".format(state_old, action, state_new, reward)


def value_iteration(mdp, num_iterations=10):
    """
    Does value iteration on the given Markov Decision Process for given number of iterations

    :param mdp: the Markov Decision Process
    :param num_iterations: the number of iterations to compute the V-function
    :return: (v, q) = the V-function and Q-function after 'num_iterations' steps
    :type mdp: util.MarkovDecisionProcess
    :type num_iterations: int
    """
    """
    Useful functions:
    - arr = np.ones(shape)
    - arr = np.zeros(shape)
    - arr = np.empty(shape)
    - arr = np.array(list)
    - mdp.gamma
    - P(s'|s,a) = mdp.psas[s',a,s]
    - reward = mdp.ras[a,s]
    - max(arr), util.argmax(arr)
    """
    # init the q and v function as numpy array
    q = None  # change this
    v = None  # change this

    for k in range(num_iterations):
        print_v_func(k, v)  # print k and v
        # compute the Q-function

        # compute the V-function

    return v, q


def simulate(mdp, state_old, action):
    """
    Simulates a single step in the given Markov Decision Process

    :param mdp: the Markov Decision Process
    :param action: the Action to be taken
    :param state_old: the old state
    :return: (reward, state_new) = the reward for taking the action in old state and the new state you are in
    :type mdp: util.MarkovDecisionProcess
    :type action: int
    :type state_old: int
    :rtype: tuple
    """
    # this method works as is, no change required
    reward = mdp.ras[action, state_old]  # gets reward
    state_new = util.sample_multinomial(mdp.psas[:, action, state_old])  # get new state as sample s' from P(s'|a,s)
    print_simulation_step(state_old, action, state_new, reward)  # print transition and reward
    return reward, state_new


PRINTING = False  # do not print by default, please do not change this

if __name__ == '__main__':
    PRINTING = True  # enable printing
    util.random_seed()  # seed random number generator
    value_iteration(util.data.create_mdp_circle_world_one())
Together with this helper module (it is the mdp module that the script above imports as util):
import numpy as np
from random import Random
import data as data

__author__ = 'Henning'


class InvaildArgumentException(Exception):
    pass


class MarkovDecisionProcess(object):
    """
    The main MDP data structure.

    Examples:
    mdp.ps[1]        # returns the start distribution for state one
    mdp.psas[1,0,2]  # return the transition from state 2 to state 1 given action 0
    mdp.ras[0,1]     # return the reward for doing action 0 in state 1 (the state it was before the action)

    :param num_actions: number of states
    :param num_states: number of actions
    :param state_start: start state
    :param psas: transition probabilities
    :param ras: reward expectation as a function of (action,x_before)
    :param gamma: discounting factor
    :type num_actions: int
    :type num_states: int
    :type state_start: int
    :type psas: np.ndarray
    :type ras: np.ndarray
    :type gamma: float
    """

    def __init__(self, ras, psas, state_start, gamma):
        """
        The constructor for a MDP.

        :param state_start: start distribution as nested list
        :param psas: transition probabilities as nested list
        :param ras: reward expectation as a function of (action,x_before) as nested list
        :param gamma: discounting factor
        :type state_start: int
        :type psas: list
        :type ras: list
        :type gamma: float
        """
        self.gamma = gamma
        self.ras = self.__check_input_tensor(ras, req_shape=(None, None))
        self.num_actions = self.ras.shape[0]
        self.num_states = self.ras.shape[1]
        self.psas = self.__check_input_tensor(psas, normalized=True,
                                               req_shape=(self.num_states, self.num_actions, self.num_states))
        self.state_start = state_start

    @staticmethod
    def __check_input_tensor(input_tensor, req_shape=None, normalized=False):
        """
        Does some check on the input for the MDP

        :param input_tensor: the input to check as nested list
        :param req_shape: the required shape as tuple or None for anything. The tuple may have None or any integer as entry.
        :param normalized: whether the input should be a distribution over the first index w.r.t. the later indices
        :return: the input as numpy.ndarray
        :type input_tensor: list
        :type req_shape: tuple
        :type normalized: bool
        :rtype: np.ndarray
        """
        arr = input_tensor
        if type(input_tensor) is not np.ndarray:
            if type(input_tensor) is not list:
                raise InvaildArgumentException  # input must be a list
            arr = np.array(input_tensor, np.float64)
        if req_shape is not None:
            # check the shape
            if arr.ndim != len(req_shape):
                raise InvaildArgumentException  # input does not have the right dimensions
            for i in range(len(req_shape)):
                if (req_shape[i] is not None) & (arr.shape[i] != req_shape[i]):
                    raise InvaildArgumentException  # input does not have the required shape
        if normalized:
            # check whether the input is a distribution over the first index w.r.t. the later indices
            if arr.ndim == 1:
                if not (1 - sum(arr)) < 1e-10:
                    raise InvaildArgumentException  # input is not a distribution over the first index
            elif arr.ndim == 2:
                for j in range(arr.shape[0]):
                    if not (1 - sum(arr[:, j])) < 1e-10:
                        raise InvaildArgumentException  # input is not a distribution over the first index
            elif arr.ndim == 3:
                for k in range(arr.shape[2]):
                    for j in range(arr.shape[1]):
                        if not (1 - sum(arr[:, j, k])) < 1e-10:
                            raise InvaildArgumentException  # input is not a distribution over the first index
            elif arr.ndim == 4:
                for l in range(arr.shape[3]):
                    for k in range(arr.shape[2]):
                        for j in range(arr.shape[1]):
                            if not (1 - sum(arr[:, j, k, l])) < 1e-10:
                                raise InvaildArgumentException  # input is not a distribution over the first index
            else:
                raise Exception  # Not implemented
        return arr


__rnd = Random()
random_seed = __rnd.seed


def sample_multinomial(p):
    """
    Gets a sample from the given multinomial distribution

    The distribution is a 1-D array containing the probabilities as entries.
    The sample will be given as the index.

    :param p: the distribution
    :return: the sample
    :type p: np.array
    :rtype: int
    """
    if (type(p) is not list) and (type(p) is not tuple):
        if type(p) is not np.ndarray:
            raise InvaildArgumentException  # p must be a numpy.ndarray or a list or a tuple
        if p.ndim != 1:
            raise InvaildArgumentException  # p must have just 1 dimension
    if (1 - sum(p)) >= 1e-10:
        raise InvaildArgumentException  # p must be normalized
    s = 0.0
    ptr = __rnd.uniform(0.0, 1.0)
    for i in range(len(p)):
        s += p[i]
        if s > ptr:
            return i
    raise Exception  # Unexpected error, maybe p is not normalized


def random_range(start, stop=None, step=1):
    """
    Get a random integer from range(start, stop, step)

    :param start: lower bound
    :param stop: upper bound
    :param step: step
    :return: random integer
    :type start: int
    :type stop: int
    :type step: int
    :rtype: int
    """
    return __rnd.randrange(start, stop, step)


def random_uniform(a=0.0, b=1.0):
    """
    Get a random number in the range [a, b) or [a, b] depending on rounding.

    :param a: lower bound
    :param b: upper bound
    :return: random number
    :type a: float
    :type b: float
    :rtype: float
    """
    return __rnd.uniform(a, b)


def random_gaussian(mu=0.0, sigma=1.0):
    """
    Gaussian distribution

    :param mu: mean
    :param sigma: standard deviation
    :return: a random gaussian
    :type mu: float
    :type sigma: float
    :rtype: float
    """
    return __rnd.gauss(mu, sigma)


def random_tensor_uniform(shape, a=0.0, b=1.0):
    """
    Get a random tensor using a uniform distribution

    :param shape: the shape of the tensor
    :param a: lower bound
    :param b: upper bound
    :return: the random tensor
    :type shape: tuple
    :type a: float
    :type b: float
    :rtype: np.ndarray
    """
    r = np.empty(shape, np.float64)
    for _ in r.flat:
        r[...] = __rnd.uniform(a, b)
    return r


def random_tensor_gaussian(shape, mu=0.0, sigma=1.0):
    """
    Get a random tensor using a Gaussian distribution

    :param shape: the shape of the tensor
    :param mu: mean
    :param sigma: standard deviation
    :return: the random tensor
    :type shape: tuple
    :type mu: float
    :type sigma: float
    :rtype: np.ndarray
    """
    r = np.empty(shape, np.float64)
    for _ in r.flat:
        r[...] = __rnd.gauss(mu, sigma)
    return r


def argmax(p):
    """
    Gets the index of maximum of p

    :param p: the array
    :return: the index of the maximum
    :type p: np.array
    :rtype: int
    """
    if (type(p) is not list) and (type(p) is not tuple):
        if type(p) is not np.ndarray:
            raise InvaildArgumentException  # p must be a numpy.ndarray or a list or a tuple
        if p.ndim != 1:
            raise InvaildArgumentException  # p must have just 1 dimension
    m = 0
    for i in range(len(p)):
        if p[i] > p[m]:
            m = i
    return m
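Just to make sure I read the docstrings correctly (this is only my own understanding of the indexing, not part of the assignment): mdp.ras[a, s] should be R(s,a), and mdp.psas[s', a, s] should be P(s'|s,a), so mdp.psas[:, a, s] is the whole distribution over next states, which is exactly the slice that simulate() passes to util.sample_multinomial. This is a tiny sanity check I would run on an mdp instance (the function name check_mdp_conventions is just something I made up):

import numpy as np

def check_mdp_conventions(mdp):
    # my reading of the conventions (exactly the thing I am unsure about):
    #   mdp.ras[a, s]       -> R(s, a), reward for taking action a in state s
    #   mdp.psas[s2, a, s]  -> P(s2 | s, a)
    #   mdp.psas[:, a, s]   -> vector of P(. | s, a), as used in simulate()
    assert mdp.ras.shape == (mdp.num_actions, mdp.num_states)
    for a in range(mdp.num_actions):
        for s in range(mdp.num_states):
            p = mdp.psas[:, a, s]
            # every such slice should be a probability distribution over s'
            assert abs(np.sum(p) - 1.0) < 1e-10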
I've done the following (I only paste my changes):
def value_iteration(mdp, num_iterations=10):
    """
    Does value iteration on the given Markov Decision Process for given number of iterations

    :param mdp: the Markov Decision Process
    :param num_iterations: the number of iterations to compute the V-function
    :return: (v, q) = the V-function and Q-function after 'num_iterations' steps
    :type mdp: util.MarkovDecisionProcess
    :type num_iterations: int
    """
    """
    Useful functions:
    - arr = np.ones(shape)
    - arr = np.zeros(shape)
    - arr = np.empty(shape)
    - arr = np.array(list)
    - mdp.gamma
    - P(s'|s,a) = mdp.psas[s',a,s]
    - reward = mdp.ras[a,s]
    - max(arr), util.argmax(arr)
    """
    # init the q and v function as numpy array
    q = np.zeros(num_iterations)
    v = np.zeros(num_iterations)
    U1 = dict([(s, 0) for s in mdp.states])
    gamma = mdp.gamma

    for k in range(num_iterations):
        print_v_func(k, v)  # print k and v
        # compute the Q-function

        # Compute the V-function
        while True:
            v = U1.copy()
            for s, s1 in mdp.states:
                for a in mdp.states(s):
                    reward = mdp.ras[a, s]
                    p = mdp.psas[s1, a, s]
                    U1[s] = reward + gamma * max(sum(p * v[s1]))

    return v, q
My problem is that I don't know whether I'm on the right track, because I'm not sure whether I calculate p correctly.
I hope that somebody can help me :)
How to compute V:
For all s in S (states): V_{k+1}(s) = max_a ( R(s,a) + gamma * sum_{s'} P(s'|s,a) * V_k(s') ),
where V_0(s) = 0.
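To make the formula concrete, this is roughly how I currently picture a single update step with the given arrays. It is only my own sketch based on the indexing above, not the official solution, and value_iteration_sketch is just a name for illustration:

import numpy as np

def value_iteration_sketch(mdp, num_iterations=10):
    # V_0(s) = 0 for every state
    v = np.zeros(mdp.num_states)
    q = np.zeros((mdp.num_actions, mdp.num_states))
    for k in range(num_iterations):
        # Q_{k+1}(a, s) = R(s,a) + gamma * sum_{s'} P(s'|s,a) * V_k(s')
        for s in range(mdp.num_states):
            for a in range(mdp.num_actions):
                p = mdp.psas[:, a, s]  # vector of P(s'|s,a) over all s'
                q[a, s] = mdp.ras[a, s] + mdp.gamma * np.sum(p * v)
        # V_{k+1}(s) = max_a Q_{k+1}(a, s)
        v = np.max(q, axis=0)
    return v, q

Is this the update the formula describes, and if so, where does my version above deviate from it?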