Python Forum
Bulk loading of data using python
#1
I want to bulk load data into a Snowflake warehouse using pandas. The full requirement is below:

1. I have source data in a Snowflake table, which I read into a dataframe.
2. After loading the data into the dataframe, I change the data using some pandas functions.
3. After these changes I need to load the data into Snowflake again.

Data size: 200k records
Things I have tried:
1. First I wrote a for loop that builds an INSERT statement for each row and executes it immediately. This script ran for ~4 hours and loaded only ~9k records, so it is not a viable option.
2. Then I built the whole INSERT query up front before executing it on the database. This approach also fails and takes a long time (same as the one above).
3. I tried parallel processing and also batching the data. Still no luck.
4. Later I tried the COPY INTO approach, and it works.

But I do not want to use COPY INTO, as it is Snowflake-specific.

Please help me with bulk loading of data using Python.
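
One portable route that avoids COPY INTO is the DB-API executemany call with bound parameters: the statement is written once and the rows are passed to the driver as data, so no per-row SQL strings or manual quote escaping are needed. The sketch below is only an illustration under stated assumptions. It uses the standard-library sqlite3 module as a stand-in database, and bulk_insert, the demo table, and the batch size are hypothetical names and values that do not come from the original script. The placeholder style depends on the driver's paramstyle (sqlite3 uses ?, while the Snowflake connector defaults to %s), so it would need adjusting for another driver.

```python
import sqlite3


def bulk_insert(conn, table, columns, rows, batch_size=10_000):
    """Insert rows in batches using bound parameters; return the number of rows sent.

    Hypothetical helper for illustration. `table` and `columns` must come from
    trusted code, because identifiers cannot be bound as parameters.
    """
    placeholders = ", ".join(["?"] * len(columns))  # qmark style, as used by sqlite3
    sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
    cur = conn.cursor()
    total = 0
    for start in range(0, len(rows), batch_size):
        batch = rows[start:start + batch_size]
        cur.executemany(sql, batch)  # one call per batch instead of one per row
        total += len(batch)
    conn.commit()  # single commit after all batches
    return total


# Stand-in database and data (assumptions, not the poster's schema).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo (npi_number TEXT, last_name TEXT, zipcode TEXT)")

# None is bound as SQL NULL, and the apostrophe needs no escaping.
rows = [(f"{i:010d}", "O'Brien", None) for i in range(25_000)]

inserted = bulk_insert(conn, "demo", ["npi_number", "last_name", "zipcode"], rows)
count = conn.execute("SELECT COUNT(*) FROM demo").fetchone()[0]
```

Whether this is fast enough for 200k rows on Snowflake would have to be measured; the batch size in particular is a guess and not a tuned value.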
#2
Maybe you forgot to include your code. Show your code in python tags; don't just explain what you did, show it instead.
#3
import snowflake.connector
from Snowflake_config_fn import SNDBX_DB_LANDING_LAYER_config, HRMNY_DB_HRMNY_SND_ZN_config
import pandas as pd

source_tb = SNDBX_DB_LANDING_LAYER_config()
target_tb = HRMNY_DB_HRMNY_SND_ZN_config()
mapping_tb = HRMNY_DB_HRMNY_SND_ZN_config()
META_JOIN_tb = SNDBX_DB_LANDING_LAYER_config()

# Connect to Snowflake
source_conn = snowflake.connector.connect(**source_tb)
target_conn = snowflake.connector.connect(**target_tb)

#--------------------------------------------------------------------------------------------------------------------

#GENERATING THE JOIN CLAUSE USING MDM_META_JOIN TABLE.

# an empty list to store join conditions
join_conditions = []

# an empty list to store table aliases
table_aliases = []

# an empty list to store join tables
join_tbls = []

with source_conn.cursor() as source_cur:
    source_cur.execute(f"SELECT * FROM {META_JOIN_tb['schema']}.MDM_META_JOIN")

    for row in source_cur:
        source_tbl = row[0]
        source_alias = row[1]
        join_tbl = row[2]
        join_alias = row[3]  # Use the join_alias directly
        join_exp = row[4]
        target_tbl = row[5]

        # Append each join_exp to the list for join_conditions
        join_conditions.append(join_exp)

        # Append source and join aliases to the list for table_aliases
        table_aliases.extend([source_alias, join_alias])

        # Append join table to the list for join_tbls
        join_tbls.append(join_tbl)

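# unique set of table aliases
unique_table_aliases = set(table_aliases)

# Join aliases in row order. A set is unordered and also contains the source
# aliases, so zipping it with join_tbls can pair a table with the wrong alias.
# table_aliases holds [source_alias, join_alias] pairs, so every second entry
# is a join alias.
join_aliases = table_aliases[1::2]

# Generate the JOIN clause for the query
join_clause = "\n".join([f"JOIN {join_tbl} AS {join_alias} ON {join_exp}" for join_tbl, join_alias, join_exp in zip(join_tbls, join_aliases, join_conditions)])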

print(join_clause)

#------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

#GETTING THE SOURCE COLUMNS AND TARGET COLUMNS FROM MDM_COLUMN_MAPPING TABLE.


# Empty lists to store the desired target and source columns, kept pairwise aligned
desired_tcolumns = []
desired_scolumns = []

# Fetch source and target columns from MDM_COLUMN_MAPPING in a single query, so
# each source column stays paired with its target column (two separate queries
# without ORDER BY have no guaranteed matching row order)
with source_conn.cursor() as mapping_cur:
    mapping_cur.execute(f"""
        SELECT SOURCE_COLUMN, TARGET_COLUMN
        FROM {target_tb['database']}.{target_tb['schema']}.MDM_COLUMN_MAPPING
    """)

    # Append the source and target columns to their lists
    for row in mapping_cur:
        desired_scolumns.append(row[0])
        desired_tcolumns.append(row[1])
        
#----------------------------------------------------------------------------------------------------------------------------------------------------       
        
#GENERATING THE SELECT QUERY TO RETRIEVE DATA FROM MULTIPLE SOURCE TABLES.


# an empty list to store the SELECT statements
select_col_list = []

# a dictionary to map source columns to their respective source tables and aliases
source_table_mapping = {}

# Fetch column mappings for the source and target tables from MDM_COLUMN_MAPPING
with source_conn.cursor() as mapping_cur:
    mapping_cur.execute(f"""
        SELECT SOURCE_TABLE, SOURCE_ALIAS, SOURCE_COLUMN, TARGET_COLUMN
        FROM {target_tb['database']}.{target_tb['schema']}.MDM_COLUMN_MAPPING
    """)
    
    # Populating the source_table_mapping dictionary
    for row in mapping_cur:
        Mapping_tbl_source_table = row[0]
        Mapping_tbl_source_alias = row[1]  # Add SOURCE_ALIAS
        Mapping_tbl_source_col = row[2]
        Mapping_tbl_target_col = row[3]
        source_table_mapping[Mapping_tbl_source_col] = (Mapping_tbl_source_table, Mapping_tbl_source_alias)  # Store source table and alias as a tuple

# Iterate through the desired columns and generate the SELECT statements
for Mapping_tbl_source_col, Mapping_tbl_target_col in zip(desired_scolumns, desired_tcolumns):
    if Mapping_tbl_source_col:
        
        # Check if the source column is in the source_table_mapping dictionary
        if Mapping_tbl_source_col in source_table_mapping:
            Mapping_tbl_source_table, Mapping_tbl_source_alias = source_table_mapping[Mapping_tbl_source_col]
        else:
            Mapping_tbl_source_table = source_alias  # Default to source_alias if not found
            Mapping_tbl_source_alias = source_alias  # Default to source_alias if not found
        
        # Including the source_table and source_alias in the SELECT statement
        select_cols_nullcheck = f"{Mapping_tbl_source_alias}.{Mapping_tbl_source_col} as {Mapping_tbl_target_col}"
    else:
        # If the source column is empty, using NULL in the SELECT statement
        select_cols_nullcheck = f"NULL as {Mapping_tbl_target_col}"
    
    select_col_list.append(select_cols_nullcheck)

# Join the SELECT statements into a single query
select_query = ",\n".join(select_col_list)

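# Generate the JOIN clause for the query, pairing each join table with its own
# alias in row order (table_aliases holds [source_alias, join_alias] pairs, so
# every second entry is a join alias; the unordered set cannot be zipped safely)
join_clause = "\n".join([f"JOIN {join_tbl} AS {join_alias} ON {join_exp}" for join_tbl, join_alias, join_exp in zip(join_tbls, table_aliases[1::2], join_conditions)])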

# Combine everything into the final query
query = f"""SELECT
{select_query}
FROM {source_tbl} AS {source_alias}  
{join_clause}
"""

print(query)

# Execute the query to load data
with source_conn.cursor() as source_cur:
    source_cur.execute(query)

    # Fetch data into a Pandas DataFrame
    Source_Df = source_cur.fetch_pandas_all()
    
# Close the Snowflake connection when done
source_conn.close()    

#---------------------------------------------------------------------------------------------------------------------------------


#CONVERTING THE SOURCE DATAFRAME TO THE TARGET DATAFRAME (BY ADDING COLUMNS WHICH DON'T HAVE A SOURCE MAPPING COLUMN AND FILLING THEM WITH NULL)


# Define the desired column order from Target_Df
Target_Columns_df = [
    'SOURCE',
    'RECORD_ID',
    'CUSTOMER_ID',
    'FIRST_NAME',
    'MIDDLE_NAME',
    'LAST_NAME',
    'FULL_NAME',
    'NPI_NUMBER',
    'IMS',
    'SYMPHONY',
    'CRM',
    'GENDER',
    'SPECIALITY',
    'ADDR_ID',
    'ADDRESS_1',
    'ADDRESS_2',
    'STATE',
    'CITY',
    'PHONE_NUMBER',
    'EMAIL_ADDRESS',
    'COUNTRY',
    'INSTITUTION',
    'RECORD_STATUS',
    'AUDITCREATEDDATE',
    'AUDITCREATEDBY',
    'IS_PROCESSED',
    'AUDITUPDATEDATE',
    'AUDITUPDATEDBY',
    'ZIPCODE'
]

# Reorder the columns of Source_Df and add missing columns with None values
Pre_Target_Df = Source_Df.reindex(columns=Target_Columns_df, fill_value=None)

Target_Df = Pre_Target_Df[Pre_Target_Df['NPI_NUMBER'].str.len() == 10]

Target_Df1 = Target_Df[Target_Df['ZIPCODE'].str.len() == 5]
print (Target_Df1)

#-------------------------------------------------------------------------------------------------------------------------

import datetime  # pandas is already imported at the top of the script


# Make a copy of the DataFrame
Target_Df2 = Target_Df1.copy()

# Fill 'CRM' in the 'SOURCE' column
Target_Df2['SOURCE'] = 'CRM'

# Fill the current date in 'AUDITCREATEDDATE'
current_date = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
Target_Df2['AUDITCREATEDDATE'] = current_date

# Fill NULL in 'AUDITUPDATEDATE' and the other columns without a source value
Target_Df2['AUDITUPDATEDATE'] = None
Target_Df2['RECORD_ID'] = None
Target_Df2['FULL_NAME'] = None
Target_Df2['IMS'] = None
Target_Df2['SYMPHONY'] = None
Target_Df2['INSTITUTION'] = None
Target_Df2['AUDITCREATEDBY'] = 'Ingestion Framework job'
Target_Df2['IS_PROCESSED'] = None
Target_Df2['AUDITUPDATEDBY'] = None
Target_Df2['CUSTOMER_ID'] = None
Target_Df2['ADDRESS_1'] = None


#----------------------------------------------------------------------------------------------------------------------------------------
#----------------------------------------------------------------------------------------------------------------------------------------

#LOADING TARGET_DF IN THE MDM_SOURCE_TO_SURROGATE_TEMP table 


# Define the target table name
target_table = "HRMNY_DB.HRMNY_SND_ZN.MDM_SOURCE_TO_SURROGATE_TEMP"

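# Escape single quotes and mark missing values as 'NULL'. pd.notna() also catches
# NaN, which "val is not None" would have turned into the string 'nan'.
Target_Df2 = Target_Df2.applymap(lambda val: str(val).replace("'", "''") if pd.notna(val) else 'NULL')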


# Using SQL INSERT INTO statements to insert data from the DataFrame into the target table
try:
    cursor = target_conn.cursor()

    for index, row in Target_Df2.iterrows():
        # Prepare the column names and values dynamically
        columns = ', '.join(row.index)
        #values = ', '.join([f"'{val}'" if val is not None else 'NULL' for val in row])
        values = ', '.join([f"'{val}'" if val is not None and val != 'NULL' else 'NULL' for val in row])

        print (values)

        # Defining the INSERT INTO SQL statement for the current row
        insert_sql = f"""
        INSERT INTO {target_table} ({columns})
        VALUES ({values});
        """
        
        cursor.execute(insert_sql)

    target_conn.commit()  # Commit the transaction

    print("Data inserted successfully into Snowflake table.")

except Exception as e:
    target_conn.rollback()  # Rollback the transaction in case of an error
    print(f"Error: {str(e)}")

finally:
    # Closing the Snowflake connection
    target_conn.close()
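
The loop above sends one INSERT per row, which matches the slow load described in the first post. If the rows are instead handed to a batched, parameter-bound insert, missing values should reach the driver as None rather than as the text 'NULL' or NaN. A minimal sketch of that preparation step follows, assuming the rows come from something like Target_Df2.itertuples(index=False, name=None). The helper names clean_value, clean_rows, and chunked are hypothetical, and only float NaN is handled here (pd.NaT and pd.NA are not).

```python
import math
from itertools import islice


def clean_value(val):
    """Map float NaN to None so that a DB-API driver binds it as SQL NULL."""
    if isinstance(val, float) and math.isnan(val):
        return None
    return val


def clean_rows(rows):
    """Return the rows as a list of tuples with NaN replaced by None."""
    return [tuple(clean_value(v) for v in row) for row in rows]


def chunked(rows, size):
    """Yield successive lists of at most `size` rows."""
    it = iter(rows)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


# Stand-in rows (assumption); with pandas these could come from
# Target_Df2.itertuples(index=False, name=None).
sample = [
    ("1234567890", "O'Neil", float("nan")),
    ("0987654321", None, "10001"),
    ("1111111111", "Lee", "94105"),
]

cleaned = clean_rows(sample)
batches = list(chunked(cleaned, 2))
```

Each batch can then be passed to cursor.executemany with a placeholder-based INSERT, so the quote escaping and the 'NULL' string handling in the script are no longer needed.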
buran wrote Sep-28-2023, 10:31 AM:
Please use proper tags when posting code, tracebacks, output, etc. This time I have added the tags for you.
See BBcode help for more info.