Fix dangling response listeners

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/32/head
trivernis 3 years ago
parent 9cc7d1ffe8
commit 38342eac44
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -62,6 +62,7 @@ async fn handle_connection<S: 'static + AsyncProtocolStream>(
/// Handles a single event in a different tokio context /// Handles a single event in a different tokio context
fn handle_event(mut ctx: Context, handler: Arc<EventHandler>, event: Event) { fn handle_event(mut ctx: Context, handler: Arc<EventHandler>, event: Event) {
ctx.set_ref_id(Some(event.id())); ctx.set_ref_id(Some(event.id()));
let event_id = event.id();
tokio::spawn(async move { tokio::spawn(async move {
match handler.handle_event(&ctx, event).await { match handler.handle_event(&ctx, event).await {
@ -74,6 +75,8 @@ fn handle_event(mut ctx: Context, handler: Arc<EventHandler>, event: Event) {
{ {
tracing::error!("Error occurred when sending error response: {:?}", e); tracing::error!("Error occurred when sending error response: {:?}", e);
} }
let mut reply_listeners = ctx.reply_listeners.lock().await;
reply_listeners.remove(&event_id);
} }
Err(e) => { Err(e) => {
// emit an error event // emit an error event

@ -20,6 +20,7 @@ pub struct EmitMetadataWithResponseStream<P: IntoPayload> {
pub(crate) emit_metadata: Option<EmitMetadata<P>>, pub(crate) emit_metadata: Option<EmitMetadata<P>>,
} }
/// An asynchronous stream one can read all responses to a specific event from.
pub struct ResponseStream { pub struct ResponseStream {
event_id: u64, event_id: u64,
ctx: Option<Context>, ctx: Option<Context>,

Loading…
Cancel
Save