From 49d545853871d48c88be1e1e4929a58c43b2a953 Mon Sep 17 00:00:00 2001 From: trivernis Date: Sat, 12 Sep 2020 14:31:22 +0200 Subject: [PATCH] Add scheduled thread pool Signed-off-by: trivernis --- Cargo.toml | 4 +++- src/server.rs | 16 ++++++++++++---- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index efd268b..5a19579 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,4 +13,6 @@ crc = "1.8.1" serde = "1.0.115" byteorder = "1.3.4" log = "0.4.11" -crossbeam-utils = "0.7.2" \ No newline at end of file +crossbeam-utils = "0.7.2" +scheduled-thread-pool = "0.2.5" +num_cpus = "1.13.0" \ No newline at end of file diff --git a/src/server.rs b/src/server.rs index 2cc4424..c3644eb 100644 --- a/src/server.rs +++ b/src/server.rs @@ -7,6 +7,7 @@ use std::sync::{Arc, Mutex}; use std::sync::mpsc::{Receiver, Sender, channel}; use crossbeam_utils::sync::WaitGroup; use std::mem; +use scheduled_thread_pool::ScheduledThreadPool; const BUF_SIZE: usize = 512; @@ -46,11 +47,18 @@ impl RpcServer { /// Starts the RPC server pub fn start(&mut self) -> io::Result<()> { let listener = TcpListener::bind(&self.address)?; + let pool = ScheduledThreadPool::new(num_cpus::get()); for stream in listener.incoming() { log::trace!("Connection received."); match stream { - Ok(s) => if let Err(e) = self.handle_message(s) { - log::trace!("Error handling message {}", e.to_string()) + Ok(s) => { + let sender = Sender::clone(&self.sender); + log::trace!("Scheduling message to be handled by thread pool"); + pool.execute(|| { + if let Err(e) = Self::handle_message(sender, s) { + log::trace!("Error handling message {}", e.to_string()) + } + }); }, Err(e) => log::trace!("TCP Error {}", e.to_string()) } @@ -60,7 +68,7 @@ impl RpcServer { } /// Handles a message - fn handle_message(&mut self, mut incoming: TcpStream) -> io::Result<()> { + fn handle_message(sender: Sender>>, mut incoming: TcpStream) -> io::Result<()> { let mut length_raw = [0u8; 4]; incoming.read_exact(&mut length_raw)?; let length = BigEndian::read_u32(&length_raw); @@ -85,7 +93,7 @@ impl RpcServer { wg: WaitGroup::clone(&wg), response: None, })); - self.sender.send(Arc::clone(&handler)).unwrap(); + sender.send(Arc::clone(&handler)).unwrap(); wg.wait(); if let Some(response) = mem::replace(&mut handler.lock().unwrap().response, None) { incoming.write(&response.to_bytes())?;