目录
前言
在上一节中,我们介绍了Rust中并发编程的方式之一:Fork和Join,通过新建线程提升代码的效率,这节课我们介绍并发编程的第二种方式:通道。Channel就类似于水管,通过Channel可以连接多个线程,达到多个线程之间协调作业。
Channel
我们以一个简单的需求为例来解释下Channel的使用方法。完成WordCount,使用2个线程,线程1从文件中读取数据,将数据通过Channel发送给线程2,线程2负责计算wordcount。
注意:
(1)下面代码中文件路径使用的是绝对路径
(2)使用mpsc::channel()可以创建一个通道,下面代码还指定了泛型,即在下面的代码中通道中只可以发送String类型的数据
(3)通道的返回值是一个发送者,一个消费者,发送者给线程1中的闭包,因为线程1负责读数据发送到通道中,消费者给线程2中的闭包,线程2负责读取数据处理wordcount。
(4)mpsc:指的是多生产者,单消费者的意思,即multi-producer,single-consumer。
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread::JoinHandle;
use std::{fs, io, thread};
fn main() {
let paths = vec!["/Users/xxxxx/rustproject/lesson11/src/test.txt".to_string()];
let (receiver, handler1) = start_file_reader_thread(paths);
let handler2 = start_file_word_count_thread(receiver);
let _ = handler1.join().unwrap();
let _ = handler2.join().unwrap();
}
// 读取文件内容
fn start_file_reader_thread(
documents: Vec<String>,
) -> (mpsc::Receiver<String>, JoinHandle<Result<(), io::Error>>) {
let (sender, receiver) = mpsc::channel::<String>();
let handle = thread::spawn(move || -> Result<(), io::Error> {
for filename in documents {
let text = fs::read_to_string(filename)?;
if sender.send(text).is_err() {
break;
}
}
Ok(())
});
(receiver, handle)
}
// word count
fn start_file_word_count_thread(texts: mpsc::Receiver<String>) -> JoinHandle<()> {
let handle = thread::spawn(move || {
// 处理
let mut wc: HashMap<String, u32> = HashMap::new();
for line in texts {
let words: Vec<String> = line.split(" ").map(|x| x.to_string()).collect();
for word in words {
match wc.get(&word) {
None => {
wc.insert(word, 1);
}
Some(old) => {
wc.insert(word, old + 1);
}
}
}
}
// 打印
for (word, cnt) in wc {
println!("key = {}, count = {}", word, cnt);
}
});
handle
}
多生产者
上面是单生产者的例子,我们扩展一下,使用多生产者。
注意:
(1)使用生产者的clone方法,扩展出多个生产者传递给不同的线程发送消息。
use std::collections::HashMap;
use std::sync::mpsc;
use std::{fs, io, thread};
fn main() {
let path1 = vec!["/Users/xxx/rustproject/lesson11/src/test1.txt".to_string()];
let path2 = vec!["/Users/xxx/rustproject/lesson11/src/test2.txt".to_string()];
let (producer, consumer) = mpsc::channel::<String>();
let producer1 = producer.clone();
// 生产者
let handler1 = thread::spawn(move || -> Result<(), io::Error> {
for filename in path1 {
let text = fs::read_to_string(filename)?;
if producer.send(text).is_err() {
break;
}
}
Ok(())
});
let handler2 = thread::spawn(move || -> Result<(), io::Error>{
for filename in path2 {
let text = fs::read_to_string(filename)?;
if producer1.send(text).is_err() {
break;
}
}
Ok(())
});
// 消费者
let handler3 = thread::spawn(move || {
// 处理
let mut wc: HashMap<String, u32> = HashMap::new();
for line in consumer {
let words: Vec<String> = line.split(" ").map(|x| x.to_string()).collect();
for word in words {
match wc.get(&word) {
None => {
wc.insert(word, 1);
}
Some(old) => {
wc.insert(word, old + 1);
}
}
}
}
// 打印
for (word, cnt) in wc {
println!("key = {}, count = {}", word, cnt);
}
});
//
handler1.join();
handler2.join();
handler3.join();
}