Python Forum

Step through a really large CSV file incrementally in Python
I am trying to speed up loading a large CSV file into a MySQL database. With the code below it takes about 4 hours to load a 4 GB file with millions of rows:

with open(source) as csv_file:
    csv_reader = csv.reader(csv_file, delimiter=',')
    next(csv_reader)  # skip the header row
    insert_sql = """ INSERT INTO billing_info_test (InvoiceId, PayerAccountId, LinkedAccountId) VALUES (%s, %s, %s) """
    for row in csv_reader:
        cursor.execute(insert_sql,row)
        print(cursor.rowcount, 'inserted with LinkedAccountId', row[2], 'at', datetime.now().isoformat())
    print("Committing the DB")
    mydb.commit()
cursor.close()
mydb.close()
That code does work, but it takes between 4 and 5 hours to ingest the file. The code here is abbreviated and does not show the try statements I have in the original.

I want to use the executemany() statement to make this faster. For that, you have to pass a list of tuples as the second argument.
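
For example, something along these lines is what I have in mind (untested), just to illustrate the list-of-tuples argument:

rows = [
    # made-up values, just to show the shape of the data
    ('InvoiceId-1', 'PayerAccountId-1', 'LinkedAccountId-1'),
    ('InvoiceId-2', 'PayerAccountId-2', 'LinkedAccountId-2'),
]
cursor.executemany(insert_sql, rows)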

If I build one list across all the row iterations, it gets too large, I get out-of-memory errors, and the script crashes.

I am not able to get the length of csv_reader or csv_file to use in a range statement.

How can I loop through the CSV file 1000 rows at a time and store the result in a list, use it in executemany, then store the next 1000 rows, etc until the end of the CSV file?
Hope this gives you some idea.

The following example assumes:
- the CSV file contains column names in the first line
- the connection is already built
- the file name is test.csv
- the table name is MyTable
- Python 3
with open('test.csv', 'r') as f:
    reader = csv.reader(f)
    columns = next(reader)  # read the column names from the header line
    query = 'insert into MyTable({0}) values ({1})'
    # '?' is the ODBC placeholder style (pyodbc); MySQL drivers use %s instead
    query = query.format(','.join(columns), ','.join('?' * len(columns)))
    cursor = connection.cursor()
    for data in reader:
        cursor.execute(query, data)
    cursor.commit()  # pyodbc commits via the cursor; other drivers commit on the connection
If column names are not included in the file:

...
with open('test.csv', 'r') as f:
    reader = csv.reader(f)
    data = next(reader)  # the first row is data here; use it to count the columns
    query = 'insert into dbo.Test values ({0})'
    query = query.format(','.join('?' * len(data)))
    cursor = connection.cursor()
    cursor.execute(query, data)  # insert the row that was read to build the query
    for data in reader:
        cursor.execute(query, data)
    cursor.commit()
Source:

https://stackoverflow.com/questions/2125...ing-python
Maybe something like this will work

Note: none of this code is tested.

with open(source) as csv_file:
    csv_reader = csv.reader(csv_file, delimiter=',')
    next(csv_reader)
    insert_sql = """ INSERT INTO billing_info_test (InvoiceId, PayerAccountId, LinkedAccountId) VALUES (%s, %s, %s) """
    rows = []
    row_count = 0
    for row in csv_reader:
        row_count += 1
        rows.append(row)
        if row_count == 1000:
            cursor.executemany(insert_sql,rows)
            print(cursor.rowcount, 'inserted with LinkedAccountId', row[2], 'at', datetime.now().isoformat())
            rows = []
            row_count = 0
    if rows:
        cursor.executemany(insert_sql,rows)
        print(cursor.rowcount, 'inserted with LinkedAccountId', row[2], 'at', datetime.now().isoformat())
    print("Committing the DB")
    mydb.commit()
cursor.close()
mydb.close()


Using itertools
from itertools import zip_longest


def grouper(iterable, n, fillvalue=None):
    "Collect data into fixed-length chunks or blocks"
    # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)
    
with open(source) as csv_file:
    csv_reader = csv.reader(csv_file, delimiter=',')
    next(csv_reader)
    insert_sql = """ INSERT INTO billing_info_test (InvoiceId, PayerAccountId, LinkedAccountId) VALUES (%s, %s, %s) """
    for rows in grouper(csv_reader, 1000):
        cursor.executemany(insert_sql,rows)
        print(cursor.rowcount, 'inserted with LinkedAccountId', row[2], 'at', datetime.now().isoformat())
    print("Committing the DB")
    mydb.commit()
cursor.close()
mydb.close()
(May-06-2019, 08:14 PM)Yoriz Wrote: Maybe something like this will work ...

Your first example worked really nicely! Thank you for that! It supercharged this process. I also want to learn the itertools method. I will try that tomorrow. Best wishes!
The itertools version would need the following change to report the last row of each chunk:
print(cursor.rowcount, 'inserted with LinkedAccountId', row[2], 'at', datetime.now().isoformat())
to
print(cursor.rowcount, 'inserted with LinkedAccountId', rows[-1], 'at', datetime.now().isoformat())
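Also note that zip_longest pads the final group with the fillvalue (None here), so that padding has to be dropped before the executemany call, roughly like this (untested):

for rows in grouper(csv_reader, 1000):
    # drop the None padding that zip_longest adds to the last group
    rows = [row for row in rows if row is not None]
    cursor.executemany(insert_sql, rows)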
With this version of grouper, the code will never load more than one row at a time
from collections import deque
import itertools as itt

# consumes an iterator completely without storing anything
exhaust = deque(maxlen=0).extend

def igrouper(iterable, size):
    if size < 2:
        if size == 1:
            for x in iterable:
                yield (x,)
            return
        else:
            raise ValueError('Expected size > 0, got', size)
    data = iter(iterable)
    for item in data:
        # lazily chain the current item with the next size-1 items
        group = itt.chain((item,), itt.islice(data, size - 1))
        yield group
        # drain whatever the consumer did not use, so the next group starts cleanly
        exhaust(group)
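To feed this into executemany you would still materialise each group into a list, so one chunk at a time is held in memory (which is the point of batching); unlike the zip_longest version, the last group is simply shorter instead of being padded with None. A minimal sketch, assuming insert_sql, cursor and mydb from the earlier posts:

for group in igrouper(csv_reader, 1000):
    rows = list(group)  # only this chunk is held in memory
    cursor.executemany(insert_sql, rows)
mydb.commit()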
What if the program has two threads?
One thread reads the csv file, groups the data and sends it through a queue to the worker thread, which calls executemany. To limit memory consumption, you should set a maxsize on the queue.


Here is an example. mmap should speed up the reading, but I guess the database is the slower part when writing.
You have to find the ideal chunk size. If the chunk size is too small, you make many calls to the database (overhead).
If the chunk size is bigger, you make fewer calls, but each one takes more time. Finding the ideal value in between requires measurement.
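
A rough way to measure is to time a single executemany call per chunk size, for example (untested sketch, reusing cursor and insert_sql from the earlier posts):

import time

def rows_per_second(cursor, insert_sql, rows):
    # time one executemany call for this chunk of rows
    start = time.perf_counter()
    cursor.executemany(insert_sql, rows)
    return len(rows) / (time.perf_counter() - start)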

I sometimes use the progressbar2 package to show the progress in the console.
This can be used to work out which chunk size is ideal.

At the beginning there is a burst until the queue is filled up to its maxsize. Then you can see the speed drop.
The queue blocks when it is full.


Here is the example:
import time
from pathlib import Path
from threading import Thread
from queue import Queue
from mmap import mmap, ACCESS_READ

from progressbar import ProgressBar, Timer, Bar, ETA, FileTransferSpeed


WIDGETS = [' [', Timer(), '] ', '[', FileTransferSpeed(), '] ', Bar(), ' (', ETA(), ') ']
DONE = object()


def iter_lines(file):
    path = Path(file)
    size = path.stat().st_size
    pb = ProgressBar(max_value=size, widgets=WIDGETS)
    with open(file) as fd:
        with mmap(fd.fileno(), size, access=ACCESS_READ) as reader:
            pos = 0
            while True:
                idx = reader.find(b'\n', pos)
                if idx == -1:
                    yield reader[pos:]
                    break
                yield reader[pos:idx]
                pos = idx + 1
                pb.update(pos)
    pb.finish()
    raise GeneratorExit  # signal read_chunks that the file is exhausted


def read_chunks(file, chunksize, queue):
    iterator = iter_lines(file)
    while True:
        chunk = []
        try:
            for line, _ in zip(iterator, range(chunksize)):
                chunk.append(line.decode())
        except GeneratorExit:  # raised by iter_lines when the file is exhausted
            queue.put(chunk)
            queue.put(DONE)
            return
        queue.put(chunk)


def write_database(connection, queue):
    while True:
        chunk = queue.get()
        if chunk is DONE:
            # check for sentinel object
            # break out if the reader is DONE
            queue.task_done()
            break
        # write the data to database
        time.sleep(0.1)
        # this takes time
        # then call task_done()
        queue.task_done()


input_file = '/home/andre/Schreibtisch/big_file.txt'
chunk_size = 1000
queue_size = 20
queue = Queue(maxsize=queue_size)
reader = Thread(target=read_chunks, args=(input_file, chunk_size, queue))
writer = Thread(target=write_database, args=(None, queue))
reader.start()
writer.start()
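
For the actual database part, write_database could parse each chunk with the csv module and hand it to executemany. A rough, untested sketch, assuming a real MySQL connection is passed in instead of None and reusing the billing_info_test insert from the earlier posts:

import csv

def write_database(connection, queue):
    insert_sql = """INSERT INTO billing_info_test
                    (InvoiceId, PayerAccountId, LinkedAccountId)
                    VALUES (%s, %s, %s)"""
    cursor = connection.cursor()
    while True:
        chunk = queue.get()
        if chunk is DONE:
            queue.task_done()
            break
        # each chunk is a list of raw csv lines; skip empty lines and parse them
        # (the very first chunk still contains the header line, which would also need skipping)
        rows = list(csv.reader(line for line in chunk if line))
        if rows:
            cursor.executemany(insert_sql, rows)
            connection.commit()
        queue.task_done()
    cursor.close()

At the end you would also call reader.join() and writer.join() so the main thread waits for both threads to finish.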