|
|
|
@ -97,6 +97,9 @@ impl ConcurrentStreamManager {
|
|
|
|
|
}
|
|
|
|
|
future.resolve(());
|
|
|
|
|
}
|
|
|
|
|
if let Err(e) = stream.shutdown() {
|
|
|
|
|
log::error!("Failed to shutdown stream: {}", e);
|
|
|
|
|
}
|
|
|
|
|
emitters.lock().remove(&recv_id);
|
|
|
|
|
threads.lock().remove(&thread::current().id());
|
|
|
|
|
}
|
|
|
|
@ -112,14 +115,26 @@ impl ConcurrentStreamManager {
|
|
|
|
|
let wg = WaitGroup::clone(&wg);
|
|
|
|
|
move || {
|
|
|
|
|
mem::drop(wg);
|
|
|
|
|
while let Ok(mut event) = stream.read() {
|
|
|
|
|
event.origin = Some(stream.receiver_node().clone());
|
|
|
|
|
|
|
|
|
|
if let Err(e) = sender.send((stream.receiver_node().clone(), event)) {
|
|
|
|
|
log::trace!("Failed to get event from {}: {}", recv_id, e);
|
|
|
|
|
break;
|
|
|
|
|
loop {
|
|
|
|
|
match stream.read() {
|
|
|
|
|
Ok(mut event) => {
|
|
|
|
|
event.origin = Some(stream.receiver_node().clone());
|
|
|
|
|
|
|
|
|
|
if let Err(e) = sender.send((stream.receiver_node().clone(), event))
|
|
|
|
|
{
|
|
|
|
|
log::trace!("Failed to get event from {}: {}", recv_id, e);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Err(e) => {
|
|
|
|
|
log::error!("Failed to send event: {}", e);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if let Err(e) = stream.shutdown() {
|
|
|
|
|
log::error!("Failed to shutdown stream: {}", e);
|
|
|
|
|
}
|
|
|
|
|
threads.lock().remove(&thread::current().id());
|
|
|
|
|
}
|
|
|
|
|
})?;
|
|
|
|
|