import snowflake.connector
from Snowflake_config_fn import SNDBX_DB_LANDING_LAYER_config, HRMNY_DB_HRMNY_SND_ZN_config
import pandas as pd
# Connection configs: reads come from the sandbox landing layer,
# writes go to the Harmony sound zone.
source_tb = SNDBX_DB_LANDING_LAYER_config()
target_tb = HRMNY_DB_HRMNY_SND_ZN_config()
# NOTE(review): mapping_tb is never used below — the mapping-table queries
# use target_tb instead.  Confirm whether this can be removed.
mapping_tb = HRMNY_DB_HRMNY_SND_ZN_config()
# Supplies the schema name for the MDM_META_JOIN lookup below.
META_JOIN_tb = SNDBX_DB_LANDING_LAYER_config()
# Connect to Snowflake: one connection for reading, one for writing.
source_conn = snowflake.connector.connect(**source_tb)
target_conn = snowflake.connector.connect(**target_tb)
#--------------------------------------------------------------------------------------------------------------------
# GENERATING THE JOIN CLAUSE USING MDM_META_JOIN TABLE.
#
# Each MDM_META_JOIN row describes one join:
#   (SOURCE_TABLE, SOURCE_ALIAS, JOIN_TABLE, JOIN_ALIAS, JOIN_EXPRESSION, TARGET_TABLE)
#
# BUG FIX: the original zipped join_tbls against a *set* of aliases, so the
# alias paired with each join table was arbitrary (set iteration order is
# undefined) and the set mixed source aliases in.  A parallel join_aliases
# list keeps table / alias / condition aligned row by row.
join_conditions = []   # ON expressions, one per metadata row
table_aliases = []     # every alias seen (source + join), for reference
join_aliases = []      # alias of each joined table, parallel to join_tbls
join_tbls = []         # joined table names, in row order

with source_conn.cursor() as source_cur:
    source_cur.execute(f"SELECT * FROM {META_JOIN_tb['schema']}.MDM_META_JOIN")
    for row in source_cur:
        source_tbl = row[0]    # driving (FROM) table — last row wins
        source_alias = row[1]
        join_tbl = row[2]
        join_alias = row[3]
        join_exp = row[4]
        target_tbl = row[5]
        join_conditions.append(join_exp)
        table_aliases.extend([source_alias, join_alias])
        join_aliases.append(join_alias)
        join_tbls.append(join_tbl)

# Kept for backward compatibility with any downstream reference.
unique_table_aliases = set(table_aliases)

# One JOIN line per metadata row, with aliases correctly paired.
join_clause = "\n".join(
    f"JOIN {tbl} AS {alias} ON {cond}"
    for tbl, alias, cond in zip(join_tbls, join_aliases, join_conditions)
)
print(join_clause)
#------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
# GETTING THE SOURCE COLUMNS AND TARGET COLUMNS FROM MDM_COLUMN_MAPPING TABLE.
#
# IMPROVEMENT: the original issued two separate SELECTs (one for
# TARGET_COLUMN, one for SOURCE_COLUMN) and implicitly relied on both
# result sets arriving in the same order.  A single query returns each
# (source, target) pair from the same row, so the two lists can never
# drift out of alignment.
desired_tcolumns = []  # target column names, in row order
desired_scolumns = []  # source column names, parallel to desired_tcolumns

with source_conn.cursor() as mapping_cur:
    mapping_cur.execute(f"""
        SELECT SOURCE_COLUMN, TARGET_COLUMN
        FROM {target_tb['database']}.{target_tb['schema']}.MDM_COLUMN_MAPPING
    """)
    for src_col, tgt_col in mapping_cur:
        desired_scolumns.append(src_col)
        desired_tcolumns.append(tgt_col)
#----------------------------------------------------------------------------------------------------------------------------------------------------
# GENERATING THE SELECT QUERY TO RETRIEVE DATA FROM MULTIPLE SOURCE TABLES.
select_col_list = []        # "alias.col as TARGET_COL" fragments
source_table_mapping = {}   # source column -> (source table, source alias)

with source_conn.cursor() as mapping_cur:
    mapping_cur.execute(f"""
        SELECT SOURCE_TABLE, SOURCE_ALIAS, SOURCE_COLUMN, TARGET_COLUMN
        FROM {target_tb['database']}.{target_tb['schema']}.MDM_COLUMN_MAPPING
    """)
    for map_src_table, map_src_alias, map_src_col, map_tgt_col in mapping_cur:
        source_table_mapping[map_src_col] = (map_src_table, map_src_alias)

# One SELECT item per mapping row; columns with no source mapping become
# NULL so the target shape is always complete.
for src_col, tgt_col in zip(desired_scolumns, desired_tcolumns):
    if src_col:
        # Fall back to the driving table's alias when the column has no
        # explicit table mapping (matches the original's default).
        _, src_alias = source_table_mapping.get(src_col, (source_alias, source_alias))
        select_col_list.append(f"{src_alias}.{src_col} as {tgt_col}")
    else:
        select_col_list.append(f"NULL as {tgt_col}")

select_query = ",\n".join(select_col_list)

# BUG FIX: the original rebuilt the JOIN clause here by zipping join_tbls
# against a *set* of aliases, pairing tables with arbitrary aliases.
# The join_clause already assembled from the metadata is reused instead.
query = f"""SELECT
{select_query}
FROM {source_tbl} AS {source_alias}
{join_clause}
"""
print(query)
# Execute the assembled query and pull the full result set into pandas.
# (fetch_pandas_all requires pyarrow alongside the Snowflake connector.)
with source_conn.cursor() as source_cur:
    source_cur.execute(query)
    Source_Df = source_cur.fetch_pandas_all()
# All source reads are done; release the connection.
source_conn.close()
#---------------------------------------------------------------------------------------------------------------------------------
# CONFORMING THE SOURCE DATAFRAME TO THE TARGET LAYOUT (columns that have
# no source mapping are added and filled with NULL/NaN).
# Target column order, matching MDM_SOURCE_TO_SURROGATE_TEMP.
Target_Columns_df = [
    'SOURCE',
    'RECORD_ID',
    'CUSTOMER_ID',
    'FIRST_NAME',
    'MIDDLE_NAME',
    'LAST_NAME',
    'FULL_NAME',
    'NPI_NUMBER',
    'IMS',
    'SYMPHONY',
    'CRM',
    'GENDER',
    'SPECIALITY',
    'ADDR_ID',
    'ADDRESS_1',
    'ADDRESS_2',
    'STATE',
    'CITY',
    'PHONE_NUMBER',
    'EMAIL_ADDRESS',
    'COUNTRY',
    'INSTITUTION',
    'RECORD_STATUS',
    'AUDITCREATEDDATE',
    'AUDITCREATEDBY',
    'IS_PROCESSED',
    'AUDITUPDATEDATE',
    'AUDITUPDATEDBY',
    'ZIPCODE'
]
# Reorder Source_Df to the target layout; missing columns are created and
# filled (reindex turns fill_value=None into NaN for numeric dtypes).
Pre_Target_Df = Source_Df.reindex(columns=Target_Columns_df, fill_value=None)
# IMPROVEMENT: apply both validity filters in a single mask instead of
# chaining two filtered frames, and take an explicit .copy() so later
# column assignments cannot trigger SettingWithCopyWarning.
# NOTE(review): .str.len() assumes both columns are string dtype and drops
# rows where the value is NaN — confirm that is the intended behavior.
valid_rows = (
    (Pre_Target_Df['NPI_NUMBER'].str.len() == 10)
    & (Pre_Target_Df['ZIPCODE'].str.len() == 5)
)
Target_Df1 = Pre_Target_Df[valid_rows].copy()
print(Target_Df1)
#-------------------------------------------------------------------------------------------------------------------------
# FILLING AUDIT / CONSTANT COLUMNS ON A COPY OF THE FILTERED FRAME.
# FIX: removed the duplicate mid-file `import pandas as pd` (already
# imported at the top of the file).
import datetime

# Work on a copy so the filtered frame is left untouched.
Target_Df2 = Target_Df1.copy()

# Source-system tag for every row.
# (The original comment said 'VEEVA', but the value written is 'CRM'.)
Target_Df2['SOURCE'] = 'CRM'

# Audit stamps: creation time is "now", update fields start empty.
# NOTE(review): timestamp is timezone-naive (local server time) — confirm.
current_date = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
Target_Df2['AUDITCREATEDDATE'] = current_date
Target_Df2['AUDITCREATEDBY'] = 'Ingestion Framework job'
Target_Df2['AUDITUPDATEDATE'] = None
Target_Df2['AUDITUPDATEDBY'] = None

# Columns with no source value are explicitly nulled out.
for _null_col in ('RECORD_ID', 'FULL_NAME', 'IMS', 'SYMPHONY',
                  'INSTITUTION', 'IS_PROCESSED', 'CUSTOMER_ID',
                  'ADDRESS_1'):
    Target_Df2[_null_col] = None
#----------------------------------------------------------------------------------------------------------------------------------------
# LOADING TARGET_DF INTO THE MDM_SOURCE_TO_SURROGATE_TEMP TABLE.
target_table = "HRMNY_DB.HRMNY_SND_ZN.MDM_SOURCE_TO_SURROGATE_TEMP"

# SECURITY/CORRECTNESS FIX: the original built INSERT statements by string
# concatenation with hand-rolled quote doubling (applymap + f-string
# VALUES), which is SQL-injection-prone, fragile around NULLs, and issues
# one round trip per row.  A single parameterized executemany lets the
# connector handle quoting and NULL binding.
try:
    with target_conn.cursor() as cursor:
        columns = ', '.join(Target_Df2.columns)
        placeholders = ', '.join(['%s'] * len(Target_Df2.columns))
        insert_sql = f"INSERT INTO {target_table} ({columns}) VALUES ({placeholders})"
        # NaN/None -> SQL NULL; everything else is bound as-is.
        rows = [
            tuple(None if pd.isna(val) else val for val in record)
            for record in Target_Df2.itertuples(index=False, name=None)
        ]
        if rows:
            cursor.executemany(insert_sql, rows)
    target_conn.commit()  # Commit the transaction
    print("Data inserted successfully into Snowflake table.")
except Exception as e:
    target_conn.rollback()  # Roll back on any failure
    print(f"Error: {str(e)}")
finally:
    # Closing the Snowflake connection
    target_conn.close()
# NOTE: the text below is a forum moderator's note that was accidentally
# pasted into this file (it is not Python); preserved here as a comment.
# buran wrote (Sep-28-2023, 10:31 AM): "Please, use proper tags when you
# post code, traceback, output, etc. This time I have added tags for you.
# See BBCode help for more info."