-
Notifications
You must be signed in to change notification settings - Fork 105
/
Copy pathtask_worker.rs
55 lines (46 loc) · 1.66 KB
/
task_worker.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
mod async_helpers;
use futures::{select, FutureExt};
use std::io::Write;
use std::{error::Error, time::Duration};
use zeromq::{Socket, SocketRecv, SocketSend};
#[async_helpers::main]
async fn main() -> Result<(), Box<dyn Error>> {
// Socket to receive messages on
let mut receiver = zeromq::PullSocket::new();
receiver.connect("tcp://127.0.0.1:5557").await?;
// Socket to send messages to
let mut sender = zeromq::PushSocket::new();
sender.connect("tcp://127.0.0.1:5558").await?;
// Socket for control input
let mut controller = zeromq::SubSocket::new();
controller.connect("tcp://127.0.0.1:5559").await?;
controller.subscribe("").await?;
// Process messages from receiver and controller
loop {
select! {
message = receiver.recv().fuse() => {
// Process task
let message = message.unwrap();
let workload = String::from_utf8(message.get(0).unwrap().to_vec())?
.parse()
.expect("Couldn't parse u64 from data");
// Do the work
async_helpers::sleep(Duration::from_millis(workload)).await;
// Send results to sink
sender.send(message).await?;
// Simple progress indicator for the viewer
print!(".");
std::io::stdout().flush()?;
},
// Any waiting controller command acts as 'KILL'
_kill = controller.recv().fuse() => {
break
}
};
}
println!("Done");
receiver.close().await;
sender.close().await;
controller.close().await;
Ok(())
}