Skip header
I have the following test file that is read by edit.py and loaded into a table. I want to skip the header row so that it does not show up in the output layout; right now the header row appears in the output layout.

-----Input file

SERIAL|MNO|YR|DESC|AMT|MSRP|DLR|NAME|STATE|DATE|M_CAT|CNTRY|WMON|WMIL|
SADCJ2B|7315 |2017|RAV4 |61,638.96|57,250.00|4495| |PA|20170515|FPACE |CAN|18|8000|
SADCJ2F|C761|2019|CAMRY Premium |56,139.19|57,821.00|5339| |NC|20190531|FPACE |USA|36|1000|
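
For a pipe-delimited file like this, pandas can drop the header line at read time with skiprows. A minimal sketch, assuming the column names come from your layout metadata rather than from the file itself (the file name here is hypothetical):

import pandas as pd

# Stand-ins for the names the script pulls out of the layout metadata
column_names = ["SERIAL", "MNO", "YR", "DESC", "AMT", "MSRP", "DLR",
                "NAME", "STATE", "DATE", "M_CAT", "CNTRY", "WMON", "WMIL"]

# skiprows=1 discards the header line; the names we supply become the columns.
# index_col=False stops pandas treating the first field as an index, which it
# would otherwise do because the trailing '|' creates one extra empty column.
df = pd.read_csv("input.txt", delimiter="|", skiprows=1, index_col=False,
                 names=column_names, dtype=object)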

def read_mapping_data():
    """"
    Read the metadata from json
    :return:
    """
    global args
    global layouts
    global layout_fields
    global transforms
    global transform_fields
    global metadata_file_name
    with open(metadata_file_name, 'r') as metadata_file:
        metadata = json.load(metadata_file)
    for layout_name in metadata["layouts"]:
        layout = metadata["layouts"][layout_name]
        layouts.loc[layout_name] = pd.Series()
        df = pd.DataFrame(columns=['name', 'start', 'end', 'length'])
        for key in layout:
            if key == "fields":
                for field in layout['fields']:
                    df.loc[field['seq']] = pd.Series(
                        {'name': field['name'], 'start': int(field['start']) if 'start' in field.keys() else "",
                         'end': int(field['end']) if 'end' in field.keys() else "",
                         'length': int(field['end']) - int(field['start']) + 1 if 'end' in field.keys() else ""})
                df.sort_index(inplace=True)
                layout_fields[layout_name] = df
            else:
                layouts.loc[layout_name][key] = layout[key]
    for transform_name in metadata["transforms"]:
        mapping = metadata["transforms"][transform_name]
        transforms.loc[transform_name] = pd.Series()
        df = pd.DataFrame(columns=['in_field_name', 'transform'])
        for key in mapping:
            if key == "fields":
                for field in mapping['fields']:
                    df.loc[field['out_field_name']] = pd.Series(
                        {'in_field_name': field['in_field_name'] if 'in_field_name' in field.keys() else "",
                         'transform': field['transform'] if 'transform' in field.keys() else ""})
                transform_fields[transform_name] = df
            else:
                transforms.loc[transform_name][key] = mapping[key]
    logger.info("=====================================")
    logger.info("Reading Metadata Completed")
    logger.info("=====================================")


def validate_arguments():
    """
    Parse and validate the arguments passed
    :return:
    """
    global metadata_file_name
    error_messages = []
    if not os.path.isfile(metadata_file_name):
        error_messages.append(
            "Metadata JSON file {} not available".format(metadata_file_name))
    if len(error_messages) > 0:
        logger.error("=====================================")
        logger.error("=========Errors in Arguments=========")
        for error_message in error_messages:
            logger.error(error_message)
        logger.error("=====================================")
        return False
    return True


def validate_metadata():
    """
    Validate metadata for layouts
    :return:
    """
    global file_names
    global layouts
    global layout_fields
    global transforms
    global transform_names
    error_messages = []
    for transform_name in transform_names:
        input_layout = transforms.loc[transform_name]['input']
        input_type = layouts.loc[input_layout]['type'].upper()
        in_query = "" if pd.isnull(layouts.loc[input_layout]['query']) else layouts.loc[input_layout]['query'].upper()
        output_layout = transforms.loc[transform_name]['output']
        output_type = layouts.loc[output_layout]['type'].upper()
        key_column = "" if pd.isnull(layouts.loc[input_layout]['key_column']) else layouts.loc[input_layout][
            'key_column']
        transform_type = transforms.loc[transform_name]['type'].upper()
        in_file_location = "" if pd.isnull(layouts.loc[input_layout]['location']) else layouts.loc[input_layout][
            'location']
        out_file_location = "" if pd.isnull(layouts.loc[output_layout]['location']) else layouts.loc[output_layout][
            'location']
        input_delimiter = "" if pd.isnull(layouts.loc[input_layout]['delimiter']) else layouts.loc[input_layout][
            'delimiter'].upper()
        output_delimiter = "" if pd.isnull(layouts.loc[output_layout]['delimiter']) else layouts.loc[output_layout][
            'delimiter'].upper()
        if transform_type != "TRANSPOSE" and transform_type != "MAP":
            error_messages.append(
                "Unknown transform_type {} for transform {}".format(transform_type, transform_name))
        if key_column == "" and transform_type == "TRANSPOSE":
            error_messages.append(
                "Key_column tag is missing in transform_type {} for transform {}".format(transform_type, transform_name))
        if transform_name not in transforms.index:
            error_messages.append(
                "Transform {} is not available in metadata JSON file {}".format(transform_name, metadata_file_name))
        if input_type == 'FILE' or input_type == 'JSON':
            if not os.path.isfile(in_file_location + file_names[input_layout]):
                error_messages.append(
                    "Transform {} input File {} not available".format(transform_name, file_names[input_layout]))
        if output_type == 'FILE' or output_type == 'JSON':
            if os.path.isfile(out_file_location + file_names[output_layout]):
                error_messages.append(
                    "Transform {} output File {} already available".format(transform_name, file_names[output_layout]))
        if input_type == 'FILE' and input_delimiter == 'FIXED' and "" in layout_fields[input_layout]['length'].tolist():
            error_messages.append(
                "Transform {} input type is fixed but some field positions are missing".format(transform_name))
        if output_type == 'FILE' and output_delimiter == 'FIXED' and "" in layout_fields[output_layout]['length'].tolist():
            error_messages.append(
                "Transform {} output type is fixed but some field positions are missing".format(transform_name))
        #
        dsn = "stg_dsn" if pd.isnull(layouts.loc[input_layout]['dsn']) else layouts.loc[input_layout]['dsn'].lower()
        db_schema = cfg["dsn:{}:schema".format(dsn)]
        try:
            if input_type == 'TABLE':
                df = pd.read_sql("select count(*) cnt from {}.{} where 1=2".format(db_schema, input_layout), con=staging_db_engine)
        except Exception:
            error_messages.append(
                "Transform {} Input Table {} not available".format(transform_name, input_layout))
        #
        try:
            if output_type == 'TABLE':
                df = pd.read_sql("select count(*) cnt from {}.{} where 1=2".format(db_schema, output_layout), con=staging_db_engine)
        except Exception:
            error_messages.append(
                "Transform {} Output Table {} not available".format(transform_name, output_layout))
        #
        try:
            if input_type == 'QUERY':
                df = pd.read_sql("select count(*) cnt from ({}) where 1=2".format(in_query), con=staging_db_engine)
        except Exception:
            error_messages.append(
                "Transform {} Input Query for layout {} not executable".format(transform_name, input_layout))
        if output_type == 'QUERY':
            error_messages.append(
                "Transform {} Output type Query not supported".format(transform_name))
    if len(error_messages) > 0:
        logger.error("=====================================")
        logger.error("=========Errors in Metadata==========")
        for error_message in error_messages:
            logger.error(error_message)
        logger.error("=====================================")
        return False
    return True
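
The "select count(*) ... where 1=2" calls above are a cheap existence probe: the statement must compile against the table (or query), but the contradictory predicate guarantees no rows come back. A self-contained sketch of the same idea, with a hypothetical engine and table name:

import pandas as pd
from sqlalchemy import create_engine

def table_exists(engine, schema, table):
    """Return True if schema.table is queryable; WHERE 1=2 ensures
    the probe never fetches actual rows."""
    try:
        pd.read_sql("select count(*) cnt from {}.{} where 1=2".format(schema, table), con=engine)
        return True
    except Exception:
        return False

engine = create_engine("sqlite:///:memory:")          # stand-in engine
print(table_exists(engine, "main", "no_such_table"))  # False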


def parse_input(l_transform_name):
    """
    Read the input and create dataframe
    :return:
    """
    global file_names
    global layouts
    global layout_fields
    global transforms
    global inputs
    if l_transform_name == "":
        transform_names = transforms.index.values.tolist()
    else:
        transform_names = [l_transform_name]
    for transform_name in transform_names:
        try:
            input_layout = transforms.loc[transform_name]['input']
            input_type = layouts.loc[input_layout]['type'].upper()
            logger.info("=====================================")
            logger.info("Reading input {} for transform {}".format(transform_name, input_layout))
            logger.info("=====================================")
            delimiter = "" if pd.isnull(layouts.loc[input_layout]['delimiter']) else layouts.loc[input_layout][
                'delimiter'].upper()
            header = 0 if pd.isnull(layouts.loc[input_layout]['header']) else int(layouts.loc[input_layout][
                                                                                      'header'])
            footer = 0 if pd.isnull(layouts.loc[input_layout]['footer']) else int(layouts.loc[input_layout][
                                                                                      'footer'])
            quote_char = '"' if pd.isnull(layouts.loc[input_layout]['quotechar']) else layouts.loc[input_layout][
                'quotechar']
            file_location = "" if pd.isnull(layouts.loc[input_layout]['location']) else layouts.loc[input_layout][
                'location']
            query = "" if pd.isnull(layouts.loc[input_layout]['query']) else layouts.loc[input_layout]['query'].upper()
            dsn = "stg_dsn" if pd.isnull(layouts.loc[input_layout]['dsn']) else layouts.loc[input_layout]['dsn'].lower()
            db_engine = db.get_db_engine(cfg, dsn)
            db_schema = cfg["dsn:{}:schema".format(dsn)]
            if input_type == 'FILE':
                if delimiter == 'TAB':
                    delim = "\t"
                elif delimiter == "":
                    delim = ","
                else:
                    delim = delimiter
                if delimiter == 'FIXED':
                    df = pd.read_fwf(file_location + file_names[input_layout],
                                     skiprows=header, skipfooter=footer,
                                     colspecs=list(
                                         layout_fields[input_layout][['start', 'end']].itertuples(index=False)),
                                     # widths=layout_fields[input_layout]['length'].tolist(),
                                     names=layout_fields[input_layout]['name'].tolist())
                else:
                    df = pd.read_csv(file_location + file_names[input_layout], delimiter=delim,
                                     skiprows=header, skipfooter=footer, escapechar='\\', encoding='UTF-8', quotechar=quote_char,
                                     names=layout_fields[input_layout]['name'].tolist(), dtype=object)
            elif input_type == 'JSON':
                df = pd.read_json(file_location + file_names[input_layout])
            elif input_type == 'TABLE':
                df = pd.read_sql_table(input_layout, con=db_engine, schema=db_schema)
            elif input_type == 'QUERY':
                df = pd.read_sql(query, con=db_engine)
            else:
                logger.warning("Unknown type {} for input {}".format(input_type, input_layout))
            if df.empty:
                logger.warning("input {} is empty".format(input_layout))
            inputs[input_layout] = df
        except Exception:
            logger.error("Unexpected error: {}".format(sys.exc_info()[0]))
            raise
        logger.debug(df)
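
Back to the original question: the header line leaks into the output because header defaults to 0 whenever the layout's "header" tag is null, and that value feeds straight into skiprows above. Setting the tag on the input layout in the metadata JSON should be enough; a sketch with a hypothetical layout entry:

# Input-layout entry with the header tag set:
layout_entry = {
    "type": "FILE",
    "delimiter": "|",
    "header": "1",   # parse_input() turns this into skiprows=1
    "footer": "0",
}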


def process_data(l_transform_name):
    """
    Identify the processing action
    :return:
    """
    if l_transform_name == "":
        transform_names = transforms.index.values.tolist()
    else:
        transform_names = [l_transform_name]
    for transform_name in transform_names:
        transform_type = transforms.loc[transform_name]['type'].upper()
        if transform_type == "TRANSPOSE":
            transpose_data(transform_name)
        elif transform_type == "MAP":
            data_mapping(transform_name)
        else:
            logger.warning("Unknown transform_type {} for transform {}".format(transform_type, transform_name))
            break


def transpose_data(l_transform_name):
    """
    Transpose the input fields and map to output layout
    :return:
    """
    global process_date
    global transforms
    global inputs
    global outputs
    input_layout = transforms.loc[l_transform_name]['input']
    output_layout = transforms.loc[l_transform_name]['output']
    key_column = "" if pd.isnull(layouts.loc[input_layout]['key_column']) else layouts.loc[input_layout]['key_column']
    out_df = pd.DataFrame(columns=['attribute', 'value'])
    for index in inputs[input_layout].index:
        # .loc replaces the long-deprecated .ix indexer
        df = inputs[input_layout].loc[[index]].T
        df = df.reset_index()
        df = df.rename(index=str, columns={"index": "attribute", index: "value"})
        df['key'] = inputs[input_layout].at[index, key_column]
        df['process_date'] = process_date
        out_df = out_df.append(df, ignore_index=True)
    if output_layout in outputs.keys():
        outputs[output_layout] = outputs[output_layout].append(out_df, ignore_index=True)
    else:
        outputs[output_layout] = out_df
    logger.debug(out_df)
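
As an aside, this row-by-row transpose can be done in one shot with pandas' own melt, which unpivots a frame into attribute/value pairs. A sketch of the alternative (not the original author's method; data is made up):

import pandas as pd

# Hypothetical frame standing in for inputs[input_layout]
df = pd.DataFrame({"SERIAL": ["SADCJ2B"], "YR": ["2017"], "AMT": ["61,638.96"]})

# id_vars becomes the key; everything else melts into attribute/value rows
out_df = df.melt(id_vars=["SERIAL"], var_name="attribute", value_name="value")
out_df = out_df.rename(columns={"SERIAL": "key"})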


def data_mapping(l_transform_name):
    """
    Read the input and create dataframe
    :return:
    """
    import re
    global process_date
    global transforms
    global layout_fields
    global inputs
    global outputs
    output_layout = transforms.loc[l_transform_name]['output']
    input_layout = transforms.loc[l_transform_name]['input']
    trim = "NO" if pd.isnull(transforms.loc[l_transform_name]['trim']) else transforms.loc[l_transform_name][
        'trim'].upper()
    upper = "NO" if pd.isnull(transforms.loc[l_transform_name]['upper']) else transforms.loc[l_transform_name][
        'upper'].upper()
    lower = "NO" if pd.isnull(transforms.loc[l_transform_name]['lower']) else transforms.loc[l_transform_name][
        'lower'].upper()
    out_df = pd.DataFrame(columns=layout_fields[output_layout]['name'].tolist())
    for index in out_df.columns:
        input_field = transform_fields[l_transform_name].loc[index]['in_field_name'] if index in transform_fields[
            l_transform_name].index else ""
        input_field = "" if pd.isnull(input_field) else input_field
        transform = transform_fields[l_transform_name].loc[index]['transform'] if index in transform_fields[
            l_transform_name].index else ""
        transform = "" if pd.isnull(transform) else transform
        if input_field == "":
            out_df[index] = ""
        elif transform == "":
            out_df[index] = inputs[input_layout][input_field]
        elif re.match(r'^\[([0-9]*):?([0-9]*)\]$', transform):
            # capture whole numbers, not single digits, so multi-digit
            # slice bounds like [10:20] parse correctly
            mapping = re.match(r'^\[([0-9]*):?([0-9]*)\]$', transform)
            out_df[index] = inputs[input_layout][input_field].str[int(mapping.group(1)):int(mapping.group(2))]
        if trim == "YES":
            if upper == "YES":
                out_df[index] = out_df[index].apply(lambda x: x.upper().strip())
            elif lower == "YES":
                out_df[index] = out_df[index].apply(lambda x: x.lower().strip())
            else:
                out_df[index] = out_df[index].apply(lambda x: x.strip())
        else:
            if upper == "YES":
                out_df[index] = out_df[index].apply(lambda x: x.upper())
            elif lower == "YES":
                out_df[index] = out_df[index].apply(lambda x: x.lower())
    if output_layout in outputs.keys():
        outputs[output_layout] = outputs[output_layout].append(out_df, ignore_index=False)
    else:
        outputs[output_layout] = out_df
    print("output_layout:")
    print(out_df)
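
For what it's worth, the "[n:m]" transform handled above is a plain string slice applied column-wide. A quick illustration with made-up values:

import pandas as pd

s = pd.Series(["SADCJ2B", "SADCJ2F"])
# A transform of "[0:4]" ends up as .str[0:4]
print(s.str[0:4].tolist())  # ['SADC', 'SADC']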


def write_output():
    """
    Write the data in output layout
    :return:
    """
    global transforms
    global layout_fields
    global outputs
    global file_names
    for output_layout in outputs:
        output_type = layouts.loc[output_layout]['type'].upper()
        output_fields = layout_fields[output_layout]
        output_fields.set_index(output_fields['name'], drop=True, inplace=True)
        delimiter = "" if pd.isnull(layouts.loc[output_layout]['delimiter']) else layouts.loc[output_layout][
            'delimiter'].upper()
        quote_char = '"' if pd.isnull(layouts.loc[output_layout]['quotechar']) else layouts.loc[output_layout][
            'quotechar']
        file_location = "" if pd.isnull(layouts.loc[output_layout]['location']) else layouts.loc[output_layout][
            'location']
        dsn = "stg_dsn" if pd.isnull(layouts.loc[output_layout]['dsn']) else layouts.loc[output_layout]['dsn'].lower()
        db_schema = cfg["dsn:{}:schema".format(dsn)]
        db_engine = db.get_db_engine(cfg, dsn)
        try:
            if output_type == 'FILE':
                if delimiter == 'TAB':
                    delim = "\t"
                elif delimiter == "":
                    delim = ","
                else:
                    delim = delimiter
                if delimiter == 'FIXED':
                    outputs[output_layout]['temp_fixed_output'] = ""
                    for column_name in outputs[output_layout].columns.values.tolist():
                        if column_name != "temp_fixed_output":
                            # pad each value to the layout length, truncate any
                            # overflow, and append it to the fixed-width record
                            padded = outputs[output_layout][column_name].str.pad(
                                output_fields.loc[column_name]['length']).str[
                                :output_fields.loc[column_name]['length']]
                            outputs[output_layout]['temp_fixed_output'] = \
                                outputs[output_layout]['temp_fixed_output'] + padded
                    if 'key' in output_fields.index:
                        if output_fields.loc['key']['length'] < outputs[output_layout].key.map(lambda x: len(x)).max():
                            logger.error("Key is getting truncated while writing the output {}".format(output_layout))
                    outputs[output_layout][['temp_fixed_output']].to_csv(file_location + file_names[output_layout],
                                                                         header=False, index=False,
                                                                         quoting=csv.QUOTE_NONE, sep='"')
                    outputs[output_layout].drop('temp_fixed_output', axis=1, inplace=True)
                else:
                    outputs[output_layout].to_csv(file_location + file_names[output_layout], header=False, index=False,
                                                  quoting=csv.QUOTE_NONNUMERIC, quotechar=quote_char, sep=delim)
            elif output_type == 'JSON':
                with open(file_location + file_names[output_layout], "w") as json_file:
                    json_file.write(outputs[output_layout].to_json(orient='records'))
            elif output_type == 'TABLE':
                outputs[output_layout].to_sql(name=output_layout, con=db_engine, if_exists='append',
                                              index=False, schema=db_schema, chunksize=5000)
            else:
                logger.error("{} not implemented yet".format(output_type))
        except Exception:
            logger.error("Error in writing the output {}".format(output_layout))
            raise
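
One thing to watch in the fixed-width branch: str.pad pads on the left by default, which right-justifies values, while fixed-width layouts often expect left-justified fields. Whether that matters depends on your layout; a quick comparison:

import pandas as pd

s = pd.Series(["RAV4"])
print(s.str.pad(8).tolist())                # ['    RAV4']  right-justified (default)
print(s.str.pad(8, side='right').tolist())  # ['RAV4    ']  left-justified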

# ----------------- PROCESS BEGINS  ----------------------

########################################
# Parse arguments
########################################

parser = argparse.ArgumentParser(description='Process data')
parser.add_argument("--metadata_file_name", metavar='Metadata JSON File Name', type=str, required=True,
                    help='Metadata JSON config file')
parser.add_argument("--file_names", metavar='File Names Dict', type=str, default="{}",
                    help='File names in Dict format')
parser.add_argument("--transform_names", metavar='Transform Name', type=str, default="[]",
                    help='Name of transform from metadata json, process all transforms if none specified')

parser.add_argument("--module_name", metavar='Module Name', type=str, default="",
                    help='Module Name')

args = parser.parse_args()

logger.info("=====================================")
logger.info("Arguments: {}".format(vars(args)))
logger.info("=====================================")

process_date = datetime.now().isoformat()

file_names = ast.literal_eval(args.file_names)
staging_db_engine = db.get_db_engine(cfg, 'stg_dsn')
transform_names = ast.literal_eval(args.transform_names)
metadata_file_name = args.metadata_file_name if os.path.isabs(args.metadata_file_name) \
    else os.path.abspath(os.path.expanduser(args.metadata_file_name))

layouts = pd.DataFrame(columns=['type', 'delimiter', 'location', 'key_column', 'query',
                                'header', 'footer', 'quotechar', 'dsn'])
layout_fields = {}
transforms = pd.DataFrame(columns=['input', 'output', 'type', 'trim', 'upper', 'lower'])
transform_fields = {}
inputs = {}
outputs = {}

########################################
# Get lock
########################################
# Acquire lock to make sure only one instance is running for a given metadata JSON
if misc.get_lock("transform_{}".format(os.path.basename(args.metadata_file_name))) is not True:
    logger.error("Failed to get lock. Another instance may be running.")
    sys.exit(1)

try:
    logger.info("=====================================")
    logger.info("Validate the arguments passed")
    logger.info("=====================================")
    if not validate_arguments():
        logger.error("Invalid arguments.")
        sys.exit(1)

    logger.info("=====================================")
    logger.info("Reading data from JSON: {}".format(args.metadata_file_name))
    logger.info("=====================================")
    read_mapping_data()


    logger.info("=====================================")
    logger.info("Validate data")
    logger.info("=====================================")
    if not validate_metadata():
        logger.error("Errors in data.")
        sys.exit(1)

    logger.info("=====================================")
    logger.info("Reading all the inputs")
    logger.info("=====================================")
    if len(transform_names) == 0:
        # no transform names given: process every transform in the metadata
        parse_input("")
    else:
        for transform_name in transform_names:
            parse_input(transform_name)


    logger.info("=====================================")
    logger.info("Apply {} transform".format(transform_names))
    logger.info("=====================================")
    if len(transform_names) == 0:
        process_data("")
    else:
        for transform_name in transform_names:
            process_data(transform_name)

    logger.info("=====================================")
    logger.info("Write the output")
    logger.info("=====================================")
    write_output()
    #write_to_output_upsert()

finally:
    # cleanup
    logger.debug("Clean")