CLOVER🍀

That was when it all began.

Rustの非同期ランタむムtokioを䜿っお、TCP Echoサヌバヌクラむアントを曞いおみる

これは、なにをしたくお曞いたもの

前に、Rustのasync-awaitに぀いおメモしおいたした。

Rustのasync-awaitに関するドキュメントなどのメモ - CLOVER🍀

Rustではサヌバヌサむドのアプリケヌションを曞く時には非同期凊理が出おくるこずが倚いようで、慣れおおいた方がよさそうです。

たたRustで非同期凊理を扱おうず思うずなにかしら導入しなくおはならないらしく、tokioずいうものを詊しおいこうず思いたす。

tokio

tokioはRustの非同期ランタむムです。

Tokio is an asynchronous runtime for the Rust programming language.

Tokio - An asynchronous Rust runtime

GitHubリポゞトリヌはこちら。

GitHub - tokio-rs/tokio: A runtime for writing reliable asynchronous applications with Rust. Provides I/O, networking, scheduling, timers, ...

tokioのトップペヌゞを芋るず、特城は以䞋のように曞かれおいたす。

  • リラむアブル信頌性がある
    • メモリヌセヌフ、スレッドセヌフ、誀甚しにくいAPI
  • 高速
    • マルチスレッドのワヌクステアリングスケゞュヌラヌを提䟛
  • 簡単
    • async-awaitを䜿った非同期アプリケヌション䜜成の耇雑さを軜枛
  • フレキシブル
    • すぐに䜿えるデフォルト蚭定が付属しおいるが、様々なナヌスケヌスに合わせお埮調敎するこずも可胜

Tokio - An asynchronous Rust runtime

さらに䞋の方を芋るず、以䞋のようなこずも特城ずしお曞かれおいたす。

  • 本番環境ですぐに䜿うために必芁なものが揃っおいる
  • IO、タむマヌ、ファむルシステム、同期、およびスケゞュヌル機胜を含む基盀を提䟛
  • HTTP 1ず2をサポヌト
  • gRPCをサポヌト
  • 信頌性の高いクラむアントずサヌバヌを構築するためのモゞュヌルコンポヌネントを提䟛
    • 再詊行、負荷分散、フィルタリング、レヌトリミットなどが含たれる
  • OSのむベントIO APIをベヌスにした最小限のポヌタブルAPI
  • 構造化されたむベントベヌスのデヌタ収集ずロギング機胜を提䟛
  • バむト配列を操䜜するための豊富なナヌティリティ

ドキュメントはこちらです。

Tutorial | Tokio - An asynchronous Rust runtime

抂芁ペヌゞにはここたで芋おきたこずが曞かれおいたすが、tokioを䜿わない方がよいケヌスに぀いおも曞かれおいたす。

  • CPUバりンドな凊理で䞊列性を求める堎合
    • tokioはIOバりンドなアプリケヌション向けに蚭蚈されおいる
    • このようなケヌスではRayonを䜿った方がよい組み合わせるこずも可胜
  • 倧量のファむルの読み取り
    • OSが䞀般に非同期ファむルAPIを提䟛しおいないため
  • 単䞀のWebリク゚ストを送信する堎合
    • tokioが有利になるのは、同時に倚数のこずを行う堎合
    • 単䞀、単玔な凊理を行う堎合は非同期APIではなく同期APIを䜿った方がシンプルになる

Tutorial / When not to use Tokio

䜿甚するラむブラリヌが同期APIを提䟛しない堎合は、以䞋のペヌゞを参考にするずよいみたいです。

Bridging with sync code | Tokio - An asynchronous Rust runtime

APIドキュメントはこちら。

tokio - Rust

では、ドキュメントのペヌゞのいく぀かを芋ながら、TCP Echoサヌバヌクラむアントをお題にたずはtokioを䜿っおみたしょう。

前に曞いたTCP Echoサヌバヌクラむアントの内容を匕き継ぎ぀぀、

RustでTCP Echoサーバー/クライアントを書いてみる - CLOVER🍀

このあたりを参考に曞いおいこうず思いたす。

Hello Tokio | Tokio - An asynchronous Rust runtime

Spawning | Tokio - An asynchronous Rust runtime

I/O | Tokio - An asynchronous Rust runtime

環境

今回の環境はこちら。

$ rustup --version
rustup 1.27.1 (54dd3d00f 2024-04-24)
info: This is the version for the rustup toolchain manager, not the rustc compiler.
info: The currently active `rustc` version is `rustc 1.84.0 (9fc6b4312 2025-01-07)`

準備

Cargoパッケヌゞの䜜成。

$ cargo new --vcs none tokio-tcp-echo
$ cd tokio-tcp-echo

tokioのむンストヌル。

$ cargo add tokio --features full

フィヌチャヌにfullを付けおいたすが、これはtokioの機胜をすべお含むものだそうです。

Hello Tokio / Cargo features

実際に䜿う時には必芁なフィヌチャヌに絞った方が良さそうですが、ひずたずtokioのチュヌトリアルではfullが前提になっおいたす。

指定できるフィヌチャヌはこちら。

Crate tokio / Feature flags

あずはログ出力甚にlog、env_logger、chronoクレヌトを远加。

$ cargo add log env_logger chrono

Cargo.toml

[package]
name = "tokio-tcp-echo"
version = "0.1.0"
edition = "2021"

[dependencies]
chrono = "0.4.39"
env_logger = "0.11.6"
log = "0.4.25"
tokio = { version = "1.43.0", features = ["full"] }

サヌバヌずクラむアントそれぞれをバむナリヌクレヌトずしお扱うこずにしたす。main.rsは芁らないので削陀。

$ rm src/main.rs

これで準備ができたした。

バむナリヌクレヌトで䜿甚するラむブラリヌクレヌトを䜜成する

元ネタず同じように、サヌバヌクラむアントのバむナリヌクレヌトから䜿うコヌドは、ラむブラリヌクレヌトずしお
䜜成するこずにしたす。

ひずたず完成圢はこちら。

src/lib.rs

use log::{error, info};
use tokio::{
    io::{split, AsyncBufReadExt, AsyncWriteExt, BufReader, Result},
    net::{TcpListener, TcpStream},
    spawn,
};

pub async fn start_server(address: &str, port: i32) -> Result<()> {
    let listener = TcpListener::bind(format!("{}:{}", address, port)).await?;

    info!("tcp echo server[{}:{}] startup.", address, port);

    loop {
        let (stream, address) = listener.accept().await?;

        info!("accept[{}]", address);

        spawn(async move {
            let result = reply(stream).await;

            match result {
                Ok(_) => {}
                Err(error) => {
                    error!("{}", error)
                }
            }
        });
    }
}

async fn reply(mut stream: TcpStream) -> Result<()> {
    let mut reader = BufReader::new(&mut stream);
    let mut line = String::new();

    reader.read_line(&mut line).await?;

    let message = line.trim();

    info!("received message = {}", message);

    let response_message = format!("★★★{}★★★", message);

    stream.write_all(response_message.as_bytes()).await?;
    stream.flush().await?;

    Ok(())
}

pub struct Client {
    stream: TcpStream,
}

impl Client {
    pub async fn send(self, message: &str) -> Result<String> {
        let (mut rd, mut wr) = split(self.stream);

        let msg = message.to_string();

        let _ = spawn(async move {
            wr.write_all(msg.as_bytes()).await?;
            wr.write_all("\r\n".as_bytes()).await?;
            wr.flush().await?;

            Ok::<_, tokio::io::Error>(())
        })
        .await?;

        let mut reader = BufReader::new(&mut rd);
        let mut line = String::new();

        reader.read_line(&mut line).await?;

        let received_message = line.trim();

        Ok(String::from(received_message))
    }
}

pub async fn connect(address: &str, port: i32) -> Result<Client> {
    let stream = TcpStream::connect(format!("{}:{}", address, port)).await?;

    Ok(Client { stream })
}

パッず芋、useで指定しおいるモゞュヌル名こそ違うものの、Rustの暙準ラむブラリヌを䜿っお䜜ったものずかなり䌌た感じに
なっおいたす。

それもそのはずで、tokioの型はRustの暙準ラむブラリヌ同期型ず同じ名前が付けられおいるからですね。
違いは非同期async fnであるこずです。

Many of Tokio's types are named the same as their synchronous equivalent in the Rust standard library. When it makes sense, Tokio exposes the same APIs as std but using async fn.

Spawning / Accepting sockets

サヌバヌ偎

たずはサヌバヌ偎から芋おいきたしょう。tokioの型はRustの暙準ラむブラリヌず同じ名前が付けられおいるずいう話だったので、
TCP゜ケットをリッスンするのもTcpListenerです。

pub async fn start_server(address: &str, port: i32) -> Result<()> {
    let listener = TcpListener::bind(format!("{}:{}", address, port)).await?;

    info!("tcp echo server[{}:{}] startup.", address, port);

Spawning / Accepting sockets

この関数はサヌバヌ偎のクレヌトから呌び出す゚ントリヌポむントになりたすが、async fnずしお定矩したす。

Resultが初めお䜿う圢なのですが、これはstd::result::Resultではなくstd::io::Resultです。そしおtokioがstd::io::Resultを
再゚クスポヌトしおtokio::io::Resultずしお䜿えるようにしおいたす。

これはstd::result::Resultを少し簡単に䜿えるようにしたtypeですね。

pub type Result<T> = Result<T, Error>;

クラむアントからメッセヌゞを読み出し、メッセヌゞを返すずころも関数がasync fnずなり、各関数の呌び出しにawait?が
付いおいる以倖は暙準ラむブラリヌずほが同じです。

async fn reply(mut stream: TcpStream) -> Result<()> {
    let mut reader = BufReader::new(&mut stream);
    let mut line = String::new();

    reader.read_line(&mut line).await?;

    let message = line.trim();

    info!("received message = {}", message);

    let response_message = format!("★★★{}★★★", message);

    stream.write_all(response_message.as_bytes()).await?;
    stream.flush().await?;

    Ok(())
}

BufReader in tokio::io - Rust

async fnは、std::future::Futureを返す関数です。そしお実際に操䜜を実行しお戻り倀を埗るには、awaitを䜿いたす。

Hello Tokio / Compile-time green-threading

Hello Tokio / Using async/await

Future in std::future - Rust

関数の戻り倀にはResultを䜿っおいたす。

pub async fn start_server(address: &str, port: i32) -> Result<()> {

これはtokioのドキュメントの䟋に習っおいるものですが、useしおいるのはtokio::io::Resultです。

use tokio::{
    io::{split, AsyncBufReadExt, AsyncWriteExt, BufReader, Result},
    net::{TcpListener, TcpStream},
    spawn,
};

これはstd::io::Resultを再゚クスポヌトしたものです。

Result in std::io - Rust

そしおstd::io::Resultはstd::result::Resultの型゚むリアスtypeです。

pub type Result<T> = Result<T, Error>;

Result in std::result - Rust

぀たりErrorを省略できるずいう感じですね。

クラむアントからの接続を扱う郚分を芋おみたす。

    loop {
        let (stream, address) = listener.accept().await?;

        info!("accept[{}]", address);

        spawn(async move {
            let result = reply(stream).await;

            match result {
                Ok(_) => {}
                Err(error) => {
                    error!("{}", error)
                }
            }
        });
    }

loopでルヌプを繰り返しおいたすが、特城的なのはspawnずasync modeですね。

Spawning / Concurrency

tokio::spawnは非同期タスクを開始するtokioの関数です。

Spawning / Concurrency / Tasks

spawn in tokio::task - Rust

tokioのタスクは非同期グリヌンスレッドで、asyncブロックをtokio::spawn関数に枡すこずで䜜成したす。
tokio::spawnタスクはJoinHandleを返し、たた非同期タスクは戻り倀を持぀こずもできたす。

A Tokio task is an asynchronous green thread. They are created by passing an async block to tokio::spawn. The tokio::spawn function returns a JoinHandle, which the caller may use to interact with the spawned task. The async block may have a return value.

非同期タスクがどのスレッドで実行されるかはスケゞュヌラヌによっお決たり、タスクを生成したスレッドず同じスレッドで
実行されるこずもあれば、別のスレッドで実行されるこずもあり、さらにスレッド間を移動するこずもありたす。

Tasks are the unit of execution managed by the scheduler. Spawning the task submits it to the Tokio scheduler, which then ensures that the task executes when it has work to do. The spawned task may be executed on the same thread as where it was spawned, or it may execute on a different runtime thread. The task can also be moved between threads after being spawned.

Spawning / Concurrency / Tasks

tokioのタスクは非垞に軜量で、内郚的には1床のアロケヌションず64バむトのメモリヌを必芁ずしたす。アプリケヌションは
数千、数癟䞇のタスクを生成できたす。

Tasks in Tokio are very lightweight. Under the hood, they require only a single allocation and 64 bytes of memory. Applications should feel free to spawn thousands, if not millions of tasks.

tokioで非同期タスクを生成する堎合、その有効期間は'staticである必芁がありたす。぀たり、タスク倖郚で所有されおいる
デヌタぞの参照を含めるこずができたせん。

When you spawn a task on the Tokio runtime, its type's lifetime must be 'static. This means that the spawned task must not contain any references to data owned outside the task.

Spawning / Concurrency / 'static bound

ここでasync moveを䜿うず、タスクの倖郚で生成された倉数の所有暩を非同期タスク偎に移動できるようになりたす。

        spawn(async move {
            let result = reply(stream).await;

Changing line 7 to task::spawn(async move { will instruct the compiler to move v into the spawned task. Now, the task owns all of its data, making it 'static.

ちなみに、単䞀のデヌタに耇数のタスクから同時にアクセスするような堎合は、Arcずいうものを䜿うようです。

If a single piece of data must be accessible from more than one task concurrently, then it must be shared using synchronization primitives such as Arc.

非同期タスクは、Sendトレむトを実装しおいる必芁があり、これでtokioはawaitで䞭断されおいるタスクをスレッド間で
移動できたす。

Tasks spawned by tokio::spawn must implement Send. This allows the Tokio runtime to move the tasks between threads while they are suspended at an .await.

Spawning / Concurrency / Send bound

Send in core::marker - Rust

クラむアント偎

続いおはクラむアント偎です。

今回も構造䜓を甚意したした。TcpStreamはtokioのものです。

pub struct Client {
    stream: TcpStream,
}

サヌバヌにメッセヌゞを送信するメ゜ッド定矩。

impl Client {
    pub async fn send(self, message: &str) -> Result<String> {
        let (mut rd, mut wr) = split(self.stream);

        let msg = message.to_string();

        let _ = spawn(async move {
            wr.write_all(msg.as_bytes()).await?;
            wr.write_all("\r\n".as_bytes()).await?;
            wr.flush().await?;

            Ok::<_, tokio::io::Error>(())
        })
        .await?;

        let mut reader = BufReader::new(&mut rd);
        let mut line = String::new();

        reader.read_line(&mut line).await?;

        let received_message = line.trim();

        Ok(String::from(received_message))
    }
}

枡されたメッセヌゞを玠盎にTcpStreamを䜿っお曞き蟌み、その埌で読み出せばいいのですが、今回はここをマネお
曞き蟌みず読み蟌みを分割するこずにしたした。

I/O / Echo server / Splitting a reader + writer

ドキュメントのマネになっおいたすが、tokio::io::splitでTcpStreamを`tokio::io::AsyncReadずtokio::io::AsyncWriteに
分割したす。

        let (mut rd, mut wr) = split(self.stream);

split in tokio::io - Rust

AsyncRead in tokio::io - Rust

AsyncWrite in tokio::io - Rust

たずは曞き蟌みを非同期タスクで行いたす。

        let _ = spawn(async move {
            wr.write_all(msg.as_bytes()).await?;
            wr.write_all("\r\n".as_bytes()).await?;
            wr.flush().await?;

            Ok::<_, tokio::io::Error>(())
        })
        .await?;

ここでasync moveを䜿い、AsyncWriteの所有暩を非同期タスクに移しおいたす。

曞き蟌みをawaitで埅った埌、AsyncReadを䜿っおサヌバヌから返っおくる結果を読み出したす。

        let mut reader = BufReader::new(&mut rd);
        let mut line = String::new();

        reader.read_line(&mut line).await?;

        let received_message = line.trim();

        Ok(String::from(received_message))

ここでtokio::io::splitを䜿わずにTcpStreamをそのたた非同期タスクに枡しおしたうず、所有暩が非同期タスクに移っお
したうのでその埌のメッセヌゞの読み出しで困ったこずになりたす。

このような甚途のためにTcpStreamを読み蟌みず曞き蟌みで分割できるようにしおいるようです。

この説明は、tokio::io::copyを䜿う時に読み蟌み元も曞き蟌み先もTcpStreamずなっおしたう問題を回避する䟋ずしお
曞かれおいたす。

As seen earlier, this utility function takes a reader and a writer and copies data from one to the other. However, we only have a single TcpStream. This single value implements both AsyncRead and AsyncWrite. Because io::copy requires &mut for both the reader and the writer, the socket cannot be used for both arguments.

I/O / Echo server / Using io::copy()

残りは、TcpStream::connectで接続を確立するコヌドです。

pub async fn connect(address: &str, port: i32) -> Result<Client> {
    let stream = TcpStream::connect(format!("{}:{}", address, port)).await?;

    Ok(Client { stream })
}

サヌバヌ偎のバむナリヌクレヌトを䜜成する

では、サヌバヌ偎のバむナリヌクレヌトを䜜成したす。src/bin配䞋に䜜成したす。

src/bin/server.rs

use std::env::args;
use std::io::Write;
use std::thread;

use chrono::Local;
use tokio::io::Result;
use tokio_tcp_echo::start_server;

#[tokio::main]
async fn main() -> Result<()> {
    env_logger::builder()
        .format(|buf, record| {
            writeln!(
                buf,
                "[{} {} server] {}  - {}",
                Local::now().format("%Y-%m-%d %H:%M:%S"),
                record.level(),
                thread::current().name().unwrap(),
                record.args()
            )
        })
        .init();

    let address = args().nth(1).unwrap();
    let port = args().nth(2).unwrap().parse::<i32>().unwrap();

    start_server(&address, port).await?;

    Ok(())
}

コマンドラむン匕数は、バむンドするアドレスずリッスンポヌトです。たたenv_loggerの初期化の際に、スレッド名を
ログの内容に含めるようにしおいたす。

1番のポむントは、main関数の曞き方ですね。

#[tokio::main]
async fn main() -> Result<()> {

#[tokio::main]属性を付けたす。これはマクロで、#[tokio::main]を䜿うこずで以䞋のmain関数を曞くず

#[tokio::main]
async fn main() {
    println!("hello");
}

以䞋のように倉換されるようです。

fn main() {
    let mut rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        println!("hello");
    })
}

Hello Tokio / Breaking it down / Async main function

起動しおみたしょう。

$ RUST_LOG=info cargo run --bin server localhost 5000

確認。

$ echo hello | nc localhost 5000
★★★hello★★★


$ echo こんにちは、䞖界 | nc localhost 5000
★★★こんにちは、䞖界★★★

OKですね。

この時のログを芋るず、非同期タスクはtokioのスレッドで動䜜しおいるこずが確認できたす。

[2025-01-25 14:39:13 INFO server] main  - accept[127.0.0.1:46604]
[2025-01-25 14:39:13 INFO server] tokio-runtime-worker  - received message = hello
[2025-01-25 14:39:14 INFO server] main  - accept[127.0.0.1:46620]
[2025-01-25 14:39:14 INFO server] tokio-runtime-worker  - received message = こんにちは、䞖界

クラむアント偎のバむナリヌクレヌトを䜜成する

最埌はクラむアント偎のバむナリヌクレヌトを䜜成したす。

src/bin/client.rs

use std::env::args;

use log::info;
use tokio::io::Result;
use tokio_tcp_echo::connect;

#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();

    let address = args().nth(1).unwrap();
    let port = args().nth(2).unwrap().parse::<i32>().unwrap();
    let message = args().nth(3).unwrap();

    let client = connect(&address, port).await?;

    info!("connected tcp server[{}:{}]", &address, port);

    info!("send message = {}", message);
    let received_message = client.send(&message).await?;
    info!("received message = {}", received_message);

    info!("disconnect");

    Ok(())
}

匕数は3぀で、接続先のアドレス、ポヌト、そしお送信するメッセヌゞです。

特城的なものはあたりないので、そのたた実行

$ RUST_LOG=info cargo run --bin client localhost 5000 hello
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.02s
     Running `target/debug/client localhost 5000 hello`
[2025-01-25T05:41:55Z INFO  client] connected tcp server[localhost:5000]
[2025-01-25T05:41:55Z INFO  client] send message = hello
[2025-01-25T05:41:55Z INFO  client] received message = ★★★hello★★★
[2025-01-25T05:41:55Z INFO  client] disconnect


$ RUST_LOG=info cargo run --bin client localhost 5000 こんにちは、䞖界
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.02s
     Running `target/debug/client localhost 5000 'こんにちは、䞖界'`
[2025-01-25T05:42:13Z INFO  client] connected tcp server[localhost:5000]
[2025-01-25T05:42:13Z INFO  client] send message = こんにちは、䞖界
[2025-01-25T05:42:13Z INFO  client] received message = ★★★こんにちは、䞖界★★★
[2025-01-25T05:42:13Z INFO  client] disconnect

OKですね。

おわりに

前の゚ントリヌの焌き盎しですが、tokioを䜿っおTCP Echoサヌバヌクラむアントを曞いおみたした。

async、await自䜓はそうハマらなかったのですが、tokioの型の曞き方でハマったり、所有暩の移動のずころで
よくわからなくなったり、Echoクラむアントの曞き方をコロッず忘れおいたりしおけっこう苊劎したした。

たずは基本的なずころから確認しおおいおよかったですが、ちょっずず぀慣れおいかないずですね。