From 1d1e9bcd8771f85e46dc370856ed3e41329c3964 Mon Sep 17 00:00:00 2001 From: trivernis Date: Sat, 7 Aug 2021 16:45:02 +0200 Subject: [PATCH] Add lock for parallel runs and detached mode Signed-off-by: trivernis --- Cargo.toml | 2 +- src/main.rs | 5 +--- src/server/action.rs | 49 +++++++++++++++++++++++++++++++--- src/server/command_template.rs | 1 + src/server/mod.rs | 2 +- src/utils/settings.rs | 4 +++ 6 files changed, 53 insertions(+), 10 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6cef58d..c8cc2f0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ features = ["derive"] [dependencies.tokio] version = "1.9.0" -features = ["macros", "process"] +features = ["macros", "process", "sync"] [features] default = ["tokio/rt-multi-thread"] diff --git a/src/main.rs b/src/main.rs index 14daa7f..a4cd13b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,3 @@ -#[cfg(all(feature = "singlethreaded", feature = "multithreaded"))] -compile_error!("Can't have singlethreaded and mutithreaded feature enabled at the same time."); - use std::path::{Path, PathBuf}; use utils::logging::init_logger; @@ -36,7 +33,7 @@ async fn init_and_start() { for (name, endpoint) in &settings.endpoints { log::info!("Adding endpoint '{}' with path '{}'", name, &endpoint.path); - server.add_hook(endpoint.path.clone(), endpoint.action.clone().into()) + server.add_hook(endpoint.path.clone(), endpoint.into()) } server.start(&settings.server.address).await diff --git a/src/server/action.rs b/src/server/action.rs index 8441800..76a1222 100644 --- a/src/server/action.rs +++ b/src/server/action.rs @@ -1,31 +1,71 @@ use crate::server::command_template::CommandTemplate; use crate::utils::error::MultihookResult; +use crate::utils::settings::EndpointSettings; use serde_json::Value; use std::fs::read_to_string; +use std::mem; use std::path::PathBuf; +use std::sync::Arc; use tokio::process::Command; +use tokio::sync::Semaphore; +static MAX_CONCURRENCY: usize = 256; + +#[derive(Clone)] pub struct HookAction { command: CommandTemplate, + parallel_lock: Arc, + run_detached: bool, } impl HookAction { - pub fn new(command: S) -> Self { + pub fn new(command: S, parallel: bool, detached: bool) -> Self { + let parallel_lock = if parallel { + Semaphore::new(MAX_CONCURRENCY) + } else { + Semaphore::new(1) + }; Self { command: CommandTemplate::new(command), + parallel_lock: Arc::new(parallel_lock), + run_detached: detached, } } pub async fn execute(&self, body: &str) -> MultihookResult<()> { + if self.run_detached { + tokio::spawn({ + let action = self.clone(); + let body = body.to_owned(); + async move { + if let Err(e) = action.execute_command(&body).await { + log::error!("Detached hook threw an error: {:?}", e); + } + } + }); + + Ok(()) + } else { + self.execute_command(body).await + } + } + + async fn execute_command(&self, body: &str) -> MultihookResult<()> { let json_body: Value = serde_json::from_str(body).unwrap_or_default(); let command = self.command.evaluate(&json_body); + log::debug!("Acquiring lock for parallel runs..."); + let permit = self.parallel_lock.acquire().await.unwrap(); + log::debug!("Lock acquired. Running command..."); let output = Command::new("sh") .env("HOOK_BODY", body) .arg("-c") .arg(command) + .kill_on_drop(true) .output() .await?; + log::debug!("Command finished. Releasing parallel lock..."); + mem::drop(permit); let stderr = String::from_utf8_lossy(&output.stderr[..]); let stdout = String::from_utf8_lossy(&output.stdout[..]); log::debug!("Command output is: {}", stdout); @@ -38,10 +78,11 @@ impl HookAction { } } -impl From for HookAction { - fn from(action: String) -> Self { +impl From<&EndpointSettings> for HookAction { + fn from(endpoint: &EndpointSettings) -> Self { + let action = endpoint.action.clone(); let path = PathBuf::from(&action); let contents = read_to_string(path).unwrap_or(action); - Self::new(contents) + Self::new(contents, endpoint.allow_parallel, endpoint.run_detached) } } diff --git a/src/server/command_template.rs b/src/server/command_template.rs index 30f056c..d69b11c 100644 --- a/src/server/command_template.rs +++ b/src/server/command_template.rs @@ -3,6 +3,7 @@ use lazy_static::lazy_static; use regex::{Match, Regex}; use serde_json::Value; +#[derive(Clone)] pub struct CommandTemplate { src: String, matches: Vec<(usize, usize)>, diff --git a/src/server/mod.rs b/src/server/mod.rs index a4a5909..5e9f61d 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -10,7 +10,7 @@ use action::HookAction; use crate::utils::error::MultihookError; -mod action; +pub mod action; pub mod command_template; pub struct HookServer { diff --git a/src/utils/settings.rs b/src/utils/settings.rs index 8a6e43b..dcf4e1e 100644 --- a/src/utils/settings.rs +++ b/src/utils/settings.rs @@ -21,6 +21,10 @@ pub struct ServerSettings { pub struct EndpointSettings { pub path: String, pub action: String, + #[serde(default)] + pub allow_parallel: bool, + #[serde(default)] + pub run_detached: bool, } impl Default for Settings {