Oct-25-2022, 09:40 PM
Hi,
Unfortunately, my experience in websocket programming is not yet great.
I am trying to migrate my current simplewebsocket server to the sockets module. In addition, asyncio should also be implemented.
Now I get one error message after the other and can't manage to debug my script.
Could someone please help me to get the script running?
Latest: Websocketserver.py
The old version:
Unfortunately, my experience in websocket programming is not yet great.
I am trying to migrate my current simplewebsocket server to the sockets module. In addition, asyncio should also be implemented.
Now I get one error message after the other and can't manage to debug my script.
Error:Traceback (most recent call last):
File "websocketserver5_asyncio.py", line 407, in <module>
asyncio.run(main())
File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "websocketserver5_asyncio.py", line 384, in main
async with websockets.serve("192.168.0.24", PORTNUM, functools.partial(Websocket(), client)):
TypeError: __init__() missing 1 required positional argument: 'client'
According to the documentation, it should actually be implemented correctly. -> https://websockets.readthedocs.io/en/stable/howto/faq.htmlCould someone please help me to get the script running?
Latest: Websocketserver.py
#!/usr/bin/env python import asyncio import json import os import secrets import signal import websockets from websockets import connect import sys import shutil import logging import time import sqlite3 from sqlite3 import Error import functools from functools import partial from pymodbus.client.sync import ModbusTcpClient as ModbusClient logger = logging.getLogger(__name__) # Modbus Config UNIT = 0x1 # Adresses to Hardware HOST = '192.168.0.24' #HOST = '192.168.0.50' # Portnumber PORT = 5020 #PORT = 502 # Coil Output Base Address COILBASE = 512 # Websocket Portnummer listen PORTNUM = 8001 class Websocket(): def __init__(self, client): #, server, sock, address try: super(Websocket, self).__init__(server, sock, address) self.modbus = client self.functions = { "relais1": partial(self.simple_switch, COILBASE + 16), "relais2": partial(self.simple_switch, COILBASE + 1), "relais3": partial(self.simple_switch, COILBASE + 2), "set_temp1": partial(self.set_temperature1, 524), "temperatur1": partial(self.read_temperature, 2), "temperatur2": partial(self.read_temperature, 3), "heizen1": partial(self.heating, 524), "heizen2": partial(self.heating, 525), "heatread": partial(self.simple_read, 524), "systemp": self.systemp } except Exception as exception: logger.exception("constructor") raise def __await__(self): # see: http://stackoverflow.com/a/33420721/1113207 return self._async_init().__await__() async def _async_init(self): self._conn = connect("wss://echo.websocket.org") self.websocket = await self._conn.__aenter__() return self def simple_switch(self, address, value): """ value can be one of "on", "off" or "get" """ if value in ['on', 'off']: rq = self.modbus.write_coil(address, value == 'on', unit=UNIT) time.sleep(0.01) print("rq = self.modbus.write_coil(address, value == on, unit=UNIT)") elif value != 'get': raise ValueError(value) # Relaisregister reading rp = self.modbus.read_coils(address, unit=UNIT) time.sleep(0.01) return "on" if rp.bits[0] else "off" async def set_temperature1(self, address, value): print("set_temperature1 was triggerd") # Loop for temperature controll print("database_reading") print(self.database_reading()[1][0]) if (self.database_reading()[0][1]) == 524 and (self.database_reading()[1][1]) == 'on': print("DB-Reading from set_temperature1 'heizen1': ", self.database_reading()[1][1]) while True: print("Loop!!!!!!!") rp = self.modbus.read_coils(address, unit=UNIT) print(rp.bits[0]) time.sleep(0.01) if rp.bits[0] == False: rq = self.modbus.write_coil(address, value == "on", unit=UNIT) time.sleep(0.01) print("Heating is active!!!!") print(address) time.sleep(0.01) print(type(self.read_temperature( address == 2, 'get'))) print(type(value)) if float(self.read_temperature( address == 2, 'get')) >= float(value): print("Heating is on!!!!!!!") time.sleep(0.01) print(address) rq = self.modbus.write_coil(address, value == "off", unit=UNIT) time.sleep(0.01) print("Heating Temperture is achieved! -> deactivate heating") if (self.database_reading()[0][1]) == 524 and (self.database_reading()[1][1]) == 'off': break await asyncio.sleep(1) # Breake müsste bei Heizen1 aus kommen, nicht wenn nur die Soll Temp erreicht wurde # return NULL def database_reading(self): def dict_factory(cursor, row): d = {} for idx, col in enumerate(cursor.description): d[col[0]] = row[idx] return d # Database state request | If heating button was pressed sqliteConnection = sqlite3.connect("state") sqliteConnection.row_factory = dict_factory zeiger = sqliteConnection.cursor() zeiger.execute("SELECT * FROM heating_relais_state") # zeiger.execute("SELECT heating_relais_address = 524 FROM heating_relais_state") row = zeiger.fetchone() rowDict = dict(zip([c[0] for c in zeiger.description], row)) # row = json.dumps(row) # row1 = json.loads(row) print(type(row)) print("row1") # print(row1) # row1 = {} # print(row[0]) # Convert a dict Array into List[] result = row.items() row1 = list(result) print(row1) # print(list(row.keya())[1]) # print(list(row.items())[1]) # print(list(row.values())[1]) print(row1[1][1]) # self.sendMessage(json.dumps(row)) # row = zeiger.fetchone() # print("row2") # print(row) # self.sendMessage(json.dumps(row)) # print(rowDict) # inhalt = zeiger.fetchall() sqliteConnection.commit() if sqliteConnection: sqliteConnection.close() print("The SQLite connection is closed") # return json.dumps(row) return row1 def simple_read(self, address, value): try: if value != 'get': raise ValueError(value) # Relaisregister reading | If heating wire is physically on rp = self.modbus.read_coils(address, unit=UNIT) time.sleep(0.01) def dict_factory(cursor, row): d = {} for idx, col in enumerate(cursor.description): d[col[0]] = row[idx] return d # Database state request | If heating button was pressed sqliteConnection = sqlite3.connect("state") sqliteConnection.row_factory = dict_factory zeiger = sqliteConnection.cursor() zeiger.execute("SELECT * FROM heating_relais_state") # zeiger.execute("SELECT heating_relais_address = 524 FROM heating_relais_state") row = zeiger.fetchone() rowDict = dict(zip([c[0] for c in zeiger.description], row)) print("row12") print(row) self.sendMessage(json.dumps(row)) # Jump to the next row row = zeiger.fetchone() self.sendMessage(json.dumps(row)) # print(rowDict) # inhalt = zeiger.fetchall() sqliteConnection.commit() if sqliteConnection: sqliteConnection.close() print("The SQLite connection is closed") # json_object = json.dumps(inhalt) # print(json_object) time.sleep(0.01) return "on" if rp.bits[0] else "off" except AttributeError: pass # Send temperature data to client def read_temperature(self, address, value): if value != "get": raise ValueError(value) response = self.modbus.read_holding_registers(0x00, 8, unit=UNIT) time.sleep(0.01) t = response.registers[address] time.sleep(0.01) z = t/10 return z # return response.registers[address] # return response.registers[address] # Save heating_state in database # Get the heating_state with 'get' def heating(self, address, value): # Heizen an | Modul an dem der Heizdraht geschaltet wird if value in ["on", "off"]: # rq = self.modbus.write_coil(524, value == "on", unit=UNIT) # time.sleep(0.05) # rq = self.modbus.write_coil(address, value == "on", unit=UNIT) # time.sleep(0.01) # Database Handling | Heating_State = true does not mean Relaise_State = True | this depends on the temperature sqliteConnection = sqlite3.connect("state") zeiger = sqliteConnection.cursor() sql_anweisung = ("""CREATE TABLE IF NOT EXISTS heating_relais_state ( heating_relais_address INTEGER, state TEXT)""") zeiger.execute(sql_anweisung) # print(address) # zeiger.execute("INSERT INTO heating_relais_state VALUES (?,?)", (address, value)) # zeiger.execute("INSERT INTO heating_relais_state VALUES (?,?)", (524, value)) # zeiger.execute("INSERT INTO heating_relais_state VALUES (?,?)", (525, value)) # zeiger.execute(sql_anweisung) # if value == "on": zeiger.execute("UPDATE heating_relais_state SET state =? WHERE heating_relais_address =?", (value, address)) # zeiger.execute(sql_anweisung) zeiger.execute("SELECT * FROM heating_relais_state") inhalt = zeiger.fetchall() for inhalt in inhalt: print(inhalt) sqliteConnection.commit() if sqliteConnection: sqliteConnection.close() print("The SQLite connection is closed") elif value != 'get': raise ValueError(value) rp = self.modbus.read_coils(address, unit=UNIT) time.sleep(0.01) return "on" if rp.bits[0] else "off" def systemp(self, value): # result = subprocess.run(["vcgencmd", "measure_temp"], stdout=subprocess.PIPE) # temperature = result.stdout.partition("temp=")[-1].partition("'C\n")[0] res = os.popen("vcgencmd measure_temp").readline() temp = (res.replace("temp=","").replace("'C\n","")) time.sleep(0.01) return temp async def receive(self): try: # print("Echoing '%s'" % self.data) # print("Echoing '%s'" % json.loads(self.data)) message = await self.websocket.recv() commands = json.loads(message) new_state = {} print("Echoing '%s'" % commands) # print(commands['relais1']) for key, value in commands.items(): function = self.functions[key] print("function '%s'" % function) new_state[key] = function(value) print("new_state '%s'" % new_state) print("key '%s'" % key) print("value '%s'" % value) print(json.dumps(new_state)) #self.sendMessage(json.dumps(new_state)) await self.websocket.send(json.dumps(new_state)) print("Echoing '%s'" % commands) # print(commands['relais1']) # self.sendMessage(json.dumps(commands)) # sampleDict = {'name': 'John', 'age': 30, 'data': 'New York'} # jsonData = json.dumps(sampleDict) # self.sendMessage(json.dumps(sampleDict)) except Exception as exception: logger.exception("handle message") raise return await self.websocket.recv() async def sendMessage(self, message): await self.websocket.send(message) async def close(self): await self._conn.__aexit__(*sys.exc_info()) async def main(): logging.basicConfig() # echo = await Websocket() with ModbusClient(host=HOST, port=PORT) as client: client.connect() time.sleep(0.025) print("Websocket server on port %s" % PORTNUM) # server = SimpleWebSocketServer('', PORTNUM, partial(Echo, client)) # server = await websockets.serve('', PORTNUM, partial(Echo, client)) # async with websockets.serve('', partial(Websocket, client), PORTNUM) # await asyncio.Future() # run forever # create_protocol = functools.partial(Websocket(), client) # async with websockets.serve('', "192.168.0.24", PORTNUM, create_protocol): async with websockets.serve("192.168.0.24", PORTNUM, functools.partial(Websocket(), client)): #async with websockets.serve(handler, "", 8001): await asyncio.Future() # run forever if __name__ == '__main__': # main() asyncio.run(main())
The old version:
import signal, sys, json import os import shutil import logging import time import sqlite3 from sqlite3 import Error from functools import partial from pymodbus.client.sync import ModbusTcpClient as ModbusClient from SimpleWebSocketServer import WebSocket, SimpleWebSocketServer logger = logging.getLogger(__name__) # Adressen UNIT = 0x1 HOST = '192.168.0.24' #HOST = '192.168.0.50' PORT = 5020 #PORT = 502 # Coil Output Base Address COILBASE = 512 # Portnummer listen PORTNUM = 8001 # Websocket class to echo received data class Echo(WebSocket): def __init__(self, client, server, sock, address): try: super(Echo, self).__init__(server, sock, address) self.modbus = client self.functions = { "relais1": partial(self.simple_switch, COILBASE + 16), "relais2": partial(self.simple_switch, COILBASE + 1), "relais3": partial(self.simple_switch, COILBASE + 2), "set_temp1": partial(self.set_temperature1, 524), "temperatur1": partial(self.read_temperature, 2), "temperatur2": partial(self.read_temperature, 3), "heizen1": partial(self.heating, 524), "heizen2": partial(self.heating, 525), "heatread": partial(self.simple_read, 524), "systemp": self.systemp } except Exception as exception: logger.exception("constructor") raise def simple_switch(self, address, value): """ value can be one of "on", "off" or "get" """ if value in ['on', 'off']: rq = self.modbus.write_coil(address, value == 'on', unit=UNIT) time.sleep(0.01) print("rq = self.modbus.write_coil(address, value == on, unit=UNIT)") elif value != 'get': raise ValueError(value) # Relaisregister reading rp = self.modbus.read_coils(address, unit=UNIT) time.sleep(0.01) return "on" if rp.bits[0] else "off" def set_temperature1(self, address, value): print("set_temperature1 was triggerd") # Loop for temperature controll print("database_reading") print(self.database_reading()[1][0]) if (self.database_reading()[0][1]) == 524 and (self.database_reading()[1][1]) == 'on': print("DB-Reading from set_temperature1 'heizen1': ", self.database_reading()[1][1]) while True: print("Loop!!!!!!!") rp = self.modbus.read_coils(address, unit=UNIT) print(rp.bits[0]) time.sleep(0.01) if rp.bits[0] == False: rq = self.modbus.write_coil(address, value == "on", unit=UNIT) time.sleep(0.01) print("Heating is active!!!!") print(address) time.sleep(0.01) print(type(self.read_temperature( address == 2, 'get'))) print(type(value)) if float(self.read_temperature( address == 2, 'get')) >= float(value): print("Heating is on!!!!!!!") time.sleep(0.01) print(address) rq = self.modbus.write_coil(address, value == "off", unit=UNIT) time.sleep(0.01) print("Heating Temperture is achieved! -> deactivate heating") break # Breake müsste bei Heizen1 aus kommen, nicht wenn nur die Soll Temp erreicht wurde # return NULL def database_reading(self): def dict_factory(cursor, row): d = {} for idx, col in enumerate(cursor.description): d[col[0]] = row[idx] return d # Database state request | If heating button was pressed sqliteConnection = sqlite3.connect("state") sqliteConnection.row_factory = dict_factory zeiger = sqliteConnection.cursor() zeiger.execute("SELECT * FROM heating_relais_state") # zeiger.execute("SELECT heating_relais_address = 524 FROM heating_relais_state") row = zeiger.fetchone() rowDict = dict(zip([c[0] for c in zeiger.description], row)) # row = json.dumps(row) # row1 = json.loads(row) print(type(row)) print("row1") # print(row1) # row1 = {} # print(row[0]) # Convert a dict Array into List[] result = row.items() row1 = list(result) print(row1) # print(list(row.keya())[1]) # print(list(row.items())[1]) # print(list(row.values())[1]) print(row1[1][1]) # self.sendMessage(json.dumps(row)) # row = zeiger.fetchone() # print("row2") # print(row) # self.sendMessage(json.dumps(row)) # print(rowDict) # inhalt = zeiger.fetchall() sqliteConnection.commit() if sqliteConnection: sqliteConnection.close() print("The SQLite connection is closed") # return json.dumps(row) return row1 def simple_read(self, address, value): try: if value != 'get': raise ValueError(value) # Relaisregister reading | If heating wire is physically on rp = self.modbus.read_coils(address, unit=UNIT) time.sleep(0.01) def dict_factory(cursor, row): d = {} for idx, col in enumerate(cursor.description): d[col[0]] = row[idx] return d # Database state request | If heating button was pressed sqliteConnection = sqlite3.connect("state") sqliteConnection.row_factory = dict_factory zeiger = sqliteConnection.cursor() zeiger.execute("SELECT * FROM heating_relais_state") # zeiger.execute("SELECT heating_relais_address = 524 FROM heating_relais_state") row = zeiger.fetchone() rowDict = dict(zip([c[0] for c in zeiger.description], row)) print("row12") print(row) self.sendMessage(json.dumps(row)) # Jump to the next row row = zeiger.fetchone() self.sendMessage(json.dumps(row)) # print(rowDict) # inhalt = zeiger.fetchall() sqliteConnection.commit() if sqliteConnection: sqliteConnection.close() print("The SQLite connection is closed") # json_object = json.dumps(inhalt) # print(json_object) time.sleep(0.01) return "on" if rp.bits[0] else "off" except AttributeError: pass # Send temperature data to client def read_temperature(self, address, value): if value != "get": raise ValueError(value) response = self.modbus.read_holding_registers(0x00, 8, unit=UNIT) time.sleep(0.01) t = response.registers[address] time.sleep(0.01) z = t/10 return z # return response.registers[address] # return response.registers[address] # Save heating_state in database # Get the heating_state with 'get' def heating(self, address, value): # Heizen an | Modul an dem der Heizdraht geschaltet wird if value in ["on", "off"]: # rq = self.modbus.write_coil(524, value == "on", unit=UNIT) # time.sleep(0.05) # rq = self.modbus.write_coil(address, value == "on", unit=UNIT) # time.sleep(0.01) # Database Handling | Heating_State = true does not mean Relaise_State = True | this depends on the temperature sqliteConnection = sqlite3.connect("state") zeiger = sqliteConnection.cursor() sql_anweisung = ("""CREATE TABLE IF NOT EXISTS heating_relais_state ( heating_relais_address INTEGER, state TEXT)""") zeiger.execute(sql_anweisung) # print(address) # zeiger.execute("INSERT INTO heating_relais_state VALUES (?,?)", (address, value)) # zeiger.execute("INSERT INTO heating_relais_state VALUES (?,?)", (524, value)) # zeiger.execute("INSERT INTO heating_relais_state VALUES (?,?)", (525, value)) # zeiger.execute(sql_anweisung) # if value == "on": zeiger.execute("UPDATE heating_relais_state SET state =? WHERE heating_relais_address =?", (value, address)) # zeiger.execute(sql_anweisung) zeiger.execute("SELECT * FROM heating_relais_state") inhalt = zeiger.fetchall() for inhalt in inhalt: print(inhalt) sqliteConnection.commit() if sqliteConnection: sqliteConnection.close() print("The SQLite connection is closed") elif value != 'get': raise ValueError(value) rp = self.modbus.read_coils(address, unit=UNIT) time.sleep(0.01) return "on" if rp.bits[0] else "off" def systemp(self, value): # result = subprocess.run(["vcgencmd", "measure_temp"], stdout=subprocess.PIPE) # temperature = result.stdout.partition("temp=")[-1].partition("'C\n")[0] res = os.popen("vcgencmd measure_temp").readline() temp = (res.replace("temp=","").replace("'C\n","")) time.sleep(0.01) return temp def handleMessage(self): try: # print("Echoing '%s'" % self.data) # print("Echoing '%s'" % json.loads(self.data)) commands = json.loads(self.data) new_state = {} print("Echoing '%s'" % commands) # print(commands['relais1']) for key, value in commands.items(): function = self.functions[key] print("function '%s'" % function) new_state[key] = function(value) print("new_state '%s'" % new_state) print("key '%s'" % key) print("value '%s'" % value) print(json.dumps(new_state)) self.sendMessage(json.dumps(new_state)) print("Echoing '%s'" % commands) # print(commands['relais1']) # self.sendMessage(json.dumps(commands)) # sampleDict = {'name': 'John', 'age': 30, 'data': 'New York'} # jsonData = json.dumps(sampleDict) # self.sendMessage(json.dumps(sampleDict)) except Exception as exception: logger.exception("handle message") raise def handleConnected(self): print("Connected") def handleClose(self): print("Disconnected") def main(): logging.basicConfig() with ModbusClient(host=HOST, port=PORT) as client: client.connect() time.sleep(0.02) print("Websocket server on port %s" % PORTNUM) # server = SimpleWebSocketServer('', PORTNUM, Echo) server = SimpleWebSocketServer('', PORTNUM, partial(Echo, client)) try: server.serveforever() finally: server.close() if __name__ == "__main__": main()