diff --git a/Cargo.lock b/Cargo.lock index c2e7fe2..016a1ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -381,6 +381,7 @@ dependencies = [ "log 0.4.11", "mime 0.3.16", "msgrpc", + "num_cpus", "postgres", "r2d2", "r2d2_postgres", @@ -388,6 +389,7 @@ dependencies = [ "rmp", "rmp-serde", "rouille", + "scheduled-thread-pool", "serde", "serde_json", "serde_postgres", @@ -751,8 +753,10 @@ dependencies = [ "crc", "crossbeam-utils", "log 0.4.11", + "num_cpus", "rmp", "rmp-serde", + "scheduled-thread-pool", "serde", ] diff --git a/Cargo.toml b/Cargo.toml index f650c82..8579f69 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,4 +28,6 @@ rouille = "3.0.0" base64 = "0.12.3" chrono = "0.4.15" r2d2 = "0.8.9" -r2d2_postgres = "0.16.0" \ No newline at end of file +r2d2_postgres = "0.16.0" +scheduled-thread-pool = "0.2.5" +num_cpus = "1.13.0" \ No newline at end of file diff --git a/msg-rpc b/msg-rpc index 085ed6e..49d5458 160000 --- a/msg-rpc +++ b/msg-rpc @@ -1 +1 @@ -Subproject commit 085ed6ee0454ae9b4a10e579cbae6aa4e5904f25 +Subproject commit 49d545853871d48c88be1e1e4929a58c43b2a953 diff --git a/src/database/permissions.rs b/src/database/permissions.rs index 4ef98b9..3633fd3 100644 --- a/src/database/permissions.rs +++ b/src/database/permissions.rs @@ -34,6 +34,7 @@ impl Permissions { let mut connection = self.pool.get()?; let mut transaction = connection.transaction()?; let mut created_permissions = Vec::new(); + let _: Vec> = permissions .iter() .map(|CreatePermissionsEntry { name, description }| { diff --git a/src/server/user_rpc.rs b/src/server/user_rpc.rs index 2ff41fc..600170f 100644 --- a/src/server/user_rpc.rs +++ b/src/server/user_rpc.rs @@ -8,6 +8,7 @@ use crate::utils::get_user_id_from_token; use msgrpc::message::Message; use msgrpc::server::RpcServer; use rmp_serde::Deserializer; +use scheduled_thread_pool::ScheduledThreadPool; use serde::Deserialize; use std::collections::HashMap; use std::sync::Arc; @@ -41,31 +42,39 @@ impl UserRpcServer { server.start().unwrap(); }) .unwrap(); + let pool = ScheduledThreadPool::new(num_cpus::get()); log::info!("RPC-Server running on {}", listen_address); while let Ok(h) = receiver.lock().unwrap().recv() { - let mut handler = h.lock().unwrap(); - log::debug!("Received message {:?}", handler.message); - let response = match handler.message.method { - INFO => self.handle_info(), - GET_ROLES => self.handle_get_roles(&handler.message.data), - VALIDATE_TOKEN => self.handle_validate_token(&handler.message.data), - GET_ROLE_PERMISSIONS => self.handle_get_permissions(&handler.message.data), - CREATE_ROLE => self.handle_create_role(&handler.message.data), - CREATE_PERMISSION => self.handle_create_permissions(&handler.message.data), - _ => Err(ErrorMessage::new("Invalid Method".to_string())), - } - .unwrap_or_else(|e| Message::new_with_serialize(ERROR, e)); - log::debug!("Responding with message {:?}", &response); - handler.done(response); + let database = Database::clone(&self.database); + log::trace!("Scheduling message for execution in pool"); + pool.execute(move || { + let mut handler = h.lock().unwrap(); + log::debug!("Received message {:?}", handler.message); + let response = match handler.message.method { + INFO => Self::handle_info(), + GET_ROLES => Self::handle_get_roles(database, &handler.message.data), + VALIDATE_TOKEN => Self::handle_validate_token(database, &handler.message.data), + GET_ROLE_PERMISSIONS => { + Self::handle_get_permissions(database, &handler.message.data) + } + CREATE_ROLE => Self::handle_create_role(database, &handler.message.data), + CREATE_PERMISSION => { + Self::handle_create_permissions(database, &handler.message.data) + } + _ => Err(ErrorMessage::new("Invalid Method".to_string())), + } + .unwrap_or_else(|e| Message::new_with_serialize(ERROR, e)); + log::debug!("Responding with message {:?}", &response); + handler.done(response); + }); } } - fn handle_validate_token(&self, data: &Vec) -> RpcResult { + fn handle_validate_token(database: Database, data: &Vec) -> RpcResult { log::trace!("Validating token."); let message = TokenRequest::deserialize(&mut Deserializer::new(&mut data.as_slice())) .map_err(|e| ErrorMessage::new(e.to_string()))?; - let valid = self - .database + let valid = database .users .validate_request_token(&message.token) .unwrap_or((false, -1)); @@ -75,7 +84,7 @@ impl UserRpcServer { Ok(Message::new(VALIDATE_TOKEN, data)) } - fn handle_info(&self) -> RpcResult { + fn handle_info() -> RpcResult { log::trace!("Get Info"); Ok(Message::new_with_serialize( INFO, @@ -115,14 +124,14 @@ impl UserRpcServer { )) } - fn handle_get_permissions(&self, data: &Vec) -> RpcResult { + fn handle_get_permissions(database: Database, data: &Vec) -> RpcResult { log::trace!("Get Permissions"); let message = GetPermissionsRequest::deserialize(&mut Deserializer::new(&mut data.as_slice())) .map_err(|e| ErrorMessage::new(e.to_string()))?; let mut response_data = HashMap::new(); for role_id in message.roles { - let permissions = self.database.role_permission.by_role(role_id)?; + let permissions = database.role_permission.by_role(role_id)?; response_data.insert(role_id.to_string(), permissions); } @@ -132,12 +141,11 @@ impl UserRpcServer { )) } - fn handle_get_roles(&self, data: &Vec) -> RpcResult { + fn handle_get_roles(database: Database, data: &Vec) -> RpcResult { log::trace!("Get Roles"); let message = TokenRequest::deserialize(&mut Deserializer::new(&mut data.as_slice())) .map_err(|e| ErrorMessage::new(e.to_string()))?; - if !self - .database + if !database .users .validate_request_token(&message.token) .unwrap_or((false, -1)) @@ -146,31 +154,29 @@ impl UserRpcServer { return Err(ErrorMessage::new("Invalid request token".to_string())); } let user_id = get_user_id_from_token(&message.token); - let response_data = self.database.user_roles.by_user(user_id)?; + let response_data = database.user_roles.by_user(user_id)?; Ok(Message::new_with_serialize(GET_ROLES, response_data)) } - fn handle_create_role(&self, data: &Vec) -> RpcResult { + fn handle_create_role(database: Database, data: &Vec) -> RpcResult { log::trace!("Create Role"); let message = CreateRoleRequest::deserialize(&mut Deserializer::new(&mut data.as_slice())) .map_err(|e| ErrorMessage::new(e.to_string()))?; - let role = self.database.roles.create_role( - message.name, - message.description, - message.permissions, - )?; + let role = + database + .roles + .create_role(message.name, message.description, message.permissions)?; Ok(Message::new_with_serialize(CREATE_ROLE, role)) } - fn handle_create_permissions(&self, data: &Vec) -> RpcResult { + fn handle_create_permissions(database: Database, data: &Vec) -> RpcResult { log::trace!("Create Permission"); let message = CreatePermissionsRequest::deserialize(&mut Deserializer::new(&mut data.as_slice())) .map_err(|e| ErrorMessage::new(e.to_string()))?; - let permissions = self - .database + let permissions = database .permissions .create_permissions(message.permissions)?;