May-14-2019, 01:46 PM
(This post was last modified: May-14-2019, 01:46 PM by Sandy7771989.)
Hi All,
I'm trying to port data from MySQL DB to PostgreSQL DB using the below script,The script is running fine ,but it is slow for higher record count(>10000),Is there an efficient way to speedup the porting process?Any code change or any other methods?
I'm trying to port data from MySQL DB to PostgreSQL DB using the below script,The script is running fine ,but it is slow for higher record count(>10000),Is there an efficient way to speedup the porting process?Any code change or any other methods?
import psycopg2 import os import time import MySQLdb import sys from pprint import pprint from datetime import datetime from psycopg2 import sql from utils.config import Configuration as Config from utils.postgres_helper import get_connection from utils.utils import get_global_config def psql_Source_fetch_and_DestInsert(cnx_msql,cnx_psql,msql, psql, msql_command, psql_command): print("Function Call..") msql.execute(msql_command) for row in msql: try: print("Insertion of rows..") print(row) psql.execute(psql_command, row) cnx_psql.commit() except psycopg2.Error as e: print ("Cannot execute the query!!", e.pgerror) sys.exit("Problem occured with the query!!!") def dB_Fetch(): # MySQLdb connection try: source_host = 'magento' conf = get_global_config() cnx_msql = MySQLdb.connect(host=conf.get(source_host, 'host'), user=conf.get(source_host, 'user'), passwd=conf.get(source_host, 'password'), port=int(conf.get(source_host, 'port')), db=conf.get(source_host, 'db')) except mysql.connector.Error as e: print ("MYSQL: Unable to connect!", e.msg) sys.exit(1) # Postgresql connection try: cnx_psql = get_connection(get_global_config(), 'pg_dwh') except psycopg2.Error as e: print('PSQL: Unable to connect!\n{0}').format(e) sys.exit(1) # Cursors initializations cur_msql = cnx_msql.cursor() cur_psql = cnx_psql.cursor() try: print("creating table using cursor") SQL_create_Staging_schema="""CREATE SCHEMA IF NOT EXISTS staging AUTHORIZATION postgres;""" SQL_create_sales_flat_quote="""DROP TABLE IF EXISTS staging.sales_flat_quote;CREATE TABLE IF NOT EXISTS staging.sales_flat_quote ( customer_id BIGINT , entity_id BIGINT , store_id BIGINT , customer_email TEXT , customer_firstname TEXT , customer_middlename TEXT , customer_lastname TEXT , customer_is_guest BIGINT , customer_group_id BIGINT , created_at TIMESTAMP WITHOUT TIME ZONE , updated_at TIMESTAMP WITHOUT TIME ZONE , is_active BIGINT , items_count BIGINT , items_qty BIGINT , base_currency_code TEXT , grand_total NUMERIC(12,4) , base_to_global_rate NUMERIC(12,4) , base_subtotal NUMERIC(12,4) , base_subtotal_with_discount NUMERIC(12,4) ) ;""" SQL_create_sales_flat_quote_item="""DROP TABLE IF EXISTS staging.sales_flat_quote_item;CREATE TABLE IF NOT EXISTS staging.sales_flat_quote_item (store_id INTEGER , row_total NUMERIC , updated_at TIMESTAMP WITHOUT TIME ZONE , qty NUMERIC , sku CHARACTER VARYING , free_shipping INTEGER , quote_id INTEGER , price NUMERIC , no_discount INTEGER , item_id INTEGER , product_type CHARACTER VARYING , base_tax_amount NUMERIC , product_id INTEGER , name CHARACTER VARYING , created_at TIMESTAMP WITHOUT TIME ZONE );""" SQL_create_sales_flat_quote_item="""DROP TABLE IF EXISTS staging.catalog_product_flat_1;CREATE TABLE IF NOT EXISTS staging.catalog_product_flat_1 (name CHARACTER VARYING , sku CHARACTER VARYING , type_id CHARACTER VARYING , created_at TIMESTAMP WITHOUT TIME ZONE , url_path CHARACTER VARYING , price NUMERIC , short_description CHARACTER VARYING , url_key CHARACTER VARYING , thumbnail_label CHARACTER VARYING , small_image CHARACTER VARYING , thumbnail CHARACTER VARYING );""" print("Creating Schema...") cur_psql.execute(SQL_create_Staging_schema) print("Creating tables...") cur_psql.execute(SQL_create_sales_flat_quote) cur_psql.execute(SQL_create_sales_flat_quote_item) cnx_psql.commit(); print("Fetching data from source server") #select from Magento & insert into DWH: SELECT & INSERT SQL statements are created as PAIRS for looping. commands = [ ("""SELECT customer_id, entity_id, store_id, customer_email , customer_firstname, customer_middlename , customer_lastname , customer_is_guest, customer_group_id, created_at, updated_at, is_active , items_count, items_qty, base_currency_code, grand_total, base_to_global_rate, base_subtotal , base_subtotal_with_discount from sales_flat_quote where is_active=1 AND items_count != '0' AND updated_at > '2019-05-12 00:00:00';""", """INSERT INTO staging.sales_flat_quote (customer_id, entity_id, store_id, customer_email , customer_firstname , customer_middlename, customer_lastname , customer_is_guest, customer_group_id, created_at, updated_at , is_active, items_count, items_qty, base_currency_code, grand_total, base_to_global_rate, base_subtotal , base_subtotal_with_discount) SELECT %s, %s, %s,%s,%s,%s,%s,%s,%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s;"""), ("""SELECT store_id,row_total,updated_at,qty,sku,free_shipping,quote_id,price,no_discount,item_id,product_type ,base_tax_amount,product_id,name,created_at from sales_flat_quote_item WHERE updated_at > '2019-05-12 00:00:00';""", """INSERT INTO staging.sales_flat_quote_item (store_id,row_total,updated_at,qty,sku,free_shipping,quote_id ,price,no_discount ,item_id,product_type,base_tax_amount,product_id,name ,created_at) SELECT %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s;""") ,("""SELECT created_at,url_path,price,short_description,url_key,thumbnail_label,small_image,thumbnail ,name,sku,type_id from catalog_product_flat_1;""", """INSERT INTO staging.catalog_product_flat_1 (created_at,url_path,price,short_description,url_key,thumbnail_label ,small_image,thumbnail,name,sku,type_id) SELECT %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s;""") ] for msql_command, psql_command in commands: psql_Source_fetch_and_DestInsert(cnx_msql,cnx_psql,cur_msql, cur_psql, msql_command, psql_command) except (Exception, psycopg2.Error) as error: print ("Error while fetching data from PostgreSQL", error) finally: ## Closing cursors cur_msql.close() cur_psql.close() ## Committing cnx_psql.commit() ## Closing database connections cnx_msql.close() cnx_psql.close() if __name__ == '__main__': dB_Fetch()