Python Forum
How to handle multiple audio uploads without canceling ongoing API requests?
#1
I’m building a web application where users can upload audio files, and each file is sent to an external API (Gemini API) for processing. However, I’m facing an issue:

When a user uploads an audio file and the system sends it to the Gemini API, everything works fine. But if the user uploads a second file while the first one is still being processed, the first request to Gemini gets canceled, and the system switches to processing the second file.

It seems that when a new request is made from my app to Gemini with a new file, the previous request from the same source (IP) is terminated.

Here’s the key flow:
1. The user uploads the first audio file, which is sent to Gemini for processing.
2. While the first audio is still being processed, the user uploads a second audio file.
3. The second request causes the first one to be canceled.

This behavior disrupts the user experience and prevents simultaneous uploads from being handled properly.
What I’ve Tried:
1. I’ve looked into using Python’s asyncio for concurrent request handling, but it seems to focus on sending multiple requests at once, which is not the case here.
2. I’ve considered adding a queue system to manage the uploads, but I’m not sure how to avoid request cancellation when sending to Gemini.
3. I’ve ensured unique file names for each upload, but this doesn’t seem to solve the cancellation issue.
What I Need:
1. A way to ensure that requests to the Gemini API are independent and don’t cancel previous requests.
2. Any suggestions for handling this type of scenario, where multiple users (or the same user) upload files and expect each to be processed separately without interference.
Additional Context:
The application is built with Python (FastAPI). The Gemini API is a third-party service, and I have limited control over how it handles requests.
Questions:
1. Is there a way to ensure that multiple requests from the same source (IP) to an API are processed independently?
2. Should I use a queue or a different approach (e.g., workers, connection pooling)?
3. Could this be an issue with the Gemini API itself, and should I reach out to their support team?

Any insights or examples would be greatly appreciated.
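On the queue-versus-workers question, here is a minimal sketch, assuming each upload is handled by its own coroutine and a semaphore caps how many run at once. `fake_gemini_call`, `process_upload`, and `handle_uploads` are hypothetical names, and the stand-in call only sleeps; nothing here talks to Gemini.

```python
import asyncio


async def fake_gemini_call(name: str, delay: float) -> str:
    # Hypothetical stand-in for the real API request; it only sleeps.
    await asyncio.sleep(delay)
    return f"processed:{name}"


async def process_upload(name: str, delay: float, limit: asyncio.Semaphore) -> str:
    # The semaphore caps concurrency; it never cancels a request already running.
    async with limit:
        return await fake_gemini_call(name, delay)


async def handle_uploads(uploads: dict[str, float], max_concurrent: int = 2) -> list[str]:
    limit = asyncio.Semaphore(max_concurrent)
    tasks = [
        asyncio.create_task(process_upload(name, delay, limit))
        for name, delay in uploads.items()
    ]
    # gather waits for every task and returns results in submission order.
    return await asyncio.gather(*tasks)


if __name__ == "__main__":
    print(asyncio.run(handle_uploads({"first.mp3": 0.2, "second.mp3": 0.05})))
```

In this sketch a later upload can finish before an earlier one, and neither interrupts the other. If the real application still loses the first request, the cancellation probably originates elsewhere, for example in how the web handler or the client manages the request.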
Code so far:
import os
import logging
import google.generativeai as genai
import asyncio
import time
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
from functools import partial

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
logger = logging.getLogger(__name__)

@dataclass
class TranscriptionTask:
    media_path: str
    api_key: str
    context: Optional[str] = None

class TranscriptionQueue:
    def __init__(self, max_concurrent_tasks: int = 20, calls_per_minute: int = 15):
        self.queue = asyncio.Queue()
        self.max_concurrent_tasks = max_concurrent_tasks
        self.calls_per_minute = calls_per_minute
        self.active_tasks: List[asyncio.Task] = []
        self.results: Dict[str, Any] = {}
        self.rate_limit_lock = asyncio.Lock()
        # -inf means "no request sent yet", so the first call is not delayed.
        self.last_request_time = float("-inf")

    async def wait_for_rate_limit(self):
        """Ensure we don't exceed the API rate limit"""
        # The lock is held while sleeping, so waiting workers start one interval apart.
        async with self.rate_limit_lock:
            # time.monotonic() is unaffected by system clock adjustments.
            elapsed = time.monotonic() - self.last_request_time
            wait_time = max(0, (60 / self.calls_per_minute) - elapsed)

            if wait_time > 0:
                logger.debug(f"Rate limiting: waiting {wait_time:.2f} seconds")
                await asyncio.sleep(wait_time)

            self.last_request_time = time.monotonic()

    async def process_queue(self):
        while True:
            task = await self.queue.get()
            try:
                # Wait for rate limit before processing
                await self.wait_for_rate_limit()
                
                result = await transcribe_audio(
                    task.media_path,
                    task.api_key,
                    task.context
                )
                self.results[task.media_path] = result
                
                # Cleanup file after processing
                await cleanup_file(task.media_path)
                
            except Exception as e:
                logger.error(f"Error processing task for {task.media_path}: {e}")
                self.results[task.media_path] = {
                    'success': False,
                    'error': str(e)
                }
            finally:
                self.queue.task_done()

    async def start_workers(self):
        workers = [
            asyncio.create_task(self.process_queue())
            for _ in range(self.max_concurrent_tasks)
        ]
        self.active_tasks.extend(workers)

    async def stop_workers(self):
        for task in self.active_tasks:
            task.cancel()
        await asyncio.gather(*self.active_tasks, return_exceptions=True)
        self.active_tasks.clear()

async def transcribe_audio(
    media_path: str, 
    api_key: str,
    context: Optional[str] = None, 
    max_retries: int = 3
) -> Dict[str, Any]:
    """
    Transcribe and analyze audio file using Google Generative AI
    
    Args:
        media_path (str): Path to the audio file
        api_key (str): API key to use for Google Generative AI
        context (str, optional): Additional context for analysis
        max_retries (int): Maximum number of retry attempts
        
    Returns:
        dict: Transcription and analysis results
    """
    for attempt in range(max_retries):
        try:
            await initialize_genai(api_key)

            if context is None:
                context = "Conversation analysis "
                logger.info(f"Using default context: {context}")

            context = truncate_text(context, max_tokens=10000000)

            try:
                # Upload file using asyncio.to_thread
                myfile = await asyncio.to_thread(genai.upload_file, path=media_path)
                logger.info(f"Uploaded file: {myfile}")
            except Exception as upload_error:
                logger.error(f"File upload error: {upload_error}")
                raise

            try:
                model = genai.GenerativeModel("gemini-1.5-flash")

                with open("prompt_1.txt", "r", encoding="utf-8") as file:
                    prompt_1_content = file.read()

                prompt = f"""
                Additional Specified Context: {context}

                {prompt_1_content}
                """

                generation_config = {
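                    # The original value (16000012) is far above any output limit;
                    # 8192 is the assumed cap for gemini-1.5-flash, check the model docs.
                    'max_output_tokens': 8192,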
                    'temperature': 0.3,
                    'top_p': 0.8,
                }

                # Use generate_content_async instead of generate_content
                result = await model.generate_content_async(
                    [myfile, prompt],
                    generation_config=generation_config
                )

                processed_text = truncate_text(result.text)

                return {
                    'success': True,
                    'transcription': processed_text,
                    'analysis': 'Comprehensive AI-generated analysis'
                }

            except Exception as model_error:
                logger.error(f"Model generation error on attempt {attempt + 1}: {model_error}")
                if attempt < max_retries - 1:
                    logger.info(f"Retrying (Attempt {attempt + 2})")
                    continue
                raise

        except Exception as e:
            logger.error(f"Audio transcription error on attempt {attempt + 1}: {e}")
            if attempt < max_retries - 1:
                logger.info(f"Retrying (Attempt {attempt + 2})")
                continue
            return {
                'success': False,
                'error': str(e)
            }

# Keep existing helper functions
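async def initialize_genai(api_key: str) -> None:
    if not api_key:
        logger.error("No API key provided")
        raise ValueError("No API key provided")
    # genai.configure() sets library-wide defaults, not per-task state: if
    # concurrent tasks pass different keys, the most recent call wins for all.
    genai.configure(api_key=api_key)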

def truncate_text(text: str, max_tokens: int = 25000000) -> str:
    if len(text) > max_tokens * 4:
        logger.warning(f"Truncating text to fit within {max_tokens} token limit")
        return text[:max_tokens * 4]
    return text

async def cleanup_file(media_path: str) -> None:
    try:
        await asyncio.to_thread(os.remove, media_path)
        logger.info(f"Cleaned up file: {media_path}")
    except Exception as cleanup_error:
        logger.warning(f"File cleanup error: {cleanup_error}")
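The queue above stores results under `media_path`, so two uploads that share a path would overwrite each other's result. The following is a minimal sketch, not the poster's implementation, that gives every upload its own job ID and keeps a reference to its task so the work continues after the function that submitted it returns. `JobStore`, `submit`, `wait_all`, and `fake_transcribe` are hypothetical names, and `fake_transcribe` only sleeps in place of the real call.

```python
import asyncio
import uuid
from typing import Any, Dict


async def fake_transcribe(media_path: str) -> Dict[str, Any]:
    # Hypothetical stand-in for the real transcription call.
    await asyncio.sleep(0.01)
    return {"success": True, "transcription": f"text of {media_path}"}


class JobStore:
    def __init__(self) -> None:
        self.results: Dict[str, Dict[str, Any]] = {}
        self._tasks: Dict[str, asyncio.Task] = {}

    def submit(self, media_path: str) -> str:
        # Must be called from inside a running event loop.
        job_id = uuid.uuid4().hex
        self.results[job_id] = {"status": "pending"}
        # Keep a reference so the task is not garbage-collected mid-flight.
        self._tasks[job_id] = asyncio.create_task(self._run(job_id, media_path))
        return job_id

    async def _run(self, job_id: str, media_path: str) -> None:
        try:
            outcome = await fake_transcribe(media_path)
            self.results[job_id] = {"status": "done", **outcome}
        except Exception as exc:
            self.results[job_id] = {"status": "failed", "error": str(exc)}
        finally:
            self._tasks.pop(job_id, None)

    async def wait_all(self) -> None:
        await asyncio.gather(*list(self._tasks.values()), return_exceptions=True)


async def demo() -> Dict[str, Dict[str, Any]]:
    store = JobStore()
    first = store.submit("same_name.mp3")
    second = store.submit("same_name.mp3")  # same path, separate job
    await store.wait_all()
    return {"first": store.results[first], "second": store.results[second]}


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In a web handler, the idea would be to call `submit` and return the job ID immediately, then let the client poll for the result, so that a second upload never shares a request lifetime with the first.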
Larz60+ write Jan-01-2025, 03:01 PM:
Please post all code, output and errors (in its entirety) between their respective tags. Refer to BBCode help topic on how to post. Use the "Preview Post" button to make sure the code is presented as you expect before hitting the "Post Reply/Thread" button.
I have added tags for you, please use BBCode tags on future posts.
#2
The issue likely comes from how the Gemini API handles simultaneous requests from the same source. Use a task queue to manage uploads sequentially and ensure each is processed independently. If needed, add slight delays between requests or run workers on separate threads to avoid conflicts.
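The "slight delays between requests" idea can be sketched as a limiter that reserves a start slot for each request. This is a minimal sketch under assumptions: `SlotLimiter`, `reserve`, and `wait` are hypothetical names, and the 15 calls per minute figure is taken from the code in the first post, not from any published Gemini quota.

```python
import asyncio
import time


class SlotLimiter:
    """Spaces request start times at least `interval` seconds apart."""

    def __init__(self, calls_per_minute: int) -> None:
        self.interval = 60.0 / calls_per_minute
        self._next_slot = 0.0

    def reserve(self, now: float) -> float:
        # Synchronous, so no other coroutine can interleave inside this method.
        start = max(now, self._next_slot)
        self._next_slot = start + self.interval
        return start - now  # seconds the caller should wait before sending

    async def wait(self) -> None:
        delay = self.reserve(time.monotonic())
        if delay > 0:
            await asyncio.sleep(delay)


if __name__ == "__main__":
    limiter = SlotLimiter(calls_per_minute=15)
    print(limiter.reserve(100.0), limiter.reserve(100.0))
```

Each worker would call `await limiter.wait()` just before its request. Requests are spaced out but still run independently once started, which is different from processing them strictly one at a time.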
buran write Dec-25-2024, 05:45 AM:
Spam link removed. This is second warning. At third occasion your account will be purged.
#3
perhaps the policy at Gemini API is to allow no more than one upload at a time. or perhaps when the 2nd upload was made, it used the same "from port" as the first one, which would result in a totally duplicate connection (in TCP).

there are 4 numbers that define 1 TCP connection:

1. the address here
2. the port here
3. the address there
4. the port there

if the same address and port number here tries to connect to the same address and port number there, that creates an ambiguous situation. the other end gets a request to establish a connection that it already has. that is sure to break the existing connection.

if the protocol requires a specific port number to send from, then only one connection at a time is possible. getting multiple addresses may be a way to resolve this. if each upload requires logging in, you may need to use different user names for each upload. can two uploads in parallel really work? if they can, then there is hope. we need to see ALL the code, including where the connections are made and where the "from port" number comes from.

doing two uploads using the same library of code may be the problem. it may have only one place for a local port number and thus will get two connections mixed up. the problem may be in the networking code.
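The "from port" concern can be checked locally. Here is a minimal sketch, assuming a client that does not bind its own source port, which is the default for Python's `socket` module: the operating system then assigns each outgoing connection its own ephemeral port, so two simultaneous connections to the same destination remain distinct. `open_two_connections` is a hypothetical helper that talks only to a throwaway listener on the loopback interface.

```python
import socket


def open_two_connections() -> tuple[int, int]:
    # A throwaway local listener; port 0 lets the OS choose a free port.
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))
    server.listen(2)
    target = server.getsockname()

    first = socket.create_connection(target)
    second = socket.create_connection(target)
    try:
        # getsockname() returns (local address, local port) for each client.
        return first.getsockname()[1], second.getsockname()[1]
    finally:
        first.close()
        second.close()
        server.close()


if __name__ == "__main__":
    port_a, port_b = open_two_connections()
    print(f"local ports: {port_a} and {port_b}")
```

If the two printed ports differ, a clash at the TCP level is unlikely to explain the cancellation unless some code explicitly binds a fixed source port.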