I’m building a web application where users can upload audio files, and each file is sent to an external API (Gemini API) for processing. However, I’m facing an issue:
When a user uploads an audio file and the system sends it to the Gemini API, everything works fine. But if the user uploads a second file while the first one is still being processed, the first request to Gemini gets canceled, and the system switches to processing the second file.
It seems that when a new request is made from my app to Gemini with a new file, the previous request from the same source (IP) is terminated.
Here’s the key flow:
1. The user uploads the first audio file, which is sent to Gemini for processing.
2. While the first audio is still being processed, the user uploads a second audio file.
3. The second request causes the first one to be canceled.
This behavior disrupts the user experience and prevents simultaneous uploads from being handled properly.
What I’ve Tried:
- I’ve looked into using Python’s asyncio for concurrent request handling, but it seems focused on sending multiple requests at once, which is not quite my case (a rough sketch of what I mean is right after this list).
- I’ve considered adding a queue system to manage the uploads, but I’m not sure how to avoid request cancellation when sending to Gemini.
- I’ve ensured unique file names for each upload, but this doesn’t solve the cancellation issue.
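Roughly what I have in mind for the asyncio route is below. This is only a sketch, not my real app: the /upload path, UPLOAD_DIR, and the process_upload helper are placeholders I made up to illustrate handing each upload to its own task.

import asyncio
import uuid
from pathlib import Path

from fastapi import FastAPI, File, UploadFile

app = FastAPI()
UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)


async def process_upload(media_path: str) -> None:
    # Placeholder for the real work (e.g. the transcribe_audio call further down).
    await asyncio.sleep(0)


@app.post("/upload")
async def upload_audio(file: UploadFile = File(...)):
    # Unique name per upload so a second file never overwrites the first.
    media_path = UPLOAD_DIR / f"{uuid.uuid4()}_{file.filename}"
    media_path.write_bytes(await file.read())

    # Each upload gets its own task; the handler returns immediately and the
    # task keeps running even if another upload arrives a moment later.
    task = asyncio.create_task(process_upload(str(media_path)))
    app.state.pending = getattr(app.state, "pending", set())
    app.state.pending.add(task)
    task.add_done_callback(app.state.pending.discard)

    return {"status": "queued", "file": media_path.name}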
What I Need:
- A way to ensure that requests to the Gemini API are independent and don’t cancel previous requests.
- Suggestions for handling a scenario where multiple users (or the same user) upload files and expect each one to be processed separately, without interference.
Additional Context:
- The application is built with Python (FastAPI).
- The Gemini API is a third-party service, and I have limited control over how it handles requests.
Questions:
1. Is there a way to ensure that multiple requests from the same source (IP) to an API are processed independently?
2. Should I use a queue or a different approach (e.g., workers, connection pooling)?
3. Could this be an issue with the Gemini API itself, and should I reach out to their support team? (I’ve put a small concurrency test right after these questions.)
Any insights or examples would be greatly appreciated.
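For question 3, this is the kind of standalone test I could run to see whether Gemini itself cancels concurrent requests from one machine; the file names and API key are placeholders, not real values. If both calls finish, the cancellation must be happening somewhere in my app rather than on Gemini’s side.

import asyncio

import google.generativeai as genai


async def one_request(path: str) -> str:
    # genai.upload_file is blocking, so run it in a worker thread.
    uploaded = await asyncio.to_thread(genai.upload_file, path=path)
    model = genai.GenerativeModel("gemini-1.5-flash")
    result = await model.generate_content_async([uploaded, "Transcribe this audio."])
    return result.text[:80]


async def main() -> None:
    genai.configure(api_key="YOUR_API_KEY")  # placeholder
    # Start both requests at the same time and wait for both to finish.
    first, second = await asyncio.gather(
        one_request("sample_1.mp3"),  # placeholder file
        one_request("sample_2.mp3"),  # placeholder file
    )
    print("first finished:", first)
    print("second finished:", second)


if __name__ == "__main__":
    asyncio.run(main())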
Code so far:

import os
import logging
import google.generativeai as genai
import asyncio
import time
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
from functools import partial

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
logger = logging.getLogger(__name__)


@dataclass
class TranscriptionTask:
    media_path: str
    api_key: str
    context: Optional[str] = None


class TranscriptionQueue:
    def __init__(self, max_concurrent_tasks: int = 20, calls_per_minute: int = 15):
        self.queue = asyncio.Queue()
        self.max_concurrent_tasks = max_concurrent_tasks
        self.calls_per_minute = calls_per_minute
        self.active_tasks: List[asyncio.Task] = []
        self.results: Dict[str, Any] = {}
        self.rate_limit_lock = asyncio.Lock()
        self.last_request_time = time.time()

    async def wait_for_rate_limit(self):
        """Ensure we don't exceed the API rate limit"""
        async with self.rate_limit_lock:
            current_time = time.time()
            elapsed = current_time - self.last_request_time
            wait_time = max(0, (60 / self.calls_per_minute) - elapsed)
            if wait_time > 0:
                logger.debug(f"Rate limiting: waiting {wait_time:.2f} seconds")
                await asyncio.sleep(wait_time)
            self.last_request_time = time.time()

    async def process_queue(self):
        while True:
            task = await self.queue.get()
            try:
                # Wait for rate limit before processing
                await self.wait_for_rate_limit()
                result = await transcribe_audio(
                    task.media_path,
                    task.api_key,
                    task.context
                )
                self.results[task.media_path] = result
                # Cleanup file after processing
                await cleanup_file(task.media_path)
            except Exception as e:
                logger.error(f"Error processing task for {task.media_path}: {e}")
                self.results[task.media_path] = {
                    'success': False,
                    'error': str(e)
                }
            finally:
                self.queue.task_done()

    async def start_workers(self):
        workers = [
            asyncio.create_task(self.process_queue())
            for _ in range(self.max_concurrent_tasks)
        ]
        self.active_tasks.extend(workers)

    async def stop_workers(self):
        for task in self.active_tasks:
            task.cancel()
        await asyncio.gather(*self.active_tasks, return_exceptions=True)
        self.active_tasks.clear()


async def transcribe_audio(
    media_path: str,
    api_key: str,
    context: Optional[str] = None,
    max_retries: int = 3
) -> Dict[str, Any]:
    """
    Transcribe and analyze audio file using Google Generative AI

    Args:
        media_path (str): Path to the audio file
        api_key (str): API key to use for Google Generative AI
        context (str, optional): Additional context for analysis
        max_retries (int): Maximum number of retry attempts

    Returns:
        dict: Transcription and analysis results
    """
    for attempt in range(max_retries):
        try:
            await initialize_genai(api_key)
            if context is None:
                context = "Conversation analysis "
                logger.info(f"Using default context: {context}")
            context = truncate_text(context, max_tokens=10000000)

            try:
                # Upload file using asyncio.to_thread
                myfile = await asyncio.to_thread(genai.upload_file, path=media_path)
                logger.info(f"Uploaded file: {myfile}")
            except Exception as upload_error:
                logger.error(f"File upload error: {upload_error}")
                raise

            try:
                model = genai.GenerativeModel("gemini-1.5-flash")
                with open("prompt_1.txt", "r", encoding="utf-8") as file:
                    prompt_1_content = file.read()
                prompt = f"""
                Additional Specified Context: {context}
                {prompt_1_content}
                """
                generation_config = {
                    'max_output_tokens': 16000012,
                    'temperature': 0.3,
                    'top_p': 0.8,
                }
                # Use generate_content_async instead of generate_content
                result = await model.generate_content_async(
                    [myfile, prompt],
                    generation_config=generation_config
                )
                processed_text = truncate_text(result.text)
                return {
                    'success': True,
                    'transcription': processed_text,
                    'analysis': 'Comprehensive AI-generated analysis'
                }
            except Exception as model_error:
                logger.error(f"Model generation error on attempt {attempt + 1}: {model_error}")
                if attempt < max_retries - 1:
                    logger.info(f"Retrying (Attempt {attempt + 2})")
                    continue
                raise
        except Exception as e:
            logger.error(f"Audio transcription error on attempt {attempt + 1}: {e}")
            if attempt < max_retries - 1:
                logger.info(f"Retrying (Attempt {attempt + 2})")
                continue
            return {
                'success': False,
                'error': str(e)
            }


# Keep existing helper functions
async def initialize_genai(api_key: str) -> None:
    if not api_key:
        logger.error("No API key provided")
        raise ValueError("No API key provided")
    genai.configure(api_key=api_key)


def truncate_text(text: str, max_tokens: int = 25000000) -> str:
    if len(text) > max_tokens * 4:
        logger.warning(f"Truncating text to fit within {max_tokens} token limit")
        return text[:max_tokens * 4]
    return text


async def cleanup_file(media_path: str) -> None:
    try:
        await asyncio.to_thread(os.remove, media_path)
        logger.info(f"Cleaned up file: {media_path}")
    except Exception as cleanup_error:
        logger.warning(f"File cleanup error: {cleanup_error}")
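For completeness, this is roughly how I was planning to plug the TranscriptionQueue above into FastAPI. The route names, the hard-coded API key, and the polling endpoint are guesses on my part, not code that exists yet.

import uuid
from pathlib import Path

from fastapi import FastAPI, File, UploadFile

app = FastAPI()
queue = TranscriptionQueue(max_concurrent_tasks=5, calls_per_minute=15)
UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)


@app.on_event("startup")
async def start_queue() -> None:
    # Workers live for the lifetime of the app and pull tasks off the queue.
    await queue.start_workers()


@app.on_event("shutdown")
async def stop_queue() -> None:
    await queue.stop_workers()


@app.post("/transcribe")
async def transcribe(file: UploadFile = File(...)):
    media_path = UPLOAD_DIR / f"{uuid.uuid4()}_{file.filename}"
    media_path.write_bytes(await file.read())
    # Enqueue and return right away; a worker processes this file independently
    # of any upload that is still in flight.
    await queue.queue.put(TranscriptionTask(str(media_path), api_key="YOUR_API_KEY"))
    return {"status": "queued", "file": media_path.name}


@app.get("/result")
async def get_result(media_path: str):
    # Poll for the result that the worker stored in queue.results.
    return queue.results.get(media_path, {"status": "pending"})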
Larz60+ wrote Jan-01-2025, 03:01 PM:
Please post all code, output and errors (in its entirety) between their respective tags. Refer to the BBCode help topic on how to post. Use the "Preview Post" button to make sure the code is presented as you expect before hitting the "Post Reply/Thread" button.
I have added tags for you; please use BBCode tags in future posts.