The Tokio Upgrade from 0.2 to 1.x

At Fly.io, we run a Rust-based load balancer that handles almost all of our traffic. It stands on the shoulders of Tokio and Hyper. When the Tokio team announced 0.3 and then 1.0, we figured we'd have to upgrade sooner rather than later to access related crate upgrades. The Rust ecosystem moves pretty fast and we wanted to be able to keep our software up to date.

After a few weeks of patience, most of the ecosystem had already been upgraded. I believe this was a well-coordinated move from the Tokio team, making sure a good chunk of the ecosystem was going to be available when (or soon after) they released 1.0. We use hyper, reqwest, async-compression, tokio-io-timeout, tokio-tungstenite and sentry in addition to tokio, so we waited until they were ready.

Differences from Tokio 0.2

Here are some notes about the Tokio upgrade. They're by no means exhaustive.

No implementations of Stream

With Tokio 1.0, the team has decided to forgo all Stream implementations until the Stream trait is stabilized in the standard library.

If you still want Streams, then you should probably try tokio-stream. It implements Stream for TcpListener, Interval and many more.

If not, most of the time, you can get around this change by looping:

let listener = tokio::net::TcpListener::bind("[::]:0").await?;
loop {
    let conn = listener.accept().await?;
}

Mutable access requirements relaxed

In the previous code snippet, you might've noticed I don't need a mut-able TcpListener. You can now use more of the tokio API without mutable access! That's a welcome change: it reduces much of the locking required in our proxy.

New AsyncRead and AsyncWrite traits

Notably, AsyncRead::poll_read doesn't return the number of bytes read anymore, just io::Result<()>; the data lands in a ReadBuf instead. (AsyncWrite::poll_write still returns the number of bytes written.) A quick workaround is this:

fn poll_read(
    self: Pin<&mut Self>,
    cx: &mut Context<'_>,
    buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
  let before = buf.filled().len();
  // inner poll_read...
  let nread = buf.filled().len() - before;
  // ...
}

More pinning required

Futures like Sleep and Shutdown are now explicitly !Unpin. If you need to use them multiple times in a tokio::select! (like we do, all the time), then you'll need to either pin them on the heap with Box::pin or pin them on the stack with tokio::pin!. The documentation explains it in more detail.

let sleep = tokio::time::sleep(Duration::from_secs(1));
tokio::pin!(sleep);

loop {
  tokio::select! {
    _ = &mut sleep => {
      println!("timed out after 1 second!");
    },
    _ = &mut fut => {
      println!("future finished");
    }
  }
}

runtime::Handle doesn’t have block_on (yet)

There are discussions around adding back Handle::block_on, but for now, it's left unimplemented.

We needed it for the following pattern:

handle.spawn_blocking(move || handle.block_on(future));

We ended up getting around that restriction by using Arc<Runtime> since block_on doesn't require mutable access to Runtime anymore.

let (tx, rx) = tokio::sync::oneshot::channel();
thread::spawn(move || {
  let rt = Arc::new(tokio::runtime::Builder::new_current_thread().build().unwrap());

  // share your Runtime somehow, we send a single-threaded runtime via a oneshot.
  tx.send(rt.clone()).unwrap();

  // Runtime::block_on does not require a `&mut self`
  rt.block_on(async move {
    // do your thing, asynchronously
    // probably want to trigger the end of this runtime somehow (Signal, or other)
  });
});

let rt = rx.await.unwrap();

// now you can use spawn_blocking and block_on
rt.spawn_blocking({
  let rt = rt.clone();
  move || {
    rt.block_on(fut)
  }
});

TcpListener::from_std needs to be set to nonblocking

... or else you'll be surprised that everything hangs when you start accepting connections.

For example, if you use socket2 to fashion a TcpListener with a few more options, you'll need to use tokio::net::TcpListener::from_std(std_listener). Before doing that, you'll want to set_nonblocking(true) on your std listener.

let sock = socket2::Socket::new(
    socket2::Domain::ipv4(),
    socket2::Type::stream(),
    Some(socket2::Protocol::tcp()),
).unwrap();

sock.set_reuse_port(true).unwrap();
sock.set_nonblocking(true).unwrap(); // <---
sock.bind(&sock_addr).unwrap();
sock.listen(10240).unwrap();

tokio::net::TcpListener::from_std(sock.into_tcp_listener()).unwrap();
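
To see what the flag changes, here is a std-only sketch (no Tokio or socket2 involved, and accept_kind_when_idle is a name we made up). With non-blocking mode on, accept() on an idle listener returns a WouldBlock error right away instead of parking the thread, which is the behavior Tokio's reactor depends on.

```rust
use std::io::ErrorKind;
use std::net::TcpListener;

/// Hypothetical helper: binds an ephemeral loopback port, switches the
/// listener to non-blocking mode, and reports the error kind accept()
/// returns while no client is connecting (None if it somehow accepted).
fn accept_kind_when_idle() -> std::io::Result<Option<ErrorKind>> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    listener.set_nonblocking(true)?;
    match listener.accept() {
        Ok(_) => Ok(None),
        // Without set_nonblocking(true), this call would never return here;
        // it would block the calling thread until a client showed up.
        Err(e) => Ok(Some(e.kind())),
    }
}

fn main() -> std::io::Result<()> {
    println!("idle accept returned: {:?}", accept_kind_when_idle()?);
    Ok(())
}
```

A blocking listener handed to from_std would make that same accept call stall one of the runtime's worker threads, hence the hang.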

Miscellaneous API changes

  • tokio::time::delay_for -> tokio::time::sleep
  • tokio::sync::watch::Sender::broadcast -> tokio::sync::watch::Sender::send
  • Notify::notify -> Notify::notify_one (or Notify::notify_waiters, which wakes every task currently waiting)

Hyper WebSockets

WebSocket upgrades now happen on the whole request/response instance, not just the Body (body.on_upgrade() vs. hyper::upgrade::on(req)). Here's an example.

Hyper now relies on OnUpgrade being present on the Response's or Request's Extensions. We were previously replacing extensions with our own. We had to make sure we copied OnUpgrade into our new Extensions, if present, before overwriting.

Kudos to the Tokio team

This was a major version upgrade, but it wasn't hard at all, especially with the sporadic question/answer sessions on the very helpful Tokio Discord (hat tip to Alice Ryhl in particular).