I have the following test file, which is read by edit.py (full script below) and loaded into a table. I want to skip the file's header line so that it does not show up in the output layout; right now the header row comes through as data in the output layout. A minimal repro of the read path follows the sample file.
-----Input file
SERIAL|MNO|YR|DESC|AMT|MSRP|DLR|NAME|STATE|DATE|M_CAT|CNTRY|WMON|WMIL|
SADCJ2B|7315 |2017|RAV4 |61,638.96|57,250.00|4495| |PA|20170515|FPACE |CAN|18|8000|
SADCJ2F|C761|2019|CAMRY Premium |56,139.19|57,821.00|5339| |NC|20190531|FPACE |USA|36|1000|
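For reference, here is a minimal, self-contained sketch of the read path outside of edit.py. The column names come from the sample above; the trailing "|" on every line produces an empty 15th field, so a FILLER name is added for it. Because names= is passed, read_csv with skiprows=0 keeps the header line as a data row (which is what I am seeing), while skiprows=1 drops it:

import io
import pandas as pd

sample = (
    "SERIAL|MNO|YR|DESC|AMT|MSRP|DLR|NAME|STATE|DATE|M_CAT|CNTRY|WMON|WMIL|\n"
    "SADCJ2B|7315 |2017|RAV4 |61,638.96|57,250.00|4495| |PA|20170515|FPACE |CAN|18|8000|\n"
    "SADCJ2F|C761|2019|CAMRY Premium |56,139.19|57,821.00|5339| |NC|20190531|FPACE |USA|36|1000|\n"
)
cols = ["SERIAL", "MNO", "YR", "DESC", "AMT", "MSRP", "DLR", "NAME",
        "STATE", "DATE", "M_CAT", "CNTRY", "WMON", "WMIL", "FILLER"]

# skiprows=0: the header line comes through as the first data row
df_with_header = pd.read_csv(io.StringIO(sample), delimiter="|",
                             skiprows=0, names=cols, dtype=object)
print(df_with_header["SERIAL"].tolist())   # ['SERIAL', 'SADCJ2B', 'SADCJ2F']

# skiprows=1: the header line is skipped
df_clean = pd.read_csv(io.StringIO(sample), delimiter="|",
                       skiprows=1, names=cols, dtype=object)
print(df_clean["SERIAL"].tolist())         # ['SADCJ2B', 'SADCJ2F']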
-----edit.py
def read_mapping_data():
    """
    Read the metadata from json
    :return:
    """
    global args
    global layouts
    global layout_fields
    global transforms
    global transform_fields
    global metadata_file_name
    with open(metadata_file_name, 'r') as metadata_file:
        metadata = json.load(metadata_file)
    # Load each layout into the layouts frame and its fields into layout_fields
    for layout_name in metadata["layouts"]:
        layout = metadata["layouts"][layout_name]
        layouts.loc[layout_name] = pd.Series()
        df = pd.DataFrame(columns=['name', 'start', 'end', 'length'])
        for key in layout:
            if key == "fields":
                for field in layout['fields']:
                    df.loc[field['seq']] = pd.Series(
                        {'name': field['name'],
                         'start': int(field['start']) if 'start' in field.keys() else "",
                         'end': int(field['end']) if 'end' in field.keys() else "",
                         'length': int(field['end']) - int(field['start']) + 1
                                   if 'end' in field.keys() else ""})
                df.sort_index(inplace=True)
                layout_fields[layout_name] = df
            else:
                layouts.loc[layout_name][key] = layout[key]
    # Load each transform and its field mappings
    i = 0  # counts transforms that carry field mappings (currently unused)
    for transform_name in metadata["transforms"]:
        mapping = metadata["transforms"][transform_name]
        transforms.loc[transform_name] = pd.Series()
        df = pd.DataFrame(columns=['in_field_name', 'transform'])
        for key in mapping:
            if key == "fields":
                for field in mapping['fields']:
                    df.loc[field['out_field_name']] = pd.Series(
                        {'in_field_name': field['in_field_name']
                            if 'in_field_name' in field.keys() else "",
                         'transform': field['transform']
                            if 'transform' in field.keys() else ""})
                transform_fields[transform_name] = df
                i += 1
            else:
                transforms.loc[transform_name][key] = mapping[key]
    logger.info("=====================================")
    logger.info("Reading Metadata Completed")
    logger.info("=====================================")


def validate_arguments():
    """
    Parse and validate the arguments passed
    :return:
    """
    global metadata_file_name
    error_messages = []
    if not os.path.isfile(metadata_file_name):
        error_messages.append(
            "Metadata JSON file {} not available".format(metadata_file_name))
    if len(error_messages) > 0:
        logger.error("=====================================")
        logger.error("=========Errors in Arguments=========")
        for error_message in error_messages:
            logger.error(error_message)
        logger.error("=====================================")
        return False
    return True


def validate_metadata():
    """
    Validate metadata for layouts
    :return:
    """
    global file_names
    global layouts
    global layout_fields
    global transforms
    global transform_names
    error_messages = []
    for transform_name in transform_names:
        input_layout = transforms.loc[transform_name]['input']
        input_type = layouts.loc[input_layout]['type'].upper()
        in_query = "" if pd.isnull(layouts.loc[input_layout]['query']) \
            else layouts.loc[input_layout]['query'].upper()
        output_layout = transforms.loc[transform_name]['output']
        output_type = layouts.loc[output_layout]['type'].upper()
        key_column = "" if pd.isnull(layouts.loc[input_layout]['key_column']) \
            else layouts.loc[input_layout]['key_column']
        transform_type = transforms.loc[transform_name]['type'].upper()
        in_file_location = "" if pd.isnull(layouts.loc[input_layout]['location']) \
            else layouts.loc[input_layout]['location']
        out_file_location = "" if pd.isnull(layouts.loc[output_layout]['location']) \
            else layouts.loc[output_layout]['location']
        input_delimiter = "" if pd.isnull(layouts.loc[input_layout]['delimiter']) \
            else layouts.loc[input_layout]['delimiter'].upper()
        output_delimiter = "" if pd.isnull(layouts.loc[output_layout]['delimiter']) \
            else layouts.loc[output_layout]['delimiter'].upper()
        if transform_type != "TRANSPOSE" and transform_type != "MAP":
            error_messages.append(
                "Unknown transform_type {} for transform {}".format(
                    transform_type, transform_name))
        if key_column == "" and transform_type == "TRANSPOSE":
            error_messages.append(
                "Key_column tag is missing in transform_type {} for transform {}".format(
                    transform_type, transform_name))
        if transform_name not in transforms.index:
            error_messages.append(
                "Transform {} is not available in metadata JSON file {}".format(
                    transform_name, metadata_file_name))
        if input_type == 'FILE' or input_type == 'JSON':
            if not os.path.isfile(in_file_location + file_names[input_layout]):
                error_messages.append(
                    "Transform {} input File {} not available".format(
                        transform_name, file_names[input_layout]))
        if output_type == 'FILE' or output_type == 'JSON':
            if os.path.isfile(out_file_location + file_names[output_layout]):
                error_messages.append(
                    "Transform {} output File {} already available".format(
                        transform_name, file_names[output_layout]))
        if input_type == 'FILE' and input_delimiter == 'FIXED' \
                and "" in layout_fields[input_layout]['length'].tolist():
            error_messages.append(
                "Transform {} input type is fixed but some field positions are missing".format(
                    transform_name))
        if output_type == 'FILE' and output_delimiter == 'FIXED' \
                and "" in layout_fields[output_layout]['length'].tolist():
            error_messages.append(
                "Transform {} output type is fixed but some field positions are missing".format(
                    transform_name))
        dsn = "stg_dsn" if pd.isnull(layouts.loc[input_layout]['dsn']) \
            else layouts.loc[input_layout]['dsn'].lower()
        db_schema = cfg["dsn:{}:schema".format(dsn)]
        # Connectivity probes: the queries never return rows (where 1=2)
        try:
            if input_type == 'TABLE':
                df = pd.read_sql("select count(*) cnt from {}.{} where 1=2".format(
                    db_schema, input_layout), con=staging_db_engine)
        except:
            error_messages.append(
                "Transform {} Input Table {} not available".format(
                    transform_name, file_names[input_layout]))
        try:
            if output_type == 'TABLE':
                df = pd.read_sql("select count(*) cnt from {}.{} where 1=2".format(
                    db_schema, output_layout), con=staging_db_engine)
        except:
            error_messages.append(
                "Transform {} Output Table {} not available".format(
                    transform_name, file_names[input_layout]))
        try:
            if input_type == 'QUERY':
                df = pd.read_sql("select count(*) cnt from ({}) where 1=2".format(in_query),
                                 con=staging_db_engine)
        except:
            error_messages.append(
                "Transform {} Input Query {} not executable".format(
                    transform_name, file_names[input_layout]))
        if output_type == 'QUERY':
            error_messages.append(
                "Transform {} Output type Query not supported".format(transform_name))
    if len(error_messages) > 0:
        logger.error("=====================================")
        logger.error("=========Errors in Metadata==========")
        for error_message in error_messages:
            logger.error(error_message)
        logger.error("=====================================")
        return False
    return True
def parse_input(l_transform_name):
    """
    Read the input and create dataframe
    :return:
    """
    global file_names
    global layouts
    global layout_fields
    global transforms
    global inputs
    if l_transform_name == "":
        transform_names = transforms.index.values.tolist()
    else:
        transform_names = [l_transform_name]
    for transform_name in transform_names:
        try:
            input_layout = transforms.loc[transform_name]['input']
            input_type = layouts.loc[input_layout]['type'].upper()
            logger.info("=====================================")
            logger.info("Reading input {} for transform {}".format(transform_name, input_layout))
            logger.info("=====================================")
            delimiter = "" if pd.isnull(layouts.loc[input_layout]['delimiter']) \
                else layouts.loc[input_layout]['delimiter'].upper()
            # header/footer default to skipping zero rows when not set in the metadata
            header = 0 if pd.isnull(layouts.loc[input_layout]['header']) \
                else int(layouts.loc[input_layout]['header'])
            footer = 0 if pd.isnull(layouts.loc[input_layout]['footer']) \
                else int(layouts.loc[input_layout]['footer'])
            quote_char = '"' if pd.isnull(layouts.loc[input_layout]['quotechar']) \
                else layouts.loc[input_layout]['quotechar']
            file_location = "" if pd.isnull(layouts.loc[input_layout]['location']) \
                else layouts.loc[input_layout]['location']
            query = "" if pd.isnull(layouts.loc[input_layout]['query']) \
                else layouts.loc[input_layout]['query'].upper()
            dsn = "stg_dsn" if pd.isnull(layouts.loc[input_layout]['dsn']) \
                else layouts.loc[input_layout]['dsn'].lower()
            db_engine = db.get_db_engine(cfg, dsn)
            db_schema = cfg["dsn:{}:schema".format(dsn)]
            if input_type == 'FILE':
                if delimiter == 'TAB':
                    delim = "\t"
                elif delimiter == "":
                    delim = ","
                else:
                    delim = delimiter
                if delimiter == 'FIXED':
                    df = pd.read_fwf(file_location + file_names[input_layout],
                                     skiprows=header, skipfooter=footer,
                                     colspecs=list(layout_fields[input_layout][['start', 'end']]
                                                   .itertuples(index=False)),
                                     names=layout_fields[input_layout]['name'].tolist())
                else:
                    # Because names= is passed, the file's header line is read
                    # as data unless the layout's header tag sets skiprows
                    df = pd.read_csv(file_location + file_names[input_layout],
                                     delimiter=delim, skiprows=header, skipfooter=footer,
                                     escapechar='\\', encoding='UTF-8', quotechar=quote_char,
                                     names=layout_fields[input_layout]['name'].tolist(),
                                     dtype=object)
            elif input_type == 'JSON':
                df = pd.read_json(file_location + file_names[input_layout])
            elif input_type == 'TABLE':
                df = pd.read_sql_table(input_layout, con=db_engine, schema=db_schema)
            elif input_type == 'QUERY':
                # pd.read_sql takes no schema argument; the query must qualify its tables
                df = pd.read_sql(query, con=db_engine)
            else:
                logger.warning("Unknown type {} for input {}".format(input_type, input_layout))
            if df.empty:
                logger.warning("input {} is empty".format(input_layout))
            inputs[input_layout] = df
        except:
            print("Unexpected error:", sys.exc_info()[0])
            raise
        print("printing df")
        logger.info(df)


def process_data(l_transform_name):
    """
    Identify the processing action
    :return:
    """
    if l_transform_name == "":
        transform_names = transforms.index.values.tolist()
    else:
        transform_names = [l_transform_name]
    for transform_name in transform_names:
        transform_type = transforms.loc[transform_name]['type'].upper()
        if transform_type == "TRANSPOSE":
            transpose_data(transform_name)
        elif transform_type == "MAP":
            data_mapping(transform_name)
        else:
            logger.warning("Unknown transform_type {} for transform {}".format(
                transform_type, transform_name))
            break


def transpose_data(l_transform_name):
    """
    Transpose the input fields and map to output layout
    :return:
    """
    global process_date
    global transforms
    global inputs
    global outputs
    input_layout = transforms.loc[l_transform_name]['input']
    output_layout = transforms.loc[l_transform_name]['output']
    key_column = "" if pd.isnull(layouts.loc[input_layout]['key_column']) \
        else layouts.loc[input_layout]['key_column']
    out_df = pd.DataFrame(columns=['attribute', 'value'])
    # Turn every input row into attribute/value pairs keyed by key_column
    for index in inputs[input_layout].index:
        print(index)
        df = inputs[input_layout].loc[[index]].T  # .loc replaces the deprecated .ix
        df = df.reset_index()
        df = df.rename(index=str, columns={"index": "attribute", index: "value"})
        df['key'] = inputs[input_layout].at[index, key_column]
        df['process_date'] = process_date
        out_df = out_df.append(df, ignore_index=True)
    if output_layout in outputs.keys():
        outputs[output_layout] = outputs[output_layout].append(out_df, ignore_index=True)
    else:
        outputs[output_layout] = out_df
    print(out_df)
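# For reference, a sketch of what transpose_data() emits for one input row
# (values are made up from the sample file; assume key_column is SERIAL):
#
#   attribute  value    key      process_date
#   SERIAL     SADCJ2B  SADCJ2B  2019-05-31T00:00:00
#   MNO        7315     SADCJ2B  2019-05-31T00:00:00
#   YR         2017     SADCJ2B  2019-05-31T00:00:00
#   ...        ...      ...      ...
#
# One attribute/value row per input column, repeated for every input row.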
def data_mapping(l_transform_name):
    """
    Map the input fields to the output layout
    :return:
    """
    import re
    global process_date
    global transforms
    global layout_fields
    global inputs
    global outputs
    output_layout = transforms.loc[l_transform_name]['output']
    input_layout = transforms.loc[l_transform_name]['input']
    trim = "NO" if pd.isnull(transforms.loc[l_transform_name]['trim']) \
        else transforms.loc[l_transform_name]['trim'].upper()
    upper = "NO" if pd.isnull(transforms.loc[l_transform_name]['upper']) \
        else transforms.loc[l_transform_name]['upper'].upper()
    lower = "NO" if pd.isnull(transforms.loc[l_transform_name]['lower']) \
        else transforms.loc[l_transform_name]['lower'].upper()
    out_df = pd.DataFrame(columns=layout_fields[output_layout]['name'].tolist())
    for index in out_df.columns:
        print(index)
        input_field = transform_fields[l_transform_name].loc[index]['in_field_name'] \
            if index in transform_fields[l_transform_name].index else ""
        input_field = "" if pd.isnull(input_field) else input_field
        transform = transform_fields[l_transform_name].loc[index]['transform'] \
            if index in transform_fields[l_transform_name].index else ""
        transform = "" if pd.isnull(transform) else transform
        if input_field == "":
            out_df[index] = ""
        elif transform == "":
            out_df[index] = inputs[input_layout][input_field]
        elif re.match(r'^\[([0-9])*(:)*([0-9])*\]$', transform):
            # A transform like "[0:4]" takes a substring of the input field
            mapping = re.split(r'^\[([0-9])*(:)*([0-9])*\]$', transform)
            out_df[index] = inputs[input_layout][input_field].str[int(mapping[1]):int(mapping[3])]
        if trim == "YES":
            if upper == "YES":
                out_df[index] = out_df[index].apply(lambda x: x.upper().strip())
            elif lower == "YES":
                out_df[index] = out_df[index].apply(lambda x: x.lower().strip())
            else:
                out_df[index] = out_df[index].apply(lambda x: x.strip())
        else:
            if upper == "YES":
                out_df[index] = out_df[index].apply(lambda x: x.upper())
            elif lower == "YES":
                out_df[index] = out_df[index].apply(lambda x: x.lower())
    if output_layout in outputs.keys():
        outputs[output_layout] = outputs[output_layout].append(out_df, ignore_index=False)
    else:
        outputs[output_layout] = out_df
    print("output_layout:")
    print(out_df)
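# Example of the substring transform that data_mapping() accepts: a field
# mapping such as {"out_field_name": "YR", "in_field_name": "DATE",
# "transform": "[0:4]"} matches the regex above, so DATE "20170515" is
# sliced with .str[0:4] and lands in YR as "2017". (The field names are made
# up; the pattern is the one the code checks. Note each capture group only
# keeps a single digit, so "[10:12]" would not slice as intended.)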
def write_output():
    """
    Write the data in output layout
    :return:
    """
    global transforms
    global layout_fields
    global outputs
    global file_names
    for output_layout in outputs:
        output_type = layouts.loc[output_layout]['type'].upper()
        output_fields = layout_fields[output_layout]
        output_fields.set_index(output_fields['name'], drop=True, inplace=True)
        delimiter = "" if pd.isnull(layouts.loc[output_layout]['delimiter']) \
            else layouts.loc[output_layout]['delimiter'].upper()
        quote_char = '"' if pd.isnull(layouts.loc[output_layout]['quotechar']) \
            else layouts.loc[output_layout]['quotechar']
        file_location = "" if pd.isnull(layouts.loc[output_layout]['location']) \
            else layouts.loc[output_layout]['location']
        dsn = "stg_dsn" if pd.isnull(layouts.loc[output_layout]['dsn']) \
            else layouts.loc[output_layout]['dsn'].lower()
        db_schema = cfg["dsn:{}:schema".format(dsn)]
        db_engine = db.get_db_engine(cfg, dsn)
        try:
            if output_type == 'FILE':
                if delimiter == 'TAB':
                    delim = "\t"
                elif delimiter == "":
                    delim = ","
                else:
                    delim = delimiter
                if delimiter == 'FIXED':
                    # Build one fixed-width string per row by padding and
                    # truncating each column to its layout length
                    outputs[output_layout]['temp_fixed_output'] = ""
                    for column_name in outputs[output_layout].columns.values.tolist():
                        if column_name != "temp_fixed_output":
                            print("======={}=======".format(
                                outputs[output_layout][column_name].str.pad(
                                    output_fields.loc[column_name]['length']).str[
                                    :output_fields.loc[column_name]['length']]))
                            print("before======={}=======".format(
                                outputs[output_layout]['temp_fixed_output']))
                            outputs[output_layout]['temp_fixed_output'] = \
                                outputs[output_layout]['temp_fixed_output'] + \
                                outputs[output_layout][column_name].str.pad(
                                    output_fields.loc[column_name]['length']).str[
                                    :output_fields.loc[column_name]['length']]
                            print("after======={}=======".format(
                                outputs[output_layout]['temp_fixed_output']))
                    if 'key' in output_fields.index:
                        if output_fields.loc['key']['length'] < \
                                outputs[output_layout].key.map(lambda x: len(x)).max():
                            logger.error("Key is getting truncated while writing the output {}".format(
                                output_layout))
                    # sep='"' with QUOTE_NONE avoids quoting in the single-column write
                    outputs[output_layout][['temp_fixed_output']].to_csv(
                        file_location + file_names[output_layout], header=False, index=False,
                        quoting=csv.QUOTE_NONE, sep='"')
                    outputs[output_layout].drop('temp_fixed_output', axis=1, inplace=True)
                else:
                    outputs[output_layout].to_csv(file_location + file_names[output_layout],
                                                  header=False, index=False,
                                                  quoting=csv.QUOTE_NONNUMERIC,
                                                  quotechar=quote_char, sep=delim)
            elif output_type == 'JSON':
                with open(file_location + file_names[output_layout], "w") as json_file:
                    json_file.write(outputs[output_layout].to_json(orient='records'))
            elif output_type == 'TABLE':
                outputs[output_layout].to_sql(name=output_layout, con=db_engine,
                                              if_exists='append', index=False,
                                              schema=db_schema, chunksize=5000)
            else:
                logger.error("{} not implemented yet".format(output_type))
        except:
            logger.error("Error in writing the output {}".format(output_layout))
            raise


# ----------------- PROCESS BEGINS ----------------------
########################################
# Parse arguments
########################################
parser = argparse.ArgumentParser(description='Process data')
parser.add_argument("--metadata_file_name", metavar='Metadata JSON File Name', type=str,
                    required=True, help='Metadata JSON config file')
parser.add_argument("--file_names", metavar='File Names Dict', type=str, default="{}",
                    help='File names in Dict format')
parser.add_argument("--transform_names", metavar='Transform Name', type=str, default="[]",
                    help='Name of transform from metadata json, process all transforms '
                         'if none specified')
parser.add_argument("--module_name", metavar='Module Name', type=str, default="",
                    help='Module Name')
args = parser.parse_args()
logger.info("=====================================")
logger.info("Arguments: {}".format(vars(args)))
logger.info("=====================================")
process_date = datetime.now().isoformat()
file_names = ast.literal_eval(args.file_names)
staging_db_engine = db.get_db_engine(cfg, 'stg_dsn')
transform_names = ast.literal_eval(args.transform_names)
metadata_file_name = args.metadata_file_name if os.path.isabs(args.metadata_file_name) \
    else os.path.abspath(os.path.expanduser(args.metadata_file_name))
layouts = pd.DataFrame(columns=['type', 'delimiter', 'location', 'key_column', 'query',
                                'header', 'footer', 'quotechar', 'dsn'])
layout_fields = {}
transforms = pd.DataFrame(columns=['input', 'output', 'type', 'trim', 'upper', 'lower'])
transform_fields = {}
inputs = {}
outputs = {}
########################################
# Get lock
########################################
# Acquire lock to make sure only one instance is running for a given metadata JSON
if misc.get_lock("transform_{}".format(os.path.basename(args.metadata_file_name))) is not True:
    logger.error("Failed to get lock. Another instance may be running.")
    sys.exit(1)
try:
    logger.info("=====================================")
    logger.info("Validate the arguments passed")
    logger.info("=====================================")
    if not validate_arguments():
        logger.error("Invalid arguments.")
        sys.exit(1)
    logger.info("=====================================")
    logger.info("Reading data from JSON: {}".format(args.metadata_file_name))
    logger.info("=====================================")
    read_mapping_data()
    logger.info("=====================================")
    logger.info("Validate data")
    logger.info("=====================================")
    if not validate_metadata():
        logger.error("Errors in data.")
        sys.exit(1)
    logger.info("=====================================")
    logger.info("Reading all the inputs")
    logger.info("=====================================")
    # An empty transform_names list means "process all transforms",
    # matching the argparse help text
    if len(transform_names) == 0:
        parse_input("")
    else:
        for transform_name in transform_names:
            parse_input(transform_name)
    logger.info("=====================================")
    logger.info("Apply {} transform".format(transform_names))
    logger.info("=====================================")
    if len(transform_names) == 0:
        process_data("")
    else:
        for transform_name in transform_names:
            process_data(transform_name)
    logger.info("=====================================")
    logger.info("Write the output")
    logger.info("=====================================")
    write_output()
    # write_to_output_upsert()
finally:
    # cleanup
    logger.debug("Clean")
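For completeness, this is how I invoke the script. --file_names takes a Python dict literal mapping layout names to file names (parsed with ast.literal_eval) and --transform_names takes a list literal; the layout and transform names below are placeholders that match the metadata sketch above.

python edit.py --metadata_file_name metadata.json \
    --file_names "{'vehicle_in': 'input.txt', 'vehicle_out': 'output.txt'}" \
    --transform_names "['vehicle_map']"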