Hi,
I have developed a semi-scientific program that acts as a TCP server. It listens for incoming messages, processes them, and stores the results in RAM. Once the client disconnects, the PC attempts to write all the gathered data to a text file. The complete code is quite long, but you only need to concentrate on two functions:
print_decimal_chunks_to_file_and_terminal
write_accumulated_data_to_file
So the PC gathers data and processes it into its final form, like this:
2024-12-18 14:11:03.031651 - 0 CPS - 485 702 925 1152 1319 1440 1511 1568 1602 1624 1631 1631 1599 1534 1441 1344 1245 1124 994 896 774 645 513 390 283 189 5 0 5 5 34 133 289
This is an example of one fully processed data packet. Depending on the environment and my use case, I may need to write only a few such lines to the file, or as many as half a million.
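To make the line layout concrete, here is a minimal sketch of how one such line can be built from a timestamp, a CPS value and the packet samples. `format_packet_line` is a hypothetical helper written for illustration; the CPS adjustment and the 2048 offset are taken from the posted code, and the input values are made up.

```python
import time


def format_packet_line(timestamp, cps, packet):
    """Build one output line: timestamp, CPS, then the sample values."""
    # Split into whole seconds and microseconds with integer arithmetic,
    # so the two parts cannot disagree because of float rounding.
    seconds, micros = divmod(round(timestamp * 1_000_000), 1_000_000)
    stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(seconds))
    # Assumption taken from the posted code: the stored CPS is reduced by
    # one, and each sample has an offset of 2048 removed, clamped at zero.
    cps_text = "N/A" if cps is None else str(max(0, int(cps) - 1))
    samples = " ".join(f"{max(int(v) - 2048, 0):4d}" for v in packet)
    return f"{stamp}.{micros:06d} - {cps_text} CPS - {samples}\n"


# Made-up inputs: a timestamp, a CPS of 1 and four raw samples.
line = format_packet_line(1734523863.031651, 1, [2533, 2750, 2973, 2000])
print(line, end="")
```

The date and hour printed depend on the local time zone of the machine; the fractional seconds, the CPS field and the sample columns do not.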
The problem is that as the number of lines grows, the time needed to produce the text file grows much faster than the line count, seemingly exponentially. For example, a couple of thousand lines take a few seconds, but 50k+ lines may take up to 15 minutes, and 129k lines took half a day. I guess it should not be like that. I do not have any hardware limitations and I am ready to tune the usual OS settings, but I want to believe it is possible to write 500k lines in under 5-10 minutes somehow.
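Whether the growth is really exponential or "only" polynomial is a useful clue, and it can be checked roughly by estimating the exponent k in time ≈ c · lines^k from two measurements. The sketch below uses the approximate figures quoted above (15 minutes for 50k lines, and 12 hours for 129k lines as a reading of "half a day"). `growth_exponent` is a hypothetical helper, and the inputs are rough, so only the order of magnitude is meaningful.

```python
import math


def growth_exponent(n1, t1, n2, t2):
    """Estimate k in t ~ c * n**k from two (line count, seconds) points."""
    return math.log(t2 / t1) / math.log(n2 / n1)


# Approximate figures from the description (assumed, not exact timings).
k = growth_exponent(50_000, 15 * 60, 129_000, 12 * 3600)
print(f"estimated exponent: {k:.1f}")
```

An exponent of 1 would mean the time is proportional to the number of lines. A value well above 1 suggests that some work is repeated over the whole data set for every line, rather than pointing at raw disk speed.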
I did my initial research, but I am already using memory-mapped files, and I do not issue many write calls; I try to write everything at once. There are not many quick fixes left for me, so I am asking for help and observations on where the bottlenecks in my code are. I do not have much experience with Python, and this time I cannot fundamentally change the program architecture: I cannot write data in small chunks as soon as it is received, I still have to write the whole buffer at once. Thank you for any help.
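As a baseline for what the file write alone costs, the sketch below builds synthetic lines of roughly the same shape and writes them with a single `join` and a single `write` call. The line content, the line count, the temporary file and the helper name `write_lines_at_once` are assumptions for illustration, not part of the actual program.

```python
import os
import tempfile
import time


def write_lines_at_once(path, lines):
    """Write all lines with one join and one write; return seconds taken."""
    start = time.perf_counter()
    # newline="" disables newline translation, so the bytes on disk match
    # the joined string exactly on every platform.
    with open(path, "w", encoding="utf-8", newline="") as f:
        f.write("".join(lines))
    return time.perf_counter() - start


# Synthetic stand-in for a processed packet: 32 four-character fields.
sample = (
    "2024-12-18 14:11:03.031651 - 0 CPS - "
    + " ".join(f"{v:4d}" for v in range(32))
    + "\n"
)
lines = [sample] * 100_000

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "baseline.txt")
    elapsed = write_lines_at_once(path, lines)
    size = os.path.getsize(path)

print(f"wrote {len(lines)} lines ({size} bytes) in {elapsed:.3f} s")
```

If this baseline finishes quickly on the target machine, the time is probably being spent before the write, while the data is being prepared.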
import socket
import struct
import numpy as np
import time
from datetime import datetime
import os
import threading
from collections import deque
import sys
from threading import Lock
import mmap

TARGET_IP = None
TARGET_PORT = 8000
UDP_MESSAGE = "ESP32AGAIN"
UDP_BROADCAST_IP = "255.255.255.255"
UDP_BROADCAST_PORT = 55555
BUFFER_SIZE = 128
NUM_TO_PRINT = 32
BUFFER_LIMIT = 500 * 1024 * 1024

write_lock = Lock()
buffer_queue = deque()
timestamps = deque()
cps_values = []
active_connection = False
total_logs_received = 0
client_connection = None
last_packet = None
start_time = None
end_time = None
filename = None


def process_data_chunk(data):
    result_array = (
        (((data >> 14) & 1) << 0) |
        (((data >> 13) & 1) << 1) |
        (((data >> 15) & 1) << 2) |
        (((data >> 4) & 1) << 3) |
        (((data >> 9) & 1) << 4) |
        (((data >> 17) & 1) << 5) |
        (((data >> 1) & 1) << 6) |
        (((data >> 5) & 1) << 7) |
        (((data >> 10) & 1) << 8) |
        (((data >> 3) & 1) << 9) |
        (((data >> 12) & 1) << 10) |
        (((data >> 2) & 1) << 11)
    )
    return result_array


def process_data(data):
    result_array = np.array([process_data_chunk(struct.unpack('I', data[i:i+4])[0]) for i in range(0, len(data), 4)])
    max_index = np.argmax(result_array)
    higher_indices = np.arange(0, max_index + 1)
    lower_indices = np.arange(max_index, len(result_array))
    higher_elements = result_array[higher_indices]
    lower_elements = result_array[lower_indices]
    rearranged_buffer = np.concatenate([higher_elements, lower_elements])
    max_index_rearranged = np.argmax(rearranged_buffer)
    rearranged_buffer = np.roll(rearranged_buffer, 10 - max_index_rearranged)
    return rearranged_buffer


def print_decimal_chunks_to_file_and_terminal(data, file_path, timestamps, cps_values):
    global total_logs_received
    global filename
    try:
        data = list(data)
        timestamps = list(map(float, timestamps))
        cps_values = list(cps_values)
        lines_to_write = []
        for i in range(len(data)):
            packet_data = data[i]
            timestamp = timestamps[i]
            cps = cps_values[i]
            formatted_timestamp = time.strftime('%Y-%m-%d %H:%M:%S.', time.localtime(timestamp)) + f"{timestamp:.6f}"[11:]
            formatted_cps = f"{max(0, int(cps) - 1):d}" if cps is not None else "N/A"
            line = f"{formatted_timestamp} - {formatted_cps} CPS - "
            formatted_line = " ".join(f"{max(num - 2048, 0):4d}" for num in packet_data)
            line += formatted_line
            lines_to_write.append(line + "\n")
            total_logs_received += 1
        data_to_write = ''.join(lines_to_write)
        if os.path.getsize(filename) == 0:
            first_line = lines_to_write[0]
            with open(filename, "a+") as file:
                file.write(first_line)
            lines_to_write = lines_to_write[1:]
        remaining_data = ''.join(lines_to_write).encode()
        data_length = len(remaining_data)
        with open(filename, "r+b") as file:
            file.seek(0, os.SEEK_END)
            current_size = file.tell()
            file.write(b'\0' * data_length)
            mmapped_file = mmap.mmap(file.fileno(), current_size + data_length)
            mmapped_file.seek(current_size)
            mmapped_file.write(remaining_data)
            mmapped_file.close()
    except Exception as write_error:
        print(f"Error writing to file: {write_error}")


def count_formatted_logs_in_last_second(timestamps, current_timestamp):
    count = 0
    for ts in timestamps:
        if current_timestamp - ts < 1 and current_timestamp >= ts:
            count += 1
    return count


def calculate_cps(timestamps):
    cps_values = []
    for i in range(len(timestamps)):
        cps = count_formatted_logs_in_last_second(timestamps, timestamps[i])
        cps_values.append(cps)
    return cps_values


def broadcast_message():
    udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    time.sleep(0.5)
    udp_socket.sendto(UDP_MESSAGE.encode(), (UDP_BROADCAST_IP, UDP_BROADCAST_PORT))
    time.sleep(0.5)
    udp_socket.sendto(UDP_MESSAGE.encode(), (UDP_BROADCAST_IP, UDP_BROADCAST_PORT))
    udp_socket.close()


def write_accumulated_data_to_file():
    global buffer_queue
    global timestamps
    global cps_values
    global filename
    with write_lock:
        if buffer_queue and timestamps:
            try:
                cps_values = calculate_cps(timestamps)
                with open(filename, "a+") as file:
                    print_decimal_chunks_to_file_and_terminal(buffer_queue, file, timestamps, cps_values)
                    file.flush()
                buffer_queue.clear()
                timestamps.clear()
                cps_values = []
            except Exception as write_error:
                print(f"Error writing to file: {write_error}")


def format_time(seconds):
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    seconds = int(seconds % 60)
    return f"{hours}h {minutes}m {seconds}s"


def handle_client(connection, address):
    global active_connection
    global buffer_queue
    global timestamps
    global cps_values
    global last_packet
    global end_time
    global start_time
    global filename
    try:
        buffer = bytearray(BUFFER_SIZE)
        reference_time = time.time()
        start_perf_counter = time.perf_counter_ns()
        start_time = reference_time
        while active_connection:
            try:
                total_received = 0
                while total_received < BUFFER_SIZE:
                    additional_data = connection.recv_into(buffer)
                    if additional_data == 0:
                        print(f"Connection closed by the client ({address}) at {time.strftime('%Y-%m-%d %H:%M:%S.%f')}")
                        active_connection = False
                        break
                    total_received += additional_data
                if not active_connection:
                    break
                elapsed_ns = time.perf_counter_ns() - start_perf_counter
                timestamp = reference_time + elapsed_ns / 1e9
                if total_received == BUFFER_SIZE:
                    data = process_data(buffer)
                    buffer_queue.append(data)
                    timestamps.append(timestamp)
                    if len(buffer_queue) > BUFFER_LIMIT // (BUFFER_SIZE // 4):
                        print("Buffer size limit reached. Closing connection and writing data to file.")
                        threading.Thread(target=write_accumulated_data_to_file).start()
                        active_connection = False
                        break
                    cps = count_formatted_logs_in_last_second(timestamps, timestamp)
                    cps_values.append(cps)
            except Exception as receive_error:
                break
        if active_connection:
            end_perf_counter = time.perf_counter_ns()
            elapsed_ns = end_perf_counter - start_perf_counter
            end_time = reference_time + elapsed_ns / 1e9
            last_packet = buffer[:BUFFER_SIZE]
            buffer_queue.pop()
            last_packet_ints = struct.unpack("II", last_packet[:8])
            total_measurement_seconds = max(0, end_time - start_time - 5)
            total_measurement_time = format_time(total_measurement_seconds)
            network_congestion_time = format_time(last_packet_ints[0])
            network_congestion_events = last_packet_ints[1]
            summary_line = (
                f"Total measurement time: {total_measurement_time}. "
                f"Network congestion time: {network_congestion_time}. "
                f"Network congestion events: {network_congestion_events}"
            )
            print(f"Impulse count: {len(buffer_queue)}")
            print("Disconnection detected. Closing socket and writing data to file. Please wait... ")
            writer_thread = threading.Thread(target=write_accumulated_data_to_file)
            writer_thread.start()
            writer_thread.join()
            active_connection = False
    finally:
        connection.close()
        with open(filename, "a+") as file:
            file.write("\n" + summary_line + "\n")
        print("Finished writing")
        print("Connection closed.")


def get_local_ip():
    try:
        hostname = socket.gethostname()
        ip_address = socket.gethostbyname(hostname)
        return ip_address
    except Exception as e:
        print(f"Error getting local IP address: {e}")


def timer_callback():
    if not buffer_queue:
        print("No pulses detected in 10 seconds, check your hardware setup")


def receive_data():
    global active_connection
    global client_connection
    global TARGET_IP
    global TARGET_PORT
    broadcast_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    broadcast_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    broadcast_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    broadcast_socket.bind(('0.0.0.0', 8000))
    try:
        print("__________________________________________________________________________________________________________________")
        print("Listening for broadcast messages...")
        while True:
            data, address = broadcast_socket.recvfrom(1024)
            message = data.decode("utf-8")
            if message.strip() == "Scintilator":
                TARGET_IP = address[0]
                TARGET_PORT = address[1]
                print(f"Subnet broadcast message received from {address}. IP and Port obtained: {TARGET_IP}:{TARGET_PORT}")
                time.sleep(0.1)
                broadcast_message = "Scintilator"
                broadcast_socket.sendto(broadcast_message.encode("utf-8"), (TARGET_IP,8000))
                print("Recognition message sent.")
                active_connection = True
                break
    except Exception as e:
        print(f"Error receiving broadcast message: {e}")
    finally:
        broadcast_socket.close()
    if TARGET_IP and TARGET_PORT and active_connection:
        server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            print(f"Listening for client connections on {TARGET_IP}:{TARGET_PORT}...")
            server_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1048576)
            server_socket.bind((get_local_ip(), 8000))
            server_socket.listen()
            while True:
                if not active_connection:
                    break
                client_connection, client_address = server_socket.accept()
                print(f"Connected by {client_address}")
                while True:
                    input_bit = input("Enter 0 if you want to specify the measurement time, 1 if you want to specify the number of impulses: ")
                    if input_bit == 1:
                        timer = threading.Timer(10, timer_callback)
                        timer.start()
                    if input_bit in ['0', '1']:
                        break
                    else:
                        print("Invalid choice. Please enter 0 or 1.")
                while True:
                    user_input = float(input("Enter the parameter (in seconds or quantity of pulses accordingly) : "))
                    if ((input_bit == 0) and (user_input == 0)) or ((input_bit == 1) and (user_input<1)):
                        print("Invalid parameters entered")
                    else:
                        break
                print("Measurement started at", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
                integer_part = int(user_input)
                fractional_part = user_input - integer_part
                if input_bit == '0':
                    final_float = 2 * integer_part + fractional_part
                else:
                    final_float = 2 * integer_part + 1 + fractional_part
                user_input_packed = struct.pack("!f", final_float)
                client_connection.sendall(user_input_packed)
                handle_client(client_connection, client_address)
        except Exception as bind_error:
            print(f"Error during socket bind: {bind_error}")
        finally:
            threading.Thread(target=write_accumulated_data_to_file).start()
            server_socket.close()


if __name__ == "__main__":
    timestamp_str = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
    filename = os.path.splitext("Rezultatai.txt")[0] + " " + timestamp_str + os.path.splitext("Rezultatai.txt")[1]
    receive_data()
    while True:
        input_bit = input("Enter 0 if you want repeat, enter any other character if you want to terminate the program: ")
        timestamp_str = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
        filename = os.path.splitext("Rezultatai.txt")[0] + " " + timestamp_str + os.path.splitext("Rezultatai.txt")[1]
        if input_bit == '0':
            broadcast_message()
            sys.stdin.flush()
            sys.stdout.flush()
            receive_data()
        else:
            print("Program terminated")
            sys.exit(0)
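To narrow down which stage of the write path dominates, each step can be timed separately. The sketch below is a generic timing helper: `timed`, the `timings` dictionary and the stand-in workload are hypothetical. In the real program the `with` blocks would wrap the call to `calculate_cps`, the line formatting loop and the final file write.

```python
import time
from contextlib import contextmanager

timings = {}


@contextmanager
def timed(label):
    """Add the wall-clock duration of the enclosed block to timings[label]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[label] = timings.get(label, 0.0) + time.perf_counter() - start


# Stand-in workload (assumption): replace these with the real stages.
with timed("prepare"):
    values = [i * i for i in range(100_000)]
with timed("format"):
    text = "\n".join(str(v) for v in values)

for label, seconds in timings.items():
    print(f"{label}: {seconds:.4f} s")
```

Running this with a few different line counts (for example 5k, 10k and 20k) shows which stage grows faster than the data itself.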