Python Forum
Problems writing a large text file in python
#1
Hi,

I have developed this semi-scientific code that acts as a TCP server. It listens for incoming messages, processes them, and stores the results in RAM. Once the client disconnects, the program attempts to write all the gathered data to a text file. The complete code below is quite long, but you can concentrate on just two functions:

print_decimal_chunks_to_file_and_terminal
write_accumulated_data_to_file

So the program gathers data and processes it into its final form, like this:

2024-12-18 14:11:03.031651 - 0 CPS - 485 702 925 1152 1319 1440 1511 1568 1602 1624 1631 1631 1599 1534 1441 1344 1245 1124 994 896 774 645 513 390 283 189 5 0 5 5 34 133 289

This is an example of one fully processed data packet. Depending on the environment and my application's use case, I may only need to write a few such lines to a file, but it is also possible I will have to write up to half a million of them.

The problem is that the more lines I have to write, the longer it takes to complete the text file, and the growth is much worse than linear. For example: a couple of thousand lines takes a few seconds, but 50k+ lines may take up to 15 minutes, and 129k lines took half a day... I guess it should not be like that. I do not have any hardware limitations and I am willing to tune the usual OS settings, but I want to believe it is possible to write 500k lines in under 5-10 minutes somehow...

I did my initial research: I am already using memory-mapped files, I avoid issuing many write commands, and I try to do it all at once, so not many quick fixes are left for me... So I am asking for help and observations on where the bottlenecks in my code are. I do not have much experience with Python, but this time I cannot fundamentally change the program architecture: I cannot write the data in small chunks as soon as it is received; I still have to write the whole buffer at once. Thank you for any help.

import socket
import struct
import numpy as np
import time
from datetime import datetime
import os
import threading
from collections import deque
import sys
from threading import Lock
import mmap

TARGET_IP = None
TARGET_PORT = 8000

UDP_MESSAGE = "ESP32AGAIN"
UDP_BROADCAST_IP = "255.255.255.255"  
UDP_BROADCAST_PORT = 55555 

BUFFER_SIZE = 128
NUM_TO_PRINT = 32
BUFFER_LIMIT = 500 * 1024 * 1024

write_lock = Lock()
buffer_queue = deque()  
timestamps = deque()  
cps_values = []
active_connection = False
total_logs_received = 0
client_connection = None
last_packet = None
start_time = None
end_time = None
filename = None

def process_data_chunk(data):
    result_array = (
        (((data >> 14) & 1) << 0) |
        (((data >> 13) & 1) << 1) |
        (((data >> 15) & 1) << 2) |
        (((data >> 4) & 1) << 3) |
        (((data >> 9) & 1) << 4) |
        (((data >> 17) & 1) << 5) |
        (((data >> 1) & 1) << 6) |
        (((data >> 5) & 1) << 7) |
        (((data >> 10) & 1) << 8) |
        (((data >> 3) & 1) << 9) |
        (((data >> 12) & 1) << 10) |
        (((data >> 2) & 1) << 11)
    )
    return result_array
def process_data(data):
    result_array = np.array([process_data_chunk(struct.unpack('I', data[i:i+4])[0]) for i in range(0, len(data), 4)])
    max_index = np.argmax(result_array)
    higher_indices = np.arange(0, max_index + 1) 
    lower_indices = np.arange(max_index, len(result_array))
    higher_elements = result_array[higher_indices]
    lower_elements = result_array[lower_indices]
    rearranged_buffer = np.concatenate([higher_elements, lower_elements])
    max_index_rearranged = np.argmax(rearranged_buffer)
    rearranged_buffer = np.roll(rearranged_buffer, 10 - max_index_rearranged)
    return rearranged_buffer

def print_decimal_chunks_to_file_and_terminal(data, file_path, timestamps, cps_values):
    global total_logs_received
    global filename
    try:
        data = list(data)
        timestamps = list(map(float, timestamps))  
        cps_values = list(cps_values)
        lines_to_write = []

        for i in range(len(data)):
            packet_data = data[i]
            timestamp = timestamps[i]
            cps = cps_values[i]
            formatted_timestamp = time.strftime('%Y-%m-%d %H:%M:%S.', time.localtime(timestamp)) + f"{timestamp:.6f}"[11:]
            formatted_cps = f"{max(0, int(cps) - 1):d}" if cps is not None else "N/A"
            line = f"{formatted_timestamp} - {formatted_cps} CPS - "
            formatted_line = " ".join(f"{max(num - 2048, 0):4d}" for num in packet_data)
            line += formatted_line
            lines_to_write.append(line + "\n")
            total_logs_received += 1
        data_to_write = ''.join(lines_to_write)

        if os.path.getsize(filename) == 0:
            first_line = lines_to_write[0]
            with open(filename, "a+") as file:
                file.write(first_line)
            lines_to_write = lines_to_write[1:]


        remaining_data = ''.join(lines_to_write).encode()
        data_length = len(remaining_data)
            
        with open(filename, "r+b") as file:
            file.seek(0, os.SEEK_END)
            current_size = file.tell()
            file.write(b'\0' * data_length) 
                
            mmapped_file = mmap.mmap(file.fileno(), current_size + data_length)
            mmapped_file.seek(current_size)  
            mmapped_file.write(remaining_data)
            mmapped_file.close()

    except Exception as write_error:
        print(f"Error writing to file: {write_error}")
        
def count_formatted_logs_in_last_second(timestamps, current_timestamp):
    count = 0
    for ts in timestamps:
        if current_timestamp - ts < 1 and current_timestamp >= ts:
            count += 1
    return count

def calculate_cps(timestamps):
    cps_values = []
    for i in range(len(timestamps)):
        cps = count_formatted_logs_in_last_second(timestamps, timestamps[i])
        cps_values.append(cps)
    return cps_values

def broadcast_message():
    udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    time.sleep(0.5)
    udp_socket.sendto(UDP_MESSAGE.encode(), (UDP_BROADCAST_IP, UDP_BROADCAST_PORT))
    time.sleep(0.5)
    udp_socket.sendto(UDP_MESSAGE.encode(), (UDP_BROADCAST_IP, UDP_BROADCAST_PORT))
    udp_socket.close()

def write_accumulated_data_to_file():
    global buffer_queue
    global timestamps
    global cps_values
    global filename

    with write_lock:
        if buffer_queue and timestamps:
            try:
                cps_values = calculate_cps(timestamps)
                with open(filename, "a+") as file:
                    print_decimal_chunks_to_file_and_terminal(buffer_queue, file, timestamps, cps_values)
                    file.flush()   
                buffer_queue.clear()
                timestamps.clear()
                cps_values = []

            except Exception as write_error:
                print(f"Error writing to file: {write_error}")

def format_time(seconds):
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    seconds = int(seconds % 60)
    return f"{hours}h {minutes}m {seconds}s"

def handle_client(connection, address):
    global active_connection
    global buffer_queue
    global timestamps
    global cps_values
    global last_packet
    global end_time
    global start_time
    global filename

    try:
        buffer = bytearray(BUFFER_SIZE)
        reference_time = time.time() 
        start_perf_counter = time.perf_counter_ns() 

        start_time = reference_time

        while active_connection:
            try:
                total_received = 0
                while total_received < BUFFER_SIZE:
                    additional_data = connection.recv_into(buffer)
                    if additional_data == 0:
                        print(f"Connection closed by the client ({address}) at {time.strftime('%Y-%m-%d %H:%M:%S.%f')}")
                        active_connection = False
                        break

                    total_received += additional_data

                if not active_connection:
                    break

                elapsed_ns = time.perf_counter_ns() - start_perf_counter
                timestamp = reference_time + elapsed_ns / 1e9  

                if total_received == BUFFER_SIZE:
                    data = process_data(buffer)
                    buffer_queue.append(data)
                    timestamps.append(timestamp)

                    if len(buffer_queue) > BUFFER_LIMIT // (BUFFER_SIZE // 4):
                        print("Buffer size limit reached. Closing connection and writing data to file.")
                        threading.Thread(target=write_accumulated_data_to_file).start()
                        active_connection = False
                        break

                    cps = count_formatted_logs_in_last_second(timestamps, timestamp)
                    cps_values.append(cps)

            except Exception as receive_error:
                break

        if active_connection:
            end_perf_counter = time.perf_counter_ns()
            elapsed_ns = end_perf_counter - start_perf_counter
            end_time = reference_time + elapsed_ns / 1e9  

            last_packet = buffer[:BUFFER_SIZE]  
            buffer_queue.pop()
            last_packet_ints = struct.unpack("II", last_packet[:8])

            total_measurement_seconds = max(0, end_time - start_time - 5)
            total_measurement_time = format_time(total_measurement_seconds)
            network_congestion_time = format_time(last_packet_ints[0])
            network_congestion_events = last_packet_ints[1]
            
            summary_line = (
                f"Total measurement time: {total_measurement_time}. "
                f"Network congestion time: {network_congestion_time}. "
                f"Network congestion events: {network_congestion_events}"
            )
            print(f"Impulse count: {len(buffer_queue)}")
            print("Disconnection detected. Closing socket and writing data to file. Please wait... ")
            writer_thread = threading.Thread(target=write_accumulated_data_to_file)
            writer_thread.start()
            writer_thread.join()  
            active_connection = False

    finally:
        connection.close()
        with open(filename, "a+") as file:
                file.write("\n" + summary_line + "\n")
        print("Finished writing")
        print("Connection closed.")
        
def get_local_ip():
    try:
        hostname = socket.gethostname()
        ip_address = socket.gethostbyname(hostname)
        return ip_address
    except Exception as e:
        print(f"Error getting local IP address: {e}")

def timer_callback():
    if not buffer_queue:
        print("No pulses detected in 10 seconds, check your hardware setup")

def receive_data():
    global active_connection
    global client_connection
    global TARGET_IP
    global TARGET_PORT

    broadcast_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    broadcast_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    broadcast_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    broadcast_socket.bind(('0.0.0.0', 8000))  

    try:
        print("__________________________________________________________________________________________________________________")
        print("Listening for broadcast messages...")
        while True:
            data, address = broadcast_socket.recvfrom(1024)
            message = data.decode("utf-8")
            if message.strip() == "Scintilator":
                TARGET_IP = address[0]
                TARGET_PORT = address[1]  
                print(f"Subnet broadcast message received from {address}. IP and Port obtained: {TARGET_IP}:{TARGET_PORT}")

                time.sleep(0.1)

                broadcast_message = "Scintilator"
                broadcast_socket.sendto(broadcast_message.encode("utf-8"), (TARGET_IP,8000))  
                print("Recognition message sent.")

                active_connection = True
                break
    except Exception as e:
        print(f"Error receiving broadcast message: {e}")
    finally:
        broadcast_socket.close()

    if TARGET_IP and TARGET_PORT and active_connection:
        server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        try:
            print(f"Listening for client connections on {TARGET_IP}:{TARGET_PORT}...")
            server_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1048576)

            server_socket.bind((get_local_ip(), 8000))
            server_socket.listen()

            while True:
                if not active_connection:
                    break

                client_connection, client_address = server_socket.accept()
                print(f"Connected by {client_address}")

                while True:
                    input_bit = input("Enter 0 if you want to specify the measurement time, 1 if you want to specify the number of impulses: ")
                    if input_bit == 1:
                        timer = threading.Timer(10, timer_callback)
                        timer.start()
                    if input_bit in ['0', '1']:
                        break
                    else:
                        print("Invalid choice. Please enter 0 or 1.")

                while True:
                    user_input = float(input("Enter the parameter (in seconds or quantity of pulses accordingly) : "))
                    if ((input_bit == 0) and (user_input == 0)) or ((input_bit == 1) and (user_input<1)):
                        print("Invalid parameters entered")
                    else:
                        break

                print("Measurement started at", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

                integer_part = int(user_input)
                fractional_part = user_input - integer_part
                
                if input_bit == '0':
                    final_float = 2 * integer_part + fractional_part
                else:
                    final_float = 2 * integer_part + 1 + fractional_part

                user_input_packed = struct.pack("!f", final_float)
                client_connection.sendall(user_input_packed)

                handle_client(client_connection, client_address)

        except Exception as bind_error:
            print(f"Error during socket bind: {bind_error}")

        finally:
            threading.Thread(target=write_accumulated_data_to_file).start()
            server_socket.close()

if __name__ == "__main__":

    timestamp_str = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
    filename = os.path.splitext("Rezultatai.txt")[0] + " " + timestamp_str + os.path.splitext("Rezultatai.txt")[1]
    receive_data()
    while True:
        input_bit = input("Enter 0 if you want repeat, enter any other character if you want to terminate the program: ")
        timestamp_str = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
        filename = os.path.splitext("Rezultatai.txt")[0] + " " + timestamp_str + os.path.splitext("Rezultatai.txt")[1]
        if input_bit == '0':
            broadcast_message()
            sys.stdin.flush()
            sys.stdout.flush()
            receive_data()
        else:
            print("Program terminated")
            sys.exit(0)
#2
Can you explain why you make a special case when os.path.getsize(filename) == 0 in print_decimal_chunks_to_file_and_terminal(), why you are using mmap, and why you open the file once in mode a+ and once in mode r+b? Also, what is the point of copying the entire list at line 90?
« We can solve any problem by introducing an extra level of indirection »
#3
(Dec-20-2024, 09:22 AM)Gribouillis Wrote: Can you explain why you make a special case when os.path.getsize(filename) == 0 in print_decimal_chunks_to_file_and_terminal(), why you are using mmap, and why you open the file once in mode a+ and once in mode r+b? Also, what is the point of copying the entire list at line 90?

It is just well tested and it works well. Do you have any observations about the file writing speed problem?
#4
(Dec-20-2024, 10:54 AM)Vilius Wrote: It is just well tested and it works well. Do you have any observations about the file writing speed problem?
It seemed to me that the first step towards improving the speed is to understand why you have such convoluted code for a task as simple as writing to a file, but I may be wrong.
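For comparison, a single buffered write is usually fast even at this scale, with no mmap or zero-padding involved. Here is a minimal sketch; the `lines_to_write` list is just stand-in data for the formatted lines your code builds:

```python
# Stand-in for half a million formatted packet lines.
lines_to_write = [f"packet {i}\n" for i in range(500_000)]

# Join once and write once in append mode; Python's buffered I/O
# hands the data to the OS in large blocks.
with open("Rezultatai.txt", "a", encoding="utf-8") as f:
    f.write("".join(lines_to_write))
```

On ordinary hardware this typically finishes in seconds, which would suggest the bottleneck is somewhere else in the program, not in the file I/O itself.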
« We can solve any problem by introducing an extra level of indirection »
#5
I don't want to wade through 363 lines of your code, but, when dealing with large amounts of data, generators are the way to go.

This code filters lines from an apache type access log, according to the number of bytes sent.

Lines look like this:

140.180.132.213 - - [24/Feb/2008:00:08:59 -0600] "GET /ply/ply.html HTTP/1.1" 200 97238

access_log = '/home/pedro/myPython/yield/tutorial2008/generators/access.log'
outfile = '/home/pedro/myPython/yield/tutorial2008/data.log'

# filter lines from access_log according to bytes_sent and save as text
with open(access_log) as wwwlog, open(outfile, 'w') as of:
    # generator
    lines = (line for line in wwwlog)
    for line in lines:
        res = line.rsplit(None, 1)[1]
        if res != '-':
            bytes_sent = int(res)
            # put any arbitrary number here
            if bytes_sent > 97200:
                of.write(line)
My sample log file is about 8000 lines long; this filters out 614 lines and saves them to data.log in the time it takes to press the enter key. The resulting file is 56.5 kB. If you have half a million lines to write, use a generator, then you never overload your memory.

If you can't do that, open the output file in append mode and write each line as it is made. I'm pretty sure that's what apache2 does!
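If you do go the incremental route, the shape is just this; `make_line()` is a hypothetical stand-in for your packet formatting:

```python
# Append each line as soon as it is produced, instead of accumulating
# everything in RAM first. make_line() is a hypothetical stand-in for
# the real packet-formatting code.
def make_line(i):
    return f"2024-12-18 14:11:03.031651 - 0 CPS - packet {i}\n"

with open("data.log", "a", encoding="utf-8") as of:
    for i in range(1000):
        of.write(make_line(i))  # buffered: not one disk operation per line
```

The file object buffers internally, so writing line by line does not mean one disk operation per line.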