グローバルナビゲーションへ

本文へ

フッターへ

お役立ち情報Blog



安全性、速度、並行性を兼ね備えた言語と、巷でうわさのRustを覗いてみる(スレッド編 その3)

みなさまお久しぶりです。

今回も「The Rust Programming Language」を読みながら、 スレッドについて見ていきたいと思います。
前回の「スレッド編 その2」はこちらからご覧いただけます。

スレッド間でメッセージを受け渡す

RustではGoのようにスレッド間でメッセージを受け渡すことができるらしいです。

早速それを試してみたいと思います。

use std::thread;
use std::sync::mpsc;

fn main() {
    // チャンネルを作成
    let (tx, rx) = mpsc::channel();

    // スレッド内からメッセージを送信
    thread::spawn(move || {
        let val = String::from("message 1");
        tx.send(val).unwrap();
    });

    // スレッド内から送信されたメッセージを受信
    let received = rx.recv().unwrap();

    println!("受信したメッセージ: {}", received);
}
受信したメッセージ: message 1

まずは、mpscをつかってチャンネルを作ります。
これは、「multiple producer, single consumer」の略のようです。

In short, the way Rust’s standard library implements channels means a channel can have multiple sending ends that produce values but only one receiving end that consumes those values.

このように、複数の送信側で値を生成し、1つの受信側で値を消費できることを意味しているようです。

txrxで、送信と受信を行います。

pub fn channel<T>() -> (Sender<T>, Receiver<T>)

定義を確認すると、それぞれSender<T>型とReceiver<T>型を返してくるようです。

あとは、tx.sendをつかってスレッド内からメッセージを送信し、rx.recvでメインスレッドで受信します。rx.recvは、メッセージを受信するまてブロッキングするので、スレッドが終了するまえにメインスレッドが終了することはないです。

なにもメッセージを送信しない場合どうなるか試してみる

メッセージを受信するまでブロッキングされるようなので、そもそもメッセージを送信しない場合はどうなるのかが気になったので試してみたいと思います。

use std::thread;
use std::sync::mpsc;

fn main() {
    // チャンネルを作成
    let (_, rx): (mpsc::Sender, mpsc::Receiver)  = mpsc::channel();

    // スレッド内でなにもしない
    thread::spawn(move || {});

    // スレッド内から送信されたメッセージを受信
    let received = rx.recv().unwrap();

    println!("受信したメッセージ: {}", received);
}

チャンネルを作成するときに、txで送信する値を使って型推論しているようで、 単純にスレッド内をコメントアウトするだけではビルドできなかったので、明示的に型を指定しました。

これを実行すると以下のようなエラーがでます。

thread 'main' panicked at src/main.rs:12:30:
called `Result::unwrap()` on an `Err` value: RecvError

recvがエラーを返してきているようなので、それをハンドリングしてみます。

use std::thread;
use std::sync::mpsc;

fn main() {
    // チャンネルを作成
    let (_, rx): (mpsc::Sender, mpsc::Receiver)  = mpsc::channel();

    // スレッド内でなにもしない
    thread::spawn(move || {});

    // スレッド内から送信されたメッセージを受信
    if let Ok(received) = rx.recv() {
        println!("受信したメッセージ: {}", received);
    } else {
        println!("チャンネルが閉じられました");
    }

    println!("メインスレッドが終了");
}
チャンネルが閉じられました
メインスレッドが終了

ここでrecvの定義を確認してみます。

pub fn recv(&self) -> Result<T, RecvError>

recvは、Result<T, RecvError>を返し、送信されるメッセージがなくなると、RecvErrorを返してくるようです。

どうやってメッセージがこれ以上無いと判断しているのかが気になりました。

簡単に調べてみたところ、どうやらtxがDropされたかどうかで判断しているようです。

明示的にtxをDropさせてrecvを観察する

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // チャンネルを作成
    let (tx, rx) = mpsc::channel();

    // スレッド内からメッセージを送信
    thread::spawn(move || {
        let messages = vec![
            String::from("message 1"),
            String::from("message 2"),
            String::from("message 3"),
        ];

        for message in messages {
            tx.send(message).unwrap();
            thread::sleep(Duration::from_secs(1)); // 一秒待つ
        }

        // チャンネルの送信側を明示的に閉じる
        drop(tx);

        thread::sleep(Duration::from_secs(1)); // 一秒待つ
        println!("Dropされた後");
    });

    // メッセージを受信し、エラーハンドリング
    loop {
        match rx.recv() {
            Ok(received) => {
                println!("受信したメッセージ: {}", received);
            }
            Err(err) => {
                println!("チャンネルが閉じられました: {}", err);
                break;
            }
        }
    }
}
受信したメッセージ: message 1
受信したメッセージ: message 2
受信したメッセージ: message 3
チャンネルが閉じられました: receiving on a closed channel

txは、明示的にDropしない場合、スレッドが終了するときに自動的にDropされるはずです。 ですので、途中でDropさせることにより、recvがエラーを返し、loopが終了してメインスレッドが終了します。 その結果、スレッド内のDrop以降に書かれたprintlnは出力されません。

そこで、今度はdrop(tx);をコメントアウトして実行してみたいと思います。

受信したメッセージ: message 1
受信したメッセージ: message 2
受信したメッセージ: message 3
Dropされた後
チャンネルが閉じられました: receiving on a closed channel

スレッドが終了するときにDropされるので、スレッド内の処理が継続されてから、メインスレッドが 終了しています。

これによって、recvtxがDropされたかどうかで、これ以上メッセージが来ないかどうかを 判断してることが観察できました。

所有権て便利すぎなのではと感じました。

この記事を書いた人

tkr2f
tkr2f事業開発部 web application engineer
2008年にアーティスへ入社。
システムエンジニアとして、SI案件のシステム開発に携わる。
その後、事業開発部の立ち上げから自社サービスの開発、保守をメインに従事。
ドメイン駆動設計(DDD)を中心にドメインを重視しながら、保守可能なソフトウェア開発を探求している。
この記事のカテゴリ

FOLLOW US

最新の情報をお届けします