Sep-04-2024, 07:26 AM
I have a number of tasks that I want to chain together to build a pipeline. It consists of two main tasks: a fetch task (multi_fetch) and a post-processing task (multi_post_process). The fetch task is recursive: each call schedules another fetch until the count reaches zero. What I want is for all the fetch tasks to complete before post-processing runs.
However, the post-processing task seems to run as soon as the first fetch task completes. How do I make post-processing run only after all the fetch tasks have completed?
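import random
import time
import uuid

from celery import chain, shared_task


@shared_task
def multi_fetch(count):
    fetch_id = uuid.uuid4()
    if count > 0:
        # Fire-and-forget: this schedules the next fetch, but nothing waits for it
        multi_fetch.delay(count - 1)
    print(f"Fetching data....[{fetch_id}]")
    time.sleep(random.randint(5, 15))
    return fetch_id


@shared_task
def multi_post_process(fetch_id):
    print(f"Post processing...[{fetch_id}]")


# The chain passes the return value of the first multi_fetch call to multi_post_process
workflow = chain(multi_fetch.s(5), multi_post_process.s())
workflow.apply_async()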
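For illustration, here is a minimal sketch of the fan-out/join behaviour being asked for. It is written with the standard library's concurrent.futures rather than Celery so that it runs without a broker. It assumes the recursion exists only to run the fetch count + 1 times. The names fetch, post_process and run_pipeline are hypothetical stand-ins for multi_fetch and multi_post_process, and the sleep is shortened. The difference from the code above is that one coordinator schedules every fetch and explicitly waits on all of them. By contrast, multi_fetch.delay(count - 1) is fire-and-forget, so the chain only ever sees the first task's return value.

```python
import random
import time
import uuid
from concurrent.futures import ThreadPoolExecutor


def fetch(index: int) -> str:
    """Hypothetical stand-in for one fetch task: simulate work, return an ID."""
    fetch_id = str(uuid.uuid4())
    time.sleep(random.uniform(0.01, 0.05))  # shortened from the post's 5-15 s
    print(f"Fetching data....[{fetch_id}] (#{index})")
    return fetch_id


def post_process(fetch_ids: list[str]) -> int:
    """Hypothetical stand-in for post-processing: runs once, over every result."""
    for fetch_id in fetch_ids:
        print(f"Post processing...[{fetch_id}]")
    return len(fetch_ids)


def run_pipeline(count: int) -> int:
    # Fan out: schedule all count + 1 fetches up front (multi_fetch(5) in the
    # question runs six fetches in total) instead of having each fetch
    # schedule the next one without anyone waiting on it.
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(fetch, i) for i in range(count + 1)]
        # Join: .result() blocks until each fetch has finished, so
        # post_process below only starts once every fetch is done.
        fetch_ids = [f.result() for f in futures]
    return post_process(fetch_ids)


if __name__ == "__main__":
    print(run_pipeline(5))
```

In Celery itself, the primitive that matches this pattern is a chord. A chord is a group of header tasks plus a callback that runs once all of them have finished, e.g. chord(multi_fetch.s(i) for i in range(6))(multi_post_process.s()). As far as I know, chords need a result backend to be configured. The callback also receives a list of all header results, so multi_post_process would need to accept a list instead of a single ID, and multi_fetch would need its recursive delay call removed.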