
Lowering async/await in Rust

We have a simple program (playground link) built with Rust’s async/await feature.

use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    sleep(Duration::from_secs(1)).await;
}

We want to understand all the magic the Rust compiler performs to bring this program to life. Here are a few references:

Below (adapted from here) is roughly how the Rust compiler turns Rust source code into machine code.

We will dive into the code generation process of async/await in a moment.

High-level intermediate representations

Let’s first try to expand all the macros with cargo-expand.

> cargo expand
    Checking libc v0.2.112
    Checking log v0.4.14
    Checking memchr v2.4.1
    Checking parking_lot_core v0.8.5
    Checking signal-hook-registry v1.4.0
    Checking num_cpus v1.13.1
    Checking mio v0.7.14
    Checking parking_lot v0.11.2
    Checking tokio v1.15.0
    Checking generator v0.1.0 (/home/by/Workspace/playground/rust/generator)
    Finished dev [unoptimized + debuginfo] target(s) in 4.48s

#![feature(prelude_import)]
#[prelude_import]
use std::prelude::rust_2021::*;
#[macro_use]
extern crate std;
use tokio::time::{sleep, Duration};
fn main() {
    let body = async {
        sleep(Duration::from_secs(1)).await;
    };
    #[allow(clippy::expect_used)]
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("Failed building the Runtime")
        .block_on(body);
}

We can see that the async main function has become an ordinary synchronous main function: its body is moved into an async block bound to a variable called body, and main ends with a call to block_on, whose signature shows that it accepts a future.
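Since block_on only requires a Future, it helps to see what an executor minimally has to do with one. The sketch below is a hypothetical, single-threaded block_on (not Tokio’s implementation, which also drives timers and I/O): it polls the future and, on Poll::Pending, parks the thread until the Waker it handed out is woken. ThreadWaker and this block_on are illustrative names.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor sketch: poll until Ready, parking between polls.
/// Unlike Tokio's runtime, there is no reactor or timer driver here.
fn block_on<F: Future>(fut: F) -> F::Output {
    // Pin the future on the heap so it can be polled through Pin<&mut F>.
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Sleep until some waker calls `unpark`; a spurious wakeup just re-polls.
            Poll::Pending => thread::park(),
        }
    }
}
```

For example, block_on(async { 40 + 2 }) returns 42. Tokio’s sleep would not work with this sketch, because it expects to run inside a Tokio runtime that drives its timer.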

How does this async {sleep(Duration::from_secs(1)).await;} turn out to be a future?

Let’s go lower and expand this program to the High-Level Intermediate Representation (HIR). HIR does not have a canonical text representation, so we copy the one printed by the playground, with a few style edits.

#[prelude_import]
use std::prelude::rust_2021::*;
#[macro_use]
extern crate std;
use tokio::time::{};
use tokio::time::sleep;
use tokio::time::Duration;

fn main() {
    let body =
        #[lang = "from_generator"](|mut _task_context|
            {
                match #[lang = "into_future"](sleep(Duration::from_secs(1)))
                    {
                    mut pinned => loop {
                        match unsafe {
                                  #[lang = "poll"](#[lang = "new_unchecked"](&mut pinned),
                                                   #[lang = "get_context"](_task_context))
                        } {
                            #[lang = "Ready"] { 0: result } => break result,
                            #[lang = "Pending"] {} => {}
                        }
                        _task_context = yield ();
                    },
                };
            });

    #[allow(clippy :: expect_used)]
    tokio::runtime::Builder::new_multi_thread().enable_all().build().expect("Failed building the Runtime").block_on(body);
}

There are quite a few lang items in this snippet.

Lang items are items (traits, functions, types) defined in the core and standard libraries that the compiler knows about by name, so that the code it generates while desugaring can refer to them. For example, the lang item from_generator is a function that wraps a generator into a future. Our example uses only a few of them; here is a list of all the lang items.

In our case, lowering to HIR is basically a combination of two expansions: expanding async in async { body } and expanding await in future.await, where body is the body of our async main function and future is our sleeping task.

These two expansions are performed by make_async_expr and lower_expr_await, respectively, both part of rustc’s AST lowering (the rustc_ast_lowering crate).

make_async_expr takes an async function or an async block, and converts it to a future. Below is its comment.

Lower an `async` construct to a generator that is then wrapped so it implements `Future`.

This results in:

```text
std::future::from_generator(static move? |_task_context| -> <ret_ty> {
    <body>
})
```

lower_expr_await desugars the expression into part of a generator. Below is its comment.

Desugar `<expr>.await` into:
```rust
match ::std::future::IntoFuture::into_future(<expr>) {
    mut pinned => loop {
        match unsafe { ::std::future::Future::poll(
            <::std::pin::Pin>::new_unchecked(&mut pinned),
            ::std::future::get_context(task_context),
        ) } {
            ::std::task::Poll::Ready(result) => break result,
            ::std::task::Poll::Pending => {}
        }
        task_context = yield ();
    }
}
```
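Note that the desugaring first calls IntoFuture::into_future on the awaited expression, so .await accepts any type implementing IntoFuture, not only futures (IntoFuture is stable since Rust 1.64). In the sketch below, Answer is a hypothetical type, and poll_once is a hypothetical helper that polls a future exactly once with a do-nothing waker.

```rust
use std::future::{ready, Future, IntoFuture, Ready};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical type that is not a Future itself but can be turned into one.
struct Answer(u32);

impl IntoFuture for Answer {
    type Output = u32;
    type IntoFuture = Ready<u32>;

    fn into_future(self) -> Self::IntoFuture {
        // A future that completes immediately with the stored value.
        ready(self.0)
    }
}

/// A waker that does nothing when woken.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical helper: polls a future exactly once.
fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    fut.as_mut().poll(&mut cx)
}

async fn double() -> u32 {
    // Lowered to IntoFuture::into_future(Answer(21)) followed by the poll/yield loop.
    Answer(21).await * 2
}
```

Here poll_once(double()) gives Poll::Ready(42), because the Ready future returned by into_future completes on its first poll, so the await loop never reaches its yield.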

Substituting our async block and our sleep future into these templates, body becomes

std::future::from_generator(static move? |task_context| -> () {
    match ::std::future::IntoFuture::into_future(sleep(Duration::from_secs(1))) {
        mut pinned => loop {
            match unsafe { ::std::future::Future::poll(
                <::std::pin::Pin>::new_unchecked(&mut pinned),
                ::std::future::get_context(task_context),
            ) } {
                ::std::task::Poll::Ready(result) => break result,
                ::std::task::Poll::Pending => {}
            }
            task_context = yield ();
        }
    }
})

We will come back to task_context later. For now, it is enough to know that task_context is passed in by the async runtime, and that it gives access to the waker, which the reactor uses to notify the executor that a future is ready to make progress.

The argument of from_generator looks like a closure, but it is actually a generator. The secret lies in the yield expression: a closure whose body contains yield is compiled as a generator.

Generator code generation

What is this yield thing? We have encountered yield in other languages. Legend has it that in languages with cooperative multitasking, when a procedure reaches a yield point it automagically gives up control of the CPU so that other tasks can run, and when those tasks yield in turn, it gets a chance to continue. But how? In C, this is frequently implemented with setjmp/longjmp or similar context switching that saves and restores the execution state. What about Rust? Does it use a mechanism like that?

Let’s go lower, to Rust’s Mid-level Intermediate Representation (MIR), with RUSTFLAGS="--emit mir" cargo -v run. Below is the MIR of the coroutine generated for the async main function (found at target/debug/deps/*.mir).

fn main::{closure#0}(_1: Pin<&mut [static generator@src/main.rs:4:17: 6:2]>, _2: ResumeTy) -> GeneratorState<(), ()> {
    debug _task_context => _18;          // in scope 0 at src/main.rs:4:17: 6:2
    let mut _0: std::ops::GeneratorState<(), ()>; // return place in scope 0 at src/main.rs:4:17: 6:2
    let mut _3: tokio::time::Sleep;      // in scope 0 at src/main.rs:5:34: 5:40
    let mut _4: tokio::time::Sleep;      // in scope 0 at src/main.rs:5:5: 5:34
    let mut _5: std::time::Duration;     // in scope 0 at src/main.rs:5:11: 5:33
    let mut _6: std::task::Poll<()>;     // in scope 0 at src/main.rs:5:34: 5:40
    let mut _7: std::pin::Pin<&mut tokio::time::Sleep>; // in scope 0 at src/main.rs:5:34: 5:40
    let mut _8: &mut tokio::time::Sleep; // in scope 0 at src/main.rs:5:34: 5:40
    let mut _9: &mut tokio::time::Sleep; // in scope 0 at src/main.rs:5:34: 5:40
    let mut _10: &mut std::task::Context; // in scope 0 at src/main.rs:5:34: 5:40
    let mut _11: &mut std::task::Context; // in scope 0 at src/main.rs:5:34: 5:40
    let mut _12: std::future::ResumeTy;  // in scope 0 at src/main.rs:5:34: 5:40
    let mut _13: isize;                  // in scope 0 at src/main.rs:5:34: 5:40
    let mut _15: std::future::ResumeTy;  // in scope 0 at src/main.rs:5:34: 5:40
    let mut _16: ();                     // in scope 0 at src/main.rs:5:34: 5:40
    let mut _17: ();                     // in scope 0 at src/main.rs:4:17: 6:2
    let mut _18: std::future::ResumeTy;  // in scope 0 at src/main.rs:4:17: 6:2
    let mut _19: u32;                    // in scope 0 at src/main.rs:4:17: 6:2
    scope 1 {
        debug pinned => (((*(_1.0: &mut [static generator@src/main.rs:4:17: 6:2])) as variant#3).0: tokio::time::Sleep); // in scope 1 at src/main.rs:5:34: 5:40
        let _14: ();                     // in scope 1 at src/main.rs:5:34: 5:40
        scope 2 {
        }
        scope 3 {
            debug result => _14;         // in scope 3 at src/main.rs:5:34: 5:40
        }
    }

    bb0: {
        _19 = discriminant((*(_1.0: &mut [static generator@src/main.rs:4:17: 6:2]))); // scope 0 at src/main.rs:4:17: 6:2
        switchInt(move _19) -> [0_u32: bb1, 1_u32: bb17, 2_u32: bb16, 3_u32: bb15, otherwise: bb18]; // scope 0 at src/main.rs:4:17: 6:2
    }

    bb1: {
        _18 = move _2;                   // scope 0 at src/main.rs:4:17: 6:2
        _5 = Duration::from_secs(const 1_u64) -> [return: bb2, unwind: bb14]; // scope 0 at src/main.rs:5:11: 5:33
                                         // mir::Constant
                                         // + span: src/main.rs:5:11: 5:30
                                         // + literal: Const { ty: fn(u64) -> std::time::Duration {std::time::Duration::from_secs}, val: Value(Scalar(<ZST>)) }
    }

    bb2: {
        _4 = tokio::time::sleep(move _5) -> [return: bb3, unwind: bb14]; // scope 0 at src/main.rs:5:5: 5:34
                                         // mir::Constant
                                         // + span: src/main.rs:5:5: 5:10
                                         // + literal: Const { ty: fn(std::time::Duration) -> tokio::time::Sleep {tokio::time::sleep}, val: Value(Scalar(<ZST>)) }
    }

    bb3: {
        _3 = <Sleep as IntoFuture>::into_future(move _4) -> [return: bb4, unwind: bb14]; // scope 0 at src/main.rs:5:34: 5:40
                                         // mir::Constant
                                         // + span: src/main.rs:5:34: 5:40
                                         // + literal: Const { ty: fn(tokio::time::Sleep) -> <tokio::time::Sleep as std::future::IntoFuture>::Future {<tokio::time::Sleep as std::future::IntoFuture>::into_future}, val: Value(Scalar(<ZST>)) }
    }

    bb4: {
        (((*(_1.0: &mut [static generator@src/main.rs:4:17: 6:2])) as variant#3).0: tokio::time::Sleep) = move _3; // scope 0 at src/main.rs:5:34: 5:40
        goto -> bb5;                     // scope 1 at src/main.rs:5:34: 5:40
    }

    bb5: {
        _9 = &mut (((*(_1.0: &mut [static generator@src/main.rs:4:17: 6:2])) as variant#3).0: tokio::time::Sleep); // scope 2 at src/main.rs:5:34: 5:40
        _8 = &mut (*_9);                 // scope 2 at src/main.rs:5:34: 5:40
        _7 = Pin::<&mut Sleep>::new_unchecked(move _8) -> [return: bb6, unwind: bb13]; // scope 2 at src/main.rs:5:34: 5:40
                                         // mir::Constant
                                         // + span: src/main.rs:5:34: 5:40
                                         // + literal: Const { ty: unsafe fn(&mut tokio::time::Sleep) -> std::pin::Pin<&mut tokio::time::Sleep> {std::pin::Pin::<&mut tokio::time::Sleep>::new_unchecked}, val: Value(Scalar(<ZST>)) }
    }

    bb6: {
        _12 = _18;                       // scope 2 at src/main.rs:5:34: 5:40
        _11 = get_context(move _12) -> [return: bb7, unwind: bb13]; // scope 2 at src/main.rs:5:34: 5:40
                                         // mir::Constant
                                         // + span: src/main.rs:5:34: 5:40
                                         // + literal: Const { ty: unsafe fn(std::future::ResumeTy) -> &mut std::task::Context {std::future::get_context}, val: Value(Scalar(<ZST>)) }
    }

    bb7: {
        _10 = &mut (*_11);               // scope 2 at src/main.rs:5:34: 5:40
        _6 = <Sleep as Future>::poll(move _7, move _10) -> [return: bb8, unwind: bb13]; // scope 2 at src/main.rs:5:34: 5:40
                                         // mir::Constant
                                         // + span: src/main.rs:5:34: 5:40
                                         // + literal: Const { ty: for<'r, 's, 't0> fn(std::pin::Pin<&'r mut tokio::time::Sleep>, &'s mut std::task::Context<'t0>) -> std::task::Poll<<tokio::time::Sleep as std::future::Future>::Output> {<tokio::time::Sleep as std::future::Future>::poll}, val: Value(Scalar(<ZST>)) }
    }

    bb8: {
        _13 = discriminant(_6);          // scope 1 at src/main.rs:5:34: 5:40
        switchInt(move _13) -> [0_isize: bb11, 1_isize: bb9, otherwise: bb10]; // scope 1 at src/main.rs:5:34: 5:40
    }

    bb9: {
        ((_0 as Yielded).0: ()) = move _16; // scope 1 at src/main.rs:5:34: 5:40
        discriminant(_0) = 0;            // scope 1 at src/main.rs:5:34: 5:40
        discriminant((*(_1.0: &mut [static generator@src/main.rs:4:17: 6:2]))) = 3; // scope 1 at src/main.rs:5:34: 5:40
        return;                          // scope 1 at src/main.rs:5:34: 5:40
    }

    bb10: {
        unreachable;                     // scope 1 at src/main.rs:5:34: 5:40
    }

    bb11: {
        _14 = ((_6 as Ready).0: ());     // scope 1 at src/main.rs:5:34: 5:40
        drop((((*(_1.0: &mut [static generator@src/main.rs:4:17: 6:2])) as variant#3).0: tokio::time::Sleep)) -> [return: bb12, unwind: bb14]; // scope 0 at src/main.rs:5:39: 5:40
    }

    bb12: {
        _17 = const ();                  // scope 0 at src/main.rs:4:17: 6:2
        ((_0 as Complete).0: ()) = move _17; // scope 0 at src/main.rs:6:2: 6:2
        discriminant(_0) = 1;            // scope 0 at src/main.rs:6:2: 6:2
        discriminant((*(_1.0: &mut [static generator@src/main.rs:4:17: 6:2]))) = 1; // scope 0 at src/main.rs:6:2: 6:2
        return;                          // scope 0 at src/main.rs:6:2: 6:2
    }

    bb13 (cleanup): {
        drop((((*(_1.0: &mut [static generator@src/main.rs:4:17: 6:2])) as variant#3).0: tokio::time::Sleep)) -> bb14; // scope 0 at src/main.rs:5:39: 5:40
    }

    bb14 (cleanup): {
        discriminant((*(_1.0: &mut [static generator@src/main.rs:4:17: 6:2]))) = 2; // scope 0 at src/main.rs:4:17: 6:2
        resume;                          // scope 0 at src/main.rs:4:17: 6:2
    }

    bb15: {
        _15 = move _2;                   // scope 0 at src/main.rs:4:17: 6:2
        _18 = move _15;                  // scope 1 at src/main.rs:5:34: 5:40
        goto -> bb5;                     // scope 1 at src/main.rs:5:34: 5:40
    }

    bb16: {
        assert(const false, "`async fn` resumed after panicking") -> bb16; // scope 0 at src/main.rs:4:17: 6:2
    }

    bb17: {
        assert(const false, "`async fn` resumed after completion") -> bb17; // scope 0 at src/main.rs:4:17: 6:2
    }

    bb18: {
        unreachable;                     // scope 0 at src/main.rs:4:17: 6:2
    }
}

On a nightly toolchain (the -Z flags are unstable), we can generate a control flow graph of the generated coroutine with RUSTFLAGS="-Z dump-mir=main -Z dump-mir-graphviz -Z dump-mir-dataflow -Z dump-mir-spanview --emit=mir" cargo -v run.

The entry point of this generated coroutine is basic block bb0 (block 0 in the diagram above).

bb0: {
    _19 = discriminant((*(_1.0: &mut [static generator@src/main.rs:4:17: 6:2]))); // scope 0 at src/main.rs:4:17: 6:2
    switchInt(move _19) -> [0_u32: bb1, 1_u32: bb17, 2_u32: bb16, 3_u32: bb15, otherwise: bb18]; // scope 0 at src/main.rs:4:17: 6:2
}

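bb0 first reads the current state of the generator (through the variable _1 in the first statement). The state is an enum whose variants are identified by discriminants: tags stored alongside the actual payload. Below is an excerpt of the LLVM intermediate representation that obtains the discriminant; its last three instructions compute the address of the discriminant (field 1 of the generator struct), load it as an i8, and zero-extend it to a 32-bit integer.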

%2 = bitcast %"[static generator@src/sleep1.rs:4:17: 6:2]"* %1 to %"[static generator@src/sleep1.rs:4:17: 6:2]::Suspend0"*
%3 = bitcast %"[static generator@src/sleep1.rs:4:17: 6:2]::Suspend0"* %2 to %"tokio::time::driver::sleep::Sleep"*
call void @llvm.dbg.declare(metadata %"[static generator@src/sleep1.rs:4:17: 6:2]"** %_1.dbg.spill, metadata !3453, metadata !DIExpression(DW_OP_deref)), !dbg !3457
call void @llvm.dbg.declare(metadata {}* %result.dbg.spill, metadata !3455, metadata !DIExpression()), !dbg !3458
%4 = getelementptr inbounds %"[static generator@src/sleep1.rs:4:17: 6:2]", %"[static generator@src/sleep1.rs:4:17: 6:2]"* %_1, i32 0, i32 1, !dbg !3459
%5 = load i8, i8* %4, align 128, !dbg !3459, !range !232
%_23 = zext i8 %5 to i32, !dbg !3459

The program decides which basic block to jump to based on the state’s current discriminant. For example, when the discriminant is 0, the program jumps to bb1. State 0_u32 means the generator has not started yet. State 3_u32 means it is suspended at the .await: polling has started, but the sleeping task has not finished. When the sleeping task finishes, the state transitions to 1_u32, meaning the generator has completed. State 2_u32 means the generator has panicked (bb14 sets it while unwinding). Resuming a generator in state 1 or 2 is a bug, so bb17 and bb16 simply fail with the assertions "`async fn` resumed after completion" and "`async fn` resumed after panicking". Finally, the otherwise branch is unreachable, because the discriminant can never take any other value.

Let’s look at a concrete state transition.

bb6: {
    _12 = _18;                       // scope 2 at src/main.rs:5:34: 5:40
    _11 = get_context(move _12) -> [return: bb7, unwind: bb13]; // scope 2 at src/main.rs:5:34: 5:40
                                     // mir::Constant
                                     // + span: src/main.rs:5:34: 5:40
                                     // + literal: Const { ty: unsafe fn(std::future::ResumeTy) -> &mut std::task::Context {std::future::get_context}, val: Value(Scalar(<ZST>)) }
}

bb7: {
    _10 = &mut (*_11);               // scope 2 at src/main.rs:5:34: 5:40
    _6 = <Sleep as Future>::poll(move _7, move _10) -> [return: bb8, unwind: bb13]; // scope 2 at src/main.rs:5:34: 5:40
                                     // mir::Constant
                                     // + span: src/main.rs:5:34: 5:40
                                     // + literal: Const { ty: for<'r, 's, 't0> fn(std::pin::Pin<&'r mut tokio::time::Sleep>, &'s mut std::task::Context<'t0>) -> std::task::Poll<<tokio::time::Sleep as std::future::Future>::Output> {<tokio::time::Sleep as std::future::Future>::poll}, val: Value(Scalar(<ZST>)) }
}

bb8: {
    _13 = discriminant(_6);          // scope 1 at src/main.rs:5:34: 5:40
    switchInt(move _13) -> [0_isize: bb11, 1_isize: bb9, otherwise: bb10]; // scope 1 at src/main.rs:5:34: 5:40
}

bb9: {
    ((_0 as Yielded).0: ()) = move _16; // scope 1 at src/main.rs:5:34: 5:40
    discriminant(_0) = 0;            // scope 1 at src/main.rs:5:34: 5:40
    discriminant((*(_1.0: &mut [static generator@src/main.rs:4:17: 6:2]))) = 3; // scope 1 at src/main.rs:5:34: 5:40
    return;                          // scope 1 at src/main.rs:5:34: 5:40
}

bb11: {
    _14 = ((_6 as Ready).0: ());     // scope 1 at src/main.rs:5:34: 5:40
    drop((((*(_1.0: &mut [static generator@src/main.rs:4:17: 6:2])) as variant#3).0: tokio::time::Sleep)) -> [return: bb12, unwind: bb14]; // scope 0 at src/main.rs:5:39: 5:40
}

bb12: {
    _17 = const ();                  // scope 0 at src/main.rs:4:17: 6:2
    ((_0 as Complete).0: ()) = move _17; // scope 0 at src/main.rs:6:2: 6:2
    discriminant(_0) = 1;            // scope 0 at src/main.rs:6:2: 6:2
    discriminant((*(_1.0: &mut [static generator@src/main.rs:4:17: 6:2]))) = 1; // scope 0 at src/main.rs:6:2: 6:2
    return;                          // scope 0 at src/main.rs:6:2: 6:2
}

bb6 and bb7 call the poll function of the sleeping future. Depending on whether the sleeping task has finished, control flows from bb8 either to bb9 (Poll::Pending, discriminant 1: the generator yields and sets its state to 3) or to bb11 and bb12 (Poll::Ready, discriminant 0: the Sleep future is dropped, and the generator completes and sets its state to 1).

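In summary, the Rust compiler turns the async block into a generator: a closure-like state machine that stores the state of the async block, including the values that live across suspension points (here, the pinned Sleep). The state transitions are driven by calling this closure repeatedly. Pausing a coroutine is just an early return when there is no final result yet, and resuming it is just calling the closure again, which uses the stored discriminant to jump back to the right basic block. No stack switching in the style of setjmp/longjmp is involved.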
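To make the summary concrete, here is a hand-written state machine that roughly mirrors what the compiler generates for async { fut.await }. It is only a sketch under simplifying assumptions: AwaitOnce and Yields are hypothetical names, the inner future must be Unpin (the real generator supports self-referential, !Unpin state through pinning), and it implements Future directly instead of going through a generator. The comments map each step to the basic blocks above.

```rust
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical hand-written analogue of the generator for `async { fut.await }`.
/// Variants are listed in the order of the discriminants seen in the MIR.
enum AwaitOnce<F> {
    Unresumed(F), // 0: not started yet
    Returned,     // 1: completed
    Poisoned,     // 2: panicked (in this sketch, only a transient placeholder)
    Suspend0(F),  // 3: suspended at the `.await`, holding `pinned`
}

impl<F: Future + Unpin> Future for AwaitOnce<F> {
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
        // Requiring F: Unpin keeps the sketch free of unsafe pin projections.
        let this = self.get_mut();
        // State 0 (like bb1..bb4): move the inner future into the Suspend0 slot.
        if let AwaitOnce::Unresumed(_) = this {
            if let AwaitOnce::Unresumed(f) = mem::replace(this, AwaitOnce::Poisoned) {
                *this = AwaitOnce::Suspend0(f);
            }
        }
        match this {
            // Like bb5..bb8: poll the inner future with the caller's context.
            AwaitOnce::Suspend0(f) => match Pin::new(f).poll(cx) {
                // Like bb9: return early ("yield"), staying in state 3.
                Poll::Pending => Poll::Pending,
                // Like bb11/bb12: drop the inner future and switch to state 1.
                Poll::Ready(out) => {
                    *this = AwaitOnce::Returned;
                    Poll::Ready(out)
                }
            },
            // Like bb17 and bb16.
            AwaitOnce::Returned => panic!("`async fn` resumed after completion"),
            AwaitOnce::Poisoned => panic!("`async fn` resumed after panicking"),
            AwaitOnce::Unresumed(_) => unreachable!(),
        }
    }
}

/// Hypothetical test future: returns Pending `n` times, then Ready(()).
struct Yields(u32);

impl Future for Yields {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 == 0 {
            return Poll::Ready(());
        }
        self.0 -= 1;
        // Ask to be polled again; a real future would wake when its event fires.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// A waker that does nothing when woken.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn noop_waker() -> Waker {
    Waker::from(Arc::new(NoopWaker))
}
```

Polling AwaitOnce::Unresumed(Yields(2)) three times returns Pending, Pending, then Ready(()), and the state goes 0 → 3 → 3 → 1, just like the discriminant writes in bb9 and bb12.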

To make this clearer, let’s add one more suspension point (playground link).

use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    sleep(Duration::from_secs(1)).await;
    sleep(Duration::from_secs(1)).await;
}

This time the entry point has one more branch. A new state, 4_u32, is created; it represents the period after the first sleep has finished, while the second sleep is still pending.
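Each additional .await adds one suspended variant to the generator’s state enum. The sketch below is a hypothetical hand-written analogue of async { a.await; b.await } (TwoAwaits is an illustrative name; both futures are assumed to be Unpin and to return ()). Suspend0 and Suspend1 correspond to states 3 and 4. Unlike the real generator, whose state may be pinned in place, this version moves the state out with mem::replace on every poll.

```rust
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical hand-written analogue of `async { a.await; b.await }`.
/// The comments give the matching generator discriminants.
enum TwoAwaits<A, B> {
    Unresumed(A, B), // 0: not started
    Returned,        // 1: completed
    Poisoned,        // 2: panicked (here also a placeholder while the state is moved out)
    Suspend0(A, B),  // 3: waiting on `a`; `b` not started yet
    Suspend1(B),     // 4: `a` finished, waiting on `b`
}

impl<A, B> Future for TwoAwaits<A, B>
where
    A: Future<Output = ()> + Unpin,
    B: Future<Output = ()> + Unpin,
{
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let this = self.get_mut();
        loop {
            // Move the current state out; each arm stores the next state back (or panics).
            match mem::replace(this, TwoAwaits::Poisoned) {
                TwoAwaits::Unresumed(a, b) => *this = TwoAwaits::Suspend0(a, b),
                TwoAwaits::Suspend0(mut a, b) => match Pin::new(&mut a).poll(cx) {
                    Poll::Pending => {
                        *this = TwoAwaits::Suspend0(a, b); // stay in state 3
                        return Poll::Pending;
                    }
                    // `a` is dropped at the end of this arm; continue with `b`.
                    Poll::Ready(()) => *this = TwoAwaits::Suspend1(b),
                },
                TwoAwaits::Suspend1(mut b) => match Pin::new(&mut b).poll(cx) {
                    Poll::Pending => {
                        *this = TwoAwaits::Suspend1(b); // stay in state 4
                        return Poll::Pending;
                    }
                    Poll::Ready(()) => {
                        *this = TwoAwaits::Returned; // state 1
                        return Poll::Ready(());
                    }
                },
                TwoAwaits::Returned => {
                    *this = TwoAwaits::Returned;
                    panic!("`async fn` resumed after completion");
                }
                TwoAwaits::Poisoned => panic!("`async fn` resumed after panicking"),
            }
        }
    }
}

/// A waker that does nothing when woken.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn noop_waker() -> Waker {
    Waker::from(Arc::new(NoopWaker))
}
```

With a = std::future::ready(()) and b = std::future::pending(), a single poll passes through state 3 and stops in state 4 (Suspend1), which corresponds to the gap between the two sleeps in our example.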

bb0: {
    _33 = discriminant((*(_1.0: &mut [static generator@src/main.rs:4:17: 7:2]))); // scope 0 at src/main.rs:4:17: 7:2
    switchInt(move _33) -> [0_u32: bb1, 1_u32: bb30, 2_u32: bb29, 3_u32: bb27, 4_u32: bb28, otherwise: bb31]; // scope 0 at src/main.rs:4:17: 7:2
}

Bridging generators to futures

One final step remains for the Rust compiler: the async runtime only accepts futures, not generators. Fortunately, converting a generator to a future is quite simple.

The from_generator function does exactly this.

/// Wrap a generator in a future.
///
/// This function returns a `GenFuture` underneath, but hides it in `impl Trait` to give
/// better error messages (`impl Future` rather than `GenFuture<[closure.....]>`).
// This is `const` to avoid extra errors after we recover from `const async fn`
#[lang = "from_generator"]
#[doc(hidden)]
#[unstable(feature = "gen_future", issue = "50547")]
#[rustc_const_unstable(feature = "gen_future", issue = "50547")]
#[inline]
pub const fn from_generator<T>(gen: T) -> impl Future<Output = T::Return>
where
    T: Generator<ResumeTy, Yield = ()>,
{
    #[rustc_diagnostic_item = "gen_future"]
    struct GenFuture<T: Generator<ResumeTy, Yield = ()>>(T);

    // We rely on the fact that async/await futures are immovable in order to create
    // self-referential borrows in the underlying generator.
    impl<T: Generator<ResumeTy, Yield = ()>> !Unpin for GenFuture<T> {}

    impl<T: Generator<ResumeTy, Yield = ()>> Future for GenFuture<T> {
        type Output = T::Return;
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // SAFETY: Safe because we're !Unpin + !Drop, and this is just a field projection.
            let gen = unsafe { Pin::map_unchecked_mut(self, |s| &mut s.0) };

            // Resume the generator, turning the `&mut Context` into a `NonNull` raw pointer. The
            // `.await` lowering will safely cast that back to a `&mut Context`.
            match gen.resume(ResumeTy(NonNull::from(cx).cast::<Context<'static>>())) {
                GeneratorState::Yielded(()) => Poll::Pending,
                GeneratorState::Complete(x) => Poll::Ready(x),
            }
        }
    }

    GenFuture(gen)
}

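As we can see, the poll function of the generator-backed future is essentially a data conversion: GeneratorState::Yielded becomes Poll::Pending, and GeneratorState::Complete becomes Poll::Ready. One delicate yet crucial detail is the previously mentioned parameter task_context.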
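The adapter itself can be reproduced on stable Rust if we substitute our own trait for the unstable Generator. In the sketch below, GenState, MyGenerator, and Countdown are hypothetical stand-ins, and the resume argument is the &mut Context itself rather than the raw-pointer ResumeTy used by std. GenFuture::poll performs the same Yielded → Pending, Complete → Ready conversion as the std code.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the unstable `GeneratorState`.
enum GenState<R> {
    Yielded,
    Complete(R),
}

/// Hypothetical stand-in for `Generator<ResumeTy, Yield = ()>`.
trait MyGenerator {
    type Return;
    fn resume(self: Pin<&mut Self>, cx: &mut Context<'_>) -> GenState<Self::Return>;
}

/// Same shape as std's `GenFuture`: a newtype whose `poll` only converts states.
struct GenFuture<G>(G);

impl<G: MyGenerator> Future for GenFuture<G> {
    type Output = G::Return;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<G::Return> {
        // SAFETY: structural pinning of the only field; `G` is never moved out,
        // and `GenFuture` has no Drop impl.
        let generator = unsafe { self.map_unchecked_mut(|s| &mut s.0) };
        match generator.resume(cx) {
            GenState::Yielded => Poll::Pending,
            GenState::Complete(x) => Poll::Ready(x),
        }
    }
}

/// Hypothetical generator: yields `n` times, then completes with "done".
struct Countdown(u32);

impl MyGenerator for Countdown {
    type Return = &'static str;

    fn resume(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> GenState<&'static str> {
        if self.0 == 0 {
            return GenState::Complete("done");
        }
        self.0 -= 1;
        // Ask the executor to poll again.
        cx.waker().wake_by_ref();
        GenState::Yielded
    }
}

/// A waker that does nothing when woken.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn noop_waker() -> Waker {
    Waker::from(Arc::new(NoopWaker))
}
```

Polling GenFuture(Countdown(1)) returns Pending once and then Ready("done").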

Recall that the generated coroutine looks something like

|task_context| -> () {
    match ::std::future::IntoFuture::into_future(sleep(Duration::from_secs(1))) {
        mut pinned => loop {
            match unsafe { ::std::future::Future::poll(
                <::std::pin::Pin>::new_unchecked(&mut pinned),
                ::std::future::get_context(task_context),
            ) } {
                ::std::task::Poll::Ready(result) => break result,
                ::std::task::Poll::Pending => {}
            }
            task_context = yield ();
        }
    }
}

The generator takes an argument task_context, and at the suspension point we have the peculiar statement task_context = yield (). What is this task_context, and where does it come from?

It turns out that task_context is just the argument passed to the resume function. To illustrate this, consider a generator gen that looks something like

|closure_args| -> () {
    do_something();
    yield_args = yield ();
}

When we call gen.resume(args_1) for the first time, the generator’s closure_args is set to args_1. When we then call gen.resume(args_2), the yield expression evaluates to args_2, so yield_args is set to args_2. In our case, every time the async runtime polls the future, GenFuture calls gen.resume(ResumeTy(NonNull::from(cx).cast::<Context<'static>>())), so task_context is set to a fresh ResumeTy, which is nothing but a wrapper around a pointer to cx, a std::task::Context. In this way, futures inside the generator can use the waker in that Context to inform the executor when they are ready to make progress (see The Future Trait - Asynchronous Programming in Rust for more information).
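Resume arguments are only available on nightly (where generators have since been renamed coroutines), so the sketch below hand-writes the gen example above as a state machine on stable Rust. ArgsGen and GenState are hypothetical names, and string arguments stand in for args_1 and args_2. The first resume argument becomes the closure parameter; the second becomes the value of the yield expression.

```rust
/// Hypothetical stand-in for the unstable `GeneratorState`.
#[derive(Debug, PartialEq)]
enum GenState<Y, R> {
    Yielded(Y),
    Complete(R),
}

/// Hand-written version of the hypothetical generator
/// `|closure_args| { do_something(); let yield_args = yield (); (closure_args, yield_args) }`.
enum ArgsGen {
    Start,
    Suspended { closure_args: &'static str },
    Done,
}

impl ArgsGen {
    fn resume(&mut self, arg: &'static str) -> GenState<(), (&'static str, &'static str)> {
        match std::mem::replace(self, ArgsGen::Done) {
            // First resume: `arg` becomes the closure parameter.
            ArgsGen::Start => {
                let closure_args = arg;
                // do_something() would run here.
                *self = ArgsGen::Suspended { closure_args };
                GenState::Yielded(())
            }
            // Later resume: `arg` becomes the value of the `yield ()` expression.
            ArgsGen::Suspended { closure_args } => {
                let yield_args = arg;
                GenState::Complete((closure_args, yield_args))
            }
            ArgsGen::Done => panic!("generator resumed after completion"),
        }
    }
}
```

In the async lowering, every resume argument is a ResumeTy wrapping the current Context, which is why task_context is reassigned at each yield: each poll may come with a different waker.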