Python Forum

Full Version: action on MQTT while long loop is running
You're currently viewing a stripped down version of our content. View the full version with proper formatting.
I am in the process of coding a pool controller (Pi Zero W) that accepts MQTT commands to fire up the spa, change the pool light color, etc. My function for starting and stopping the spa takes a long time because I'm turning valves, waiting for heat-up, cool-down, etc. I would like to be able to act on further incoming MQTT commands while this is ongoing. I would also like to interrupt a startup command mid-flow if I receive a command to do the opposite.
As it is now, I have to wait for the function to finish before the script processes new commands. How do I need to change my program to accomplish this?

#!/usr/bin/python
import RPi.GPIO as GPIO
import time
import paho.mqtt.client as mqtt

# Address pins by their Broadcom (BCM) channel numbers for everything below.
GPIO.setmode(GPIO.BCM)

# init list with pin numbers
# These are the same sixteen channels that are given names in the
# "assign pins" section further down.
pinList = [17, 27, 22, 13, 19, 26, 18, 23, 24, 25, 12, 16, 20, 21, 5, 6]

# loop through pins and set mode and state to 'low'
for i in pinList:
    GPIO.setup(i, GPIO.OUT)
    GPIO.output(i, GPIO.LOW)

# assign pins
pump_P1_Pin = 23 # Relay 1 Pin 8
pump_P2_Pin = 18 # Relay 1 Pin 7
pump_P3_Pin = 26 # Relay 1 Pin 6
pump_P4_Pin = 19 # Relay 1 Pin 5
valve_Intake_Pin = 13 # Relay 1 Pin 4
valve_Return_Pin = 22 # Relay 1 Pin 3
pool_Light_Pin = 27  # Relay 1 Pin 2
spa_Light_Pin = 17  # Relay 1 Pin 1

stenner_Chlorine_Pin = 24 # Relay 2 Pin 1
stenner_Acid_Pin = 25 # Relay 2 Pin 2
heater_Control_Pin = 12 # Relay 2 Pin 3
heater_Temp_Select_Pin = 16 # Relay 2 Pin 4
waterfallPin = 20 # Relay 2 pin 5
# NOTE(review): the three spare channels below all carry the same "pin 6"
# label -- presumably relay 2 pins 6-8; confirm against the wiring.
not_USed_1 = 21 # Relay 2 pin 6
not_USed_2 = 5 # Relay 2 pin 6 Boots High
not_USed_3 = 6 # Relay 2 pin 6 Boots High


# time to sleep between operations in the main loop
# All values are passed to time.sleep(), i.e. they are in seconds.
sleep_Time_L = 0.2  # not referenced by any function in this script
sleep_Time_Valve = 40 # set to 40
sleep_Time_Pump = 3
sleep_Time_Cooldown = 600 # set to 600
sleep_Time_Light = 0.1

# State shared with jacuzzi(): jacuzzi_Sleep is True while no start/stop
# sequence is running, jacuzzi_On records the last completed command.
jacuzzi_Sleep = True
jacuzzi_On = False

# Timestamp of the last handled jacuzzi command and the time elapsed since
# then; jacuzzi() uses them to ignore commands that arrive within one second.
start_time = time.time()
elapsed_time = time.time() - start_time

# functions
def on_connect(client, userdata, flags, rc):
    """MQTT connect callback: report the result code and subscribe.

    Subscribes, at QoS 0, to the jacuzzi and pool-light command topics.
    """
    print("Connected with result code {}".format(rc))
    topics = [("pool/jacuzzi", 0), ("pool/pool_light", 0)]
    client.subscribe(topics)

def on_message(client, userdata, msg):
    """MQTT message callback: decode the payload and dispatch it by topic.

    Messages on 'pool/jacuzzi' go to jacuzzi(); every other topic is treated
    as a pool-light command and goes to pool_color().  The handler runs
    inline, so this callback does not return until the handler has finished.
    """
    payload_text = msg.payload.decode()
    print("Message Received " + str(payload_text))
    handler = jacuzzi if msg.topic == 'pool/jacuzzi' else pool_color
    handler(payload_text)
 
def jacuzzi(pay_load):
    """Apply an 'on'/'off' jacuzzi command, skipping repeats and rapid re-sends.

    Module state used:
      jacuzzi_On    -- whether the last completed command left the spa on.
      jacuzzi_Sleep -- True while no command is executing; cleared for the
                       duration of start_spa()/stop_spa().
      start_time    -- when the previous command finished being handled.
      elapsed_time  -- seconds since start_time, refreshed on every call.

    A command is only considered when jacuzzi_Sleep is set and more than one
    second has passed since start_time; otherwise the call returns without
    doing anything.  The call blocks for as long as start_spa() or stop_spa()
    takes.
    """
    global start_time
    global elapsed_time
    global jacuzzi_Sleep
    global jacuzzi_On

    elapsed_time = time.time() - start_time
    if not jacuzzi_Sleep or not (elapsed_time > 1):
        return

    command = str(pay_load)
    if command == 'off' and jacuzzi_On:
        print("Jacuzzi Commanded Off")
        jacuzzi_Sleep = False
        stop_spa()
        jacuzzi_On = False
    elif command == 'on' and not jacuzzi_On:
        print("Jacuzzi Commanded On")
        jacuzzi_Sleep = False
        start_spa()
        jacuzzi_On = True
    else:
        # Either the spa is already in the requested state or the payload is
        # not 'on'/'off'.
        print("Jacuzzi same command")

    jacuzzi_Sleep = True
    start_time = time.time()

def start_spa():
    """Run the spa start-up sequence: valves, then pump 4, then the heater.

    Blocks for sleep_Time_Valve + sleep_Time_Pump seconds in total.
    """
    print("Start SPA Begin - Turning Valves")
    for valve_pin in (valve_Intake_Pin, valve_Return_Pin):
        GPIO.output(valve_pin, GPIO.HIGH)
    time.sleep(sleep_Time_Valve)

    print("Turning Pump4 On")
    GPIO.output(pump_P4_Pin, GPIO.HIGH)
    time.sleep(sleep_Time_Pump)

    print("Turning Heater On")
    GPIO.output(heater_Temp_Select_Pin, GPIO.LOW)
    GPIO.output(heater_Control_Pin, GPIO.HIGH)
    print("Start SPA Complete")
  
def stop_spa():
    """Run the spa shut-down sequence: heater off, valves back, pump 4 off.

    Blocks for 2 * sleep_Time_Cooldown + sleep_Time_Valve seconds in total;
    the pump keeps running until the very end of the sequence.
    """
    print("Stop SPA Begin - Turning Heater Off and sleeping 10m")
    GPIO.output(heater_Control_Pin, GPIO.LOW)
    time.sleep(sleep_Time_Cooldown)

    print("Turning Valves")
    for valve_pin in (valve_Intake_Pin, valve_Return_Pin):
        GPIO.output(valve_pin, GPIO.LOW)
    time.sleep(sleep_Time_Valve)

    print("Sleep another 10m")
    time.sleep(sleep_Time_Cooldown)

    print("Turning Pump4 Off")
    GPIO.output(pump_P4_Pin, GPIO.LOW)
    print("Stop SPA Complete")
  
def pool_color(fcolor):
    """Select a pool-light mode by pulsing the light relay.

    The mode name is mapped to a pulse count; each pulse drives
    pool_Light_Pin low and then high, holding each level for
    sleep_Time_Light seconds.  "off" (count 0) simply drives the pin low.
    Names that are not in the table fall back to 11 pulses ("white").
    """
    pulses_by_mode = {
        "off": 0,
        "cycle": 1,
        "party": 2,
        "romance": 3,
        "caribbean": 4,
        "american": 5,
        "sunset": 6,
        "royal": 7,
        "blue": 8,
        "green": 9,
        "red": 10,
        "white": 11,
        "magenta": 12,
        "hold": 13,
        "recall": 14
    }
    pulses = pulses_by_mode.get(fcolor, 11)

    if pulses <= 0:
        GPIO.output(pool_Light_Pin, GPIO.LOW)
        return

    # Count down so the printed value is the number of pulses still to send.
    for remaining in range(pulses, 0, -1):
        print(remaining)
        GPIO.output(pool_Light_Pin, GPIO.LOW)
        time.sleep(sleep_Time_Light)
        GPIO.output(pool_Light_Pin, GPIO.HIGH)
        time.sleep(sleep_Time_Light)
   
# main loop

try:
    # MQTT
    client = mqtt.Client()
    # Install the callbacks before connecting so they are already in place
    # when the broker's first responses are processed.
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect("x.x.x.x", 1883, 60)
    # Blocks here, dispatching callbacks on this thread, until interrupted.
    client.loop_forever()

# End program cleanly with keyboard
except KeyboardInterrupt:
    print("  Quit")

finally:
    # Reset GPIO settings on every exit path (Ctrl-C, a failed connect, or
    # any other error), not only on KeyboardInterrupt.
    GPIO.cleanup()
(Oct-05-2018, 08:28 AM)Larz60+ Wrote: [ -> ]look at: multiprocessing: https://docs.python.org/3/library/multiprocessing.html
or threading: https://docs.python.org/3/library/threading.html

Thanks Larz60+,

I put the Jacuzzi startup/shutdown function in its own thread, and now the code works the way I want it to. I still need to address the interrupt aspect for when a user changes their mind in the middle of a startup/shutdown process, but I suspect that will just be a matter of checking my queue in the middle of the function.
I realize my code is pretty rough and doesn't have much error handling, etc., so if anybody has recommendations for improvements, I would be very thankful!

#!/usr/bin/python
# Keeps Jacuzzi_worker() looping; the main script sets it to False at shutdown.
Jacuzzi_worker_flag=True
import RPi.GPIO as GPIO
import time
import paho.mqtt.client as mqtt
import threading
from queue import Queue
# Hand-off queue: on_message() puts jacuzzi commands here (via
# client.Jacuzzi_q) and Jacuzzi_worker() takes them off in its own thread.
Jacuzzi_q = Queue()

# Address pins by their Broadcom (BCM) channel numbers for everything below.
GPIO.setmode(GPIO.BCM)

# init list with pin numbers
# These are the same sixteen channels that are given names in the
# "assign pins" section further down.

pinList = [17, 27, 22, 13, 19, 26, 18, 23, 24, 25, 12, 16, 20, 21, 5, 6]

# loop through pins and set mode and state to 'low'
for i in pinList:
    GPIO.setup(i, GPIO.OUT)
    GPIO.output(i, GPIO.LOW)

# assign pins
pump_P1_Pin = 23 # Relay 1 Pin 8
pump_P2_Pin = 18 # Relay 1 Pin 7
pump_P3_Pin = 26 # Relay 1 Pin 6
pump_P4_Pin = 19 # Relay 1 Pin 5
valve_Intake_Pin = 13 # Relay 1 Pin 4
valve_Return_Pin = 22 # Relay 1 Pin 3
pool_Light_Pin = 27  # Relay 1 Pin 2
spa_Light_Pin = 17  # Relay 1 Pin 1

stenner_Chlorine_Pin = 24 # Relay 2 Pin 1
stenner_Acid_Pin = 25 # Relay 2 Pin 2
heater_Control_Pin = 12 # Relay 2 Pin 3
heater_Temp_Select_Pin = 16 # Relay 2 Pin 4
waterfallPin = 20 # Relay 2 pin 5
# NOTE(review): the three spare channels below all carry the same "pin 6"
# label -- presumably relay 2 pins 6-8; confirm against the wiring.
not_USed_1 = 21 # Relay 2 pin 6
not_USed_2 = 5 # Relay 2 pin 6 Boots High
not_USed_3 = 6 # Relay 2 pin 6 Boots High


# time to sleep between operations in the main loop
# All values are passed to time.sleep(), i.e. they are in seconds.
# NOTE(review): the valve and cooldown values are shorter than the figures in
# their trailing comments -- presumably test settings; restore before real use.

sleep_Time_L = 0.2  # not referenced by any function in this script
sleep_Time_Valve = 20 # set to 40
sleep_Time_Pump = 3
sleep_Time_Cooldown = 20 # set to 600
sleep_Time_Light = 0.1

# State shared with jacuzzi(): jacuzzi_Sleep is True while no start/stop
# sequence is running, jacuzzi_On records the last completed command.
jacuzzi_Sleep = True
jacuzzi_On = False

# Timestamp of the last handled jacuzzi command and the time elapsed since
# then; jacuzzi() uses them to ignore commands that arrive within one second.
start_time = time.time()
elapsed_time = time.time() - start_time

# functions
def on_connect(client, userdata, flags, rc):
    """MQTT connect callback: report the result code and subscribe.

    Subscribes, at QoS 0, to the jacuzzi and pool-light command topics.
    """
    print("Connected with result code {}".format(rc))
    topics = [("pool/jacuzzi", 0), ("pool/pool_light", 0)]
    client.subscribe(topics)

def on_message(client, userdata, msg):
    """MQTT message callback: queue jacuzzi commands, run light commands inline.

    Messages on 'pool/jacuzzi' are put on ``client.Jacuzzi_q`` as a dict with
    the keys "topic" and "message", so the long start/stop sequences run in
    the worker thread instead of in this callback.  Every other topic is
    treated as a pool-light command and passed straight to pool_color().

    Args:
        client: the MQTT client; must carry the command queue as
            ``client.Jacuzzi_q``.
        userdata: unused.
        msg: the received message; ``msg.topic`` and ``msg.payload`` are read.
    """
    # Decode exactly once, leniently.  The previous second, strict decode
    # raised UnicodeDecodeError out of this callback on a malformed payload.
    p = msg.payload.decode("utf-8", "ignore")
    print("Message Received " + p)
    if msg.topic == 'pool/jacuzzi':
        print("adding to Jacuzzi_q")
        client.Jacuzzi_q.put({"topic": msg.topic, "message": p})
    else:
        pool_color(p)
 
def jacuzzi(pay_load):
    """Apply an 'on'/'off' jacuzzi command, skipping repeats and rapid re-sends.

    Module state used:
      jacuzzi_On    -- whether the last completed command left the spa on.
      jacuzzi_Sleep -- True while no command is executing; cleared for the
                       duration of start_spa()/stop_spa().
      start_time    -- when the previous command finished being handled.
      elapsed_time  -- seconds since start_time, refreshed on every call.

    A command is only considered when jacuzzi_Sleep is set and more than one
    second has passed since start_time; otherwise the call returns without
    doing anything.  The call blocks for as long as start_spa() or stop_spa()
    takes.

    NOTE(review): Jacuzzi_worker() calls this back-to-back for queued
    commands, so a command that was queued while a sequence was running is
    looked at within the one-second window and dropped -- confirm that this
    is the intended behaviour.
    """
    global start_time
    global elapsed_time
    global jacuzzi_Sleep
    global jacuzzi_On

    elapsed_time = time.time() - start_time
    if not jacuzzi_Sleep or not (elapsed_time > 1):
        return

    command = str(pay_load)
    if command == 'off' and jacuzzi_On:
        print("Jacuzzi Commanded Off")
        jacuzzi_Sleep = False
        stop_spa()
        jacuzzi_On = False
    elif command == 'on' and not jacuzzi_On:
        print("Jacuzzi Commanded On")
        jacuzzi_Sleep = False
        start_spa()
        jacuzzi_On = True
    else:
        # Either the spa is already in the requested state or the payload is
        # not 'on'/'off'.
        print("Jacuzzi same command")

    jacuzzi_Sleep = True
    start_time = time.time()

def start_spa():
    """Run the spa start-up sequence: valves, then pump 4, then the heater.

    Blocks for sleep_Time_Valve + sleep_Time_Pump seconds in total.
    """
    print("Start SPA Begin - Turning Valves")
    for valve_pin in (valve_Intake_Pin, valve_Return_Pin):
        GPIO.output(valve_pin, GPIO.HIGH)
    time.sleep(sleep_Time_Valve)

    print("Turning Pump4 On")
    GPIO.output(pump_P4_Pin, GPIO.HIGH)
    time.sleep(sleep_Time_Pump)

    print("Turning Heater On")
    GPIO.output(heater_Temp_Select_Pin, GPIO.LOW)
    GPIO.output(heater_Control_Pin, GPIO.HIGH)
    print("Start SPA Complete")
  
def stop_spa():
    """Run the spa shut-down sequence: heater off, valves back, pump 4 off.

    Blocks for 2 * sleep_Time_Cooldown + sleep_Time_Valve seconds in total;
    the pump keeps running until the very end of the sequence.
    """
    print("Stop SPA Begin - Turning Heater Off and sleeping 10m")
    GPIO.output(heater_Control_Pin, GPIO.LOW)
    time.sleep(sleep_Time_Cooldown)

    print("Turning Valves")
    for valve_pin in (valve_Intake_Pin, valve_Return_Pin):
        GPIO.output(valve_pin, GPIO.LOW)
    time.sleep(sleep_Time_Valve)

    print("Sleep another 10m")
    time.sleep(sleep_Time_Cooldown)

    print("Turning Pump4 Off")
    GPIO.output(pump_P4_Pin, GPIO.LOW)
    print("Stop SPA Complete")
  
def pool_color(fcolor):
    """Select a pool-light mode by pulsing the light relay.

    The mode name is mapped to a pulse count; each pulse drives
    pool_Light_Pin low and then high, holding each level for
    sleep_Time_Light seconds.  "off" (count 0) simply drives the pin low.
    Names that are not in the table fall back to 11 pulses ("white").
    """
    pulses_by_mode = {
        "off": 0,
        "cycle": 1,
        "party": 2,
        "romance": 3,
        "caribbean": 4,
        "american": 5,
        "sunset": 6,
        "royal": 7,
        "blue": 8,
        "green": 9,
        "red": 10,
        "white": 11,
        "magenta": 12,
        "hold": 13,
        "recall": 14
    }
    pulses = pulses_by_mode.get(fcolor, 11)

    if pulses <= 0:
        GPIO.output(pool_Light_Pin, GPIO.LOW)
        return

    # Count down so the printed value is the number of pulses still to send.
    for remaining in range(pulses, 0, -1):
        print(remaining)
        GPIO.output(pool_Light_Pin, GPIO.LOW)
        time.sleep(sleep_Time_Light)
        GPIO.output(pool_Light_Pin, GPIO.HIGH)
        time.sleep(sleep_Time_Light)

def Jacuzzi_worker():
    """Consume jacuzzi commands from Jacuzzi_q; runs in own thread.

    Each queued item is a dict with the keys "topic" and "message" (None
    items are skipped); the message is passed to jacuzzi().  The loop ends
    once Jacuzzi_worker_flag is false, which is noticed within roughly half
    a second while idle, or after the command in progress has finished.
    """
    # Local import: the file-level import only brings in Queue.
    from queue import Empty

    while Jacuzzi_worker_flag:
        try:
            # Block on the queue instead of polling empty() in a tight loop,
            # which kept a CPU core busy.  The timeout exists only so the
            # stop flag is re-checked regularly.
            results = Jacuzzi_q.get(timeout=0.5)
        except Empty:
            continue
        print("Getting Jacuzzi_q que")
        if results is None:
            continue
        print("message saved ",results["topic"],results["message"])
        jacuzzi(results["message"])
   
# main loop
if __name__ == "__main__":
    print("Starting Program")

client = mqtt.Client()
client.last_message = dict()
client.Jacuzzi_q = Jacuzzi_q  # make queue available as part of client
# Install the callbacks before connecting so they are already in place when
# the broker's first responses are processed.
client.on_connect = on_connect
client.on_message = on_message

# Jacuzzi_worker_flag is already True at this point (set at the top of the file).
t = threading.Thread(target=Jacuzzi_worker)  # Jacuzzi thread
t.start()

try:
    try:
        # MQTT
        client.connect("x.x.x.x", 1883, 60)
        client.loop_start()
    except (OSError, ValueError) as exc:
        # Best effort, as before: report the failure and keep running.
        # Only connection/argument errors are caught, so Ctrl-C and genuine
        # bugs are no longer swallowed by a bare except.
        print("connection failed", exc)

    # The work happens in the MQTT loop thread and the jacuzzi worker thread;
    # park the main thread with a sleep instead of a CPU-burning `pass` loop.
    while True:
        time.sleep(1)

# End program cleanly with keyboard
except KeyboardInterrupt:
    print("  Quit")

finally:
    client.loop_stop()
    Jacuzzi_worker_flag = False  # stop jacuzzi worker thread
    try:
        # Wait for the worker before releasing the pins: it may be in the
        # middle of a start/stop sequence and still needs to drive GPIO.
        # This can take as long as the remaining sequence; a second Ctrl-C
        # skips the wait.
        t.join()
    finally:
        # Reset GPIO settings
        GPIO.cleanup()
hate to point to stackoverflow, but this looks like what you need for your interrupts
https://stackoverflow.com/questions/2395...read-event
(Oct-05-2018, 09:46 PM)Larz60+ Wrote: [ -> ]hate to point to stackoverflow, but this looks like what you need for your interrupts
https://stackoverflow.com/questions/2395...read-event

I only want the interrupt to happen at very specific points, and the rollback depends on exactly how far the process has come, so I decided to do it manually by checking
while not Jacuzzi_q.empty()
at those specific points in the code.