Tokio Notes
1. Introduction to Tokio
Tokio is an asynchronous runtime for Rust, designed for building fast and reliable network applications. It provides async/await support, efficient task scheduling, and various utilities for working with async operations.
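Async Rust is built around the Future trait. Its poll method returns one of two values: Poll::Ready(value) when the future has completed, or Poll::Pending when it is not done yet and must be polled again later.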
2. Setting Up Tokio
To use Tokio in your Rust project, add the following to Cargo.toml:
[dependencies]
tokio = { version = "1", features = ["full"] }
3. Basic Async Runtime
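An async main function is annotated with #[tokio::main], which starts a runtime and runs the function body on it:
#[tokio::main]
async fn main() {
    println!("Hello from Tokio");
}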
Alternatively, you can manually create a runtime:
/// Builds a Tokio runtime by hand and drives a single future to completion on it.
fn main() {
    // Panics if the runtime cannot be constructed.
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let greeting = async {
        println!("Manually managed runtime");
    };
    // `block_on` runs the future on the runtime and waits for it to finish.
    runtime.block_on(greeting);
}
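4. Spawning Async Tasks
tokio::spawn starts a future as a background task and returns a JoinHandle; awaiting the handle yields the task's result. Sections 6, 8 and 9 below show it in use.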
5. Async Sleep
use tokio::time::{sleep, Duration};
/// Prints a message, pauses for two seconds with Tokio's `sleep`, then prints again.
#[tokio::main]
async fn main() {
    let pause = Duration::from_secs(2);
    println!("Sleeping for 2 seconds...");
    // Awaiting the sleep future suspends this task rather than calling a blocking sleep.
    sleep(pause).await;
    println!("Awake now!");
}
6. Using Tokio Channels
Tokio provides async channels for communication between tasks:
use tokio::sync::mpsc;
/// Sends one message from a spawned task to the main task over a bounded mpsc channel.
#[tokio::main]
async fn main() {
    // The channel buffers up to 32 messages.
    let (sender, mut receiver) = mpsc::channel(32);
    // The sender is moved into the producer, so it is dropped when the task ends.
    let producer = async move {
        sender.send("Hello from task").await.unwrap();
    };
    tokio::spawn(producer);
    // `recv` yields `None` once every sender is gone; here exactly one message is expected.
    let received = receiver.recv().await.unwrap();
    println!("Received: {received}");
}
7. Handling Concurrency with Join
/// First of the two example tasks; prints its label when awaited.
async fn task1() {
    let label = "Task 1";
    println!("{label}");
}
/// Second of the two example tasks; prints its label when awaited.
async fn task2() {
    let label = "Task 2";
    println!("{label}");
}
#[tokio::main]
async fn main() {
tokio::join!(task1(), task2());
}
8. Using Tokio Mutex
use std::sync::Arc;
use tokio::sync::Mutex;
#[tokio::main]
async fn main() {
let data = Arc::new(Mutex::new(0));
let data_cloned = data.clone();
tokio::spawn(async move {
let mut num = data_cloned.lock().await;
*num += 1;
}).await.unwrap();
println!("Final value: {}", *data.lock().await);
}
9. Streaming with Tokio
use tokio::sync::mpsc;
use tokio_stream::StreamExt;
/// Wraps an mpsc receiver in a stream and prints every value it yields.
#[tokio::main]
async fn main() {
    let (sender, receiver) = mpsc::channel(32);
    let mut values = tokio_stream::wrappers::ReceiverStream::new(receiver);
    // The sender moves into the task and is dropped when the task finishes.
    let producer = async move {
        sender.send(42).await.unwrap();
    };
    tokio::spawn(producer);
    // The stream ends once the sender is dropped, which ends this loop.
    while let Some(item) = values.next().await {
        println!("Received: {item}");
    }
}
10. Using Tokio with HTTP (reqwest)
use reqwest;
#[tokio::main]
async fn main() {
let resp = reqwest::get("https://api.github.com/repos/tokio-rs/tokio")
.await
.unwrap()
.text()
.await
.unwrap();
println!("Response: {resp}");
}
11. Using Tokio Runtime Without #[tokio::main]
use tokio::runtime::Runtime;
/// Creates a runtime explicitly and runs one future on it, without the `#[tokio::main]` macro.
fn main() {
    // Panics if the runtime cannot be constructed.
    let runtime = Runtime::new().unwrap();
    let task = async {
        println!("Using Tokio without #[tokio::main]");
    };
    runtime.block_on(task);
}
12. Tokio File IO with Async Read/Write
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
/// Writes a greeting to `test.txt`, then reads the file back and prints its contents.
///
/// # Errors
/// Returns any I/O error from creating, writing, flushing, opening, or reading the file.
#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut file = File::create("test.txt").await?;
    file.write_all(b"Hello, Tokio!").await?;
    // A write on Tokio's `File` can return before the data has reached the file.
    // `flush` waits for it to finish, so the read below sees the full contents.
    file.flush().await?;
    let mut file = File::open("test.txt").await?;
    let mut contents = vec![];
    file.read_to_end(&mut contents).await?;
    println!("File contents: {:?}", String::from_utf8_lossy(&contents));
    Ok(())
}
13. Tokio Tasks with Timeouts
use tokio::time::{timeout, Duration};
/// Runs a three-second task under a two-second timeout and reports which one won.
#[tokio::main]
async fn main() {
    let slow_task = async {
        tokio::time::sleep(Duration::from_secs(3)).await;
        "Completed"
    };
    // `timeout` resolves to `Err` if the deadline passes before the inner future finishes.
    let outcome = timeout(Duration::from_secs(2), slow_task).await;
    if let Ok(value) = outcome {
        println!("Task finished: {value}");
    } else {
        println!("Task timed out");
    }
}
14. Tokio Signal Handling
use tokio::signal;
/// Waits for Ctrl+C and then prints a shutdown message.
#[tokio::main]
async fn main() {
    println!("Press Ctrl+C to exit");
    // The future completes when the process receives Ctrl+C; the unwrap panics if waiting fails.
    let interrupt = signal::ctrl_c();
    interrupt.await.unwrap();
    println!("Shutting down");
}
15. Tokio Thread Pool
use tokio::runtime::Builder;
/// Configures a multi-threaded runtime with four worker threads and runs one future on it.
fn main() {
    let mut builder = Builder::new_multi_thread();
    // `enable_all` turns on the I/O and time drivers.
    builder.worker_threads(4).enable_all();
    let runtime = builder.build().unwrap();
    runtime.block_on(async {
        println!("Using a multi-threaded Tokio runtime");
    });
}
A good example
use tokio::time::{sleep, Duration};
use futures::future::join_all;
/// Spawns two workers on a manually created runtime and waits for both to finish.
fn main() {
    let runtime = tokio::runtime::Runtime::new().unwrap();
    // Spawn the workers right away and keep their handles to join on later.
    let handles: Vec<tokio::task::JoinHandle<()>> =
        (0..2).map(|id| runtime.spawn(worker(id))).collect();
    separator();
    // Block the main thread until every worker has completed.
    runtime.block_on(async {
        join_all(handles).await;
    });
    separator();
    println!("All done. Bye bye!");
}
/// Prints a progress line for indices 1 through 99, sleeping 10 ms after each one.
async fn worker(id: u32) {
    let pause = Duration::from_millis(10);
    for idx in 1..100 {
        println!("Worker-{id} is working on idx-{idx}");
        // Awaiting the sleep suspends this worker between progress lines.
        sleep(pause).await;
    }
}
/// Prints a line of fifty `=` characters to standard output.
fn separator() {
    println!("{}", "=".repeat(50));
}