From 49dd76c1422e7a999b7b676871270044e930599d Mon Sep 17 00:00:00 2001 From: trivernis Date: Wed, 9 Sep 2020 12:06:19 +0200 Subject: [PATCH] Add RpcServer implementation Signed-off-by: trivernis --- Cargo.toml | 4 ++- src/lib.rs | 1 + src/server.rs | 80 +++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 84 insertions(+), 1 deletion(-) create mode 100644 src/server.rs diff --git a/Cargo.toml b/Cargo.toml index 65e2b20..efd268b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,4 +11,6 @@ rmp = "0.8.9" rmp-serde = "0.14.4" crc = "1.8.1" serde = "1.0.115" -byteorder = "1.3.4" \ No newline at end of file +byteorder = "1.3.4" +log = "0.4.11" +crossbeam-utils = "0.7.2" \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index d92bda7..f78a899 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,5 @@ pub mod message; +pub mod server; #[cfg(test)] mod tests { diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..310166e --- /dev/null +++ b/src/server.rs @@ -0,0 +1,80 @@ +use std::net::{TcpListener, TcpStream}; +use std::io::{Read, ErrorKind, Write}; +use std::io; +use byteorder::{BigEndian, ByteOrder}; +use crate::message::Message; +use std::sync::{Arc, Mutex}; +use std::sync::mpsc::{Receiver, Sender}; +use crossbeam_utils::sync::WaitGroup; +use std::mem; + +const BUF_SIZE: usize = 512; + +#[derive(Clone, Debug)] +pub struct MessageHandler { + message: Message, + pub response: Option, + pub wg: WaitGroup, +} + +impl MessageHandler { + pub fn done(&mut self, response: Message) { + self.response = Some(response); + self.wg = WaitGroup::new(); + } +} + +#[derive( Debug)] +pub struct RpcServer { + address: String, + pub receiver: Arc>>>>, + sender: Sender>> +} + +impl RpcServer { + /// Starts the RPC server + pub fn start(&mut self) -> io::Result<()> { + let listener = TcpListener::bind(&self.address)?; + for stream in listener.incoming() { + match stream { + Ok(s) => if let Err(e) = self.handle_message(s) { + log::trace!("Error handling message {}", e.to_string()) + }, + Err(e) => log::trace!("TCP Error {}", e.to_string()) + } + } + + Ok(()) + } + + /// Handles a message + fn handle_message(&mut self, mut incoming: TcpStream) -> io::Result<()> { + let mut length_raw = [0u8; 4]; + incoming.read_exact(&mut length_raw)?; + let length = BigEndian::read_u32(&length_raw); + let mut data = Vec::new(); + data.append(&mut length_raw.to_vec()); + + for _ in 0..(length as f32 / BUF_SIZE as f32).ceil() as usize { + let mut buf = [0u8; BUF_SIZE]; + incoming.read(&mut buf)?; + data.append(&mut buf.to_vec()) + } + + let message = Message::from_bytes(&data).map_err(|_|io::Error::from(ErrorKind::InvalidData))?; + let wg = WaitGroup::new(); + + let handler = Arc::new(Mutex::new(MessageHandler { + message, + wg: WaitGroup::clone(&wg), + response: None, + })); + self.sender.send(Arc::clone(&handler)).unwrap(); + wg.wait(); + if let Some(response) = mem::replace(&mut handler.lock().unwrap().response, None) { + incoming.write(&response.to_bytes())?; + } + + Ok(()) + } +} \ No newline at end of file