Value Iteration / Q-Iteration
Hello,

I have to implement value iteration and Q-iteration in Python 2.7.
This code is given:
import numpy as np
import mdp as util
 
 
def print_v_func(k, v):
    if PRINTING:
        print "k={} V={}".format(k, v)
 
 
def print_simulation_step(state_old, action, state_new, reward):
    if PRINTING:
        print "s={} a={} s'={} r={}".format(state_old, action, state_new, reward)
 
 
def value_iteration(mdp, num_iterations=10):
    """
    Does value iteration on the given Markov Decision Process for the given number of iterations
 
    :param mdp: the Markov Decision Process
    :param num_iterations: the number of iterations to use when computing the V-function
    :return: (v, q) = the V-function and Q-function after 'num_iterations' steps
 
    :type mdp: util.MarkovDecisionProcess
    :type num_iterations: int
    """
    """
     Useful functions:
      - arr = np.ones(shape)
      - arr = np.zeros(shape)
      - arr = np.empty(shape)
      - arr = np.array(list)
      - mdp.gamma
      - P(s'|s,a) = mdp.psas[s',a,s]
      - reward = mdp.ras[a,s]
      - max(arr), util.argmax(arr)
    """
    # init the q and v function as numpy array
    q = None  # change this
    v = None  # change this
    for k in range(num_iterations):
        print_v_func(k, v)  # print k and v
        # compute the Q-function
        # Compute the V-function
    return v, q
 
 
def simulate(mdp, state_old, action):
    """
    Simulates a single step in the given Markov Decision Process
 
    :param mdp: the Markov Decision Process
    :param action: the Action to be taken
    :param state_old: the old state
    :return: (reward, state_new) = the reward for taking the action in old state and the new state you are in
 
    :type mdp: util.MarkovDecisionProcess
    :type action: int
    :type state_old: int
    :rtype: tuple
    """
    # this method works as is, no change required
    reward = mdp.ras[action, state_old]  # gets reward
    state_new = util.sample_multinomial(mdp.psas[:, action, state_old])  # get new state as sample s' from P(s'|a,s)
    print_simulation_step(state_old, action, state_new, reward)  # print transition and reward
    return reward, state_new
 
 
PRINTING = False # do not print by default, please do not change this
if __name__ == '__main__':
    PRINTING = True  # enable printing
    util.random_seed()  # seed random number generator
    value_iteration(util.data.create_mdp_circle_world_one())
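As a side note, simulate already works as given; appended to the script above, a short random rollout would look like this (my own untested example, just to show the call pattern):

mdp = util.data.create_mdp_circle_world_one()
state = mdp.state_start
for t in range(5):
    action = util.random_range(mdp.num_actions)  # random action in [0, num_actions)
    reward, state = simulate(mdp, state, action)

And this is the given mdp module (imported as util above):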
import numpy as np
from random import Random
import data as data
 
__author__ = 'Henning'
 
 
class InvaildArgumentException(Exception):
    pass
 
 
class MarkovDecisionProcess(object):
    """
    The main MDP data structure.
 
    Examples:
    mdp.ps[1]  # returns the start distribution for state one
    mdp.psas[1,0,2]  # return the transition from state 2 to state 1 given action 0
    mdp.ras[0,1]  # return the reward for doing action 0 in state 1 (the state it was before the action)
 
    :param num_actions: number of actions
    :param num_states: number of states
    :param state_start: start state
    :param psas: transition probabilities
    :param ras: reward expectation as a function of (action,x_before)
    :param gamma: discounting factor
    :type num_actions: int
    :type num_states: int
    :type state_start: int
    :type psas: np.ndarray
    :type ras: np.ndarray
    :type gamma: float
    """
 
    def __init__(self, ras, psas, state_start, gamma):
        """
        The constructor for an MDP.
        :param state_start: the start state
        :param psas: transition probabilities as nested list
        :param ras: reward expectation as a function of (action,x_before) as nested list
        :param gamma: discounting factor
 
        :type state_start: int
        :type psas: list
        :type ras: list
        :type gamma: float
        """
 
        self.gamma = gamma
        self.ras = self.__check_input_tensor(
            ras,
            req_shape=(None, None)
        )
        self.num_actions = self.ras.shape[0]
        self.num_states = self.ras.shape[1]
        self.psas = self.__check_input_tensor(
            psas,
            normalized=True,
            req_shape=(self.num_states, self.num_actions, self.num_states)
        )
        self.state_start = state_start
 
    @staticmethod
    def __check_input_tensor(input_tensor, req_shape=None, normalized=False):
        """
        Performs some checks on the input for the MDP
        :param input_tensor: the input to check as nested list
        :param req_shape: the required shape as tuple or None for anything.
                          The tuple may have None or any integer as entry.
        :param normalized: whether the input should be a distribution over the first index w.r.t. the later indices
        :return: the input as numpy.ndarray
        :type input_tensor: list
        :type req_shape: tuple
        :type normalized: bool
        :rtype: np.ndarray
        """
        arr = input_tensor
        if type(input_tensor) is not np.ndarray:
            if type(input_tensor) is not list:
                raise InvaildArgumentException  # input must be a list
            arr = np.array(input_tensor, np.float64)
 
        if req_shape is not None:  # check the shape
            if arr.ndim != len(req_shape):
                raise InvaildArgumentException  # input does not have the right dimensions
            for i in range(len(req_shape)):
                if (req_shape[i] is not None) and (arr.shape[i] != req_shape[i]):
                    raise InvaildArgumentException  # input does not have the required shape
 
        if normalized:  # check whether the input is a distribution over the first index w.r.t. the later indices
            if arr.ndim == 1:
                if not (1 - sum(arr)) < 1e-10:
                    raise InvaildArgumentException  # input is not a distribution over the first index
            elif arr.ndim == 2:
                for j in range(arr.shape[0]):
                    if not (1 - sum(arr[:, j])) < 1e-10:
                        raise InvaildArgumentException  # input is not a distribution over the first index
            elif arr.ndim == 3:
                for k in range(arr.shape[2]):
                    for j in range(arr.shape[1]):
                        if not (1 - sum(arr[:, j, k])) < 1e-10:
                            raise InvaildArgumentException  # input is not a distribution over the first index
            elif arr.ndim == 4:
                for l in range(arr.shape[3]):
                    for k in range(arr.shape[2]):
                        for j in range(arr.shape[1]):
                            if not (1 - sum(arr[:, j, k, l])) < 1e-10:
                                raise InvaildArgumentException  # input is not a distribution over the first index
            else:
                raise Exception  # Not implemented
        return arr
 
__rnd = Random()
 
random_seed = __rnd.seed
 
 
def sample_multinomial(p):
    """
    Gets a sample from the given multinomial distribution

    The distribution is a 1-D array containing the probabilities as entries.
    The sample is returned as the index.
 
    :param p: the distribution
    :return: the sample
    :type p: np.array
    :rtype: int
    """
    if (type(p) is not list) and (type(p) is not tuple):
        if type(p) is not np.ndarray:
            raise InvaildArgumentException  # p must be a numpy.ndarray or a list or a tuple
        if p.ndim != 1:
            raise InvaildArgumentException  # p must have just 1 dimension
    if (1 - sum(p)) >= 1e-10:
        raise InvaildArgumentException  # p must be normalized
    s = 0.0
    ptr = __rnd.uniform(0.0, 1.0)
    for i in range(len(p)):
        s += p[i]
        if s > ptr:
            return i
    raise Exception  # Unexpected error, maybe p is not normalized
 
 
 
def random_range(start, stop=None, step=1):
    """
    Get a random integer from range(start, stop, step)
 
    :param start: lower bound
    :param stop: upper bound
    :param step: step
    :return: random integer
    :type start: int
    :type stop: int
    :type step: int
    :rtype: int
    """
    return __rnd.randrange(start, stop, step)
 
 
def random_uniform(a=0.0, b=1.0):
    """
    Get a random number in the range [a, b) or [a, b] depending on rounding.
 
    :param a: lower bound
    :param b: upper bound
    :return: random number
    :type a: float
    :type b: float
    :rtype: float
    """
    return __rnd.uniform(a, b)
 
 
def random_gaussian(mu=0.0, sigma=1.0):
    """
    Gaussian distribution
 
    :param mu: mean
    :param sigma: standard deviation
    :return: a random gaussian
    :type mu: float
    :type sigma: float
    :rtype: float
    """
    return __rnd.gauss(mu, sigma)
 
 
def random_tensor_uniform(shape, a=0.0, b=1.0):
    """
    Get a random tensor using a uniform distribution
 
    :param shape: the shape of the tensor
    :param a: lower bound
    :param b: upper bound
    :return: the random tensor
    :type shape: tuple
    :type a: float
    :type b: float
    :rtype: np.ndarray
    """
    r = np.empty(shape, np.float64)
    for i in range(r.size):  # fill each entry with an independent sample
        r.flat[i] = __rnd.uniform(a, b)
    return r
 
 
def random_tensor_gaussian(shape, mu=0.0, sigma=1.0):
    """
    Get a random tensor using a Gaussian distribution
 
    :param shape: the shape of the tensor
    :param mu: mean
    :param sigma: standard deviation
    :return: the random tensor
    :type shape: tuple
    :type mu: float
    :type sigma: float
    :rtype: np.ndarray
    """
    r = np.empty(shape, np.float64)
    for i in range(r.size):  # fill each entry with an independent sample
        r.flat[i] = __rnd.gauss(mu, sigma)
    return r
 
 
def argmax(p):
    """
    Gets the index of maximum of p
 
    :param p: the array
    :return: the index of the maximum
    :type p: np.array
    :rtype: int
    """
    if (type(p) is not list) and (type(p) is not tuple):
        if type(p) is not np.ndarray:
            raise InvaildArgumentException  # p must be a numpy.ndarray or a list or a tuple
        if p.ndim != 1:
            raise InvaildArgumentException  # p must have just 1 dimension
 
    m = 0
    for i in range(len(p)):
        if p[i] > p[m]:
            m = i
    return m
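To check that I understand the indexing convention from the MarkovDecisionProcess docstring (psas[s', a, s] = P(s'|s, a)), I put together this toy example with made-up numbers:

import mdp as util

# hypothetical toy MDP: 2 states, 1 action
ras = [[0.0, 1.0]]            # ras[a, s]: reward for action a in state s
psas = [[[0.9, 1.0]],         # psas[s', a, s]: from s=0 stay with prob. 0.9,
        [[0.1, 0.0]]]         # from s=1 always move to s'=0
toy = util.MarkovDecisionProcess(ras, psas, state_start=0, gamma=0.9)
print toy.psas[1, 0, 0]  # P(s'=1 | s=0, a=0) -> 0.1

So for value iteration, the column toy.psas[:, a, s] should be the distribution over the next state s'.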
I've done the following (I'm only pasting my changes):
def value_iteration(mdp, num_iterations=10):
    """
    Does value iteration on the given Markov Decision Process for the given number of iterations
 
    :param mdp: the Markov Decision Process
    :param num_iterations: the number of iterations to use when computing the V-function
    :return: (v, q) = the V-function and Q-function after 'num_iterations' steps
 
    :type mdp: util.MarkovDecisionProcess
    :type num_iterations: int
    """
    """
     Useful functions:
      - arr = np.ones(shape)
      - arr = np.zeros(shape)
      - arr = np.empty(shape)
      - arr = np.array(list)
      - mdp.gamma
      - P(s'|s,a) = mdp.psas[s',a,s]
      - reward = mdp.ras[a,s]
      - max(arr), util.argmax(arr)
    """
    # init the q and v function as numpy array
    q = np.zeros(num_iterations)
    v = np.zeros(num_iterations)
    U1 = dict([(s, 0) for s in mdp.states])
    gamma = mdp.gamma
    for k in range(num_iterations):
        print_v_func(k, v)  # print k and v
        # compute the Q-function
        # Compute the V-function
        while True:
            v = U1.copy()
            for s, s1 in mdp.states:
                for a in mdp.states(s):
                    reward = mdp.ras[a, s]
                    p = mdp.psas[s1, a, s]
                    U1[s] = reward + gamma * max(sum(p * v[s1]))
    return v, q
My problem is that I don't know if I'm on the right track, because I'm not sure whether I compute p correctly.
I hope somebody can help me :)

How to compute V:

For all s in S (states): V_{k+1}(s) = max_a ( R(s,a) + sum_{s'} P(s'|s,a) * V_k(s') )

where V_0(s) = 0.
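To make sure I read the formula right, this is how I would write the update with the given mdp interface (untested sketch, not necessarily what the template expects; q is indexed as [action, state] to match mdp.ras):

import numpy as np

def value_iteration_sketch(mdp, num_iterations=10):
    v = np.zeros(mdp.num_states)                     # V_0(s) = 0
    q = np.zeros((mdp.num_actions, mdp.num_states))
    for k in range(num_iterations):
        for s in range(mdp.num_states):
            for a in range(mdp.num_actions):
                # Q_{k+1}(a,s) = R(s,a) + gamma * sum_{s'} P(s'|s,a) * V_k(s')
                q[a, s] = mdp.ras[a, s] + mdp.gamma * np.dot(mdp.psas[:, a, s], v)
        v = q.max(axis=0)                            # V_{k+1}(s) = max_a Q_{k+1}(a,s)
    return v, q

Is that the right way to use psas here?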