Python Forum
Skip header
#1
I have the following test file that is read by edit.py and loaded into a table. I want to skip the header line so that it does not show up in the output layout; right now the header row comes through into the output layout.

-----Input file

SERIAL|MNO|YR|DESC|AMT|MSRP|DLR|NAME|STATE|DATE|M_CAT|CNTRY|WMON|WMIL|
SADCJ2B|7315 |2017|RAV4 |61,638.96|57,250.00|4495| |PA|20170515|FPACE |CAN|18|8000|
SADCJ2F|C761|2019|CAMRY Premium |56,139.19|57,821.00|5339| |NC|20190531|FPACE |USA|36|1000|
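
For reference, a minimal sketch of the behavior (the file name cars.txt is hypothetical, standing in for the sample above): because the script passes explicit column names to read_csv, the header line is read in as an ordinary data row unless it is skipped.

import pandas as pd

# 'EXTRA' absorbs the empty field produced by the trailing '|'
names = ['SERIAL', 'MNO', 'YR', 'DESC', 'AMT', 'MSRP', 'DLR', 'NAME',
         'STATE', 'DATE', 'M_CAT', 'CNTRY', 'WMON', 'WMIL', 'EXTRA']

# skiprows=0 (the default) would keep the SERIAL|MNO|... line as data;
# skiprows=1 drops it, which is the output I want.
df = pd.read_csv('cars.txt', delimiter='|', names=names, dtype=object,
                 skiprows=1)
print(df)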

def read_mapping_data():
    """"
    Read the metadata from json
    :return:
    """
    global args
    global layouts
    global layout_fields
    global transforms
    global transform_fields
    global metadata_file_name
    with open(metadata_file_name, 'r') as metadata_file:
        metadata = json.load(metadata_file)
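    # Assumed metadata shape: top-level "layouts" and "transforms" objects,
    # with each layout's "fields" list carrying name/seq/start/end entries.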
    for layout_name in metadata["layouts"]:
        layout = metadata["layouts"][layout_name]
        layouts.loc[layout_name] = pd.Series(dtype=object)
        df = pd.DataFrame(columns=['name', 'start', 'end', 'length'])
        for key in layout:
            if key == "fields":
                for field in layout['fields']:
                    df.loc[field['seq']] = pd.Series(
                        {'name': field['name'], 'start': int(field['start']) if 'start' in field else "",
                         'end': int(field['end']) if 'end' in field else "",
                         'length': int(field['end']) - int(field['start']) + 1
                         if 'end' in field and 'start' in field else ""})
                df.sort_index(inplace=True)
                layout_fields[layout_name] = df
            else:
                layouts.loc[layout_name][key] = layout[key]
    for transform_name in metadata["transforms"]:
        mapping = metadata["transforms"][transform_name]
        transforms.loc[transform_name] = pd.Series(dtype=object)
        df = pd.DataFrame(columns=['in_field_name', 'transform'])
        for key in mapping:
            if key == "fields":
                for field in mapping['fields']:
                    df.loc[field['out_field_name']] = pd.Series(
                        {'in_field_name': field['in_field_name'] if 'in_field_name' in field.keys() else "",
                         'transform': field['transform'] if 'transform' in field.keys() else ""})
                transform_fields[transform_name] = df
            else:
                transforms.loc[transform_name][key] = mapping[key]
    logger.info("=====================================")
    logger.info("Reading Metadata Completed")
    logger.info("=====================================")


def validate_arguments():
    """
    Parse and validate the arguments passed
    :return:
    """
    global metadata_file_name
    error_messages = []
    if not os.path.isfile(metadata_file_name):
        error_messages.append(
            "Metadata JSON file {} not available".format(metadata_file_name))
    if len(error_messages) > 0:
        logger.error("=====================================")
        logger.error("=========Errors in Arguments=========")
        for error_message in error_messages:
            logger.error(error_message)
        logger.error("=====================================")
        return False
    return True


def validate_metadata():
    """
    Validate metadata for layouts
    :return:
    """
    global file_names
    global layouts
    global layout_fields
    global transforms
    global transform_names
    error_messages = []
    for transform_name in transform_names:
        input_layout = transforms.loc[transform_name]['input']
        input_type = layouts.loc[input_layout]['type'].upper()
        in_query = "" if pd.isnull(layouts.loc[input_layout]['query']) else layouts.loc[input_layout]['query'].upper()
        output_layout = transforms.loc[transform_name]['output']
        output_type = layouts.loc[output_layout]['type'].upper()
        key_column = "" if pd.isnull(layouts.loc[input_layout]['key_column']) else layouts.loc[input_layout][
            'key_column']
        transform_type = transforms.loc[transform_name]['type'].upper()
        in_file_location = "" if pd.isnull(layouts.loc[input_layout]['location']) else layouts.loc[input_layout][
            'location']
        out_file_location = "" if pd.isnull(layouts.loc[output_layout]['location']) else layouts.loc[output_layout][
            'location']
        input_delimiter = "" if pd.isnull(layouts.loc[input_layout]['delimiter']) else layouts.loc[input_layout][
            'delimiter'].upper()
        output_delimiter = "" if pd.isnull(layouts.loc[output_layout]['delimiter']) else layouts.loc[output_layout][
            'delimiter'].upper()
        if transform_type != "TRANSPOSE" and transform_type != "MAP":
            error_messages.append(
                "Unknown transform_type {} for transform {}".format(transform_type, transform_name))
        if key_column == "" and transform_type == "TRANSPOSE":
            error_messages.append(
                "Key_column tag is missing in transform_type {} for transform {}".format(transform_type, transform_name))
        if transform_name not in transforms.index:
            error_messages.append(
                "Transform {} is not available in metadata JSON file {}".format(transform_name, metadata_file_name))
        if input_type == 'FILE' or input_type == 'JSON':
            if not os.path.isfile(in_file_location + file_names[input_layout]):
                error_messages.append(
                    "Transform {} input File {} not available".format(transform_name, file_names[input_layout]))
        if output_type == 'FILE' or output_type == 'JSON':
            if os.path.isfile(out_file_location + file_names[output_layout]):
                error_messages.append(
                    "Transform {} output File {} already available".format(transform_name, file_names[output_layout]))
        if input_type == 'FILE' and input_delimiter == 'FIXED' and "" in layout_fields[input_layout]['length'].tolist():
            error_messages.append(
                "Transform {} input type is fixed but some field positions are missing".format(transform_name))
        if output_type == 'FILE' and output_delimiter == 'FIXED' and "" in layout_fields[output_layout]['length'].tolist():
            error_messages.append(
                "Transform {} output type is fixed but some field positions are missing".format(transform_name))
        #
        dsn = "stg_dsn" if pd.isnull(layouts.loc[input_layout]['dsn']) else layouts.loc[input_layout]['dsn'].lower()
        db_schema = cfg["dsn:{}:schema".format(dsn)]
        try:
            if input_type == 'TABLE':
                df = pd.read_sql("select count(*) cnt from {}.{} where 1=2".format(db_schema,input_layout), con=staging_db_engine)
        except Exception:
            error_messages.append(
                "Transform {} Input Table {} not available".format(transform_name, input_layout))
        #
        try:
            if output_type == 'TABLE':
                df = pd.read_sql("select count(*) cnt from {}.{} where 1=2".format(db_schema,output_layout), con=staging_db_engine)
        except Exception:
            error_messages.append(
                "Transform {} Output Table {} not available".format(transform_name, output_layout))
        #
        try:
            if input_type == 'QUERY':
                df = pd.read_sql("select count(*) cnt from ({}) where 1=2".format(in_query), con=staging_db_engine)
        except Exception:
            error_messages.append(
                "Transform {} Input Query for layout {} not executable".format(transform_name, input_layout))
        #
        #
        ##########################################################
        if output_type == 'QUERY':
            error_messages.append(
                "Transform {} Output type Query not supported".format(transform_name))
    if len(error_messages) > 0:
        logger.error("=====================================")
        logger.error("=========Errors in Metadata==========")
        for error_message in error_messages:
            logger.error(error_message)
        logger.error("=====================================")
        return False
    return True


def parse_input(l_transform_name):
    """
    Read the input and create dataframe
    :return:
    """
    global file_names
    global layouts
    global layout_fields
    global transforms
    global inputs
    if l_transform_name == "":
        transform_names = transforms.index.values.tolist()
    else:
        transform_names = [l_transform_name]
    for transform_name in transform_names:
        try:
            input_layout = transforms.loc[transform_name]['input']
            input_type = layouts.loc[input_layout]['type'].upper()
            logger.info("=====================================")
            logger.info("Reading input {} for transform {}".format(transform_name, input_layout))
            logger.info("=====================================")
            delimiter = "" if pd.isnull(layouts.loc[input_layout]['delimiter']) else layouts.loc[input_layout][
                'delimiter'].upper()
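            # 'header' counts leading rows to skip: it is passed as skiprows
            # to read_fwf/read_csv below, so setting "header": 1 in the layout
            # metadata should drop the column-name row from the input file.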
            header = 0 if pd.isnull(layouts.loc[input_layout]['header']) else int(layouts.loc[input_layout][
                                                                                      'header'])
            footer = 0 if pd.isnull(layouts.loc[input_layout]['footer']) else int(layouts.loc[input_layout][
                                                                                      'footer'])
            quote_char = '"' if pd.isnull(layouts.loc[input_layout]['quotechar']) else layouts.loc[input_layout][
                'quotechar']
            file_location = "" if pd.isnull(layouts.loc[input_layout]['location']) else layouts.loc[input_layout][
                'location']
            query = "" if pd.isnull(layouts.loc[input_layout]['query']) else layouts.loc[input_layout]['query'].upper()
            dsn = "stg_dsn" if pd.isnull(layouts.loc[input_layout]['dsn']) else layouts.loc[input_layout]['dsn'].lower()
            db_engine = db.get_db_engine(cfg, dsn)
            db_schema = cfg["dsn:{}:schema".format(dsn)]
            if input_type == 'FILE':
                if delimiter == 'TAB':
                    delim = "\t"
                elif delimiter == "":
                    delim = ","
                else:
                    delim = delimiter
                if delimiter == 'FIXED':
                    df = pd.read_fwf(file_location + file_names[input_layout],
                                     skiprows=header, skipfooter=footer,
                                     colspecs=list(
                                         layout_fields[input_layout][['start', 'end']].itertuples(index=False)),
                                     # widths=layout_fields[input_layout]['length'].tolist(),
                                     names=layout_fields[input_layout]['name'].tolist())
                else:
                    df = pd.read_csv(file_location + file_names[input_layout], delimiter=delim,
                                     skiprows=header, skipfooter=footer, escapechar='\\', encoding='UTF-8',
                                     quotechar=quote_char, engine='python',
                                     names=layout_fields[input_layout]['name'].tolist(), dtype=object)
            elif input_type == 'JSON':
                df = pd.read_json(file_location + file_names[input_layout])
            elif input_type == 'TABLE':
                df = pd.read_sql_table(input_layout, con=db_engine, schema=db_schema)
            elif input_type == 'QUERY':
                df = pd.read_sql(query, con=db_engine)
            else:
                logger.warning("Unknown type {} for input {}".format(input_type, input_layout))
                continue
            if df.empty:
                logger.warn("input {} is empty".format(input_layout))
            inputs[input_layout] = df
        except Exception:
            logger.error("Unexpected error: {}".format(sys.exc_info()[0]))
            raise
        print("printing df")
        logger.info(df)


def process_data(l_transform_name):
    """
    Identify the processing action
    :return:
    """
    if l_transform_name == "":
        transform_names = transforms.index.values.tolist()
    else:
        transform_names = [l_transform_name]
    for transform_name in transform_names:
        transform_type = transforms.loc[transform_name]['type'].upper()
        if transform_type == "TRANSPOSE":
            transpose_data(transform_name)
        elif transform_type == "MAP":
            data_mapping(transform_name)
        else:
            logger.warn("Unknown transform_type {} for transform {}".format(transform_type, transform_name))
            break


def transpose_data(l_transform_name):
    """
    Transpose the input fields and map to output layout
    :return:
    """
    global process_date
    global transforms
    global inputs
    global outputs
    input_layout = transforms.loc[l_transform_name]['input']
    output_layout = transforms.loc[l_transform_name]['output']
    key_column = "" if pd.isnull(layouts.loc[input_layout]['key_column']) else layouts.loc[input_layout]['key_column']
    out_df = pd.DataFrame(columns=['attribute', 'value'])
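    # Each input row is transposed into (attribute, value) pairs, tagged with
    # the row's key_column value and the process date.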
    for index in inputs[input_layout].index:
        df = inputs[input_layout].loc[[index]].T
        df = df.reset_index()
        df = df.rename(index=str, columns={"index": "attribute", index: "value"})
        df['key'] = inputs[input_layout].at[index, key_column]
        df['process_date'] = process_date
        out_df = pd.concat([out_df, df], ignore_index=True)
    if output_layout in outputs.keys():
        outputs[output_layout] = pd.concat([outputs[output_layout], out_df], ignore_index=True)
    else:
        outputs[output_layout] = out_df
    print(out_df)


def data_mapping(l_transform_name):
    """
    Read the input and create dataframe
    :return:
    """
    import re
    global process_date
    global transforms
    global layout_fields
    global inputs
    global outputs
    output_layout = transforms.loc[l_transform_name]['output']
    input_layout = transforms.loc[l_transform_name]['input']
    trim = "NO" if pd.isnull(transforms.loc[l_transform_name]['trim']) else transforms.loc[l_transform_name][
        'trim'].upper()
    upper = "NO" if pd.isnull(transforms.loc[l_transform_name]['upper']) else transforms.loc[l_transform_name][
        'upper'].upper()
    lower = "NO" if pd.isnull(transforms.loc[l_transform_name]['lower']) else transforms.loc[l_transform_name][
        'lower'].upper()
    out_df = pd.DataFrame(columns=layout_fields[output_layout]['name'].tolist())
    for index in out_df.columns:
        print(index)
        input_field = transform_fields[l_transform_name].loc[index]['in_field_name'] if index in transform_fields[
            l_transform_name].index else ""
        input_field = "" if pd.isnull(input_field) else input_field
        transform = transform_fields[l_transform_name].loc[index]['transform'] if index in transform_fields[
            l_transform_name].index else ""
        transform = "" if pd.isnull(transform) else transform
        if input_field == "":
            out_df[index] = ""
        elif transform == "":
            out_df[index] = inputs[input_layout][input_field]
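        # A transform written like "[0:5]" is applied as a string slice
        # on the input field (see the regex match below).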
        elif re.match(r'^\[([0-9]*):([0-9]*)\]$', transform):
            start, end = re.match(r'^\[([0-9]*):([0-9]*)\]$', transform).groups()
            slice_start = int(start) if start else None
            slice_end = int(end) if end else None
            out_df[index] = inputs[input_layout][input_field].str[slice_start:slice_end]
        if trim == "YES":
            if upper == "YES":
                out_df[index] = out_df[index].apply(lambda x: x.upper().strip())
            elif lower == "YES":
                out_df[index] = out_df[index].apply(lambda x: x.lower().strip())
            else:
                out_df[index] = out_df[index].apply(lambda x: x.strip())
        else:
            if upper == "YES":
                out_df[index] = out_df[index].apply(lambda x: x.upper())
            elif lower == "YES":
                out_df[index] = out_df[index].apply(lambda x: x.lower())
    if output_layout in outputs.keys():
        outputs[output_layout] = pd.concat([outputs[output_layout], out_df], ignore_index=False)
    else:
        outputs[output_layout] = out_df
    print("output_layout:")
    print(out_df)


def write_output():
    """
    Write the data in output layout
    :return:
    """
    global transforms
    global layout_fields
    global outputs
    global file_names
    for output_layout in outputs:
        output_type = layouts.loc[output_layout]['type'].upper()
        output_fields = layout_fields[output_layout]
        output_fields.set_index(output_fields['name'], drop=True, inplace=True)
        delimiter = "" if pd.isnull(layouts.loc[output_layout]['delimiter']) else layouts.loc[output_layout][
            'delimiter'].upper()
        quote_char = '"' if pd.isnull(layouts.loc[output_layout]['quotechar']) else layouts.loc[output_layout][
            'quotechar']
        file_location = "" if pd.isnull(layouts.loc[output_layout]['location']) else layouts.loc[output_layout][
            'location']
        dsn = "stg_dsn" if pd.isnull(layouts.loc[output_layout]['dsn']) else layouts.loc[output_layout]['dsn'].lower()
        db_schema = cfg["dsn:{}:schema".format(dsn)]
        db_engine = db.get_db_engine(cfg, dsn)
        try:
            if output_type == 'FILE':
                if delimiter == 'TAB':
                    delim = "\t"
                elif delimiter == "":
                    delim = ","
                else:
                    delim = delimiter
                if delimiter == 'FIXED':
                    outputs[output_layout]['temp_fixed_output'] = ""
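                    # Build each fixed-width record by padding every column to
                    # its layout length, truncating, and concatenating in order.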
                    for column_name in outputs[output_layout].columns.values.tolist():
                        if column_name != "temp_fixed_output":
                            print("======={}=======".format(outputs[output_layout][column_name].str.pad(
                                output_fields.loc[column_name]['length']).str[
                                                            :output_fields.loc[column_name]['length']]))
                            print("before======={}=======".format(outputs[output_layout]['temp_fixed_output']))
                            outputs[output_layout]['temp_fixed_output'] = outputs[output_layout]['temp_fixed_output'] +\
                                                                          outputs[output_layout][column_name].str.pad(
                                                                              output_fields.loc[column_name][
                                                                                  'length']).str[
                                                                          :output_fields.loc[column_name]['length']]
                            print("after======={}=======".format(outputs[output_layout]['temp_fixed_output']))
                    if 'key' in output_fields.index:
                        if output_fields.loc['key']['length'] < outputs[output_layout].key.map(lambda x: len(x)).max():
                            logger.error("Key is getting truncated while writing the output {}".format(output_layout))
                    outputs[output_layout][['temp_fixed_output']].to_csv(file_location + file_names[output_layout],
                                                                         header=False, index=False,
                                                                         quoting=csv.QUOTE_NONE, sep='"')
                    outputs[output_layout].drop('temp_fixed_output', axis=1, inplace=True)
                else:
                    outputs[output_layout].to_csv(file_location + file_names[output_layout], header=False, index=False,
                                                  quoting=csv.QUOTE_NONNUMERIC, quotechar=quote_char, sep=delim)
            elif output_type == 'JSON':
                with open(file_location + file_names[output_layout], "w") as json_file:
                    json_file.write(outputs[output_layout].to_json(orient='records'))
            elif output_type == 'TABLE':
                outputs[output_layout].to_sql(name=output_layout, con=db_engine, if_exists='append',
                                              index=False, schema=db_schema, chunksize=5000)
            else:
                logger.error("{} not implemented yet".format(output_type))
        except Exception:
            logger.error("Error in writing the output {}".format(output_layout))
            raise

# ----------------- PROCESS BEGINS  ----------------------

########################################
# Parse arguments
########################################

parser = argparse.ArgumentParser(description='Process data')
parser.add_argument("--metadata_file_name", metavar='Metadata JSON File Name', type=str, required=True,
                    help='Metadata JSON config file')
parser.add_argument("--file_names", metavar='File Names Dict', type=str, default="{}",
                    help='File names in Dict format')
parser.add_argument("--transform_names", metavar='Transform Name', type=str, default="[]",
                    help='Name of transform from metadata json, process all transforms if none specified')

parser.add_argument("--module_name", metavar='Module Name', type=str, default="",
                    help='Module Name')

args = parser.parse_args()

logger.info("=====================================")
logger.info("Arguments: {}".format(vars(args)))
logger.info("=====================================")

process_date = datetime.now().isoformat()

file_names = ast.literal_eval(args.file_names)
staging_db_engine = db.get_db_engine(cfg, 'stg_dsn')
transform_names = ast.literal_eval(args.transform_names)
metadata_file_name = args.metadata_file_name if os.path.isabs(args.metadata_file_name) \
    else os.path.abspath(os.path.expanduser(args.metadata_file_name))

layouts = pd.DataFrame(columns=['type', 'delimiter', 'location', 'key_column', 'query',
                                'header', 'footer', 'quotechar', 'dsn'])
layout_fields = {}
transforms = pd.DataFrame(columns=['input', 'output', 'type', 'trim', 'upper', 'lower'])
transform_fields = {}
inputs = {}
outputs = {}

########################################
# Get lock
########################################
# Acquire lock to make sure only one instance is running for a given metadata JSON
if misc.get_lock("transform_{}".format(os.path.basename(args.metadata_file_name))) is not True:
    logger.error("Failed to get lock. Another instance may be running.")
    sys.exit(1)

try:
    logger.info("=====================================")
    logger.info("Validate the arguments passed")
    logger.info("=====================================")
    if not validate_arguments():
        logger.error("Invalid arguments.")
        sys.exit(1)

    logger.info("=====================================")
    logger.info("Reading data from JSON: {}".format(args.metadata_file_name))
    logger.info("=====================================")
    read_mapping_data()


    logger.info("=====================================")
    logger.info("Validate data")
    logger.info("=====================================")
    if not validate_metadata():
        logger.error("Errors in data.")
        sys.exit(1)

    logger.info("=====================================")
    logger.info("Reading all the inputs")
    logger.info("=====================================")
    if len(transform_names) == 0:
        parse_input("")
    else:
        for transform_name in transform_names:
            parse_input(transform_name)


    logger.info("=====================================")
    logger.info("Apply {} transform".format(transform_names))
    logger.info("=====================================")
    if len(transform_names) == 0:
        process_data("")
    else:
        for transform_name in transform_names:
            process_data(transform_name)

    logger.info("=====================================")
    logger.info("Write the output")
    logger.info("=====================================")
    write_output()
    #write_to_output_upsert()

finally:
    # cleanup
    logger.debug("Clean")
#2
I don't use json.load, so I'm not totally sure this would work, but try calling metadata_file.readline() right after the `with open(...)` line, so the header line is consumed before the rest of the file is parsed.
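
A minimal sketch of that idea (file and variable names are hypothetical), plus the pandas-native route already wired into parse_input:

# readline() approach: consume the header before parsing the rest.
with open('cars.txt') as data_file:   # hypothetical data file name
    data_file.readline()              # discard the header line
    remainder = data_file.read()      # parse what's left

# pandas-native equivalent: parse_input already passes skiprows=header
# to read_csv/read_fwf, so setting "header": 1 in the layout metadata
# should drop the column-name row without changing any code.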

Also, please don't use global. Parameters and return values are much clearer and less bug-prone. See the function tutorial on how to do that.
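
For example, validate_arguments could take the file name as a parameter and return its findings instead of touching module-level globals (a sketch, not a drop-in rewrite of the whole script):

def validate_arguments(metadata_file_name):
    """Return a list of error messages; an empty list means the arguments are valid."""
    error_messages = []
    if not os.path.isfile(metadata_file_name):
        error_messages.append(
            "Metadata JSON file {} not available".format(metadata_file_name))
    return error_messages

# At the call site:
errors = validate_arguments(args.metadata_file_name)
if errors:
    for error_message in errors:
        logger.error(error_message)
    sys.exit(1)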
Craig "Ichabod" O'Brien - xenomind.com
I wish you happiness.
Recommended Tutorials: BBCode, functions, classes, text adventures