安全性、速度、並行性を兼ね備えた言語と、巷でうわさのRustを覗いてみる(スレッド編 その4)
前回は、スレッド間でのメッセージの受け渡しについて見ていきました。
今回は、tx
をクローンして複数のスレッドからメッセージを送信する方法について見ていきたいと思います。
前回の「スレッド編 その3」はこちらからご覧いただけます。
複数のスレッドからメッセージを送信する
Rustでは、tx
をクローンすることで、複数のスレッドからメッセージを送信することができます。 これにより、スレッド間での並列処理がより柔軟に行えるようになります。
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// チャンネルを作成
let (tx, rx) = mpsc::channel();
// txをクローン
let tx1 = tx.clone();
// スレッド1からメッセージを送信
thread::spawn(move || {
let messages = vec![
String::from("スレッド1 - メッセージ1"),
String::from("スレッド1 - メッセージ2"),
];
for message in messages {
tx.send(message).unwrap();
thread::sleep(Duration::from_secs(1)); // 一秒待つ
}
});
// スレッド2からメッセージを送信
thread::spawn(move || {
let messages = vec![
String::from("スレッド2 - メッセージ1"),
String::from("スレッド2 - メッセージ2"),
];
for message in messages {
tx1.send(message).unwrap();
thread::sleep(Duration::from_secs(1)); // 一秒待つ
}
});
// メインスレッドでメッセージを受信
for received in rx {
println!("受信したメッセージ: {}", received);
}
}
受信したメッセージ: スレッド2 - メッセージ1
受信したメッセージ: スレッド1 - メッセージ1
受信したメッセージ: スレッド1 - メッセージ2
受信したメッセージ: スレッド2 - メッセージ2
このように、tx.clone()
を使ってtx
をクローンし、複数のスレッドからメッセージを送信することができます。
クローンしたtx
は元のtx
と同じ機能を持ち、send
メソッドでメッセージを送信できます。
メインスレッドでは、rx
を使って送信されたメッセージを受信します。
tx.clone()
の定義を見ておきたいと思います。
/// Clone a sender to send to other threads.
///
/// Note, be aware of the lifetime of the sender because all senders
/// (including the original) need to be dropped in order for
/// [`Receiver::recv`] to stop blocking.
fn clone(&self) -> Sender<T> {
mpsc::channel()
と同じくSender<T>
を返すのがわかります。
オリジナルを含めたすべてのsender
がDrop
されるまでブロッキングされる書いてありますが、これは後でみていきたいと思います。
クローンされたTXがDropされるタイミング
つぎに、tx
をクローンした場合のDrop
のタイミングを確認しておきます。
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// チャンネルを作成
let (tx, rx) = mpsc::channel();
// txをクローン
let tx1 = tx.clone();
// スレッド1からメッセージを送信
thread::spawn(move || {
let messages = vec![
String::from("スレッド1 - メッセージ1"),
String::from("スレッド1 - メッセージ2"),
];
for message in messages {
tx.send(message).unwrap();
thread::sleep(Duration::from_secs(1)); // 一秒待つ
}
});
// スレッド2からメッセージを送信
thread::spawn(move || {
let messages = vec![
String::from("スレッド2 - メッセージ1"),
String::from("スレッド2 - メッセージ2"),
];
for message in messages {
tx1.send(message).unwrap();
thread::sleep(Duration::from_secs(1)); // 一秒待つ
}
});
// メインスレッドでメッセージを受信
loop {
match rx.recv() {
Ok(received) => println!("受信したメッセージ: {}", received),
Err(_) => {
println!("すべての送信側が閉じられました");
break;
}
}
}
}
受信したメッセージ: スレッド1 - メッセージ1
受信したメッセージ: スレッド2 - メッセージ1
受信したメッセージ: スレッド1 - メッセージ2
受信したメッセージ: スレッド2 - メッセージ2
すべての送信側が閉じられました
すべてのtx
がDrop
された後に、受信側が終了することが確認できます。
rx.recv()
は、tx
がすべてDrop
されるとErr
を返し、ループが終わります。
クローンされたtxを使わない場合
クローンもとのtx
を利用しない(Drop
ない)とどうなるのか気になったので検証してみたいと思います。
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// チャンネルを作成
let (tx, rx) = mpsc::channel();
// txをクローン
let tx1 = tx.clone();
let tx2 = tx.clone();
// スレッド1からメッセージを送信
thread::spawn(move || {
let messages = vec![
String::from("スレッド1 - メッセージ1"),
String::from("スレッド1 - メッセージ2"),
];
for message in messages {
tx1.send(message).unwrap();
thread::sleep(Duration::from_secs(1)); // 一秒待つ
}
});
// スレッド2からメッセージを送信
thread::spawn(move || {
let messages = vec![
String::from("スレッド2 - メッセージ1"),
String::from("スレッド2 - メッセージ2"),
];
for message in messages {
tx2.send(message).unwrap();
thread::sleep(Duration::from_secs(1)); // 一秒待つ
}
});
// メインスレッドでメッセージを受信
loop {
match rx.recv() {
Ok(received) => println!("受信したメッセージ: {}", received),
Err(_) => {
println!("すべての送信側が閉じられました");
break;
}
}
}
}
受信したメッセージ: スレッド1 - メッセージ1
受信したメッセージ: スレッド2 - メッセージ1
受信したメッセージ: スレッド1 - メッセージ2
受信したメッセージ: スレッド2 - メッセージ2
このように出力されて、メインスレッドが終了しません。
もっとコードをシンプルにして検証してみます。
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel::<String>();
let tx1 = tx.clone();
let tx2 = tx.clone();
drop(tx1);
drop(tx2);
match rx.recv() {
Ok(msg) => println!("受信したメッセージ: {}", msg),
Err(err) => println!("エラー: {}", err),
}
println!("メインスレッドが終了");
}
このコードを実行すると、何も出力されずにメインスレッドも終了しません。
tx
がDrop
されないので、ブロッキングされているようです。
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel::<String>();
let tx1 = tx.clone();
let tx2 = tx.clone();
drop(tx1);
drop(tx2);
drop(tx);
match rx.recv() {
Ok(msg) => println!("受信したメッセージ: {}", msg),
Err(err) => println!("エラー: {}", err),
}
println!("メインスレッドが終了");
}
なので、このように明示的にtx
をDrop
すると、メインスレッドが終了してしまいます。
ちなみに、そもそもtx
を定義しない場合は、ブロッキングされないようです。
use std::sync::mpsc;
fn main() {
let (_, rx) = mpsc::channel::<String>();
match rx.recv() {
Ok(msg) => println!("受信したメッセージ: {}", msg),
Err(err) => println!("エラー: {}", err),
}
println!("メインスレッドが終了");
}
このコードはブロッキングされずに、普通に終了します。
クローンもとのtx
を使い忘れることなんてなさそうだけど、気をつけておきたいです。
この記事を書いた人
-
2008年にアーティスへ入社。
システムエンジニアとして、SI案件のシステム開発に携わる。
その後、事業開発部の立ち上げから自社サービスの開発、保守をメインに従事。
ドメイン駆動設計(DDD)を中心にドメインを重視しながら、保守可能なソフトウェア開発を探求している。
この執筆者の最新記事
関連記事
最新記事
FOLLOW US
最新の情報をお届けします
- facebookでフォロー
- Twitterでフォロー
- Feedlyでフォロー