I’m trying to set up a producer/consumer pipeline using asyncio in Python. The goal is to have one coroutine that reads input continuously and puts it into a queue, while another coroutine pulls items from the queue and processes them in the background.
I think I need to use asyncio queues and tasks, but I'm not quite sure how to structure it properly. Could someone provide a simple asyncio producer/consumer queue example? Ideally it would show both coroutines, use asyncio.gather to run them concurrently, and handle the event loop setup/cleanup.
Any help would be really appreciated!
```python
import asyncio
from asyncio import Queue


async def read_input(queue: Queue):
    loop = asyncio.get_running_loop()
    while True:
        # input() blocks, so run it in the default thread pool executor
        user_input = await loop.run_in_executor(None, input, "Enter something: ")
        await queue.put(user_input)
        if user_input == "quit":
            break  # stop producing so gather() can finish


async def process_input(queue: Queue):
    while True:
        user_input = await queue.get()
        if user_input == "quit":
            break
        print(f"Processing input: {user_input}")
        await asyncio.sleep(1)  # Simulate processing time


async def main():
    queue = Queue()
    task1 = asyncio.create_task(read_input(queue))
    task2 = asyncio.create_task(process_input(queue))
    await asyncio.gather(task1, task2)


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
```
The `read_input` function acts as a producer, continuously reading input from the command line and putting it into a `Queue`. The `process_input` function acts as a consumer, taking items from the `Queue` and processing them. It prints out each input and simulates processing time with `asyncio.sleep()`. The `main` function creates both tasks, runs them concurrently with `asyncio.gather()`, and the entry point handles setup and cleanup of the event loop. This allows the producer and consumer to run asynchronously, communicating through the queue. The blocking `input()` call runs in a thread pool executor so it does not block the event loop.
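If you'd rather not tie the shutdown condition to a magic string like `"quit"`, a common pattern is to push a sentinel (e.g. `None`) into the queue when the producer is done. Here's a minimal, self-contained sketch of that idea (the item list, the `results` accumulator, and the `.upper()` "processing" step are just placeholders for illustration):

```python
import asyncio


async def producer(queue: asyncio.Queue, items) -> None:
    # Feed items into the queue, then a None sentinel meaning "no more work".
    for item in items:
        await queue.put(item)
    await queue.put(None)


async def consumer(queue: asyncio.Queue, results: list) -> None:
    # Pull items until the sentinel arrives, then exit cleanly.
    while True:
        item = await queue.get()
        if item is None:
            break
        results.append(item.upper())  # stand-in for real processing


async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    results: list = []
    await asyncio.gather(
        producer(queue, ["a", "b", "c"]),
        consumer(queue, results),
    )
    return results


print(asyncio.run(main()))  # → ['A', 'B', 'C']
```

With multiple consumers you would push one sentinel per consumer, or use `queue.join()` together with `queue.task_done()` instead.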