diff --git a/Cargo.toml b/Cargo.toml index 559dc56..726ea5e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "vented" description = "Event driven encrypted tcp communicaton" -version = "0.4.3" +version = "0.5.0" authors = ["trivernis "] edition = "2018" readme = "README.md" diff --git a/src/event/mod.rs b/src/event/mod.rs index 9343a33..76251c7 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -20,6 +20,7 @@ pub struct EmptyPayload {} pub struct Event { pub name: String, pub payload: Vec, + pub origin: Option, } impl Event { @@ -28,6 +29,7 @@ impl Event { Self { name: name.to_string(), payload: Vec::with_capacity(0), + origin: None, } } } @@ -39,6 +41,7 @@ impl Event { Self { name: name.to_string(), payload, + origin: None, } } @@ -84,6 +87,7 @@ impl Event { Ok(Self { name: event_name, payload, + origin: None, }) } diff --git a/src/server/data.rs b/src/server/data.rs index e772823..96cce01 100644 --- a/src/server/data.rs +++ b/src/server/data.rs @@ -16,6 +16,7 @@ pub struct Node { pub id: String, pub public_key: PublicKey, pub address: Option, + pub trusted: bool, } #[derive(Clone)] @@ -23,7 +24,7 @@ pub(crate) struct ServerConnectionContext { pub is_server: bool, pub node_id: String, pub global_secret: SecretKey, - pub known_nodes: Arc>>, + pub known_nodes: Arc>>, pub event_handler: Arc>, pub connections: Arc>>, pub forwarded_connections: Arc>>>, diff --git a/src/server/mod.rs b/src/server/mod.rs index 2497144..4bae882 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -11,15 +11,15 @@ use crate::result::VentedError::UnknownNode; use crate::result::{VentedError, VentedResult}; use crate::server::data::{Future, Node, ServerConnectionContext}; use crate::server::server_events::{ - AuthPayload, ChallengePayload, NodeInformationPayload, RedirectPayload, - RedirectResponsePayload, VersionMismatchPayload, ACCEPT_EVENT, AUTH_EVENT, CHALLENGE_EVENT, - CONNECT_EVENT, MISMATCH_EVENT, READY_EVENT, REDIRECT_CONFIRM_EVENT, REDIRECT_EVENT, - REDIRECT_FAIL_EVENT, REDIRECT_REDIRECTED_EVENT, REJECT_EVENT, + AuthPayload, ChallengePayload, NodeInformationPayload, RedirectPayload, VersionMismatchPayload, + ACCEPT_EVENT, AUTH_EVENT, CHALLENGE_EVENT, CONNECT_EVENT, MISMATCH_EVENT, + NODE_LIST_REQUEST_EVENT, READY_EVENT, REDIRECT_EVENT, REJECT_EVENT, }; use crossbeam_utils::sync::WaitGroup; use parking_lot::Mutex; use sha2::Digest; use std::io::Write; +use std::iter::FromIterator; use std::sync::Arc; use std::thread; use x25519_dalek::StaticSecret; @@ -45,6 +45,7 @@ type CryptoStreamMap = Arc>>; /// Node { /// id: "B".to_string(), /// address: None, +/// trusted: true, /// public_key: global_secret_b.public_key() // load it from somewhere /// }, ///]; @@ -65,7 +66,7 @@ type CryptoStreamMap = Arc>>; pub struct VentedServer { connections: CryptoStreamMap, forwarded_connections: ForwardFutureVector, - known_nodes: Arc>>, + known_nodes: Arc>>, listener_pool: Arc>, sender_pool: Arc>, event_handler: Arc>, @@ -99,7 +100,9 @@ impl VentedServer { connections: Arc::new(Mutex::new(HashMap::new())), forwarded_connections: Arc::new(Mutex::new(HashMap::new())), global_secret_key: secret_key, - known_nodes: Arc::new(Mutex::new(nodes)), + known_nodes: Arc::new(Mutex::new(HashMap::from_iter( + nodes.iter().cloned().map(|node| (node.id.clone(), node)), + ))), redirect_handles: Arc::new(Mutex::new(HashMap::new())), }; server.register_events(); @@ -114,7 +117,7 @@ impl VentedServer { /// Returns the nodes known to the server pub fn nodes(&self) -> Vec { - self.known_nodes.lock().clone() + self.known_nodes.lock().values().cloned().collect() } /// Emits an event to the specified Node @@ -189,86 +192,20 @@ impl VentedServer { wg2 } - /// Registers default server events - fn register_events(&mut self) { - self.on(REDIRECT_CONFIRM_EVENT, { - let redirect_handles = Arc::clone(&self.redirect_handles); - move |event| { - let payload = event.get_payload::().ok()?; - let mut future = redirect_handles.lock().remove(&payload.id)?; - future.set_value(true); - - None - } - }); - self.on(REDIRECT_FAIL_EVENT, { - let redirect_handles = Arc::clone(&self.redirect_handles); - move |event| { - let payload = event.get_payload::().ok()?; - let mut future = redirect_handles.lock().remove(&payload.id)?; - future.set_value(false); - - None - } - }); - self.on(REDIRECT_EVENT, { - let connections = Arc::clone(&self.connections); - - move |event| { - let payload = event.get_payload::().ok()?; - let stream = connections.lock().get(&payload.target)?.clone(); - if stream - .send(Event::with_payload(REDIRECT_REDIRECTED_EVENT, &payload)) - .is_ok() - { - Some(Event::with_payload( - REDIRECT_CONFIRM_EVENT, - &RedirectResponsePayload { id: payload.id }, - )) - } else { - Some(Event::with_payload( - REDIRECT_FAIL_EVENT, - &RedirectResponsePayload { id: payload.id }, - )) - } - } - }); - self.on(REDIRECT_REDIRECTED_EVENT, { - let event_handler = Arc::clone(&self.event_handler); - let connections = Arc::clone(&self.connections); - let pool = Arc::clone(&self.sender_pool); - - move |event| { - let payload = event.get_payload::().ok()?; - let event = Event::from_bytes(&mut &payload.content[..]).ok()?; - let proxy_stream = connections.lock().get(&payload.proxy)?.clone(); - - pool.lock().execute({ - let event_handler = Arc::clone(&event_handler); - move || { - let response = event_handler.lock().handle_event(event); - let event = response.map(|mut value| { - Event::with_payload( - REDIRECT_EVENT, - &RedirectPayload::new( - payload.target, - payload.proxy, - payload.source, - value.as_bytes(), - ), - ) - }); - if let Some(event) = event { - proxy_stream - .send(event) - .expect("Failed to respond to redirected event."); - } - } - }); + /// Request a list of network nodes from a trusted node + pub fn request_node_list(&self) -> VentedResult<()> { + let trusted_nodes = self + .known_nodes + .lock() + .values() + .filter(|node| node.trusted) + .cloned() + .collect::>(); + for node in trusted_nodes { + self.emit(node.id, Event::new(NODE_LIST_REQUEST_EVENT))?; + } - None - } - }) + Ok(()) } /// Returns a copy of the servers metadata @@ -290,7 +227,7 @@ impl VentedServer { let public_nodes = self .known_nodes .lock() - .iter() + .values() .filter(|node| node.address.is_some()) .cloned() .collect::>(); @@ -360,7 +297,8 @@ impl VentedServer { event_handler: Arc>, stream: &CryptoStream, ) -> VentedResult<()> { - while let Ok(event) = stream.read() { + while let Ok(mut event) = stream.read() { + event.origin = Some(stream.receiver_node().clone()); if let Some(response) = event_handler.lock().handle_event(event) { stream.send(response)? } @@ -382,8 +320,7 @@ impl VentedServer { let target_node = { self.known_nodes .lock() - .iter() - .find(|node| &node.id == target) + .get(target) .cloned() .ok_or(VentedError::UnknownNode(target.clone()))? }; @@ -435,9 +372,9 @@ impl VentedServer { self.listener_pool.lock().execute({ let stream = CryptoStream::clone(&stream); let event_handler = Arc::clone(&self.event_handler); - event_handler.lock().handle_event(Event::new(READY_EVENT)); move || { + event_handler.lock().handle_event(Event::new(READY_EVENT)); if let Err(e) = Self::handle_read(event_handler, &stream) { log::error!("Connection aborted: {}", e); } @@ -454,7 +391,7 @@ impl VentedServer { stream: TcpStream, own_node_id: String, global_secret: SecretKey, - known_nodes: Arc>>, + known_nodes: Arc>>, ) -> VentedResult<(String, CryptoStream)> { let secret_key = SecretKey::generate(&mut rand::thread_rng()); if is_server { @@ -482,7 +419,7 @@ impl VentedServer { secret_key: &SecretKey, own_node_id: String, global_secret: SecretKey, - known_nodes: Arc>>, + known_nodes: Arc>>, ) -> VentedResult<(String, CryptoStream)> { stream.write( &Event::with_payload( @@ -521,7 +458,7 @@ impl VentedServer { let public_key = PublicKey::from(public_key); - let node_data = if let Some(data) = known_nodes.lock().iter().find(|n| n.id == node_id) { + let node_data = if let Some(data) = known_nodes.lock().get(&node_id) { data.clone() } else { stream.write(&Event::new(REJECT_EVENT).as_bytes())?; @@ -554,7 +491,7 @@ impl VentedServer { secret_key: &SecretKey, own_node_id: String, global_secret: SecretKey, - known_nodes: Arc>>, + known_nodes: Arc>>, ) -> VentedResult<(String, CryptoStream)> { let event = Event::from_bytes(&mut stream)?; if event.name != CONNECT_EVENT { @@ -579,7 +516,7 @@ impl VentedServer { } let public_key = PublicKey::from(public_key); - let node_data = if let Some(data) = known_nodes.lock().iter().find(|n| n.id == node_id) { + let node_data = if let Some(data) = known_nodes.lock().get(&node_id) { data.clone() } else { stream.write(&Event::new(REJECT_EVENT).as_bytes())?; diff --git a/src/server/server_events.rs b/src/server/server_events.rs index 466f3db..0c91525 100644 --- a/src/server/server_events.rs +++ b/src/server/server_events.rs @@ -1,5 +1,10 @@ +use crate::event::Event; +use crate::server::data::Node; +use crate::server::VentedServer; use rand::{thread_rng, RngCore}; use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use x25519_dalek::PublicKey; pub(crate) const CONNECT_EVENT: &str = "conn:connect"; pub(crate) const AUTH_EVENT: &str = "conn:authenticate"; @@ -11,6 +16,9 @@ pub(crate) const REDIRECT_EVENT: &str = "conn:redirect"; pub(crate) const REDIRECT_CONFIRM_EVENT: &str = "conn:redirect_confirm"; pub(crate) const REDIRECT_FAIL_EVENT: &str = "conn:redirect_failed"; pub(crate) const REDIRECT_REDIRECTED_EVENT: &str = "conn:redirect_redirected"; +pub(crate) const NODE_LIST_REQUEST_EVENT: &str = "conn:node_list_request"; +pub(crate) const NODE_LIST_EVENT: &str = "conn:node_list"; + pub const READY_EVENT: &str = "server:ready"; #[derive(Serialize, Deserialize, Debug)] @@ -73,3 +81,152 @@ impl RedirectPayload { pub(crate) struct RedirectResponsePayload { pub(crate) id: [u8; 16], } + +#[derive(Serialize, Deserialize)] +pub(crate) struct NodeListPayload { + pub nodes: Vec, +} + +#[derive(Serialize, Deserialize)] +pub(crate) struct NodeListElement { + id: String, + public_key: [u8; 32], + address: Option, +} + +impl VentedServer { + /// Registers default server events + pub(crate) fn register_events(&mut self) { + self.on(REDIRECT_CONFIRM_EVENT, { + let redirect_handles = Arc::clone(&self.redirect_handles); + move |event| { + let payload = event.get_payload::().ok()?; + let mut future = redirect_handles.lock().remove(&payload.id)?; + future.set_value(true); + + None + } + }); + self.on(REDIRECT_FAIL_EVENT, { + let redirect_handles = Arc::clone(&self.redirect_handles); + move |event| { + let payload = event.get_payload::().ok()?; + let mut future = redirect_handles.lock().remove(&payload.id)?; + future.set_value(false); + + None + } + }); + self.on(REDIRECT_EVENT, { + let connections = Arc::clone(&self.connections); + + move |event| { + let payload = event.get_payload::().ok()?; + let stream = connections.lock().get(&payload.target)?.clone(); + if stream + .send(Event::with_payload(REDIRECT_REDIRECTED_EVENT, &payload)) + .is_ok() + { + Some(Event::with_payload( + REDIRECT_CONFIRM_EVENT, + &RedirectResponsePayload { id: payload.id }, + )) + } else { + Some(Event::with_payload( + REDIRECT_FAIL_EVENT, + &RedirectResponsePayload { id: payload.id }, + )) + } + } + }); + self.on(REDIRECT_REDIRECTED_EVENT, { + let event_handler = Arc::clone(&self.event_handler); + let connections = Arc::clone(&self.connections); + let pool = Arc::clone(&self.sender_pool); + + move |event| { + let payload = event.get_payload::().ok()?; + let event = Event::from_bytes(&mut &payload.content[..]).ok()?; + let proxy_stream = connections.lock().get(&payload.proxy)?.clone(); + + pool.lock().execute({ + let event_handler = Arc::clone(&event_handler); + move || { + let response = event_handler.lock().handle_event(event); + let event = response.map(|mut value| { + Event::with_payload( + REDIRECT_EVENT, + &RedirectPayload::new( + payload.target, + payload.proxy, + payload.source, + value.as_bytes(), + ), + ) + }); + if let Some(event) = event { + proxy_stream + .send(event) + .expect("Failed to respond to redirected event."); + } + } + }); + + None + } + }); + self.on(NODE_LIST_EVENT, { + let node_list = Arc::clone(&self.known_nodes); + + move |event| { + let list = event.get_payload::().ok()?; + let mut own_nodes = node_list.lock(); + let origin = event.origin?; + + if !own_nodes.get(&origin)?.trusted { + log::warn!("Untrusted node '{}' tried to send network update!", origin); + return None; + } + + let mut new_nodes = 0; + for node in list.nodes { + if !own_nodes.contains_key(&node.id) { + own_nodes.insert( + node.id.clone(), + Node { + id: node.id, + trusted: false, + public_key: PublicKey::from(node.public_key), + address: node.address, + }, + ); + new_nodes += 1; + } + } + log::debug!("Updated node list: Added {} new nodes", new_nodes); + + None + } + }); + self.on(NODE_LIST_REQUEST_EVENT, { + let node_list = Arc::clone(&self.known_nodes); + + move |_| { + let nodes = node_list + .lock() + .values() + .map(|node| NodeListElement { + id: node.id.clone(), + address: node.address.clone(), + public_key: node.public_key.to_bytes(), + }) + .collect(); + + Some(Event::with_payload( + NODE_LIST_EVENT, + &NodeListPayload { nodes }, + )) + } + }); + } +} diff --git a/tests/test_communication.rs b/tests/test_communication.rs index 33568cb..455a3b7 100644 --- a/tests/test_communication.rs +++ b/tests/test_communication.rs @@ -29,16 +29,19 @@ fn test_server_communication() { id: "A".to_string(), address: Some("localhost:22222".to_string()), public_key: global_secret_a.public_key(), + trusted: true, }, Node { id: "B".to_string(), address: None, public_key: global_secret_b.public_key(), + trusted: false, }, Node { id: "C".to_string(), address: None, public_key: global_secret_c.public_key(), + trusted: false, }, ]; let mut server_a = VentedServer::new("A".to_string(), global_secret_a, nodes.clone(), 6); @@ -64,7 +67,9 @@ fn test_server_communication() { }); server_a.on(READY_EVENT, { let ready_count = Arc::clone(&ready_count); + move |_| { + println!("Server A ready"); ready_count.fetch_add(1, Ordering::Relaxed); None } @@ -72,6 +77,15 @@ fn test_server_communication() { server_b.on(READY_EVENT, { let ready_count = Arc::clone(&ready_count); move |_| { + println!("Server B ready"); + ready_count.fetch_add(1, Ordering::Relaxed); + None + } + }); + server_c.on(READY_EVENT, { + let ready_count = Arc::clone(&ready_count); + move |_| { + println!("Server C ready"); ready_count.fetch_add(1, Ordering::Relaxed); None } @@ -80,10 +94,10 @@ fn test_server_communication() { let ping_c_count = Arc::clone(&ping_c_count); move |_| { ping_c_count.fetch_add(1, Ordering::Relaxed); - println!("C RECEIVED A PING!"); None } }); + server_b.request_node_list().unwrap(); let wg = server_c .emit("A".to_string(), Event::new("ping".to_string())) .unwrap(); @@ -109,7 +123,7 @@ fn test_server_communication() { } assert_eq!(ping_c_count.load(Ordering::SeqCst), 1); - assert_eq!(ready_count.load(Ordering::SeqCst), 3); + assert_eq!(ready_count.load(Ordering::SeqCst), 4); assert_eq!(ping_count.load(Ordering::SeqCst), 10); assert_eq!(pong_count.load(Ordering::SeqCst), 10); }