A Tutorial Introduction
=======================

Curio is a library for concurrent systems programming that uses coroutines and common programming abstractions such as threads, sockets, files, locks, and queues. In addition, it supports cancellation, task groups, and other useful features. This tutorial describes the basics of the concurrency model. Consult the "howto" guide at https://curio.readthedocs.io/en/latest/howto.html for cookbook-style coding recipes.

Getting Started
---------------

Consider this program that prints a countdown::

    # hello.py
    import curio

    async def countdown(n):
        while n > 0:
            print('T-minus', n)
            await curio.sleep(1)
            n -= 1

    if __name__ == '__main__':
        curio.run(countdown, 10)

Curio executes ``async`` functions (or coroutine functions). To run a program, you provide an initial function and arguments to ``run()``. When it completes, ``run()`` returns the result.

Tasks and Concurrency
---------------------

This program does two things at once::

    # hello.py
    import curio

    async def countdown(n):
        while n > 0:
            print('T-minus', n)
            await curio.sleep(1)
            n -= 1

    async def kid(x, y):
        print('Getting around to doing my homework')
        await curio.sleep(1000)
        return x * y

    async def parent():
        kid_task = await curio.spawn(kid, 37, 42)
        count_task = await curio.spawn(countdown, 10)

        await count_task.join()

        print("Are you done yet?")
        result = await kid_task.join()
        print("Result:", result)

    if __name__ == '__main__':
        curio.run(parent)

``curio.spawn()`` launches a concurrent task. The ``join()`` method waits for it to finish and returns its result. The following output is produced::

    bash % python3 hello.py
    Getting around to doing my homework
    T-minus 10
    T-minus 9
    ...
    T-minus 2
    T-minus 1
    Are you done yet?
    .... hangs ....

At this point, the child is busy for the next 990 seconds, the parent is blocked on ``join()`` and nothing seems to be happening.
Change ``run()`` to enable the monitor::

    if __name__ == '__main__':
        curio.run(parent, with_monitor=True)

Re-run the program and launch the monitor in a separate window::

    bash % python3 -m curio.monitor
    Curio Monitor: 4 tasks running
    Type help for commands
    curio >

Type ``ps`` to view the status of each task::

    curio > ps
    Task   State        Cycles     Timeout Sleep   Task
    ------ ------------ ---------- ------- ------- --------------------------------------------------
    1      READ_WAIT    1          None    None    Kernel._make_kernel_runtime.<locals>._kernel_task
    3      FUTURE_WAIT  1          None    None    Monitor.monitor_task
    4      TASK_JOIN    2          None    None    parent
    5      TIME_SLEEP   1          None    970.186 kid
    curio >

Here, you see the parent waiting to join and the kid sleeping. Use the ``where`` command (``w`` for short) to get a stack trace::

    curio > w 4
    Stack for Task(id=4, name='parent', state='TASK_JOIN') (most recent call last):
      File "hello.py", line 24, in parent
        result = await kid_task.join()

    curio > w 5
    Stack for Task(id=5, name='kid', state='TIME_SLEEP') (most recent call last):
      File "hello.py", line 14, in kid
        await curio.sleep(1000)

    curio >

A timeout can be applied to any operation, and tasks can be cancelled. Change the program as follows::

    async def parent():
        kid_task = await curio.spawn(kid, 37, 42)
        count_task = await curio.spawn(countdown, 10)

        await count_task.join()

        print("Are you done yet?")
        try:
            result = await curio.timeout_after(10, kid_task.join)
            print("Result:", result)
        except curio.TaskTimeout:
            print("We've got to go!")
            await kid_task.cancel()

Likewise, cancellation can be caught. For example::

    async def kid(x, y):
        try:
            print('Getting around to doing my homework')
            await curio.sleep(1000)
            return x * y
        except curio.CancelledError:
            print("No go diggy die!")
            raise

Now the program produces this output::

    bash % python3 hello.py
    Getting around to doing my homework
    T-minus 10
    T-minus 9
    ...
    T-minus 2
    T-minus 1
    Are you done yet?
    We've got to go!
    No go diggy die!
    bash %

This is the basic gist of tasks.
You can create tasks, join tasks, and cancel tasks.

Task Groups
-----------

Suppose you want the ``countdown`` and ``kid`` tasks to have a race. That is, have them run concurrently, with whichever one finishes first winning and the other being cancelled. This kind of coordination is handled by a ``TaskGroup``. Change the ``parent()`` function to this::

    async def parent():
        async with curio.TaskGroup(wait=any) as g:
            await g.spawn(kid, 37, 42)
            await g.spawn(countdown, 10)

        if g.result is None:
            print("Why didn't you finish?")
        else:
            print("Result:", g.result)

Here, a task group waits for any spawned task to finish (the ``wait=any`` argument). When one finishes, the losing task is cancelled. The ``result`` attribute of the group contains the result of the task that won. Running this code, you will either get output similar to this::

    Getting around to doing my homework
    T-minus 10
    T-minus 9
    T-minus 8
    T-minus 7
    Result: 1554

or, if ``kid()`` takes too long, output like this::

    Getting around to doing my homework
    T-minus 10
    T-minus 9
    ...
    T-minus 2
    T-minus 1
    No go diggy die!
    Why didn't you finish?

A critical feature of a task group is that all created tasks will have completed or been cancelled when control flow leaves the managed block--no child left behind.

Long-Running Operations
-----------------------

Suppose that ``kid()`` involves an inefficient computation of Fibonacci numbers::

    def fib(n):
        if n < 2:
            return 1
        else:
            return fib(n-1) + fib(n-2)

    async def kid(x, y):
        try:
            print('Getting around to doing my homework')
            return fib(x) * fib(y)
        except curio.CancelledError:
            print("No go diggy die!")
            raise

    async def parent():
        async with curio.TaskGroup(wait=any) as g:
            await g.spawn(kid, 37, 42)
            await g.spawn(countdown, 10)

        if g.result is None:
            print("Why didn't you finish?")
        else:
            print("Result:", g.result)

    if __name__ == '__main__':
        curio.run(parent, with_monitor=True)

If you run this version, everything becomes unresponsive and no countdown output appears.
The problem is that ``fib()`` takes over the CPU and never yields. Important lesson: Curio DOES NOT provide preemptive scheduling. If a task decides to compute large Fibonacci numbers or mine bitcoins, everything blocks. Don't do that.

For other tasks to make progress, you must modify ``kid()`` to carry out computationally intensive work elsewhere. Change the code to use ``curio.run_in_process()``::

    async def kid(x, y):
        try:
            print('Getting around to doing my homework')
            fx = await curio.run_in_process(fib, x)
            fy = await curio.run_in_process(fib, y)
            return fx * fy
        except curio.CancelledError:
            print("No go diggy die!")
            raise

With this change, the countdown task runs and the kid task is cancelled if it takes too long (you might need to greatly increase the countdown duration).

Incidentally, you can execute the two ``fib()`` calculations in parallel on two CPUs by spawning them in a task group like this::

    async def kid(x, y):
        try:
            print('Getting around to doing my homework')
            async with curio.TaskGroup() as g:
                tx = await g.spawn(curio.run_in_process, fib, x)
                ty = await g.spawn(curio.run_in_process, fib, y)
            return tx.result * ty.result
        except curio.CancelledError:
            print("Guess I'll fail!")
            raise

The blocking problem also applies to I/O operations. For example, suppose ``fib()`` was reimplemented to call a Fibonacci microservice::

    import requests

    def fib(n):
        r = requests.get(f'http://www.dabeaz.com/cgi-bin/fib.py?n={n}')
        resp = r.json()
        return int(resp['value'])

The popular ``requests`` library knows nothing of Curio. As such, it blocks everything while waiting for a response.
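To make the scheduling point concrete, here is a toy sketch written with plain Python coroutines. It is not Curio's kernel, and the names ``Switch``, ``run_round_robin``, ``polite``, and ``hog`` are made up for illustration. It assumes only that tasks are coroutines resumed in turn, and that a task can hand back control solely at an ``await``. A task with no ``await`` in its loop therefore finishes all of its work before any other task gets a turn.

```python
from collections import deque


class Switch:
    """Hypothetical awaitable that hands control back to the toy scheduler once."""

    def __await__(self):
        yield


def run_round_robin(coros):
    """Resume each coroutine in turn until all of them have finished."""
    ready = deque(coros)
    while ready:
        coro = ready.popleft()
        try:
            coro.send(None)       # run until the coroutine yields or returns
        except StopIteration:
            continue              # finished: do not reschedule it
        ready.append(coro)        # it yielded, so it gets another turn later


log = []


async def polite(name, steps):
    for i in range(steps):
        log.append((name, i))
        await Switch()            # cooperative yield point


async def hog(name, steps):
    for i in range(steps):
        log.append((name, i))     # no await: nothing else can run meanwhile


run_round_robin([polite('a', 2), hog('hog', 3), polite('b', 2)])
print(log)
# [('a', 0), ('hog', 0), ('hog', 1), ('hog', 2), ('b', 0), ('a', 1), ('b', 1)]
```

In the recorded order, ``hog`` logs all of its steps back to back, while ``a`` and ``b`` alternate. A synchronous call such as ``fib()`` or ``requests.get()`` plays the role of ``hog`` here.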
Since the ``requests`` call is waiting for I/O (as opposed to performing heavy CPU work), you can use ``curio.run_in_thread()`` like this::

    async def kid(x, y):
        try:
            print('Getting around to doing my homework')
            fx = await curio.run_in_thread(fib, x)
            fy = await curio.run_in_thread(fib, y)
            return fx * fy
        except curio.CancelledError:
            print("No go diggy die!")
            raise

As a rule of thumb, use processes for computationally intensive operations and threads for I/O-bound operations.

An Echo Server
--------------

A common use of Curio is network programming. Here is an echo server::

    from curio import run, tcp_server

    async def echo_client(client, addr):
        print('Connection from', addr)
        while True:
            data = await client.recv(1000)
            if not data:
                break
            await client.sendall(data)
        print('Connection closed')

    if __name__ == '__main__':
        run(tcp_server, '', 25000, echo_client)

Run this program and connect to it using ``nc`` or ``telnet``. You'll see the program echoing back data to you::

    bash % nc localhost 25000
    Hello                 (you type)
    Hello                 (response)
    Is anyone there?      (you type)
    Is anyone there?      (response)
    ^C
    bash %

In this program, the ``client`` argument to ``echo_client()`` is a socket. It supports all of the usual I/O operations, but they are asynchronous and must be prefixed with ``await``. If you prefer, you can perform I/O using a file-like interface by converting the socket to a stream like this::

    from curio import run, tcp_server

    async def echo_client(client, addr):
        print("Connection from", addr)
        async with client.as_stream() as s:
            async for line in s:
                await s.write(line)
        print('Connection closed')

    if __name__ == '__main__':
        run(tcp_server, '', 25000, echo_client)

Intertask Communication
-----------------------

If tasks need to communicate, use a ``Queue``.
Here's an example of a publish-subscribe service::

    from curio import run, TaskGroup, Queue, sleep

    messages = Queue()
    subscribers = set()

    # Dispatch task that forwards incoming messages to subscribers
    async def dispatcher():
        while True:
            msg = await messages.get()
            for q in list(subscribers):
                await q.put(msg)

    # Publish a message
    async def publish(msg):
        await messages.put(msg)

    # A sample subscriber task
    async def subscriber(name):
        queue = Queue()
        subscribers.add(queue)
        try:
            while True:
                msg = await queue.get()
                print(name, 'got', msg)
        finally:
            subscribers.discard(queue)

    # A sample producer task
    async def producer():
        for i in range(10):
            await publish(i)
            await sleep(0.1)

    async def main():
        async with TaskGroup() as g:
            await g.spawn(dispatcher)
            await g.spawn(subscriber, 'child1')
            await g.spawn(subscriber, 'child2')
            await g.spawn(subscriber, 'child3')
            ptask = await g.spawn(producer)
            await ptask.join()
            await g.cancel_remaining()

    if __name__ == '__main__':
        run(main)

A Chat Server
-------------

Combining sockets and queues, you can implement a small chat server.
For example::

    from curio import run, TaskGroup, Queue, tcp_server

    messages = Queue()
    subscribers = set()

    async def dispatcher():
        while True:
            msg = await messages.get()
            # Iterate over a copy, since clients may join or leave meanwhile
            for q in list(subscribers):
                await q.put(msg)

    async def publish(msg):
        await messages.put(msg)

    # Task that writes chat messages to clients
    async def outgoing(client_stream):
        queue = Queue()
        try:
            subscribers.add(queue)
            while True:
                name, msg = await queue.get()
                await client_stream.write(name + b':' + msg)
        finally:
            subscribers.discard(queue)

    # Task that reads chat messages and publishes them
    async def incoming(client_stream, name):
        async for line in client_stream:
            await publish((name, line))

    async def chat_handler(client, addr):
        print('Connection from', addr)
        async with client:
            client_stream = client.as_stream()
            await client_stream.write(b'Your name: ')
            name = (await client_stream.readline()).strip()
            await publish((name, b'joined\n'))

            async with TaskGroup(wait=any) as workers:
                await workers.spawn(outgoing, client_stream)
                await workers.spawn(incoming, client_stream, name)

            await publish((name, b'has gone away\n'))

        print('Connection closed')

    async def chat_server(host, port):
        async with TaskGroup() as g:
            await g.spawn(dispatcher)
            await g.spawn(tcp_server, host, port, chat_handler)

    if __name__ == '__main__':
        run(chat_server, '', 25000)

In this code, each connection results in two tasks (``incoming`` and ``outgoing``). The ``incoming`` task reads incoming lines and publishes them. The ``outgoing`` task subscribes to the feed and sends outgoing messages. The ``workers`` task group supervises these two tasks. If either one terminates, the other is cancelled right away.

The ``chat_server`` task launches both the ``dispatcher`` and a ``tcp_server`` task and watches them. If ``chat_server`` is cancelled, both of those tasks are shut down.

Programming Advice
------------------

At this point, you have the core concepts. Here are a few tips:

- Think thread programming and synchronous code. Tasks execute like threads, and programming techniques applied to threads apply to Curio.

- Curio uses the same I/O abstractions as synchronous code (e.g., sockets and files). Methods have the same names and perform the same functions. Just don't forget to add the extra ``await`` keyword.

- Be extra wary of calls that do not use an explicit ``await``. Although they will work, they could block the progress of all other tasks. If you know that a call might block, use the ``run_in_process()`` or ``run_in_thread()`` functions.

Debugging Tips
--------------

A common mistake is forgetting ``await``. For example::

    async def countdown(n):
        while n > 0:
            print('T-minus', n)
            curio.sleep(5)        # Missing await
            n -= 1

This usually produces a warning message::

    example.py:8: RuntimeWarning: coroutine 'sleep' was never awaited

To debug running programs, use the monitor::

    import curio
    ...
    curio.run(..., with_monitor=True)

The monitor shows the state of each task and can show stack traces. To enter the monitor, run ``python3 -m curio.monitor`` in a separate window.

The ``traceback()`` method of a task creates a stack trace that can be printed or logged. For example::

    print("Where are you?")
    print(task.traceback())

Scheduler tracing can be enabled with code like this::

    from curio import run
    from curio.debug import schedtrace
    import logging
    logging.basicConfig(level=logging.DEBUG)
    run(..., debug=schedtrace)

If you want even more detail, use ``traptrace`` instead of ``schedtrace``.

More Information
----------------

The reference manual is found at https://curio.readthedocs.io/en/latest/reference.html.

Programming recipes are found at https://curio.readthedocs.io/en/latest/howto.html.

Watch https://www.youtube.com/watch?v=Y4Gt3Xjd7G8 to learn about the theory of operation.
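The missing-``await`` mistake from the Debugging Tips section can be reproduced with the standard library alone. The sketch below is only an illustration: ``pause()`` is a hypothetical stand-in for an awaitable such as ``curio.sleep()``, and ``drive()`` is a made-up helper that steps a coroutine by hand instead of using ``curio.run()``. It shows that calling a coroutine function without ``await`` merely creates a coroutine object, whereas ``await`` actually runs it and produces its result.

```python
import inspect


async def pause(seconds):
    """Hypothetical stand-in for an awaitable such as curio.sleep()."""
    return seconds


async def forgetful():
    pending = pause(5)                     # Missing await: nothing runs yet
    is_coro = inspect.iscoroutine(pending)
    pending.close()                        # close it so no RuntimeWarning is emitted
    return is_coro


async def careful():
    return await pause(5)                  # await runs pause() and returns its result


def drive(coro):
    """Step a coroutine that never suspends and return its result."""
    try:
        coro.send(None)
    except StopIteration as stop:
        return stop.value
    raise RuntimeError('coroutine suspended unexpectedly')


forgot = drive(forgetful())
waited = drive(careful())
print(forgot, waited)                      # True 5
```

If the un-awaited coroutine were not closed explicitly, Python would typically report ``RuntimeWarning: coroutine 'pause' was never awaited`` when the object is discarded.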