Python Forum

Full Version: How do I create a actor if I subscribe to a specific topic?
You're currently viewing a stripped down version of our content. View the full version with proper formatting.
Hi

I have some code which includes publish/subscribe and actors. The generic_device class allows me to create other actors. I was wondering how do I create a specific actor from the generic_device class when I subscribe to a topic and if the topic isn't subscribed then no actor is created from the generic_device class. At the moment it is just if a specific message is received then the actor is created. All help appreciated.

import logging
from datetime import timedelta
import time
from thespian.actors import *
from transitions import Machine
import paho.mqtt.client as mqtt

class laser(mqtt.Client):
    def on_connect(self, mqttc, obj, flags, rc):
        print("rc: "+str(rc))

    def on_message(self, mqttc, userdata, message):
        print("message received " ,str(message.payload.decode("utf-8")))
        print("message topic=",message.topic)
        print("message qos=",message.qos)
        print("message retain flag=",message.retain)

    def on_publish(self, mqttc, obj, mid):
        print("mid: "+str(mid))

    def on_subscribe(self, mqttc, obj, mid, granted_qos):
        print("Subscribed: "+str(mid)+" "+str(granted_qos))

    def on_log(self, mqttc, userdata, level, buf):
        print("log: ",buf)

    def run(self):
        self.connect("broker.hivemq.com", 1883, 60)

print("creating new instance")
client = laser("Laser")
client.run()

client.loop_start() #start the loop
time.sleep(2)
print("Publishing message to topic","microscope/light_sheet_microscope/UI/laser")
client.publish("microscope/light_sheet_microscope/UI/laser","Hello World Im a laser!")
time.sleep(2) # wai
client.loop_stop() #stop the loop

class motorized_mirror_galvo(mqtt.Client):
    def on_connect(self, mqttc, obj, flags, rc):
        print("rc: "+str(rc))

    def on_message(self, mqttc, userdata, message):
        print("message received " ,str(message.payload.decode("utf-8")))
        print("message topic=",message.topic)
        print("message qos=",message.qos)
        print("message retain flag=",message.retain)

    def on_publish(self, mqttc, obj, mid):
        print("mid: "+str(mid))

    def on_subscribe(self, mqttc, obj, mid, granted_qos):
        print("Subscribed: "+str(mid)+" "+str(granted_qos))

    def on_log(self, mqttc, userdata, level, buf):
        print("log: ",buf)

    def run(self):
        self.connect("broker.hivemq.com", 1883, 60)

print("creating new instance")
client = motorized_mirror_galvo("Motorized mirror galvo")
client.run()

client.loop_start() #start the loop
time.sleep(2)
print("Publishing message to topic","microscope/light_sheet_microscope/UI/motorized_mirror_galvo")
client.publish("microscope/light_sheet_microscope/UI/motorized_mirror_galvo","Hello World Im a motorized mirror galvo!")
time.sleep(2) # wait
client.loop_stop() #stop the loop

class stages(mqtt.Client):
    def on_connect(self, mqttc, obj, flags, rc):
        print("rc: "+str(rc))

    def on_message(self, mqttc, userdata, message):
        print("message received " ,str(message.payload.decode("utf-8")))
        print("message topic=",message.topic)
        print("message qos=",message.qos)
        print("message retain flag=",message.retain)

    def on_publish(self, mqttc, obj, mid):
        print("mid: "+str(mid))

    def on_subscribe(self, mqttc, obj, mid, granted_qos):
        print("Subscribed: "+str(mid)+" "+str(granted_qos))

    def on_log(self, mqttc, userdata, level, buf):
        print("log: ",buf)

    def run(self):
        self.connect("broker.hivemq.com", 1883, 60)

print("creating new instance")
client = stages("Stages")
client.run()

client.loop_start() #start the loop
time.sleep(2)
print("Publishing message to topic","microscope/light_sheet_microscope/UI/stages")
client.publish("microscope/light_sheet_microscope/UI/stages","Hello World Im a stage!")
time.sleep(2) # wait
client.loop_stop() #stop the loop

class cameras(mqtt.Client):
    def on_connect(self, mqttc, obj, flags, rc):
        print("rc: "+str(rc))

    def on_message(self, mqttc, userdata, message):
        print("message received " ,str(message.payload.decode("utf-8")))
        print("message topic=",message.topic)
        print("message qos=",message.qos)
        print("message retain flag=",message.retain)

    def on_publish(self, mqttc, obj, mid):
        print("mid: "+str(mid))

    def on_subscribe(self, mqttc, obj, mid, granted_qos):
        print("Subscribed: "+str(mid)+" "+str(granted_qos))

    def on_log(self, mqttc, userdata, level, buf):
        print("log: ",buf)

    def run(self):
        self.connect("broker.hivemq.com", 1883, 60)

print("creating new instance")
client = cameras("Cameras")
client.run()

client.loop_start() #start the loop
time.sleep(2)
print("Publishing message to topic","microscope/light_sheet_microscope/UI/cameras")
client.publish("microscope/light_sheet_microscope/UI/cameras","Hello World Im a camera!")
time.sleep(2) # wait
client.loop_stop() #stop the loop

class filter_wheel(mqtt.Client):
    def on_connect(self, mqttc, obj, flags, rc):
        print("rc: "+str(rc))
        print("Subscribing to topic","microscope/light_sheet_microscope/UI/filter_wheel")
        mqttc.subscribe("microscope/light_sheet_microscope/UI/filter_wheel")

    def on_message(self, mqttc, userdata, message):
        print("message received " ,str(message.payload.decode("utf-8")))
        print("message topic=",message.topic)
        print("message qos=",message.qos)
        print("message retain flag=",message.retain)

    def on_publish(self, mqttc, obj, mid):
        print("mid: "+str(mid))

    def on_subscribe(self, mqttc, obj, mid, granted_qos):
        print("Subscribed: "+str(mid)+" "+str(granted_qos))

    def on_log(self, mqttc, userdata, level, buf):
        print("log: ",buf)

    def run(self):
        self.connect("broker.hivemq.com", 1883, 60)

print("creating new instance")
client = filter_wheel("Filter wheel")
client.run()

client.loop_start() #start the loop
time.sleep(2)
print("Publishing message to topic","microscope/light_sheet_microscope/UI/filter_wheel")
client.publish("microscope/light_sheet_microscope/UI/filter_wheel","Hello World Im a Filter Wheel!")
time.sleep(2) # wait
client.loop_stop() #stop the loop

class State(object):
    """
    We define a state object which provides some utility functions for the
    individual states within the state machine.

    """
    def __init__(self):
        print("Current state: ", str(self))

    def on_event(self, event):
        """
        Handle events that are delegated to this State.

        """
        pass

    def __repr__(self):
        """
        Leverages the __str__ method to describe the State.

        """
        return self.__str__()

    def __str__(self):
        """
        Returns the name of the State.

        """
        return self.__class__.__name__

# Start of our states
class UninitialisedState(State):
    """
    The uninitialised state.        

    """
    def on_event(self, event):
        if event == 'Initialised':
            return InitialisedState()

        return self

class InitialisedState(State):
    """
    The initialised state.

    """
    def on_event(self, event):
        if event == "Configured":
            return ConfiguredState()

        return self

class ConfiguredState(State):
    """
    The configured state.

    """
    def on_event(self, event):
        if event == "Running":
            return RunningState()
        return self

class RunningState(State):
    """
    The running state.

    """
    def on_event(self, event):
        if event == "Stop":
            return UninitialisedState()
        return self

class SimpleDevice(object):
    """
    A simple state machine that mimics the functionality of a device from a 
    high level.

    """
    def __init__(self):
        """ Initialise the components. """

        # Start with a default state.
        self.state = UninitialisedState()

    def on_event(self, event):
        """
        This is the bread and butter of the state machine. Incoming events are
        delegated to the given states which then handle the event. The result is
        then assigned as the new state.

        """

        # The next state will be the result of the on_event function.
        self.state = self.state.on_event(event)

device = SimpleDevice()

""" Shelf class """
class Shelf(dict):

    def __setitem__(self, key, item):
        self.__dict__[key] = item

    def __getitem__(self, key):
        return self.__dict__[key]

    def __repr__(self):
        return repr(self.__dict__)

    def __len__(self):
        return len(self.__dict__)

    def __delitem__(self, key):
        del self.__dict__[key]

    def clear(self):
        return self.__dict__.clear()

    def copy(self):
        return self.__dict__.copy()

    def has_key(self, k):
        return k in self.__dict__

    def update(self, *args, **kwargs):
        return self.__dict__.update(*args, **kwargs)

    def keys(self):
        return self.__dict__.keys()

    def values(self):
        return self.__dict__.values()

    def items(self):
        return self.__dict__.items()

    def pop(self, *args):
        return self.__dict__.pop(*args)

    def __cmp__(self, dict_):
        return self.__cmp__(self.__dict__, dict_)

    def __contains__(self, item):
        return item in self.__dict__

    def __iter__(self):
        return iter(self.__dict__)

    def __unicode__(self):
        return unicode(repr(self.__dict__))

s = Shelf()
s.shelf = "GFP"
print(s)

class Generic_device(mqtt.Client, Actor):
    def on_connect(self, mqttc, obj, flags, rc):
        print("rc: "+str(rc))
        print("Subscribing to topic","microscope/light_sheet_microscope/UI/laser")
        print("Subscribing to topic","microscope/light_sheet_microscope/UI/motorized_mirror_galvo")
        print("Subscribing to topic","microscope/light_sheet_microscope/UI/stages")
        print("Subscribing to topic","microscope/light_sheet_microscope/UI/cameras")
        print("Subscribing to topic","microscope/light_sheet_microscope/UI/filter_wheel")        
        mqttc.subscribe("microscope/light_sheet_microscope/UI/laser")
        mqttc.subscribe("microscope/light_sheet_microscope/UI/motorized_mirror_galvo")
        mqttc.subscribe("microscope/light_sheet_microscope/UI/stages")
        mqttc.subscribe("microscope/light_sheet_microscope/UI/cameras")
        mqttc.subscribe("microscope/light_sheet_microscope/UI/filter_wheel")

    def on_message(self, mqttc, userdata, message):
        print("message received " ,str(message.payload.decode("utf-8")))
        print("message topic=",message.topic)
        print("message qos=",message.qos)
        print("message retain flag=",message.retain)

    def on_publish(self, mqttc, obj, mid):
        print("mid: "+str(mid))

    def on_subscribe(self, mqttc, obj, mid, granted_qos):
        print("Subscribed: "+str(mid)+" "+str(granted_qos))

    def on_log(self, mqttc, userdata, level, buf):
        print("log: ",buf)

    def run(self):
        self.connect("broker.hivemq.com", 1883, 60)

    def receiveMessage(self, message, sender):
        if message == "'Subscribing to topic','microscope/light_sheet_microscope/UI/laser'":
            laser = self.createActor(Laser)
            lasermsg = (sender, 'Laser')
            self.send(laser, lasermsg)

        if message == "'Subscribing to topic','microscope/light_sheet_microscope/UI/motorized_mirror_galvo'":
            galvo = self.createActor(Galvo)
            galvomsg = (sender, 'Galvo')
            self.send(galvo, galvomsg)

        if message == "'Subscribing to topic','microscope/light_sheet_microscope/UI/stages'":
            stages = self.createActor(Stages)    
            stagesmsg = (sender, 'Stages')
            self.send(stages, stagesmsg)

        if message == "'Subscribing to topic','microscope/light_sheet_microscope/UI/cameras'":
            cameras = self.createActor(Cameras)
            camerasmsg = (sender, 'Cameras')
            self.send(cameras, camerasmsg)

        if message == "'Subscribing to topic','microscope/light_sheet_microscope/UI/filter_wheel'":
            filter_wheel = self.createActor(Filter_wheel)
            filter_wheelmsg = (sender, 'Filter wheel')
            self.send(filter_wheel, filter_wheelmsg)

print("creating new instance")
client = Generic_device("Laser")
client.run()

client.loop_start() #start the loop
time.sleep(2) # wait
client.loop_stop() #stop the loop

class Laser(Actor):
    def receiveMessage(self, message, sender):
        if isinstance(message, tuple):
            orig_sender, pre_laser = message
            self.send(orig_sender, pre_laser + ' actor created')

class Galvo(Actor):
    def receiveMessage(self, message, sender):
        if isinstance(message, tuple):
            orig_sender, pre_galvo = message
            self.send(orig_sender, pre_galvo + ' actor created')

class Stages(Actor):
    def receiveMessage(self, message, sender):
        if isinstance(message, tuple):
            orig_sender, pre_galvo = message
            self.send(orig_sender, pre_galvo + ' actor created')

class Cameras(Actor):
    def receiveMessage(self, message, sender):
        if isinstance(message, tuple):
            orig_sender, pre_cameras = message
            self.send(orig_sender, pre_cameras + ' actor created')

class Filter_wheel(Actor):
    def receiveMessage(self, message, sender):
        if isinstance(message, tuple):
            orig_sender, pre_filter_wheel = message
            self.send(orig_sender, pre_filter_wheel + ' actor created')

def run_example(systembase=None):
    generic_device = ActorSystem().createActor(Generic_device)
    laser = ActorSystem().ask(generic_device, "'Subscribing to topic','microscope/light_sheet_microscope/UI/laser'", 1.5)
    print(laser)
    galvo = ActorSystem().ask(generic_device, "'Subscribing to topic','microscope/light_sheet_microscope/UI/motorized_mirror_galvo'", 1.5)
    print(galvo)
    stages = ActorSystem().ask(generic_device, "'Subscribing to topic','microscope/light_sheet_microscope/UI/stages'", 1.5)
    print(stages)
    cameras = ActorSystem().ask(generic_device, "'Subscribing to topic','microscope/light_sheet_microscope/UI/cameras'", 1.5)
    print(cameras)
    filter_wheel = ActorSystem().ask(generic_device, "'Subscribing to topic','microscope/light_sheet_microscope/UI/filter_wheel'", 1.5)
    print(filter_wheel)
    ActorSystem().shutdown()

if __name__ == "__main__":
    import sys
    run_example(sys.argv[1] if len(sys.argv) > 1 else None)
Thanks