Jun-06-2017, 11:46 AM
I am using Python 2.7 and the Pika library to publish messages to a RabbitMQ server. The script below reads the last line of a text file every 30 seconds, then publishes that line to the RabbitMQ server.
My problem is that the internet connection is very unstable and has high latency (600-800 ms). The code runs fine for a couple of minutes, but then it starts throwing different exceptions. The most recent exception is:
    ERROR:pika.adapters.base_connection:Connection to 61.21.223.233:5672 failed: timeout
    WARNING:pika.connection:Could not connect, 2 attempts left
    Traceback (most recent call last):
      File "C:\Users\CF-30 ASCI\Desktop\ascii.py", line 50, in <module>
        dmsPublish(csample)
      File "C:\Users\CF-30 ASCI\Desktop\ascii.py", line 27, in dmsPublish
        connection = pika.BlockingConnection(params) # Connect to CloudAMQP
      File "C:\Python27\lib\site-packages\pika\adapters\blocking_connection.py", line 339, in __init__
        self._process_io_for_connection_setup()
      File "C:\Python27\lib\site-packages\pika\adapters\blocking_connection.py", line 374, in _process_io_for_connection_setup
        self._open_error_result.is_ready)
      File "C:\Python27\lib\site-packages\pika\adapters\blocking_connection.py", line 410, in _flush_output
        self._impl.ioloop.poll()
      File "C:\Python27\lib\site-packages\pika\adapters\select_connection.py", line 400, in poll
        self.get_next_deadline())
    error: (10022, 'An invalid argument was supplied')

To get around all these random exceptions, I used a catch-all exception handler, and it seems to be working (see code below). I know that isn't right, however, and I am wondering whether anyone has experience or thoughts on how to make this more reliable.
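For comparison with the catch-all, here is a minimal sketch of a narrower handler that swallows only the failures an unstable link is expected to produce and lets real bugs surface. The names `try_publish`, `NETWORK_ERRORS`, and the `publish` callable are hypothetical, not part of Pika. It assumes the `error: (10022, ...)` in the traceback is a `socket.error` or `select.error` raised while polling. The code is written to run on both Python 2.7 and Python 3.

```python
import logging
import select
import socket

log = logging.getLogger("publisher")

# Network-level failures. On Python 3 both names are aliases of OSError;
# on Python 2.7 they are two separate exception classes.
NETWORK_ERRORS = (socket.error, select.error)


def try_publish(publish, message, expected_errors=NETWORK_ERRORS):
    """Call publish(message) once, with no retry.

    Returns True on success and False if one of the expected errors was
    raised. Any other exception (a typo, a bad argument, ...) propagates.
    """
    try:
        publish(message)
    except expected_errors:
        # Log the full traceback so the failure is still visible.
        log.exception("Publish failed; skipping this sample")
        return False
    return True
```

With Pika, the tuple would presumably be extended with the library's own base exception, for example `try_publish(dmsPublish, csample, NETWORK_ERRORS + (pika.exceptions.AMQPError,))`. Because a failed sample is simply dropped, the loop can continue to the next 30-second reading either way.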
The idea behind this software is as follows: I have a piece of industrial hardware in a remote location that outputs a string of values once per second to a specific TCP port. I run TeraTerm, connect to this port, and log the data to a text file. Finally, I run this Python script to read the last line every 30 seconds and upload it to RabbitMQ. Given the highly unstable internet connection, I don't need every 30th sample. In other words, if a message fails, the software does not need to retry. It can move on to the next message 30 seconds later.
Anyone have any suggestions?
    import os
    import csv
    import time
    import pika
    import logging
    import datetime

    logging.basicConfig()

    def getLastFile(filename):
        distance = 1024
        with open(filename,'rb') as f:
            lastline = f.readlines()[-1]
        return lastline

    def dmsPublish(message):
        url = os.environ.get('CLOUDAMQP_URL', 'amqp://guest:[email protected]/%2f?connection_attempts=3&heartbeat_interval=3600')
        params = pika.URLParameters(url)
        params.socket_timeout = 5
        params.connection_attempts = 3
        params.retry_delay = 3
        print time.ctime() + " [x] Connecting to RabbitMQ server"
        connection = pika.BlockingConnection(params) # Connect to CloudAMQP
        channel = connection.channel() # start a channel
        channel.queue_declare(queue='LastLine') # Declare a queue
        channel.basic_publish(exchange='', routing_key='LastLine', body=message)
        print time.ctime() + " [x] Message sent to rabbitMQ"
        connection.close()

    loop = 1
    while True:
        lastline = getLastFile("teraterm.log")
        sampletime = datetime.datetime.utcnow().isoformat() + ","
        csample = sampletime + lastline
        print " [x] Most Recent Sample: " + csample
        try:
            dmsPublish(csample)
        except:
            print " [!] Error... trying again."
        print time.ctime() + " [x] Sleeping 30 seconds ....."
        time.sleep(30)
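A side note on `getLastFile`: it calls `readlines()`, so it loads the whole log every 30 seconds, and the log grows by one line per second. The sketch below reads only the tail of the file instead. `read_last_line` and `chunk_size` are hypothetical names, and the approach assumes lines end in `\n` or `\r\n`. Unlike the original function, it returns the line without its trailing newline. It runs on both Python 2.7 and Python 3.

```python
import os


def read_last_line(path, chunk_size=1024):
    """Return the last line of a file as bytes, reading backwards from the end.

    Trailing newline characters are stripped. An empty file yields b"".
    """
    with open(path, "rb") as f:
        f.seek(0, os.SEEK_END)
        pos = f.tell()
        data = b""
        while pos > 0:
            # Step one chunk further back and prepend it to the buffer.
            step = min(chunk_size, pos)
            pos -= step
            f.seek(pos)
            data = f.read(step) + data
            # Once a newline appears before the final line, the whole
            # last line is in the buffer and reading can stop.
            if b"\n" in data.rstrip(b"\r\n"):
                break
    return data.rstrip(b"\r\n").rsplit(b"\n", 1)[-1]
```

In the script above this would be called as `read_last_line("teraterm.log")`. If TeraTerm happens to be in the middle of writing a line, the result may be a partial sample, which is also true of the original `readlines()` version.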