Python Forum
How do I create an actor if I subscribe to a specific topic?
Thread Rating:
  • 0 Vote(s) - 0 Average
  • 1
  • 2
  • 3
  • 4
  • 5
How do I create an actor if I subscribe to a specific topic?
#1
Hi

I have some code which combines publish/subscribe (MQTT) with actors. The Generic_device class allows me to create other actors. How do I make Generic_device create a specific actor only when the corresponding topic has been subscribed to, so that no actor is created if the topic isn't subscribed? At the moment an actor is simply created whenever a specific message is received. All help appreciated.

import logging
from datetime import timedelta
import time
from thespian.actors import *
from transitions import Machine
import paho.mqtt.client as mqtt

class laser(mqtt.Client):
    """MQTT client for the laser; each callback only echoes what it was given."""

    def on_connect(self, mqttc, obj, flags, rc):
        """Print the result code handed to the connect callback."""
        code_text = str(rc)
        print("rc: " + code_text)

    def on_message(self, mqttc, userdata, message):
        """Print the UTF-8 decoded payload, topic, QoS and retain flag."""
        body = message.payload.decode("utf-8")
        print("message received ", body)
        print("message topic=", message.topic)
        print("message qos=", message.qos)
        print("message retain flag=", message.retain)

    def on_publish(self, mqttc, obj, mid):
        """Print the message id handed to the publish callback."""
        mid_text = str(mid)
        print("mid: " + mid_text)

    def on_subscribe(self, mqttc, obj, mid, granted_qos):
        """Print the message id and the granted QoS of a subscription."""
        details = " ".join((str(mid), str(granted_qos)))
        print("Subscribed: " + details)

    def on_log(self, mqttc, userdata, level, buf):
        """Print the log text; the level is ignored."""
        print("log: ", buf)

    def run(self):
        """Connect to the public HiveMQ broker (port 1883, keepalive 60)."""
        self.connect(host="broker.hivemq.com", port=1883, keepalive=60)

# Demo run for the laser client: connect, publish one greeting, then stop.
# NOTE(review): "Laser" is presumably the MQTT client id; paho-mqtt >= 2.0
# expects a callback API version as first positional argument — confirm the
# paho version this script targets.
print("creating new instance")
client = laser("Laser")
client.run()

client.loop_start() # start the network loop so the callbacks can fire
time.sleep(2) # give the connection time to come up
print("Publishing message to topic","microscope/light_sheet_microscope/UI/laser")
client.publish("microscope/light_sheet_microscope/UI/laser","Hello World Im a laser!")
time.sleep(2) # wait for the publish to be processed
client.loop_stop() # stop the network loop

class motorized_mirror_galvo(mqtt.Client):
    """MQTT client for the motorized mirror galvo; callbacks only print."""

    def on_connect(self, mqttc, obj, flags, rc):
        """Print the result code handed to the connect callback."""
        code_text = str(rc)
        print("rc: " + code_text)

    def on_message(self, mqttc, userdata, message):
        """Print the UTF-8 decoded payload, topic, QoS and retain flag."""
        body = message.payload.decode("utf-8")
        print("message received ", body)
        print("message topic=", message.topic)
        print("message qos=", message.qos)
        print("message retain flag=", message.retain)

    def on_publish(self, mqttc, obj, mid):
        """Print the message id handed to the publish callback."""
        mid_text = str(mid)
        print("mid: " + mid_text)

    def on_subscribe(self, mqttc, obj, mid, granted_qos):
        """Print the message id and the granted QoS of a subscription."""
        details = " ".join((str(mid), str(granted_qos)))
        print("Subscribed: " + details)

    def on_log(self, mqttc, userdata, level, buf):
        """Print the log text; the level is ignored."""
        print("log: ", buf)

    def run(self):
        """Connect to the public HiveMQ broker (port 1883, keepalive 60)."""
        self.connect(host="broker.hivemq.com", port=1883, keepalive=60)

# Demo run for the galvo client: connect, publish one greeting, then stop.
# Note that this rebinds the module-level name `client`.
print("creating new instance")
client = motorized_mirror_galvo("Motorized mirror galvo")
client.run()

client.loop_start() # start the network loop so the callbacks can fire
time.sleep(2) # give the connection time to come up
print("Publishing message to topic","microscope/light_sheet_microscope/UI/motorized_mirror_galvo")
client.publish("microscope/light_sheet_microscope/UI/motorized_mirror_galvo","Hello World Im a motorized mirror galvo!")
time.sleep(2) # wait for the publish to be processed
client.loop_stop() # stop the network loop

class stages(mqtt.Client):
    """MQTT client for the stages; callbacks only print what they receive."""

    def on_connect(self, mqttc, obj, flags, rc):
        """Print the result code handed to the connect callback."""
        code_text = str(rc)
        print("rc: " + code_text)

    def on_message(self, mqttc, userdata, message):
        """Print the UTF-8 decoded payload, topic, QoS and retain flag."""
        body = message.payload.decode("utf-8")
        print("message received ", body)
        print("message topic=", message.topic)
        print("message qos=", message.qos)
        print("message retain flag=", message.retain)

    def on_publish(self, mqttc, obj, mid):
        """Print the message id handed to the publish callback."""
        mid_text = str(mid)
        print("mid: " + mid_text)

    def on_subscribe(self, mqttc, obj, mid, granted_qos):
        """Print the message id and the granted QoS of a subscription."""
        details = " ".join((str(mid), str(granted_qos)))
        print("Subscribed: " + details)

    def on_log(self, mqttc, userdata, level, buf):
        """Print the log text; the level is ignored."""
        print("log: ", buf)

    def run(self):
        """Connect to the public HiveMQ broker (port 1883, keepalive 60)."""
        self.connect(host="broker.hivemq.com", port=1883, keepalive=60)

# Demo run for the stages client: connect, publish one greeting, then stop.
# Note that this rebinds the module-level name `client`.
print("creating new instance")
client = stages("Stages")
client.run()

client.loop_start() # start the network loop so the callbacks can fire
time.sleep(2) # give the connection time to come up
print("Publishing message to topic","microscope/light_sheet_microscope/UI/stages")
client.publish("microscope/light_sheet_microscope/UI/stages","Hello World Im a stage!")
time.sleep(2) # wait for the publish to be processed
client.loop_stop() # stop the network loop

class cameras(mqtt.Client):
    """MQTT client for the cameras; callbacks only print what they receive."""

    def on_connect(self, mqttc, obj, flags, rc):
        """Print the result code handed to the connect callback."""
        code_text = str(rc)
        print("rc: " + code_text)

    def on_message(self, mqttc, userdata, message):
        """Print the UTF-8 decoded payload, topic, QoS and retain flag."""
        body = message.payload.decode("utf-8")
        print("message received ", body)
        print("message topic=", message.topic)
        print("message qos=", message.qos)
        print("message retain flag=", message.retain)

    def on_publish(self, mqttc, obj, mid):
        """Print the message id handed to the publish callback."""
        mid_text = str(mid)
        print("mid: " + mid_text)

    def on_subscribe(self, mqttc, obj, mid, granted_qos):
        """Print the message id and the granted QoS of a subscription."""
        details = " ".join((str(mid), str(granted_qos)))
        print("Subscribed: " + details)

    def on_log(self, mqttc, userdata, level, buf):
        """Print the log text; the level is ignored."""
        print("log: ", buf)

    def run(self):
        """Connect to the public HiveMQ broker (port 1883, keepalive 60)."""
        self.connect(host="broker.hivemq.com", port=1883, keepalive=60)

# Demo run for the cameras client: connect, publish one greeting, then stop.
# Note that this rebinds the module-level name `client`.
print("creating new instance")
client = cameras("Cameras")
client.run()

client.loop_start() # start the network loop so the callbacks can fire
time.sleep(2) # give the connection time to come up
print("Publishing message to topic","microscope/light_sheet_microscope/UI/cameras")
client.publish("microscope/light_sheet_microscope/UI/cameras","Hello World Im a camera!")
time.sleep(2) # wait for the publish to be processed
client.loop_stop() # stop the network loop

class filter_wheel(mqtt.Client):
    """MQTT client for the filter wheel; subscribes to its UI topic on connect."""

    def on_connect(self, mqttc, obj, flags, rc):
        """Print the result code, then subscribe to the filter wheel topic."""
        code_text = str(rc)
        print("rc: " + code_text)
        print("Subscribing to topic", "microscope/light_sheet_microscope/UI/filter_wheel")
        mqttc.subscribe("microscope/light_sheet_microscope/UI/filter_wheel")

    def on_message(self, mqttc, userdata, message):
        """Print the UTF-8 decoded payload, topic, QoS and retain flag."""
        body = message.payload.decode("utf-8")
        print("message received ", body)
        print("message topic=", message.topic)
        print("message qos=", message.qos)
        print("message retain flag=", message.retain)

    def on_publish(self, mqttc, obj, mid):
        """Print the message id handed to the publish callback."""
        mid_text = str(mid)
        print("mid: " + mid_text)

    def on_subscribe(self, mqttc, obj, mid, granted_qos):
        """Print the message id and the granted QoS of a subscription."""
        details = " ".join((str(mid), str(granted_qos)))
        print("Subscribed: " + details)

    def on_log(self, mqttc, userdata, level, buf):
        """Print the log text; the level is ignored."""
        print("log: ", buf)

    def run(self):
        """Connect to the public HiveMQ broker (port 1883, keepalive 60)."""
        self.connect(host="broker.hivemq.com", port=1883, keepalive=60)

# Demo run for the filter wheel client: connect, publish one greeting, stop.
# Unlike the other device clients, filter_wheel subscribes (in on_connect) to
# the very topic published to here, so its on_message may echo the greeting —
# presumably, provided the broker delivers it before the loop is stopped.
print("creating new instance")
client = filter_wheel("Filter wheel")
client.run()

client.loop_start() # start the network loop so the callbacks can fire
time.sleep(2) # give the connection and subscription time to complete
print("Publishing message to topic","microscope/light_sheet_microscope/UI/filter_wheel")
client.publish("microscope/light_sheet_microscope/UI/filter_wheel","Hello World Im a Filter Wheel!")
time.sleep(2) # wait for the publish to be processed
client.loop_stop() # stop the network loop

class State(object):
    """
    Base class for the individual states of the state machine; it supplies
    the naming helpers and a do-nothing event handler for them to override.

    """
    def __init__(self):
        # Announce the state as soon as it is created.
        print("Current state: ", self)

    def __str__(self):
        """
        The State's name, i.e. the name of its concrete class.

        """
        return type(self).__name__

    def __repr__(self):
        """
        Same text as ``str()`` so states read the same everywhere.

        """
        return str(self)

    def on_event(self, event):
        """
        Default handler for events delegated to this State: does nothing.

        """
        return None

# Start of our states
class UninitialisedState(State):
    """
    State of a device that has not been initialised yet.

    """
    def on_event(self, event):
        """
        Move to InitialisedState on 'Initialised'; otherwise stay here.

        """
        return InitialisedState() if event == 'Initialised' else self

class InitialisedState(State):
    """
    State of a device that has been initialised but not configured.

    """
    def on_event(self, event):
        """
        Move to ConfiguredState on "Configured"; otherwise stay here.

        """
        return ConfiguredState() if event == "Configured" else self

class ConfiguredState(State):
    """
    State of a device that has been configured and is ready to run.

    """
    def on_event(self, event):
        """
        Move to RunningState on "Running"; otherwise stay here.

        """
        next_state = self
        if event == "Running":
            next_state = RunningState()
        return next_state

class RunningState(State):
    """
    State of a device that is currently running.

    """
    def on_event(self, event):
        """
        Go back to UninitialisedState on "Stop"; otherwise stay here.

        """
        next_state = self
        if event == "Stop":
            next_state = UninitialisedState()
        return next_state

class SimpleDevice(object):
    """
    High-level stand-in for a device, driven by a small state machine.

    """
    def __init__(self):
        """ Set up the device in its starting state. """

        # Every device begins life uninitialised.
        self.state = UninitialisedState()

    def on_event(self, event):
        """
        Core of the state machine: hand the event to the current state and
        adopt whatever state it answers with.

        """

        # The current state decides which state comes next.
        current = self.state
        self.state = current.on_event(event)

# Module-level device instance. Constructing it creates an UninitialisedState,
# whose State.__init__ prints the "Current state" line as a side effect.
device = SimpleDevice()

# The bare string below is an expression statement, not a docstring: it sits
# outside the class that follows and has no effect at runtime.
""" Shelf class """
class Shelf(dict):
    """
    Mapping whose items live in the instance ``__dict__``.

    Because the storage is the attribute dictionary, ``shelf["key"]`` and
    ``shelf.key`` read and write the same value. Only the methods defined
    here use that storage; any other method inherited from ``dict`` still
    operates on the built-in dict storage, which this class leaves empty.

    """

    def __init__(self, *args, **kwargs):
        """Create the shelf and load any initial items into the attribute store.

        Accepts the same arguments as ``update``; without this, constructor
        arguments would land in the built-in storage that no accessor reads.
        """
        super(Shelf, self).__init__()
        self.update(*args, **kwargs)

    def __setitem__(self, key, item):
        self.__dict__[key] = item

    def __getitem__(self, key):
        return self.__dict__[key]

    def __repr__(self):
        return repr(self.__dict__)

    def __len__(self):
        return len(self.__dict__)

    def __delitem__(self, key):
        del self.__dict__[key]

    def clear(self):
        return self.__dict__.clear()

    def copy(self):
        """Return a shallow copy of the stored items as a plain ``dict``."""
        return self.__dict__.copy()

    def has_key(self, k):
        """Python 2 style membership test; same result as ``k in shelf``."""
        return k in self.__dict__

    def get(self, key, default=None):
        """Return the stored value for ``key``, or ``default`` when absent."""
        return self.__dict__.get(key, default)

    def update(self, *args, **kwargs):
        return self.__dict__.update(*args, **kwargs)

    def keys(self):
        return self.__dict__.keys()

    def values(self):
        return self.__dict__.values()

    def items(self):
        return self.__dict__.items()

    def pop(self, *args):
        return self.__dict__.pop(*args)

    def __eq__(self, other):
        """Compare stored items with another Shelf or a plain ``dict``.

        Without this override ``==`` would compare the unused built-in
        storage, so any two shelves would look equal.
        """
        if isinstance(other, Shelf):
            other = other.__dict__
        if isinstance(other, dict):
            return self.__dict__ == other
        return NotImplemented

    def __ne__(self, other):
        # dict supplies its own __ne__, so it must be overridden alongside
        # __eq__ or "!=" would still look at the built-in storage.
        outcome = self.__eq__(other)
        return outcome if outcome is NotImplemented else not outcome

    def __cmp__(self, dict_):
        """Legacy three-way comparison against ``dict_``.

        Returns 0 when the stored items equal ``dict_``; otherwise -1 if this
        shelf holds fewer items than ``dict_`` and 1 if it does not. Python 3
        never calls this implicitly. The previous body called itself with an
        extra argument and always raised ``TypeError``.
        """
        own = self.__dict__
        if own == dict_:
            return 0
        return -1 if len(own) < len(dict_) else 1

    def __contains__(self, item):
        return item in self.__dict__

    def __iter__(self):
        return iter(self.__dict__)

    def __unicode__(self):
        """Return the text form of the stored items.

        ``repr`` already yields text on Python 3, where the ``unicode``
        builtin used previously does not exist.
        """
        return repr(self.__dict__)

# Attribute assignment lands in the instance __dict__, which Shelf uses as its
# item storage, so the print below shows {'shelf': 'GFP'}.
s = Shelf()
s.shelf = "GFP"
print(s)

class Generic_device(mqtt.Client, Actor):
    """
    MQTT client that is also an actor: on connect it subscribes to the UI
    topic of every device, and as an actor it creates the matching device
    actor when it receives the corresponding request string.

    """
    def on_connect(self, mqttc, obj, flags, rc):
        """Print the result code, then announce and subscribe to all UI topics."""
        print("rc: " + str(rc))
        ui_topics = (
            "microscope/light_sheet_microscope/UI/laser",
            "microscope/light_sheet_microscope/UI/motorized_mirror_galvo",
            "microscope/light_sheet_microscope/UI/stages",
            "microscope/light_sheet_microscope/UI/cameras",
            "microscope/light_sheet_microscope/UI/filter_wheel",
        )
        # All announcements are printed before the first subscribe call.
        for topic in ui_topics:
            print("Subscribing to topic", topic)
        for topic in ui_topics:
            mqttc.subscribe(topic)

    def on_message(self, mqttc, userdata, message):
        """Print the UTF-8 decoded payload, topic, QoS and retain flag."""
        body = message.payload.decode("utf-8")
        print("message received ", body)
        print("message topic=", message.topic)
        print("message qos=", message.qos)
        print("message retain flag=", message.retain)

    def on_publish(self, mqttc, obj, mid):
        """Print the message id handed to the publish callback."""
        print("mid: " + str(mid))

    def on_subscribe(self, mqttc, obj, mid, granted_qos):
        """Print the message id and the granted QoS of a subscription."""
        details = " ".join((str(mid), str(granted_qos)))
        print("Subscribed: " + details)

    def on_log(self, mqttc, userdata, level, buf):
        """Print the log text; the level is ignored."""
        print("log: ", buf)

    def run(self):
        """Connect to the public HiveMQ broker (port 1883, keepalive 60)."""
        self.connect(host="broker.hivemq.com", port=1883, keepalive=60)

    def _spawn_device_actor(self, actor_class, label, sender):
        """Create ``actor_class`` and send it ``(sender, label)``."""
        child = self.createActor(actor_class)
        self.send(child, (sender, label))

    def receiveMessage(self, message, sender):
        """
        Create the device actor named by the request string, if any.

        Each request is the literal text of a subscription announcement; any
        other message is ignored. The new actor is sent the original sender
        together with a label.

        """
        if message == "'Subscribing to topic','microscope/light_sheet_microscope/UI/laser'":
            self._spawn_device_actor(Laser, 'Laser', sender)

        if message == "'Subscribing to topic','microscope/light_sheet_microscope/UI/motorized_mirror_galvo'":
            self._spawn_device_actor(Galvo, 'Galvo', sender)

        if message == "'Subscribing to topic','microscope/light_sheet_microscope/UI/stages'":
            self._spawn_device_actor(Stages, 'Stages', sender)

        if message == "'Subscribing to topic','microscope/light_sheet_microscope/UI/cameras'":
            self._spawn_device_actor(Cameras, 'Cameras', sender)

        if message == "'Subscribing to topic','microscope/light_sheet_microscope/UI/filter_wheel'":
            self._spawn_device_actor(Filter_wheel, 'Filter wheel', sender)

# Demo run for the generic device used as a plain MQTT client: connect and let
# the loop run briefly so on_connect can subscribe to the five UI topics.
# Nothing is published here, and the argument "Laser" is the same one the
# laser client was created with earlier in this script.
print("creating new instance")
client = Generic_device("Laser")
client.run()

client.loop_start() # start the network loop so the callbacks can fire
time.sleep(2) # wait while the connection and subscriptions happen
client.loop_stop() # stop the network loop

class Laser(Actor):
    """Laser actor: confirms its creation to the original requester."""

    def receiveMessage(self, message, sender):
        """Answer a ``(requester, label)`` tuple; ignore anything else."""
        if not isinstance(message, tuple):
            return
        requester, label = message
        self.send(requester, label + ' actor created')

class Galvo(Actor):
    """Galvo actor: confirms its creation to the original requester."""

    def receiveMessage(self, message, sender):
        """Answer a ``(requester, label)`` tuple; ignore anything else."""
        if not isinstance(message, tuple):
            return
        requester, label = message
        self.send(requester, label + ' actor created')

class Stages(Actor):
    """Stages actor: confirms its creation to the original requester."""

    def receiveMessage(self, message, sender):
        """Answer a ``(requester, label)`` tuple; ignore anything else."""
        if not isinstance(message, tuple):
            return
        requester, label = message
        self.send(requester, label + ' actor created')

class Cameras(Actor):
    """Cameras actor: confirms its creation to the original requester."""

    def receiveMessage(self, message, sender):
        """Answer a ``(requester, label)`` tuple; ignore anything else."""
        if not isinstance(message, tuple):
            return
        requester, label = message
        self.send(requester, label + ' actor created')

class Filter_wheel(Actor):
    """Filter wheel actor: confirms its creation to the original requester."""

    def receiveMessage(self, message, sender):
        """Answer a ``(requester, label)`` tuple; ignore anything else."""
        if not isinstance(message, tuple):
            return
        requester, label = message
        self.send(requester, label + ' actor created')

def run_example(systembase=None):
    """
    Start an actor system, ask the generic device to create each device
    actor in turn, and print every reply.

    Args:
        systembase: system base handed to ``ActorSystem``; ``None`` leaves
            the choice to Thespian. Previously this argument was accepted
            but never used.

    Each request waits up to 1.5 for its reply (presumably seconds, and
    ``ask`` presumably returns None on timeout — confirm against the
    Thespian documentation). The actor system is shut down even when a
    request raises.
    """
    requests = (
        "'Subscribing to topic','microscope/light_sheet_microscope/UI/laser'",
        "'Subscribing to topic','microscope/light_sheet_microscope/UI/motorized_mirror_galvo'",
        "'Subscribing to topic','microscope/light_sheet_microscope/UI/stages'",
        "'Subscribing to topic','microscope/light_sheet_microscope/UI/cameras'",
        "'Subscribing to topic','microscope/light_sheet_microscope/UI/filter_wheel'",
    )
    system = ActorSystem(systembase)
    try:
        generic_device = system.createActor(Generic_device)
        for request in requests:
            print(system.ask(generic_device, request, 1.5))
    finally:
        # Always release the actor system, even if a request failed.
        system.shutdown()

if __name__ == "__main__":
    import sys
    # Forward the first command-line argument, if any; with none given,
    # run_example falls back to its default of None.
    run_example(*sys.argv[1:2])
Thanks
Reply


Possibly Related Threads…
Thread Author Replies Views Last Post
  create a default path with idle to a specific directory greybill 0 875 Apr-23-2023, 04:32 AM
Last Post: greybill
  Confusing in [for loop] topic Sherine 11 3,500 Jul-31-2021, 02:53 PM
Last Post: deanhystad
  [split] New thread/topic rajp1497 1 1,879 Sep-24-2020, 01:55 AM
Last Post: micseydel
  subscribing to kafka topic/key georgelza 10 4,242 Jan-03-2020, 04:58 AM
Last Post: georgelza
  Delete specific lines contain specific words mannyi 2 4,125 Nov-04-2019, 04:50 PM
Last Post: mannyi
  I can't see publish/subscribe messages from the class I created. sdf1444 0 1,347 Jul-10-2019, 02:21 PM
Last Post: sdf1444
  Help with finding correct topic in Python learning yahya01 1 2,195 Jun-06-2019, 05:01 PM
Last Post: buran
  Topic Modelling - Document Labels Nicson 0 1,710 Nov-20-2018, 04:56 PM
Last Post: Nicson
  How to create a bar chart with two specific columns? Jack_Sparrow 1 2,343 Jun-24-2018, 06:16 PM
Last Post: Grok_It
  Projected Surface in 2D [Difficult topic] Hans_K 6 3,843 Aug-02-2017, 09:16 AM
Last Post: Hans_K

Forum Jump:

User Panel Messages

Announcements
Announcement #1 8/1/2020
Announcement #2 8/2/2020
Announcement #3 8/6/2020