From d3e4b3de4d5215fbae9cb990eaab904e05d847f1 Mon Sep 17 00:00:00 2001 From: trivernis Date: Thu, 10 Sep 2020 18:54:30 +0200 Subject: [PATCH] Add rpc server to startup Signed-off-by: trivernis --- Cargo.lock | 110 ++++++++++++++++++++++++++++++++++++++ Cargo.toml | 8 ++- msg-rpc | 2 +- src/main.rs | 47 ++++++++++++++++ src/server/messages.rs | 47 ++++++++++++++++ src/server/mod.rs | 3 +- src/server/rpc_methods.rs | 8 +-- src/server/user_rpc.rs | 81 +++++++++++++++++++++++++++- 8 files changed, 299 insertions(+), 7 deletions(-) create mode 100644 src/server/messages.rs diff --git a/Cargo.lock b/Cargo.lock index 55acba0..bbef3f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,14 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +[[package]] +name = "aho-corasick" +version = "0.7.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "043164d8ba5c4c3035fec9bbee8647c0261d788f3474306f93bb65901cae0e86" +dependencies = [ + "memchr", +] + [[package]] name = "async-channel" version = "1.4.2" @@ -103,6 +112,17 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a" +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi 0.3.9", +] + [[package]] name = "autocfg" version = "1.0.1" @@ -194,6 +214,17 @@ dependencies = [ "bitflags", ] +[[package]] +name = "colored" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3616f750b84d8f0de8a58bda93e08e2a81ad3f523089b05f1dffecab48c6cbd" +dependencies = [ + "atty", + "lazy_static", + "winapi 0.3.9", +] + [[package]] name = "combine" version = "4.3.2" @@ -283,6 +314,19 @@ version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "134951f4028bdadb9b84baf4232681efbf277da25144b9b0ad65df75946c422b" +[[package]] +name = "env_logger" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36" +dependencies = [ + "atty", + "humantime", + "log", + "regex", + "termcolor", +] + [[package]] name = "event-listener" version = "2.4.0" @@ -306,11 +350,17 @@ name = "flotte-user-management" version = "0.1.0" dependencies = [ "byteorder", + "colored", + "crossbeam-utils", "dotenv", + "env_logger", + "log", "msgrpc", "postgres", "rand", "redis", + "rmp", + "rmp-serde", "scrypt", "serde", "serde_postgres", @@ -499,6 +549,15 @@ dependencies = [ "digest", ] +[[package]] +name = "humantime" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f" +dependencies = [ + "quick-error", +] + [[package]] name = "idna" version = "0.2.0" @@ -901,6 +960,12 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "quick-error" +version = "1.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" + [[package]] name = "quote" version = "1.0.7" @@ -978,6 +1043,24 @@ version = "0.1.57" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" +[[package]] +name = "regex" +version = "1.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c3780fcf44b193bc4d09f36d2a3c87b251da4a046c87795a0d35f4f927ad8e6" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", + "thread_local", +] + +[[package]] +name = "regex-syntax" +version = "0.6.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26412eb97c6b088a6997e05f69403a802a92d520de2f8e63c2b65f9e0f47c4e8" + [[package]] name = "rmp" version = "0.8.9" @@ -1144,6 +1227,24 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "termcolor" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb6bfa289a4d7c5766392812c0a1f4c1ba45afa1ad47803c11e1f407d846d75f" +dependencies = [ + "winapi-util", +] + +[[package]] +name = "thread_local" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14" +dependencies = [ + "lazy_static", +] + [[package]] name = "tinyvec" version = "0.3.4" @@ -1383,6 +1484,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi 0.3.9", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index a20e745..05c1fdb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,4 +16,10 @@ serde = {version = "1.0.115", features = ["serde_derive"]} rand = "0.7.3" scrypt = "0.4.1" zeroize = {version = "1.1.0", features = ["zeroize_derive"]} -byteorder = "1.3.4" \ No newline at end of file +byteorder = "1.3.4" +rmp-serde = "0.14.4" +rmp = "0.8.9" +log = "0.4.11" +env_logger = "0.7.1" +colored = "2.0.0" +crossbeam-utils = "0.7.2" \ No newline at end of file diff --git a/msg-rpc b/msg-rpc index f25e96c..085ed6e 160000 --- a/msg-rpc +++ b/msg-rpc @@ -1 +1 @@ -Subproject commit f25e96c2ae3fd5b177ec3eec176ccf0ae381984c +Subproject commit 085ed6ee0454ae9b4a10e579cbae6aa4e5904f25 diff --git a/src/main.rs b/src/main.rs index bea523c..2d46e51 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,53 @@ +use colored::Colorize; +use crossbeam_utils::sync::WaitGroup; +use env_logger::Env; use flotte_user_management::database::Database; +use flotte_user_management::server::user_rpc::UserRpcServer; +use log::Level; +use std::thread; fn main() { + init_logger(); let database = Database::new().unwrap(); database.init().unwrap(); + let rpc_server = UserRpcServer::new(&database); + let wg = WaitGroup::new(); + { + let wg = WaitGroup::clone(&wg); + thread::spawn(move || { + rpc_server.start(); + std::mem::drop(wg); + }); + } + wg.wait(); +} + +fn init_logger() { + env_logger::Builder::from_env(Env::default().default_filter_or("info")) + .format(|buf, record| { + use std::io::Write; + let color = get_level_style(record.level()); + writeln!( + buf, + "{}: {}", + record + .level() + .to_string() + .to_lowercase() + .as_str() + .color(color), + record.args() + ) + }) + .init(); +} + +fn get_level_style(level: Level) -> colored::Color { + match level { + Level::Trace => colored::Color::Magenta, + Level::Debug => colored::Color::Blue, + Level::Info => colored::Color::Green, + Level::Warn => colored::Color::Yellow, + Level::Error => colored::Color::Red, + } } diff --git a/src/server/messages.rs b/src/server/messages.rs new file mode 100644 index 0000000..9c7bb09 --- /dev/null +++ b/src/server/messages.rs @@ -0,0 +1,47 @@ +use serde::export::Formatter; +use serde::{Deserialize, Serialize}; +use std::error::Error; +use std::fmt; +use std::fmt::Display; + +#[derive(Deserialize)] +pub struct ValidateTokenRequest { + pub token: [u8; 32], +} + +#[derive(Debug, Serialize)] +pub struct ErrorMessage { + message: String, +} + +impl ErrorMessage { + pub fn new(message: String) -> Self { + Self { message } + } +} + +impl Display for ErrorMessage { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.message) + } +} +impl Error for ErrorMessage {} + +#[derive(Serialize)] +pub struct InfoEntry { + name: String, + method: [u8; 4], + description: String, + data: String, +} + +impl InfoEntry { + pub fn new(name: &str, method: [u8; 4], description: &str, data: &str) -> Self { + Self { + method, + name: name.to_string(), + description: description.to_string(), + data: data.to_string(), + } + } +} diff --git a/src/server/mod.rs b/src/server/mod.rs index 790cf2f..ec75acc 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,2 +1,3 @@ +pub mod messages; +pub mod rpc_methods; pub mod user_rpc; -pub mod rpc_methods; \ No newline at end of file diff --git a/src/server/rpc_methods.rs b/src/server/rpc_methods.rs index 29a7e09..eccd169 100644 --- a/src/server/rpc_methods.rs +++ b/src/server/rpc_methods.rs @@ -1,4 +1,6 @@ #![allow(dead_code)] -const NULL: [u8; 4] = [0x00, 0x00, 0x00, 0x00]; -const VALIDATE_TOKEN: [u8; 4] = [0x56, 0x41, 0x4c, 0x49]; -const GET_ROLES: [u8; 4] = [0x52, 0x4f, 0x4c, 0x45]; +pub(crate) const NULL: [u8; 4] = [0x00, 0x00, 0x00, 0x00]; +pub(crate) const ERROR: [u8; 4] = [0x0F, 0x0F, 0x0F, 0x0F]; +pub(crate) const INFO: [u8; 4] = [0x49, 0x4e, 0x46, 0x4f]; +pub(crate) const VALIDATE_TOKEN: [u8; 4] = [0x56, 0x41, 0x4c, 0x49]; +pub(crate) const GET_ROLES: [u8; 4] = [0x52, 0x4f, 0x4c, 0x45]; diff --git a/src/server/user_rpc.rs b/src/server/user_rpc.rs index 9e22f68..322b971 100644 --- a/src/server/user_rpc.rs +++ b/src/server/user_rpc.rs @@ -1,6 +1,85 @@ +use super::rpc_methods::*; +use crate::database::Database; +use crate::server::messages::{ErrorMessage, InfoEntry, ValidateTokenRequest}; +use msgrpc::message::Message; +use msgrpc::server::RpcServer; +use rmp_serde::Deserializer; +use serde::Deserialize; +use std::sync::Arc; +use std::thread; + +const RPC_SERVER_ADDRESS: &str = "RPC_SERVER_ADDRESS"; +const DEFAULT_SERVER_ADDRESS: &str = "127.0.0.1:5555"; + pub struct UserRpcServer { + database: Database, } +type RpcResult = Result; + impl UserRpcServer { + pub fn new(database: &Database) -> Self { + Self { + database: Database::clone(database), + } + } + + pub fn start(&self) { + let mut server = RpcServer::new( + dotenv::var(RPC_SERVER_ADDRESS).unwrap_or(DEFAULT_SERVER_ADDRESS.to_string()), + ); + let receiver = Arc::clone(&server.receiver); + thread::spawn(move || { + server.start().unwrap(); + }); + while let Ok(h) = receiver.lock().unwrap().recv() { + let mut handler = h.lock().unwrap(); + let response = match handler.message.method { + INFO => self.handle_info(), + GET_ROLES => unimplemented!(), + VALIDATE_TOKEN => self.handle_validate_token(&handler.message.data), + _ => Err(ErrorMessage::new("Invalid Method".to_string())), + } + .unwrap_or_else(|e| Message::new_with_serialize(ERROR, e)); + log::trace!("Responding with {:?}", &response); + handler.done(response); + } + } -} \ No newline at end of file + fn handle_validate_token(&self, data: &Vec) -> RpcResult { + log::trace!("Validating token."); + let message = + ValidateTokenRequest::deserialize(&mut Deserializer::new(&mut data.as_slice())) + .map_err(|e| ErrorMessage::new(e.to_string()))?; + let valid = self + .database + .users + .validate_request_token(&message.token) + .unwrap_or(false); + log::trace!("Serializing..."); + let data = rmp_serde::to_vec(&valid).map_err(|e| ErrorMessage::new(e.to_string()))?; + + Ok(Message::new(VALIDATE_TOKEN, data)) + } + + fn handle_info(&self) -> RpcResult { + Ok(Message::new_with_serialize( + INFO, + vec![ + InfoEntry::new("info", INFO, "Shows this entry", ""), + InfoEntry::new( + "validate token", + VALIDATE_TOKEN, + "Validates a request token", + "{token: [u8; 32]}", + ), + InfoEntry::new( + "get roles", + GET_ROLES, + "Returns the roles the user is assigned to", + "{token: [u8; 32]}", + ), + ], + )) + } +}