Python Forum
Value Iteration/ Q-Iteration
Thread Rating:
  • 0 Vote(s) - 0 Average
  • 1
  • 2
  • 3
  • 4
  • 5
Value Iteration/ Q-Iteration
#1
Hello,

I have to implement value iteration and q iteration in Python 2.7.
This code is given:
import numpy as np
import mdp as util


def print_v_func(k, v):
    if PRINTING:
        print "k={} V={}".format(k, v)


def print_simulation_step(state_old, action, state_new, reward):
    if PRINTING:
        print "s={} a={} s'={} r={}".format(state_old, action, state_new, reward)


def value_iteration(mdp, num_iterations=10):
    """
    Does value iteration on the given Markov Decision Process for given number of iterations

    :param mdp: the Markov Decision Process
    :param num_iterations: the number of iteration to compute the V-function
    :return: (v, q) = the V-function and Q-function after 'num_iterations' steps

    :type mdp: util.MarkovDecisionProcess
    :type num_iterations: int
    """
    """
     Useful functions:
      - arr = np.ones(shape)
      - arr = np.zeros(shape)
      - arr = np.empty(shape)
      - arr = np.array(list)
      - mdp.gamma
      - P(s'|s,a) = mdp.psas[s',a,s]
      - reward = mdp.ras[a,s]
      - max(arr), util.argmax(arr)
    """
    # init the q and v function as numpy array
    q = None # change this
    v = None  # change this
    for k in range(num_iterations):
        print_v_func(k, v)  # print k and v
        # compute the Q-function
        # Compute the V-function
    return v, q


def simulate(mdp, state_old, action):
    """
    Simulates a single step in the given Markov Decision Process

    :param mdp: the Markov Decision Process
    :param action: the Action to be taken
    :param state_old: the old state
    :return: (reward, state_new) = the reward for taking the action in old state and the new state you are in

    :type mdp: util.MarkovDecisionProcess
    :type action: int
    :type state_old: int
    :rtype: tuple
    """
    # this method work as is, no change required
    reward = mdp.ras[action, state_old]  # gets reward
    state_new = util.sample_multinomial(mdp.psas[:, action, state_old])  # get new state as sample s' from P(s'|a,s)
    print_simulation_step(state_old, action, state_new, reward)  # print transition and reward
    return reward, state_new


PRINTING = False # do not print by default, please do not change this
if __name__ == '__main__':
    PRINTING = True  # enable printing
    util.random_seed()  # seed random number generator
    value_iteration(util.data.create_mdp_circle_world_one())
import numpy as np
from random import Random
import data as data

__author__ = 'Henning'


class InvaildArgumentException(Exception):
    pass


class MarkovDecisionProcess(object):
    """
    The main MDP data structure.

    Examples:
    mdp.ps[1]  # returns the start distribution for state one
    mdp.psas[1,0,2]  # return the transition from state 2 to state 1 given action 0
    mdp.ras[0,1]  # return the reward for doing action 0 in state 1 (the state it was before the action)

    :param num_actions: number of states
    :param num_states: number of actions
    :param state_start: start state
    :param psas: transition probabilities
    :param ras: reward expectation as a function of (action,x_before)
    :param gamma: discounting factor
    :type num_actions: int
    :type num_states: int
    :type state_start: int
    :type psas: np.ndarray
    :type ras: np.ndarray
    :type gamma: float
    """

    def __init__(self, ras, psas, state_start, gamma):
        """
        The constructor for a MDP.
        :param state_start: start distribution as nested list
        :param psas: transition probabilities as nested list
        :param ras: reward expectation as a function of (action,x_before) as nested list
        :param gamma: discounting factor

        :type state_start: int
        :type psas: list
        :type ras: list
        :type gamma: float
        """

        self.gamma = gamma
        self.ras = self.__check_input_tensor(
            ras,
            req_shape=(None, None)
        )
        self.num_actions = self.ras.shape[0]
        self.num_states = self.ras.shape[1]
        self.psas = self.__check_input_tensor(
            psas,
            normalized=True,
            req_shape=(self.num_states, self.num_actions, self.num_states)
        )
        self.state_start = state_start

    @staticmethod
    def __check_input_tensor(input_tensor, req_shape=None, normalized=False):
        """
        Does some check on the input for the MDP
        :param input_tensor: the input to check as nested list
        :param req_shape: the required shape as tuple or None for anything.
                          The tuple may have None or any integer as entry.
        :param normalized: whether the input should be is a distribution over the first index w.r.t. the later indices
        :return: the input as numpy.ndarray
        :type input_tensor: list
        :type req_shape: tuple
        :type normalized: bool
        :rtype: np.ndarray
        """
        arr = input_tensor
        if type(input_tensor) is not np.ndarray:
            if type(input_tensor) is not list:
                raise InvaildArgumentException  # input must be a list
            arr = np.array(input_tensor, np.float64)

        if req_shape is not None:  # check the shape
            if arr.ndim != len(req_shape):
                raise InvaildArgumentException  # input does not have the right dimensions
            for i in range(len(req_shape)):
                if (req_shape[i] is not None) & (arr.shape[i] != req_shape[i]):
                    raise InvaildArgumentException  # input does not have the required shape

        if normalized:  # check whether the input is a distribution over the first index w.r.t. the later indices
            if arr.ndim == 1:
                if not (1 - sum(arr)) < 1e-10:
                    raise InvaildArgumentException  # input is not a distribution over the first index
            elif arr.ndim == 2:
                for j in range(arr.shape[0]):
                    if not (1 - sum(arr[:, j])) < 1e-10:
                        raise InvaildArgumentException  # input is not a distribution over the first index
            elif arr.ndim == 3:
                for k in range(arr.shape[2]):
                    for j in range(arr.shape[1]):
                        if not (1 - sum(arr[:, j, k])) < 1e-10:
                            raise InvaildArgumentException  # input is not a distribution over the first index
            elif arr.ndim == 4:
                for l in range(arr.shape[3]):
                    for k in range(arr.shape[2]):
                        for j in range(arr.shape[1]):
                            if not (1 - sum(arr[:, j, k, l])) < 1e-10:
                                raise InvaildArgumentException  # input is not a distribution over the first index
            else:
                raise Exception  # Not implemented
        return arr

__rnd = Random()

random_seed = __rnd.seed


def sample_multinomial(p):
    """
    Gets a sample form given multinomial distribution

    The distribution is an 1-D-array containing the probability as entries.
    The sample will be given as the index.

    :param p: the distribution
    :return: the sample
    :type p: np.array
    :rtype: int
    """
    if (type(p) is not list) and (type(p) is not tuple):
        if type(p) is not np.ndarray:
            raise InvaildArgumentException  # p must be a numpy.ndarray or a list or a tuple
        if p.ndim != 1:
            raise InvaildArgumentException  # p must have just 1 dimension
    if (1 - sum(p)) >= 1e-10:
        raise InvaildArgumentException  # p must be normalized
    s = 0.0
    ptr = __rnd.uniform(0.0, 1.0)
    for i in range(len(p)):
        s += p[i]
        if s > ptr:
            return i
    raise Exception  # Unexpected error, maybe p is not normalized



def random_range(start, stop=None, step=1):
    """
    Get a random integer from range(start, stop, step)

    :param start: lower bound
    :param stop: upper bound
    :param step: step
    :return: random integer
    :type start: int
    :type stop: int
    :type step: int
    :rtype: int
    """
    return __rnd.randrange(start, stop, step)


def random_uniform(a=0.0, b=1.0):
    """
    Get a random number in the range [a, b) or [a, b] depending on rounding.

    :param a: lower bound
    :param b: upper bound
    :return: random number
    :type a: float
    :type b: float
    :rtype: float
    """
    return __rnd.uniform(a, b)


def random_gaussian(mu=0.0, sigma=1.0):
    """
    Gaussian distribution

    :param mu: mean
    :param sigma: standard deviation
    :return: a random gaussian
    :type mu: float
    :type sigma: float
    :rtype: float
    """
    return __rnd.gauss(mu, sigma)


def random_tensor_uniform(shape, a=0.0, b=1.0):
    """
    Get a random tensor using a uniform distribution

    :param shape: the shape of the tensor
    :param a: lower bound
    :param b: upper bound
    :return: the random tensor
    :type shape: tuple:
    :type a: float
    :type b: float
    :rtype: np.ndarray
    """
    r = np.empty(shape, np.float64)
    for _ in r.flat:
        r[...] = __rnd.uniform(a, b)
    return r


def random_tensor_gaussian(shape, mu=0.0, sigma=1.0):
    """
    Get a random tensor using a Gaussian distribution

    :param shape: the shape of the tensor
    :param mu: mean
    :param sigma: standard deviation
    :return: the random tensor
    :type shape: tuple
    :type mu: float
    :type sigma: float
    :rtype: np.ndarray
    """
    r = np.empty(shape, np.float64)
    for _ in r.flat:
        r[...] = __rnd.gauss(mu, sigma)
    return r


def argmax(p):
    """
    Gets the index of maximum of p

    :param p: the array
    :return: the index of the maximum
    :type p: np.array
    :rtype: int
    """
    if (type(p) is not list) and (type(p) is not tuple):
        if type(p) is not np.ndarray:
            raise InvaildArgumentException  # p must be a numpy.ndarray or a list or a tuple
        if p.ndim != 1:
            raise InvaildArgumentException  # p must have just 1 dimension

    m = 0
    for i in range(len(p)):
        if p[i] > p[m]:
            m = i
    return m
I've done the following: (I only paste my changes)
def value_iteration(mdp, num_iterations=10):
    """
    Does value iteration on the given Markov Decision Process for given number of iterations

    :param mdp: the Markov Decision Process
    :param num_iterations: the number of iteration to compute the V-function
    :return: (v, q) = the V-function and Q-function after 'num_iterations' steps

    :type mdp: util.MarkovDecisionProcess
    :type num_iterations: int
    """
    """
     Useful functions:
      - arr = np.ones(shape)
      - arr = np.zeros(shape)
      - arr = np.empty(shape)
      - arr = np.array(list)
      - mdp.gamma
      - P(s'|s,a) = mdp.psas[s',a,s]
      - reward = mdp.ras[a,s]
      - max(arr), util.argmax(arr)
    """
    # init the q and v function as numpy array
    q = np.zeros(num_iterations)
    v = np.zeros(num_iterations)
    U1 = dict([(s, 0) for s in mdp.states])
    gamma = mdp.gamma
    for k in range(num_iterations):
        print_v_func(k, v)  # print k and v
        # compute the Q-function
        # Compute the V-function
        while True:
            v = U1.copy()
            for s, s1 in mdp.states:
                for a in mdp.states(s):
                    reward = mdp.ras[a, s]
                    p = mdp.psas[s1, a, s]
                    U1[s] = reward + gamma * max(sum(p * v[s1]))
    return v, q
My problem is that I don't know if I'm one the correct way. Because I'm not sure if I calculate the p correct.
Hope that somebody could help me:)

How to compute V:
Alle s in S (States): V_{k+1} (s) = max_a (R(s,a) + sum_{s'} P(s'|s,a) V_k(s'))

Where V_0(s) = 0
Reply


Possibly Related Threads…
Thread Author Replies Views Last Post
  List iteration Peca 2 2,158 Nov-11-2020, 05:13 PM
Last Post: deanhystad
  Simple fixed point iteration root finding in python DoctorSmiles 3 9,664 Jul-11-2020, 04:08 AM
Last Post: ndc85430
  How can I run a function inside a loop every 24 values of the loop iteration range? mcva 1 2,149 Sep-18-2019, 04:50 PM
Last Post: buran
  Need help with fixing iteration counter Kapolt 8 4,742 Oct-18-2018, 07:56 PM
Last Post: buran
  Need help with iteration homework Dendro 2 2,430 Oct-18-2018, 02:53 AM
Last Post: stullis
  Return result from multiple iteration condition vestkok 3 2,802 Sep-11-2018, 03:48 PM
Last Post: ichabod801
  Homework (counting iteration issue) Cardinal07 10 5,707 Feb-26-2018, 04:45 AM
Last Post: Larz60+
  Repeating elements when appending in iteration in Python 3.6 miguelsantana 2 3,444 Dec-17-2017, 01:22 AM
Last Post: Terafy
  Iteration Inverse Method antoniomancera 7 5,475 Nov-24-2017, 09:52 AM
Last Post: heiner55
  Koch fractal with iteration drimades 7 4,872 Oct-11-2017, 08:35 PM
Last Post: ichabod801

Forum Jump:

User Panel Messages

Announcements
Announcement #1 8/1/2020
Announcement #2 8/2/2020
Announcement #3 8/6/2020