Feb-12-2021, 02:32 PM
I adapted a snippet of code to send commands from my controller (a Raspberry Pi) to a remote server using the OCPP 1.6 protocol.
The script initializes the connection and sends a heartbeat to the server, but the problem is that the connection drops unexpectedly after about 1-2 hours. Is there anything I can do to ensure that the connection stays open indefinitely?
Herewith is the code:
The script initializes the connection and sends a heartbeat to the server, but the problem is that the connection drops unexpectedly after about 1-2 hours. Is there anything I can do to ensure that the connection stays open indefinitely?
Herewith is the code:
import asyncio
import websockets
import RPi.GPIO as GPIO
import time
from ocpp.v16 import call, ChargePoint as cp
from ocpp.v16.enums import RegistrationStatus

#from gpiozero import Button
#PB_port_1 = Button(23)  #left push button on charger
#PB_port_2 = Button(24)  #right push button on charger

# Identity of this charge point and the back-office endpoint it connects to.
CHARGE_POINT_ID = 'SA-EVB-MEN0003'
BACK_OFFICE_URL = 'wss://data.evbackoffice.com/json/' + CHARGE_POINT_ID

# Seconds between OCPP Heartbeat requests.
HEARTBEAT_INTERVAL_S = 60
# Seconds between polls of the GPIO edge-detect flag in ChargePoint.runner().
BUTTON_POLL_INTERVAL_S = 0.1
# Seconds to wait after a failed or dropped connection before reconnecting.
RECONNECT_DELAY_S = 5

GPIO.setmode(GPIO.BCM)
GPIO.setup(23, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.add_event_detect(23, GPIO.RISING, bouncetime=1000)


class ChargePoint(cp):
    """OCPP 1.6 charge point that reports boot, heartbeat and connector status."""

    async def send_boot_notification(self):
        """Send a BootNotification and print the response when it is accepted."""
        request = call.BootNotificationPayload(
            charge_point_model="DUAL PORT AC",
            charge_point_vendor="MENNEKES"
        )
        response = await self.call(request)
        if response.status == RegistrationStatus.accepted:
            print("Connected to EV back office.")
            print("Heartbeat: " + str(response.interval))
            print("Heartbeat: " + str(response.current_time))

    async def runner(self):
        """Poll the edge-detect flag of GPIO 23 forever and report presses."""
        while True:
            if GPIO.event_detected(23):
                print("Button 23 pressed")
            # Yield to the event loop on every pass; a loop without an await
            # would prevent every other coroutine from running.
            await asyncio.sleep(BUTTON_POLL_INTERVAL_S)

    # secondary loop to manage the operations of the charge station
    async def running_state(self):
        """Send one StatusNotification reflecting the push-button state.

        NOTE(review): PB_port_1 and PB_port_2 are only defined by the
        commented-out gpiozero lines at the top of the file, so calling this
        method as-is raises NameError. Restore those definitions (or read the
        pins through RPi.GPIO) before using it.
        """
        if PB_port_1.is_pressed or PB_port_2.is_pressed:
            # logic inverted; when PB is released on charger
            print("Push Button released")
            await self.Send_StatusNotification_available()
        else:
            # logic inverted; when PB is pressed on charger
            print("Push Button pressed")
            await self.Send_StatusNotification_unavailable()

    async def Heartbeat(self):
        """Send a Heartbeat request every HEARTBEAT_INTERVAL_S seconds, forever."""
        while True:
            await self.call(call.HeartbeatPayload())
            # asyncio.sleep, not time.sleep: a blocking sleep freezes the whole
            # event loop, so start() could not read incoming frames meanwhile.
            await asyncio.sleep(HEARTBEAT_INTERVAL_S)

    async def _send_status_notification(self, error_code, status, message):
        """Send a StatusNotification for connector 0, then print ``message``."""
        request = call.StatusNotificationPayload(
            connector_id=0,
            error_code=error_code,
            status=status
        )
        await self.call(request)
        print(message)

    # charge cable connected, no fault - status change to available
    async def Send_StatusNotification_available(self):
        """Report connector 0 as Available with no error."""
        await self._send_status_notification(
            "NoError", "Available", "Charge Point Available"
        )

    # charge cable connected - status change to unavailable
    async def Send_StatusNotification_unavailable(self):
        """Report connector 0 as Unavailable with no error."""
        await self._send_status_notification(
            "NoError", "Unavailable", "Charge Point Occupied"
        )

    # charge cable connected, fault - status change to fault
    async def Send_StatusNotification_fault(self):
        """Report connector 0 as Faulted with an internal error."""
        await self._send_status_notification(
            "InternalError", "Faulted", "Charger Internal Error"
        )


#--------------------------------------------
async def _run_session(ws):
    """Run the receive loop, boot notification and heartbeat on one socket.

    Returns or raises as soon as any of the three coroutines finishes or
    fails; the remaining ones are cancelled so that no heartbeat task keeps
    running against a closed connection.
    """
    charge_point = ChargePoint(CHARGE_POINT_ID, ws)
    tasks = [
        asyncio.ensure_future(charge_point.start()),
        asyncio.ensure_future(charge_point.send_boot_notification()),
        asyncio.ensure_future(charge_point.Heartbeat()),
    ]
    try:
        await asyncio.gather(*tasks)
    finally:
        for task in tasks:
            task.cancel()
        # Wait for the cancellations to finish; their outcomes are irrelevant.
        await asyncio.gather(*tasks, return_exceptions=True)


async def main():
    """Keep a charge point session open, reconnecting whenever it drops.

    Connection failures and dropped sessions are reported on stdout and
    followed by a new attempt after RECONNECT_DELAY_S seconds. Any other
    exception propagates to the caller.
    """
    while True:
        connected = False
        try:
            # NOTE(review): ping_interval=None disables the websockets
            # keepalive pings, as in the original script. Now that nothing
            # blocks the event loop, re-enabling them would let a dead link be
            # detected sooner - confirm the back office answers pings first.
            async with websockets.connect(
                BACK_OFFICE_URL,
                subprotocols=['ocpp1.6'],
                ping_interval=None
            ) as ws:
                connected = True
                await _run_session(ws)
        except (OSError, asyncio.TimeoutError,
                websockets.WebSocketException) as exc:
            if connected:
                print('Websocket not connected...reconnecting')
            else:
                print('Unable to connect')
            print('Connection error: ' + repr(exc))
        # Pause between attempts so an unreachable server is not hammered.
        await asyncio.sleep(RECONNECT_DELAY_S)


if __name__ == '__main__':
    asyncio.run(main())