Python Forum
How can I use concurrency to migrate database in Python?
#1
I have a script that migrates data from SQLite to Postgres. It currently uses a for loop to transfer the tables one by one. Now I want to experiment with transferring multiple tables concurrently, using threads, multiprocessing or asyncio, to speed up the program, and then compare the runtimes of those approaches.
I'm testing with multiprocessing but I got some errors. Can someone help me? How would you do it with one of those approaches?

Here is my script:
import psycopg2, sqlite3, sys
import time
import multiprocessing

sqdb="C://Users//duongnb//Desktop//Python//SqliteToPostgreFull//testmydb6.db"
sqlike="table"
pgdb="testmydb11"
pguser="postgres"
pgpswd="1234"
pghost="127.0.0.1"
pgport="5432"

 
consq=sqlite3.connect(sqdb)
cursq=consq.cursor()
 
tabnames=[]
print() 
cursq.execute('SELECT name FROM sqlite_master WHERE type="table" AND name LIKE "%table%";')
tabgrab = cursq.fetchall()
for item in tabgrab:
    tabnames.append(item[0])
print(tabgrab)

def copyTable(table):
        cursq.execute("SELECT sql FROM sqlite_master WHERE type='table' AND name = ?;", (table,))
        create = cursq.fetchone()[0]
        cursq.execute("SELECT * FROM %s;" %table)
        rows=cursq.fetchall()
        colcount=len(rows[0])
        pholder='%s,'*colcount
        newholder=pholder[:-1]
    
        try:
    
            conpg = psycopg2.connect(database=pgdb, user=pguser, password=pgpswd,
                                host=pghost, port=pgport)
            curpg = conpg.cursor()
            curpg.execute("DROP TABLE IF EXISTS %s;" %table)
            create = create.replace("AUTOINCREMENT", "")
            curpg.execute(create)
            curpg.executemany("INSERT INTO %s VALUES (%s);" % (table, newholder),rows)
            conpg.commit()
            
            if conpg:
                conpg.close()
    
        except psycopg2.DatabaseError as e:
            print ('Error %s' % e) 
            sys.exit(1)
    
        finally:
            print("Complete")    

consq.close()

if __name__ == "__main__":
    start_time = time.time()
    for table in tabnames:
        p = multiprocessing.Process(target = copyTable, args = (table))
        p.start()
    for table in tabnames:
        p.join()
    print("All processes finished.")      
            
    duration = time.time() - start_time
    print(f"Duration {duration} seconds")
My error:
Error:
Microsoft Windows [Version 10.0.18362.535]
(c) 2019 Microsoft Corporation. All rights reserved.

(base) C:\Users\duongnb\Desktop\Python>python -u "c:\Users\duongnb\Desktop\Python\SqliteToPostgreFull\sqq.py"
[('table1',), ('table2',), ('table3',), ('table4',), ('table5',), ('table6',), ('table7',), ('table8',), ('table9',), ('table10',)]
[('table1',), ('table2',), ('table3',), ('table4',), ('table5',), ('table6',), ('table7',), ('table8',), ('table9',), ('table10',)]
Process Process-4:
Traceback (most recent call last):
  File "C:\Users\duongnb\AppData\Local\Continuum\anaconda3\lib\multiprocessing\process.py", line 297, in _bootstrap
    self.run()
  File "C:\Users\duongnb\AppData\Local\Continuum\anaconda3\lib\multiprocessing\process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
TypeError: copyTable() takes 1 positional argument but 6 were given
[('table1',), ('table2',), ('table3',), ('table4',), ('table5',), ('table6',), ('table7',), ('table8',), ('table9',), ('table10',)]
Process Process-8:
Traceback (most recent call last):
  File "C:\Users\duongnb\AppData\Local\Continuum\anaconda3\lib\multiprocessing\process.py", line 297, in _bootstrap
    self.run()
  File "C:\Users\duongnb\AppData\Local\Continuum\anaconda3\lib\multiprocessing\process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
TypeError: copyTable() takes 1 positional argument but 6 were given
[('table1',), ('table2',), ('table3',), ('table4',), ('table5',), ('table6',), ('table7',), ('table8',), ('table9',), ('table10',)]
Process Process-1:
Traceback (most recent call last):
  File "C:\Users\duongnb\AppData\Local\Continuum\anaconda3\lib\multiprocessing\process.py", line 297, in _bootstrap
    self.run()
  File "C:\Users\duongnb\AppData\Local\Continuum\anaconda3\lib\multiprocessing\process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
TypeError: copyTable() takes 1 positional argument but 6 were given
[('table1',), ('table2',), ('table3',), ('table4',), ('table5',), ('table6',), ('table7',), ('table8',), ('table9',), ('table10',)]
Process Process-10:
Traceback (most recent call last):
  File "C:\Users\duongnb\AppData\Local\Continuum\anaconda3\lib\multiprocessing\process.py", line 297, in _bootstrap
    self.run()
  File "C:\Users\duongnb\AppData\Local\Continuum\anaconda3\lib\multiprocessing\process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
TypeError: copyTable() takes 1 positional argument but 7 were given
[('table1',), ('table2',), ('table3',), ('table4',), ('table5',), ('table6',), ('table7',), ('table8',), ('table9',), ('table10',)]
Process Process-2:
Traceback (most recent call last):
  File "C:\Users\duongnb\AppData\Local\Continuum\anaconda3\lib\multiprocessing\process.py", line 297, in _bootstrap
    self.run()
  File "C:\Users\duongnb\AppData\Local\Continuum\anaconda3\lib\multiprocessing\process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
TypeError: copyTable() takes 1 positional argument but 6 were given
All processes finished.
[('table1',), ('table2',), ('table3',), ('table4',), ('table5',), ('table6',), ('table7',), ('table8',), ('table9',), ('table10',)]
Duration 0.16455531120300293 seconds[('table1',), ('table2',), ('table3',), ('table4',), ('table5',), ('table6',), ('table7',), ('table8',), ('table9',), ('table10',)]
Process Process-6:
Process Process-9:
[('table1',), ('table2',), ('table3',), ('table4',), ('table5',), ('table6',), ('table7',), ('table8',), ('table9',), ('table10',)]
Traceback (most recent call last):
Traceback (most recent call last):
[('table1',), ('table2',), ('table3',), ('table4',), ('table5',), ('table6',), ('table7',), ('table8',), ('table9',), ('table10',)]
  File "C:\Users\duongnb\AppData\Local\Continuum\anaconda3\lib\multiprocessing\process.py", line 297, in _bootstrap
    self.run()
Process Process-5:
  File "C:\Users\duongnb\AppData\Local\Continuum\anaconda3\lib\multiprocessing\process.py", line 297, in _bootstrap
    self.run()
  File "C:\Users\duongnb\AppData\Local\Continuum\anaconda3\lib\multiprocessing\process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\duongnb\AppData\Local\Continuum\anaconda3\lib\multiprocessing\process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
Traceback (most recent call last):
Process Process-3:
TypeError: copyTable() takes 1 positional argument but 6 were given
TypeError: copyTable() takes 1 positional argument but 6 were given
  File "C:\Users\duongnb\AppData\Local\Continuum\anaconda3\lib\multiprocessing\process.py", line 297, in _bootstrap
    self.run()
  File "C:\Users\duongnb\AppData\Local\Continuum\anaconda3\lib\multiprocessing\process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
TypeError: copyTable() takes 1 positional argument but 6 were given
Traceback (most recent call last):
[('table1',), ('table2',), ('table3',), ('table4',), ('table5',), ('table6',), ('table7',), ('table8',), ('table9',), ('table10',)]
  File "C:\Users\duongnb\AppData\Local\Continuum\anaconda3\lib\multiprocessing\process.py", line 297, in _bootstrap
    self.run()
  File "C:\Users\duongnb\AppData\Local\Continuum\anaconda3\lib\multiprocessing\process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
Process Process-7:
TypeError: copyTable() takes 1 positional argument but 6 were given
Traceback (most recent call last):
  File "C:\Users\duongnb\AppData\Local\Continuum\anaconda3\lib\multiprocessing\process.py", line 297, in _bootstrap
    self.run()
  File "C:\Users\duongnb\AppData\Local\Continuum\anaconda3\lib\multiprocessing\process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
TypeError: copyTable() takes 1 positional argument but 6 were given

(base) C:\Users\duongnb\Desktop\Python>C:/Users/duongnb/AppData/Local/Continuum/anaconda3/Scripts/activate
(base) C:\Users\duongnb\Desktop\Python>conda activate base
(base) C:\Users\duongnb\Desktop\Python>
#2
debugging your code
#3
Try changing args = (table) to args = (table,).
(table) is the same as table, a plain string, not a tuple. Process unpacks args when it calls the target, so each character of the table name (6 or 7 of them, depending on the length of the name) is passed as a separate argument.
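
To see the unpacking in isolation, here is a minimal sketch without any processes. It assumes a stand-in copy_table() and a hypothetical run() helper that mimics the self._target(*self._args, **self._kwargs) call visible in the traceback; neither name comes from the original script.

```python
def copy_table(table):
    """Stand-in for the real copyTable(); it only returns the name it received."""
    return table


def run(target, args):
    # Hypothetical helper: unpack args with *, as the traceback shows Process.run() doing.
    return target(*args)


name = "table1"

# One-element tuple: unpacked into exactly one argument.
ok = run(copy_table, (name,))

# Plain string in parentheses: unpacked into six one-character arguments.
try:
    run(copy_table, (name))
    error = None
except TypeError as exc:
    error = str(exc)

print(ok)     # table1
print(error)  # copy_table() takes 1 positional argument but 6 were given
```

The same applies to threading.Thread, which takes target and args in the same way.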

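Also, line 62 (the second loop over tabnames) does not do what it looks like. At that point p refers only to the last process you started, so you join that one process several times and never wait for the others. That is why "All processes finished." and the duration show up in your output before some of the child tracebacks. Keep the Process objects in a list and join each of them instead.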
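
A minimal sketch of the start-then-join pattern, assuming a hypothetical run_all() helper and a dummy show() worker in place of copyTable (both names are made up for illustration). The worker class is a parameter so the same helper can be timed with threading.Thread as well, since both classes accept target and args.

```python
import multiprocessing
import threading


def run_all(worker, names, worker_cls=multiprocessing.Process):
    """Start one worker per name, then wait for every one of them."""
    workers = [worker_cls(target=worker, args=(name,)) for name in names]
    for w in workers:
        w.start()
    # Join each object that was started, not just the last one.
    for w in workers:
        w.join()
    return workers


def show(name):
    # Dummy worker standing in for the real table copy.
    print("copying", name)


if __name__ == "__main__":
    tables = ["table1", "table2", "table3"]
    run_all(show, tables)                               # one process per table
    run_all(show, tables, worker_cls=threading.Thread)  # one thread per table
```

Wrapping each run_all() call in time.time() measurements, as the original script already does, gives the runtime comparison between the two approaches.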
If you can't explain it to a six year old, you don't understand it yourself, Albert Einstein
How to Ask Questions The Smart Way: link and another link
Create MCV example
Debug small programs

#4
Thanks, it's working, but what is the difference?
#5
(Jan-31-2020, 03:08 AM)binhduonggttn Wrote: what is the difference?
>>> table = 'table1'
>>> (table)
'table1'
>>> type((table))
<class 'str'>
>>> len((table))
6
>>> (table, )
('table1',)
>>> type((table, ))
<class 'tuple'>
>>> len((table, ))
1
>>>
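
One more thing the error output earlier in the thread hints at: the table list is printed once per child process. That is consistent with each child re-importing the script, which is how multiprocessing starts processes on Windows, so the module-level code (connect, query, print, consq.close()) runs again in every child. A connection object cannot be shared between processes in any case, so it is safer for each worker to open and close its own. Below is a minimal sketch of the reading side only, assuming a hypothetical read_table() helper and a throwaway demo database; the Postgres part is left out.

```python
import os
import sqlite3
import tempfile


def read_table(db_path, table):
    """Open a private SQLite connection, read one table, then close it."""
    con = sqlite3.connect(db_path)
    try:
        # Table names cannot be bound as ? parameters, so quote the identifier.
        quoted = '"' + table.replace('"', '""') + '"'
        return con.execute(f"SELECT * FROM {quoted}").fetchall()
    finally:
        con.close()


if __name__ == "__main__":
    # Throwaway database so the sketch runs on its own.
    path = os.path.join(tempfile.mkdtemp(), "demo.db")
    con = sqlite3.connect(path)
    con.execute("CREATE TABLE table1 (id INTEGER, name TEXT)")
    con.execute("INSERT INTO table1 VALUES (1, 'a')")
    con.commit()
    con.close()
    print(read_table(path, "table1"))
```

A worker built this way takes only picklable arguments (a path and a table name), so it can be passed to Process with args=(path, table).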