Bulk loading of data using python
I want to bulk load data into a Snowflake warehouse using pandas. The full requirement is below:

1. The source data is in a Snowflake table, which I read into a DataFrame.
2. After loading the data into the DataFrame, I make changes to it using some pandas functions.
3. After these changes I need to load the data back into Snowflake.

Data volume: ~200k records
Things I have tried:
1. First I wrote a for loop that built an INSERT statement for each row and executed it on the spot. This script ran for ~4 hours and loaded only ~9k records, so it is not a viable option.
2. Then I built the whole INSERT query up front before executing it on the database. This approach also fails and takes just as long as the one above.
3. I tried parallel processing and also batched the data. Still no luck.
4. Finally I tried the COPY INTO approach, and it works.

But I do not want to use COPY INTO, as it is Snowflake-specific.

Please help me with bulk loading data using Python.
Maybe you forgot to include your code. Post your code in Python tags; don't just explain what you did, show it instead.
import snowflake.connector
from Snowflake_config_fn import SNDBX_DB_LANDING_LAYER_config, HRMNY_DB_HRMNY_SND_ZN_config
import pandas as pd

source_tb = SNDBX_DB_LANDING_LAYER_config()
target_tb = HRMNY_DB_HRMNY_SND_ZN_config()
mapping_tb = HRMNY_DB_HRMNY_SND_ZN_config()
META_JOIN_tb = SNDBX_DB_LANDING_LAYER_config()

# Connect to Snowflake
source_conn = snowflake.connector.connect(**source_tb)
target_conn = snowflake.connector.connect(**target_tb)

#--------------------------------------------------------------------------------------------------------------------

#GENERATING THE JOIN CLAUSE USING MDM_META_JOIN TABLE.

# an empty list to store join conditions
join_conditions = []

# an empty list to store join-table aliases (order preserved, one per row)
join_aliases = []

# an empty list to store join tables
join_tbls = []

with source_conn.cursor() as source_cur:
    source_cur.execute(f"SELECT * FROM {META_JOIN_tb['schema']}.MDM_META_JOIN")

    for row in source_cur:
        source_tbl = row[0]
        source_alias = row[1]
        join_tbl = row[2]
        join_alias = row[3]  # Use the join_alias directly
        join_exp = row[4]
        target_tbl = row[5]

        # Append each join_exp to the list for join_conditions
        join_conditions.append(join_exp)

        # Append the join alias to the list for join_aliases
        join_aliases.append(join_alias)

        # Append join table to the list for join_tbls
        join_tbls.append(join_tbl)

# Generate the JOIN clause for the query; the three parallel lists keep each
# join table aligned with its own alias and join condition (zipping a set of
# aliases here would pair them in arbitrary order)
join_clause = "\n".join(
    f"JOIN {join_tbl} AS {join_alias} ON {join_exp}"
    for join_tbl, join_alias, join_exp in zip(join_tbls, join_aliases, join_conditions)
)

print(join_clause)

#------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

#GETTING THE SOURCE COLUMNS AND TARGET COLUMNS FROM MDM_COLUMN_MAPPING TABLE.


# Lists to store the desired source and target columns; fetching both in one
# query keeps the two lists row-aligned (two separate SELECTs without an
# ORDER BY give no guarantee that rows come back in the same order)
desired_scolumns = []
desired_tcolumns = []

# Fetch the column mappings for the source and target tables from MDM_COLUMN_MAPPING
with source_conn.cursor() as mapping_cur:
    mapping_cur.execute(f"""
        SELECT SOURCE_COLUMN, TARGET_COLUMN
        FROM {target_tb['database']}.{target_tb['schema']}.MDM_COLUMN_MAPPING
    """)

    for source_col, target_col in mapping_cur:
        desired_scolumns.append(source_col)
        desired_tcolumns.append(target_col)
        
#----------------------------------------------------------------------------------------------------------------------------------------------------       
        
#GENERATING THE SELECT QUERY TO RETRIEVE DATA FROM MULTIPLE SOURCE TABLES.


# an empty list to store the SELECT statements
select_col_list = []

# a dictionary to map source columns to their respective source tables and aliases
source_table_mapping = {}

# Fetch column mappings for the source and target tables from MDM_COLUMN_MAPPING
with source_conn.cursor() as mapping_cur:
    mapping_cur.execute(f"""
        SELECT SOURCE_TABLE, SOURCE_ALIAS, SOURCE_COLUMN, TARGET_COLUMN
        FROM {target_tb['database']}.{target_tb['schema']}.MDM_COLUMN_MAPPING
    """)
    
    # Populating the source_table_mapping dictionary
    for row in mapping_cur:
        Mapping_tbl_source_table = row[0]
        Mapping_tbl_source_alias = row[1]  # Add SOURCE_ALIAS
        Mapping_tbl_source_col = row[2]
        Mapping_tbl_target_col = row[3]
        source_table_mapping[Mapping_tbl_source_col] = (Mapping_tbl_source_table, Mapping_tbl_source_alias)  # Store source table and alias as a tuple

# Iterate through the desired columns and generate the SELECT statements
for Mapping_tbl_source_col, Mapping_tbl_target_col in zip(desired_scolumns, desired_tcolumns):
    if Mapping_tbl_source_col:
        
        # Check if the source column is in the source_table_mapping dictionary
        if Mapping_tbl_source_col in source_table_mapping:
            Mapping_tbl_source_table, Mapping_tbl_source_alias = source_table_mapping[Mapping_tbl_source_col]
        else:
            Mapping_tbl_source_table = source_tbl    # Default to the driving source table if not found
            Mapping_tbl_source_alias = source_alias  # Default to its alias if not found
        
        # Including the source_table and source_alias in the SELECT statement
        select_cols_nullcheck = f"{Mapping_tbl_source_alias}.{Mapping_tbl_source_col} as {Mapping_tbl_target_col}"
    else:
        # If the source column is empty, using NULL in the SELECT statement
        select_cols_nullcheck = f"NULL as {Mapping_tbl_target_col}"
    
    select_col_list.append(select_cols_nullcheck)

# Join the SELECT statements into a single query
select_query = ",\n".join(select_col_list)

# Combine everything into the final query
query = f"""SELECT
{select_query}
FROM {source_tbl} AS {source_alias}  
{join_clause}
"""

print(query)

# Execute the query to load data
with source_conn.cursor() as source_cur:
    source_cur.execute(query)

    # Fetch data into a Pandas DataFrame
    Source_Df = source_cur.fetch_pandas_all()
    
# Close the Snowflake connection when done
source_conn.close()    
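# Side note: fetch_pandas_all() needs the connector's pandas extra
# (pip install "snowflake-connector-python[pandas]"). For result sets much
# larger than this, fetch_pandas_batches() yields DataFrames chunk by chunk
# instead of materializing everything at once, roughly:
#
#     for batch_df in source_cur.fetch_pandas_batches():
#         ...  # transform each chunk (hypothetical step, not in this script)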

#---------------------------------------------------------------------------------------------------------------------------------


#SHAPING THE SOURCE DATAFRAME INTO THE TARGET DATAFRAME (BY ADDING COLUMNS THAT DON'T HAVE A SOURCE MAPPING COLUMN AND FILLING THEM WITH NULL)


# Define the desired column order for the target table
Target_Columns_df = [
    'SOURCE',
    'RECORD_ID',
    'CUSTOMER_ID',
    'FIRST_NAME',
    'MIDDLE_NAME',
    'LAST_NAME',
    'FULL_NAME',
    'NPI_NUMBER',
    'IMS',
    'SYMPHONY',
    'CRM',
    'GENDER',
    'SPECIALITY',
    'ADDR_ID',
    'ADDRESS_1',
    'ADDRESS_2',
    'STATE',
    'CITY',
    'PHONE_NUMBER',
    'EMAIL_ADDRESS',
    'COUNTRY',
    'INSTITUTION',
    'RECORD_STATUS',
    'AUDITCREATEDDATE',
    'AUDITCREATEDBY',
    'IS_PROCESSED',
    'AUDITUPDATEDATE',
    'AUDITUPDATEDBY',
    'ZIPCODE'
]

# Reorder the columns of Source_Df and add missing columns with None values
Pre_Target_Df = Source_Df.reindex(columns=Target_Columns_df, fill_value=None)

# Keep only rows with a 10-character NPI_NUMBER
Target_Df = Pre_Target_Df[Pre_Target_Df['NPI_NUMBER'].str.len() == 10]

# Keep only rows with a 5-character ZIPCODE
Target_Df1 = Target_Df[Target_Df['ZIPCODE'].str.len() == 5]
print(Target_Df1)

#-------------------------------------------------------------------------------------------------------------------------

import datetime


# Make a copy of the DataFrame
Target_Df2 = Target_Df1.copy()

# Fill 'CRM' in the 'SOURCE' column
Target_Df2['SOURCE'] = 'CRM'

# Fill the current date in 'AUDITCREATEDDATE'
current_date = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
Target_Df2['AUDITCREATEDDATE'] = current_date

# Fill NULL in 'AUDITUPDATEDATE' and the other unmapped columns
Target_Df2['AUDITUPDATEDATE'] = None
Target_Df2['RECORD_ID'] = None
Target_Df2['FULL_NAME'] = None
Target_Df2['IMS'] = None
Target_Df2['SYMPHONY'] = None
Target_Df2['INSTITUTION'] = None
Target_Df2['AUDITCREATEDBY'] = 'Ingestion Framework job'
Target_Df2['IS_PROCESSED'] = None
Target_Df2['AUDITUPDATEDBY'] = None
Target_Df2['CUSTOMER_ID'] = None
Target_Df2['ADDRESS_1'] = None


#----------------------------------------------------------------------------------------------------------------------------------------
#----------------------------------------------------------------------------------------------------------------------------------------

#LOADING TARGET_DF INTO THE MDM_SOURCE_TO_SURROGATE_TEMP TABLE


# Define the target table name
target_table = "HRMNY_DB.HRMNY_SND_ZN.MDM_SOURCE_TO_SURROGATE_TEMP"

# Escape single quotes for SQL and map missing values (None/NaN) to the
# marker string 'NULL'; checking only `val is not None` would turn NaN into
# the literal string 'nan' (on pandas >= 2.1, DataFrame.map is the
# non-deprecated spelling of applymap)
Target_Df2 = Target_Df2.applymap(lambda val: 'NULL' if pd.isna(val) else str(val).replace("'", "''"))


# Using SQL INSERT INTO statements to insert data from the DataFrame into the target table
try:
    cursor = target_conn.cursor()

    for index, row in Target_Df2.iterrows():
        # Prepare the column names and values dynamically; every value was
        # stringified above, so only the 'NULL' marker needs special-casing
        columns = ', '.join(row.index)
        values = ', '.join('NULL' if val == 'NULL' else f"'{val}'" for val in row)

        # Define the INSERT INTO statement for the current row
        insert_sql = f"""
        INSERT INTO {target_table} ({columns})
        VALUES ({values});
        """

        cursor.execute(insert_sql)

    target_conn.commit()  # Commit the transaction

    print("Data inserted successfully into Snowflake table.")

except Exception as e:
    target_conn.rollback()  # Rollback the transaction in case of an error
    print(f"Error: {str(e)}")

finally:
    # Closing the Snowflake connection
    target_conn.close()
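Row-by-row INSERTs are slow mainly because every statement is a separate round trip to the server. Since you want to avoid COPY INTO, the portable route is parameter binding with executemany(), which is plain DB-API and works the same way against other databases; the Snowflake connector can also batch a multi-row INSERT ... VALUES internally. Below is a minimal sketch, assuming the target_conn and target_table from your script; bulk_insert and the chunk size are illustrative names, not from your code. Note it binds the un-escaped frame (i.e. Target_Df2 from before the quote-escaping applymap step), since the driver handles quoting and NULLs itself.

import pandas as pd

def bulk_insert(conn, df, table, chunk_size=16000):
    # Convert NaN/NaT to None so they arrive as SQL NULL through the driver
    df = df.astype(object).where(pd.notnull(df), None)

    columns = ', '.join(df.columns)
    placeholders = ', '.join(['%s'] * len(df.columns))  # pyformat paramstyle
    sql = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"

    # Plain tuples, one per row, sent in chunks to keep statements a sane size
    rows = list(df.itertuples(index=False, name=None))
    with conn.cursor() as cur:
        for start in range(0, len(rows), chunk_size):
            cur.executemany(sql, rows[start:start + chunk_size])
    conn.commit()

# Pass the frame from BEFORE the quote-escaping applymap step
bulk_insert(target_conn, Target_Df2, target_table)

If a Snowflake-only path is acceptable after all, snowflake.connector.pandas_tools.write_pandas(target_conn, Target_Df2, 'MDM_SOURCE_TO_SURROGATE_TEMP', database='HRMNY_DB', schema='HRMNY_SND_ZN') wraps the stage-and-COPY INTO steps for you and is usually the fastest option at 200k rows.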