Python Forum
Step through a really large CSV file incrementally in Python
#1
I am trying to speed up loading a large CSV file into a MySQL database. With the code below, it takes about 4 hours to load a 4GB file with millions of rows:

import csv
from datetime import datetime

# source (the CSV path), mydb (the connection) and cursor come from setup code omitted here

with open(source) as csv_file:
    csv_reader = csv.reader(csv_file, delimiter=',')
    next(csv_reader)  # skip the header row
    insert_sql = """ INSERT INTO billing_info_test (InvoiceId, PayerAccountId, LinkedAccountId) VALUES (%s, %s, %s) """
    for row in csv_reader:
        cursor.execute(insert_sql, row)
        print(cursor.rowcount, 'inserted with LinkedAccountId', row[2], 'at', datetime.now().isoformat())
    print("Committing the DB")
    mydb.commit()
cursor.close()
mydb.close()
That code does work, but it takes between 4 and 5 hours to ingest the file. The code is abbreviated and does not show the try statements I have in the original.

I want to use the executemany() statement to make this faster. For that, you have to pass a list of tuples as the second argument.
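For reference, a minimal executemany() call looks something like this (illustrative values only, reusing insert_sql from the code above):

batch = [
    ('invoice-1', 'payer-1', 'linked-1'),
    ('invoice-2', 'payer-2', 'linked-2'),
]
cursor.executemany(insert_sql, batch)  # one call inserts the whole batch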

If I build up one list of all the rows as I iterate, it eventually gets too large, the script runs out of memory, and it crashes.

I am not able to get the length of csv_reader or csv_file to use in a range statement.

How can I loop through the CSV file 1000 rows at a time, store them in a list, pass that list to executemany(), then move on to the next 1000 rows, and so on until the end of the file?
#2
Hope this gives you some ideas.

The following example assumes:
  • the CSV file contains column names in the first line
  • the connection is already built
  • the file name is test.csv
  • the table name is MyTable
  • Python 3
import csv

with open('test.csv', 'r') as f:
    reader = csv.reader(f)
    columns = next(reader)  # the first line holds the column names
    query = 'insert into MyTable({0}) values ({1})'
    query = query.format(','.join(columns), ','.join('?' * len(columns)))
    cursor = connection.cursor()
    for data in reader:
        cursor.execute(query, data)
    cursor.commit()  # '?' placeholders and cursor.commit() are pyodbc-style; see the MySQL note below
If column names are not included in the file:

...
with open('test.csv', 'r') as f:
    reader = csv.reader(f)
    data = next(reader)  # first row is data, not a header
    query = 'insert into dbo.Test values ({0})'
    query = query.format(','.join('?' * len(data)))
    cursor = connection.cursor()
    cursor.execute(query, data)  # insert that first row as well
    for data in reader:
        cursor.execute(query, data)
    cursor.commit()
Source:

https://stackoverflow.com/questions/2125...ing-python
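One note: the '?' placeholders and cursor.commit() above are pyodbc-style. With a MySQL driver such as mysql.connector (which the original post appears to use), the same idea would look roughly like this (untested sketch):

import csv

with open('test.csv', 'r') as f:
    reader = csv.reader(f)
    columns = next(reader)  # header row
    query = 'insert into MyTable({0}) values ({1})'
    # MySQL drivers use %s placeholders instead of ?
    query = query.format(','.join(columns), ','.join(['%s'] * len(columns)))
    cursor = connection.cursor()
    for data in reader:
        cursor.execute(query, data)
    connection.commit()  # commit on the connection, not the cursor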
#3
Maybe something like this will work.

Note: all of the code below is untested.

with open(source) as csv_file:
    csv_reader = csv.reader(csv_file, delimiter=',')
    next(csv_reader)  # skip the header row
    insert_sql = """ INSERT INTO billing_info_test (InvoiceId, PayerAccountId, LinkedAccountId) VALUES (%s, %s, %s) """
    rows = []
    row_count = 0
    for row in csv_reader:
        row_count += 1
        rows.append(row)
        if row_count == 1000:
            cursor.executemany(insert_sql, rows)
            print(cursor.rowcount, 'inserted with LinkedAccountId', row[2], 'at', datetime.now().isoformat())
            rows = []
            row_count = 0
    if rows:
        # insert whatever is left over after the last full batch of 1000
        cursor.executemany(insert_sql, rows)
        print(cursor.rowcount, 'inserted with LinkedAccountId', row[2], 'at', datetime.now().isoformat())
    print("Committing the DB")
    mydb.commit()
cursor.close()
mydb.close()


Using itertools
from itertools import zip_longest


def grouper(iterable, n, fillvalue=None):
    "Collect data into fixed-length chunks or blocks"
    # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
    # note: the final group is padded with fillvalue (None) up to n items
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)


with open(source) as csv_file:
    csv_reader = csv.reader(csv_file, delimiter=',')
    next(csv_reader)  # skip the header row
    insert_sql = """ INSERT INTO billing_info_test (InvoiceId, PayerAccountId, LinkedAccountId) VALUES (%s, %s, %s) """
    for rows in grouper(csv_reader, 1000):
        cursor.executemany(insert_sql, rows)
        print(cursor.rowcount, 'inserted with LinkedAccountId', row[2], 'at', datetime.now().isoformat())
    print("Committing the DB")
    mydb.commit()
cursor.close()
mydb.close()
#4
(May-06-2019, 08:14 PM)Yoriz Wrote: Maybe something like this will work [...]

Your first example worked really nicely! Thank you for that! It supercharged this process. I also want to learn the itertools method. I will try that tomorrow. Best wishes!
#5
The itertools version would need the following change to the print line, so that it reports the last row of the chunk:
print(cursor.rowcount, 'inserted with LinkedAccountId', row[2], 'at', datetime.now().isoformat())
to
print(cursor.rowcount, 'inserted with LinkedAccountId', rows[-1], 'at', datetime.now().isoformat())
#6
With this version of grouper, the code never holds more than one row in memory at a time:
from collections import deque
import itertools as itt

# consume an iterator completely without storing its items
exhaust = deque(maxlen=0).extend

def igrouper(iterable, size):
    if size < 2:
        if size == 1:
            for x in iterable:
                yield (x,)
            return
        else:
            raise ValueError('Expected size > 0, got', size)
    data = iter(iterable)
    for item in data:
        # lazily chain the current item with the next size-1 items
        group = itt.chain((item,), itt.islice(data, size - 1))
        yield group
        # drain whatever the consumer did not read, so the next
        # group starts at the right position
        exhaust(group)
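For example, it could replace grouper() in the loop from post #3 (untested sketch; the group is materialised with list() before executemany(), which still holds at most one 1000-row chunk at a time, and unlike zip_longest there is no None padding on the final chunk):

with open(source) as csv_file:
    csv_reader = csv.reader(csv_file, delimiter=',')
    next(csv_reader)  # skip the header row
    insert_sql = """ INSERT INTO billing_info_test (InvoiceId, PayerAccountId, LinkedAccountId) VALUES (%s, %s, %s) """
    for group in igrouper(csv_reader, 1000):
        rows = list(group)  # at most 1000 rows in memory at once
        cursor.executemany(insert_sql, rows)
    mydb.commit()
cursor.close()
mydb.close()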
#7
What if the program has two threads?
One thread reads the CSV file, groups the rows into chunks, and sends them through a queue to a worker thread, which calls the executemany function. To limit memory consumption, set a maxsize on the queue.


Here is an example. mmap should speed up the reading, but I guess the database is the slower part, on the writing side.
You have to find the ideal chunk size. If the chunk size is too small, you make many calls to the database (overhead); if it is bigger, you make fewer calls, but each one takes longer. Finding the sweet spot requires measurement.

I sometimes use the progressbar2 package to show progress in the console; it can help to detect which chunk size is ideal.

At the beginning there is a burst until the queue is filled up to its maxsize; after that the speed drops, because the queue blocks when it is full.


Here is the example:
import time
from pathlib import Path
from threading import Thread
from queue import Queue
from mmap import mmap, ACCESS_READ

from progressbar import ProgressBar, Timer, Bar, ETA, FileTransferSpeed


WIDGETS = [' [', Timer(), '] ', '[', FileTransferSpeed(), '] ', Bar(), ' (', ETA(), ') ']
DONE = object()


def iter_lines(file):
    path = Path(file)
    size = path.stat().st_size
    pb = ProgressBar(max_value=size, widgets=WIDGETS)
    with open(file) as fd:
        with mmap(fd.fileno(), size, access=ACCESS_READ) as reader:
            pos = 0
            while True:
                idx = reader.find(b'\n', pos)
                if idx == -1:
                    yield reader[pos:]
                    break
                yield reader[pos:idx]
                pos = idx + 1
                pb.update(pos)
    pb.finish()
    # signal read_chunks() below that the file is exhausted
    raise GeneratorExit


def read_chunks(file, chunksize, queue):
    iterator = iter_lines(file)
    while True:
        chunk = []
        try:
            # take at most chunksize lines for this chunk
            for line, _ in zip(iterator, range(chunksize)):
                chunk.append(line.decode())
        except GeneratorExit:
            # the file is exhausted: flush the partial chunk and the sentinel
            queue.put(chunk)
            queue.put(DONE)
            return
        queue.put(chunk)


def write_database(connection, queue):
    while True:
        chunk = queue.get()
        if chunk is DONE:
            # check for sentinel object
            # break out if the reader is DONE
            queue.task_done()
            break
        # write the data to database
        time.sleep(0.1)
        # this takes time
        # then call task_done()
        queue.task_done()


input_file = '/home/andre/Schreibtisch/big_file.txt'
chunk_size = 1000
queue_size = 20
queue = Queue(maxsize=queue_size)
reader = Thread(target=read_chunks, args=(input_file, chunk_size, queue))
writer = Thread(target=write_database, args=(None, queue))
reader.start()
writer.start()
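The write_database() function above is just a stub that sleeps. Wired up to the MySQL connection and table from the original post, it might look roughly like this (untested sketch; it assumes each chunk is a list of raw CSV lines as produced by read_chunks() above):

def write_database(connection, queue):
    insert_sql = """ INSERT INTO billing_info_test (InvoiceId, PayerAccountId, LinkedAccountId) VALUES (%s, %s, %s) """
    cursor = connection.cursor()
    while True:
        chunk = queue.get()
        if chunk is DONE:
            queue.task_done()
            break
        # naive split; csv.reader would be safer if fields can contain commas
        rows = [line.split(',') for line in chunk]
        if rows:
            cursor.executemany(insert_sql, rows)
            connection.commit()
        queue.task_done()
    cursor.close()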