Aug-31-2024, 10:22 AM
Hi everyone,
I'm running into a problem with a Python script deployed on an AWS EC2 instance, and I'm hoping to get some help from the community.
Overview of the Issue
My Python script is designed to perform the following tasks:
1. Read data from a CSV file stored in an AWS S3 bucket.
2. Process the data and perform several calculations.
3. Post processed data to an external API (http://myewards.in/api/v1/merchant/GetBi...ilsGinesys).
4. Insert results into a MySQL database.
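To make step 2 concrete before the full script, the per-row calculation boils down to something like this stripped-down, self-contained sketch (the RATE column and the simple qty * rate amount here are illustrative stand-ins, not my full schema):

```python
import csv
import io

def process_rows(csv_text):
    """Parse CSV text and compute a per-row line amount (step 2, simplified)."""
    reader = csv.DictReader(io.StringIO(csv_text))
    results = []
    for row in reader:
        # Treat missing/blank numeric cells as zero, as the real script does
        qty = float(row.get("QTY", "0") or "0")
        rate = float(row.get("RATE", "0") or "0")
        results.append({"bill": row.get("BILLNO", ""), "amount": qty * rate})
    return results

sample = "BILLNO,QTY,RATE\nB1,2,10.5\nB2,-1,5\n"
rows = process_rows(sample)
print(rows)
```

In the real script a negative QTY marks a return bill, which is why the sign of the amount matters downstream.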
The script runs fine for a while, successfully processing a few entries and making database updates, but after some time it simply stops. I've pasted the relevant log output below.
Error log:
DEBUG:botocore.hooks:Event creating-client-class.s3: calling handler <function add_generate_presigned_url at 0x74b326b2e200>
DEBUG:botocore.configprovider:Looking for endpoint for s3 via: environment_service
DEBUG:botocore.configprovider:Looking for endpoint for s3 via: environment_global
DEBUG:botocore.configprovider:Looking for endpoint for s3 via: config_service
DEBUG:botocore.configprovider:Looking for endpoint for s3 via: config_global
DEBUG:botocore.configprovider:No configured endpoint found.
DEBUG:botocore.endpoint:Setting s3 timeout as (300, 300)
DEBUG:botocore.loaders:Loading JSON file: /usr/local/lib/python3.10/dist-packages/botocore/data/_retry.json
DEBUG:botocore.client:Registering retry handlers for service: s3
DEBUG:botocore.utils:Registering S3 region redirector handler
DEBUG:botocore.utils:Registering S3Express Identity Resolver
* Serving Flask app 'uploads_code'
Questions
1. What might be causing the "No configured endpoint found" log message from botocore? Could this be a red herring, or is it indicative of a larger configuration issue?
2. Are there best practices for handling long-running Python scripts on EC2 that I might be missing? For example, should I handle retries differently or use a different service configuration?
3. Could there be a better way to manage AWS SDK clients and MySQL connections in a long-running Python application?
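For context on question 2: right now the only retries are botocore's built-in ones; at the application level a failed POST is just logged. A minimal stdlib-only sketch of the kind of app-level retry wrapper I'm considering (flaky() is just a stand-in for the real API call):

```python
import time

def with_retries(fn, attempts=3, base_delay=0.01):
    """Call fn(), retrying with exponential backoff on transient errors."""
    for attempt in range(attempts):
        try:
            return fn()
        except (ConnectionError, TimeoutError):
            if attempt == attempts - 1:
                raise  # out of attempts: let the caller handle it
            time.sleep(base_delay * (2 ** attempt))

# Stand-in for the real requests.post call: fails twice, then succeeds.
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient network error")
    return "ok"

result = with_retries(flaky)
print(result, calls["n"])  # succeeds on the third attempt
```

Is something like this sensible, or should I lean on a library / service-level configuration instead?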
I'm also sharing the whole Python code below.
import threading
from flask import Flask, jsonify
import csv
import json
import re
from datetime import datetime
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from collections import defaultdict
import mysql.connector
from mysql.connector import pooling
import boto3
import time
import os
import logging
from botocore.config import Config

# Initialize Flask application
app = Flask(__name__)

# Setup basic logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# setting up boto3 and botocore logger functions
logging.getLogger('boto3').setLevel(logging.DEBUG)
logging.getLogger('botocore').setLevel(logging.DEBUG)

API_ENDPOINT = "http://myewards.in/api/v1/merchant/GetBillDetailsGinesys"
MAX_WORKERS = 10  # Adjust according to your system capabilities

# Toggle for S3 or local file system
USE_S3 = True

# S3 configuration
S3_BUCKET_NAME = ''
AWS_ACCESS_KEY_ID = ''
AWS_SECRET_ACCESS_KEY = ''
AWS_DEFAULT_REGION = ''

# Define a custom configuration for S3 with a 300-second timeout
config = Config(
    region_name=AWS_DEFAULT_REGION,
    connect_timeout=300,  # 300 seconds for connection timeout
    read_timeout=300,  # 300 seconds for read timeout
    signature_version='s3v4'
)

s3 = boto3.client(
    's3',
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    region_name=AWS_DEFAULT_REGION,
    config=config
) if USE_S3 else None

# Database connection pooling
dbconfig = {
    "host": "",
    "user": "dbuser",
    "password": "",
    "database": ""
}
connection_pool = pooling.MySQLConnectionPool(pool_name="mypool", pool_size=5, **dbconfig)

lock = threading.Lock()

# Define possible date formats
POSSIBLE_DATE_FORMATS = [
    '%Y-%m-%d', '%Y-%m-%d %H:%M:%S',
    '%d/%m/%Y', '%d/%m/%Y %H:%M:%S',
    '%Y/%m/%d', '%Y/%m/%d %H:%M:%S',
    '%d-%m-%Y', '%d-%m-%Y %H:%M:%S',
    '%m/%d/%Y', '%m/%d/%Y %H:%M:%S',
    '%d %b %Y', '%d %b %Y %H:%M:%S',
    '%b %d, %Y', '%b %d, %Y %H:%M:%S'
]

# Columns that need to be adjusted based on QTY
ADJUST_COLUMNS = [
    "Qty * MRP", "Qty * ESP", "Promo Amount", "Item Level Discount",
    "Item Gross Amount", "Memo Discount Amount", "Memo Gross Amount",
    "LP Discount Amount", "Net Amount", "Total Discount Amount",
    "TAXAMT", "TAXABLEAMT", "HSN / SAC code of Item",
    "IGST Rate", "IGST Amount", "CGST Rate", "CGST Amount",
    "SGST Rate", "SGST Amount", "CESS Rate", "CESS Amount"
]

def parse_float(value):
    try:
        if isinstance(value, (float, int)):
            return float(value)
        return float(value.replace(',', ''))
    except ValueError:
        return 0.0

def ensure_sign(value, sign):
    try:
        value = float(value)
        return value if (sign == 'positive' and value >= 0) or (sign == 'negative' and value <= 0) else -value
    except ValueError:
        return value

def format_billing_time(date_str, time_str=None):
    if date_str:
        try:
            if time_str:
                billing_datetime = datetime.strptime(date_str + ' ' + time_str, '%Y-%m-%d %H:%M:%S')
            else:
                billing_datetime = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
            return billing_datetime.strftime('%Y-%m-%d %H:%M:%S')
        except ValueError:
            return None
    else:
        return None

def convert_createdon_format(date_str):
    for date_format in POSSIBLE_DATE_FORMATS:
        try:
            date_obj = datetime.strptime(date_str, date_format)
            return date_obj.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3]
        except ValueError:
            continue
    return date_str  # Return the original string if conversion fails

def convert_billdate_format(date_str):
    for date_format in POSSIBLE_DATE_FORMATS:
        try:
            date_obj = datetime.strptime(date_str, date_format)
            return date_obj.strftime('%Y-%m-%d %H:%M:%S')
        except ValueError:
            continue
    return date_str  # Return the original string if conversion fails

def convert_dob_format(date_str):
    for date_format in POSSIBLE_DATE_FORMATS:
        try:
            date_obj = datetime.strptime(date_str, date_format)
            return date_obj.strftime('%Y-%m-%d')
        except ValueError:
            continue
    return date_str  # Return the original string if conversion fails

def read_csv_from_local(file_path):
    logger.debug(f"Reading CSV from local path: {file_path}")
    data = []
    headers = []
    with open(file_path, mode='r', encoding='utf-8-sig') as file:
        reader = csv.DictReader(file)
        headers = reader.fieldnames
        for row in reader:
            # Convert date formats as per requirement
            if 'BILLDATE' in row:
                row['BILLDATE'] = convert_billdate_format(row['BILLDATE'])
            if 'CREATEDON' in row:
                row['CREATEDON'] = convert_createdon_format(row['CREATEDON'])
            if 'Original Bill Date' in row:
                row['Original Bill Date'] = convert_createdon_format(row['Original Bill Date'])
            if 'Date of Birth' in row:
                row['Date of Birth'] = convert_dob_format(row['Date of Birth'])
            # Update MOBILE values
            if 'MOBILE' in row and 'ISDCODE' in row:
                try:
                    mobile_value = int(row['MOBILE'])
                    if row['ISDCODE'] == '91' and mobile_value < 6000000000:
                        row['MOBILE'] = ""
                except ValueError:
                    row['MOBILE'] = ""
            # Remove special characters in customer name fields
            for field in ["Customer First name", "Customer Middle name", "Customer Last name"]:
                if field in row:
                    if re.search(r'[^a-zA-Z0-9\s]', row[field]):
                        row[field] = ""
            # Ensure columns are negative if QTY is negative
            if 'QTY' in row:
                qty = parse_float(row['QTY'])
                sign = 'negative' if qty < 0 else 'positive'
                for col in ADJUST_COLUMNS:
                    if col in row:
                        row[col] = ensure_sign(row[col], sign)
            data.append(row)
    return headers, data

def read_csv_from_s3(bucket_name, file_path):
    logger.debug(f"Reading CSV from S3 bucket: {bucket_name}, file path: {file_path}")
    data = []
    headers = []
    key = file_path.replace(f's3://{bucket_name}/', '')  # Correctly handle the S3 key
    obj = s3.get_object(Bucket=bucket_name, Key=key)
    body = obj['Body']
    reader = csv.DictReader(body.read().decode('utf-8-sig').splitlines())
    headers = reader.fieldnames
    for row in reader:
        # Convert date formats as per requirement
        if 'BILLDATE' in row:
            row['BILLDATE'] = convert_billdate_format(row['BILLDATE'])
        if 'CREATEDON' in row:
            row['CREATEDON'] = convert_createdon_format(row['CREATEDON'])
        if 'Original Bill Date' in row:
            row['Original Bill Date'] = convert_createdon_format(row['Original Bill Date'])
        if 'Date of Birth' in row:
            row['Date of Birth'] = convert_dob_format(row['Date of Birth'])
        # Update MOBILE values
        if 'MOBILE' in row and 'ISDCode' in row:
            try:
                mobile_value = int(row['MOBILE'])
                if row['ISDCode'] == '91' and mobile_value < 6000000000:
                    row['MOBILE'] = ""
            except ValueError:
                row['MOBILE'] = ""
        # Remove special characters in customer name fields
        for field in ["Customer First name", "Customer Middle name", "Customer Last name"]:
            if field in row:
                if re.search(r'[^a-zA-Z0-9\s]', row[field]):
                    row[field] = ""
        # Ensure columns are negative if QTY is negative
        if 'QTY' in row:
            qty = parse_float(row['QTY'])
            sign = 'negative' if qty < 0 else 'positive'
            for col in ADJUST_COLUMNS:
                if col in row:
                    row[col] = ensure_sign(row[col], sign)
        data.append(row)
    return headers, data

def create_json_payload(merchant_id, customer_key, membership_id):
    taxes = [
        {"name": "IGST", "amount": 0.0},
        {"name": "CGST", "amount": 0.0},
        {"name": "SGST", "amount": 0.0},
        {"name": "CESS", "amount": 0.0}
    ]
    payload = {
        "membershipId": membership_id,
        "merchant_id": merchant_id,
        "customer_key": customer_key,
        "transaction": {
            "loyalty_flag": "1",
            "TerminalId": "",
            "cardNo": "",
            "MRPAmt": 0.0, "BasicAmt": 0.0, "PromoAmt": 0.0, "SaleAmt": 0.0,
            "ChargeAmt": 0.0, "NetPayable": 0.0, "RoundOff": 0.0, "ExTaxAmt": 0.0,
            "MDiscountAmt": 0.0,
            "EMRREDCouponRef": "", "CouponCode": "", "Remarks": "", "PromoNo": "",
            "PromoName": "", "CreatedOn": "", "CashierName": "", "MDiscountDesc": "",
            "IDiscountAmt": 0.0, "TotMnDiscount": 0.0, "TotPromoAmt": 0.0,
            "LPPointsEarned": 0.0, "LPDiscountBenefit": 0.0,
            "UDFSTRING1": "", "UDFSTRING2": "", "UDFSTRING3": "", "UDFSTRING4": "", "UDFSTRING5": "",
            "POSMode": "",
            "UDFSTRING6": "", "UDFSTRING7": "", "UDFSTRING8": "", "UDFSTRING9": "", "UDFSTRING10": "",
            "UDFNUM01": "", "UDFNUM02": "", "UDFNUM03": "", "UDFNUM04": "", "UDFNUM05": "",
            "UDFDATE01": "", "UDFDATE02": "", "UDFDATE03": "", "UDFDATE04": "", "UDFDATE05": "",
            "OwnerGSTINNo": "", "OwnerGSTINStateCode": "", "CustomerGSTIN": "",
            "CustomerGSTStateCode": "", "GSTDocNumber": "",
            "BILL_GUID": "",
            "ADMSITE_CODE": "",  # Add ADMSITE_CODE
            "gross_amount": 0.0,
            "discount": 0.0,
            "taxes": taxes,
            "return_gross_amount": 0.0,
            "loyalty_gross_amount": 0.0,
            "loyalty_tax_amount": 0.0,
            "loyalty_charge_amount": 0.0,
            "return_discount": 0.0,
            "type": "regular",
            "net_amount": 0.0,
            "amount": 0.0,
            "return_net_amount": 0.0,
            "return_amount": 0.0,
            "number": "",  # Mapping BILLNO to number
            "redemption": {
                "reward_id": "",
                "redeemed_amount": ""
            },
            "billing_time": "",
            "return_bill": "",
            "MCouponAmount": 0.0,
            "items": [],
            "return_items": [],  # Added return_items field
            "payment_details": {
                "payment": [
                    {"mode": "", "value": 0.0},
                    {"mode": "", "value": 0.0}
                ],
                "return_payment": []
            },
            "charges": [],
            "return_charges": [],
            "return_taxes": []
        },
        "customer": {
            "mobile": "", "name": "", "email": "", "ISDCode": "", "dob": "",
            "anniversary": "", "pincode": "", "address": "", "Address1": "",
            "city": "", "state": "", "lastname": "", "gender": "",
            "membershipcardnumber": "", "isEmployee": "", "MName": "",
            "Address2": "", "Address3": "", "SpouseName": "", "Salutation": "",
            "profession": "", "Country": "", "Remarks": "", "LPCardNo": "", "PanNo": ""
        }
    }
    return payload

def create_item_details(row):
    qty = parse_float(row.get("QTY", 0.0))
    cgst_amount = parse_float(row.get("CGST Amount", 0.0))
    sgst_amount = parse_float(row.get("SGST Amount", 0.0))
    total_discount = parse_float(row.get("Total Discount Amount", 0.0))
    esp = parse_float(row.get("Extended Sale Price", 0.0))
    # Updated calculations
    rsp = esp - ((cgst_amount + sgst_amount) / qty) if qty != 0 else 0
    rate = rsp
    marked_price = parse_float(row.get("Maximum Retail Price", 0.0)) - ((cgst_amount + sgst_amount) / qty) if qty != 0 else 0
    item_value = qty * rate
    i_discount_amt = parse_float(row.get("Item Level Discount", 0.0))
    m_discount_amt = parse_float(row.get("Memo Discount Amount", 0.0))
    promo_amt = parse_float(row.get("Promo Amount", 0.0))
    lp_discount = parse_float(row.get("LP Discount Amount", 0.0))
    discount = i_discount_amt + promo_amt + m_discount_amt + lp_discount
    discountAmt = i_discount_amt + m_discount_amt
    amount = item_value - discount
    cost_price = amount
    mrp_amt = parse_float(row.get("Maximum Retail Price", 0.0)) * qty
    item = {
        "description": row.get("ITEM", ""),
        "Division": row.get("DIVISION", ""),
        "item_code": row.get("ICODE", ""),
        "hsn_code": row.get("HSN / SAC code of Item", ""),
        "bar_code": row.get("BARCODE", ""),  # bar_code set to item_code
        "qty": qty,
        "rate": rate,  # Updated rate calculation
        "value": item_value,
        "discount": round(discount, 2),  # Updated discount calculation
        "marked_price": marked_price,  # Updated marked price calculation
        "rsp": round(rate, 2),  # Updated RSP calculation
        "amount": round(amount, 2),  # Updated amount calculation
        "cost_price": round(cost_price, 2),  # Updated cost price calculation
        "taxes": [
            {"name": "IGST", "amount": parse_float(row.get("IGST Amount", 0.0))},
            {"name": "CGST", "amount": cgst_amount},
            {"name": "SGST", "amount": sgst_amount},
            {"name": "CESS", "amount": parse_float(row.get("CESS Amount", 0.0))}
        ],
        "lp_discount": lp_discount,
        "TotMnDiscount": m_discount_amt,  # Updated TotMnDiscount calculation
        "TotPromoAmt": 0.0,  # Added TotPromoAmt calculation
        "Section": row.get("SECTION", ""),
        "Department": row.get("DEPARTMENT", ""),
        "Article": row.get("ARTICLE", ""),
        "ESP": esp,
        "MRPAmt": mrp_amt,  # Updated MRPAmt calculation
        "BasicAmt": esp * qty,
        "PromoAmt": promo_amt,
        "IDiscountAmt": i_discount_amt,  # Updated IDiscountAmt calculation
        "IGrossAmt": parse_float(row.get("Item Gross Amount", 0.0)),
        "MDiscountAmt": m_discount_amt,  # Updated MDiscountAmt calculation
        "DiscountAmt": round(discountAmt, 2),  # Updated DiscountAmt calculation
        "NetAmt": parse_float(row.get("Net Amount", 0.0)),
        "TaxPercent": str(parse_float(row.get("CGST Rate", 0)) + parse_float(row.get("SGST Rate", 0)) + parse_float(row.get("CESS Rate", 0))),
        "TaxAmt": parse_float(row.get("TAXAMT", 0.0)),
        "IDiscountBasis": row.get("IDiscount Basis", ""),
        "IDiscountFactor": parse_float(row.get("IDiscount Factor", 0.0)),
        "MDiscountFactor": parse_float(row.get("MDiscount Factor", 0.0)),
        "PromoDiscountFactor": parse_float(row.get("Promo Discount Factor", 0.0)),
        "TaxableAmt": parse_float(row.get("TAXABLEAMT", 0.0)),
        "LPDiscountFactor": parse_float(row.get("LP Discount Factor", 0.0)),
        "MGrossAmt": parse_float(row.get("Memo Gross Amount", 0.0)),
        "RefBillNo": row.get("Original Bill No", ""),
        "SerialNo": row.get("Serial No", ""),
        "TaxDescription": row.get("Tax Description", ""),
        "IDiscountDesc": row.get("IDiscount Desc", ""),
        "Remarks": row.get("Remarks", ""),
        "PromoName": row.get("PROMONAME", ""),
        "ExTaxAmt": parse_float(row.get("Ex Tax Amount", 0.0)),
        "ReturnReason": row.get("Return Reason", ""),
        "TaxRegime": row.get("Tax Regime", ""),
        "IGSTRate": parse_float(row.get("IGST Rate", 0.0)),
        "IGSTAmt": parse_float(row.get("IGST Amount", 0.0)),
        "CGSTRate": parse_float(row.get("CGST Rate", 0.0)),
        "CGSTAmt": cgst_amount,
        "SGSTRate": parse_float(row.get("SGST Rate", 0.0)),
        "SGSTAmt": sgst_amount,
        "CESSRate": parse_float(row.get("CESS Rate", 0.0)),
        "CESSAmt": parse_float(row.get("CESS Amount", 0.0)),
        "ExtraChgFactor": parse_float(row.get("Extra Charge Factor", 0.0)),
        "PromoDiscountType": row.get("Promo Discount Type", ""),
        "RefBillDate": "",  # Ensure these fields are wrapped in double quotes
        "RefStoreCUID": row.get("Org Bill Generated from Site", ""),
        "SalesPersonFName": row.get("SalesPersonFName", ""),
        "SalesPersonMName": row.get("SalesPersonMName", ""),
        "SalesPersonLName": row.get("SalesPersonLName", ""),
        "Cat1": row.get("CAT1", ""), "Cat2": row.get("CAT2", ""), "Cat3": row.get("CAT3", ""),
        "Cat4": row.get("CAT4", ""), "Cat5": row.get("CAT5", ""), "Cat6": row.get("CAT6", ""),
        "MCouponAmount": row.get("M Coupon Amount", "")  # Ensure MCouponAmount is initialized as a float
    }
    return item

def create_return_payload(merchant_id, customer_key, membership_id):
    return_payload = {
        "membershipId": membership_id,
        "merchant_id": merchant_id,
        "customer_key": customer_key,
        "transaction": {
            "loyalty_flag": "1",
            "TerminalId": "",
            "cardNo": "",
            "MRPAmt": 0.0, "BasicAmt": 0.0, "PromoAmt": 0.0, "SaleAmt": 0.0,
            "ChargeAmt": 0.0, "NetPayable": 0.0, "RoundOff": 0.0, "ExTaxAmt": 0.0,
            "MDiscountAmt": 0.0,
            "EMRREDCouponRef": "", "CouponCode": "", "Remarks": "", "PromoNo": "",
            "PromoName": "", "CreatedOn": "", "CashierName": "", "MDiscountDesc": "",
            "IDiscountAmt": 0.0, "TotMnDiscount": 0.0, "TotPromoAmt": 0.0,
            "LPPointsEarned": 0.0, "LPDiscountBenefit": 0.0,
            "UDFSTRING1": "", "UDFSTRING2": "", "UDFSTRING3": "", "UDFSTRING4": "", "UDFSTRING5": "",
            "POSMode": "",
            "UDFSTRING6": "", "UDFSTRING7": "", "UDFSTRING8": "", "UDFSTRING9": "", "UDFSTRING10": "",
            "UDFNUM01": "", "UDFNUM02": "", "UDFNUM03": "", "UDFNUM04": "", "UDFNUM05": "",
            "UDFDATE01": "", "UDFDATE02": "", "UDFDATE03": "", "UDFDATE04": "", "UDFDATE05": "",
            "OwnerGSTINNo": "", "OwnerGSTINStateCode": "", "CustomerGSTIN": "",
            "CustomerGSTStateCode": "", "GSTDocNumber": "",
            "BILL_GUID": "",
            "ADMSITE_CODE": "",  # Add ADMSITE_CODE
            "gross_amount": "",
            "discount": 0.0,
            "return_gross_amount": 0.0,
            "loyalty_gross_amount": "",
            "loyalty_tax_amount": "",
            "loyalty_charge_amount": 0.0,
            "return_loyalty_gross_amount": 0.0,
            "return_loyalty_tax_amount": 0.0,
            "return_loyalty_charge_amount": "",
            "return_discount": 0.0,
            "type": "return",
            "net_amount": "",
            "amount": "",
            "return_net_amount": 0.0,
            "return_amount": 0.0,
            "number": "",  # Mapping BILLNO to number
            "redemption": {
                "reward_id": "",
                "redeemed_amount": ""
            },
            "billing_time": "",
            "return_bill_number": "",
            "return_bill_date": "",  # Ensure these fields are wrapped in double quotes
            "return_taxes": [
                {"name": "CGST", "amount": 0.0},
                {"name": "SGST", "amount": 0.0}
            ],
            "return_bill": "1",
            "MCouponAmount": 0.0,
            "return_items": [],  # Changed items to return_items
            "payment_details": {
                "payment": [],
                "return_payment": [
                    {"mode": "", "value": 0.0},
                    {"mode": "", "value": 0.0}
                ]
            },
            "charges": [],
            "return_charges": [],
            "items": [],
            "taxes": []
        },
        "customer": {
            "mobile": "", "name": "", "email": "", "ISDCode": "", "dob": "",
            "anniversary": "", "pincode": "", "address": "", "Address1": "",
            "city": "", "state": "", "lastname": "", "gender": "",
            "membershipcardnumber": "", "isEmployee": "", "MName": "",
            "Address2": "", "Address3": "", "SpouseName": "", "Salutation": "",
            "profession": "", "Country": "", "Remarks": "", "LPCardNo": "", "PanNo": ""
        }
    }
    return return_payload

def merge_data(billing_data, outlet_data, merchant_id):
    bills = defaultdict(lambda: {
        'normal_data': create_json_payload(merchant_id, None, None),
        'return_data': create_return_payload(merchant_id, None, None)
    })
    for row in billing_data:
        for outlet_row in outlet_data:
            if row["ADMSITE_CODE"] == outlet_row["ADMSITE_CODE"]:
                customer_key = outlet_row["pos_merchant_id"]
                bill_number = row["BILLNO"]
                bill_guid = row["GUID"]
                membership_id = row["MOBILE"]
                qty = parse_float(row.get("QTY", 0.0))
                if qty < 0:
                    payload_type = 'return_data'
                else:
                    payload_type = 'normal_data'
                bill = bills[bill_number][payload_type]
                bill["customer_key"] = customer_key
                bill["membershipId"] = membership_id
                bill["transaction"]["BILL_GUID"] = bill_guid
                bill["transaction"]["number"] = bill_number
                bill["transaction"]["CashierName"] = row["CREATEBY"]
                bill["transaction"]["TerminalId"] = row.get("TERMINALID", "")
                bill["transaction"]["PromoNo"] = row.get("M_PROMONO", "")
                bill["transaction"]["PromoName"] = row.get("M_PROMONAME", "")
                bill["transaction"]["POSMode"] = row.get("Mode", "")
                bill["transaction"]["ADMSITE_CODE"] = row["ADMSITE_CODE"]  # Set ADMSITE_CODE
                item = create_item_details(row)
                if payload_type == 'return_data':
                    bill["transaction"]["return_items"].append(item)
                else:
                    bill["transaction"]["items"].append(item)
                # Update other transaction fields if necessary (e.g., set CreatedOn, billing_time, etc.)
                bill["transaction"]["CreatedOn"] = row.get("CREATEDON", "")
                bill["transaction"]["billing_time"] = row.get("BILLDATE", "")
                bill["transaction"]["cardNo"] = row.get("cardNo", "")
                bill["customer"]["mobile"] = row.get("MOBILE", "")
                bill["customer"]["name"] = row.get("Customer First name", "")
                bill["customer"]["MName"] = row.get("Customer Middle name", "")
                bill["customer"]["lastname"] = row.get("Customer Last name", "")
                bill["customer"]["gender"] = row.get("GENDER", "")
                bill["customer"]["Address1"] = row.get("ADDRESS1", "")
                bill["customer"]["Address2"] = row.get("ADDRESS2", "")
                bill["customer"]["Address3"] = row.get("ADDRESS3", "")
                bill["customer"]["ISDCode"] = row.get("ISDCODE", "")
                bill["customer"]["email"] = row.get("EMAIL", "")
                bill["customer"]["dob"] = row.get("Date of Birth", "")
                bill["customer"]["city"] = row.get("CITY", "")
                bill["customer"]["state"] = row.get("STATE", "")
                bill["customer"]["Country"] = row.get("COUNTRY", "")
                bill["customer"]["PIN"] = row.get("PIN", "")
                if payload_type == 'return_data':
                    bill["transaction"]["return_bill_number"] = row.get("Original Bill No", "")
                    bill["transaction"]["return_bill_date"] = row.get("Original Bill Date", "")
                break
    normal_data = []
    return_data = []
    for bill_number, data in bills.items():
        normal_payload = data['normal_data']
        return_payload = data['return_data']

        def calculate_totals(payload, is_return=False):
            items_key = "return_items" if is_return else "items"
            total_mrp_amt = sum(item["MRPAmt"] for item in payload["transaction"][items_key])
            total_basic_amt = sum(item["BasicAmt"] for item in payload["transaction"][items_key])
            total_promo_amt = sum(item["PromoAmt"] for item in payload["transaction"][items_key])
            total_m_discount_amt = sum(item["MDiscountAmt"] for item in payload["transaction"][items_key])
            total_i_discount_amt = sum(item["IDiscountAmt"] for item in payload["transaction"][items_key])
            total_gross_amount = sum(item["value"] for item in payload["transaction"][items_key])
            total_net_amount = sum(item["NetAmt"] for item in payload["transaction"][items_key])
            total_amount = sum(item["amount"] for item in payload["transaction"][items_key])
            total_igross_amt = sum(item["IGrossAmt"] for item in payload["transaction"][items_key])
            total_cgst_amount = sum(item["taxes"][1]["amount"] for item in payload["transaction"][items_key])
            total_sgst_amount = sum(item["taxes"][2]["amount"] for item in payload["transaction"][items_key])
            total_discount = sum(item["discount"] for item in payload["transaction"][items_key])
            total_tot_mn_discount = sum(item["TotMnDiscount"] for item in payload["transaction"][items_key])
            total_tot_promo_amt = sum(item["TotPromoAmt"] for item in payload["transaction"][items_key])
            total_MCouponAmount = sum(parse_float(item["MCouponAmount"]) for item in payload["transaction"][items_key])
            # Calculate loyalty_gross_amount
            loyalty_gross_amount = sum(
                item["IGrossAmt"] - item["TaxAmt"]
                for item in payload["transaction"][items_key]
                if item["IDiscountAmt"] == 0
            )
            payload["transaction"]["MRPAmt"] = round(total_mrp_amt, 2)
            payload["transaction"]["BasicAmt"] = round(total_basic_amt, 2)
            payload["transaction"]["PromoAmt"] = round(total_promo_amt, 2)
            payload["transaction"]["MDiscountAmt"] = round(total_m_discount_amt, 2)
            payload["transaction"]["IDiscountAmt"] = round(total_i_discount_amt, 2)
            payload["transaction"]["TotMnDiscount"] = round(total_tot_mn_discount, 2)
            payload["transaction"]["gross_amount"] = round(total_gross_amount, 2)
            payload["transaction"]["net_amount"] = round(total_gross_amount - total_discount, 2)
            payload["transaction"]["amount"] = round(total_net_amount, 2)
            payload["transaction"]["SaleAmt"] = round(total_basic_amt - total_promo_amt, 2)
            payload["transaction"]["discount"] = round(total_discount, 2)
            payload["transaction"]["MCouponAmount"] = round(total_MCouponAmount, 2)
            if is_return:
                payload["transaction"]["return_gross_amount"] = total_gross_amount
                payload["transaction"]["return_loyalty_gross_amount"] = loyalty_gross_amount
                payload["transaction"]["return_loyalty_tax_amount"] = sum(item["TaxAmt"] for item in payload["transaction"]["return_items"])
                payload["transaction"]["return_net_amount"] = total_net_amount
                payload["transaction"]["return_amount"] = total_amount
                payload["transaction"]["return_taxes"] = [
                    {"name": "CGST", "amount": round(total_cgst_amount, 2)},
                    {"name": "SGST", "amount": round(total_sgst_amount, 2)}
                ]
                payload["transaction"]["gross_amount"] = ""
                payload["transaction"]["loyalty_gross_amount"] = ""
                payload["transaction"]["loyalty_tax_amount"] = ""
                payload["transaction"]["net_amount"] = ""
                payload["transaction"]["amount"] = ""
                payload["transaction"]["NetPayable"], payload["transaction"]["RoundOff"] = round_off(total_net_amount + payload["transaction"]["RoundOff"])
                payload["transaction"]["NetPayable"] = str(payload["transaction"]["NetPayable"])
                payload["transaction"]["RoundOff"] = str(payload["transaction"]["RoundOff"])
            else:
                payload["transaction"]["taxes"] = [
                    {"name": "CGST", "amount": round(total_cgst_amount, 2)},
                    {"name": "SGST", "amount": round(total_sgst_amount, 2)}
                ]
                payload["transaction"]["loyalty_gross_amount"] = loyalty_gross_amount
                payload["transaction"]["NetPayable"], payload["transaction"]["RoundOff"] = round_off(total_net_amount + payload["transaction"]["RoundOff"])

        if normal_payload["transaction"]["items"]:
            calculate_totals(normal_payload)
            normal_data.append(normal_payload)
        if return_payload["transaction"]["return_items"]:
            calculate_totals(return_payload, is_return=True)
            return_data.append(return_payload)
    return normal_data, return_data

def round_off(amount):
    rounded_amount = round(amount)
    roundoff_value = rounded_amount - amount
    return rounded_amount, roundoff_value

def send_payload_to_api(payload):
    try:
        response = requests.post(API_ENDPOINT, json=payload, verify=False)
        response.raise_for_status()  # Raise an exception for non-200 status codes
        return True, None  # Successful
    except requests.RequestException as e:
        return False, str(e)

def process_data_chunk(chunk, log_filename):
    success_count = 0
    failure_messages = []
    # Ensure CSV log file is created with headers if it doesn't exist
    log_file_exists = os.path.isfile(log_filename)
    with open(log_filename, 'a', newline='') as log_file:
        log_writer = csv.writer(log_file)
        if not log_file_exists:
            log_writer.writerow(["unique_id", "status", "time_taken", "error"])
        for payload in chunk:
            start_time = time.time()
            qty = sum(item['qty'] for item in payload['transaction']['items']) if payload['transaction']['items'] else sum(item['qty'] for item in payload['transaction']['return_items'])
            bill_type = '0' if qty > 0 else '1'
            unique_id = f"{payload['transaction']['ADMSITE_CODE']}_{payload['transaction']['billing_time']}_{payload['transaction']['BILL_GUID']}_{bill_type}"
            logger.debug(f"Sending payload to API, unique_id: {unique_id}")
            success, message = send_payload_to_api(payload)
            end_time = time.time()
            time_taken = end_time - start_time
            if success:
                success_count += 1
                log_writer.writerow([unique_id, "success", time_taken, ""])
            else:
                failure_messages.append(message)
                log_writer.writerow([unique_id, "failure", time_taken, message])
    return success_count, failure_messages

def process_and_print_results(data_chunks, log_filename):
    total_success_count = 0
    total_failure_messages = []
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = [executor.submit(process_data_chunk, chunk, log_filename) for chunk in data_chunks]
        for future in as_completed(futures):
            success_count, failure_messages = future.result()
            total_success_count += success_count
            total_failure_messages.extend(failure_messages)
    logger.debug(f"Successful API calls: {total_success_count}")
    if total_failure_messages:
        logger.debug("\nFailure messages:")
        for message in total_failure_messages:
            logger.debug(message)

def write_output_with_unique_id(headers, billing_data, output_filename):
    headers.append('unique_id')
    with open(output_filename, 'w', newline='', encoding='utf-8-sig') as file:
        writer = csv.DictWriter(file, fieldnames=headers)
        writer.writeheader()
        for row in billing_data:
            qty = parse_float(row.get("QTY", 0.0))
            bill_type = '0' if qty > 0 else '1'
            unique_id = f"{row['ADMSITE_CODE']}_{row['BILLDATE']}_{row['GUID']}_{bill_type}"
            row['unique_id'] = unique_id
            writer.writerow(row)

def fetch_data_from_db():
    connection = connection_pool.get_connection()
    cursor = connection.cursor(dictionary=True)
    try:
        # Fetching data from upload_automation_tracker table where status = 0
        query = """
            SELECT id, merchant_id, date, path, status, created_at, updated_at
            FROM upload_automation_tracker
            WHERE status = 0
            ORDER BY created_at ASC;
        """
        cursor.execute(query)
        tracker_results = cursor.fetchall()
        # Process each tracker result to get corresponding merchant details
        for row in tracker_results:
            merchant_id = row["merchant_id"]
            path = row["path"]
            date = row["date"]
            log_filename = f"upload_log_{merchant_id}_{date}.csv"
            output_filename = f"output_with_unique_id_{merchant_id}_{date}.csv"
            if not path:
                # Update the status to 3 if the path is empty
                with lock:
                    update_query = "UPDATE upload_automation_tracker SET status = 3 WHERE id = %s"
                    cursor.execute(update_query, (row["id"],))
                    connection.commit()
                logger.warning(f"File path not found for id: {row['id']}")
                continue
            # Update the status to 4 while processing
            with lock:
                update_query = "UPDATE upload_automation_tracker SET status = 4 WHERE id = %s"
                cursor.execute(update_query, (row["id"],))
                connection.commit()
            # Fetching data from pos_registration table
            pos_query = """
                SELECT pos_merchant_id, ADMSITE_CODE
                FROM pos_registration
                WHERE merchant_id = %s;
            """
            cursor.execute(pos_query, (merchant_id,))
            pos_results = cursor.fetchall()
            if not pos_results:
                # Update the status to 2 if the merchant_id is not found
                with lock:
                    update_query = "UPDATE upload_automation_tracker SET status = 2 WHERE id = %s"
                    cursor.execute(update_query, (row["id"],))
                    connection.commit()
                logger.warning(f"Merchant ID not found in pos_registration table for id: {row['id']}")
                continue
            # Assuming pos_registration has a unique (merchant_id, ADMSITE_CODE) pair
            adm_site_code = pos_results[0]["ADMSITE_CODE"]
            # Read CSV file from S3 or local file system
            if USE_S3:
                headers, billing_data = read_csv_from_s3(S3_BUCKET_NAME, path)
            else:
                headers, billing_data = read_csv_from_local(path)
            # Write output file with unique_id column
            write_output_with_unique_id(headers, billing_data, output_filename)
            # Merge and process the data
            normal_data, return_data = merge_data(billing_data, pos_results, merchant_id)
            normal_chunks = [normal_data[i:i + MAX_WORKERS] for i in range(0, len(normal_data), MAX_WORKERS)]
            return_chunks = [return_data[i:i + MAX_WORKERS] for i in range(0, len(return_data), MAX_WORKERS)]
            logger.debug(f"Processing normal bills for merchant_id: {merchant_id}")
            process_and_print_results(normal_chunks, log_filename)
            logger.debug(f"Processing return bills for merchant_id: {merchant_id}")
            process_and_print_results(return_chunks, log_filename)
            # Update the status to 1 (processed)
            with lock:
                update_query = "UPDATE upload_automation_tracker SET status = 1 WHERE id = %s"
                cursor.execute(update_query, (row["id"],))
                connection.commit()
    except mysql.connector.Error as err:
        logger.error(f"Error: {err}")
    finally:
        cursor.close()
        connection.close()

@app.route('/trigger', methods=['POST'])
def trigger():
    try:
        logger.debug("Trigger endpoint called")
        fetch_data_from_db()
        return jsonify({"message": "Data processing triggered successfully."}), 200
    except Exception as e:
        logger.error(f"Error during trigger: {e}")
        return jsonify({"error": str(e)}), 500

def run_app():
    app.run(host='0.0.0.0', port='80')

# Run the Flask app in a separate thread
flask_thread = threading.Thread(target=run_app)
flask_thread.start()