Multi-processing to communicate with microcontrollers - Printable Version

+- Python Forum (https://python-forum.io)
+-- Forum: Python Coding (https://python-forum.io/forum-7.html)
+--- Forum: General Coding Help (https://python-forum.io/forum-8.html)
+--- Thread: Multi-processing to communicate with microcontrollers (/thread-16477.html)
Multi-processing to communicate with microcontrollers - Khoily - Mar-01-2019

Hi everyone,

I have 12 ESP32s (microcontrollers) that connect to the network as TCP/IP servers, and I would like to make my PC a single TCP/IP client. My overall idea is to establish a connection between each process on the PC and each microcontroller using sock.connect and begin storing the data to a csv file. Below is my code on the client side.

#!/usr/bin/env python
"""
Parallel collection code for Khoi
by Sarah Aguasvivas Manzano
This could be extended to multithreading
"""
import struct
import socket
import sys
import time
import os
from multiprocessing import Pool, Manager, Value, Queue
# Pool    --> pool of worker processes
# Manager --> provides shared objects (values, dicts, locks) across processes
# Value   --> a single value shared across processes
# Queue   --> lets you share data across processes

NUM_ESP = 12
BUFFER_SIZE = 10000000000
STARTING_TCP_PORT = 5099


def collection_function(ready_to_read, IP, TCP_PORT, espID):
    # A shared structure such as the manager dict 'd' below is optional,
    # but most likely you will need to share some data structure across
    # processes; see main for how to declare it. In this callback you can
    # read and write it from any process. That is a double-edged sword,
    # though: you have to keep track of who wrote what and when.
    filename = 'collect_parallel_' + str(espID) + '.csv'
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((IP, TCP_PORT))
    print("Connection established for esp # " + str(espID))
    # Wait until the main process raises the shared start flag
    while not ready_to_read.value:
        time.sleep(0.01)
    listl = []
    try:
        # Note the .value: the proxy object itself is always truthy
        while ready_to_read.value:
            data = sock.recv(BUFFER_SIZE)
            fmt = str(len(data) // 4) + "I"
            listl += list(struct.unpack(fmt, data))
    except KeyboardInterrupt:
        print("saving data file for esp #" + str(espID))
        with open(filename, 'w') as filef:
            filef.write(",".join(str(bit) for bit in listl))
        sock.close()


if __name__ == "__main__":
    print("Trying to Connect to PZTs")
    manager = Manager()
    d = manager.dict()  # the processes will share a dictionary (optional)

    # These were the IPs that I needed to connect to;
    # change these to the ones you need.
    ESPIPlist = {}
    ESPIPlist[0] = '192.168.1.14'
    ESPIPlist[1] = '192.168.1.4'
    ESPIPlist[2] = '192.168.1.3'
    ESPIPlist[3] = '192.168.1.5'
    ESPIPlist[4] = '192.168.1.6'
    ESPIPlist[5] = '192.168.1.7'
    ESPIPlist[6] = '192.168.1.8'
    ESPIPlist[7] = '192.168.1.9'
    ESPIPlist[8] = '192.168.1.10'
    ESPIPlist[9] = '192.168.1.11'
    ESPIPlist[10] = '192.168.1.12'
    ESPIPlist[11] = '192.168.1.13'

    # Example of a flag shared across all processes ('b' is the typecode
    # for a signed char, used here as a boolean)
    ready_to_read = manager.Value('b', False)

    pool = Pool(processes=NUM_ESP)
    results = []  # keeping the async results is optional, but this worked for me
    for count in range(NUM_ESP):
        # collection_function has not started collecting yet here; you are
        # just setting up the processes
        results.append(pool.apply_async(
            collection_function,
            (ready_to_read, ESPIPlist[count], STARTING_TCP_PORT + count, count)))
    pool.close()
    ready_to_read.value = True
    for result in results:
        result.get()
    pool.join()

The problem is that sometimes I am able to connect to all ESPs, and other times I cannot. Which ESPs connect also changes.
I wonder if you can detect any problem on the Python side? The way the current code works is that each process establishes its connection and immediately collects data, without waiting for all connections to be established. I wonder if it would be better to establish the connections to all ESPs before I collect data.

Thank you,
Khoi Ly

I am sorry, I was able to fix the problem. It was on the ESP32 side rather than my Python.


RE: Multi-processing to communicate with microcontrollers - DeaD_EyE - Mar-01-2019

You can use asyncio. This code is not tested, but should work.

import asyncio
import struct
import csv


async def connect_and_recv(ip, port):
    reader, writer = await asyncio.open_connection(ip, port)
    data = await reader.read(1024)
    writer.close()
    await writer.wait_closed()
    result = parse(data)
    return result


async def collect_data(addresses):
    tasks = [connect_and_recv(ip, port) for ip, port in addresses]
    return await asyncio.gather(*tasks)


def parse(data):
    integers = len(data) // 4
    st_format = str(integers) + 'I'
    return struct.unpack(st_format, data)


def write_data(data):
    with open('data.csv', 'w') as fd:
        writer = csv.writer(fd)
        writer.writerow(['id', 'data1', 'data2', 'data3', 'data4'])
        for index, row in enumerate(data):
            writer.writerow([index, *row])


STARTING_TCP_PORT = 5099
ips = [
    '192.168.1.4',
    '192.168.1.3',
    '192.168.1.5',
    '192.168.1.6',
    '192.168.1.7',
    '192.168.1.8',
    '192.168.1.9',
    '192.168.1.10',
    '192.168.1.11',
    '192.168.1.12',
    '192.168.1.13',
]
addresses = [(ip, STARTING_TCP_PORT + idx) for idx, ip in enumerate(ips)]

loop = asyncio.get_event_loop()
task = collect_data(addresses)
print('Collecting data')
data = loop.run_until_complete(task)
print('Writing data')
write_data(data)
print('Finished')

You should do something about exception handling. You can decide whether to kill the whole data collection if only one device fails. Also, it is a bit strange that each ESP listens on a different port. To simplify this, you could have them all listen on the same starting port. Then the order does not matter. If you need to know which ESP sent which data, the ESP should send its own id, as an unsigned short for example. Then you don't need to enumerate all the ESPs. In this case, you have to change the format for struct a little bit (an 'H' at the beginning).

More resources: https://docs.python.org/3/library/asyncio-stream.html#streams
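To make the 'H'-prefix suggestion concrete, here is a hedged sketch of one possible packet layout: the ESP packs its id as an unsigned short in front of the uint32 samples, and the client peels the id off before unpacking the rest. The little-endian '<' prefix is an assumption (the ESP32 is little-endian, but check your firmware's byte order), and parse_tagged is an illustrative helper, not part of the code above.

```python
import struct


def parse_tagged(data):
    # First 2 bytes: unsigned short ESP id ('H'); the rest: uint32 samples ('I').
    # '<' assumes little-endian on the wire -- verify against the firmware.
    esp_id, = struct.unpack_from('<H', data, 0)
    n_samples = (len(data) - 2) // 4
    samples = struct.unpack_from('<%dI' % n_samples, data, 2)
    return esp_id, samples


# What the ESP side would send for id 7 carrying the samples 1, 2, 3:
packet = struct.pack('<H3I', 7, 1, 2, 3)
print(parse_tagged(packet))  # (7, (1, 2, 3))
```

With the id inside the packet, every ESP can listen on the same port and the client no longer needs the position in the address list to know who sent what.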
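On the exception-handling point: asyncio.gather accepts return_exceptions=True, which turns a per-device failure into an exception object occupying that device's slot in the result list instead of aborting the whole collection. A minimal sketch follows; the simulated flaky device and the fetch/collect_all names are assumptions made up for the demo.

```python
import asyncio


async def fetch(idx):
    # Simulated per-device read; device 2 is made to fail for the demo.
    if idx == 2:
        raise ConnectionError('device %d unreachable' % idx)
    return idx * 10


async def collect_all(n):
    # return_exceptions=True: one failing device no longer cancels the
    # others; its exception object simply takes its slot in the results.
    return await asyncio.gather(*(fetch(i) for i in range(n)),
                                return_exceptions=True)


results = asyncio.run(collect_all(4))
good = [r for r in results if not isinstance(r, Exception)]
print(good)  # data from the devices that answered
```

After the gather returns, you can log the exception entries and still write out the data from the devices that did answer, rather than losing the whole run.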