Jan-30-2025, 03:57 PM
I'm trying to load a PostgreSQL table with information from MSsql. I have 2 tables that are identical on both db's. On the Mssql side there is a column called ImageSource which is defined as Image, and on the PG side I have it as Bytea. The script posted is just trying to pull 1 record to test with, but I'm running into an issue in the DF portion. The 2 tables have 21 fields defined.
This is the error message I'm receiving. I can see it connect, and my column list shows 21 fields. I blanked out all db info stuff but that connection stuff is working as I can see activity(connections) on both DB's
When I get this working, I'll have 50,000 to 100K records to export daily so if you see any speed improvements I can make please denote as I'm very new to Python. hence the BatchSize
Error Message from --> df = pd.DataFrame(batch, columns=columns)
raise ValueError(f"Shape of passed values is {passed}, indices imply {implied}")
ValueError: Shape of passed values is (1, 1), indices imply (1, 21)
How do I fix this error?
Many thanks.
This is the error message I'm receiving. I can see it connect, and my column list shows 21 fields. I blanked out all db info stuff but that connection stuff is working as I can see activity(connections) on both DB's
When I get this working, I'll have 50,000 to 100K records to export daily so if you see any speed improvements I can make please denote as I'm very new to Python. hence the BatchSize
Error Message from --> df = pd.DataFrame(batch, columns=columns)
raise ValueError(f"Shape of passed values is {passed}, indices imply {implied}")
ValueError: Shape of passed values is (1, 1), indices imply (1, 21)
How do I fix this error?
Many thanks.
import pyodbc import psycopg2 import io from sqlalchemy import create_engine import pandas as pd import time from tqdm import tqdm import numpy as np # MSSQL connection mssql_conn = pyodbc.connect('DRIVER={SQL Server};SERVER=xxxx;DATABASE=yyyy;UID=xxxx;PWD=xxxx;Trusted_Connection=yes') mssql_cursor = mssql_conn.cursor() # PostgreSQL connection pg_conn = psycopg2.connect(database="hhhh", user="xxxxx", password="xxxxx", host="xxxxx", port="xxxx") pg_cursor = pg_conn.cursor() # Create SQLAlchemy engine for PostgreSQL engine = create_engine('postgresql://postgres:xxxxx@xxxxx/xxxxx') # Query to fetch data from MSSQL mssql_query = "select id,imagename,imagetype,imagesource,receiveddatetime,imagepath,site,machinenbr,linenbr,takeupnbr,spoolnbr,imageindex,cameranbr,spoolstartdt,left((convert(time(0),SpoolStartTime)),8) as SpoolStartTime,DefectDate,left((convert(time(0),DefectTime)),8) as DefectTime,defectnbr,defectclass,reviewer,UserDefectInput FROM Image_Classification_Master(nolock) WHERE id = 13604630" mssql_cursor.execute(mssql_query) # Fetch column names columns = [column[0] for column in mssql_cursor.description] # Process and insert data in batches batch_size = 5000 while True: batch = mssql_cursor.fetchmany(batch_size) if not batch: break # Convert batch to DataFrame df = pd.DataFrame(batch, columns=columns) # Process Image field if 'ImageSource' in df.columns: df['ImageSource'] = df['ImageSource'].apply(lambda x: psycopg2.Binary(x) if x else None) # Insert batch into PostgreSQL df.to_sql('Image_Classification_Master', engine, if_exists='append', index=False, method='multi', chunksize=batch_size) total_rows = len(df) rows_inserted = 0 start_time = time.time() with tqdm(total=total_rows, desc="Inserting data") as pbar: for i in range(0, total_rows, batch_size): batch = df.iloc[i:i+batch_size] batch.to_sql('Image_Classification_Master', engine, if_exists='append', index=False, method='multi') rows_inserted += len(batch) pbar.update(len(batch)) # Calculate and display progress progress = (rows_inserted / total_rows) * 100 elapsed_time = time.time() - start_time estimated_total_time = (elapsed_time / rows_inserted) * total_rows remaining_time = estimated_total_time - elapsed_time print(f"Progress: {progress:.2f}% | Rows inserted: {rows_inserted}/{total_rows}") print(f"Elapsed time: {elapsed_time:.2f}s | Estimated time remaining: {remaining_time:.2f}s") print("Insertion complete!") # Close connections mssql_cursor.close() mssql_conn.close() pg_cursor.close() pg_conn.close()