Python Forum
Multi-processing to communicate with microcontrollers
#1
Hi everyone,

I have 12 ESP32s (microcontrollers) that connect to the network as TCP/IP servers, and I would like to use my PC as a single TCP/IP client. My overall idea is to run one process on the PC per microcontroller; each process establishes its connection with sock.connect and then begins storing the data in a CSV file. Below is my code on the client side.

#!/usr/bin/env python
"""
    Parallel Collection code for Khoi
        by Sarah Aguasvivas Manzano
    This could be extended to multithreading

"""


import struct
import socket
import sys
import time
import os
from multiprocessing import Pool, Manager, Value, Queue
# Pool --> pool of worker processes
# Manager --> server process that hosts shared objects (dicts, values, locks)
# Value --> a single value shared between processes
# Queue --> allows you to pass data between processes


NUM_ESP= 12
BUFFER_SIZE= 4096  # maximum number of bytes returned by one recv() call
STARTING_TCP_PORT=  5099

def collection_function(ready_to_read, IP, TCP_PORT, espID):
#def collection_function(x):
    # 'q' is optional. However, I added it because most likely you will need
    # to share some sort of data structure across processes
    # go to main to see how to declare it. In this callback
    # function you can read and write in q from any process
    # However, this is a double-edged sword because you
    # have to work so you have a good idea of who wrote what
    # and when.
    filename= ('collect_parallel'+ '_' + str(espID)+'.csv')
    sock= socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((IP, TCP_PORT))
    print("Connection established for esp # " + str(espID))
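    listl=[]
    pending= b''  # bytes of an incomplete 4-byte word, kept for the next recv
    try:
        # ready_to_read is a manager proxy and always truthy, so test .value;
        # wait here until the main process raises the flag
        while not ready_to_read.value:
            time.sleep(0.01)
        while ready_to_read.value:
            data= sock.recv(BUFFER_SIZE)
            if not data:  # empty bytes: the ESP closed the connection
                break
            pending+= data
            usable= len(pending) - len(pending) % 4
            # struct.unpack needs exactly 4 bytes per "I"
            listl+= list(struct.unpack(str(usable//4) + "I", pending[:usable]))
            pending= pending[usable:]
    except KeyboardInterrupt:
        pass  # Ctrl-C stops the collection; the data is saved below
    finally:
        # save on every exit path, not only on Ctrl-C
        print("saving data file for esp #"+ str(espID))
        with open(filename, 'w') as filef:
            filef.write(",".join(str(word) for word in listl))
        sock.close()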

if __name__ == "__main__":
    print("Trying to Connect to PZTs")
    manager = Manager()
    d = manager.dict() #the processes will be sharing a dictionary (optional)
        
    ESPIPlist={}
    
    # These were the IPs that I needed to connect
    # change these to the ones you need
    ESPIPlist[0]='192.168.1.14'
    ESPIPlist[1]='192.168.1.4'
    ESPIPlist[2]='192.168.1.3'
    ESPIPlist[3]='192.168.1.5'
    ESPIPlist[4]='192.168.1.6'
    ESPIPlist[5]='192.168.1.7'
    ESPIPlist[6]='192.168.1.8'
    ESPIPlist[7]='192.168.1.9'
    ESPIPlist[8]='192.168.1.10'
    ESPIPlist[9]='192.168.1.11'
    ESPIPlist[10]='192.168.1.12'
    ESPIPlist[11]='192.168.1.13'
    
    
    
    ready_to_read= manager.Value('b', False) # shared flag; the first argument is a typecode such as 'b' or 'i', not a name
    ready_to_read.value= False
    
    pool= Pool(processes=NUM_ESP)
    results=[] #this is optional, you can just apply_async but this worked for me
    for count in range(NUM_ESP):
        # Here you haven't run yet the collection_function. You are just setting up the processes
        results.append(
        pool.apply_async(collection_function, (ready_to_read, ESPIPlist[count], STARTING_TCP_PORT+count, count))
        )

    pool.close()
    ready_to_read.value=True
    for result in results:
        result.get()

    pool.join()
Error:
runfile('/home/khoi/esp/workspace/ESP_ADS_WLAN/collect_parallel.py', wdir='/home/khoi/esp/workspace/ESP_ADS_WLAN')
Trying to Connect to PZTs
Connection established for esp # 4
Connection established for esp # 3
Connection established for esp # 5
Connection established for esp # 11
Connection established for esp # 9
Connection established for esp # 6
Connection established for esp # 7
Connection established for esp # 2
Traceback (most recent call last):
  File "<ipython-input-2-465ccd5d1510>", line 1, in <module>
    runfile('/home/khoi/esp/workspace/ESP_ADS_WLAN/collect_parallel.py', wdir='/home/khoi/esp/workspace/ESP_ADS_WLAN')
  File "/home/khoi/anaconda3/lib/python3.7/site-packages/spyder_kernels/customize/spydercustomize.py", line 704, in runfile
    execfile(filename, namespace)
  File "/home/khoi/anaconda3/lib/python3.7/site-packages/spyder_kernels/customize/spydercustomize.py", line 108, in execfile
    exec(compile(f.read(), filename, 'exec'), namespace)
  File "/home/khoi/esp/workspace/ESP_ADS_WLAN/collect_parallel.py", line 93, in <module>
    result.get()
  File "/home/khoi/anaconda3/lib/python3.7/multiprocessing/pool.py", line 683, in get
    raise self._value
OSError: [Errno 113] No route to host
The problem is that sometimes I am able to connect to all ESPs, and other times I cannot. The ESPs that do connect also change from run to run. Can you spot any problem on the Python side?

The way the current code works is that each process establishes its connection and immediately starts collecting data, without waiting for all connections to be established. I wonder if it would be better to establish the connections to all ESPs before I collect data.

Thank you,

Khoi Ly

I am sorry, I was able to fix the problem. It was on the ESP32 side rather than in my Python code.
#2
You can use asyncio.
This code is not tested, but should work.

import asyncio
import struct
import csv


async def connect_and_recv(ip, port):
    reader, writer = await asyncio.open_connection(ip, port)
    data = await reader.read(1024)
    writer.close()
    await writer.wait_closed()
    result = parse(data)
    return result


async def collect_data(addresses):
    tasks = [connect_and_recv(ip, port) for ip, port in addresses]
    return await asyncio.gather(*tasks)


def parse(data):
    integers = len(data) // 4
    st_format = str(integers) + 'I'
    # Unpack whole 4-byte words only; leftover bytes would raise struct.error
    return struct.unpack(st_format, data[:integers * 4])


def write_data(data):
    with open('data.csv', 'w', newline='') as fd:
        writer = csv.writer(fd)
        writer.writerow(['id', 'data1', 'data2', 'data3', 'data4'])
        for index, row in enumerate(data):
            writer.writerow([index, *row])


STARTING_TCP_PORT = 5099
ips = [
    '192.168.1.14',  # first address in the original post, so the ports line up
    '192.168.1.4',
    '192.168.1.3',
    '192.168.1.5',
    '192.168.1.6',
    '192.168.1.7',
    '192.168.1.8',
    '192.168.1.9',
    '192.168.1.10',
    '192.168.1.11',
    '192.168.1.12',
    '192.168.1.13',
    ]
addresses = [(ip, STARTING_TCP_PORT + idx) for idx, ip in enumerate(ips)]
print('Collecting data')
# asyncio.run (Python 3.7+) creates the event loop, runs the coroutine and closes the loop
data = asyncio.run(collect_data(addresses))
print('Writing data')
write_data(data)
print('Finished')
You should add some exception handling.
You can decide whether the whole data collection should be aborted when only one device fails.
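As a rough sketch of what that exception handling could look like (not tested against real ESP32s): asyncio.gather accepts return_exceptions=True, so a failing device shows up as an exception object in the result list instead of aborting the others. The names fetch and collect_tolerant and the 5-second timeout are made up for illustration.

```python
import asyncio


async def fetch(ip, port, timeout=5.0):
    # Hypothetical helper: connect, read one chunk, close.
    # wait_for raises asyncio.TimeoutError if the device does not answer in time.
    reader, writer = await asyncio.wait_for(
        asyncio.open_connection(ip, port), timeout)
    try:
        return await asyncio.wait_for(reader.read(1024), timeout)
    finally:
        writer.close()
        await writer.wait_closed()


async def collect_tolerant(addresses):
    tasks = [fetch(ip, port) for ip, port in addresses]
    # return_exceptions=True: one unreachable device does not cancel the rest
    results = await asyncio.gather(*tasks, return_exceptions=True)
    good, failed = {}, {}
    for address, result in zip(addresses, results):
        if isinstance(result, Exception):
            failed[address] = result  # e.g. OSError: [Errno 113] No route to host
        else:
            good[address] = result
    return good, failed
```

With this, good maps each reachable (ip, port) to its raw bytes and failed maps each unreachable one to the exception, so you can log or retry the failures and still write the data you got.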
It is also a bit strange that each ESP is listening on a different port.
To simplify this, you could let them all listen on the start port. Then the order does not matter.
If you need to know which ESP has sent which data, the ESP should send its own ID, for example as an unsigned short.
Then you don't need to enumerate all ESPs. In this case, you have to change the format for struct a little bit (H at the beginning).
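To illustrate the last point, here is a sketch of a parser for a message that starts with the ESP's ID. The wire layout is an assumption: a little-endian unsigned short followed by unsigned 32-bit integers, packed without padding. The explicit < matters, because in struct's default native mode an H followed by an I gets two padding bytes inserted for alignment. parse_with_id is a made-up name.

```python
import struct


def parse_with_id(data):
    # Assumed wire layout: 2-byte ESP id, then as many 4-byte unsigned ints as fit.
    if len(data) < 2:
        raise ValueError('message too short to contain an ESP id')
    integers = (len(data) - 2) // 4
    # '<' selects little-endian with standard sizes and no alignment padding
    st_format = '<H' + str(integers) + 'I'
    esp_id, *values = struct.unpack(st_format, data[:2 + integers * 4])
    return esp_id, values


# Example message as an ESP with id 7 might send it (made-up test bytes)
message = struct.pack('<H3I', 7, 10, 20, 30)
print(parse_with_id(message))  # (7, [10, 20, 30])
```

The ESP firmware would have to pack its data the same way, otherwise the ID and the first integer get mixed up.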

More resources: https://docs.python.org/3/library/asynci...ml#streams