安全性、速度、並行性を兼ね備えた言語と、巷でうわさのRustを覗いてみる(スレッド編 その3)
みなさまお久しぶりです。
今回も「The Rust Programming Language」を読みながら、 スレッドについて見ていきたいと思います。
前回の「スレッド編 その2」はこちらからご覧いただけます。
スレッド間でメッセージを受け渡す
RustではGoのようにスレッド間でメッセージを受け渡すことができるらしいです。
早速それを試してみたいと思います。
use std::thread;
use std::sync::mpsc;
fn main() {
// チャンネルを作成
let (tx, rx) = mpsc::channel();
// スレッド内からメッセージを送信
thread::spawn(move || {
let val = String::from("message 1");
tx.send(val).unwrap();
});
// スレッド内から送信されたメッセージを受信
let received = rx.recv().unwrap();
println!("受信したメッセージ: {}", received);
}
受信したメッセージ: message 1
まずは、mpsc
をつかってチャンネルを作ります。
これは、「multiple producer, single consumer」の略のようです。
このように、複数の送信側で値を生成し、1つの受信側で値を消費できることを意味しているようです。
tx
とrx
で、送信と受信を行います。
pub fn channel<T>() -> (Sender<T>, Receiver<T>)
定義を確認すると、それぞれSender<T>
型とReceiver<T>
型を返してくるようです。
あとは、tx.send
をつかってスレッド内からメッセージを送信し、rx.recv
でメインスレッドで受信します。rx.recv
は、メッセージを受信するまてブロッキングするので、スレッドが終了するまえにメインスレッドが終了することはないです。
なにもメッセージを送信しない場合どうなるか試してみる
メッセージを受信するまでブロッキングされるようなので、そもそもメッセージを送信しない場合はどうなるのかが気になったので試してみたいと思います。
use std::thread;
use std::sync::mpsc;
fn main() {
// チャンネルを作成
let (_, rx): (mpsc::Sender, mpsc::Receiver) = mpsc::channel();
// スレッド内でなにもしない
thread::spawn(move || {});
// スレッド内から送信されたメッセージを受信
let received = rx.recv().unwrap();
println!("受信したメッセージ: {}", received);
}
チャンネルを作成するときに、tx
で送信する値を使って型推論しているようで、 単純にスレッド内をコメントアウトするだけではビルドできなかったので、明示的に型を指定しました。
これを実行すると以下のようなエラーがでます。
thread 'main' panicked at src/main.rs:12:30:
called `Result::unwrap()` on an `Err` value: RecvError
recv
がエラーを返してきているようなので、それをハンドリングしてみます。
use std::thread;
use std::sync::mpsc;
fn main() {
// チャンネルを作成
let (_, rx): (mpsc::Sender, mpsc::Receiver) = mpsc::channel();
// スレッド内でなにもしない
thread::spawn(move || {});
// スレッド内から送信されたメッセージを受信
if let Ok(received) = rx.recv() {
println!("受信したメッセージ: {}", received);
} else {
println!("チャンネルが閉じられました");
}
println!("メインスレッドが終了");
}
チャンネルが閉じられました
メインスレッドが終了
ここでrecv
の定義を確認してみます。
pub fn recv(&self) -> Result<T, RecvError>
recv
は、Result<T, RecvError>
を返し、送信されるメッセージがなくなると、RecvError
を返してくるようです。
どうやってメッセージがこれ以上無いと判断しているのかが気になりました。
簡単に調べてみたところ、どうやらtx
がDropされたかどうかで判断しているようです。
明示的にtxをDropさせてrecvを観察する
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// チャンネルを作成
let (tx, rx) = mpsc::channel();
// スレッド内からメッセージを送信
thread::spawn(move || {
let messages = vec![
String::from("message 1"),
String::from("message 2"),
String::from("message 3"),
];
for message in messages {
tx.send(message).unwrap();
thread::sleep(Duration::from_secs(1)); // 一秒待つ
}
// チャンネルの送信側を明示的に閉じる
drop(tx);
thread::sleep(Duration::from_secs(1)); // 一秒待つ
println!("Dropされた後");
});
// メッセージを受信し、エラーハンドリング
loop {
match rx.recv() {
Ok(received) => {
println!("受信したメッセージ: {}", received);
}
Err(err) => {
println!("チャンネルが閉じられました: {}", err);
break;
}
}
}
}
受信したメッセージ: message 1
受信したメッセージ: message 2
受信したメッセージ: message 3
チャンネルが閉じられました: receiving on a closed channel
tx
は、明示的にDropしない場合、スレッドが終了するときに自動的にDropされるはずです。 ですので、途中でDropさせることにより、recv
がエラーを返し、loopが終了してメインスレッドが終了します。 その結果、スレッド内のDrop以降に書かれたprintln
は出力されません。
そこで、今度はdrop(tx);
をコメントアウトして実行してみたいと思います。
受信したメッセージ: message 1
受信したメッセージ: message 2
受信したメッセージ: message 3
Dropされた後
チャンネルが閉じられました: receiving on a closed channel
スレッドが終了するときにDropされるので、スレッド内の処理が継続されてから、メインスレッドが 終了しています。
これによって、recv
はtx
がDropされたかどうかで、これ以上メッセージが来ないかどうかを 判断してることが観察できました。
所有権て便利すぎなのではと感じました。
この記事を書いた人
-
2008年にアーティスへ入社。
システムエンジニアとして、SI案件のシステム開発に携わる。
その後、事業開発部の立ち上げから自社サービスの開発、保守をメインに従事。
ドメイン駆動設計(DDD)を中心にドメインを重視しながら、保守可能なソフトウェア開発を探求している。
この執筆者の最新記事
関連記事
最新記事
FOLLOW US
最新の情報をお届けします
- facebookでフォロー
- Twitterでフォロー
- Feedlyでフォロー