Add support for node list synchronization

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/1/head
trivernis 4 years ago
parent 68729d8dd2
commit 0ee8d691c7
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -1,7 +1,7 @@
[package] [package]
name = "vented" name = "vented"
description = "Event driven encrypted tcp communicaton" description = "Event driven encrypted tcp communicaton"
version = "0.4.3" version = "0.5.0"
authors = ["trivernis <trivernis@protonmail.com>"] authors = ["trivernis <trivernis@protonmail.com>"]
edition = "2018" edition = "2018"
readme = "README.md" readme = "README.md"

@ -20,6 +20,7 @@ pub struct EmptyPayload {}
pub struct Event { pub struct Event {
pub name: String, pub name: String,
pub payload: Vec<u8>, pub payload: Vec<u8>,
pub origin: Option<String>,
} }
impl Event { impl Event {
@ -28,6 +29,7 @@ impl Event {
Self { Self {
name: name.to_string(), name: name.to_string(),
payload: Vec::with_capacity(0), payload: Vec::with_capacity(0),
origin: None,
} }
} }
} }
@ -39,6 +41,7 @@ impl Event {
Self { Self {
name: name.to_string(), name: name.to_string(),
payload, payload,
origin: None,
} }
} }
@ -84,6 +87,7 @@ impl Event {
Ok(Self { Ok(Self {
name: event_name, name: event_name,
payload, payload,
origin: None,
}) })
} }

@ -16,6 +16,7 @@ pub struct Node {
pub id: String, pub id: String,
pub public_key: PublicKey, pub public_key: PublicKey,
pub address: Option<String>, pub address: Option<String>,
pub trusted: bool,
} }
#[derive(Clone)] #[derive(Clone)]
@ -23,7 +24,7 @@ pub(crate) struct ServerConnectionContext {
pub is_server: bool, pub is_server: bool,
pub node_id: String, pub node_id: String,
pub global_secret: SecretKey, pub global_secret: SecretKey,
pub known_nodes: Arc<Mutex<Vec<Node>>>, pub known_nodes: Arc<Mutex<HashMap<String, Node>>>,
pub event_handler: Arc<Mutex<EventHandler>>, pub event_handler: Arc<Mutex<EventHandler>>,
pub connections: Arc<Mutex<HashMap<String, CryptoStream>>>, pub connections: Arc<Mutex<HashMap<String, CryptoStream>>>,
pub forwarded_connections: Arc<Mutex<HashMap<(String, String), Future<CryptoStream>>>>, pub forwarded_connections: Arc<Mutex<HashMap<(String, String), Future<CryptoStream>>>>,

@ -11,15 +11,15 @@ use crate::result::VentedError::UnknownNode;
use crate::result::{VentedError, VentedResult}; use crate::result::{VentedError, VentedResult};
use crate::server::data::{Future, Node, ServerConnectionContext}; use crate::server::data::{Future, Node, ServerConnectionContext};
use crate::server::server_events::{ use crate::server::server_events::{
AuthPayload, ChallengePayload, NodeInformationPayload, RedirectPayload, AuthPayload, ChallengePayload, NodeInformationPayload, RedirectPayload, VersionMismatchPayload,
RedirectResponsePayload, VersionMismatchPayload, ACCEPT_EVENT, AUTH_EVENT, CHALLENGE_EVENT, ACCEPT_EVENT, AUTH_EVENT, CHALLENGE_EVENT, CONNECT_EVENT, MISMATCH_EVENT,
CONNECT_EVENT, MISMATCH_EVENT, READY_EVENT, REDIRECT_CONFIRM_EVENT, REDIRECT_EVENT, NODE_LIST_REQUEST_EVENT, READY_EVENT, REDIRECT_EVENT, REJECT_EVENT,
REDIRECT_FAIL_EVENT, REDIRECT_REDIRECTED_EVENT, REJECT_EVENT,
}; };
use crossbeam_utils::sync::WaitGroup; use crossbeam_utils::sync::WaitGroup;
use parking_lot::Mutex; use parking_lot::Mutex;
use sha2::Digest; use sha2::Digest;
use std::io::Write; use std::io::Write;
use std::iter::FromIterator;
use std::sync::Arc; use std::sync::Arc;
use std::thread; use std::thread;
use x25519_dalek::StaticSecret; use x25519_dalek::StaticSecret;
@ -45,6 +45,7 @@ type CryptoStreamMap = Arc<Mutex<HashMap<String, CryptoStream>>>;
/// Node { /// Node {
/// id: "B".to_string(), /// id: "B".to_string(),
/// address: None, /// address: None,
/// trusted: true,
/// public_key: global_secret_b.public_key() // load it from somewhere /// public_key: global_secret_b.public_key() // load it from somewhere
/// }, /// },
///]; ///];
@ -65,7 +66,7 @@ type CryptoStreamMap = Arc<Mutex<HashMap<String, CryptoStream>>>;
pub struct VentedServer { pub struct VentedServer {
connections: CryptoStreamMap, connections: CryptoStreamMap,
forwarded_connections: ForwardFutureVector, forwarded_connections: ForwardFutureVector,
known_nodes: Arc<Mutex<Vec<Node>>>, known_nodes: Arc<Mutex<HashMap<String, Node>>>,
listener_pool: Arc<Mutex<ScheduledThreadPool>>, listener_pool: Arc<Mutex<ScheduledThreadPool>>,
sender_pool: Arc<Mutex<ScheduledThreadPool>>, sender_pool: Arc<Mutex<ScheduledThreadPool>>,
event_handler: Arc<Mutex<EventHandler>>, event_handler: Arc<Mutex<EventHandler>>,
@ -99,7 +100,9 @@ impl VentedServer {
connections: Arc::new(Mutex::new(HashMap::new())), connections: Arc::new(Mutex::new(HashMap::new())),
forwarded_connections: Arc::new(Mutex::new(HashMap::new())), forwarded_connections: Arc::new(Mutex::new(HashMap::new())),
global_secret_key: secret_key, global_secret_key: secret_key,
known_nodes: Arc::new(Mutex::new(nodes)), known_nodes: Arc::new(Mutex::new(HashMap::from_iter(
nodes.iter().cloned().map(|node| (node.id.clone(), node)),
))),
redirect_handles: Arc::new(Mutex::new(HashMap::new())), redirect_handles: Arc::new(Mutex::new(HashMap::new())),
}; };
server.register_events(); server.register_events();
@ -114,7 +117,7 @@ impl VentedServer {
/// Returns the nodes known to the server /// Returns the nodes known to the server
pub fn nodes(&self) -> Vec<Node> { pub fn nodes(&self) -> Vec<Node> {
self.known_nodes.lock().clone() self.known_nodes.lock().values().cloned().collect()
} }
/// Emits an event to the specified Node /// Emits an event to the specified Node
@ -189,86 +192,20 @@ impl VentedServer {
wg2 wg2
} }
/// Registers default server events /// Request a list of network nodes from a trusted node
fn register_events(&mut self) { pub fn request_node_list(&self) -> VentedResult<()> {
self.on(REDIRECT_CONFIRM_EVENT, { let trusted_nodes = self
let redirect_handles = Arc::clone(&self.redirect_handles); .known_nodes
move |event| { .lock()
let payload = event.get_payload::<RedirectResponsePayload>().ok()?; .values()
let mut future = redirect_handles.lock().remove(&payload.id)?; .filter(|node| node.trusted)
future.set_value(true); .cloned()
.collect::<Vec<Node>>();
None for node in trusted_nodes {
} self.emit(node.id, Event::new(NODE_LIST_REQUEST_EVENT))?;
}); }
self.on(REDIRECT_FAIL_EVENT, {
let redirect_handles = Arc::clone(&self.redirect_handles);
move |event| {
let payload = event.get_payload::<RedirectResponsePayload>().ok()?;
let mut future = redirect_handles.lock().remove(&payload.id)?;
future.set_value(false);
None
}
});
self.on(REDIRECT_EVENT, {
let connections = Arc::clone(&self.connections);
move |event| {
let payload = event.get_payload::<RedirectPayload>().ok()?;
let stream = connections.lock().get(&payload.target)?.clone();
if stream
.send(Event::with_payload(REDIRECT_REDIRECTED_EVENT, &payload))
.is_ok()
{
Some(Event::with_payload(
REDIRECT_CONFIRM_EVENT,
&RedirectResponsePayload { id: payload.id },
))
} else {
Some(Event::with_payload(
REDIRECT_FAIL_EVENT,
&RedirectResponsePayload { id: payload.id },
))
}
}
});
self.on(REDIRECT_REDIRECTED_EVENT, {
let event_handler = Arc::clone(&self.event_handler);
let connections = Arc::clone(&self.connections);
let pool = Arc::clone(&self.sender_pool);
move |event| {
let payload = event.get_payload::<RedirectPayload>().ok()?;
let event = Event::from_bytes(&mut &payload.content[..]).ok()?;
let proxy_stream = connections.lock().get(&payload.proxy)?.clone();
pool.lock().execute({
let event_handler = Arc::clone(&event_handler);
move || {
let response = event_handler.lock().handle_event(event);
let event = response.map(|mut value| {
Event::with_payload(
REDIRECT_EVENT,
&RedirectPayload::new(
payload.target,
payload.proxy,
payload.source,
value.as_bytes(),
),
)
});
if let Some(event) = event {
proxy_stream
.send(event)
.expect("Failed to respond to redirected event.");
}
}
});
None Ok(())
}
})
} }
/// Returns a copy of the servers metadata /// Returns a copy of the servers metadata
@ -290,7 +227,7 @@ impl VentedServer {
let public_nodes = self let public_nodes = self
.known_nodes .known_nodes
.lock() .lock()
.iter() .values()
.filter(|node| node.address.is_some()) .filter(|node| node.address.is_some())
.cloned() .cloned()
.collect::<Vec<Node>>(); .collect::<Vec<Node>>();
@ -360,7 +297,8 @@ impl VentedServer {
event_handler: Arc<Mutex<EventHandler>>, event_handler: Arc<Mutex<EventHandler>>,
stream: &CryptoStream, stream: &CryptoStream,
) -> VentedResult<()> { ) -> VentedResult<()> {
while let Ok(event) = stream.read() { while let Ok(mut event) = stream.read() {
event.origin = Some(stream.receiver_node().clone());
if let Some(response) = event_handler.lock().handle_event(event) { if let Some(response) = event_handler.lock().handle_event(event) {
stream.send(response)? stream.send(response)?
} }
@ -382,8 +320,7 @@ impl VentedServer {
let target_node = { let target_node = {
self.known_nodes self.known_nodes
.lock() .lock()
.iter() .get(target)
.find(|node| &node.id == target)
.cloned() .cloned()
.ok_or(VentedError::UnknownNode(target.clone()))? .ok_or(VentedError::UnknownNode(target.clone()))?
}; };
@ -435,9 +372,9 @@ impl VentedServer {
self.listener_pool.lock().execute({ self.listener_pool.lock().execute({
let stream = CryptoStream::clone(&stream); let stream = CryptoStream::clone(&stream);
let event_handler = Arc::clone(&self.event_handler); let event_handler = Arc::clone(&self.event_handler);
event_handler.lock().handle_event(Event::new(READY_EVENT));
move || { move || {
event_handler.lock().handle_event(Event::new(READY_EVENT));
if let Err(e) = Self::handle_read(event_handler, &stream) { if let Err(e) = Self::handle_read(event_handler, &stream) {
log::error!("Connection aborted: {}", e); log::error!("Connection aborted: {}", e);
} }
@ -454,7 +391,7 @@ impl VentedServer {
stream: TcpStream, stream: TcpStream,
own_node_id: String, own_node_id: String,
global_secret: SecretKey, global_secret: SecretKey,
known_nodes: Arc<Mutex<Vec<Node>>>, known_nodes: Arc<Mutex<HashMap<String, Node>>>,
) -> VentedResult<(String, CryptoStream)> { ) -> VentedResult<(String, CryptoStream)> {
let secret_key = SecretKey::generate(&mut rand::thread_rng()); let secret_key = SecretKey::generate(&mut rand::thread_rng());
if is_server { if is_server {
@ -482,7 +419,7 @@ impl VentedServer {
secret_key: &SecretKey, secret_key: &SecretKey,
own_node_id: String, own_node_id: String,
global_secret: SecretKey, global_secret: SecretKey,
known_nodes: Arc<Mutex<Vec<Node>>>, known_nodes: Arc<Mutex<HashMap<String, Node>>>,
) -> VentedResult<(String, CryptoStream)> { ) -> VentedResult<(String, CryptoStream)> {
stream.write( stream.write(
&Event::with_payload( &Event::with_payload(
@ -521,7 +458,7 @@ impl VentedServer {
let public_key = PublicKey::from(public_key); let public_key = PublicKey::from(public_key);
let node_data = if let Some(data) = known_nodes.lock().iter().find(|n| n.id == node_id) { let node_data = if let Some(data) = known_nodes.lock().get(&node_id) {
data.clone() data.clone()
} else { } else {
stream.write(&Event::new(REJECT_EVENT).as_bytes())?; stream.write(&Event::new(REJECT_EVENT).as_bytes())?;
@ -554,7 +491,7 @@ impl VentedServer {
secret_key: &SecretKey, secret_key: &SecretKey,
own_node_id: String, own_node_id: String,
global_secret: SecretKey, global_secret: SecretKey,
known_nodes: Arc<Mutex<Vec<Node>>>, known_nodes: Arc<Mutex<HashMap<String, Node>>>,
) -> VentedResult<(String, CryptoStream)> { ) -> VentedResult<(String, CryptoStream)> {
let event = Event::from_bytes(&mut stream)?; let event = Event::from_bytes(&mut stream)?;
if event.name != CONNECT_EVENT { if event.name != CONNECT_EVENT {
@ -579,7 +516,7 @@ impl VentedServer {
} }
let public_key = PublicKey::from(public_key); let public_key = PublicKey::from(public_key);
let node_data = if let Some(data) = known_nodes.lock().iter().find(|n| n.id == node_id) { let node_data = if let Some(data) = known_nodes.lock().get(&node_id) {
data.clone() data.clone()
} else { } else {
stream.write(&Event::new(REJECT_EVENT).as_bytes())?; stream.write(&Event::new(REJECT_EVENT).as_bytes())?;

@ -1,5 +1,10 @@
use crate::event::Event;
use crate::server::data::Node;
use crate::server::VentedServer;
use rand::{thread_rng, RngCore}; use rand::{thread_rng, RngCore};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::sync::Arc;
use x25519_dalek::PublicKey;
pub(crate) const CONNECT_EVENT: &str = "conn:connect"; pub(crate) const CONNECT_EVENT: &str = "conn:connect";
pub(crate) const AUTH_EVENT: &str = "conn:authenticate"; pub(crate) const AUTH_EVENT: &str = "conn:authenticate";
@ -11,6 +16,9 @@ pub(crate) const REDIRECT_EVENT: &str = "conn:redirect";
pub(crate) const REDIRECT_CONFIRM_EVENT: &str = "conn:redirect_confirm"; pub(crate) const REDIRECT_CONFIRM_EVENT: &str = "conn:redirect_confirm";
pub(crate) const REDIRECT_FAIL_EVENT: &str = "conn:redirect_failed"; pub(crate) const REDIRECT_FAIL_EVENT: &str = "conn:redirect_failed";
pub(crate) const REDIRECT_REDIRECTED_EVENT: &str = "conn:redirect_redirected"; pub(crate) const REDIRECT_REDIRECTED_EVENT: &str = "conn:redirect_redirected";
pub(crate) const NODE_LIST_REQUEST_EVENT: &str = "conn:node_list_request";
pub(crate) const NODE_LIST_EVENT: &str = "conn:node_list";
pub const READY_EVENT: &str = "server:ready"; pub const READY_EVENT: &str = "server:ready";
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
@ -73,3 +81,152 @@ impl RedirectPayload {
pub(crate) struct RedirectResponsePayload { pub(crate) struct RedirectResponsePayload {
pub(crate) id: [u8; 16], pub(crate) id: [u8; 16],
} }
#[derive(Serialize, Deserialize)]
pub(crate) struct NodeListPayload {
pub nodes: Vec<NodeListElement>,
}
#[derive(Serialize, Deserialize)]
pub(crate) struct NodeListElement {
id: String,
public_key: [u8; 32],
address: Option<String>,
}
impl VentedServer {
/// Registers default server events
pub(crate) fn register_events(&mut self) {
self.on(REDIRECT_CONFIRM_EVENT, {
let redirect_handles = Arc::clone(&self.redirect_handles);
move |event| {
let payload = event.get_payload::<RedirectResponsePayload>().ok()?;
let mut future = redirect_handles.lock().remove(&payload.id)?;
future.set_value(true);
None
}
});
self.on(REDIRECT_FAIL_EVENT, {
let redirect_handles = Arc::clone(&self.redirect_handles);
move |event| {
let payload = event.get_payload::<RedirectResponsePayload>().ok()?;
let mut future = redirect_handles.lock().remove(&payload.id)?;
future.set_value(false);
None
}
});
self.on(REDIRECT_EVENT, {
let connections = Arc::clone(&self.connections);
move |event| {
let payload = event.get_payload::<RedirectPayload>().ok()?;
let stream = connections.lock().get(&payload.target)?.clone();
if stream
.send(Event::with_payload(REDIRECT_REDIRECTED_EVENT, &payload))
.is_ok()
{
Some(Event::with_payload(
REDIRECT_CONFIRM_EVENT,
&RedirectResponsePayload { id: payload.id },
))
} else {
Some(Event::with_payload(
REDIRECT_FAIL_EVENT,
&RedirectResponsePayload { id: payload.id },
))
}
}
});
self.on(REDIRECT_REDIRECTED_EVENT, {
let event_handler = Arc::clone(&self.event_handler);
let connections = Arc::clone(&self.connections);
let pool = Arc::clone(&self.sender_pool);
move |event| {
let payload = event.get_payload::<RedirectPayload>().ok()?;
let event = Event::from_bytes(&mut &payload.content[..]).ok()?;
let proxy_stream = connections.lock().get(&payload.proxy)?.clone();
pool.lock().execute({
let event_handler = Arc::clone(&event_handler);
move || {
let response = event_handler.lock().handle_event(event);
let event = response.map(|mut value| {
Event::with_payload(
REDIRECT_EVENT,
&RedirectPayload::new(
payload.target,
payload.proxy,
payload.source,
value.as_bytes(),
),
)
});
if let Some(event) = event {
proxy_stream
.send(event)
.expect("Failed to respond to redirected event.");
}
}
});
None
}
});
self.on(NODE_LIST_EVENT, {
let node_list = Arc::clone(&self.known_nodes);
move |event| {
let list = event.get_payload::<NodeListPayload>().ok()?;
let mut own_nodes = node_list.lock();
let origin = event.origin?;
if !own_nodes.get(&origin)?.trusted {
log::warn!("Untrusted node '{}' tried to send network update!", origin);
return None;
}
let mut new_nodes = 0;
for node in list.nodes {
if !own_nodes.contains_key(&node.id) {
own_nodes.insert(
node.id.clone(),
Node {
id: node.id,
trusted: false,
public_key: PublicKey::from(node.public_key),
address: node.address,
},
);
new_nodes += 1;
}
}
log::debug!("Updated node list: Added {} new nodes", new_nodes);
None
}
});
self.on(NODE_LIST_REQUEST_EVENT, {
let node_list = Arc::clone(&self.known_nodes);
move |_| {
let nodes = node_list
.lock()
.values()
.map(|node| NodeListElement {
id: node.id.clone(),
address: node.address.clone(),
public_key: node.public_key.to_bytes(),
})
.collect();
Some(Event::with_payload(
NODE_LIST_EVENT,
&NodeListPayload { nodes },
))
}
});
}
}

@ -29,16 +29,19 @@ fn test_server_communication() {
id: "A".to_string(), id: "A".to_string(),
address: Some("localhost:22222".to_string()), address: Some("localhost:22222".to_string()),
public_key: global_secret_a.public_key(), public_key: global_secret_a.public_key(),
trusted: true,
}, },
Node { Node {
id: "B".to_string(), id: "B".to_string(),
address: None, address: None,
public_key: global_secret_b.public_key(), public_key: global_secret_b.public_key(),
trusted: false,
}, },
Node { Node {
id: "C".to_string(), id: "C".to_string(),
address: None, address: None,
public_key: global_secret_c.public_key(), public_key: global_secret_c.public_key(),
trusted: false,
}, },
]; ];
let mut server_a = VentedServer::new("A".to_string(), global_secret_a, nodes.clone(), 6); let mut server_a = VentedServer::new("A".to_string(), global_secret_a, nodes.clone(), 6);
@ -64,7 +67,9 @@ fn test_server_communication() {
}); });
server_a.on(READY_EVENT, { server_a.on(READY_EVENT, {
let ready_count = Arc::clone(&ready_count); let ready_count = Arc::clone(&ready_count);
move |_| { move |_| {
println!("Server A ready");
ready_count.fetch_add(1, Ordering::Relaxed); ready_count.fetch_add(1, Ordering::Relaxed);
None None
} }
@ -72,6 +77,15 @@ fn test_server_communication() {
server_b.on(READY_EVENT, { server_b.on(READY_EVENT, {
let ready_count = Arc::clone(&ready_count); let ready_count = Arc::clone(&ready_count);
move |_| { move |_| {
println!("Server B ready");
ready_count.fetch_add(1, Ordering::Relaxed);
None
}
});
server_c.on(READY_EVENT, {
let ready_count = Arc::clone(&ready_count);
move |_| {
println!("Server C ready");
ready_count.fetch_add(1, Ordering::Relaxed); ready_count.fetch_add(1, Ordering::Relaxed);
None None
} }
@ -80,10 +94,10 @@ fn test_server_communication() {
let ping_c_count = Arc::clone(&ping_c_count); let ping_c_count = Arc::clone(&ping_c_count);
move |_| { move |_| {
ping_c_count.fetch_add(1, Ordering::Relaxed); ping_c_count.fetch_add(1, Ordering::Relaxed);
println!("C RECEIVED A PING!");
None None
} }
}); });
server_b.request_node_list().unwrap();
let wg = server_c let wg = server_c
.emit("A".to_string(), Event::new("ping".to_string())) .emit("A".to_string(), Event::new("ping".to_string()))
.unwrap(); .unwrap();
@ -109,7 +123,7 @@ fn test_server_communication() {
} }
assert_eq!(ping_c_count.load(Ordering::SeqCst), 1); assert_eq!(ping_c_count.load(Ordering::SeqCst), 1);
assert_eq!(ready_count.load(Ordering::SeqCst), 3); assert_eq!(ready_count.load(Ordering::SeqCst), 4);
assert_eq!(ping_count.load(Ordering::SeqCst), 10); assert_eq!(ping_count.load(Ordering::SeqCst), 10);
assert_eq!(pong_count.load(Ordering::SeqCst), 10); assert_eq!(pong_count.load(Ordering::SeqCst), 10);
} }

Loading…
Cancel
Save