Python Forum
Speedup the porting process
#1
Hi All,
I'm trying to port data from a MySQL DB to a PostgreSQL DB using the script below. The script runs fine, but it is slow for higher record counts (>10000). Is there an efficient way to speed up the porting process? Any code change, or any other method?


import psycopg2
import os
import time
import MySQLdb
import sys
from pprint import pprint
from datetime import datetime
from psycopg2 import sql
from utils.config import Configuration as Config
from utils.postgres_helper import get_connection
from utils.utils import get_global_config

def psql_Source_fetch_and_DestInsert(cnx_msql,cnx_psql,msql, psql, msql_command, psql_command):
    print("Function Call..")
    msql.execute(msql_command)
    
    for row in msql:
        try:
            print("Insertion of rows..")
            print(row)
            psql.execute(psql_command, row)
            cnx_psql.commit()
        except psycopg2.Error as e:
            print("Cannot execute the query!!", e.pgerror)
            sys.exit("Problem occurred with the query!!!")


def dB_Fetch():

# MySQLdb connection
 try:
    source_host = 'magento'
    conf = get_global_config()
    cnx_msql = MySQLdb.connect(host=conf.get(source_host, 'host'),
                               user=conf.get(source_host, 'user'),
                               passwd=conf.get(source_host, 'password'),
                               port=int(conf.get(source_host, 'port')),
                               db=conf.get(source_host, 'db'))
 except MySQLdb.Error as e:
   print("MYSQL: Unable to connect!", e)
   sys.exit(1)

# Postgresql connection
 try:
   cnx_psql = get_connection(get_global_config(), 'pg_dwh')
 except psycopg2.Error as e:
   print('PSQL: Unable to connect!\n{0}'.format(e))
   sys.exit(1)

# Cursors initializations
 cur_msql = cnx_msql.cursor()
 cur_psql = cnx_psql.cursor()

 try:
   print("creating table using cursor")

   SQL_create_Staging_schema="""CREATE SCHEMA IF NOT EXISTS staging AUTHORIZATION postgres;"""

   SQL_create_sales_flat_quote="""DROP TABLE IF EXISTS staging.sales_flat_quote;CREATE TABLE IF NOT EXISTS staging.sales_flat_quote
            (
          customer_id         BIGINT
        , entity_id           BIGINT
		, store_id 	          BIGINT
		, customer_email      TEXT
		, customer_firstname  TEXT
		, customer_middlename TEXT
		, customer_lastname   TEXT
		, customer_is_guest   BIGINT
		, customer_group_id   BIGINT
		, created_at 	      TIMESTAMP WITHOUT TIME ZONE
		, updated_at 	      TIMESTAMP WITHOUT TIME ZONE
		, is_active 	      BIGINT
		, items_count 	      BIGINT
		, items_qty 	      BIGINT
		, base_currency_code  TEXT
		, grand_total 	      NUMERIC(12,4)
		, base_to_global_rate NUMERIC(12,4)
		, base_subtotal       NUMERIC(12,4)
		, base_subtotal_with_discount   NUMERIC(12,4)
	    )
   ;"""

   SQL_create_sales_flat_quote_item="""DROP TABLE IF EXISTS staging.sales_flat_quote_item;CREATE TABLE IF NOT EXISTS staging.sales_flat_quote_item
         (store_id        INTEGER
		, row_total       NUMERIC
		, updated_at      TIMESTAMP WITHOUT TIME ZONE
		, qty             NUMERIC
		, sku             CHARACTER VARYING
		, free_shipping   INTEGER
		, quote_id        INTEGER
		, price 	      NUMERIC
		, no_discount 	  INTEGER
		, item_id 	      INTEGER
		, product_type 	  CHARACTER VARYING
		, base_tax_amount NUMERIC
		, product_id 	  INTEGER
		, name 		      CHARACTER VARYING
		, created_at 	  TIMESTAMP WITHOUT TIME ZONE
	    );"""

   SQL_create_catalog_product_flat_1="""DROP TABLE IF EXISTS staging.catalog_product_flat_1;CREATE TABLE IF NOT EXISTS staging.catalog_product_flat_1
         (name              CHARACTER VARYING
        , sku               CHARACTER VARYING
        , type_id           CHARACTER VARYING
        , created_at        TIMESTAMP WITHOUT TIME ZONE
        , url_path          CHARACTER VARYING
        , price             NUMERIC
        , short_description CHARACTER VARYING
        , url_key           CHARACTER VARYING
        , thumbnail_label   CHARACTER VARYING
        , small_image       CHARACTER VARYING
        , thumbnail         CHARACTER VARYING
        );"""

   print("Creating schema...")
   cur_psql.execute(SQL_create_Staging_schema)

   print("Creating tables...")
   cur_psql.execute(SQL_create_sales_flat_quote)
   cur_psql.execute(SQL_create_sales_flat_quote_item)
   cur_psql.execute(SQL_create_catalog_product_flat_1)
   cnx_psql.commit()

   print("Fetching data from source server")
   #select from Magento & insert into DWH: SELECT & INSERT SQL statements are created as PAIRS for looping.
   commands = [
              ("""SELECT customer_id, entity_id, store_id, customer_email , customer_firstname, customer_middlename
                 , customer_lastname , customer_is_guest, customer_group_id, created_at, updated_at, is_active
                 , items_count, items_qty, base_currency_code, grand_total, base_to_global_rate, base_subtotal
                 , base_subtotal_with_discount from sales_flat_quote
                 where is_active=1
                 AND items_count != '0'
                 AND updated_at > '2019-05-12 00:00:00';""",
              """INSERT INTO staging.sales_flat_quote (customer_id, entity_id, store_id, customer_email , customer_firstname
                , customer_middlename, customer_lastname , customer_is_guest, customer_group_id, created_at, updated_at
                , is_active, items_count, items_qty, base_currency_code, grand_total, base_to_global_rate, base_subtotal
                , base_subtotal_with_discount) 
              SELECT %s, %s, %s,%s,%s,%s,%s,%s,%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s;"""),
        
            ("""SELECT store_id,row_total,updated_at,qty,sku,free_shipping,quote_id,price,no_discount,item_id,product_type
                       ,base_tax_amount,product_id,name,created_at from sales_flat_quote_item WHERE updated_at > '2019-05-12 00:00:00';""",
             """INSERT INTO staging.sales_flat_quote_item (store_id,row_total,updated_at,qty,sku,free_shipping,quote_id
                                                          ,price,no_discount ,item_id,product_type,base_tax_amount,product_id,name
                                                          ,created_at)
                SELECT %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s;""")
            
             ,("""SELECT created_at,url_path,price,short_description,url_key,thumbnail_label,small_image,thumbnail
                        ,name,sku,type_id from catalog_product_flat_1;""", 
             """INSERT INTO staging.catalog_product_flat_1 (created_at,url_path,price,short_description,url_key,thumbnail_label
                                                            ,small_image,thumbnail,name,sku,type_id) 
                SELECT %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s;""")            
            ]

   for msql_command, psql_command in commands:
       psql_Source_fetch_and_DestInsert(cnx_msql,cnx_psql,cur_msql, cur_psql, msql_command, psql_command)

 except (Exception, psycopg2.Error) as error:
     print ("Error while fetching data from PostgreSQL", error)
 finally:
     ## Closing cursors
     cur_msql.close()
     cur_psql.close()
     ## Committing
     cnx_psql.commit()
     ## Closing database connections
     cnx_msql.close()
     cnx_psql.close()

        
if __name__ == '__main__':
    dB_Fetch()
#2
If tables aren't huge, dumping to SQL from one database and then reloading that SQL into the new database
might be a lot faster. If not, creating indexes on the fields in your WHERE clause would certainly speed up the
process.
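If you stay with the Python script, the biggest win is usually committing once per batch instead of once per row, and sending rows in batches. A minimal sketch of that idea, assuming psycopg2's `execute_values` helper (`chunked` and `copy_rows` are hypothetical names, and the INSERT statement would need to end in `VALUES %s` instead of the per-column `SELECT %s, %s, ...` placeholders):

```python
def chunked(iterable, size):
    """Yield successive lists of at most `size` items from any iterable."""
    batch = []
    for item in iterable:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch

def copy_rows(cur_msql, cur_psql, cnx_psql, select_sql, insert_sql, batch_size=1000):
    """Fetch from MySQL and insert into PostgreSQL in batches, one commit total."""
    from psycopg2.extras import execute_values  # assumes psycopg2 is installed
    cur_msql.execute(select_sql)
    for batch in chunked(cur_msql, batch_size):
        # one round trip per batch instead of one per row
        execute_values(cur_psql, insert_sql, batch)
    cnx_psql.commit()  # one commit at the end instead of one per row
```

Per-row `commit()` forces a disk flush for every record, which is where most of the time goes at >10000 rows.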
#3
Related to the dumping solution above (I was browsing the docs already):
( https://dev.mysql.com/doc/search/?d=201&p=1&q=dump )
( https://www.postgresql.org/docs/11/backu...MP-RESTORE )

Optional alternative: 30.3. Asynchronous Commit in the PostgreSQL docs.
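If you want to stay inside Python, the closest thing to a dump-and-reload is PostgreSQL's COPY protocol, which psycopg2 exposes as `cursor.copy_from` / `copy_expert`. A sketch of the idea (`rows_to_copy_buffer` and `copy_into` are hypothetical helpers; real data would also need tab/newline/backslash escaping, and newer psycopg2 releases quote the table name, so a schema-qualified target like `staging.sales_flat_quote_item` may need `copy_expert` with an explicit COPY statement):

```python
import io

def rows_to_copy_buffer(rows):
    """Serialise rows as tab-separated text, the default input format for
    PostgreSQL's COPY ... FROM STDIN (NULL is spelled \\N)."""
    buf = io.StringIO()
    for row in rows:
        buf.write("\t".join(r"\N" if v is None else str(v) for v in row) + "\n")
    buf.seek(0)
    return buf

def copy_into(cur_psql, table, columns, rows):
    # a single COPY round trip instead of one INSERT statement per row
    cur_psql.copy_from(rows_to_copy_buffer(rows), table, columns=columns)
```

COPY bypasses per-statement parsing and planning entirely, which is why the bulk-load section of the docs recommends it over repeated INSERTs.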
postgresql doc Wrote:... transaction commit is normally synchronous: the server waits for the transaction's WAL records to be flushed to permanent storage before returning a success indication to the client. The client is therefore guaranteed that a transaction reported to be committed will be preserved, even in the event of a server crash immediately after. However, for short transactions this delay is a major component of the total transaction time. ...
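If losing the last few transactions on a server crash is acceptable for a staging load, that setting can be flipped per session; a minimal sketch, assuming a psycopg2 cursor (`enable_async_commit` is a hypothetical helper name):

```python
def enable_async_commit(cur):
    """Turn off synchronous commit for this session only.

    COMMIT then returns before the WAL is flushed to disk, so a server
    crash can lose recently "committed" transactions, but the database
    itself can never become inconsistent."""
    cur.execute("SET synchronous_commit TO off")
```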


