Python Forum

Full Version: Using executemany to import the data
You're currently viewing a stripped down version of our content. View the full version with proper formatting.
Hi All,

I'm using the below code to port the data from MySQL to postgreSQL database and it is working fine, but it's rather slow though( more than 3 minutes is taking for porting some 500 rows, so i think the for loop in one of the function is iterating each rows and inserting it to database, but is there a way to execute the whole rows at once rather than looping through all rows? Ca it be done via executemany, if so what all changes i need to do? or is there a faster way to do the porting want the gap to be as narrow as possible.


Below the function i need to change,I have pasted the whole porting script below for reference.

If anyone have any reference it will be of great help.


def psql_Source_fetch_and_DestInsert(cnx_msql,cnx_psql,msql, psql, msql_command, psql_command):
    print("Function Call..")
    msql.execute(msql_command)
    
    for row in msql:
        try:
            print("Insertion of rows..")
            print(row)
            psql.execute(psql_command, row)
            cnx_psql.commit()
import psycopg2
import os
import time
import MySQLdb
import sys
from pprint import pprint
from datetime import datetime
from psycopg2 import sql
from utils.config import Configuration as Config
from utils.postgres_helper import get_connection
from utils.utils import get_global_config


def psql_Source_fetch_and_DestInsert(cnx_msql,cnx_psql,msql, psql, msql_command, psql_command):
    print("Function Call..")
    msql.execute(msql_command)
    
    for row in msql:
        try:
            print("Insertion of rows..")
            print(row)
            psql.execute(psql_command, row)
            cnx_psql.commit()
        except psycopg2.Error as e:
            print ("Cannot execute the query!!", e.pgerror)
            sys.exit("Problem occured with the query!!!")


def dB_Fetch():

# MySQLdb connection
 try:
    source_host = 'magento'
    conf = get_global_config()
    cnx_msql = MySQLdb.connect(host=conf.get(source_host, 'host'),
                               user=conf.get(source_host, 'user'),
                               passwd=conf.get(source_host, 'password'),
                               port=int(conf.get(source_host, 'port')),
                               db=conf.get(source_host, 'db'))
 except mysql.connector.Error as e:
   print ("MYSQL: Unable to connect!", e.msg)
   sys.exit(1)

# Postgresql connection
 try:
   cnx_psql = get_connection(get_global_config(), 'pg_dwh')
 except psycopg2.Error as e:
   print('PSQL: Unable to connect!\n{0}').format(e)
   sys.exit(1)

# Cursors initializations
 cur_msql = cnx_msql.cursor()
 cur_psql = cnx_psql.cursor()

 try:
   print("creating table using cursor")

   SQL_create_Staging_schema="""CREATE SCHEMA IF NOT EXISTS staging AUTHORIZATION postgres;"""

   SQL_create_sales_flat_quote="""DROP TABLE IF EXISTS staging.sales_flat_quote;CREATE TABLE IF NOT EXISTS staging.sales_flat_quote
            (
                  customer_id         BIGINT
                , entity_id           BIGINT
		, store_id 	      BIGINT
		, customer_email      TEXT
		, customer_firstname  TEXT
		, customer_middlename TEXT
		, customer_lastname   TEXT
		, customer_is_guest   BIGINT
		, customer_group_id   BIGINT
		, created_at 	      TIMESTAMP WITHOUT TIME ZONE
		, updated_at 	      TIMESTAMP WITHOUT TIME ZONE
		, is_active 	      BIGINT
		, items_count 	      BIGINT
		, items_qty 	      BIGINT
		, base_currency_code  TEXT
		, grand_total 	      NUMERIC(12,4)
		, base_to_global_rate NUMERIC(12,4)
		, base_subtotal       NUMERIC(12,4)
		, base_subtotal_with_discount   NUMERIC(12,4)
	    )
   ;"""

   SQL_create_sales_flat_quote_item="""DROP TABLE IF EXISTS staging.sales_flat_quote_item;CREATE TABLE IF NOT EXISTS staging.sales_flat_quote_item
                 (store_id        INTEGER
		, row_total       NUMERIC
		, updated_at      TIMESTAMP WITHOUT TIME ZONE
		, qty             NUMERIC
		, sku             CHARACTER VARYING
		, free_shipping   INTEGER
		, quote_id        INTEGER
		, price 	  NUMERIC
		, no_discount 	  INTEGER
		, item_id 	  INTEGER
		, product_type 	  CHARACTER VARYING
		, base_tax_amount NUMERIC
		, product_id 	  INTEGER
		, name 		  CHARACTER VARYING
		, created_at 	  TIMESTAMP WITHOUT TIME ZONE
	    );"""

   SQL_create_sales_flat_quote_item="""DROP TABLE IF EXISTS staging.catalog_product_flat_1;CREATE TABLE IF NOT EXISTS staging.catalog_product_flat_1
                 (name              CHARACTER VARYING
		, sku               CHARACTER VARYING
		, type_id           CHARACTER VARYING
		, created_at        TIMESTAMP WITHOUT TIME ZONE
		, url_path          CHARACTER VARYING
		, price             NUMERIC
		, short_description CHARACTER VARYING
		, url_key           CHARACTER VARYING
		, thumbnail_label   CHARACTER VARYING
		, small_image       CHARACTER VARYING
		, thumbnail         CHARACTER VARYING
		 
	    );"""


   print("Creating  Schema...")   
   cur_psql.execute(SQL_create_Staging_schema)

   print("Creating tables...")
   cur_psql.execute(SQL_create_sales_flat_quote)

   cur_psql.execute(SQL_create_sales_flat_quote_item)
   cnx_psql.commit();

   print("Fetching data from source server")
   #select from Magento & insert into DWH: SELECT & INSERT SQL statements are created as PAIRS for looping.
   commands = [
              ("""SELECT customer_id, entity_id, store_id, customer_email , customer_firstname, customer_middlename
                 , customer_lastname , customer_is_guest, customer_group_id, created_at, updated_at, is_active
                 , items_count, items_qty, base_currency_code, grand_total, base_to_global_rate, base_subtotal
                 , base_subtotal_with_discount from sales_flat_quote
                 where is_active=1
                 AND items_count != '0'
                 AND updated_at > '2019-05-12 00:00:00';""",
              """INSERT INTO staging.sales_flat_quote (customer_id, entity_id, store_id, customer_email , customer_firstname
                , customer_middlename, customer_lastname , customer_is_guest, customer_group_id, created_at, updated_at
                , is_active, items_count, items_qty, base_currency_code, grand_total, base_to_global_rate, base_subtotal
                , base_subtotal_with_discount) 
              SELECT %s, %s, %s,%s,%s,%s,%s,%s,%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s;"""),
        
            ("""SELECT store_id,row_total,updated_at,qty,sku,free_shipping,quote_id,price,no_discount,item_id,product_type
                       ,base_tax_amount,product_id,name,created_at from sales_flat_quote_item WHERE updated_at > '2019-05-12 00:00:00';""",
             """INSERT INTO staging.sales_flat_quote_item (store_id,row_total,updated_at,qty,sku,free_shipping,quote_id
                                                          ,price,no_discount ,item_id,product_type,base_tax_amount,product_id,name
                                                          ,created_at)
                SELECT %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s;""")
            
             ,("""SELECT created_at,url_path,price,short_description,url_key,thumbnail_label,small_image,thumbnail
                        ,name,sku,type_id from catalog_product_flat_1;""", 
             """INSERT INTO staging.catalog_product_flat_1 (created_at,url_path,price,short_description,url_key,thumbnail_label
                                                            ,small_image,thumbnail,name,sku,type_id) 
                SELECT %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s;""")            
            ]

   for msql_command, psql_command in commands:
       psql_Source_fetch_and_DestInsert(cnx_msql,cnx_psql,cur_msql, cur_psql, msql_command, psql_command)

 except (Exception, psycopg2.Error) as error:
     print ("Error while fetching data from PostgreSQL", error)
 finally:
     ## Closing cursors
     cur_msql.close()
     cur_psql.close()
     ## Committing
     cnx_psql.commit()
     ## Closing database connections
     cnx_msql.close()
     cnx_psql.close()

        
if __name__ == '__main__':
    dB_Fetch()
Can anyone please help me or suggest any solution for this?