Python Forum

Full Version: Endpoint Configuration Issues in Python Script on AWS EC2
Hi everyone,

I'm running into a problem with a Python script deployed on an AWS EC2 instance, and I'm hoping to get some help from the community.

Overview of the Issue
My Python script is designed to perform the following tasks:

1. Read data from a CSV file stored in an AWS S3 bucket.
2. Process the data and perform several calculations.
3. Post the processed data to an external API (http://myewards.in/api/v1/merchant/GetBillDetailsGinesys).
4. Insert results into a MySQL database.

The script runs fine for a while, successfully processing a few entries and updating the database, but after some time it simply stops. I'm sharing the error log below for better understanding.

Error:
DEBUG:botocore.hooks:Event creating-client-class.s3: calling handler <function add_generate_presigned_url at 0x74b326b2e200>
DEBUG:botocore.configprovider:Looking for endpoint for s3 via: environment_service
DEBUG:botocore.configprovider:Looking for endpoint for s3 via: environment_global
DEBUG:botocore.configprovider:Looking for endpoint for s3 via: config_service
DEBUG:botocore.configprovider:Looking for endpoint for s3 via: config_global
DEBUG:botocore.configprovider:No configured endpoint found.
DEBUG:botocore.endpoint:Setting s3 timeout as (300, 300)
DEBUG:botocore.loaders:Loading JSON file: /usr/local/lib/python3.10/dist-packages/botocore/data/_retry.json
DEBUG:botocore.client:Registering retry handlers for service: s3
DEBUG:botocore.utils:Registering S3 region redirector handler
DEBUG:botocore.utils:Registering S3Express Identity Resolver
 * Serving Flask app 'uploads_code'
Questions
1. What might be causing the "No configured endpoint found" log message from botocore? Could this be a red herring, or is it indicative of a larger configuration issue?
2. Are there best practices for handling long-running Python scripts on EC2 that I might be missing? For example, should I handle retries differently or use a different service configuration?
3. Could there be a better way to manage AWS SDK clients and MySQL connections in a long-running Python application?
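
Regarding questions 1 and 2, here is the direction I'm currently considering for the client setup. This is only a rough sketch with placeholder values (the region and endpoint URL below are not what's deployed): it pins the S3 endpoint explicitly, enables botocore's standard retry mode, and relies on the default credential chain (environment variables / instance role) instead of hard-coded keys. Is something like this the right approach?

import boto3
from botocore.config import Config

# Placeholder region -- substitute the bucket's actual region.
config = Config(
    region_name="ap-south-1",
    connect_timeout=300,
    read_timeout=300,
    retries={"max_attempts": 5, "mode": "standard"},  # standard retry mode with capped attempts
    signature_version="s3v4",
)

s3 = boto3.client(
    "s3",
    endpoint_url="https://s3.ap-south-1.amazonaws.com",  # explicit regional endpoint (placeholder)
    config=config,  # credentials resolved from the default chain (env vars / instance role)
)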

I'm also sharing the complete Python script below.

import threading
from flask import Flask, jsonify
import csv
import json
import re
from datetime import datetime
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from collections import defaultdict
import mysql.connector
from mysql.connector import pooling
import boto3
import time
import os
import logging
from botocore.config import Config

# Initialize Flask application
app = Flask(__name__)

# Setup basic logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Enable DEBUG-level logging for boto3 and botocore
logging.getLogger('boto3').setLevel(logging.DEBUG)
logging.getLogger('botocore').setLevel(logging.DEBUG)

API_ENDPOINT = "http://myewards.in/api/v1/merchant/GetBillDetailsGinesys"
MAX_WORKERS = 10  # Adjust according to your system capabilities

# Toggle for S3 or local file system
USE_S3 = True

# S3 configuration
S3_BUCKET_NAME=''
AWS_ACCESS_KEY_ID=''
AWS_SECRET_ACCESS_KEY=''
AWS_DEFAULT_REGION=''

# Define a custom configuration for S3 with a 300-second timeout
config = Config(
    region_name=AWS_DEFAULT_REGION,
    connect_timeout=300,  # 300 seconds for connection timeout
    read_timeout=300,      # 300 seconds for read timeout
    signature_version='s3v4'
)

s3 = boto3.client(
    's3',
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    region_name=AWS_DEFAULT_REGION,
    config=config
) if USE_S3 else None

# Database connection pooling
dbconfig = {
    "host": "",
    "user": "dbuser",
    "password": "",
    "database": ""
}
connection_pool = pooling.MySQLConnectionPool(pool_name="mypool", pool_size=5, **dbconfig)

lock = threading.Lock()

# Define possible date formats
POSSIBLE_DATE_FORMATS = [
    '%Y-%m-%d',
    '%Y-%m-%d %H:%M:%S',
    '%d/%m/%Y',
    '%d/%m/%Y %H:%M:%S',
    '%Y/%m/%d',
    '%Y/%m/%d %H:%M:%S',
    '%d-%m-%Y',
    '%d-%m-%Y %H:%M:%S',
    '%m/%d/%Y',
    '%m/%d/%Y %H:%M:%S',
    '%d %b %Y',
    '%d %b %Y %H:%M:%S',
    '%b %d, %Y',
    '%b %d, %Y %H:%M:%S'
]

# Columns that need to be adjusted based on QTY
ADJUST_COLUMNS = [
    "Qty * MRP", "Qty * ESP", "Promo Amount", "Item Level Discount",
    "Item Gross Amount", "Memo Discount Amount", "Memo Gross Amount",
    "LP Discount Amount", "Net Amount", "Total Discount Amount",
    "TAXAMT", "TAXABLEAMT", "HSN / SAC code of Item", "IGST Rate",
    "IGST Amount", "CGST Rate", "CGST Amount", "SGST Rate",
    "SGST Amount", "CESS Rate", "CESS Amount"
]

def parse_float(value):
    try:
        if isinstance(value, (float, int)):
            return float(value)
        return float(value.replace(',', ''))
    except (ValueError, AttributeError):  # non-numeric strings or missing values
        return 0.0

def ensure_sign(value, sign):
    try:
        value = float(value)
        return value if (sign == 'positive' and value >= 0) or (sign == 'negative' and value <= 0) else -value
    except ValueError:
        return value

def format_billing_time(date_str, time_str=None):
    if date_str:
        try:
            if time_str:
                billing_datetime = datetime.strptime(date_str + ' ' + time_str, '%Y-%m-%d %H:%M:%S')
            else:
                billing_datetime = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
            return billing_datetime.strftime('%Y-%m-%d %H:%M:%S')
        except ValueError:
            return None
    else:
        return None

def convert_createdon_format(date_str):
    for date_format in POSSIBLE_DATE_FORMATS:
        try:
            date_obj = datetime.strptime(date_str, date_format)
            return date_obj.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3]
        except ValueError:
            continue
    return date_str  # Return the original string if conversion fails

def convert_billdate_format(date_str):
    for date_format in POSSIBLE_DATE_FORMATS:
        try:
            date_obj = datetime.strptime(date_str, date_format)
            return date_obj.strftime('%Y-%m-%d %H:%M:%S')
        except ValueError:
            continue
    return date_str  # Return the original string if conversion fails

def convert_dob_format(date_str):
    for date_format in POSSIBLE_DATE_FORMATS:
        try:
            date_obj = datetime.strptime(date_str, date_format)
            return date_obj.strftime('%Y-%m-%d')
        except ValueError:
            continue
    return date_str  # Return the original string if conversion fails

def read_csv_from_local(file_path):
    logger.debug(f"Reading CSV from local path: {file_path}")
    data = []
    headers = []
    with open(file_path, mode='r', encoding='utf-8-sig') as file:
        reader = csv.DictReader(file)
        headers = reader.fieldnames
        for row in reader:
            # Convert date formats as per requirement
            if 'BILLDATE' in row:
                row['BILLDATE'] = convert_billdate_format(row['BILLDATE'])
            if 'CREATEDON' in row:
                row['CREATEDON'] = convert_createdon_format(row['CREATEDON'])
            if 'Original Bill Date' in row:
                row['Original Bill Date'] = convert_createdon_format(row['Original Bill Date'])
            if 'Date of Birth' in row:
                row['Date of Birth'] = convert_dob_format(row['Date of Birth'])

            # Update MOBILE values
            if 'MOBILE' in row and 'ISDCODE' in row:
                try:
                    mobile_value = int(row['MOBILE'])
                    if row['ISDCODE'] == '91' and mobile_value < 6000000000:
                        row['MOBILE'] = ""
                except ValueError:
                    row['MOBILE'] = ""

            # Remove special characters in customer name fields
            for field in ["Customer First name", "Customer Middle name", "Customer Last name"]:
                if field in row:
                    if re.search(r'[^a-zA-Z0-9\s]', row[field]):
                        row[field] = ""

            # Ensure columns are negative if QTY is negative
            if 'QTY' in row:
                qty = parse_float(row['QTY'])
                sign = 'negative' if qty < 0 else 'positive'
                for col in ADJUST_COLUMNS:
                    if col in row:
                        row[col] = ensure_sign(row[col], sign)

            data.append(row)
    return headers, data

def read_csv_from_s3(bucket_name, file_path):
    logger.debug(f"Reading CSV from S3 bucket: {bucket_name}, file path: {file_path}")
    data = []
    headers = []
    key = file_path.replace(f's3://{bucket_name}/', '')  # Strip the s3://bucket/ prefix to get the object key
    obj = s3.get_object(Bucket=bucket_name, Key=key)
    body = obj['Body']
    reader = csv.DictReader(body.read().decode('utf-8-sig').splitlines())
    headers = reader.fieldnames
    for row in reader:
        # Convert date formats as per requirement
        if 'BILLDATE' in row:
            row['BILLDATE'] = convert_billdate_format(row['BILLDATE'])
        if 'CREATEDON' in row:
            row['CREATEDON'] = convert_createdon_format(row['CREATEDON'])
        if 'Original Bill Date' in row:
            row['Original Bill Date'] = convert_createdon_format(row['Original Bill Date'])
        if 'Date of Birth' in row:
            row['Date of Birth'] = convert_dob_format(row['Date of Birth'])

        # Update MOBILE values
        if 'MOBILE' in row and 'ISDCode' in row:
            try:
                mobile_value = int(row['MOBILE'])
                if row['ISDCode'] == '91' and mobile_value < 6000000000:
                    row['MOBILE'] = ""
            except ValueError:
                row['MOBILE'] = ""

        # Remove special characters in customer name fields
        for field in ["Customer First name", "Customer Middle name", "Customer Last name"]:
            if field in row:
                if re.search(r'[^a-zA-Z0-9\s]', row[field]):
                    row[field] = ""

        # Ensure columns are negative if QTY is negative
        if 'QTY' in row:
            qty = parse_float(row['QTY'])
            sign = 'negative' if qty < 0 else 'positive'
            for col in ADJUST_COLUMNS:
                if col in row:
                    row[col] = ensure_sign(row[col], sign)

        data.append(row)
    return headers, data

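# Build the template payload for a regular (sale) bill; merge_data() fills in the
# customer, transaction and item details afterwards.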
def create_json_payload(merchant_id, customer_key, membership_id):
    taxes = [
        {"name": "IGST", "amount": 0.0},
        {"name": "CGST", "amount": 0.0},
        {"name": "SGST", "amount": 0.0},
        {"name": "CESS", "amount": 0.0}
    ]
    payload = {
        "membershipId": membership_id,
        "merchant_id": merchant_id,
        "customer_key": customer_key,
        "transaction": {
            "loyalty_flag": "1",
            "TerminalId": "",
            "cardNo": "",
            "MRPAmt": 0.0,
            "BasicAmt": 0.0,
            "PromoAmt": 0.0,
            "SaleAmt": 0.0,
            "ChargeAmt": 0.0,
            "NetPayable": 0.0,
            "RoundOff": 0.0,
            "ExTaxAmt": 0.0,
            "MDiscountAmt": 0.0,
            "EMRREDCouponRef": "",
            "CouponCode": "",
            "Remarks": "",
            "PromoNo": "",
            "PromoName": "",
            "CreatedOn": "",
            "CashierName": "",
            "MDiscountDesc": "",
            "IDiscountAmt": 0.0,
            "TotMnDiscount": 0.0,
            "TotPromoAmt": 0.0,
            "LPPointsEarned": 0.0,
            "LPDiscountBenefit": 0.0,
            "UDFSTRING1": "",
            "UDFSTRING2": "",
            "UDFSTRING3": "",
            "UDFSTRING4": "",
            "UDFSTRING5": "",
            "POSMode": "",
            "UDFSTRING6": "",
            "UDFSTRING7": "",
            "UDFSTRING8": "",
            "UDFSTRING9": "",
            "UDFSTRING10": "",
            "UDFNUM01": "",
            "UDFNUM02": "",
            "UDFNUM03": "",
            "UDFNUM04": "",
            "UDFNUM05": "",
            "UDFDATE01": "",
            "UDFDATE02": "",
            "UDFDATE03": "",
            "UDFDATE04": "",
            "UDFDATE05": "",
            "OwnerGSTINNo": "",
            "OwnerGSTINStateCode": "",
            "CustomerGSTIN": "",
            "CustomerGSTStateCode": "",
            "GSTDocNumber": "",
            "BILL_GUID": "",
            "ADMSITE_CODE": "",  # Add ADMSITE_CODE
            "gross_amount": 0.0,
            "discount": 0.0,
            "taxes": taxes,
            "return_gross_amount": 0.0,
            "loyalty_gross_amount": 0.0,
            "loyalty_tax_amount": 0.0,
            "loyalty_charge_amount": 0.0,
            "return_discount": 0.0,
            "type": "regular",
            "net_amount": 0.0,
            "amount": 0.0,
            "return_net_amount": 0.0,
            "return_amount": 0.0,
            "number": "",  # Mapping BILLNO to number
            "redemption": {
                "reward_id": "",
                "redeemed_amount": ""
            },
            "billing_time": "",
            "return_bill": "",
            "MCouponAmount": 0.0,
            "items": [],
            "return_items": [],  # Added return_items field
            "payment_details": {
                "payment": [
                    {"mode": "", "value": 0.0},
                    {"mode": "", "value": 0.0}
                ],
                "return_payment": []
            },
            "charges": [],
            "return_charges": [],
            "return_taxes": []
        },
        "customer": {
            "mobile": "",
            "name": "",
            "email": "",
            "ISDCode": "",
            "dob": "",
            "anniversary": "",
            "pincode": "",
            "address": "",
            "Address1": "",
            "city": "",
            "state": "",
            "lastname": "",
            "gender": "",
            "membershipcardnumber": "",
            "isEmployee": "",
            "MName": "",
            "Address2": "",
            "Address3": "",
            "SpouseName": "",
            "Salutation": "",
            "profession": "",
            "Country": "",
            "Remarks": "",
            "LPCardNo": "",
            "PanNo": ""
        }
    }
    return payload

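# Build a single item dict from a CSV row; rate and marked_price are derived by
# removing the per-unit CGST + SGST from the ESP / Maximum Retail Price.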
def create_item_details(row):
    qty = parse_float(row.get("QTY", 0.0))
    cgst_amount = parse_float(row.get("CGST Amount", 0.0))
    sgst_amount = parse_float(row.get("SGST Amount", 0.0))
    total_discount = parse_float(row.get("Total Discount Amount", 0.0))
    esp = parse_float(row.get("Extended Sale Price", 0.0))

    # Updated calculations
    rsp = esp - ((cgst_amount + sgst_amount) / qty) if qty != 0 else 0
    rate = rsp
    marked_price = parse_float(row.get("Maximum Retail Price", 0.0)) - ((cgst_amount + sgst_amount) / qty) if qty != 0 else 0
    item_value = qty * rate
    i_discount_amt = parse_float(row.get("Item Level Discount", 0.0))
    m_discount_amt = parse_float(row.get("Memo Discount Amount", 0.0))
    promo_amt = parse_float(row.get("Promo Amount", 0.0))
    lp_discount = parse_float(row.get("LP Discount Amount", 0.0))
    discount = i_discount_amt + promo_amt + m_discount_amt + lp_discount
    discountAmt = i_discount_amt + m_discount_amt
    amount = item_value - discount
    cost_price = amount
    mrp_amt = parse_float(row.get("Maximum Retail Price", 0.0)) * qty

    item = {
        "description": row.get("ITEM", ""),
        "Division": row.get("DIVISION", ""),
        "item_code": row.get("ICODE", ""),
        "hsn_code": row.get("HSN / SAC code of Item", ""),
        "bar_code": row.get("BARCODE", ""),  # bar_code set to item_code
        "qty": qty,
        "rate": rate,  # Updated rate calculation
        "value": item_value,
        "discount": round(discount, 2),  # Updated discount calculation
        "marked_price": marked_price,  # Updated marked price calculation
        "rsp": round(rate, 2),  # Updated RSP calculation
        "amount": round(amount, 2),  # Updated amount calculation
        "cost_price": round(cost_price, 2),  # Updated cost price calculation
        "taxes": [
            {"name": "IGST", "amount": parse_float(row.get("IGST Amount", 0.0))},
            {"name": "CGST", "amount": cgst_amount},
            {"name": "SGST", "amount": sgst_amount},
            {"name": "CESS", "amount": parse_float(row.get("CESS Amount", 0.0))}
        ],
        "lp_discount": lp_discount,
        "TotMnDiscount": m_discount_amt,  # Updated TotMnDiscount calculation
        "TotPromoAmt": 0.0,  # Added TotPromoAmt calculation
        "Section": row.get("SECTION", ""),
        "Department": row.get("DEPARTMENT", ""),
        "Article": row.get("ARTICLE", ""),
        "ESP": esp,
        "MRPAmt": mrp_amt,  # Updated MRPAmt calculation
        "BasicAmt": esp * qty,
        "PromoAmt": promo_amt,
        "IDiscountAmt": i_discount_amt,  # Updated IDiscountAmt calculation
        "IGrossAmt": parse_float(row.get("Item Gross Amount", 0.0)),
        "MDiscountAmt": m_discount_amt,  # Updated MDiscountAmt calculation
        "DiscountAmt": round(discountAmt, 2),  # Updated DiscountAmt calculation
        "NetAmt": parse_float(row.get("Net Amount", 0.0)),
        "TaxPercent": str(parse_float(row.get("CGST Rate", 0)) + parse_float(row.get("SGST Rate", 0)) + parse_float(row.get("CESS Rate", 0))),
        "TaxAmt": parse_float(row.get("TAXAMT", 0.0)),
        "IDiscountBasis": row.get("IDiscount Basis", ""),
        "IDiscountFactor": parse_float(row.get("IDiscount Factor", 0.0)),
        "MDiscountFactor": parse_float(row.get("MDiscount Factor", 0.0)),
        "PromoDiscountFactor": parse_float(row.get("Promo Discount Factor", 0.0)),
        "TaxableAmt": parse_float(row.get("TAXABLEAMT", 0.0)),
        "LPDiscountFactor": parse_float(row.get("LP Discount Factor", 0.0)),
        "MGrossAmt": parse_float(row.get("Memo Gross Amount", 0.0)),
        "RefBillNo": row.get("Original Bill No", ""),
        "SerialNo": row.get("Serial No", ""),
        "TaxDescription": row.get("Tax Description", ""),
        "IDiscountDesc": row.get("IDiscount Desc", ""),
        "Remarks": row.get("Remarks", ""),
        "PromoName": row.get("PROMONAME", ""),
        "ExTaxAmt": parse_float(row.get("Ex Tax Amount", 0.0)),
        "ReturnReason": row.get("Return Reason", ""),
        "TaxRegime": row.get("Tax Regime", ""),
        "IGSTRate": parse_float(row.get("IGST Rate", 0.0)),
        "IGSTAmt": parse_float(row.get("IGST Amount", 0.0)),
        "CGSTRate": parse_float(row.get("CGST Rate", 0.0)),
        "CGSTAmt": cgst_amount,
        "SGSTRate": parse_float(row.get("SGST Rate", 0.0)),
        "SGSTAmt": sgst_amount,
        "CESSRate": parse_float(row.get("CESS Rate", 0.0)),
        "CESSAmt": parse_float(row.get("CESS Amount", 0.0)),
        "ExtraChgFactor": parse_float(row.get("Extra Charge Factor", 0.0)),
        "PromoDiscountType": row.get("Promo Discount Type", ""),
        "RefBillDate": "",  # Ensure these fields are wrapped in double quotes
        "RefStoreCUID": row.get("Org Bill Generated from Site", ""),
        "SalesPersonFName": row.get("SalesPersonFName", ""),
        "SalesPersonMName": row.get("SalesPersonMName", ""),
        "SalesPersonLName": row.get("SalesPersonLName", ""),
        "Cat1": row.get("CAT1", ""),
        "Cat2": row.get("CAT2", ""),
        "Cat3": row.get("CAT3", ""),
        "Cat4": row.get("CAT4", ""),
        "Cat5": row.get("CAT5", ""),
        "Cat6": row.get("CAT6", ""),
        "MCouponAmount": row.get("M Coupon Amount","")  # Ensure MCouponAmount is initialized as a float
    }
    return item

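# Template payload for a return bill; mirrors create_json_payload() but uses the
# return_* fields and sets type to "return".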
def create_return_payload(merchant_id, customer_key, membership_id):
    return_payload = {
        "membershipId": membership_id,
        "merchant_id": merchant_id,
        "customer_key": customer_key,
        "transaction": {
            "loyalty_flag": "1",
            "TerminalId": "",
            "cardNo": "",
            "MRPAmt": 0.0,
            "BasicAmt": 0.0,
            "PromoAmt": 0.0,
            "SaleAmt": 0.0,
            "ChargeAmt": 0.0,
            "NetPayable": 0.0,
            "RoundOff": 0.0,
            "ExTaxAmt": 0.0,
            "MDiscountAmt": 0.0,
            "EMRREDCouponRef": "",
            "CouponCode": "",
            "Remarks": "",
            "PromoNo": "",
            "PromoName": "",
            "CreatedOn": "",
            "CashierName": "",
            "MDiscountDesc": "",
            "IDiscountAmt": 0.0,
            "TotMnDiscount": 0.0,
            "TotPromoAmt": 0.0,
            "LPPointsEarned": 0.0,
            "LPDiscountBenefit": 0.0,
            "UDFSTRING1": "",
            "UDFSTRING2": "",
            "UDFSTRING3": "",
            "UDFSTRING4": "",
            "UDFSTRING5": "",
            "POSMode": "",
            "UDFSTRING6": "",
            "UDFSTRING7": "",
            "UDFSTRING8": "",
            "UDFSTRING9": "",
            "UDFSTRING10": "",
            "UDFNUM01": "",
            "UDFNUM02": "",
            "UDFNUM03": "",
            "UDFNUM04": "",
            "UDFNUM05": "",
            "UDFDATE01": "",
            "UDFDATE02": "",
            "UDFDATE03": "",
            "UDFDATE04": "",
            "UDFDATE05": "",
            "OwnerGSTINNo": "",
            "OwnerGSTINStateCode": "",
            "CustomerGSTIN": "",
            "CustomerGSTStateCode": "",
            "GSTDocNumber": "",
            "BILL_GUID": "",
            "ADMSITE_CODE": "",  # Add ADMSITE_CODE
            "gross_amount": "",
            "discount": 0.0,
            "taxes": [],
            "return_gross_amount": 0.0,
            "loyalty_gross_amount": "",
            "loyalty_tax_amount": "",
            "loyalty_charge_amount": 0.0,
            "return_loyalty_gross_amount": 0.0,
            "return_loyalty_tax_amount": 0.0,
            "return_loyalty_charge_amount": "",
            "return_discount": 0.0,
            "type": "return",
            "net_amount": "",
            "amount": "",
            "return_net_amount": 0.0,
            "return_amount": 0.0,
            "number": "",  # Mapping BILLNO to number
            "redemption": {
                "reward_id": "",
                "redeemed_amount": ""
            },
            "billing_time": "",
            "return_bill_number": "",
            "return_bill_date": "",  # Ensure these fields are wrapped in double quotes
            "return_taxes": [
                {"name": "CGST", "amount": 0.0},
                {"name": "SGST", "amount": 0.0}
            ],
            "return_bill": "1",
            "MCouponAmount": 0.0,
            "return_items": [],  # Changed items to return_items
            "payment_details": {
                "payment": [],
                "return_payment": [
                    {"mode": "", "value": 0.0},
                    {"mode": "", "value": 0.0}
                ]
            },
            "charges": [],
            "return_charges": [],
            "items": [],
            "taxes": []
        },
        "customer": {
            "mobile": "",
            "name": "",
            "email": "",
            "ISDCode": "",
            "dob": "",
            "anniversary": "",
            "pincode": "",
            "address": "",
            "Address1": "",
            "city": "",
            "state": "",
            "lastname": "",
            "gender": "",
            "membershipcardnumber": "",
            "isEmployee": "",
            "MName": "",
            "Address2": "",
            "Address3": "",
            "SpouseName": "",
            "Salutation": "",
            "profession": "",
            "Country": "",
            "Remarks": "",
            "LPCardNo": "",
            "PanNo": ""
        }
    }
    return return_payload

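# Group billing rows by BILLNO, match each row to its outlet via ADMSITE_CODE, and
# split them into normal (QTY >= 0) and return (QTY < 0) payloads with totals.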
def merge_data(billing_data, outlet_data, merchant_id):
    bills = defaultdict(lambda: {
        'normal_data': create_json_payload(merchant_id, None, None),
        'return_data': create_return_payload(merchant_id, None, None)
    })

    for row in billing_data:
        for outlet_row in outlet_data:
            if row["ADMSITE_CODE"] == outlet_row["ADMSITE_CODE"]:
                customer_key = outlet_row["pos_merchant_id"]
                bill_number = row["BILLNO"]
                bill_guid = row["GUID"]
                membership_id = row["MOBILE"]

                qty = parse_float(row.get("QTY", 0.0))

                if qty < 0:
                    payload_type = 'return_data'
                else:
                    payload_type = 'normal_data'

                bill = bills[bill_number][payload_type]
                bill["customer_key"] = customer_key
                bill["membershipId"] = membership_id
                bill["transaction"]["BILL_GUID"] = bill_guid
                bill["transaction"]["number"] = bill_number
                bill["transaction"]["CashierName"]= row["CREATEBY"]
                bill["transaction"]["TerminalId"] = row.get("TERMINALID", "")
                bill["transaction"]["PromoNo"] = row.get("M_PROMONO", "")
                bill["transaction"]["PromoName"] = row.get("M_PROMONAME", "")
                bill["transaction"]["POSMode"] = row.get("Mode", "")
                bill["transaction"]["ADMSITE_CODE"] = row["ADMSITE_CODE"]  # Set ADMSITE_CODE

                item = create_item_details(row)
                if payload_type == 'return_data':
                    bill["transaction"]["return_items"].append(item)
                else:
                    bill["transaction"]["items"].append(item)

                # Update other transaction fields if necessary (e.g., set CreatedOn, billing_time, etc.)
                bill["transaction"]["CreatedOn"] = row.get("CREATEDON", "")
                bill["transaction"]["billing_time"] = row.get("BILLDATE", "")
                bill["transaction"]["cardNo"] = row.get("cardNo", "")

                bill["customer"]["mobile"] = row.get("MOBILE", "")
                bill["customer"]["name"] = row.get("Customer First name", "")
                bill["customer"]["MName"] = row.get("Customer Middle name", "")
                bill["customer"]["lastname"] = row.get("Customer Last name", "")
                bill["customer"]["gender"] = row.get("GENDER", "")
                bill["customer"]["Address1"] = row.get("ADDRESS1", "")
                bill["customer"]["Address2"] = row.get("ADDRESS2", "")
                bill["customer"]["Address3"] = row.get("ADDRESS3", "")
                bill["customer"]["ISDCode"] = row.get("ISDCODE", "")
                bill["customer"]["email"] = row.get("EMAIL", "")
                bill["customer"]["dob"] = row.get("Date of Birth", "")
                bill["customer"]["city"] = row.get("CITY", "")
                bill["customer"]["state"] = row.get("STATE", "")
                bill["customer"]["Country"] = row.get("COUNTRY", "")
                bill["customer"]["PIN"] = row.get("PIN", "")

                if payload_type == 'return_data':
                    bill["transaction"]["return_bill_number"] = row.get("Original Bill No", "")
                    bill["transaction"]["return_bill_date"] = row.get("Original Bill Date", "")

                break

    normal_data = []
    return_data = []

    for bill_number, data in bills.items():
        normal_payload = data['normal_data']
        return_payload = data['return_data']

        def calculate_totals(payload, is_return=False):
            items_key = "return_items" if is_return else "items"
            total_mrp_amt = sum(item["MRPAmt"] for item in payload["transaction"][items_key])
            total_basic_amt = sum(item["BasicAmt"] for item in payload["transaction"][items_key])
            total_promo_amt = sum(item["PromoAmt"] for item in payload["transaction"][items_key])
            total_m_discount_amt = sum(item["MDiscountAmt"] for item in payload["transaction"][items_key])
            total_i_discount_amt = sum(item["IDiscountAmt"] for item in payload["transaction"][items_key])
            total_gross_amount = sum(item["value"] for item in payload["transaction"][items_key])
            total_net_amount = sum(item["NetAmt"] for item in payload["transaction"][items_key])
            total_amount = sum(item["amount"] for item in payload["transaction"][items_key])
            total_igross_amt = sum(item["IGrossAmt"] for item in payload["transaction"][items_key])
            total_cgst_amount = sum(item["taxes"][1]["amount"] for item in payload["transaction"][items_key])
            total_sgst_amount = sum(item["taxes"][2]["amount"] for item in payload["transaction"][items_key])
            total_discount = sum(item["discount"] for item in payload["transaction"][items_key])
            total_tot_mn_discount = sum(item["TotMnDiscount"] for item in payload["transaction"][items_key])
            total_tot_promo_amt = sum(item["TotPromoAmt"] for item in payload["transaction"][items_key])
            total_MCouponAmount = sum(parse_float(item["MCouponAmount"]) for item in payload["transaction"][items_key])

            # Calculate loyalty_gross_amount
            loyalty_gross_amount = sum(
                item["IGrossAmt"] - item["TaxAmt"] for item in payload["transaction"][items_key] if item["IDiscountAmt"] == 0
            )

            payload["transaction"]["MRPAmt"] = round(total_mrp_amt, 2)
            payload["transaction"]["BasicAmt"] = round(total_basic_amt, 2)
            payload["transaction"]["PromoAmt"] = round(total_promo_amt, 2)
            payload["transaction"]["MDiscountAmt"] = round(total_m_discount_amt, 2)
            payload["transaction"]["IDiscountAmt"] = round(total_i_discount_amt, 2)
            payload["transaction"]["TotMnDiscount"] = round(total_tot_mn_discount, 2)
            payload["transaction"]["gross_amount"] = round(total_gross_amount, 2)
            payload["transaction"]["net_amount"] = round(total_gross_amount - total_discount, 2)
            payload["transaction"]["amount"] = round(total_net_amount, 2)
            payload["transaction"]["SaleAmt"] = round(total_basic_amt - total_promo_amt, 2)
            payload["transaction"]["discount"] = round(total_discount, 2)
            payload["transaction"]["MCouponAmount"] = round(total_MCouponAmount, 2)

            if is_return:
                payload["transaction"]["return_gross_amount"] = total_gross_amount
                payload["transaction"]["return_loyalty_gross_amount"] = loyalty_gross_amount
                payload["transaction"]["return_loyalty_tax_amount"] = sum(item["TaxAmt"] for item in payload["transaction"]["return_items"])
                payload["transaction"]["return_net_amount"] = total_net_amount
                payload["transaction"]["return_amount"] = total_amount
                payload["transaction"]["return_taxes"] = [
                    {"name": "CGST", "amount": round(total_cgst_amount, 2)},
                    {"name": "SGST", "amount": round(total_sgst_amount, 2)}
                ]
                payload["transaction"]["gross_amount"] = ""
                payload["transaction"]["loyalty_gross_amount"] = ""
                payload["transaction"]["loyalty_tax_amount"] = ""
                payload["transaction"]["net_amount"] = ""
                payload["transaction"]["amount"] = ""
                payload["transaction"]["NetPayable"], payload["transaction"]["RoundOff"] = round_off(total_net_amount + payload["transaction"]["RoundOff"])
                payload["transaction"]["NetPayable"] = str(payload["transaction"]["NetPayable"])
                payload["transaction"]["RoundOff"] = str(payload["transaction"]["RoundOff"])
            else:
                payload["transaction"]["taxes"] = [
                    {"name": "CGST", "amount": round(total_cgst_amount, 2)},
                    {"name": "SGST", "amount": round(total_sgst_amount, 2)}
                ]
                payload["transaction"]["loyalty_gross_amount"] = loyalty_gross_amount
                payload["transaction"]["NetPayable"], payload["transaction"]["RoundOff"] = round_off(total_net_amount + payload["transaction"]["RoundOff"])

        if normal_payload["transaction"]["items"]:
            calculate_totals(normal_payload)
            normal_data.append(normal_payload)

        if return_payload["transaction"]["return_items"]:
            calculate_totals(return_payload, is_return=True)
            return_data.append(return_payload)

    return normal_data, return_data

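# Round an amount to the nearest integer and return (rounded value, rounding difference).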
def round_off(amount):
    rounded_amount = round(amount)
    roundoff_value = rounded_amount - amount
    return rounded_amount, roundoff_value

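# POST a single bill payload to the external API; returns (success, error message).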
def send_payload_to_api(payload):
    try:
        response = requests.post(API_ENDPOINT, json=payload, verify=False)
        response.raise_for_status()  # Raise an exception for non-200 status codes
        return True, None  # Successful
    except requests.RequestException as e:
        return False, str(e)

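# Send every payload in the chunk to the API and append one row per bill
# (unique_id, status, time_taken, error) to the CSV log file.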
def process_data_chunk(chunk, log_filename):
    success_count = 0
    failure_messages = []

    # Ensure CSV log file is created with headers if it doesn't exist
    log_file_exists = os.path.isfile(log_filename)
    with open(log_filename, 'a', newline='') as log_file:
        log_writer = csv.writer(log_file)
        if not log_file_exists:
            log_writer.writerow(["unique_id", "status", "time_taken", "error"])

        for payload in chunk:
            start_time = time.time()
            qty = sum(item['qty'] for item in payload['transaction']['items']) if payload['transaction']['items'] else sum(item['qty'] for item in payload['transaction']['return_items'])
            bill_type = '0' if qty > 0 else '1'
            unique_id = f"{payload['transaction']['ADMSITE_CODE']}_{payload['transaction']['billing_time']}_{payload['transaction']['BILL_GUID']}_{bill_type}"
            logger.debug(f"Sending payload to API, unique_id: {unique_id}")
            success, message = send_payload_to_api(payload)
            end_time = time.time()
            time_taken = end_time - start_time

            if success:
                success_count += 1
                log_writer.writerow([unique_id, "success", time_taken, ""])
            else:
                failure_messages.append(message)
                log_writer.writerow([unique_id, "failure", time_taken, message])

    return success_count, failure_messages

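# Fan the payload chunks out across a thread pool and log aggregate success/failure results.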
def process_and_print_results(data_chunks, log_filename):
    total_success_count = 0
    total_failure_messages = []

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = [executor.submit(process_data_chunk, chunk, log_filename) for chunk in data_chunks]
        for future in as_completed(futures):
            success_count, failure_messages = future.result()
            total_success_count += success_count
            total_failure_messages.extend(failure_messages)

    logger.debug(f"Successful API calls: {total_success_count}")
    if total_failure_messages:
        logger.debug("\nFailure messages:")
        for message in total_failure_messages:
            logger.debug(message)

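# Write a copy of the billing data with an extra unique_id column
# (ADMSITE_CODE_BILLDATE_GUID_billtype).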
def write_output_with_unique_id(headers, billing_data, output_filename):
    headers.append('unique_id')
    with open(output_filename, 'w', newline='', encoding='utf-8-sig') as file:
        writer = csv.DictWriter(file, fieldnames=headers)
        writer.writeheader()
        for row in billing_data:
            qty = parse_float(row.get("QTY", 0.0))
            bill_type = '0' if qty > 0 else '1'
            unique_id = f"{row['ADMSITE_CODE']}_{row['BILLDATE']}_{row['GUID']}_{bill_type}"
            row['unique_id'] = unique_id
            writer.writerow(row)

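# Pick up pending rows (status = 0) from upload_automation_tracker, process the
# referenced CSV and update the status: 4 = in progress, 1 = processed,
# 2 = merchant not found in pos_registration, 3 = file path missing.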
def fetch_data_from_db():
    connection = connection_pool.get_connection()
    cursor = connection.cursor(dictionary=True)

    try:
        # Fetching data from upload_automation_tracker table where status = 0
        query = """
        SELECT
            id,
            merchant_id,
            date,
            path,
            status,
            created_at,
            updated_at
        FROM upload_automation_tracker
        WHERE status = 0
        ORDER BY created_at ASC;
        """
        cursor.execute(query)
        tracker_results = cursor.fetchall()

        # Process each tracker result to get corresponding merchant details
        for row in tracker_results:
            merchant_id = row["merchant_id"]
            path = row["path"]
            date = row["date"]
            log_filename = f"upload_log_{merchant_id}_{date}.csv"
            output_filename = f"output_with_unique_id_{merchant_id}_{date}.csv"

            if not path:
                # Update the status to 3 if the path is empty
                with lock:
                    update_query = "UPDATE upload_automation_tracker SET status = 3 WHERE id = %s"
                    cursor.execute(update_query, (row["id"],))
                    connection.commit()
                logger.warning(f"File path not found for id: {row['id']}")
                continue

            # Update the status to 4 while processing
            with lock:
                update_query = "UPDATE upload_automation_tracker SET status = 4 WHERE id = %s"
                cursor.execute(update_query, (row["id"],))
                connection.commit()

            # Fetching data from pos_registration table
            pos_query = """
            SELECT
                pos_merchant_id,
                ADMSITE_CODE
            FROM pos_registration
            WHERE merchant_id = %s;
            """
            cursor.execute(pos_query, (merchant_id,))
            pos_results = cursor.fetchall()

            if not pos_results:
                # Update the status to 2 if the merchant_id is not found
                with lock:
                    update_query = "UPDATE upload_automation_tracker SET status = 2 WHERE id = %s"
                    cursor.execute(update_query, (row["id"],))
                    connection.commit()
                logger.warning(f"Merchant ID not found in pos_registration table for id: {row['id']}")
                continue

            # Assuming pos_registration has a unique (merchant_id, ADMSITE_CODE) pair
            adm_site_code = pos_results[0]["ADMSITE_CODE"]

            # Read CSV file from S3 or local file system
            if USE_S3:
                headers, billing_data = read_csv_from_s3(S3_BUCKET_NAME, path)
            else:
                headers, billing_data = read_csv_from_local(path)

            # Write output file with unique_id column
            write_output_with_unique_id(headers, billing_data, output_filename)

            # Merge and process the data
            normal_data, return_data = merge_data(billing_data, pos_results, merchant_id)

            normal_chunks = [normal_data[i:i + MAX_WORKERS] for i in range(0, len(normal_data), MAX_WORKERS)]
            return_chunks = [return_data[i:i + MAX_WORKERS] for i in range(0, len(return_data), MAX_WORKERS)]

            logger.debug(f"Processing normal bills for merchant_id: {merchant_id}")
            process_and_print_results(normal_chunks, log_filename)

            logger.debug(f"Processing return bills for merchant_id: {merchant_id}")
            process_and_print_results(return_chunks, log_filename)

            # Update the status to 1 (processed)
            with lock:
                update_query = "UPDATE upload_automation_tracker SET status = 1 WHERE id = %s"
                cursor.execute(update_query, (row["id"],))
                connection.commit()

    except mysql.connector.Error as err:
        logger.error(f"Error: {err}")
    finally:
        cursor.close()
        connection.close()

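# HTTP entry point: POST /trigger kicks off fetch_data_from_db().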
@app.route('/trigger', methods=['POST'])
def trigger():
    try:
        logger.debug("Trigger endpoint called")
        fetch_data_from_db()
        return jsonify({"message": "Data processing triggered successfully."}), 200
    except Exception as e:
        logger.error(f"Error during trigger: {e}")
        return jsonify({"error": str(e)}), 500

def run_app():
    app.run(host='0.0.0.0', port=80)

# Run the Flask app in a separate thread
flask_thread = threading.Thread(target=run_app)
flask_thread.start()