Hi,
I have developed this semi-scientific code that acts as a TCP server. It listens for incoming messages, processes them and stores the results in the computer's RAM. Once the client disconnects, the PC attempts to write all the gathered data to a text file. The complete code is really long, but you only need to concentrate on two functions:
print_decimal_chunks_to_file_and_terminal
write_accumulated_data_to_file
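One thing worth checking on the receiving side: `connection.recv_into(buffer)` always writes from the start of `buffer`. If a 128-byte packet arrives in two pieces, the second piece overwrites the first and `total_received` can exceed `BUFFER_SIZE`, so the `total_received == BUFFER_SIZE` check silently skips that packet. Below is a minimal sketch of a fill-exactly helper that writes each piece at the correct offset through a `memoryview`. `recv_exact_into` is a hypothetical name, not something from the original code.

```python
import socket

BUFFER_SIZE = 128  # packet size used by the original server


def recv_exact_into(conn, buf):
    """Fill `buf` completely from `conn`; return False if the peer closed first."""
    with memoryview(buf) as view:
        received = 0
        while received < len(buf):
            # Receive into the unfilled tail of the buffer, not its start.
            n = conn.recv_into(view[received:])
            if n == 0:
                return False  # connection closed by the client
            received += n
    return True


# Demo with a local socket pair: one packet sent in two pieces.
a, b = socket.socketpair()
packet = bytes(range(BUFFER_SIZE))
a.sendall(packet[:50])
a.sendall(packet[50:])
buf = bytearray(BUFFER_SIZE)
print(recv_exact_into(b, buf), bytes(buf) == packet)  # True True
a.close()
print(recv_exact_into(b, buf))  # False
b.close()
```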
So the PC gathers the data and processes it into its final form, which looks like this:
2024-12-18 14:11:03.031651 - 0 CPS - 485 702 925 1152 1319 1440 1511 1568 1602 1624 1631 1631 1599 1534 1441 1344 1245 1124 994 896 774 645 513 390 283 189 5 0 5 5 34 133 289
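A minimal sketch of how one line in the format above can be assembled, assuming (as in `print_decimal_chunks_to_file_and_terminal`) that every raw value carries an offset of 2048 and is clamped at zero. `format_line` is a hypothetical helper; it takes the CPS value already final (the original prints `max(0, int(cps) - 1)`), uses `datetime.strftime("%f")` for the microseconds instead of slicing `f"{timestamp:.6f}"`, and separates values with single spaces as in the example line rather than the `:4d` padding used in the code.

```python
from datetime import datetime

ADC_OFFSET = 2048  # offset subtracted from every raw value in the original code


def format_line(moment, cps, raw_values):
    """Build one output line: '<timestamp> - <cps> CPS - <v1> <v2> ...'."""
    stamp = moment.strftime("%Y-%m-%d %H:%M:%S.%f")  # %f = 6-digit microseconds
    values = " ".join(str(max(int(v) - ADC_OFFSET, 0)) for v in raw_values)
    return f"{stamp} - {cps} CPS - {values}"


line = format_line(datetime(2024, 12, 18, 14, 11, 3, 31651), 0, [2533, 2750, 2048, 2000])
print(line)  # 2024-12-18 14:11:03.031651 - 0 CPS - 485 702 0 0
```

To get the `datetime` from a `time.time()` value, `datetime.fromtimestamp(ts)` returns local time, which matches the `time.localtime` call in the original.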
This is one completely processed data packet. Depending on the environment and the use case, I may only have a few of these lines to write to the file, but I may also have to write up to half a million of them.
The problem is that the more lines I have to write, the disproportionately longer it takes to complete the text file. For example, a couple of thousand lines take a few seconds, but 50k+ lines may take up to 15 minutes, and 129k lines took half a day... I don't think it should be like that. I do not have any hardware limitations and I am ready to tune the usual OS working conditions, but I want to believe it's possible to write 500k lines in under 5-10 minutes somehow...
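One likely contributor to this growth: `calculate_cps` calls `count_formatted_logs_in_last_second` once per packet, and each call scans every timestamp, so n packets cost about n² comparisons (roughly 1.7 × 10¹⁰ for 129k lines). On top of that, `timestamps[i]` indexes a `deque`, which the Python docs describe as O(n) in the middle. Because the timestamps are appended in arrival order and are non-decreasing, the same counts can be obtained with two binary searches per packet. A minimal sketch, assuming sorted timestamps; `calculate_cps_sorted` is a hypothetical name, and the quadratic version is included only for comparison. Both count timestamps in the window (t - 1, t], so they agree up to floating-point rounding exactly at the 1-second boundary.

```python
from bisect import bisect_right


def calculate_cps_quadratic(timestamps):
    """Same counting rule as the original calculate_cps: O(n^2) comparisons."""
    return [sum(1 for ts in timestamps if 0 <= t - ts < 1) for t in timestamps]


def calculate_cps_sorted(timestamps):
    """Count timestamps in (t - 1, t] for every t; assumes non-decreasing input.

    O(n log n): two binary searches per packet instead of a full scan.
    """
    ts = list(timestamps)  # copy once; indexing a deque in the middle is O(n)
    return [bisect_right(ts, t) - bisect_right(ts, t - 1) for t in ts]


sample = [0.0, 0.25, 0.5, 1.0, 1.0, 1.75, 3.0]
print(calculate_cps_sorted(sample))  # [1, 2, 3, 4, 4, 3, 1]
```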
I did some initial research, but I am already using memory-mapped files and I do not issue many write() calls; I try to do it all at once. Not many quick fixes are left for me... So I am asking for help and observations on where the bottlenecks in my code are. I do not have much experience with Python, and this time I cannot fundamentally change the program architecture: I cannot write data in small chunks as soon as it is received, I still have to write the whole buffer at once. Thank you for any help.
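About the memory-mapped write: `print_decimal_chunks_to_file_and_terminal` first extends the file with `b'\0' * data_length` and then copies the same number of bytes in through `mmap`, so the data is written twice and an extra buffer of the same size is allocated (the `file` handle passed in as `file_path` is never used either). With the `:4d` padding each line is roughly 200 bytes, so 500k lines come to about 100 MB, which an ordinary buffered `write()` can handle in one call while still meeting the "write everything at once" constraint. A minimal sketch; `append_all_lines` is a hypothetical helper and assumes the lines are plain ASCII without trailing newlines.

```python
import os
import tempfile


def append_all_lines(path, lines):
    """Append all lines to `path` using one write() call; returns chars written."""
    # One join builds the whole payload in memory, then a single write hands it
    # to the OS; no zero-filling and no second copy through mmap.
    text = "".join(line + "\n" for line in lines)
    with open(path, "a", encoding="ascii") as f:
        return f.write(text)


demo_path = os.path.join(tempfile.mkdtemp(), "demo.txt")
append_all_lines(demo_path, ["first line", "second line"])
with open(demo_path, encoding="ascii") as f:
    print(f.read().splitlines())  # ['first line', 'second line']
```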
```python
import socket
import struct
import numpy as np
import time
from datetime import datetime
import os
import threading
from collections import deque
import sys
from threading import Lock
import mmap

TARGET_IP = None
TARGET_PORT = 8000
UDP_MESSAGE = "ESP32AGAIN"
UDP_BROADCAST_IP = "255.255.255.255"
UDP_BROADCAST_PORT = 55555
BUFFER_SIZE = 128
NUM_TO_PRINT = 32
BUFFER_LIMIT = 500 * 1024 * 1024

write_lock = Lock()
buffer_queue = deque()
timestamps = deque()
cps_values = []
active_connection = False
total_logs_received = 0
client_connection = None
last_packet = None
start_time = None
end_time = None
filename = None


def process_data_chunk(data):
    result_array = (
        (((data >> 14) & 1) << 0) |
        (((data >> 13) & 1) << 1) |
        (((data >> 15) & 1) << 2) |
        (((data >> 4) & 1) << 3) |
        (((data >> 9) & 1) << 4) |
        (((data >> 17) & 1) << 5) |
        (((data >> 1) & 1) << 6) |
        (((data >> 5) & 1) << 7) |
        (((data >> 10) & 1) << 8) |
        (((data >> 3) & 1) << 9) |
        (((data >> 12) & 1) << 10) |
        (((data >> 2) & 1) << 11)
    )
    return result_array


def process_data(data):
    result_array = np.array([process_data_chunk(struct.unpack('I', data[i:i + 4])[0]) for i in range(0, len(data), 4)])
    max_index = np.argmax(result_array)
    higher_indices = np.arange(0, max_index + 1)
    lower_indices = np.arange(max_index, len(result_array))
    higher_elements = result_array[higher_indices]
    lower_elements = result_array[lower_indices]
    rearranged_buffer = np.concatenate([higher_elements, lower_elements])
    max_index_rearranged = np.argmax(rearranged_buffer)
    rearranged_buffer = np.roll(rearranged_buffer, 10 - max_index_rearranged)
    return rearranged_buffer


def print_decimal_chunks_to_file_and_terminal(data, file_path, timestamps, cps_values):
    global total_logs_received
    global filename
    try:
        data = list(data)
        timestamps = list(map(float, timestamps))
        cps_values = list(cps_values)
        lines_to_write = []
        for i in range(len(data)):
            packet_data = data[i]
            timestamp = timestamps[i]
            cps = cps_values[i]
            formatted_timestamp = time.strftime('%Y-%m-%d %H:%M:%S.', time.localtime(timestamp)) + f"{timestamp:.6f}"[11:]
            formatted_cps = f"{max(0, int(cps) - 1):d}" if cps is not None else "N/A"
            line = f"{formatted_timestamp} - {formatted_cps} CPS - "
            formatted_line = " ".join(f"{max(num - 2048, 0):4d}" for num in packet_data)
            line += formatted_line
            lines_to_write.append(line + "\n")
            total_logs_received += 1
        data_to_write = ''.join(lines_to_write)
        if os.path.getsize(filename) == 0:
            first_line = lines_to_write[0]
            with open(filename, "a+") as file:
                file.write(first_line)
            lines_to_write = lines_to_write[1:]
        remaining_data = ''.join(lines_to_write).encode()
        data_length = len(remaining_data)
        with open(filename, "r+b") as file:
            file.seek(0, os.SEEK_END)
            current_size = file.tell()
            file.write(b'\0' * data_length)
            mmapped_file = mmap.mmap(file.fileno(), current_size + data_length)
            mmapped_file.seek(current_size)
            mmapped_file.write(remaining_data)
            mmapped_file.close()
    except Exception as write_error:
        print(f"Error writing to file: {write_error}")


def count_formatted_logs_in_last_second(timestamps, current_timestamp):
    count = 0
    for ts in timestamps:
        if current_timestamp - ts < 1 and current_timestamp >= ts:
            count += 1
    return count


def calculate_cps(timestamps):
    cps_values = []
    for i in range(len(timestamps)):
        cps = count_formatted_logs_in_last_second(timestamps, timestamps[i])
        cps_values.append(cps)
    return cps_values


def broadcast_message():
    udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    time.sleep(0.5)
    udp_socket.sendto(UDP_MESSAGE.encode(), (UDP_BROADCAST_IP, UDP_BROADCAST_PORT))
    time.sleep(0.5)
    udp_socket.sendto(UDP_MESSAGE.encode(), (UDP_BROADCAST_IP, UDP_BROADCAST_PORT))
    udp_socket.close()


def write_accumulated_data_to_file():
    global buffer_queue
    global timestamps
    global cps_values
    global filename
    with write_lock:
        if buffer_queue and timestamps:
            try:
                cps_values = calculate_cps(timestamps)
                with open(filename, "a+") as file:
                    print_decimal_chunks_to_file_and_terminal(buffer_queue, file, timestamps, cps_values)
                    file.flush()
                buffer_queue.clear()
                timestamps.clear()
                cps_values = []
            except Exception as write_error:
                print(f"Error writing to file: {write_error}")


def format_time(seconds):
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    seconds = int(seconds % 60)
    return f"{hours}h {minutes}m {seconds}s"


def handle_client(connection, address):
    global active_connection
    global buffer_queue
    global timestamps
    global cps_values
    global last_packet
    global end_time
    global start_time
    global filename
    try:
        buffer = bytearray(BUFFER_SIZE)
        reference_time = time.time()
        start_perf_counter = time.perf_counter_ns()
        start_time = reference_time
        while active_connection:
            try:
                total_received = 0
                while total_received < BUFFER_SIZE:
                    additional_data = connection.recv_into(buffer)
                    if additional_data == 0:
                        print(f"Connection closed by the client ({address}) at {time.strftime('%Y-%m-%d %H:%M:%S.%f')}")
                        active_connection = False
                        break
                    total_received += additional_data
                if not active_connection:
                    break
                elapsed_ns = time.perf_counter_ns() - start_perf_counter
                timestamp = reference_time + elapsed_ns / 1e9
                if total_received == BUFFER_SIZE:
                    data = process_data(buffer)
                    buffer_queue.append(data)
                    timestamps.append(timestamp)
                    if len(buffer_queue) > BUFFER_LIMIT // (BUFFER_SIZE // 4):
                        print("Buffer size limit reached. Closing connection and writing data to file.")
                        threading.Thread(target=write_accumulated_data_to_file).start()
                        active_connection = False
                        break
                    cps = count_formatted_logs_in_last_second(timestamps, timestamp)
                    cps_values.append(cps)
            except Exception as receive_error:
                break
        if active_connection:
            end_perf_counter = time.perf_counter_ns()
            elapsed_ns = end_perf_counter - start_perf_counter
            end_time = reference_time + elapsed_ns / 1e9
            last_packet = buffer[:BUFFER_SIZE]
            buffer_queue.pop()
            last_packet_ints = struct.unpack("II", last_packet[:8])
            total_measurement_seconds = max(0, end_time - start_time - 5)
            total_measurement_time = format_time(total_measurement_seconds)
            network_congestion_time = format_time(last_packet_ints[0])
            network_congestion_events = last_packet_ints[1]
            summary_line = (
                f"Total measurement time: {total_measurement_time}. "
                f"Network congestion time: {network_congestion_time}. "
                f"Network congestion events: {network_congestion_events}"
            )
            print(f"Impulse count: {len(buffer_queue)}")
            print("Disconnection detected. Closing socket and writing data to file. Please wait... ")
            writer_thread = threading.Thread(target=write_accumulated_data_to_file)
            writer_thread.start()
            writer_thread.join()
            active_connection = False
    finally:
        connection.close()
        with open(filename, "a+") as file:
            file.write("\n" + summary_line + "\n")
        print("Finished writing")
        print("Connection closed.")


def get_local_ip():
    try:
        hostname = socket.gethostname()
        ip_address = socket.gethostbyname(hostname)
        return ip_address
    except Exception as e:
        print(f"Error getting local IP address: {e}")


def timer_callback():
    if not buffer_queue:
        print("No pulses detected in 10 seconds, check your hardware setup")


def receive_data():
    global active_connection
    global client_connection
    global TARGET_IP
    global TARGET_PORT
    broadcast_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    broadcast_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    broadcast_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    broadcast_socket.bind(('0.0.0.0', 8000))
    try:
        print("__________________________________________________________________________________________________________________")
        print("Listening for broadcast messages...")
        while True:
            data, address = broadcast_socket.recvfrom(1024)
            message = data.decode("utf-8")
            if message.strip() == "Scintilator":
                TARGET_IP = address[0]
                TARGET_PORT = address[1]
                print(f"Subnet broadcast message received from {address}. IP and Port obtained: {TARGET_IP}:{TARGET_PORT}")
                time.sleep(0.1)
                broadcast_message = "Scintilator"
                broadcast_socket.sendto(broadcast_message.encode("utf-8"), (TARGET_IP, 8000))
                print("Recognition message sent.")
                active_connection = True
                break
    except Exception as e:
        print(f"Error receiving broadcast message: {e}")
    finally:
        broadcast_socket.close()

    if TARGET_IP and TARGET_PORT and active_connection:
        server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            print(f"Listening for client connections on {TARGET_IP}:{TARGET_PORT}...")
            server_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1048576)
            server_socket.bind((get_local_ip(), 8000))
            server_socket.listen()
            while True:
                if not active_connection:
                    break
                client_connection, client_address = server_socket.accept()
                print(f"Connected by {client_address}")
                while True:
                    input_bit = input("Enter 0 if you want to specify the measurement time, 1 if you want to specify the number of impulses: ")
                    if input_bit == 1:
                        timer = threading.Timer(10, timer_callback)
                        timer.start()
                    if input_bit in ['0', '1']:
                        break
                    else:
                        print("Invalid choice. Please enter 0 or 1.")
                while True:
                    user_input = float(input("Enter the parameter (in seconds or quantity of pulses accordingly) : "))
                    if ((input_bit == 0) and (user_input == 0)) or ((input_bit == 1) and (user_input < 1)):
                        print("Invalid parameters entered")
                    else:
                        break
                print("Measurement started at", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
                integer_part = int(user_input)
                fractional_part = user_input - integer_part
                if input_bit == '0':
                    final_float = 2 * integer_part + fractional_part
                else:
                    final_float = 2 * integer_part + 1 + fractional_part
                user_input_packed = struct.pack("!f", final_float)
                client_connection.sendall(user_input_packed)
                handle_client(client_connection, client_address)
        except Exception as bind_error:
            print(f"Error during socket bind: {bind_error}")
        finally:
            threading.Thread(target=write_accumulated_data_to_file).start()
            server_socket.close()


if __name__ == "__main__":
    timestamp_str = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
    filename = os.path.splitext("Rezultatai.txt")[0] + " " + timestamp_str + os.path.splitext("Rezultatai.txt")[1]
    receive_data()
    while True:
        input_bit = input("Enter 0 if you want repeat, enter any other character if you want to terminate the program: ")
        timestamp_str = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
        filename = os.path.splitext("Rezultatai.txt")[0] + " " + timestamp_str + os.path.splitext("Rezultatai.txt")[1]
        if input_bit == '0':
            broadcast_message()
            sys.stdin.flush()
            sys.stdout.flush()
            receive_data()
        else:
            print("Program terminated")
            sys.exit(0)
```
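A related observation about the receive loop: `handle_client` calls `count_formatted_logs_in_last_second(timestamps, timestamp)` for every packet, and that call scans the whole `timestamps` deque, so receiving itself slows down as the measurement grows. The values appended to `cps_values` there are also recomputed by `calculate_cps` inside `write_accumulated_data_to_file` before anything is written. A minimal sketch of a sliding-window counter that keeps only the last second of timestamps; `RollingCounter` is a hypothetical name, and it assumes timestamps are added in non-decreasing order, which holds for the `perf_counter_ns`-based timestamps in `handle_client`.

```python
from collections import deque


class RollingCounter:
    """Counts timestamps in the window (t - window, t] as they arrive.

    Assumes timestamps are added in non-decreasing order.
    """

    def __init__(self, window=1.0):
        self.window = window
        self._recent = deque()

    def add(self, t):
        self._recent.append(t)
        # Evict timestamps that are `window` seconds or more older than t;
        # since later t values only grow, they can never count again.
        while self._recent and t - self._recent[0] >= self.window:
            self._recent.popleft()
        return len(self._recent)


counter = RollingCounter()
print([counter.add(t) for t in [0.0, 0.25, 0.5, 1.0, 1.0, 1.75, 3.0]])
# [1, 2, 3, 3, 4, 3, 1]
```

Each timestamp is appended and removed at most once, so the per-packet cost stays constant on average instead of growing with the number of packets received so far.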