Change tcp message handling to be multithreaded

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/1/head
trivernis 4 years ago
parent 94ae69ec60
commit b73f41f67d
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

4
Cargo.lock generated

@ -381,6 +381,7 @@ dependencies = [
"log 0.4.11", "log 0.4.11",
"mime 0.3.16", "mime 0.3.16",
"msgrpc", "msgrpc",
"num_cpus",
"postgres", "postgres",
"r2d2", "r2d2",
"r2d2_postgres", "r2d2_postgres",
@ -388,6 +389,7 @@ dependencies = [
"rmp", "rmp",
"rmp-serde", "rmp-serde",
"rouille", "rouille",
"scheduled-thread-pool",
"serde", "serde",
"serde_json", "serde_json",
"serde_postgres", "serde_postgres",
@ -751,8 +753,10 @@ dependencies = [
"crc", "crc",
"crossbeam-utils", "crossbeam-utils",
"log 0.4.11", "log 0.4.11",
"num_cpus",
"rmp", "rmp",
"rmp-serde", "rmp-serde",
"scheduled-thread-pool",
"serde", "serde",
] ]

@ -28,4 +28,6 @@ rouille = "3.0.0"
base64 = "0.12.3" base64 = "0.12.3"
chrono = "0.4.15" chrono = "0.4.15"
r2d2 = "0.8.9" r2d2 = "0.8.9"
r2d2_postgres = "0.16.0" r2d2_postgres = "0.16.0"
scheduled-thread-pool = "0.2.5"
num_cpus = "1.13.0"

@ -1 +1 @@
Subproject commit 085ed6ee0454ae9b4a10e579cbae6aa4e5904f25 Subproject commit 49d545853871d48c88be1e1e4929a58c43b2a953

@ -34,6 +34,7 @@ impl Permissions {
let mut connection = self.pool.get()?; let mut connection = self.pool.get()?;
let mut transaction = connection.transaction()?; let mut transaction = connection.transaction()?;
let mut created_permissions = Vec::new(); let mut created_permissions = Vec::new();
let _: Vec<DatabaseResult<()>> = permissions let _: Vec<DatabaseResult<()>> = permissions
.iter() .iter()
.map(|CreatePermissionsEntry { name, description }| { .map(|CreatePermissionsEntry { name, description }| {

@ -8,6 +8,7 @@ use crate::utils::get_user_id_from_token;
use msgrpc::message::Message; use msgrpc::message::Message;
use msgrpc::server::RpcServer; use msgrpc::server::RpcServer;
use rmp_serde::Deserializer; use rmp_serde::Deserializer;
use scheduled_thread_pool::ScheduledThreadPool;
use serde::Deserialize; use serde::Deserialize;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
@ -41,31 +42,39 @@ impl UserRpcServer {
server.start().unwrap(); server.start().unwrap();
}) })
.unwrap(); .unwrap();
let pool = ScheduledThreadPool::new(num_cpus::get());
log::info!("RPC-Server running on {}", listen_address); log::info!("RPC-Server running on {}", listen_address);
while let Ok(h) = receiver.lock().unwrap().recv() { while let Ok(h) = receiver.lock().unwrap().recv() {
let mut handler = h.lock().unwrap(); let database = Database::clone(&self.database);
log::debug!("Received message {:?}", handler.message); log::trace!("Scheduling message for execution in pool");
let response = match handler.message.method { pool.execute(move || {
INFO => self.handle_info(), let mut handler = h.lock().unwrap();
GET_ROLES => self.handle_get_roles(&handler.message.data), log::debug!("Received message {:?}", handler.message);
VALIDATE_TOKEN => self.handle_validate_token(&handler.message.data), let response = match handler.message.method {
GET_ROLE_PERMISSIONS => self.handle_get_permissions(&handler.message.data), INFO => Self::handle_info(),
CREATE_ROLE => self.handle_create_role(&handler.message.data), GET_ROLES => Self::handle_get_roles(database, &handler.message.data),
CREATE_PERMISSION => self.handle_create_permissions(&handler.message.data), VALIDATE_TOKEN => Self::handle_validate_token(database, &handler.message.data),
_ => Err(ErrorMessage::new("Invalid Method".to_string())), GET_ROLE_PERMISSIONS => {
} Self::handle_get_permissions(database, &handler.message.data)
.unwrap_or_else(|e| Message::new_with_serialize(ERROR, e)); }
log::debug!("Responding with message {:?}", &response); CREATE_ROLE => Self::handle_create_role(database, &handler.message.data),
handler.done(response); CREATE_PERMISSION => {
Self::handle_create_permissions(database, &handler.message.data)
}
_ => Err(ErrorMessage::new("Invalid Method".to_string())),
}
.unwrap_or_else(|e| Message::new_with_serialize(ERROR, e));
log::debug!("Responding with message {:?}", &response);
handler.done(response);
});
} }
} }
fn handle_validate_token(&self, data: &Vec<u8>) -> RpcResult<Message> { fn handle_validate_token(database: Database, data: &Vec<u8>) -> RpcResult<Message> {
log::trace!("Validating token."); log::trace!("Validating token.");
let message = TokenRequest::deserialize(&mut Deserializer::new(&mut data.as_slice())) let message = TokenRequest::deserialize(&mut Deserializer::new(&mut data.as_slice()))
.map_err(|e| ErrorMessage::new(e.to_string()))?; .map_err(|e| ErrorMessage::new(e.to_string()))?;
let valid = self let valid = database
.database
.users .users
.validate_request_token(&message.token) .validate_request_token(&message.token)
.unwrap_or((false, -1)); .unwrap_or((false, -1));
@ -75,7 +84,7 @@ impl UserRpcServer {
Ok(Message::new(VALIDATE_TOKEN, data)) Ok(Message::new(VALIDATE_TOKEN, data))
} }
fn handle_info(&self) -> RpcResult<Message> { fn handle_info() -> RpcResult<Message> {
log::trace!("Get Info"); log::trace!("Get Info");
Ok(Message::new_with_serialize( Ok(Message::new_with_serialize(
INFO, INFO,
@ -115,14 +124,14 @@ impl UserRpcServer {
)) ))
} }
fn handle_get_permissions(&self, data: &Vec<u8>) -> RpcResult<Message> { fn handle_get_permissions(database: Database, data: &Vec<u8>) -> RpcResult<Message> {
log::trace!("Get Permissions"); log::trace!("Get Permissions");
let message = let message =
GetPermissionsRequest::deserialize(&mut Deserializer::new(&mut data.as_slice())) GetPermissionsRequest::deserialize(&mut Deserializer::new(&mut data.as_slice()))
.map_err(|e| ErrorMessage::new(e.to_string()))?; .map_err(|e| ErrorMessage::new(e.to_string()))?;
let mut response_data = HashMap::new(); let mut response_data = HashMap::new();
for role_id in message.roles { for role_id in message.roles {
let permissions = self.database.role_permission.by_role(role_id)?; let permissions = database.role_permission.by_role(role_id)?;
response_data.insert(role_id.to_string(), permissions); response_data.insert(role_id.to_string(), permissions);
} }
@ -132,12 +141,11 @@ impl UserRpcServer {
)) ))
} }
fn handle_get_roles(&self, data: &Vec<u8>) -> RpcResult<Message> { fn handle_get_roles(database: Database, data: &Vec<u8>) -> RpcResult<Message> {
log::trace!("Get Roles"); log::trace!("Get Roles");
let message = TokenRequest::deserialize(&mut Deserializer::new(&mut data.as_slice())) let message = TokenRequest::deserialize(&mut Deserializer::new(&mut data.as_slice()))
.map_err(|e| ErrorMessage::new(e.to_string()))?; .map_err(|e| ErrorMessage::new(e.to_string()))?;
if !self if !database
.database
.users .users
.validate_request_token(&message.token) .validate_request_token(&message.token)
.unwrap_or((false, -1)) .unwrap_or((false, -1))
@ -146,31 +154,29 @@ impl UserRpcServer {
return Err(ErrorMessage::new("Invalid request token".to_string())); return Err(ErrorMessage::new("Invalid request token".to_string()));
} }
let user_id = get_user_id_from_token(&message.token); let user_id = get_user_id_from_token(&message.token);
let response_data = self.database.user_roles.by_user(user_id)?; let response_data = database.user_roles.by_user(user_id)?;
Ok(Message::new_with_serialize(GET_ROLES, response_data)) Ok(Message::new_with_serialize(GET_ROLES, response_data))
} }
fn handle_create_role(&self, data: &Vec<u8>) -> RpcResult<Message> { fn handle_create_role(database: Database, data: &Vec<u8>) -> RpcResult<Message> {
log::trace!("Create Role"); log::trace!("Create Role");
let message = CreateRoleRequest::deserialize(&mut Deserializer::new(&mut data.as_slice())) let message = CreateRoleRequest::deserialize(&mut Deserializer::new(&mut data.as_slice()))
.map_err(|e| ErrorMessage::new(e.to_string()))?; .map_err(|e| ErrorMessage::new(e.to_string()))?;
let role = self.database.roles.create_role( let role =
message.name, database
message.description, .roles
message.permissions, .create_role(message.name, message.description, message.permissions)?;
)?;
Ok(Message::new_with_serialize(CREATE_ROLE, role)) Ok(Message::new_with_serialize(CREATE_ROLE, role))
} }
fn handle_create_permissions(&self, data: &Vec<u8>) -> RpcResult<Message> { fn handle_create_permissions(database: Database, data: &Vec<u8>) -> RpcResult<Message> {
log::trace!("Create Permission"); log::trace!("Create Permission");
let message = let message =
CreatePermissionsRequest::deserialize(&mut Deserializer::new(&mut data.as_slice())) CreatePermissionsRequest::deserialize(&mut Deserializer::new(&mut data.as_slice()))
.map_err(|e| ErrorMessage::new(e.to_string()))?; .map_err(|e| ErrorMessage::new(e.to_string()))?;
let permissions = self let permissions = database
.database
.permissions .permissions
.create_permissions(message.permissions)?; .create_permissions(message.permissions)?;

Loading…
Cancel
Save