I adapted a snippet of code to send commands between my controller (Raspberry Pi) to a remote server using the OCPP 1.6 protocol.
The script initializes connection, sends a heartbeat to the server but problem is that this connection drops unexpectedly after about 1-2 hours. Is there anything I can do to ensure that the connection stays open indefinitely?
Herewith is the code:
The script initializes connection, sends a heartbeat to the server but problem is that this connection drops unexpectedly after about 1-2 hours. Is there anything I can do to ensure that the connection stays open indefinitely?
Herewith is the code:
import asyncio import websockets import RPi.GPIO as GPIO import time from ocpp.v16 import call, ChargePoint as cp from ocpp.v16.enums import RegistrationStatus #from gpiozero import Button #PB_port_1 = Button(23) #left push button on charger #PB_port_2 = Button(24) #right push button on charger GPIO.setmode (GPIO.BCM) GPIO.setup(23,GPIO.IN,pull_up_down=GPIO.PUD_UP) GPIO.add_event_detect(23,GPIO.RISING, bouncetime= 1000) class ChargePoint(cp): async def send_boot_notification(self): request = call.BootNotificationPayload( charge_point_model="DUAL PORT AC", charge_point_vendor="MENNEKES" ) response = await self.call(request) if response.status == RegistrationStatus.accepted: print("Connected to EV back office.") print("Heartbeat: " + str(response.interval)) print("Heartbeat: " + str(response.current_time)) async def runner(self): while True: if GPIO.event_detected(23): print("Button 23 pressed") # secondary loop to manage the operations of the charge station async def running_state(self): keep_running = True while keep_running: if PB_port_1.is_pressed or PB_port_2.is_pressed: #logic inverted; when PB is released on charger print("Push Button released") await self.Send_StatusNotification_available() break else: #logic inverted; when PB is pressed on charger print("Push Button pressed") await self.Send_StatusNotification_unavailable() break async def Heartbeat(self): while True: request = call.HeartbeatPayload() response = await self.call(request) #("Heartbeat: " + str(response.current_time)) time.sleep(60) # charge cable connected, no fault - status change to available async def Send_StatusNotification_available(self): request = call.StatusNotificationPayload( connector_id = 0, error_code = "NoError", status = "Available" ) response = await self.call(request) print ("Charge Point Available") # charge cable connected - status change to unavailable async def Send_StatusNotification_unavailable(self): request = call.StatusNotificationPayload( connector_id = 0, error_code = "NoError", status = "Unavailable" ) response = await self.call(request) print("Charge Point Occupied") # charge cable connected, fault - status change to fault async def Send_StatusNotification_fault(self): request = call.StatusNotificationPayload( connector_id = 0, error_code = "InternalError", status = "Faulted" ) response = await self.call(request) print("Charger Internal Error") #-------------------------------------------- async def main(): async with websockets.connect( 'wss://data.evbackoffice.com/json/SA-EVB-MEN0003', subprotocols=['ocpp1.6'], ping_interval = None ) as ws: cp = ChargePoint('SA-EVB-MEN0003', ws) await asyncio.gather(cp.start(),cp.send_boot_notification(),cp.Heartbeat()) while True: if not ws.open: try: print('Websocket not connected...reconnecting') async with websockets.connect( 'wss://data.evbackoffice.com/json/SA-EVB-MEN0003', subprotocols=['ocpp1.6'], ping_interval = None ) as ws: cp = ChargePoint('SA-EVB-MEN0003', ws) await asyncio.gather(cp.start(),cp.send_boot_notification(),cp.Heartbeat()) except: print('Unable to connect') if __name__ == '__main__': asyncio.run(main())
Larz60+ write Feb-12-2021, 09:20 PM:
Please post all code, output and errors (it it's entirety) between their respective tags. Refer to BBCode help topic on how to post. Use the "Preview Post" button to make sure the code is presented as you expect before hitting the "Post Reply/Thread" button.
Fixed for you this time. Please use bbcode tags on future posts.
Please post all code, output and errors (it it's entirety) between their respective tags. Refer to BBCode help topic on how to post. Use the "Preview Post" button to make sure the code is presented as you expect before hitting the "Post Reply/Thread" button.
Fixed for you this time. Please use bbcode tags on future posts.