I'm coding a pyqt5 interface to exchange data with an arduino through serial communication.
Basically, the program sends a command, the arduino makes a measurement (or whatever) and sends back some values. I want to be able to do this either once or sequentially with minimum time wasted between the commands (e.g. 1000 measurements total, each measurement made every 0.x seconds).
I'm adding some threading to keep the interface from freezing while it waits for data from the Arduino. There are quite a few approaches to threading with PyQt, but I'm not sure which one is best suited.
The code below shows the 3 main methods I found online. All three send a "Hello World!" byte string to the Arduino, which echoes it back to the program:
All three methods seem to work with this very simplified example, but I'm wondering which (if any) would be best suited, especially for sequential measurements. Any thoughts?
Arduino code:
Basically, the program sends a command, the arduino makes a measurement (or whatever) and sends back some values. I want to be able to do this either once or sequentially with minimum time wasted between the commands (e.g. 1000 measurements total, each measurement made every 0.x seconds).
I'm adding some threading to keep the interface from freezing while it waits for data from the Arduino. There are quite a few approaches to threading with PyQt, but I'm not sure which one is best suited.
The code below shows the 3 main methods I found online. All three send a "Hello World!" byte string to the Arduino, which echoes it back to the program:
- Single Worker: both worker object and thread are destroyed after the thread finishes. This seems suited for single shot commands, but I guess it will create a fair bit of overhead for continuous polling.
- Loop Worker: same principle as above, except that the thread constantly loops over serial.readline() and sends the results back to the main thread until the user makes the worker function quit by setting worker.running = False. This seems to fulfill the purpose of reusing the same thread for multiple commands, but it looks a bit hacky (maybe?)
- QRunnable: as far as I understand, multiple QRunnables can be fed to the thread pool, but I am not sure whether there is a significant advantage over the other 2 methods (except that QThreadPool only needs to be instantiated once)
All three methods seem to work with this very simplified example, but I'm wondering which (if any) would be best suited, especially for sequential measurements. Any thoughts?
Arduino code:
void setup() { // put your setup code here, to run once: Serial.begin(9600); } void loop() { // put your main code here, to run repeatedly: if(Serial.available() > 0){ Serial.write(Serial.read()); } }Python code:
"""PyQt5 demo comparing three ways of reading a serial port off the GUI thread.

Each approach writes ``b"hello world!\\n"`` to the port and prints whatever
line comes back, prefixed with a tag identifying the worker that read it:

* ``SingleWorker``   -- a QObject moved to a throw-away QThread (tag ``SW:``)
* ``LoopWorker``     -- a QObject that keeps reading until told to stop (``LW:``)
* ``RunnableWorker`` -- a QRunnable executed by a QThreadPool (``RW:``)
"""
import sys
import time

import serial
from PyQt5.QtCore import (
    pyqtSignal,
    QObject,
    QThread,
    QRunnable,
    pyqtSlot,
    QThreadPool,
)
from PyQt5.QtWidgets import (
    QApplication,
    QMainWindow,
    QWidget,
    QPushButton,
    QVBoxLayout,
    QCheckBox,
)

# Pause between two polls of the input buffer in the single-shot workers (s).
POLL_INTERVAL = 0.1
# Number of polls a single-shot worker performs before giving up.
MAX_POLLS = 20
# Pause used by the looping worker when nothing is waiting (s). Kept short so
# that an answer is picked up quickly, but non-zero so the thread does not spin.
LOOP_IDLE_SLEEP = 0.005
# Read timeout handed to pyserial (s) so that readline() cannot block forever
# when the terminating newline never arrives.
READ_TIMEOUT = 1.0


def _poll_line(ser, prefix, polls=MAX_POLLS, interval=POLL_INTERVAL):
    """Wait for data on *ser* and return *prefix* + one line, or None.

    The input buffer is checked up to *polls* times, sleeping *interval*
    seconds after every unsuccessful check.
    """
    for _ in range(polls):
        # ``in_waiting`` replaces the deprecated ``inWaiting()`` method.
        if ser.in_waiting:
            return prefix + ser.readline()
        time.sleep(interval)
    return None


class SingleWorker(QObject):
    """One-shot reader: emits a single ``result`` and then ``finished``."""

    finished = pyqtSignal()
    result = pyqtSignal(object)

    def __init__(self, ser):
        super().__init__()
        self.ser = ser

    def run(self):
        """Poll the port for one line and emit it (``None`` if nothing came)."""
        time.sleep(POLL_INTERVAL)
        # The helper sleeps between polls; without that pause all the polls
        # would be over almost immediately and a slow answer would be missed.
        msg = _poll_line(self.ser, b"SW:")
        self.result.emit(msg)
        self.finished.emit()


class LoopWorker(QObject):
    """Continuous reader: emits ``result`` per line until ``running`` is False."""

    finished = pyqtSignal()
    result = pyqtSignal(object)

    def __init__(self, ser):
        super().__init__()
        self.ser = ser
        self.running = True

    def stop(self):
        """Ask ``run`` to return; equivalent to setting ``running = False``."""
        self.running = False

    def run(self):
        """Read lines until ``running`` is cleared, then emit ``finished``."""
        time.sleep(POLL_INTERVAL)
        while self.running:
            if self.ser.in_waiting:
                msg = b"LW:" + self.ser.readline()
                self.result.emit(msg)
            else:
                # Nothing to read: yield briefly instead of busy-waiting.
                time.sleep(LOOP_IDLE_SLEEP)
        self.finished.emit()


class WorkerSignals(QObject):
    """Signal holder for ``RunnableWorker`` (QRunnable is not a QObject)."""

    finished = pyqtSignal()
    result = pyqtSignal(object)


class RunnableWorker(QRunnable):
    """One-shot reader meant to be handed to a QThreadPool."""

    def __init__(self, ser):
        super().__init__()
        self.signals = WorkerSignals()
        self.ser = ser

    @pyqtSlot()
    def run(self):
        """Poll the port for one line and emit it (``None`` if nothing came)."""
        time.sleep(POLL_INTERVAL)
        msg = _poll_line(self.ser, b"RW:")
        self.signals.result.emit(msg)
        self.signals.finished.emit()


class MainWin(QMainWindow):
    """Window with one control per threading approach.

    Args:
        port: name of the serial port to open.
        baudrate: baud rate of the serial link.
    """

    def __init__(self, port="COM4", baudrate=9600):
        super().__init__()
        self.central_widget = QWidget()
        self.vbox = QVBoxLayout(self.central_widget)

        self.btn1 = QPushButton("Send to single thread", self.central_widget)
        self.check = QCheckBox("Connect loop", self)
        self.btn2 = QPushButton("Send to looping thread", self.central_widget)
        self.btn2.setEnabled(False)
        self.btn3 = QPushButton("Send with QRunnable", self.central_widget)

        for widget in (self.btn1, self.check, self.btn2, self.btn3):
            self.vbox.addWidget(widget)
        self.setCentralWidget(self.central_widget)

        self.btn1.pressed.connect(self.send_single)
        self.check.toggled.connect(self.start_loop)
        self.btn2.pressed.connect(self.send_msg)
        self.btn3.pressed.connect(self.qrunnable_send)

        # A read timeout guarantees that readline() returns eventually, so a
        # worker can always notice a stop request and terminate.
        self.ser = serial.Serial(port, baudrate, timeout=READ_TIMEOUT)
        self.threadpool = QThreadPool()

        # Worker/thread pairs. Stored as s_thread/l_thread rather than
        # ``thread`` because an instance attribute called ``thread`` would
        # hide the QObject.thread() method of this window.
        self.s_worker = None
        self.s_thread = None
        self.l_worker = None
        self.l_thread = None
        # True between start() and delivery of the thread's finished signal;
        # while set, the matching QThread object has not been deleted yet.
        self._single_active = False
        self._loop_active = False

    def _lock_ui(self):
        """Disable every control that could start another reader on the port."""
        self.btn1.setEnabled(False)
        self.btn2.setEnabled(False)
        self.btn3.setEnabled(False)
        self.check.setEnabled(False)

    def send_single(self):
        """Send the test message and read the answer in a throw-away QThread."""
        self.ser.write(b"hello world!\n")
        self.s_worker = SingleWorker(self.ser)
        self.s_thread = QThread()
        self.s_worker.moveToThread(self.s_thread)
        self.s_thread.started.connect(self.s_worker.run)
        self.s_worker.finished.connect(self.s_thread.quit)
        self.s_worker.finished.connect(self.s_worker.deleteLater)
        self.s_thread.finished.connect(self.s_thread.deleteLater)
        self.s_thread.finished.connect(self._single_finished)
        self.s_worker.result.connect(self.print_msg)
        self._lock_ui()
        self._single_active = True
        self.s_thread.start()

    def _single_finished(self):
        """Mark the single-shot thread as done and re-enable the controls."""
        self._single_active = False
        self.reset_ui()

    def send_msg(self):
        """Send the test message; the looping worker picks up the answer."""
        self.ser.write(b"hello world!\n")

    def start_loop(self, running):
        """Start the looping reader (*running* True) or ask it to stop."""
        if running:
            self.l_worker = LoopWorker(self.ser)
            self.l_thread = QThread()
            self.l_worker.moveToThread(self.l_thread)
            self.l_thread.started.connect(self.l_worker.run)
            self.l_worker.finished.connect(self.l_thread.quit)
            self.l_worker.finished.connect(self.l_worker.deleteLater)
            self.l_thread.finished.connect(self.l_thread.deleteLater)
            self.l_worker.result.connect(self.print_msg)
            self.l_thread.finished.connect(self._loop_finished)
            self._loop_active = True
            self.l_thread.start()
            self.btn1.setEnabled(False)
            self.btn2.setEnabled(True)
            self.btn3.setEnabled(False)
        elif self._loop_active:
            self.l_worker.running = False
            # Keep the checkbox locked until the thread has really finished,
            # so a quick re-check cannot replace a thread that is still alive.
            self.btn2.setEnabled(False)
            self.check.setEnabled(False)

    def _loop_finished(self):
        """Mark the looping thread as done and re-enable the controls."""
        self._loop_active = False
        self.reset_ui()

    def qrunnable_send(self):
        """Send the test message and read the answer through the thread pool."""
        self.ser.write(b"hello world!\n")
        worker = RunnableWorker(self.ser)
        worker.signals.result.connect(self.print_msg)
        worker.signals.finished.connect(self.reset_ui)
        self._lock_ui()
        self.threadpool.start(worker)

    def receive(self):
        """Blocking read helper: wait one second, then print pending bytes.

        Not connected to any control in this file; calling it from the GUI
        thread freezes the interface for the duration of the sleep.
        """
        time.sleep(1)
        if self.ser.in_waiting:
            print(self.ser.read_all())

    def print_msg(self, i):
        """Print a value received from a worker."""
        print(i)

    def reset_ui(self):
        """Return the controls to their idle state."""
        self.btn1.setEnabled(True)
        self.btn2.setEnabled(False)
        self.btn3.setEnabled(True)
        self.check.setEnabled(True)

    def closeEvent(self, event):
        """Stop all readers, then close the serial port, before closing."""
        if self._loop_active:
            self.l_worker.running = False
        active_threads = []
        if self._single_active:
            active_threads.append(self.s_thread)
        if self._loop_active:
            active_threads.append(self.l_thread)
        for thread in active_threads:
            # quit() is called directly: the worker's finished -> quit
            # connection is delivered through this (GUI) thread's event loop,
            # which is blocked while we wait() below.
            thread.quit()
            thread.wait()
        self.threadpool.waitForDone()
        # Only close the port once nothing is reading from it any more.
        self.ser.close()
        super().closeEvent(event)


if __name__ == "__main__":
    app = QApplication(sys.argv)
    win = MainWin()
    win.show()
    # Hand Qt's return code to the interpreter as the process exit status.
    sys.exit(app.exec_())