Python Forum
Issue in my multiprocessing Python code?
#1
Hi all,

I have written a Python 3.6 ETL process that searches for and then translates 42,000 products from a CSV file.

The whole ETL process takes more than 10 hours to complete.

I tried wrapping my code in a multiprocessing block, as shown below:
def process_product_data(product_df, language_code):
    logger.info("Processing of product data is started...")
    translated_product_combind = []
    file_path = "../output/output_" + language_code + ".csv"
    with Pool(4) as p:
        logger.info("multiprocessing is started...")
        translated_product = p.map(fetch_product_details, product_df)
        translated_product_combind.append(translated_product)
        logger.info("multiprocessing is done in {} seconds".format(time.time() - start_time))
    generate_csv(file_path,language_code, translated_product_combind)
    logger.info("Processing of product data is completed in {} seconds".format(time.time() - start_time))
    return translated_product_combind
def fetch_product_details(product,language_code):
    check_flg = False
    title = product.Title.replace('%', '')
    logger.info('Product Title is: {}'.format(title))
    desc = product.Description
    logger.info('Product Description is: {}'.format(desc))
    product_url = search_product(language_code, title)
    logger.info("Product Url is: {}".format(product_url))
    if product_url != 'Not Found':
        translated_product, check_flg = translate_data_web(product_url)
    if (check_flg == False):
        translated_product = translate_data(language_code, title, desc)
    else:
        translated_product = translate_data(language_code, title, desc)
    return translated_product
The issue is that when I run the script, it gets stuck at this line, and there are no errors in the logs:
multiprocessing is started...
After this, nothing further runs; the results are never appended to translated_product_combind.

It seems I have not set up the multiprocessing block correctly.
Can anyone please take a look and tell me what I am doing wrong?
Reply
#2
Who is calling process_product_data()?
If it's not the main thread you should use:

from multiprocessing.pool import ThreadPool
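
For context, ThreadPool offers the same map() interface as Pool, but its workers are threads inside the current process, so nothing has to be pickled and sent to child processes. Here is a minimal sketch of that, assuming I/O-bound work; fetch_length and run_in_threads are made-up names for illustration, not part of the original code:

```python
from multiprocessing.pool import ThreadPool
import threading


def fetch_length(title):
    # Hypothetical stand-in for an I/O-bound call such as a web lookup.
    return threading.current_thread().name, len(title)


def run_in_threads(titles, workers=2):
    # ThreadPool has the same map() interface as Pool, but the workers
    # are threads in the current process rather than child processes.
    with ThreadPool(workers) as pool:
        return pool.map(fetch_length, titles)


if __name__ == "__main__":
    for thread_name, length in run_in_threads(["red shoe", "blue hat"]):
        print(thread_name, length)
```

map() returns results in input order, whichever thread handled each item.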
Reply
#3
(Jul-19-2018, 10:17 AM)gontajones Wrote: Who is calling process_product_data()?
If it's not the main thread you should use:

from multiprocessing.pool import ThreadPool

process_product_data() is called from a function that I run as the main entry point, as shown below:
def start_etl():
    logger.info("ETL process is started....")
    update_required = ftp_process()
    sleep(10)
    #update_required=True
    if update_required:
        product_df = clean_feed()
        for language in language_code_list:
            process_product_data(product_df, language)
            sleep(60)
            translated_product_df = read_translated_file(language)
            upload_products(translated_product_df, language)
        generate_combined_csv()
        logger.info("ETL Process is completed !")
    else:
        logger.info("Feed is not updated on FTP !")


# main function for testing
if __name__ == '__main__':
    start_etl()

I have also tried the version below, but the result is the same; nothing gets processed:
from multiprocessing.pool import ThreadPool
def process_product_data(product_df, language_code):
    logger.info("Processing of product data is started...")
    translated_product_combind = []
    file_path = "../output/output_" + language_code + ".csv"
    with ThreadPool(2) as p:
        logger.info("multiprocessing is started...")
        translated_product = p.map(fetch_product_details, product_df)
Reply
#4
Quote: process_product_data() is called from a function that I run as the main entry point, as shown below:
So let's get back to Pool().

The issue is that Pool.map() passes only one argument to the function on each call, but the function you are passing expects two:
fetch_product_details(product,language_code)
This should raise an error.

If you really need the language_code argument, you can create a wrapper, like the second answer here:
python-using-list-multiple-arguments-in-pool-map
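
One common way to build such a wrapper is functools.partial, which fixes language_code and leaves a one-argument callable for map(). This is a rough sketch, not necessarily what the linked answer does. The body of fetch_product_details is a hypothetical stand-in, and translate_all and pool_factory are made-up names; pool_factory only exists so that ThreadPool can be swapped in:

```python
from functools import partial
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool


def fetch_product_details(product, language_code):
    # Hypothetical stand-in for the real search/translate work.
    return "{}:{}".format(language_code, product.upper())


def translate_all(products, language_code, pool_factory=Pool, workers=2):
    # partial() fixes language_code, leaving a one-argument callable
    # that map() can call once per product.
    worker = partial(fetch_product_details, language_code=language_code)
    with pool_factory(workers) as pool:
        return pool.map(worker, products)


if __name__ == "__main__":
    print(translate_all(["shoe", "hat"], "fr"))
```

The wrapped function must be defined at module level so that Pool can pickle it; a lambda would not work with process-based pools.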
Reply
#5
Thanks gontajones! I understand the cause of the issue now.
And yes, language_code is a required argument.

But I couldn't work out from the link how to wrap it and use it in my code :(

I replaced map with starmap:
def process_product_data(product_df, language_code):
    logger.info("Processing of product data is started...")
    translated_product_combind = []
    file_path = "../output/output_" + language_code + ".csv"
    with Pool(2) as p:
        logger.info("multiprocessing is started...")
        translated_product = p.starmap(fetch_product_details, product_df)
Reply
#6
Is your code working now?

I see that you're still not passing language_code to fetch_product_details().
Reply
#7
No, it's not working!
Reply
#8
The idea of Pool.starmap() is something like this:

from multiprocessing import Pool
import os
import time


def hello(a, b):
    print(f"{a} inside hello()")
    print(f"{b} Process id: ", os.getpid())
    time.sleep(1)
    return a * a


if __name__ == "__main__":
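    # starmap() unpacks each tuple into hello(a, b).
    objects = [(3, 'Test1'), (4, 'Test2')]
    # The with-block shuts the pool down once the results are collected.
    with Pool(2) as p:
        pool_output = p.starmap(hello, objects)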

    print(pool_output)
Now you have to adapt it to the content/type of product_df and language_code.
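
As a rough sketch of that adaptation: pair every row with the same language_code and hand the pairs to starmap(). Everything here is an assumption about your data. Plain dicts stand in for the rows, the body of fetch_product_details is a placeholder, and build_args, process_rows and pool_factory are made-up names. If product_df is a pandas DataFrame, note that iterating over it directly yields column labels rather than rows, so you would first convert it with something like product_df.to_dict("records").

```python
from itertools import repeat
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool


def fetch_product_details(product, language_code):
    # Hypothetical stand-in: the real function would search and translate.
    title = product["Title"].replace("%", "")
    return {"Title": title, "language": language_code}


def build_args(rows, language_code):
    # starmap() unpacks each tuple into the worker's positional arguments,
    # so every row is paired with the same language_code.
    return list(zip(rows, repeat(language_code)))


def process_rows(rows, language_code, pool_factory=Pool, workers=2):
    # pool_factory defaults to Pool; ThreadPool can be passed instead.
    with pool_factory(workers) as pool:
        return pool.starmap(fetch_product_details, build_args(rows, language_code))


if __name__ == "__main__":
    # Plain dicts stand in for the product rows (an assumption).
    rows = [{"Title": "50% wool scarf"}, {"Title": "hat"}]
    print(process_rows(rows, "de"))
```

Dict rows are used because they pickle cleanly when sent to worker processes; the worker then reads product["Title"] instead of product.Title.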
Reply

