I’m trying to set up a producer/consumer pipeline using asyncio in Python. The goal is to have one coroutine that reads input continuously and puts it into a queue, while another coroutine pulls items from the queue and processes them in the background.

I think I need to use asyncio queues and tasks, but I'm not quite sure how to structure it properly. Could someone provide a simple asyncio producer/consumer queue example? Ideally it would show both coroutines, use asyncio.gather to run them concurrently, and handle the event loop setup/cleanup.

Any help would be really appreciated!

  • CoderSupreme@programming.dev (OP)
    9 months ago
    import asyncio
    from asyncio import Queue
    
    async def read_input(queue: Queue):
        # Look up the running loop here instead of relying on a module-level global.
        loop = asyncio.get_running_loop()
        while True:
            # input() blocks, so run it in the default thread pool executor.
            user_input = await loop.run_in_executor(None, input, "Enter something: ")
            await queue.put(user_input)
            if user_input == "quit":
                break  # Stop producing once "quit" has been queued
    
    async def process_input(queue: Queue):
        while True:
            user_input = await queue.get()
            if user_input == "quit":
                break
            print(f"Processing input: {user_input}")
            await asyncio.sleep(1)  # Simulate processing time
    
    async def main():
        queue = Queue()
        task1 = asyncio.create_task(read_input(queue))
        task2 = asyncio.create_task(process_input(queue))
    
        # Returns once both tasks have finished, i.e. after "quit" is entered.
        await asyncio.gather(task1, task2)
    
    if __name__ == "__main__":
        try:
            # asyncio.run() creates the event loop and closes it on exit.
            asyncio.run(main())
        except KeyboardInterrupt:
            pass
    

    The read_input function acts as a producer, continuously reading input from the command line and putting it into a Queue.

    The process_input function acts as a consumer, taking items from the Queue and processing them. It prints out each input, simulates processing time with asyncio.sleep(), and stops when it receives "quit".

    The main function creates both tasks and runs them concurrently with asyncio.gather(). Setup and cleanup of the event loop happen in the if __name__ == "__main__" block.
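To make the shutdown path easier to see, here is a minimal non-interactive sketch of the same pattern. The names produce, consume, run_pipeline and SENTINEL are hypothetical stand-ins, not part of the code above: the producer feeds a fixed list instead of calling input(), and a sentinel value tells the consumer when to stop so that asyncio.gather() can return.

```python
import asyncio

SENTINEL = None  # hypothetical end-of-stream marker (assumption for this sketch)


async def produce(queue: asyncio.Queue, items):
    # Stand-in for read_input(): feeds a fixed list instead of calling input().
    for item in items:
        await queue.put(item)
    await queue.put(SENTINEL)  # tell the consumer that no more items are coming


async def consume(queue: asyncio.Queue, results: list):
    # Stand-in for process_input(): collects results instead of printing them.
    while True:
        item = await queue.get()
        if item is SENTINEL:
            break
        await asyncio.sleep(0)  # placeholder for real processing work
        results.append(item.upper())


async def run_pipeline(items):
    queue = asyncio.Queue()
    results = []
    # gather() returns only once both coroutines have finished.
    await asyncio.gather(produce(queue, items), consume(queue, results))
    return results


if __name__ == "__main__":
    print(asyncio.run(run_pipeline(["a", "b", "c"])))
```

Because both coroutines exit on their own, nothing is left pending when gather() returns, and asyncio.run() can close the loop cleanly.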

    This allows the producer and consumer to run asynchronously, communicating through the queue. The input call runs in a thread pool executor to avoid blocking the event loop.
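The effect of the executor can be sketched on its own. In this hedged example, blocking_read is a hypothetical stand-in for input() that uses time.sleep(), and ticker is a second coroutine that keeps making progress while the blocking call sits in a worker thread.

```python
import asyncio
import time


def blocking_read(delay: float) -> str:
    # Hypothetical stand-in for input(): blocks the calling thread.
    time.sleep(delay)
    return "done"


async def ticker(ticks: list, count: int, interval: float):
    # Records one tick per interval; it only advances if the loop is not blocked.
    for i in range(count):
        ticks.append(i)
        await asyncio.sleep(interval)


async def demo():
    loop = asyncio.get_running_loop()
    ticks = []
    # Passing None selects the loop's default thread pool executor.
    result, _ = await asyncio.gather(
        loop.run_in_executor(None, blocking_read, 0.2),
        ticker(ticks, 3, 0.01),
    )
    return result, ticks


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

If blocking_read were called directly inside a coroutine instead, the ticker would not run until the call returned.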