Python Forum

Full Version: Managing recursive tasks in celery.
I have a number of tasks that I want to chain into a pipeline. It consists of two main tasks, fetch_data and post_process. The fetch_data task is recursive. What I want is for all of the fetch_data tasks to complete before post_process runs.

However, post_process seems to run after only the first fetch task completes. How do I make the post processing run after all the fetch tasks are completed?
import random
import time
import uuid

from celery import chain, shared_task


@shared_task
def multi_fetch(count):
    fetch_id = uuid.uuid4()
    if count > 0:
        # Spawns the next fetch as an independent task; the chain has no
        # visibility into these recursive calls.
        multi_fetch.delay(count - 1)
    print(f"Fetching data....[{fetch_id}]")
    time.sleep(random.randint(5, 15))
    return fetch_id


@shared_task
def multi_post_process(fetch_id):
    print(f"Post processing...[{fetch_id}]")


workflow = chain(multi_fetch.s(5), multi_post_process.s())
workflow.apply_async()
Looking at the Celery documentation, I think you want to use a group for multi_fetch (just fetch now, since the group does the "multi" part) to run multiple tasks at the same time. Chaining the group with a callback turns it into a chord, which waits for every task in the group to finish before calling the callback.
workflow = group(fetch.s(i) for i in range(6))
To chain the group with your post-processing task:
workflow = (group(fetch.s(i) for i in range(6)) | multi_post_process.s())