グローバルナビゲーションへ

本文へ

フッターへ

お役立ち情報Blog



安全性、速度、並行性を兼ね備えた言語と、巷でうわさのRustを覗いてみる(スレッド編 その4)

前回は、スレッド間でのメッセージの受け渡しについて見ていきました。
今回は、txをクローンして複数のスレッドからメッセージを送信する方法について見ていきたいと思います。
前回の「スレッド編 その3」はこちらからご覧いただけます。

複数のスレッドからメッセージを送信する

Rustでは、txをクローンすることで、複数のスレッドからメッセージを送信することができます。 これにより、スレッド間での並列処理がより柔軟に行えるようになります。

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // チャンネルを作成
    let (tx, rx) = mpsc::channel();

    // txをクローン
    let tx1 = tx.clone();

    // スレッド1からメッセージを送信
    thread::spawn(move || {
        let messages = vec![
            String::from("スレッド1 - メッセージ1"),
            String::from("スレッド1 - メッセージ2"),
        ];

        for message in messages {
            tx.send(message).unwrap();
            thread::sleep(Duration::from_secs(1)); // 一秒待つ
        }
    });

    // スレッド2からメッセージを送信
    thread::spawn(move || {
        let messages = vec![
            String::from("スレッド2 - メッセージ1"),
            String::from("スレッド2 - メッセージ2"),
        ];

        for message in messages {
            tx1.send(message).unwrap();
            thread::sleep(Duration::from_secs(1)); // 一秒待つ
        }
    });

    // メインスレッドでメッセージを受信
    for received in rx {
        println!("受信したメッセージ: {}", received);
    }
}
受信したメッセージ: スレッド2 - メッセージ1
受信したメッセージ: スレッド1 - メッセージ1
受信したメッセージ: スレッド1 - メッセージ2
受信したメッセージ: スレッド2 - メッセージ2

このように、tx.clone()を使ってtxをクローンし、複数のスレッドからメッセージを送信することができます。
クローンしたtxは元のtxと同じ機能を持ち、sendメソッドでメッセージを送信できます。

メインスレッドでは、rxを使って送信されたメッセージを受信します。

tx.clone()の定義を見ておきたいと思います。

    /// Clone a sender to send to other threads.
    ///
    /// Note, be aware of the lifetime of the sender because all senders
    /// (including the original) need to be dropped in order for
    /// [`Receiver::recv`] to stop blocking.
    fn clone(&self) -> Sender<T> {

mpsc::channel()と同じくSender<T>を返すのがわかります。

オリジナルを含めたすべてのsenderDropされるまでブロッキングされる書いてありますが、これは後でみていきたいと思います。

クローンされたTXがDropされるタイミング

つぎに、txをクローンした場合のDropのタイミングを確認しておきます。

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // チャンネルを作成
    let (tx, rx) = mpsc::channel();

    // txをクローン
    let tx1 = tx.clone();

    // スレッド1からメッセージを送信
    thread::spawn(move || {
        let messages = vec![
            String::from("スレッド1 - メッセージ1"),
            String::from("スレッド1 - メッセージ2"),
        ];

        for message in messages {
            tx.send(message).unwrap();
            thread::sleep(Duration::from_secs(1)); // 一秒待つ
        }
    });

    // スレッド2からメッセージを送信
    thread::spawn(move || {
        let messages = vec![
            String::from("スレッド2 - メッセージ1"),
            String::from("スレッド2 - メッセージ2"),
        ];

        for message in messages {
            tx1.send(message).unwrap();
            thread::sleep(Duration::from_secs(1)); // 一秒待つ
        }
    });

    // メインスレッドでメッセージを受信
    loop {
        match rx.recv() {
            Ok(received) => println!("受信したメッセージ: {}", received),
            Err(_) => {
                println!("すべての送信側が閉じられました");
                break;
            }
        }
    }
}
受信したメッセージ: スレッド1 - メッセージ1
受信したメッセージ: スレッド2 - メッセージ1
受信したメッセージ: スレッド1 - メッセージ2
受信したメッセージ: スレッド2 - メッセージ2
すべての送信側が閉じられました

すべてのtxDropされた後に、受信側が終了することが確認できます。
rx.recv()は、txがすべてDropされるとErrを返し、ループが終わります。

クローンされたtxを使わない場合

クローンもとのtxを利用しない(Dropない)とどうなるのか気になったので検証してみたいと思います。

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // チャンネルを作成
    let (tx, rx) = mpsc::channel();

    // txをクローン
    let tx1 = tx.clone();
    let tx2 = tx.clone();

    // スレッド1からメッセージを送信
    thread::spawn(move || {
        let messages = vec![
            String::from("スレッド1 - メッセージ1"),
            String::from("スレッド1 - メッセージ2"),
        ];

        for message in messages {
            tx1.send(message).unwrap();
            thread::sleep(Duration::from_secs(1)); // 一秒待つ
        }
    });

    // スレッド2からメッセージを送信
    thread::spawn(move || {
        let messages = vec![
            String::from("スレッド2 - メッセージ1"),
            String::from("スレッド2 - メッセージ2"),
        ];

        for message in messages {
            tx2.send(message).unwrap();
            thread::sleep(Duration::from_secs(1)); // 一秒待つ
        }
    });

    // メインスレッドでメッセージを受信
    loop {
        match rx.recv() {
            Ok(received) => println!("受信したメッセージ: {}", received),
            Err(_) => {
                println!("すべての送信側が閉じられました");
                break;
            }
        }
    }
}
受信したメッセージ: スレッド1 - メッセージ1
受信したメッセージ: スレッド2 - メッセージ1
受信したメッセージ: スレッド1 - メッセージ2
受信したメッセージ: スレッド2 - メッセージ2

このように出力されて、メインスレッドが終了しません。

もっとコードをシンプルにして検証してみます。

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel::<String>();

    let tx1 = tx.clone();
    let tx2 = tx.clone();

    drop(tx1);
    drop(tx2);

    match rx.recv() {
        Ok(msg) => println!("受信したメッセージ: {}", msg),
        Err(err) => println!("エラー: {}", err),
    }

    println!("メインスレッドが終了");
}

このコードを実行すると、何も出力されずにメインスレッドも終了しません。

txDropされないので、ブロッキングされているようです。

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel::<String>();

    let tx1 = tx.clone();
    let tx2 = tx.clone();

    drop(tx1);
    drop(tx2);
    drop(tx);

    match rx.recv() {
        Ok(msg) => println!("受信したメッセージ: {}", msg),
        Err(err) => println!("エラー: {}", err),
    }

    println!("メインスレッドが終了");
}

なので、このように明示的にtxDropすると、メインスレッドが終了してしまいます。

ちなみに、そもそもtxを定義しない場合は、ブロッキングされないようです。

use std::sync::mpsc;

fn main() {
    let (_, rx) = mpsc::channel::<String>();

    match rx.recv() {
        Ok(msg) => println!("受信したメッセージ: {}", msg),
        Err(err) => println!("エラー: {}", err),
    }

    println!("メインスレッドが終了");
}

このコードはブロッキングされずに、普通に終了します。

クローンもとのtxを使い忘れることなんてなさそうだけど、気をつけておきたいです。

この記事を書いた人

tkr2f
tkr2f事業開発部 web application engineer
2008年にアーティスへ入社。
システムエンジニアとして、SI案件のシステム開発に携わる。
その後、事業開発部の立ち上げから自社サービスの開発、保守をメインに従事。
ドメイン駆動設計(DDD)を中心にドメインを重視しながら、保守可能なソフトウェア開発を探求している。
この記事のカテゴリ

FOLLOW US

最新の情報をお届けします