Posts: 33
Threads: 17
Joined: Feb 2019
I am trying to speed up loading a large CSV file into a MySQL database. With the code below, it takes about 4 hours to load a 4GB file with millions of rows:
with open(source) as csv_file:
    csv_reader = csv.reader(csv_file, delimiter=',')
    next(csv_reader)
    insert_sql = """ INSERT INTO billing_info_test (InvoiceId, PayerAccountId, LinkedAccountId) VALUES (%s, %s, %s) """
    for row in csv_reader:
        cursor.execute(insert_sql, row)
        print(cursor.rowcount, 'inserted with LinkedAccountId', row[2], 'at', datetime.now().isoformat())
    print("Committing the DB")
    mydb.commit()
    cursor.close()
    mydb.close()
That code does work! However, it takes between 4 and 5 hours to ingest the file. This code is abbreviated and does not show the try statements I have in the original.
I want to use the executemany() statement to make this faster. For that, you have to pass a list of tuples to the second argument.
If I build a single list across all the row iterations, it grows too large, I get out-of-memory errors, and the script crashes.
I am not able to get a length of csv_reader or csv_file to use in a range statement.
How can I loop through the CSV file 1000 rows at a time and store the result in a list, use it in executemany, then store the next 1000 rows, etc until the end of the CSV file?
Posts: 37
Threads: 13
Joined: Apr 2018
Hope this gives you some ideas.
The following example assumes:
CSV file contains column names in the first line
Connection is already built
File name is test.csv
Table name is MyTable
Python 3
with open('test.csv', 'r') as f:
    reader = csv.reader(f)
    columns = next(reader)
    query = 'insert into MyTable({0}) values ({1})'
    query = query.format(','.join(columns), ','.join('?' * len(columns)))
    cursor = connection.cursor()
    for data in reader:
        cursor.execute(query, data)
    cursor.commit()
If column names are not included in the file:
...
with open('test.csv', 'r') as f:
    reader = csv.reader(f)
    data = next(reader)
    query = 'insert into dbo.Test values ({0})'
    query = query.format(','.join('?' * len(data)))
    cursor = connection.cursor()
    cursor.execute(query, data)
    for data in reader:
        cursor.execute(query, data)
    cursor.commit()
Source:
https://stackoverflow.com/questions/2125...ing-python
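One caveat when adapting this to the MySQL setup in the original post: the '?' placeholder is the qmark paramstyle used by pyodbc and sqlite3, while mysql.connector and MySQLdb expect %s. The format line would then look something like this (sketch only, not tested):

query = 'insert into MyTable({0}) values ({1})'
query = query.format(','.join(columns), ','.join(['%s'] * len(columns)))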
Posts: 2,168
Threads: 35
Joined: Sep 2016
May-06-2019, 08:14 PM
(This post was last modified: May-06-2019, 08:45 PM by Yoriz.)
Maybe something like this will work
Note All code not tested
with open(source) as csv_file:
    csv_reader = csv.reader(csv_file, delimiter=',')
    next(csv_reader)
    insert_sql = """ INSERT INTO billing_info_test (InvoiceId, PayerAccountId, LinkedAccountId) VALUES (%s, %s, %s) """
    rows = []
    row_count = 0
    for row in csv_reader:
        row_count += 1
        rows.append(row)
        if row_count == 1000:
            cursor.executemany(insert_sql, rows)
            print(cursor.rowcount, 'inserted with LinkedAccountId', row[2], 'at', datetime.now().isoformat())
            rows = []
            row_count = 0
    if rows:
        cursor.executemany(insert_sql, rows)
        print(cursor.rowcount, 'inserted with LinkedAccountId', row[2], 'at', datetime.now().isoformat())
    print("Committing the DB")
    mydb.commit()
    cursor.close()
    mydb.close()
Using itertools
from itertools import zip_longest


def grouper(iterable, n, fillvalue=None):
    "Collect data into fixed-length chunks or blocks"
    # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)


with open(source) as csv_file:
    csv_reader = csv.reader(csv_file, delimiter=',')
    next(csv_reader)
    insert_sql = """ INSERT INTO billing_info_test (InvoiceId, PayerAccountId, LinkedAccountId) VALUES (%s, %s, %s) """
    for rows in grouper(csv_reader, 1000):
        # zip_longest pads the final chunk with the fillvalue (None), so drop the padding before inserting
        rows = [r for r in rows if r is not None]
        cursor.executemany(insert_sql, rows)
        print(cursor.rowcount, 'inserted with LinkedAccountId', row[2], 'at', datetime.now().isoformat())
    print("Committing the DB")
    mydb.commit()
    cursor.close()
    mydb.close()
Posts: 33
Threads: 17
Joined: Feb 2019
(May-06-2019, 08:14 PM)Yoriz Wrote: Maybe something like this will work ...
Your first example worked really nicely! Thank you for that! It supercharged this process. I also want to learn the iterools method. I will try that tomorrow. Best wishes!
Posts: 2,168
Threads: 35
Joined: Sep 2016
The itertools version would need the following line changed to report the last row of the chunk
print(cursor.rowcount, 'inserted with LinkedAccountId', row[2], 'at', datetime.now().isoformat())
to
print(cursor.rowcount, 'inserted with LinkedAccountId', rows[-1], 'at', datetime.now().isoformat())
Posts: 4,803
Threads: 77
Joined: Jan 2018
May-07-2019, 06:24 AM
(This post was last modified: May-07-2019, 06:26 AM by Gribouillis.)
With this version of grouper, the code will never load more than one row at a time
from collections import deque
import itertools as itt

# deque(maxlen=0).extend consumes an iterator without storing anything
exhaust = deque(maxlen=0).extend


def igrouper(iterable, size):
    if size < 2:
        if size == 1:
            for x in iterable:
                yield (x,)
            return
        else:
            raise ValueError('Expected size > 0, got', size)
    data = iter(iterable)
    for item in data:
        # lazily chain the current item with the next size-1 items
        group = itt.chain((item,), itt.islice(data, size - 1))
        yield group
        # make sure the group is fully consumed before moving on to the next one
        exhaust(group)
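For illustration, here is how igrouper could slot into the earlier loop (untested sketch, reusing the cursor, connection and insert statement from the posts above); executemany expects a sequence, so each lazy group is materialised with list(), holding at most 1000 rows at a time:

with open(source) as csv_file:
    csv_reader = csv.reader(csv_file, delimiter=',')
    next(csv_reader)  # skip the header row
    insert_sql = """ INSERT INTO billing_info_test (InvoiceId, PayerAccountId, LinkedAccountId) VALUES (%s, %s, %s) """
    for group in igrouper(csv_reader, 1000):
        rows = list(group)  # at most 1000 rows held in memory
        cursor.executemany(insert_sql, rows)
        print(cursor.rowcount, 'inserted with LinkedAccountId', rows[-1][2], 'at', datetime.now().isoformat())
    print("Committing the DB")
    mydb.commit()
    cursor.close()
    mydb.close()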
Posts: 2,128
Threads: 11
Joined: May 2017
What if the program has two threads?
One thread reads the CSV file, groups the data into chunks, and sends them through a queue to a worker thread, which calls the executemany function. To limit memory consumption, set a maxsize on the queue.
Here is an example. mmap should speed up the reading, but I guess the database is slower with writing.
You have to find out the ideal chunk size. If the chunk size is too small, you make many calls to the database (overhead).
If the chunk size is bigger, you make fewer calls, but each one takes more time. Finding the ideal value in between requires measurement.
I sometimes use the progressbar2 package to show the progress in the console.
This can be used to detect which chunk size is ideal.
At the beginning you get a burst until the queue is filled up to its maxsize. Then you see the speed drop off.
The queue blocks when it is full.
Here is the example:
import time
from pathlib import Path
from threading import Thread
from queue import Queue
from mmap import mmap, ACCESS_READ

from progressbar import ProgressBar, Timer, Bar, ETA, FileTransferSpeed

WIDGETS = [' [', Timer(), '] ', '[', FileTransferSpeed(), '] ', Bar(), ' (', ETA(), ') ']
DONE = object()


def iter_lines(file):
    path = Path(file)
    size = path.stat().st_size
    pb = ProgressBar(max_value=size, widgets=WIDGETS)
    with open(file) as fd:
        with mmap(fd.fileno(), size, access=ACCESS_READ) as reader:
            pos = 0
            while True:
                idx = reader.find(b'\n', pos)
                if idx == -1:
                    yield reader[pos:]
                    break
                yield reader[pos:idx]
                pos = idx + 1
                pb.update(pos)
    pb.finish()
    raise GeneratorExit


def read_chunks(file, chunksize, queue):
    iterator = iter_lines(file)
    while True:
        chunk = []
        try:
            for line, _ in zip(iterator, range(chunksize)):
                chunk.append(line.decode())
        except GeneratorExit:
            queue.put(chunk)
            queue.put(DONE)
            return
        queue.put(chunk)


def write_database(connection, queue):
    while True:
        chunk = queue.get()
        if chunk is DONE:
            # check for sentinel object
            # break out if the reader is DONE
            queue.task_done()
            break
        # write the data to database
        time.sleep(0.1)
        # this takes time
        # then call task_done()
        queue.task_done()


input_file = '/home/andre/Schreibtisch/big_file.txt'
chunk_size = 1000
queue_size = 20

queue = Queue(maxsize=queue_size)
reader = Thread(target=read_chunks, args=(input_file, chunk_size, queue))
writer = Thread(target=write_database, args=(None, queue))
reader.start()
writer.start()
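As a rough illustration of the database side (untested; the connection handling, the commit-per-chunk choice and the naive comma split are assumptions), the write_database stub could do the actual insert from the original post instead of sleeping:

def write_database(connection, queue):
    # assumes `connection` is the OP's MySQL connection and each queued line
    # is a comma-separated row with the three columns used earlier in the thread;
    # note: the first chunk still contains the CSV header line, skipping it is omitted here
    insert_sql = """ INSERT INTO billing_info_test (InvoiceId, PayerAccountId, LinkedAccountId) VALUES (%s, %s, %s) """
    cursor = connection.cursor()
    while True:
        chunk = queue.get()
        if chunk is DONE:
            queue.task_done()
            break
        # naive split; a csv.reader would be needed for quoted fields
        rows = [line.split(',') for line in chunk if line]
        cursor.executemany(insert_sql, rows)
        connection.commit()
        queue.task_done()
    cursor.close()

Joining the two threads at the end (reader.join() and writer.join()) makes the main program wait until the whole file has been written.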