Posts: 4
Threads: 2
Joined: Apr 2019
May-08-2019, 07:37 PM
(This post was last modified: May-08-2019, 09:53 PM by Yoriz.)
Hi All,
I have a requirement to port a table from MySQL database server to postgreSQL Database server.
I'm using a Magento website and its backend is MySQL,and my Datawarehouse is running on postgreSQL.My requirement is to port few table's from source to destination,Only few columns need to be ported into a new schema in the Destination.
I'm using the below query ,Can anyone check this and guide me to perfect it.
need to load three tables or import certain Query from Magento to my data ware house in a staging table using python.
Can you suggest how this can be done?Below is the code i'm using now, it's not a generic one, can you help me to improve this.
import psycopg2
Conn_DWH = psycopg2.connect("host=postgres dbname=Postgres user=postgres password=*** ")
Conn_Magento = psycopg2.connect("host=Magento dbname=Magento user=Magento password=*** ")
conndwh = Conn_DWH.cursor()
curmag= Conn_Magento.cursor()
cur.execute("CREATE TABLE sales_flat_quote (customer_id,entity_id,store_id,created_at,updated_at,items_count,base_row_total,row_total,base_discount_amount,base_subtotal_with_discount,base_to_global_rate,is_active);")
sql = ('INSERT INTO "sales_flat_quote" ( customer_id,entity_id,store_id,created_at,updated_at,items_count,base_row_total,row_total,base_discount_amount,base_subtotal_with_discount,base_to_global_rate,is_active) values (%s, %s,%s, %s,%s,%s, %s,%s,%s, %s,%s,%s);')
conndwh.execute('select customer_id,entity_id,store_id,created_at,updated_at,items_count,base_row_total,row_total,base_discount_amount,base_subtotal_with_discount,base_to_global_rate,is_active from "sales_flat_quote" where is_active=1;')
tempG = conndwh.fetchall()
data = (tempG)
if data:
curmag.execute (sql, data)
print('data loaded to warehouse db')
else:
print('data is empty')
#commit transactions
Conn_DWH.commit()
Conn_Magento.commit()
#close connections
conndwh.close()
curmag.close()
Conn_DWH.close()
Conn_Magento.close()
Posts: 12,038
Threads: 487
Joined: Sep 2016
May-08-2019, 09:43 PM
(This post was last modified: May-08-2019, 09:44 PM by Larz60+.)
why not just dump it all to an sql file, then reload into PostgreSQL
(Don't forget indexes, stored procedures, etc.
Posts: 4
Threads: 2
Joined: Apr 2019
(May-08-2019, 09:43 PM)Larz60+ Wrote: why not just dump it all to an sql file, then reload into PostgreSQL
(Don't forget indexes, stored procedures, etc.
Hi Lars,
Thanks a lot for the reply.But I'm new to python and it's pretty hard to understand the existing code which is using multi corn foreign data wrapper along with MySQLdb API.So is there any sample script or references which I can refer.
Both systems are using different Database systems,If there is any suggestions or sample piece of code it'll be very much useful.
Thanks
Sandy
Posts: 8
Threads: 4
Joined: May 2019
May-09-2019, 11:07 AM
(This post was last modified: May-09-2019, 11:10 AM by buran.)
Hi All,
I have a requirement to create a staging table in our data ware house which is running on Postgres and import data from our Magento website which is using MySQL database using Python.
I have created the below query for the importing purpose,Can you please check and confirm whether this is fine? Or any alternative methods are there for doing it?
Also i want to know how we can manage the DATATYPE mismatch issues while porting?
If someone can post a sample or edit the below code it will be of great help.
import psycopg2
import os
import time
import MySQLdb
import sys
import mysql.connector
from pprint import pprint
from datetime import datetime
from psycopg2 import sql
#from utils.utils import get_global_config
def psql_command(msql, psql, msql_command, psql_command):
msql.execute(msql_command)
for row in cur_msql:
try:
psql.execute(command, row)
except psycopg2.Error as e:
print "Cannot execute the query!!", e.pgerror
sys.exit("Some problem occured with the query!!!")
def dB_Fetch():
try:
cnx_msql = mysql.connector.connect( host=host_mysql, user=user_mysql, passwd=pswd_mysql, db=dbna_mysql )
except mysql.connector.Error as e:
print "MYSQL: Unable to connect!", e.msg
sys.exit(1)
# Postgresql connection
try:
cnx_psql = psycopg2.connect(conn_string_psql)
except psycopg2.Error as e:
print('PSQL: Unable to connect!\n{0}').format(e)
sys.exit(1)
# Cursors initializations
cur_msql = cnx_msql.cursor(dictionary=True)
cur_psql = cnx_psql.cursor()
try:
print("creating table using cursor")
SQL_create_Staging_schema="""CREATE SCHEMA IF NOT EXISTS staging AUTHORIZATION postgres;"""
SQL_create_sales_flat_quote="""CREATE TABLE IF NOT EXISTS staging.sales_flat_quote
(
entity_id BIGINT
, store_id BIGINT
, customer_email TEXT
, customer_firstname TEXT
, customer_middlename TEXT
, customer_lastname TEXT
, customer_is_guest BIGINT
, customer_group_id BIGINT
, created_at TIMESTAMP WITHOUT TIME ZONE
, updated_at TIMESTAMP WITHOUT TIME ZONE
, is_active BIGINT
, items_count BIGINT
, items_qty BIGINT
, base_currency_code TEXT
, grand_total NUMERIC(12,4)
, base_to_global_rate NUMERIC(12,4)
, base_subtotal NUMERIC(12,4)
, base_subtotal_with_discount NUMERIC(12,4)
)
;"""
SQL_create_sales_flat_quote_item="""CREATE TABLE IF NOT EXISTS staging.sales_flat_quote_item
( store_id INTEGER
, row_total NUMERIC
, updated_at TIMESTAMP WITHOUT TIME ZONE
, qty NUMERIC
, sku CHARACTER VARYING
, free_shipping INTEGER
, quote_id INTEGER
, price NUMERIC
, no_discount INTEGER
, item_id INTEGER
, product_type CHARACTER VARYING
, base_tax_amount NUMERIC
, product_id INTEGER
, name CHARACTER VARYING
, created_at TIMESTAMP WITHOUT TIME ZONE
);"""
print("Creating Schema")
cur_psql.execute(SQL_create_Staging_schema)
print("schema succesfully created")
print("Creating staging.sales_flat_quote table")
cur_psql.execute(SQL_create_sales_flat_quote)
print("staging.sales_flat_quote table successfully created")
print("Creating staging.sales_flat_quote_item table")
cur_psql.execute(SQL_create_sales_flat_quote_item)
print("staging.sales_flat_quote_item table successfully created")
cur_psql.commit();
print("Fetching data from source server")
commands = [("SELECT customer_id,entity_id,store_id,created_at,updated_at,items_count,base_row_total,row_total,base_discount_amount,base_subtotal_with_discount,base_to_global_rate,is_active from sales_flat_quote where is_active=1;",
"INSERT INTO staging.sales_flat_quote (customer_id,entity_id,store_id,created_at,updated_at,items_count,base_row_total,row_total,base_discount_amount,base_subtotal_with_discount,base_to_global_rate,is_active) \
VALUES (%(customer_id)s, %(entity_id)s, %(store_id)s, %(created_at)s, %(updated_at)s, %(items_count)s, %(base_row_total)s, %(row_total)s, %(base_discount_amount)s, %(base_subtotal_with_discount)s, %(base_to_global_rate)s, %(is_active)s)"),
("SELECT store_id,row_total,updated_at,qty,sku,free_shipping,quote_id,price,no_discount,item_id,product_type,base_tax_amount,product_id,name,created_at from sales_flat_quote_item",
"INSERT INTO staging.sales_flat_quote_item (store_id,row_total,updated_at,qty,sku,free_shipping,quote_id,price,no_discount,item_id,product_type,base_tax_amount,product_id,name,created_at) VALUES (%(store_id)s, %(row_total)s, %(updated_at)s, %(qty)s, %(sku)s, %(free_shipping)s, %(quote_id)s, %(price)s, %(no_discount)s, %(item_id)s, %(product_type)s, %(base_tax_amount)s, %(product_id)s, %(name)s, %(created_at)s)")
]
for msql_command, psql_command in commands:
psql_command(cur_msql, cur_psql, msql_command, psql_command)
except (Exception, psycopg2.Error) as error:
print ("Error while fetching data from PostgreSQL", error)
finally:
## Closing cursors
cur_msql.close()
cur_psql.close()
## Committing
cnx_psql.commit()
## Closing database connections
cnx_msql.close()
cnx_psql.close()
if __name__ == '__main__':
dB_Fetch()
Posts: 8
Threads: 4
Joined: May 2019
Hi
I have re-edited my PY script,Can you please tell me how can i deal with DATA TYPE matching and any how if this can be improved?
Please suggest.
import psycopg2
import os
import time
import MySQLdb
import sys
import mysql.connector
from pprint import pprint
from datetime import datetime
from psycopg2 import sql
from utils.postgres import get_connection
from utils.utils import get_global_config
def psql_command(msql, psql, msql_command, psql_command):
msql.execute(msql_command)
for row in cur_msql:
try:
psql.execute(command, row)
except psycopg2.Error as e:
print "Cannot execute the query!!", e.pgerror
sys.exit("Some problem occured with the query!!!")
def dB_Fetch():
# Read all connection information from config.ini
config = get_config_object()
try:
cnx_msql = get_db_connection("magento")
except mysql.connector.Error as e:
print "MYSQL: Unable to connect!", e.msg
sys.exit(1)
# Postgresql connection
try:
cnx_psql = get_connection(get_global_config(), 'pg_dwh')#get_db_connection("pg_dwh")
#cnx_psql = psycopg2.connect(conn_string_psql)
except psycopg2.Error as e:
print('PSQL: Unable to connect!\n{0}').format(e)
sys.exit(1)
# Cursors initializations
cur_msql = cnx_msql.cursor(dictionary=True)
cur_psql = cnx_psql.cursor()
try:
print("creating table using cursor")
SQL_create_Staging_schema="""CREATE SCHEMA IF NOT EXISTS staging AUTHORIZATION postgres;"""
SQL_create_sales_flat_quote="""CREATE TABLE IF NOT EXISTS staging.sales_flat_quote
(
entity_id BIGINT
, store_id BIGINT
, customer_email TEXT
, customer_firstname TEXT
, customer_middlename TEXT
, customer_lastname TEXT
, customer_is_guest BIGINT
, customer_group_id BIGINT
, created_at DATETIME
, updated_at DATETIME
, is_active BIGINT
, items_count BIGINT
, items_qty BIGINT
, base_currency_code TEXT
, grand_total NUMERIC(12,4)
, base_to_global_rate NUMERIC(12,4)
, base_subtotal NUMERIC(12,4)
, base_subtotal_with_discount NUMERIC(12,4)
)
;"""
SQL_create_sales_flat_quote_item="""CREATE TABLE IF NOT EXISTS staging.sales_flat_quote_item
( store_id INTEGER
, row_total NUMERIC
, updated_at DATETIME
, qty NUMERIC
, sku CHARACTER VARYING
, free_shipping INTEGER
, quote_id INTEGER
, price NUMERIC
, no_discount INTEGER
, item_id INTEGER
, product_type CHARACTER VARYING
, base_tax_amount NUMERIC
, product_id INTEGER
, name CHARACTER VARYING
, created_at DATETIME
);"""
cur_psql.execute(SQL_create_Staging_schema)
cur_psql.execute(SQL_create_sales_flat_quote)
cur_psql.execute(SQL_create_sales_flat_quote_item)
cur_psql.commit();
commands = [("SELECT customer_id,entity_id,store_id,created_at,updated_at,items_count,base_row_total,row_total,base_discount_amount,base_subtotal_with_discount,base_to_global_rate,is_active from sales_flat_quote where is_active=1;",
"INSERT INTO staging.sales_flat_quote (customer_id,entity_id,store_id,created_at,updated_at,items_count,base_row_total,row_total,base_discount_amount,base_subtotal_with_discount,base_to_global_rate,is_active) \
VALUES (%(customer_id)s, %(entity_id)s, %(store_id)s, %(created_at)s, %(updated_at)s, %(items_count)s, %(base_row_total)s, %(row_total)s, %(base_discount_amount)s, %(base_subtotal_with_discount)s, %(base_to_global_rate)s, %(is_active)s)"),
("SELECT store_id,row_total,updated_at,qty,sku,free_shipping,quote_id,price,no_discount,item_id,product_type,base_tax_amount,product_id,name,created_at from sales_flat_quote_item",
"INSERT INTO staging.sales_flat_quote_item (store_id,row_total,updated_at,qty,sku,free_shipping,quote_id,price,no_discount,item_id,product_type,base_tax_amount,product_id,name,created_at) VALUES (%(store_id)s, %(row_total)s, %(updated_at)s, %(qty)s, %(sku)s, %(free_shipping)s, %(quote_id)s, %(price)s, %(no_discount)s, %(item_id)s, %(product_type)s, %(base_tax_amount)s, %(product_id)s, %(name)s, %(created_at)s)"),
("SELECT select created_at,url_path,price,short_description,url_key,thumbnail_label,small_image,thumbnail,name,sku,type_id from catalog_product_flat_1",
"INSERT INTO staging.catalog_product_flat_1 (created_at,url_path,price,short_description,url_key,thumbnail_label,small_image,thumbnail,name,sku,type_id) \
VALUES (%(created_at)s, %(url_path)s, %(price)s, %(short_description)s, %(url_key)s, %(thumbnail_label)s, %(small_image)s, %(thumbnail)s, %(name)s, %(sku)s, %(type_id)s)"),
]
for msql_command, psql_command in commands:
psql_command(cur_msql, cur_psql, msql_command, psql_command)
except (Exception, psycopg2.Error) as error:
print ("Error while fetching data from PostgreSQL", error)
finally:
## Closing cursors
cur_msql.close()
cur_psql.close()
## Committing
cnx_psql.commit()
## Closing database connections
cnx_msql.close()
cnx_psql.close()
if __name__ == '__main__':
dB_Fetch()
Posts: 8
Threads: 4
Joined: May 2019
May-10-2019, 01:55 PM
(This post was last modified: May-10-2019, 01:55 PM by Sandy7771989.)
Hi All,
I'm getting the below error while i'm calling my function,Does anyone have any suggestions on how to deal with it? I want to insert the selected data into the destination table with the insert command.
Quote:Error while fetching data from PostgreSQL tuple indices must be integers or slices, not str
Sub Function:
def psql_func(msql, psql, msql_command, psql_command):
print("function call")
msql.execute(msql_command)
print(" ggg call")
for row in msql:
try:
print ("query!!")
print(row[0])
print(psql_command)
psql.execute(psql_command, row)
print (" execute query!!")
except psycopg2.Error as e:
print ("Cannot execute the query!!", e.pgerror)
sys.exit("Some problem occured with the query!!!")
#Main Function
def dB_Fetch():
try:
cnx_msql = psycopg2.connect(user="sandy",
password="postgres",
host="127.0.0.1",
port="5432",
database="clone")
except mysql.connector.Error as e:
print ("MYSQL: Unable to connect!", e.msg)
sys.exit(1)
# Postgresql connection
try:
cnx_psql = psycopg2.connect(user="sandy",
password="postgres",
host="127.0.0.1",
port="5432",
database="postgres")
#cnx_psql = psycopg2.connect(conn_string_psql)
except psycopg2.Error as e:
print('PSQL: Unable to connect!\n{0}').format(e)
sys.exit(1)
# Cursors initializations
cur_msql = cnx_msql.cursor()
cur_psql = cnx_psql.cursor()
try:
SQL_create_Staging_schema="""CREATE SCHEMA IF NOT EXISTS staging AUTHORIZATION postgres;"""
SQL_create_Staging_schema_clone="""CREATE SCHEMA IF NOT EXISTS clone AUTHORIZATION postgres;"""
SQL_create_sales_flat_quote="""CREATE TABLE IF NOT EXISTS staging.sales_flat_quote
( customer_id BIGINT
, entity_id BIGINT
, store_id BIGINT
, customer_email TEXT
, customer_firstname TEXT
, customer_middlename TEXT
, customer_lastname TEXT
, customer_is_guest BIGINT
, customer_group_id BIGINT
, created_at TIMESTAMP WITHOUT TIME ZONE
, updated_at TIMESTAMP WITHOUT TIME ZONE
, is_active BIGINT
, items_count BIGINT
, items_qty BIGINT
, base_currency_code TEXT
, grand_total NUMERIC(12,4)
, base_to_global_rate NUMERIC(12,4)
, base_subtotal NUMERIC(12,4)
, base_subtotal_with_discount NUMERIC(12,4)
)
;"""
SQL_create_sales_flat_quote_clone=""" same structure as above table """
SQL_create_sales_flat_quote_item="""CREATE TABLE IF NOT EXISTS staging.sales_flat_quote_item
( store_id INTEGER
, row_total NUMERIC
, updated_at TIMESTAMP WITHOUT TIME ZONE
, qty NUMERIC
, sku CHARACTER VARYING
, free_shipping INTEGER
, quote_id INTEGER
, price NUMERIC
, no_discount INTEGER
, item_id INTEGER
, product_type CHARACTER VARYING
, base_tax_amount NUMERIC
, product_id INTEGER
, name CHARACTER VARYING
, created_at TIMESTAMP WITHOUT TIME ZONE
);"""
SQL_create_sales_flat_quote_item_clone="""same structure as above"""
INS1="INSERT INTO clone.sales_flat_quote (customer_id, entity_id, store_id, customer_email , customer_firstname, customer_middlename, customer_lastname , customer_is_guest, customer_group_id, created_at, updated_at, is_active, items_count, items_qty, base_currency_code, grand_total, base_to_global_rate, base_subtotal, base_subtotal_with_discount) \
VALUES (1, 1, 3, 'sss@email','sss','dddd','rrr',1,1,now(), now(), 4,67, 78, 888, 690, 1, 1,1)"
INS2="INSERT INTO clone.sales_flat_quote_item (store_id,row_total,updated_at,qty,sku,free_shipping,quote_id,price,no_discount,item_id,product_type,base_tax_amount,product_id,name,created_at) VALUES (789999, 222, now(), 3, '33', 1, 222111, 22, 2, 2, 2, 2, 3,'sasy', now())"
cur_psql.execute(SQL_create_Staging_schema)
cur_psql.execute(SQL_create_Staging_schema_clone)
cnx_psql.commit();
cur_psql.execute(SQL_create_sales_flat_quote)
print("table 1")
cur_psql.execute(SQL_create_sales_flat_quote_item)
print("table 2")
cur_psql.execute(SQL_create_catalog_product_flat_1)
cnx_psql.commit();
cur_psql.execute(SQL_create_sales_flat_quote_clone)
cur_psql.execute(SQL_create_sales_flat_quote_item_clone)
print("schema created")
cur_psql.execute(INS1)
print("inserted to clone1 ")
cur_psql.execute(INS2)
print("inserted to clone2 ")
#cur_psql.commit();
commands = [("SELECT customer_id, entity_id, store_id, customer_email , customer_firstname, customer_middlename, customer_lastname , customer_is_guest, customer_group_id, created_at, updated_at, is_active, items_count, items_qty, base_currency_code, grand_total, base_to_global_rate, base_subtotal, base_subtotal_with_discount from clone.sales_flat_quote where is_active=1 AND items_count != '0' AND updated_at > '2019-05-09 00:00:00';",
"INSERT INTO staging.sales_flat_quote (customer_id, entity_id, store_id, customer_email , customer_firstname, customer_middlename, customer_lastname , customer_is_guest, customer_group_id, created_at, updated_at, is_active, items_count, items_qty, base_currency_code, grand_total, base_to_global_rate, base_subtotal, base_subtotal_with_discount) \
VALUES (%(customer_id)s, %(entity_id)s, %(store_id)s,%(customer_email)s,%(customer_firstname)s,%(customer_firstname)s,%(customer_middlename)s,%(customer_lastname)s,%(customer_is_guest)s, %(customer_group_id)s, %(created_at)s, %(updated_at)s, %(is_active)s, %(items_count)s, %(items_qty)s, %(base_currency_code)s, %(grand_total)s, %(base_to_global_rate)s, %(base_subtotal)s, %(base_subtotal_with_discount)s)"),
("SELECT store_id,row_total,updated_at,qty,sku,free_shipping,quote_id,price,no_discount,item_id,product_type,base_tax_amount,product_id,name,created_at from clone.sales_flat_quote_item WHERE updated_at > '2019-05-09 00:00:00'",
"INSERT INTO staging.sales_flat_quote_item (store_id,row_total,updated_at,qty,sku,free_shipping,quote_id,price,no_discount,item_id,product_type,base_tax_amount,product_id,name,created_at) VALUES (%(store_id)s, %(row_total)s, %(updated_at)s, %(qty)s, %(sku)s, %(free_shipping)s, %(quote_id)s, %(price)s, %(no_discount)s, %(item_id)s, %(product_type)s, %(base_tax_amount)s, %(product_id)s, %(name)s, %(created_at)s)")
]
for msql_command, psql_command in commands:
psql_func(cur_msql, cur_psql, msql_command, psql_command)
except (Exception, psycopg2.Error) as error:
print ("Error while fetching data from PostgreSQL", error)
finally:
## Committing
cnx_psql.commit()
## Closing database connections
cnx_msql.close()
cnx_psql.close()
if __name__ == '__main__':
dB_Fetch()
Posts: 4
Threads: 2
Joined: Apr 2019
May-12-2019, 11:41 AM
(This post was last modified: May-12-2019, 11:42 AM by Sandy777.)
Hi Al,
I'm getting the below error while running my script,I'm trying to port data from MySQLdb to postgreSQL Database.
If anyone have any idea on what this error is and how to resolve it,Kindly let me know?
Quote:Error while fetching data from PostgreSQL tuple indices must be integers or slices, not str
##Sub function
def psql_func(msql, psql, msql_command, psql_command):
print("function call")
msql.execute(msql_command)
print(" ggg call")
for row in msql:
try:
print ("query!!")
print(row[0])
print(psql_command)
psql.execute(psql_command, row)
print (" execute query!!")
except psycopg2.Error as e:
print ("Cannot execute the query!!", e.pgerror)
sys.exit("Some problem occured with the query!!!")
##Part of Main function
commands = [("SELECT customer_id, entity_id, store_id, customer_email , customer_firstname, customer_middlename, customer_lastname , customer_is_guest, customer_group_id, created_at, updated_at, is_active, items_count, items_qty, base_currency_code, grand_total, base_to_global_rate, base_subtotal, base_subtotal_with_discount from clone.sales_flat_quote where is_active=1 AND items_count != '0' AND updated_at > '2019-05-09 00:00:00';",
"INSERT INTO staging.sales_flat_quote (customer_id, entity_id, store_id, customer_email , customer_firstname, customer_middlename, customer_lastname , customer_is_guest, customer_group_id, created_at, updated_at, is_active, items_count, items_qty, base_currency_code, grand_total, base_to_global_rate, base_subtotal, base_subtotal_with_discount) \
VALUES (%(customer_id)s, %(entity_id)s, %(store_id)s,%(customer_email)s,%(customer_firstname)s,%(customer_firstname)s,%(customer_middlename)s,%(customer_lastname)s,%(customer_is_guest)s, %(customer_group_id)s, %(created_at)s, %(updated_at)s, %(is_active)s, %(items_count)s, %(items_qty)s, %(base_currency_code)s, %(grand_total)s, %(base_to_global_rate)s, %(base_subtotal)s, %(base_subtotal_with_discount)s)"),
("SELECT store_id,row_total,updated_at,qty,sku,free_shipping,quote_id,price,no_discount,item_id,product_type,base_tax_amount,product_id,name,created_at from clone.sales_flat_quote_item WHERE updated_at > '2019-05-09 00:00:00'",
"INSERT INTO staging.sales_flat_quote_item (store_id,row_total,updated_at,qty,sku,free_shipping,quote_id,price,no_discount,item_id,product_type,base_tax_amount,product_id,name,created_at) VALUES (%(store_id)s, %(row_total)s, %(updated_at)s, %(qty)s, %(sku)s, %(free_shipping)s, %(quote_id)s, %(price)s, %(no_discount)s, %(item_id)s, %(product_type)s, %(base_tax_amount)s, %(product_id)s, %(name)s, %(created_at)s)")
]
for msql_command, psql_command in commands:
psql_func(cur_msql, cur_psql, msql_command, psql_command
|