Python Forum

Full Version: How can I use concurrency to migrate database in Python?
I have a script that migrates data from SQLite to Postgres. Right now it uses a for loop to transfer the tables one by one. I want to experiment with transferring multiple tables concurrently using threads, multiprocessing or asyncio to speed up the program, and then compare the runtimes of those approaches.
I'm testing with multiprocessing but I got some errors. Can someone help me? How would you do it with one of those approaches?

Here is my script:
import psycopg2, sqlite3, sys
import time
import multiprocessing

sqdb="C://Users//duongnb//Desktop//Python//SqliteToPostgreFull//testmydb6.db"
sqlike="table"
pgdb="testmydb11"
pguser="postgres"
pgpswd="1234"
pghost="127.0.0.1"
pgport="5432"

 
consq=sqlite3.connect(sqdb)
cursq=consq.cursor()
 
tabnames=[]
print() 
cursq.execute('SELECT name FROM sqlite_master WHERE type="table" AND name LIKE "%table%";')
tabgrab = cursq.fetchall()
for item in tabgrab:
    tabnames.append(item[0])
print(tabgrab)

def copyTable(table):
        cursq.execute("SELECT sql FROM sqlite_master WHERE type='table' AND name = ?;", (table,))
        create = cursq.fetchone()[0]
        cursq.execute("SELECT * FROM %s;" %table)
        rows=cursq.fetchall()
        colcount=len(rows[0])
        pholder='%s,'*colcount
        newholder=pholder[:-1]
    
        try:
    
            conpg = psycopg2.connect(database=pgdb, user=pguser, password=pgpswd,
                                host=pghost, port=pgport)
            curpg = conpg.cursor()
            curpg.execute("DROP TABLE IF EXISTS %s;" %table)
            create = create.replace("AUTOINCREMENT", "")
            curpg.execute(create)
            curpg.executemany("INSERT INTO %s VALUES (%s);" % (table, newholder),rows)
            conpg.commit()
            
            if conpg:
                conpg.close()
    
        except psycopg2.DatabaseError as e:
            print ('Error %s' % e) 
            sys.exit(1)
    
        finally:
            print("Complete")    

consq.close()

if __name__ == "__main__":
    start_time = time.time()
    for table in tabnames:
        p = multiprocessing.Process(target = copyTable, args = (table))
        p.start()
    for table in tabnames:
        p.join()
    print("All processes finished.")      
            
    duration = time.time() - start_time
    print(f"Duration {duration} seconds")
My error:
Error:
(base) C:\Users\duongnb\Desktop\Python>python -u "c:\Users\duongnb\Desktop\Python\SqliteToPostgreFull\sqq.py"
[('table1',), ('table2',), ('table3',), ('table4',), ('table5',), ('table6',), ('table7',), ('table8',), ('table9',), ('table10',)]
[('table1',), ('table2',), ('table3',), ('table4',), ('table5',), ('table6',), ('table7',), ('table8',), ('table9',), ('table10',)]
Process Process-4:
Traceback (most recent call last):
  File "C:\Users\duongnb\AppData\Local\Continuum\anaconda3\lib\multiprocessing\process.py", line 297, in _bootstrap
    self.run()
  File "C:\Users\duongnb\AppData\Local\Continuum\anaconda3\lib\multiprocessing\process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
TypeError: copyTable() takes 1 positional argument but 6 were given
[The table list and the same traceback are printed by each of Process-1 through Process-10, with their output interleaved. Process-10 reports "7 were given"; the others shown report "6 were given".]
All processes finished.
Duration 0.16455531120300293 seconds
debugging your code
Try changing args = (table) to args = (table,).
(table) is the same as table, i.e. a plain string. Process unpacks args into positional arguments, so each character of the table name (an iterable with 6 or 7 elements, depending on its length) is passed as a separate argument.
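
A minimal sketch of that unpacking, without starting any processes. The names copy_table, call_like_process and describe are hypothetical stand-ins; the call inside call_like_process mirrors the self._target(*self._args, **self._kwargs) line visible in the traceback above.

```python
def copy_table(table):
    # Hypothetical stand-in for copyTable(); it just returns its argument.
    return table


def call_like_process(target, args):
    # Mirrors the traceback line: self._target(*self._args, **self._kwargs).
    # A string passed as args is unpacked character by character.
    return target(*tuple(args))


def describe(args):
    # Return the result, or the TypeError message if the call fails.
    try:
        return call_like_process(copy_table, args)
    except TypeError as exc:
        return f"TypeError: {exc}"


if __name__ == "__main__":
    print(describe(("table1",)))  # one-element tuple: one argument
    print(describe(("table1")))   # plain string: six one-character arguments
    print(describe(("table10")))  # plain string: seven one-character arguments
```

This is why the error says 6 arguments for most tables and 7 for table10.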

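Also, I don't think you need line 62, i.e. there is no need to loop over the tables a second time. As written, p refers only to the last process that was started, so that loop joins the same process several times. If you want to wait for every process, keep each one in a list and join them all.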
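
One way to wait for every process is sketched below. It assumes a placeholder copy_table() that only prints; make_processes and run_all are hypothetical helper names, not part of the original script.

```python
import multiprocessing


def copy_table(table):
    # Hypothetical stand-in for the real copyTable(); it only prints.
    print(f"copying {table}")


def make_processes(tables):
    # One Process per table. args must be a tuple, hence the trailing comma.
    return [
        multiprocessing.Process(target=copy_table, args=(table,))
        for table in tables
    ]


def run_all(tables):
    processes = make_processes(tables)
    for p in processes:
        p.start()
    # Join every process that was started, not only the last one.
    for p in processes:
        p.join()
    return [p.exitcode for p in processes]


if __name__ == "__main__":
    # The guard matters on Windows, where each child re-imports this module.
    print(run_all(["table1", "table2", "table10"]))
```

With every process joined, the duration is measured only after all copies have finished.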
Thanks, it's working, but what is the difference?
(Jan-31-2020, 03:08 AM) binhduonggttn wrote: what is the difference?
>>> table = 'table1'
>>> (table)
'table1'
>>> type((table))
<class 'str'>
>>> len((table))
6
>>> (table, )
('table1',)
>>> type((table, ))
<class 'tuple'>
>>> len((table, ))
1
>>>
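
For the thread-based variant mentioned in the original question, a rough sketch follows. It only covers the SQLite read side and timing; the Postgres insert is left as a comment. read_table, run_threaded and make_demo_db are hypothetical names, and the demo database is generated on the fly. Each worker opens its own connection because, by default, a sqlite3 connection may only be used in the thread that created it. Whether threads actually beat the plain loop would have to be measured on the real data.

```python
import os
import sqlite3
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor


def read_table(db_path, table):
    # Open a connection per worker instead of sharing a module-level cursor.
    con = sqlite3.connect(db_path)
    try:
        # Table names cannot be bound as parameters, so the name is quoted.
        rows = con.execute(f'SELECT * FROM "{table}"').fetchall()
    finally:
        con.close()
    # The psycopg2 DROP/CREATE/INSERT from the original script would go here.
    return table, len(rows)


def run_threaded(db_path, tables, max_workers=4):
    # Run read_table for every table in a thread pool and time the whole run.
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = dict(pool.map(lambda t: read_table(db_path, t), tables))
    return results, time.perf_counter() - start


def make_demo_db(path):
    # Build a small throwaway database: table1, table2, table3.
    con = sqlite3.connect(path)
    for i in (1, 2, 3):
        con.execute(f"CREATE TABLE table{i} (id INTEGER PRIMARY KEY, val TEXT)")
        con.executemany(
            f"INSERT INTO table{i} (val) VALUES (?)", [("x",)] * (i * 10)
        )
    con.commit()
    con.close()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")
        make_demo_db(path)
        counts, seconds = run_threaded(path, ["table1", "table2", "table3"])
        print(counts, f"{seconds:.3f}s")
```

Timing the sequential for loop the same way with time.perf_counter() gives a like-for-like comparison.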