A Tutorial Introduction
Curio is a library for concurrent systems programming that uses coroutines and common programming abstractions such as threads, sockets, files, locks, and queues. In addition, it supports cancellation, task groups, and other useful features. This tutorial describes the basics of the concurrency model. Consult the “howto” guide at https://curio.readthedocs.io/en/latest/howto.html for cookbook-style coding recipes.
Getting Started
Consider this program that prints a countdown:
# hello.py
import curio

async def countdown(n):
    while n > 0:
        print('T-minus', n)
        await curio.sleep(1)
        n -= 1

if __name__ == '__main__':
    curio.run(countdown, 10)
Curio executes async functions (also called coroutine functions). To run a program, you pass an initial function and its arguments to run(). When that function completes, run() returns its result.
Tasks and Concurrency
This program does two things at once:
# hello.py
import curio

async def countdown(n):
    while n > 0:
        print('T-minus', n)
        await curio.sleep(1)
        n -= 1

async def kid(x, y):
    print('Getting around to doing my homework')
    await curio.sleep(1000)
    return x * y

async def parent():
    kid_task = await curio.spawn(kid, 37, 42)
    count_task = await curio.spawn(countdown, 10)

    await count_task.join()

    print("Are you done yet?")
    result = await kid_task.join()
    print("Result:", result)

if __name__ == '__main__':
    curio.run(parent)
curio.spawn() launches a concurrent task. The join() method waits for the task to finish and returns its result. The program produces the following output:
bash % python3 hello.py
Getting around to doing my homework
T-minus 10
T-minus 9
...
T-minus 2
T-minus 1
Are you done yet?
.... hangs ....
At this point, the child is busy for the next 990 seconds, the parent is blocked on join(), and nothing seems to be happening. Change run() to enable the monitor:
if __name__ == '__main__':
    curio.run(parent, with_monitor=True)
Re-run the program and launch the monitor in a separate window:
bash % python3 -m curio.monitor
Curio Monitor: 4 tasks running
Type help for commands
curio >
Type ps to view the status of each task:
curio > ps
Task   State        Cycles     Timeout Sleep   Task
------ ------------ ---------- ------- ------- --------------------------------------------------
1      READ_WAIT    1          None    None    Kernel._make_kernel_runtime.<locals>._kernel_task
3      FUTURE_WAIT  1          None    None    Monitor.monitor_task
4      TASK_JOIN    2          None    None    parent
5      TIME_SLEEP   1          None    970.186 kid
curio >
Here, you see the parent waiting to join and the kid sleeping.
Use the where command (abbreviated w in the session that follows) with a task ID to get a stack trace:
curio > w 4
Stack for Task(id=4, name='parent', state='TASK_JOIN') (most recent call last):
  File "hello.py", line 24, in parent
    result = await kid_task.join()
curio > w 5
Stack for Task(id=5, name='kid', state='TIME_SLEEP') (most recent call last):
  File "hello.py", line 14, in kid
    await curio.sleep(1000)
curio >
A timeout can be applied to any operation and tasks can be cancelled. Change the program as follows:
async def parent():
    kid_task = await curio.spawn(kid, 37, 42)
    count_task = await curio.spawn(countdown, 10)

    await count_task.join()

    print("Are you done yet?")
    try:
        result = await curio.timeout_after(10, kid_task.join)
        print("Result:", result)
    except curio.TaskTimeout as e:
        print("We've got to go!")
        await kid_task.cancel()
Likewise, cancellation can be caught. For example:
async def kid(x, y):
    try:
        print('Getting around to doing my homework')
        await curio.sleep(1000)
        return x * y
    except curio.CancelledError:
        print("No go diggy die!")
        raise
Now the program produces this output:
bash % python3 hello.py
Getting around to doing my homework
T-minus 10
T-minus 9
...
T-minus 2
T-minus 1
Are you done yet?
We've got to go!
No go diggy die!
bash %
This is the basic gist of tasks. You can create tasks, join tasks, and cancel tasks.
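One way to picture why the except clause in kid() runs is that cancellation arrives as an exception raised at the await where the task is suspended. The following is a minimal sketch of that idea in plain Python, driving a coroutine by hand. It does not use Curio: ToyCancelled and pause() are hypothetical stand-ins invented for this illustration, not Curio's actual machinery.

```python
from types import coroutine

class ToyCancelled(Exception):
    """Hypothetical stand-in for curio.CancelledError (assumption for this sketch)."""

@coroutine
def pause():
    # Hypothetical suspension point: hands control back to whoever drives the coroutine.
    yield

events = []

async def kid(x, y):
    try:
        events.append('started')
        await pause()              # the task is suspended here
        return x * y
    except ToyCancelled:
        events.append('cleanup')   # the task gets a chance to clean up
        raise                      # re-raise so the cancellation is not swallowed

task = kid(37, 42)
task.send(None)                    # run kid() up to its first await

try:
    task.throw(ToyCancelled())     # inject the exception at the suspended await
except ToyCancelled:
    events.append('cancelled')     # the exception propagates back out of the task

print(events)
```

The re-raise matters in this sketch: without it, the driver would see a normal return instead of a cancellation.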
Task Groups
Suppose you want the countdown and kid tasks to race. That is, they run concurrently, and whichever one finishes first wins, cancelling the other task. This kind of coordination is handled by a TaskGroup. Change the parent() function to this:
async def parent():
    async with curio.TaskGroup(wait=any) as g:
        await g.spawn(kid, 37, 42)
        await g.spawn(countdown, 10)

    if g.result is None:
        print("Why didn't you finish?")
    else:
        print("Result:", g.result)
Here, the task group waits for any one spawned task to finish (the wait=any argument). When that happens, the losing task is cancelled. The result attribute of the group contains the result of the task that won. Because countdown() returns None, g.result is None when the countdown wins the race.
Running this code, you will either get output similar to this:
Getting around to doing my homework
T-minus 10
T-minus 9
T-minus 8
T-minus 7
Result: 1554
or, if kid() takes too long, output like this:
Getting around to doing my homework
T-minus 10
T-minus 9
...
T-minus 2
T-minus 1
No go diggy die!
Why didn't you finish?
A critical feature of a task group is that all created tasks will have completed or been cancelled when control flow leaves the managed block: no child left behind.
Long-Running Operations
Suppose that kid() involves an inefficient computation of Fibonacci numbers:
def fib(n):
    if n < 2:
        return 1
    else:
        return fib(n-1) + fib(n-2)

async def kid(x, y):
    try:
        print('Getting around to doing my homework')
        return fib(x) * fib(y)
    except curio.CancelledError:
        print("No go diggy die!")
        raise

async def parent():
    async with curio.TaskGroup(wait=any) as g:
        await g.spawn(kid, 37, 42)
        await g.spawn(countdown, 10)

    if g.result is None:
        print("Why didn't you finish?")
    else:
        print("Result:", g.result)

if __name__ == '__main__':
    curio.run(parent, with_monitor=True)
If you run this version, everything becomes unresponsive and you see no output. The problem is that fib() takes over the CPU and never yields. Important lesson: Curio DOES NOT provide preemptive scheduling. If a task decides to compute large Fibonacci numbers or mine bitcoins, everything blocks. Don’t do that.
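To get a feel for how much uninterrupted work this fib() performs, the sketch below counts recursive calls for a small argument. It is the tutorial's function with a call counter added; the counter and the choice of n = 20 are assumptions made for illustration only.

```python
calls = 0

def fib(n):
    # Same recursive definition as in the tutorial, plus a call counter.
    global calls
    calls += 1
    if n < 2:
        return 1
    else:
        return fib(n-1) + fib(n-2)

value = fib(20)
print('fib(20) =', value, 'computed with', calls, 'calls')
```

For this definition the call count is always 2 * fib(n) - 1, so it grows exponentially with n. An argument of 42 implies hundreds of millions of calls, all made without a single await where other tasks could run.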
For other tasks to make progress, you must modify kid() to carry out computationally intensive work elsewhere. Change the code to use curio.run_in_process():
async def kid(x, y):
    try:
        print('Getting around to doing my homework')
        fx = await curio.run_in_process(fib, x)
        fy = await curio.run_in_process(fib, y)
        return fx * fy
    except curio.CancelledError:
        print("No go diggy die!")
        raise
With this change, you’ll see the countdown task running, and the kid task is cancelled if it takes too long (to see it finish instead, you might need to greatly increase the countdown duration). Incidentally, you can execute the two fib() calculations in parallel on two CPUs using spawn() like this:
async def kid(x, y):
    try:
        print('Getting around to doing my homework')
        async with curio.TaskGroup() as g:
            tx = await g.spawn(curio.run_in_process, fib, x)
            ty = await g.spawn(curio.run_in_process, fib, y)
        return tx.result * ty.result
    except curio.CancelledError:
        print("Guess I'll fail!")
        raise
The blocking problem also applies to I/O operations. For example, suppose kid() was modified to use a Fibonacci microservice:
import requests

def fib(n):
    r = requests.get(f'http://www.dabeaz.com/cgi-bin/fib.py?n={n}')
    resp = r.json()
    return int(resp['value'])
The popular requests library knows nothing of Curio. As such, it blocks everything while waiting for a response. Since it’s waiting for I/O (as opposed to performing heavy CPU work), you can use curio.run_in_thread() like this:
async def kid(x, y):
    try:
        print('Getting around to doing my homework')
        fx = await curio.run_in_thread(fib, x)
        fy = await curio.run_in_thread(fib, y)
        return fx * fy
    except curio.CancelledError:
        print("No go diggy die!")
        raise
As a rule of thumb, use processes for computationally intensive operations and use threads for I/O bound operations.
An Echo Server
A common use of Curio is network programming. Here is an echo server:
from curio import run, tcp_server

async def echo_client(client, addr):
    print('Connection from', addr)
    while True:
        data = await client.recv(1000)
        if not data:
            break
        await client.sendall(data)
    print('Connection closed')

if __name__ == '__main__':
    run(tcp_server, '', 25000, echo_client)
Run this program and connect to it using nc or telnet. You’ll see the program echoing data back to you:
bash % nc localhost 25000
Hello (you type)
Hello (response)
Is anyone there? (you type)
Is anyone there? (response)
^C
bash %
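If you would rather script the check than type into nc, an ordinary blocking client works too, since the server only sees a TCP connection. The sketch below uses the standard socket module. The helper name echo_once and its timeout parameter are hypothetical, introduced here for illustration.

```python
import socket

def echo_once(host, port, payload, timeout=5.0):
    """Send payload to an echo server and return everything it sends back."""
    with socket.create_connection((host, port), timeout=timeout) as sock:
        sock.sendall(payload)
        # Half-close the connection so the server's recv() eventually
        # returns b'' and its loop ends.
        sock.shutdown(socket.SHUT_WR)
        chunks = []
        while True:
            data = sock.recv(1000)
            if not data:          # server closed its side: echo complete
                break
            chunks.append(data)
    return b''.join(chunks)
```

With the echo server from this section running, calling echo_once('localhost', 25000, b'Hello\n') should return the same bytes that were sent.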
In this program, the client argument to echo_client() is a socket. It supports all of the usual I/O operations, but they are asynchronous and must be prefixed with await. If you prefer, you can perform I/O using a file-like interface by converting the socket to a stream like this:
async def echo_client(client, addr):
    print("Connection from", addr)
    async with client.as_stream() as s:
        async for line in s:
            await s.write(line)
    print('Connection closed')

if __name__ == '__main__':
    run(tcp_server, '', 25000, echo_client)
Intertask Communication
If tasks need to communicate, use a Queue. Here’s an example of a publish-subscribe service:
from curio import run, TaskGroup, Queue, sleep

messages = Queue()
subscribers = set()

# Dispatch task that forwards incoming messages to subscribers
async def dispatcher():
    async for msg in messages:
        for q in list(subscribers):
            await q.put(msg)

# Publish a message
async def publish(msg):
    await messages.put(msg)

# A sample subscriber task
async def subscriber(name):
    queue = Queue()
    subscribers.add(queue)
    try:
        async for msg in queue:
            print(name, 'got', msg)
    finally:
        subscribers.discard(queue)

# A sample producer task
async def producer():
    for i in range(10):
        await publish(i)
        await sleep(0.1)

async def main():
    async with TaskGroup() as g:
        await g.spawn(dispatcher)
        await g.spawn(subscriber, 'child1')
        await g.spawn(subscriber, 'child2')
        await g.spawn(subscriber, 'child3')
        ptask = await g.spawn(producer)
        await ptask.join()
        await g.cancel_remaining()

if __name__ == '__main__':
    run(main)
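The dispatcher in this example loops over list(subscribers) rather than over the set itself. A plausible reason, assuming another task may add or remove a subscriber while the dispatcher is suspended in q.put(), is that Python refuses to continue iterating over a set whose size has changed. The sketch below shows that behaviour in plain Python, with strings standing in for the Queue objects (an assumption made to keep it self-contained).

```python
# Iterating over the live set while it grows fails.
subscribers = {'q1', 'q2', 'q3'}
try:
    for q in subscribers:
        subscribers.add(q + '-late')     # a subscriber joins mid-loop
    failed = False
except RuntimeError:
    failed = True                        # "Set changed size during iteration"

# Iterating over a snapshot tolerates the same change.
subscribers = {'q1', 'q2', 'q3'}
delivered = []
for q in list(subscribers):              # copy taken before the loop starts
    delivered.append(q)
    subscribers.add(q + '-late')         # late joiners wait for the next message

print(failed, sorted(delivered), len(subscribers))
```

The trade-off of the snapshot is that a subscriber added during a dispatch round does not receive the message being dispatched in that round.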
A Chat Server
Combining sockets and queues, you can implement a small chat server. For example:
from curio import run, spawn, TaskGroup, Queue, tcp_server

messages = Queue()
subscribers = set()

async def dispatcher():
    while True:
        msg = await messages.get()
        # Iterate over a copy, as in the publish-subscribe example,
        # so that clients can join or leave while we await put()
        for q in list(subscribers):
            await q.put(msg)

async def publish(msg):
    await messages.put(msg)

# Task that writes chat messages to clients
async def outgoing(client_stream):
    queue = Queue()
    try:
        subscribers.add(queue)
        while True:
            name, msg = await queue.get()
            await client_stream.write(name + b':' + msg)
    finally:
        subscribers.discard(queue)

# Task that reads chat messages and publishes them
async def incoming(client_stream, name):
    async for line in client_stream:
        await publish((name, line))

async def chat_handler(client, addr):
    print('Connection from', addr)
    async with client:
        client_stream = client.as_stream()
        await client_stream.write(b'Your name: ')
        name = (await client_stream.readline()).strip()
        await publish((name, b'joined\n'))

        async with TaskGroup(wait=any) as workers:
            await workers.spawn(outgoing, client_stream)
            await workers.spawn(incoming, client_stream, name)

        await publish((name, b'has gone away\n'))

    print('Connection closed')

async def chat_server(host, port):
    async with TaskGroup() as g:
        await g.spawn(dispatcher)
        await g.spawn(tcp_server, host, port, chat_handler)

if __name__ == '__main__':
    run(chat_server('', 25000))
In this code, each connection results in two tasks (incoming and outgoing). The incoming task reads incoming lines and publishes them. The outgoing task subscribes to the feed and sends outgoing messages. The workers task group supervises these two tasks. If either one terminates, the other task is cancelled right away.
The chat_server task launches both the dispatcher and a tcp_server task and watches them. If it is cancelled, both of those tasks are shut down.
Programming Advice
At this point, you have the core concepts. Here are a few tips:
- Think thread programming and synchronous code. Tasks execute like threads, and programming techniques applied to threads apply to Curio.
- Curio uses the same I/O abstractions as synchronous code (e.g., sockets, files, etc.). Methods have the same names and perform the same functions. Just don’t forget to add the extra await keyword.
- Be extra wary of calls that do not use an explicit await. Although they will work, they could block progress of all other tasks. If you know that this is possible, use the run_in_process() or run_in_thread() functions.
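The last tip is easier to see with a toy model of cooperative scheduling. The following sketch is not Curio's kernel. It is a hypothetical round-robin loop written for this illustration, where switch() is an invented suspension point and run_all() an invented driver. It shows that a task without any await runs to completion in one turn, while tasks that do await take turns.

```python
from collections import deque
from types import coroutine

@coroutine
def switch():
    # Hypothetical suspension point: give the scheduler a chance to run others.
    yield

log = []

async def polite(name, steps):
    for i in range(steps):
        log.append((name, i))
        await switch()            # yields control after every step

async def greedy(name, steps):
    for i in range(steps):
        log.append((name, i))     # never awaits, so nothing else can run

def run_all(*coros):
    """Toy round-robin scheduler: resume each task until it suspends or ends."""
    ready = deque(coros)
    while ready:
        task = ready.popleft()
        try:
            task.send(None)       # run until the next yield
        except StopIteration:
            continue              # task finished; drop it
        ready.append(task)        # task suspended; queue it again

run_all(polite('a', 2), greedy('g', 3), polite('b', 2))
print(log)
```

In the resulting log, the steps of 'a' and 'b' interleave, but all three steps of 'g' appear back to back: nothing can interrupt a task between its awaits.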
Debugging Tips
A common mistake is forgetting await. For example:
async def countdown(n):
    while n > 0:
        print('T-minus', n)
        curio.sleep(5)        # Missing await
        n -= 1
This usually produces a warning message:
example.py:8: RuntimeWarning: coroutine 'sleep' was never awaited
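The warning follows from how Python treats async functions in general: calling one only creates a coroutine object, and its body does not start until something drives it. Here is a small sketch in plain Python, without Curio. The function step() and the manual send() call are illustrative assumptions, standing in for what await and a scheduler normally do.

```python
import inspect

ran = []

async def step(n):
    ran.append(n)                 # executes only once the coroutine is driven
    return n - 1

pending = step(5)                 # no await: the body has not started
is_coroutine = inspect.iscoroutine(pending)
started_early = bool(ran)

# Drive the coroutine by hand, which is roughly what awaiting it would trigger.
try:
    pending.send(None)
except StopIteration as done:
    result = done.value           # the coroutine's return value

print(is_coroutine, started_early, ran, result)
```

Had pending been discarded instead of driven, Python would typically report it with the same "was never awaited" warning.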
To debug running programs, use the monitor:
import curio
...
curio.run(..., with_monitor=True)
The monitor shows the state of each task and can show stack traces. To enter the monitor, run python3 -m curio.monitor in a separate window.
The traceback() method of a task creates a stack trace that can be printed or logged. For example:
print("Where are you?")
print(task.traceback())
Scheduler tracing can be enabled with code like this:
from curio.debug import schedtrace
import logging
logging.basicConfig(level=logging.DEBUG)
run(..., debug=schedtrace)
If you want even more detail, use traptrace instead of schedtrace.
More Information
The reference manual is found at https://curio.readthedocs.io/en/latest/reference.html.
Programming recipes are found at https://curio.readthedocs.io/en/latest/howto.html.
Watch https://www.youtube.com/watch?v=Y4Gt3Xjd7G8 to learn about the theory of operation.