|
|
@ -7,18 +7,21 @@ use scheduled_thread_pool::ScheduledThreadPool;
|
|
|
|
use crate::crypto::CryptoStream;
|
|
|
|
use crate::crypto::CryptoStream;
|
|
|
|
use crate::event::Event;
|
|
|
|
use crate::event::Event;
|
|
|
|
use crate::event_handler::EventHandler;
|
|
|
|
use crate::event_handler::EventHandler;
|
|
|
|
|
|
|
|
use crate::result::VentedError::UnknownNode;
|
|
|
|
use crate::result::{VentedError, VentedResult};
|
|
|
|
use crate::result::{VentedError, VentedResult};
|
|
|
|
use crate::server::data::{Node, ServerConnectionContext};
|
|
|
|
use crate::server::data::{Node, ServerConnectionContext};
|
|
|
|
use crate::server::server_events::{
|
|
|
|
use crate::server::server_events::{
|
|
|
|
NodeInformationPayload, CONNECT_EVENT, CONN_ACCEPT_EVENT, CONN_REJECT_EVENT,
|
|
|
|
AuthPayload, NodeInformationPayload, AUTH_EVENT, CONNECT_EVENT, CONN_ACCEPT_EVENT,
|
|
|
|
|
|
|
|
CONN_CHALLENGE_EVENT, CONN_REJECT_EVENT, READY_EVENT,
|
|
|
|
};
|
|
|
|
};
|
|
|
|
use parking_lot::Mutex;
|
|
|
|
use parking_lot::Mutex;
|
|
|
|
use std::io::Write;
|
|
|
|
use std::io::Write;
|
|
|
|
use std::sync::Arc;
|
|
|
|
use std::sync::Arc;
|
|
|
|
use std::thread;
|
|
|
|
use std::thread;
|
|
|
|
|
|
|
|
use x25519_dalek::StaticSecret;
|
|
|
|
|
|
|
|
|
|
|
|
pub mod data;
|
|
|
|
pub mod data;
|
|
|
|
pub(crate) mod server_events;
|
|
|
|
pub mod server_events;
|
|
|
|
|
|
|
|
|
|
|
|
/// The vented server that provides parallel handling of connections
|
|
|
|
/// The vented server that provides parallel handling of connections
|
|
|
|
pub struct VentedServer {
|
|
|
|
pub struct VentedServer {
|
|
|
@ -27,20 +30,24 @@ pub struct VentedServer {
|
|
|
|
listener_pool: Arc<Mutex<ScheduledThreadPool>>,
|
|
|
|
listener_pool: Arc<Mutex<ScheduledThreadPool>>,
|
|
|
|
sender_pool: Arc<Mutex<ScheduledThreadPool>>,
|
|
|
|
sender_pool: Arc<Mutex<ScheduledThreadPool>>,
|
|
|
|
event_handler: Arc<Mutex<EventHandler>>,
|
|
|
|
event_handler: Arc<Mutex<EventHandler>>,
|
|
|
|
secret_key: SecretKey,
|
|
|
|
global_secret_key: SecretKey,
|
|
|
|
node_id: String,
|
|
|
|
node_id: String,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
impl VentedServer {
|
|
|
|
impl VentedServer {
|
|
|
|
pub fn new(node_id: String, nodes: Vec<Node>, num_threads: usize) -> Self {
|
|
|
|
pub fn new(
|
|
|
|
let mut rng = rand::thread_rng();
|
|
|
|
node_id: String,
|
|
|
|
|
|
|
|
secret_key: SecretKey,
|
|
|
|
|
|
|
|
nodes: Vec<Node>,
|
|
|
|
|
|
|
|
num_threads: usize,
|
|
|
|
|
|
|
|
) -> Self {
|
|
|
|
Self {
|
|
|
|
Self {
|
|
|
|
node_id,
|
|
|
|
node_id,
|
|
|
|
event_handler: Arc::new(Mutex::new(EventHandler::new())),
|
|
|
|
event_handler: Arc::new(Mutex::new(EventHandler::new())),
|
|
|
|
listener_pool: Arc::new(Mutex::new(ScheduledThreadPool::new(num_threads))),
|
|
|
|
listener_pool: Arc::new(Mutex::new(ScheduledThreadPool::new(num_threads))),
|
|
|
|
sender_pool: Arc::new(Mutex::new(ScheduledThreadPool::new(num_threads))),
|
|
|
|
sender_pool: Arc::new(Mutex::new(ScheduledThreadPool::new(num_threads))),
|
|
|
|
connections: Arc::new(Mutex::new(HashMap::new())),
|
|
|
|
connections: Arc::new(Mutex::new(HashMap::new())),
|
|
|
|
secret_key: SecretKey::generate(&mut rng),
|
|
|
|
global_secret_key: secret_key,
|
|
|
|
known_nodes: Arc::new(Mutex::new(nodes)),
|
|
|
|
known_nodes: Arc::new(Mutex::new(nodes)),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -55,7 +62,13 @@ impl VentedServer {
|
|
|
|
});
|
|
|
|
});
|
|
|
|
Ok(())
|
|
|
|
Ok(())
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
if let Some(node) = self.known_nodes.lock().iter().find(|n| n.id == node_id) {
|
|
|
|
let found_node = self
|
|
|
|
|
|
|
|
.known_nodes
|
|
|
|
|
|
|
|
.lock()
|
|
|
|
|
|
|
|
.iter()
|
|
|
|
|
|
|
|
.find(|n| n.id == node_id)
|
|
|
|
|
|
|
|
.cloned();
|
|
|
|
|
|
|
|
if let Some(node) = found_node {
|
|
|
|
if let Some(address) = &node.address {
|
|
|
|
if let Some(address) = &node.address {
|
|
|
|
let handler = self.connect(address.clone())?;
|
|
|
|
let handler = self.connect(address.clone())?;
|
|
|
|
self.sender_pool.lock().execute(move || {
|
|
|
|
self.sender_pool.lock().execute(move || {
|
|
|
@ -104,8 +117,8 @@ impl VentedServer {
|
|
|
|
fn get_server_context(&self) -> ServerConnectionContext {
|
|
|
|
fn get_server_context(&self) -> ServerConnectionContext {
|
|
|
|
ServerConnectionContext {
|
|
|
|
ServerConnectionContext {
|
|
|
|
is_server: true,
|
|
|
|
is_server: true,
|
|
|
|
own_node_id: self.node_id.clone(),
|
|
|
|
node_id: self.node_id.clone(),
|
|
|
|
secret_key: self.secret_key.clone(),
|
|
|
|
global_secret: self.global_secret_key.clone(),
|
|
|
|
known_nodes: Arc::clone(&self.known_nodes),
|
|
|
|
known_nodes: Arc::clone(&self.known_nodes),
|
|
|
|
connections: Arc::clone(&self.connections),
|
|
|
|
connections: Arc::clone(&self.connections),
|
|
|
|
event_handler: Arc::clone(&self.event_handler),
|
|
|
|
event_handler: Arc::clone(&self.event_handler),
|
|
|
@ -121,6 +134,9 @@ impl VentedServer {
|
|
|
|
|
|
|
|
|
|
|
|
pool.lock().execute(move || {
|
|
|
|
pool.lock().execute(move || {
|
|
|
|
let stream = VentedServer::get_crypto_stream(params, stream).expect("Listener failed");
|
|
|
|
let stream = VentedServer::get_crypto_stream(params, stream).expect("Listener failed");
|
|
|
|
|
|
|
|
event_handler
|
|
|
|
|
|
|
|
.lock()
|
|
|
|
|
|
|
|
.handle_event(Event::new(READY_EVENT.to_string()));
|
|
|
|
while let Ok(event) = stream.read() {
|
|
|
|
while let Ok(event) = stream.read() {
|
|
|
|
if let Some(response) = event_handler.lock().handle_event(event) {
|
|
|
|
if let Some(response) = event_handler.lock().handle_event(event) {
|
|
|
|
stream.send(response).expect("Failed to send response");
|
|
|
|
stream.send(response).expect("Failed to send response");
|
|
|
@ -138,8 +154,8 @@ impl VentedServer {
|
|
|
|
let (node_id, secret_box) = VentedServer::perform_key_exchange(
|
|
|
|
let (node_id, secret_box) = VentedServer::perform_key_exchange(
|
|
|
|
params.is_server,
|
|
|
|
params.is_server,
|
|
|
|
&mut stream,
|
|
|
|
&mut stream,
|
|
|
|
¶ms.secret_key,
|
|
|
|
params.node_id.clone(),
|
|
|
|
params.own_node_id,
|
|
|
|
params.global_secret,
|
|
|
|
params.known_nodes,
|
|
|
|
params.known_nodes,
|
|
|
|
)?;
|
|
|
|
)?;
|
|
|
|
|
|
|
|
|
|
|
@ -170,6 +186,9 @@ impl VentedServer {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
|
|
|
});
|
|
|
|
|
|
|
|
self.event_handler
|
|
|
|
|
|
|
|
.lock()
|
|
|
|
|
|
|
|
.handle_event(Event::new(READY_EVENT.to_string()));
|
|
|
|
|
|
|
|
|
|
|
|
Ok(stream)
|
|
|
|
Ok(stream)
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -178,14 +197,27 @@ impl VentedServer {
|
|
|
|
fn perform_key_exchange(
|
|
|
|
fn perform_key_exchange(
|
|
|
|
is_server: bool,
|
|
|
|
is_server: bool,
|
|
|
|
stream: &mut TcpStream,
|
|
|
|
stream: &mut TcpStream,
|
|
|
|
secret_key: &SecretKey,
|
|
|
|
|
|
|
|
own_node_id: String,
|
|
|
|
own_node_id: String,
|
|
|
|
|
|
|
|
global_secret: SecretKey,
|
|
|
|
known_nodes: Arc<Mutex<Vec<Node>>>,
|
|
|
|
known_nodes: Arc<Mutex<Vec<Node>>>,
|
|
|
|
) -> VentedResult<(String, ChaChaBox)> {
|
|
|
|
) -> VentedResult<(String, ChaChaBox)> {
|
|
|
|
|
|
|
|
let secret_key = SecretKey::generate(&mut rand::thread_rng());
|
|
|
|
if is_server {
|
|
|
|
if is_server {
|
|
|
|
Self::perform_server_key_exchange(stream, secret_key, own_node_id, known_nodes)
|
|
|
|
Self::perform_server_key_exchange(
|
|
|
|
|
|
|
|
stream,
|
|
|
|
|
|
|
|
&secret_key,
|
|
|
|
|
|
|
|
own_node_id,
|
|
|
|
|
|
|
|
global_secret,
|
|
|
|
|
|
|
|
known_nodes,
|
|
|
|
|
|
|
|
)
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
Self::perform_client_key_exchange(stream, secret_key, own_node_id)
|
|
|
|
Self::perform_client_key_exchange(
|
|
|
|
|
|
|
|
stream,
|
|
|
|
|
|
|
|
&secret_key,
|
|
|
|
|
|
|
|
own_node_id,
|
|
|
|
|
|
|
|
global_secret,
|
|
|
|
|
|
|
|
known_nodes,
|
|
|
|
|
|
|
|
)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -194,6 +226,8 @@ impl VentedServer {
|
|
|
|
mut stream: &mut TcpStream,
|
|
|
|
mut stream: &mut TcpStream,
|
|
|
|
secret_key: &SecretKey,
|
|
|
|
secret_key: &SecretKey,
|
|
|
|
own_node_id: String,
|
|
|
|
own_node_id: String,
|
|
|
|
|
|
|
|
global_secret: SecretKey,
|
|
|
|
|
|
|
|
known_nodes: Arc<Mutex<Vec<Node>>>,
|
|
|
|
) -> VentedResult<(String, ChaChaBox)> {
|
|
|
|
) -> VentedResult<(String, ChaChaBox)> {
|
|
|
|
stream.write(
|
|
|
|
stream.write(
|
|
|
|
&Event::with_payload(
|
|
|
|
&Event::with_payload(
|
|
|
@ -207,7 +241,7 @@ impl VentedServer {
|
|
|
|
)?;
|
|
|
|
)?;
|
|
|
|
stream.flush()?;
|
|
|
|
stream.flush()?;
|
|
|
|
let event = Event::from_bytes(&mut stream)?;
|
|
|
|
let event = Event::from_bytes(&mut stream)?;
|
|
|
|
if event.name != CONN_ACCEPT_EVENT {
|
|
|
|
if event.name != CONN_CHALLENGE_EVENT {
|
|
|
|
return Err(VentedError::UnknownNode(event.name));
|
|
|
|
return Err(VentedError::UnknownNode(event.name));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
let NodeInformationPayload {
|
|
|
|
let NodeInformationPayload {
|
|
|
@ -215,6 +249,37 @@ impl VentedServer {
|
|
|
|
node_id,
|
|
|
|
node_id,
|
|
|
|
} = event.get_payload::<NodeInformationPayload>().unwrap();
|
|
|
|
} = event.get_payload::<NodeInformationPayload>().unwrap();
|
|
|
|
let public_key = PublicKey::from(public_key);
|
|
|
|
let public_key = PublicKey::from(public_key);
|
|
|
|
|
|
|
|
let shared_auth_secret =
|
|
|
|
|
|
|
|
StaticSecret::from(global_secret.to_bytes()).diffie_hellman(&public_key);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
stream.write(
|
|
|
|
|
|
|
|
&Event::with_payload(
|
|
|
|
|
|
|
|
AUTH_EVENT.to_string(),
|
|
|
|
|
|
|
|
&AuthPayload {
|
|
|
|
|
|
|
|
calculated_secret: shared_auth_secret.to_bytes(),
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
.as_bytes(),
|
|
|
|
|
|
|
|
)?;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let event = Event::from_bytes(&mut stream)?;
|
|
|
|
|
|
|
|
if event.name != CONN_ACCEPT_EVENT {
|
|
|
|
|
|
|
|
return Err(VentedError::UnknownNode(event.name));
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
let known_nodes = known_nodes.lock();
|
|
|
|
|
|
|
|
let node_static_info = event.get_payload::<NodeInformationPayload>()?;
|
|
|
|
|
|
|
|
let node_data = if let Some(data) = known_nodes
|
|
|
|
|
|
|
|
.iter()
|
|
|
|
|
|
|
|
.find(|n| n.id == node_static_info.node_id)
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
data.clone()
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
|
|
|
return Err(UnknownNode(node_id));
|
|
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
if node_data.public_key.to_bytes() != node_static_info.public_key {
|
|
|
|
|
|
|
|
return Err(UnknownNode(node_id));
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
let secret_box = ChaChaBox::new(&public_key, &secret_key);
|
|
|
|
let secret_box = ChaChaBox::new(&public_key, &secret_key);
|
|
|
|
|
|
|
|
|
|
|
|
Ok((node_id, secret_box))
|
|
|
|
Ok((node_id, secret_box))
|
|
|
@ -226,6 +291,7 @@ impl VentedServer {
|
|
|
|
mut stream: &mut TcpStream,
|
|
|
|
mut stream: &mut TcpStream,
|
|
|
|
secret_key: &SecretKey,
|
|
|
|
secret_key: &SecretKey,
|
|
|
|
own_node_id: String,
|
|
|
|
own_node_id: String,
|
|
|
|
|
|
|
|
global_secret: SecretKey,
|
|
|
|
known_nodes: Arc<Mutex<Vec<Node>>>,
|
|
|
|
known_nodes: Arc<Mutex<Vec<Node>>>,
|
|
|
|
) -> VentedResult<(String, ChaChaBox)> {
|
|
|
|
) -> VentedResult<(String, ChaChaBox)> {
|
|
|
|
let event = Event::from_bytes(&mut stream)?;
|
|
|
|
let event = Event::from_bytes(&mut stream)?;
|
|
|
@ -238,29 +304,53 @@ impl VentedServer {
|
|
|
|
} = event.get_payload::<NodeInformationPayload>().unwrap();
|
|
|
|
} = event.get_payload::<NodeInformationPayload>().unwrap();
|
|
|
|
let public_key = PublicKey::from(public_key);
|
|
|
|
let public_key = PublicKey::from(public_key);
|
|
|
|
|
|
|
|
|
|
|
|
if known_nodes
|
|
|
|
let known_nodes = known_nodes.lock();
|
|
|
|
.lock()
|
|
|
|
let node_data = if let Some(data) = known_nodes.iter().find(|n| n.id == node_id) {
|
|
|
|
.iter()
|
|
|
|
data.clone()
|
|
|
|
.find(|n| n.id == node_id)
|
|
|
|
} else {
|
|
|
|
.is_none()
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
stream.write(&Event::new(CONN_REJECT_EVENT.to_string()).as_bytes())?;
|
|
|
|
stream.write(&Event::new(CONN_REJECT_EVENT.to_string()).as_bytes())?;
|
|
|
|
stream.flush()?;
|
|
|
|
stream.flush()?;
|
|
|
|
return Err(VentedError::UnknownNode(node_id));
|
|
|
|
return Err(UnknownNode(node_id));
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
let secret_box = ChaChaBox::new(&public_key, &secret_key);
|
|
|
|
let secret_box = ChaChaBox::new(&public_key, &secret_key);
|
|
|
|
stream.write(
|
|
|
|
stream.write(
|
|
|
|
&Event::with_payload(
|
|
|
|
&Event::with_payload(
|
|
|
|
CONN_ACCEPT_EVENT.to_string(),
|
|
|
|
CONN_CHALLENGE_EVENT.to_string(),
|
|
|
|
&NodeInformationPayload {
|
|
|
|
&NodeInformationPayload {
|
|
|
|
public_key: secret_key.public_key().to_bytes(),
|
|
|
|
public_key: secret_key.public_key().to_bytes(),
|
|
|
|
node_id: own_node_id,
|
|
|
|
node_id: own_node_id.clone(),
|
|
|
|
},
|
|
|
|
},
|
|
|
|
)
|
|
|
|
)
|
|
|
|
.as_bytes(),
|
|
|
|
.as_bytes(),
|
|
|
|
)?;
|
|
|
|
)?;
|
|
|
|
stream.flush()?;
|
|
|
|
stream.flush()?;
|
|
|
|
|
|
|
|
let auth_event = Event::from_bytes(&mut stream)?;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if auth_event.name != AUTH_EVENT {
|
|
|
|
|
|
|
|
return Err(VentedError::UnexpectedEvent(auth_event.name));
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
let AuthPayload { calculated_secret } = auth_event.get_payload::<AuthPayload>()?;
|
|
|
|
|
|
|
|
let expected_secret =
|
|
|
|
|
|
|
|
StaticSecret::from(secret_key.to_bytes()).diffie_hellman(&node_data.public_key);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if expected_secret.to_bytes() != calculated_secret {
|
|
|
|
|
|
|
|
stream.write(&Event::new(CONN_REJECT_EVENT.to_string()).as_bytes())?;
|
|
|
|
|
|
|
|
stream.flush()?;
|
|
|
|
|
|
|
|
return Err(UnknownNode(node_id));
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
|
|
|
stream.write(
|
|
|
|
|
|
|
|
&Event::with_payload(
|
|
|
|
|
|
|
|
CONN_ACCEPT_EVENT.to_string(),
|
|
|
|
|
|
|
|
&NodeInformationPayload {
|
|
|
|
|
|
|
|
node_id: own_node_id,
|
|
|
|
|
|
|
|
public_key: global_secret.public_key().to_bytes(),
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
.as_bytes(),
|
|
|
|
|
|
|
|
)?;
|
|
|
|
|
|
|
|
stream.flush()?;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
Ok((node_id, secret_box))
|
|
|
|
Ok((node_id, secret_box))
|
|
|
|
}
|
|
|
|
}
|
|
|
|