Add scheduled thread pool

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/1/head
trivernis 4 years ago
parent 085ed6ee04
commit 49d5458538
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -13,4 +13,6 @@ crc = "1.8.1"
serde = "1.0.115" serde = "1.0.115"
byteorder = "1.3.4" byteorder = "1.3.4"
log = "0.4.11" log = "0.4.11"
crossbeam-utils = "0.7.2" crossbeam-utils = "0.7.2"
scheduled-thread-pool = "0.2.5"
num_cpus = "1.13.0"

@ -7,6 +7,7 @@ use std::sync::{Arc, Mutex};
use std::sync::mpsc::{Receiver, Sender, channel}; use std::sync::mpsc::{Receiver, Sender, channel};
use crossbeam_utils::sync::WaitGroup; use crossbeam_utils::sync::WaitGroup;
use std::mem; use std::mem;
use scheduled_thread_pool::ScheduledThreadPool;
const BUF_SIZE: usize = 512; const BUF_SIZE: usize = 512;
@ -46,11 +47,18 @@ impl RpcServer {
/// Starts the RPC server /// Starts the RPC server
pub fn start(&mut self) -> io::Result<()> { pub fn start(&mut self) -> io::Result<()> {
let listener = TcpListener::bind(&self.address)?; let listener = TcpListener::bind(&self.address)?;
let pool = ScheduledThreadPool::new(num_cpus::get());
for stream in listener.incoming() { for stream in listener.incoming() {
log::trace!("Connection received."); log::trace!("Connection received.");
match stream { match stream {
Ok(s) => if let Err(e) = self.handle_message(s) { Ok(s) => {
log::trace!("Error handling message {}", e.to_string()) let sender = Sender::clone(&self.sender);
log::trace!("Scheduling message to be handled by thread pool");
pool.execute(|| {
if let Err(e) = Self::handle_message(sender, s) {
log::trace!("Error handling message {}", e.to_string())
}
});
}, },
Err(e) => log::trace!("TCP Error {}", e.to_string()) Err(e) => log::trace!("TCP Error {}", e.to_string())
} }
@ -60,7 +68,7 @@ impl RpcServer {
} }
/// Handles a message /// Handles a message
fn handle_message(&mut self, mut incoming: TcpStream) -> io::Result<()> { fn handle_message(sender: Sender<Arc<Mutex<MessageHandler>>>, mut incoming: TcpStream) -> io::Result<()> {
let mut length_raw = [0u8; 4]; let mut length_raw = [0u8; 4];
incoming.read_exact(&mut length_raw)?; incoming.read_exact(&mut length_raw)?;
let length = BigEndian::read_u32(&length_raw); let length = BigEndian::read_u32(&length_raw);
@ -85,7 +93,7 @@ impl RpcServer {
wg: WaitGroup::clone(&wg), wg: WaitGroup::clone(&wg),
response: None, response: None,
})); }));
self.sender.send(Arc::clone(&handler)).unwrap(); sender.send(Arc::clone(&handler)).unwrap();
wg.wait(); wg.wait();
if let Some(response) = mem::replace(&mut handler.lock().unwrap().response, None) { if let Some(response) = mem::replace(&mut handler.lock().unwrap().response, None) {
incoming.write(&response.to_bytes())?; incoming.write(&response.to_bytes())?;

Loading…
Cancel
Save