Add lock for parallel runs and detached mode

Signed-off-by: trivernis <trivernis@protonmail.com>
main
trivernis 3 years ago
parent 40496b1686
commit 1d1e9bcd87
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -27,7 +27,7 @@ features = ["derive"]
[dependencies.tokio] [dependencies.tokio]
version = "1.9.0" version = "1.9.0"
features = ["macros", "process"] features = ["macros", "process", "sync"]
[features] [features]
default = ["tokio/rt-multi-thread"] default = ["tokio/rt-multi-thread"]

@ -1,6 +1,3 @@
#[cfg(all(feature = "singlethreaded", feature = "multithreaded"))]
compile_error!("Can't have singlethreaded and mutithreaded feature enabled at the same time.");
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use utils::logging::init_logger; use utils::logging::init_logger;
@ -36,7 +33,7 @@ async fn init_and_start() {
for (name, endpoint) in &settings.endpoints { for (name, endpoint) in &settings.endpoints {
log::info!("Adding endpoint '{}' with path '{}'", name, &endpoint.path); log::info!("Adding endpoint '{}' with path '{}'", name, &endpoint.path);
server.add_hook(endpoint.path.clone(), endpoint.action.clone().into()) server.add_hook(endpoint.path.clone(), endpoint.into())
} }
server.start(&settings.server.address).await server.start(&settings.server.address).await

@ -1,31 +1,71 @@
use crate::server::command_template::CommandTemplate; use crate::server::command_template::CommandTemplate;
use crate::utils::error::MultihookResult; use crate::utils::error::MultihookResult;
use crate::utils::settings::EndpointSettings;
use serde_json::Value; use serde_json::Value;
use std::fs::read_to_string; use std::fs::read_to_string;
use std::mem;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc;
use tokio::process::Command; use tokio::process::Command;
use tokio::sync::Semaphore;
static MAX_CONCURRENCY: usize = 256;
#[derive(Clone)]
pub struct HookAction { pub struct HookAction {
command: CommandTemplate, command: CommandTemplate,
parallel_lock: Arc<Semaphore>,
run_detached: bool,
} }
impl HookAction { impl HookAction {
pub fn new<S: ToString>(command: S) -> Self { pub fn new<S: ToString>(command: S, parallel: bool, detached: bool) -> Self {
let parallel_lock = if parallel {
Semaphore::new(MAX_CONCURRENCY)
} else {
Semaphore::new(1)
};
Self { Self {
command: CommandTemplate::new(command), command: CommandTemplate::new(command),
parallel_lock: Arc::new(parallel_lock),
run_detached: detached,
} }
} }
pub async fn execute(&self, body: &str) -> MultihookResult<()> { pub async fn execute(&self, body: &str) -> MultihookResult<()> {
if self.run_detached {
tokio::spawn({
let action = self.clone();
let body = body.to_owned();
async move {
if let Err(e) = action.execute_command(&body).await {
log::error!("Detached hook threw an error: {:?}", e);
}
}
});
Ok(())
} else {
self.execute_command(body).await
}
}
async fn execute_command(&self, body: &str) -> MultihookResult<()> {
let json_body: Value = serde_json::from_str(body).unwrap_or_default(); let json_body: Value = serde_json::from_str(body).unwrap_or_default();
let command = self.command.evaluate(&json_body); let command = self.command.evaluate(&json_body);
log::debug!("Acquiring lock for parallel runs...");
let permit = self.parallel_lock.acquire().await.unwrap();
log::debug!("Lock acquired. Running command...");
let output = Command::new("sh") let output = Command::new("sh")
.env("HOOK_BODY", body) .env("HOOK_BODY", body)
.arg("-c") .arg("-c")
.arg(command) .arg(command)
.kill_on_drop(true)
.output() .output()
.await?; .await?;
log::debug!("Command finished. Releasing parallel lock...");
mem::drop(permit);
let stderr = String::from_utf8_lossy(&output.stderr[..]); let stderr = String::from_utf8_lossy(&output.stderr[..]);
let stdout = String::from_utf8_lossy(&output.stdout[..]); let stdout = String::from_utf8_lossy(&output.stdout[..]);
log::debug!("Command output is: {}", stdout); log::debug!("Command output is: {}", stdout);
@ -38,10 +78,11 @@ impl HookAction {
} }
} }
impl From<String> for HookAction { impl From<&EndpointSettings> for HookAction {
fn from(action: String) -> Self { fn from(endpoint: &EndpointSettings) -> Self {
let action = endpoint.action.clone();
let path = PathBuf::from(&action); let path = PathBuf::from(&action);
let contents = read_to_string(path).unwrap_or(action); let contents = read_to_string(path).unwrap_or(action);
Self::new(contents) Self::new(contents, endpoint.allow_parallel, endpoint.run_detached)
} }
} }

@ -3,6 +3,7 @@ use lazy_static::lazy_static;
use regex::{Match, Regex}; use regex::{Match, Regex};
use serde_json::Value; use serde_json::Value;
#[derive(Clone)]
pub struct CommandTemplate { pub struct CommandTemplate {
src: String, src: String,
matches: Vec<(usize, usize)>, matches: Vec<(usize, usize)>,

@ -10,7 +10,7 @@ use action::HookAction;
use crate::utils::error::MultihookError; use crate::utils::error::MultihookError;
mod action; pub mod action;
pub mod command_template; pub mod command_template;
pub struct HookServer { pub struct HookServer {

@ -21,6 +21,10 @@ pub struct ServerSettings {
pub struct EndpointSettings { pub struct EndpointSettings {
pub path: String, pub path: String,
pub action: String, pub action: String,
#[serde(default)]
pub allow_parallel: bool,
#[serde(default)]
pub run_detached: bool,
} }
impl Default for Settings { impl Default for Settings {

Loading…
Cancel
Save