Python Forum
Problems writing a large text file in Python
#1
Hi,

I have developed this semi-scientific code that acts as a TCP server. It listens for incoming messages, processes them and stores the results in the computer's RAM. Once the client disconnects, the PC attempts to write all the gathered data to a text file. The complete code is quite long, but you only need to concentrate on two functions:

print_decimal_chunks_to_file_and_terminal
write_accumulated_data_to_file

So the PC gathers data and processes it into its final form, like this:

2024-12-18 14:11:03.031651 - 0 CPS - 485 702 925 1152 1319 1440 1511 1568 1602 1624 1631 1631 1599 1534 1441 1344 1245 1124 994 896 774 645 513 390 283 189 5 0 5 5 34 133 289
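A minimal sketch of how one such line can be built. `format_packet_line` is a hypothetical helper that mirrors the formatting in `print_decimal_chunks_to_file_and_terminal` (local time, CPS minus one floored at 0 or "N/A", each sample minus 2048 floored at 0 in a 4-character field), with one swap: `datetime`'s `%f` directive supplies the microseconds instead of slicing a formatted float.

```python
import time
from datetime import datetime


def format_packet_line(timestamp, cps, packet_data):
    """Build one output line in the format of the example above (hypothetical helper)."""
    # %f gives the six-digit microsecond part directly, so no float slicing is needed
    formatted_timestamp = datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S.%f")
    formatted_cps = f"{max(0, int(cps) - 1):d}" if cps is not None else "N/A"
    # Each raw sample minus 2048 (never negative), right-aligned in 4 characters
    samples = " ".join(f"{max(int(num) - 2048, 0):4d}" for num in packet_data)
    return f"{formatted_timestamp} - {formatted_cps} CPS - {samples}\n"


if __name__ == "__main__":
    print(format_packet_line(time.time(), 1, [2533, 2750, 1000]), end="")
```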

This is an example of one completely processed data packet. Depending on the environment and my use case, I may only have a few of these lines to write to the file, but I may also have to write up to half a million of them.
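For scale, a rough size estimate from the format above: every line has a 26-character timestamp, the separators " - " and " CPS - " around a CPS value of c digits, 33 sample values (as in the example line) each padded to 4 characters and separated by single spaces, and a newline. With c at most 4, half a million lines come to roughly 100 MB of ASCII text, a modest amount for one sequential write, so the data volume alone is unlikely to explain hours of run time.

```latex
\begin{aligned}
\text{characters per line} &= 26 + (3 + c + 7) + (33 \cdot 4 + 32) + 1 = 201 + c \\
500\,000 \cdot (201 + 4) &= 102\,500\,000 \ \text{bytes} \approx 100\ \text{MB}
\end{aligned}
```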

The problem is that the more lines I have to write, the longer it takes to complete the text file, and the time grows far faster than the number of lines. For example, a couple of thousand lines take a few seconds, but 50k+ lines may take up to 15 minutes, and 129k lines took half a day. I don't think it should be like that. I have no hardware limitations and I am ready to tune most of the usual OS settings, but I want to believe it's possible to write 500k lines in under 5-10 minutes somehow.
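The reported timings grow much faster than the line count: going from a couple of thousand lines to 50k (about 25 times more) takes the run from seconds to about 15 minutes, which is the signature of a quadratic step rather than slow disk I/O. One likely contributor is `calculate_cps`: for each of the n timestamps it calls `count_formatted_logs_in_last_second`, which scans all n timestamps, so it performs about n² inner-loop iterations in pure Python (roughly 1.7 × 10^10 for 129k packets) before any byte is written. Assuming the timestamps are non-decreasing (they are built from `perf_counter_ns` in `handle_client`), the same counts can be obtained with binary search in O(n log n). A sketch using the standard `bisect` module; the function names are hypothetical, and the naive version only reproduces the original test for comparison.

```python
import random
import time
from bisect import bisect_right


def count_in_last_second_naive(timestamps, current_timestamp):
    # Same test as count_formatted_logs_in_last_second: ts in (current - 1, current]
    return sum(1 for ts in timestamps if current_timestamp - ts < 1 and current_timestamp >= ts)


def calculate_cps_naive(timestamps):
    # O(n^2): one full scan of all timestamps per timestamp, like calculate_cps
    return [count_in_last_second_naive(timestamps, t) for t in timestamps]


def calculate_cps_fast(timestamps):
    """O(n log n) equivalent, ASSUMING timestamps are sorted (non-decreasing)."""
    # A list gives O(1) indexing; deque indexing slows to O(n) in the middle
    ts = list(timestamps)
    # (number of ts <= t) - (number of ts <= t - 1) == number of ts in (t - 1, t]
    return [bisect_right(ts, t) - bisect_right(ts, t - 1) for t in ts]


if __name__ == "__main__":
    # Synthetic, sorted timestamps: 2000 packets spread over about 20 seconds
    stamps = sorted(random.uniform(0, 20) for _ in range(2000))
    for fn in (calculate_cps_naive, calculate_cps_fast):
        start = time.perf_counter()
        fn(stamps)
        print(f"{fn.__name__}: {time.perf_counter() - start:.4f} s")
    assert calculate_cps_naive(stamps) == calculate_cps_fast(stamps)
```

Because `numpy` is already imported in the program, a vectorized variant along the same lines should also work: `ts = np.asarray(timestamps)` followed by `np.searchsorted(ts, ts, side='right') - np.searchsorted(ts, ts - 1, side='right')`.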

I did some initial research, but I am already using memory-mapped files and I don't issue many write() calls; I try to do it all at once. Not many quick fixes are left for me, so I am asking for help and observations on where the bottlenecks in my code are. I do not have much experience with Python, and this time I cannot fundamentally change the program architecture: I cannot write data in small chunks as soon as it is received; I still have to write the whole buffer at once. Thank you for any help.
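Writing everything at once does not require mmap, the zero-padding write, or the special case for an empty file: opening the file in append mode and handing it one joined string is already a single bulk write through Python's buffered I/O. A minimal sketch, assuming the lines are already formatted strings ending in "\n"; `write_all_lines` is a hypothetical name, and `encoding="utf-8"` is an assumption (the original code relies on the platform default).

```python
import os
import tempfile


def write_all_lines(path, lines):
    """Append all lines to `path` with one write() call; return characters written.

    Mode "a" creates the file if it does not exist and appends at the end if it
    does (including when it is empty), so no getsize() check, zero-padding or
    mmap is needed.
    """
    data = "".join(lines)  # one string, written by a single write() call
    with open(path, "a", encoding="utf-8") as f:
        return f.write(data)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as folder:
        demo_path = os.path.join(folder, "demo.txt")
        # 500,000 short dummy lines, joined in memory first, then written at once
        written = write_all_lines(demo_path, (f"line {i}\n" for i in range(500_000)))
        print(f"wrote {written} characters, file size {os.path.getsize(demo_path)} bytes")
```

One difference worth knowing: in text mode on Windows each "\n" becomes "\r\n", whereas the original mmap path writes encoded bytes with bare "\n" (only the first line goes through text mode). Passing `newline=""` to `open()` keeps bare "\n" if that matters.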

import socket
import struct
import numpy as np
import time
from datetime import datetime
import os
import threading
from collections import deque
import sys
from threading import Lock
import mmap
 
TARGET_IP = None
TARGET_PORT = 8000
 
UDP_MESSAGE = "ESP32AGAIN"
UDP_BROADCAST_IP = "255.255.255.255" 
UDP_BROADCAST_PORT = 55555
 
BUFFER_SIZE = 128
NUM_TO_PRINT = 32
BUFFER_LIMIT = 500 * 1024 * 1024
 
write_lock = Lock()
buffer_queue = deque() 
timestamps = deque() 
cps_values = []
active_connection = False
total_logs_received = 0
client_connection = None
last_packet = None
start_time = None
end_time = None
filename = None
 
def process_data_chunk(data):
    result_array = (
        (((data >> 14) & 1) << 0) |
        (((data >> 13) & 1) << 1) |
        (((data >> 15) & 1) << 2) |
        (((data >> 4) & 1) << 3) |
        (((data >> 9) & 1) << 4) |
        (((data >> 17) & 1) << 5) |
        (((data >> 1) & 1) << 6) |
        (((data >> 5) & 1) << 7) |
        (((data >> 10) & 1) << 8) |
        (((data >> 3) & 1) << 9) |
        (((data >> 12) & 1) << 10) |
        (((data >> 2) & 1) << 11)
    )
    return result_array
def process_data(data):
    result_array = np.array([process_data_chunk(struct.unpack('I', data[i:i+4])[0]) for i in range(0, len(data), 4)])
    max_index = np.argmax(result_array)
    higher_indices = np.arange(0, max_index + 1)
    lower_indices = np.arange(max_index, len(result_array))
    higher_elements = result_array[higher_indices]
    lower_elements = result_array[lower_indices]
    rearranged_buffer = np.concatenate([higher_elements, lower_elements])
    max_index_rearranged = np.argmax(rearranged_buffer)
    rearranged_buffer = np.roll(rearranged_buffer, 10 - max_index_rearranged)
    return rearranged_buffer
 
def print_decimal_chunks_to_file_and_terminal(data, file_path, timestamps, cps_values):
    global total_logs_received
    global filename
    try:
        data = list(data)
        timestamps = list(map(float, timestamps)) 
        cps_values = list(cps_values)
        lines_to_write = []
 
        for i in range(len(data)):
            packet_data = data[i]
            timestamp = timestamps[i]
            cps = cps_values[i]
            formatted_timestamp = time.strftime('%Y-%m-%d %H:%M:%S.', time.localtime(timestamp)) + f"{timestamp:.6f}"[11:]
            formatted_cps = f"{max(0, int(cps) - 1):d}" if cps is not None else "N/A"
            line = f"{formatted_timestamp} - {formatted_cps} CPS - "
            formatted_line = " ".join(f"{max(num - 2048, 0):4d}" for num in packet_data)
            line += formatted_line
            lines_to_write.append(line + "\n")
            total_logs_received += 1
        data_to_write = ''.join(lines_to_write)
 
        if os.path.getsize(filename) == 0:
            first_line = lines_to_write[0]
            with open(filename, "a+") as file:
                file.write(first_line)
            lines_to_write = lines_to_write[1:]
 
 
        remaining_data = ''.join(lines_to_write).encode()
        data_length = len(remaining_data)
             
        with open(filename, "r+b") as file:
            file.seek(0, os.SEEK_END)
            current_size = file.tell()
            file.write(b'\0' * data_length)
                 
            mmapped_file = mmap.mmap(file.fileno(), current_size + data_length)
            mmapped_file.seek(current_size) 
            mmapped_file.write(remaining_data)
            mmapped_file.close()
 
    except Exception as write_error:
        print(f"Error writing to file: {write_error}")
         
def count_formatted_logs_in_last_second(timestamps, current_timestamp):
    count = 0
    for ts in timestamps:
        if current_timestamp - ts < 1 and current_timestamp >= ts:
            count += 1
    return count
 
def calculate_cps(timestamps):
    cps_values = []
    for i in range(len(timestamps)):
        cps = count_formatted_logs_in_last_second(timestamps, timestamps[i])
        cps_values.append(cps)
    return cps_values
 
def broadcast_message():
    udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    time.sleep(0.5)
    udp_socket.sendto(UDP_MESSAGE.encode(), (UDP_BROADCAST_IP, UDP_BROADCAST_PORT))
    time.sleep(0.5)
    udp_socket.sendto(UDP_MESSAGE.encode(), (UDP_BROADCAST_IP, UDP_BROADCAST_PORT))
    udp_socket.close()
 
def write_accumulated_data_to_file():
    global buffer_queue
    global timestamps
    global cps_values
    global filename
 
    with write_lock:
        if buffer_queue and timestamps:
            try:
                cps_values = calculate_cps(timestamps)
                with open(filename, "a+") as file:
                    print_decimal_chunks_to_file_and_terminal(buffer_queue, file, timestamps, cps_values)
                    file.flush()  
                buffer_queue.clear()
                timestamps.clear()
                cps_values = []
 
            except Exception as write_error:
                print(f"Error writing to file: {write_error}")
 
def format_time(seconds):
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    seconds = int(seconds % 60)
    return f"{hours}h {minutes}m {seconds}s"
 
def handle_client(connection, address):
    global active_connection
    global buffer_queue
    global timestamps
    global cps_values
    global last_packet
    global end_time
    global start_time
    global filename
 
    try:
        buffer = bytearray(BUFFER_SIZE)
        reference_time = time.time()
        start_perf_counter = time.perf_counter_ns()
 
        start_time = reference_time
 
        while active_connection:
            try:
                total_received = 0
                while total_received < BUFFER_SIZE:
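                    # write into the unfilled tail of the buffer so a partial recv is not overwritten
                    additional_data = connection.recv_into(memoryview(buffer)[total_received:])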
                    if additional_data == 0:
                        print(f"Connection closed by the client ({address}) at {time.strftime('%Y-%m-%d %H:%M:%S.%f')}")
                        active_connection = False
                        break
 
                    total_received += additional_data
 
                if not active_connection:
                    break
 
                elapsed_ns = time.perf_counter_ns() - start_perf_counter
                timestamp = reference_time + elapsed_ns / 1e9 
 
                if total_received == BUFFER_SIZE:
                    data = process_data(buffer)
                    buffer_queue.append(data)
                    timestamps.append(timestamp)
 
                    if len(buffer_queue) > BUFFER_LIMIT // (BUFFER_SIZE // 4):
                        print("Buffer size limit reached. Closing connection and writing data to file.")
                        threading.Thread(target=write_accumulated_data_to_file).start()
                        active_connection = False
                        break
 
                    cps = count_formatted_logs_in_last_second(timestamps, timestamp)
                    cps_values.append(cps)
 
            except Exception as receive_error:
                break
 
        if active_connection:
            end_perf_counter = time.perf_counter_ns()
            elapsed_ns = end_perf_counter - start_perf_counter
            end_time = reference_time + elapsed_ns / 1e9 
 
            last_packet = buffer[:BUFFER_SIZE] 
            buffer_queue.pop()
            last_packet_ints = struct.unpack("II", last_packet[:8])
 
            total_measurement_seconds = max(0, end_time - start_time - 5)
            total_measurement_time = format_time(total_measurement_seconds)
            network_congestion_time = format_time(last_packet_ints[0])
            network_congestion_events = last_packet_ints[1]
             
            summary_line = (
                f"Total measurement time: {total_measurement_time}. "
                f"Network congestion time: {network_congestion_time}. "
                f"Network congestion events: {network_congestion_events}"
            )
            print(f"Impulse count: {len(buffer_queue)}")
            print("Disconnection detected. Closing socket and writing data to file. Please wait... ")
            writer_thread = threading.Thread(target=write_accumulated_data_to_file)
            writer_thread.start()
            writer_thread.join() 
            active_connection = False
 
    finally:
        connection.close()
        with open(filename, "a+") as file:
            file.write("\n" + summary_line + "\n")
        print("Finished writing")
        print("Connection closed.")
         
def get_local_ip():
    try:
        hostname = socket.gethostname()
        ip_address = socket.gethostbyname(hostname)
        return ip_address
    except Exception as e:
        print(f"Error getting local IP address: {e}")
 
def timer_callback():
    if not buffer_queue:
        print("No pulses detected in 10 seconds, check your hardware setup")
 
def receive_data():
    global active_connection
    global client_connection
    global TARGET_IP
    global TARGET_PORT
 
    broadcast_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    broadcast_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    broadcast_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    broadcast_socket.bind(('0.0.0.0', 8000)) 
 
    try:
        print("__________________________________________________________________________________________________________________")
        print("Listening for broadcast messages...")
        while True:
            data, address = broadcast_socket.recvfrom(1024)
            message = data.decode("utf-8")
            if message.strip() == "Scintilator":
                TARGET_IP = address[0]
                TARGET_PORT = address[1]
                print(f"Subnet broadcast message received from {address}. IP and Port obtained: {TARGET_IP}:{TARGET_PORT}")
 
                time.sleep(0.1)
 
                broadcast_message = "Scintilator"
                broadcast_socket.sendto(broadcast_message.encode("utf-8"), (TARGET_IP,8000)) 
                print("Recognition message sent.")
 
                active_connection = True
                break
    except Exception as e:
        print(f"Error receiving broadcast message: {e}")
    finally:
        broadcast_socket.close()
 
    if TARGET_IP and TARGET_PORT and active_connection:
        server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
        try:
            print(f"Listening for client connections on {TARGET_IP}:{TARGET_PORT}...")
            server_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1048576)
 
            server_socket.bind((get_local_ip(), 8000))
            server_socket.listen()
 
            while True:
                if not active_connection:
                    break
 
                client_connection, client_address = server_socket.accept()
                print(f"Connected by {client_address}")
 
                while True:
                    input_bit = input("Enter 0 if you want to specify the measurement time, 1 if you want to specify the number of impulses: ")
                    # input() returns a string, so compare with '1' rather than the integer 1
                    if input_bit == '1':
                        timer = threading.Timer(10, timer_callback)
                        timer.start()
                    if input_bit in ['0', '1']:
                        break
                    else:
                        print("Invalid choice. Please enter 0 or 1.")
 
                while True:
                    user_input = float(input("Enter the parameter (in seconds or quantity of pulses accordingly) : "))
                    # compare with the strings '0' and '1' that input() returns
                    if (input_bit == '0' and user_input == 0) or (input_bit == '1' and user_input < 1):
                        print("Invalid parameters entered")
                    else:
                        break
 
                print("Measurement started at", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
 
                integer_part = int(user_input)
                fractional_part = user_input - integer_part
                 
                if input_bit == '0':
                    final_float = 2 * integer_part + fractional_part
                else:
                    final_float = 2 * integer_part + 1 + fractional_part
 
                user_input_packed = struct.pack("!f", final_float)
                client_connection.sendall(user_input_packed)
 
                handle_client(client_connection, client_address)
 
        except Exception as bind_error:
            print(f"Error during socket bind: {bind_error}")
 
        finally:
            threading.Thread(target=write_accumulated_data_to_file).start()
            server_socket.close()
 
if __name__ == "__main__":
 
    timestamp_str = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
    filename = os.path.splitext("Rezultatai.txt")[0] + " " + timestamp_str + os.path.splitext("Rezultatai.txt")[1]
    receive_data()
    while True:
        input_bit = input("Enter 0 if you want repeat, enter any other character if you want to terminate the program: ")
        timestamp_str = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
        filename = os.path.splitext("Rezultatai.txt")[0] + " " + timestamp_str + os.path.splitext("Rezultatai.txt")[1]
        if input_bit == '0':
            broadcast_message()
            sys.stdin.flush()
            sys.stdout.flush()
            receive_data()
        else:
            print("Program terminated")
            sys.exit(0)
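Inside `handle_client`, every packet also calls `count_formatted_logs_in_last_second(timestamps, timestamp)`, which iterates over the whole `timestamps` deque, so receive-time work grows with the number of packets already stored. Those per-packet values are later replaced by `cps_values = calculate_cps(timestamps)` in `write_accumulated_data_to_file`, so it may be possible to drop that call altogether. If a live count is needed, a sliding window over a second deque keeps the cost per packet constant on average. A minimal sketch; the class name `SlidingWindowCounter` is hypothetical, and it assumes timestamps arrive in non-decreasing order and uses the same (t - 1, t] window as the original.

```python
from collections import deque


class SlidingWindowCounter:
    """Count timestamps in the window (t - window, t], assuming sorted arrival."""

    def __init__(self, window=1.0):
        self.window = window
        self._recent = deque()

    def add(self, timestamp):
        """Record `timestamp` and return how many recorded ones lie within the window."""
        self._recent.append(timestamp)
        # Evict entries at least `window` seconds older than the newest one; with
        # sorted input they can never fall inside a later window again
        while timestamp - self._recent[0] >= self.window:
            self._recent.popleft()
        return len(self._recent)


# Hypothetical use inside handle_client, replacing the per-packet full scan:
#     cps_counter = SlidingWindowCounter()        # once, before the receive loop
#     ...
#     cps_values.append(cps_counter.add(timestamp))

if __name__ == "__main__":
    counter = SlidingWindowCounter()
    for t in (0.0, 0.25, 0.5, 1.1, 1.2, 3.0):
        print(t, counter.add(t))
```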
#2
Can you explain why you make a special case when os.path.getsize(filename) == 0 in print_decimal_chunks_to_file_and_terminal(), why you are using mmap, and why you are opening the file once in mode a+ and once in mode r+b? Also, what is the point of copying the entire list at line 90 (lines_to_write = lines_to_write[1:])?
« We can solve any problem by introducing an extra level of indirection »
#3
(Dec-20-2024, 09:22 AM)Gribouillis Wrote: Can you explain why you make a special case when os.path.getsize(filename) == 0 in print_decimal_chunks_to_file_and_terminal(), why you are using mmap, and why you are opening the file once in mode a+ and once in mode r+b? Also, what is the point of copying the entire list at line 90 (lines_to_write = lines_to_write[1:])?

It is just well tested and it works well. Do you have any observations about the file writing speed problem?
#4
(Dec-20-2024, 10:54 AM)Vilius Wrote: It is just well tested and it works well. Do you have any observations about the file writing speed problem?
It seemed to me that the first step towards improving the speed is to understand why you have such convoluted code for a task as simple as writing to a file, but I may be wrong.
#5
I don't want to wade through 363 lines of your code, but when dealing with large amounts of data, generators are the way to go.

This code filters lines from an Apache-style access log according to the number of bytes sent.

Lines look like this:

140.180.132.213 - - [24/Feb/2008:00:08:59 -0600] "GET /ply/ply.html HTTP/1.1" 200 97238

access_log = '/home/pedro/myPython/yield/tutorial2008/generators/access.log'
outfile = '/home/pedro/myPython/yield/tutorial2008/data.log'
 
# filter lines from access_log according to bytes_sent and save as text
with open(access_log) as wwwlog, open(outfile, 'w') as of:
    # generator
    lines = (line for line in wwwlog)
    for line in lines:
        # last whitespace-separated field: bytes sent, or '-' if none
        res = line.rsplit(None, 1)[1]
        if res != '-':
            bytes_sent = int(res)
            # put any arbitrary number here
            if bytes_sent > 97200:
                of.write(line)
My sample log file is about 8000 lines long; this filters out 614 lines and saves them to data.log in the time it takes to press the Enter key. The resulting file is 56.5 kB. If you have half a million lines to write, use a generator; then you never overload your memory.

If you can't do that, open the output file in append mode and write each line as it is produced. I'm pretty sure that's what apache2 does!
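Applied to the data in this thread, the generator idea could look like the sketch below: lines are produced one at a time and passed to `file.writelines()`, so the full text never has to exist as one big string or list, while the file is still written in a single pass once the client disconnects. `iter_lines` and `write_with_generator` are hypothetical names, the line format is simplified (raw epoch timestamp, CPS value as given), and the CPS values are assumed to be precomputed.

```python
import os
import tempfile


def iter_lines(packets, timestamps, cps_values):
    """Yield one formatted line per packet instead of building a list of all lines.

    Simplified format: the timestamp is written as a raw epoch float and the CPS
    value as given. zip() stops at the shortest of the three inputs.
    """
    for packet, ts, cps in zip(packets, timestamps, cps_values):
        samples = " ".join(f"{max(num - 2048, 0):4d}" for num in packet)
        yield f"{ts:.6f} - {cps} CPS - {samples}\n"


def write_with_generator(path, packets, timestamps, cps_values):
    """Append all lines in one pass; writelines() consumes the generator lazily."""
    with open(path, "a", encoding="utf-8") as f:
        # The file object's internal buffer collects the small strings and
        # hands them to the OS in larger chunks
        f.writelines(iter_lines(packets, timestamps, cps_values))


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as folder:
        out = os.path.join(folder, "gen_demo.txt")
        write_with_generator(out, [[2533, 2750], [2048, 100]], [1.5, 2.25], [3, 4])
        with open(out, encoding="utf-8") as f:
            print(f.read(), end="")
```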