Jun-07-2019, 09:57 AM
Hi All,
I'm using the code below to port data from a MySQL database to a PostgreSQL database. It works fine, but it's rather slow: porting around 500 rows takes more than 3 minutes. I think the problem is that the for loop in one of the functions iterates over the rows and inserts them one at a time. Is there a way to insert all the rows at once instead of looping through them? Can it be done with executemany, and if so, what changes do I need to make? Or is there a faster way to do the porting? I want the gap between source and destination to be as narrow as possible.
The function I need to change is below, followed by a rough sketch of what I was thinking, and I have pasted the whole porting script after that for reference.
If anyone has any pointers it would be a great help.
def psql_Source_fetch_and_DestInsert(cnx_msql, cnx_psql, msql, psql, msql_command, psql_command):
    print("Function Call..")
    msql.execute(msql_command)
    for row in msql:
        try:
            print("Insertion of rows..")
            print(row)
            psql.execute(psql_command, row)
            cnx_psql.commit()
        except psycopg2.Error as e:
            print("Cannot execute the query!!", e.pgerror)
            sys.exit("Problem occured with the query!!!")
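This is roughly the change I had in mind: fetch all the source rows first and hand them to executemany in a single call, with one commit at the end instead of one per row. It reuses the same imports as the full script below, and I'm not sure it is correct or whether executemany is actually the fastest option, so please treat it as a sketch only:

def psql_Source_fetch_and_DestInsert(cnx_msql, cnx_psql, msql, psql, msql_command, psql_command):
    print("Function Call..")
    msql.execute(msql_command)
    rows = msql.fetchall()                         # pull every source row into memory at once
    try:
        if rows:
            psql.executemany(psql_command, rows)   # one call for the whole batch instead of a per-row loop
        cnx_psql.commit()                          # single commit for the batch
    except psycopg2.Error as e:
        print("Cannot execute the query!!", e.pgerror)
        sys.exit("Problem occured with the query!!!")

The full porting script as it stands is below.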
import psycopg2
import os
import time
import MySQLdb
import sys
from pprint import pprint
from datetime import datetime
from psycopg2 import sql
from utils.config import Configuration as Config
from utils.postgres_helper import get_connection
from utils.utils import get_global_config

def psql_Source_fetch_and_DestInsert(cnx_msql, cnx_psql, msql, psql, msql_command, psql_command):
    print("Function Call..")
    msql.execute(msql_command)
    for row in msql:
        try:
            print("Insertion of rows..")
            print(row)
            psql.execute(psql_command, row)
            cnx_psql.commit()
        except psycopg2.Error as e:
            print("Cannot execute the query!!", e.pgerror)
            sys.exit("Problem occured with the query!!!")

def dB_Fetch():
    # MySQLdb connection
    try:
        source_host = 'magento'
        conf = get_global_config()
        cnx_msql = MySQLdb.connect(host=conf.get(source_host, 'host'),
                                   user=conf.get(source_host, 'user'),
                                   passwd=conf.get(source_host, 'password'),
                                   port=int(conf.get(source_host, 'port')),
                                   db=conf.get(source_host, 'db'))
    except MySQLdb.Error as e:
        print("MYSQL: Unable to connect!", e)
        sys.exit(1)

    # Postgresql connection
    try:
        cnx_psql = get_connection(get_global_config(), 'pg_dwh')
    except psycopg2.Error as e:
        print('PSQL: Unable to connect!\n{0}'.format(e))
        sys.exit(1)

    # Cursors initializations
    cur_msql = cnx_msql.cursor()
    cur_psql = cnx_psql.cursor()

    try:
        print("creating table using cursor")
        SQL_create_Staging_schema = """CREATE SCHEMA IF NOT EXISTS staging AUTHORIZATION postgres;"""

        SQL_create_sales_flat_quote = """DROP TABLE IF EXISTS staging.sales_flat_quote;
            CREATE TABLE IF NOT EXISTS staging.sales_flat_quote
            ( customer_id                 BIGINT
            , entity_id                   BIGINT
            , store_id                    BIGINT
            , customer_email              TEXT
            , customer_firstname          TEXT
            , customer_middlename         TEXT
            , customer_lastname           TEXT
            , customer_is_guest           BIGINT
            , customer_group_id           BIGINT
            , created_at                  TIMESTAMP WITHOUT TIME ZONE
            , updated_at                  TIMESTAMP WITHOUT TIME ZONE
            , is_active                   BIGINT
            , items_count                 BIGINT
            , items_qty                   BIGINT
            , base_currency_code          TEXT
            , grand_total                 NUMERIC(12,4)
            , base_to_global_rate         NUMERIC(12,4)
            , base_subtotal               NUMERIC(12,4)
            , base_subtotal_with_discount NUMERIC(12,4)
            );"""

        SQL_create_sales_flat_quote_item = """DROP TABLE IF EXISTS staging.sales_flat_quote_item;
            CREATE TABLE IF NOT EXISTS staging.sales_flat_quote_item
            ( store_id        INTEGER
            , row_total       NUMERIC
            , updated_at      TIMESTAMP WITHOUT TIME ZONE
            , qty             NUMERIC
            , sku             CHARACTER VARYING
            , free_shipping   INTEGER
            , quote_id        INTEGER
            , price           NUMERIC
            , no_discount     INTEGER
            , item_id         INTEGER
            , product_type    CHARACTER VARYING
            , base_tax_amount NUMERIC
            , product_id      INTEGER
            , name            CHARACTER VARYING
            , created_at      TIMESTAMP WITHOUT TIME ZONE
            );"""

        SQL_create_catalog_product_flat_1 = """DROP TABLE IF EXISTS staging.catalog_product_flat_1;
            CREATE TABLE IF NOT EXISTS staging.catalog_product_flat_1
            ( name              CHARACTER VARYING
            , sku               CHARACTER VARYING
            , type_id           CHARACTER VARYING
            , created_at        TIMESTAMP WITHOUT TIME ZONE
            , url_path          CHARACTER VARYING
            , price             NUMERIC
            , short_description CHARACTER VARYING
            , url_key           CHARACTER VARYING
            , thumbnail_label   CHARACTER VARYING
            , small_image       CHARACTER VARYING
            , thumbnail         CHARACTER VARYING
            );"""

        print("Creating Schema...")
        cur_psql.execute(SQL_create_Staging_schema)

        print("Creating tables...")
        cur_psql.execute(SQL_create_sales_flat_quote)
        cur_psql.execute(SQL_create_sales_flat_quote_item)
        cur_psql.execute(SQL_create_catalog_product_flat_1)
        cnx_psql.commit()

        print("Fetching data from source server")
        # Select from Magento & insert into DWH: SELECT & INSERT SQL statements are created as PAIRS for looping.
        commands = [
            ("""SELECT customer_id, entity_id, store_id, customer_email, customer_firstname,
                       customer_middlename, customer_lastname, customer_is_guest, customer_group_id,
                       created_at, updated_at, is_active, items_count, items_qty, base_currency_code,
                       grand_total, base_to_global_rate, base_subtotal, base_subtotal_with_discount
                FROM sales_flat_quote
                WHERE is_active=1 AND items_count != '0' AND updated_at > '2019-05-12 00:00:00';""",
             """INSERT INTO staging.sales_flat_quote
                (customer_id, entity_id, store_id, customer_email, customer_firstname,
                 customer_middlename, customer_lastname, customer_is_guest, customer_group_id,
                 created_at, updated_at, is_active, items_count, items_qty, base_currency_code,
                 grand_total, base_to_global_rate, base_subtotal, base_subtotal_with_discount)
                SELECT %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s;"""),

            ("""SELECT store_id, row_total, updated_at, qty, sku, free_shipping, quote_id, price,
                       no_discount, item_id, product_type, base_tax_amount, product_id, name, created_at
                FROM sales_flat_quote_item
                WHERE updated_at > '2019-05-12 00:00:00';""",
             """INSERT INTO staging.sales_flat_quote_item
                (store_id, row_total, updated_at, qty, sku, free_shipping, quote_id, price,
                 no_discount, item_id, product_type, base_tax_amount, product_id, name, created_at)
                SELECT %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s;"""),

            ("""SELECT created_at, url_path, price, short_description, url_key, thumbnail_label,
                       small_image, thumbnail, name, sku, type_id
                FROM catalog_product_flat_1;""",
             """INSERT INTO staging.catalog_product_flat_1
                (created_at, url_path, price, short_description, url_key, thumbnail_label,
                 small_image, thumbnail, name, sku, type_id)
                SELECT %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s;""")
        ]

        for msql_command, psql_command in commands:
            psql_Source_fetch_and_DestInsert(cnx_msql, cnx_psql, cur_msql, cur_psql, msql_command, psql_command)

    except (Exception, psycopg2.Error) as error:
        print("Error while fetching data from PostgreSQL", error)

    finally:
        ## Closing cursors
        cur_msql.close()
        cur_psql.close()
        ## Committing
        cnx_psql.commit()
        ## Closing database connections
        cnx_msql.close()
        cnx_psql.close()

if __name__ == '__main__':
    dB_Fetch()
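I also came across psycopg2.extras.execute_values, which is supposed to batch rows more efficiently than executemany. If I used it, I think each INSERT would have to end in a single VALUES %s placeholder instead of the SELECT %s, %s, ... form in the script above. The function and parameter names in this sketch are just my own, not from the script, and I haven't tested it:

from psycopg2.extras import execute_values

def psql_batch_insert(cnx_msql, cnx_psql, msql, psql, msql_command, psql_insert):
    # psql_insert is assumed to look like:
    #   INSERT INTO staging.sales_flat_quote_item (store_id, row_total, ...) VALUES %s
    msql.execute(msql_command)
    rows = msql.fetchall()
    if rows:
        # execute_values expands the single %s into batches of row tuples (page_size rows per statement)
        execute_values(psql, psql_insert, rows, page_size=1000)
    cnx_psql.commit()

Would either of these approaches be the right way to go, or is there something better?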