diff --git a/Cargo.toml b/Cargo.toml index 7a299eb..0620cc5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ thiserror = "1.0.30" serde_json = "1.0.79" flate2 = "1.0.22" reqwest = "0.11.9" +futures-core = "0.3.21" [dependencies.serde] version = "1.0.136" @@ -20,6 +21,7 @@ features = ["derive"] tracing-subscriber = "0.3.9" dotenv = "0.15.0" lazy_static = "1.4.0" +futures-util = "0.3.21" [dev-dependencies.tokio] version = "1.17.0" diff --git a/src/client/client_core.rs b/src/client/client_core.rs index d06fb9f..8255922 100644 --- a/src/client/client_core.rs +++ b/src/client/client_core.rs @@ -7,7 +7,7 @@ use std::io::Write; impl Client { /// Performs a get request to the given Get Endpoint - #[tracing::instrument(skip(self), level = "trace")] + #[tracing::instrument(skip(self), level = "debug")] pub(crate) async fn get( &self, query: &Q, diff --git a/src/client/mod.rs b/src/client/mod.rs index 53be4c2..f3f0cbc 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,10 +1,13 @@ mod client_core; +mod update_stream; pub use crate::endpoints::*; use crate::{ClientBuilder, Result}; pub use client_core::*; use std::fmt::Debug; +pub use update_stream::*; +#[derive(Clone)] pub struct Client { pub(crate) client: reqwest::Client, pub(crate) base_url: String, diff --git a/src/client/update_stream.rs b/src/client/update_stream.rs new file mode 100644 index 0000000..47e60c1 --- /dev/null +++ b/src/client/update_stream.rs @@ -0,0 +1,113 @@ +use crate::Result; +use crate::{Client, UpdateResponse}; +use futures_core::Stream; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +impl Client { + pub async fn stream_updates(&self, since: u64) -> Result { + let entries = self.get_metadata(since).await?.0.entries; + let hashes = entries + .into_iter() + .flat_map(|e| e.update_hashes) + .collect::>(); + let client = self.clone(); + + Ok(UpdateStream::new(client, hashes)) + } +} + +/// A stream of update files +/// Used like follows: +/// +/// ``` +/// +/// # use hydrus_ptr_client::{Client}; +/// use futures_util::StreamExt; +/// +/// # async fn a() {/// +/// # let client = Client::new("", ""); +/// let mut stream = client.stream_updates(0).await.unwrap(); +/// +/// while let Some(Ok(update)) = stream.next().await { +/// // do something +/// } +/// # } +/// ``` +pub struct UpdateStream { + failed_hashes: Vec, + pending_hash: Option, + hashes: Vec, + client: Option, + fut: Option, Client)>>>>, +} + +impl UpdateStream { + pub(crate) fn new(client: Client, mut hashes: Vec) -> Self { + hashes.reverse(); + + Self { + client: Some(client), + hashes, + fut: None, + failed_hashes: Vec::new(), + pending_hash: None, + } + } + + /// Re-enqueues the latest failed hash for retry + pub fn retry_latest(&mut self) { + if let Some(hash) = self.failed_hashes.pop() { + self.hashes.push(hash); + } + } + + /// Puts all failed hashes back into the queue for retry + pub fn retry_all(&mut self) { + self.hashes.append(&mut self.failed_hashes); + } + + /// Returns a list of all failed hashes + pub fn failed_hashes(&self) -> &Vec { + &self.failed_hashes + } +} + +impl Unpin for UpdateStream {} + +impl Stream for UpdateStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.fut.is_none() { + if self.hashes.is_empty() { + return Poll::Ready(None); + } + let hash = self.hashes.pop().unwrap(); + self.pending_hash = Some(hash.clone()); + let client = self.client.take().unwrap(); + + self.fut = Some(Box::pin(async move { + let update = client.get_update(hash).await; + + (update, client) + })); + } + + match self.fut.as_mut().unwrap().as_mut().poll(cx) { + Poll::Ready((result, client)) => { + self.client = Some(client); + self.fut = None; + + if result.is_err() { + let pending_hash = self.pending_hash.take().unwrap(); + self.failed_hashes.push(pending_hash); + } + + Poll::Ready(Some(result)) + } + Poll::Pending => Poll::Pending, + } + } +} diff --git a/src/client_builder.rs b/src/client_builder.rs index 1890264..47ab6b6 100644 --- a/src/client_builder.rs +++ b/src/client_builder.rs @@ -1,3 +1,4 @@ +use crate::constants::{DEFAULT_PTR_ADDRESS, DEFAULT_READONLY_ACCESS_KEY}; use crate::Client; use crate::{Error, Result}; use std::time::Duration; @@ -12,8 +13,8 @@ impl Default for ClientBuilder { fn default() -> Self { Self { reqwest_builder: reqwest::ClientBuilder::new(), - endpoint: String::from("https://ptr.hydrus.network:45871"), - access_key: None, + endpoint: String::from(DEFAULT_PTR_ADDRESS), + access_key: Some(String::from(DEFAULT_READONLY_ACCESS_KEY)), } } } diff --git a/src/constants.rs b/src/constants.rs index bbf0fae..2662f08 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -1,5 +1,9 @@ #![allow(unused)] +pub const DEFAULT_PTR_ADDRESS: &str = "https://ptr.hydrus.network:45871"; +pub const DEFAULT_READONLY_ACCESS_KEY: &str = + "4a285629721ca442541ef2c15ea17d1f7f7578b0c3f4f5f2a05f8f0ab297786f"; + // serializable pub const HYDRUS_TYPE_BASE: u64 = 0; pub const HYDRUS_TYPE_BASE_NAMED: u64 = 1; diff --git a/tests/common.rs b/tests/common.rs index 51029b9..8ce295b 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -15,11 +15,15 @@ fn setup() { pub fn get_client() -> Client { setup(); + let ptr_url_env = env::var("PTR_URL").ok(); + let ptr_key_env = env::var("PTR_ACCESS_KEY").ok(); - Client::builder() - .endpoint(env::var("PTR_URL").unwrap()) - .access_key(env::var("PTR_ACCESS_KEY").unwrap()) - .accept_invalid_certs(true) - .build() - .unwrap() + let mut builder = Client::builder().accept_invalid_certs(true); + if let Some(url) = ptr_url_env { + builder = builder.endpoint(url); + } + if let Some(key) = ptr_key_env { + builder = builder.access_key(key); + } + builder.build().unwrap() } diff --git a/tests/endpoints.rs b/tests/endpoints.rs index 59798e1..1c08003 100644 --- a/tests/endpoints.rs +++ b/tests/endpoints.rs @@ -1,3 +1,5 @@ +use futures_util::StreamExt; + mod common; #[tokio::test] @@ -24,3 +26,25 @@ async fn test_update() { client.get_update(DEFINITIONS_UPDATE_HASH).await.unwrap(); client.get_update(CONTENT_UPDATE_HASH).await.unwrap(); } + +#[tokio::test] +async fn test_update_stream() { + let client = common::get_client(); // 3230 + let mut update_stream = client.stream_updates(0).await.unwrap(); + let mut retry_count = 3; + + while let Some(update) = update_stream.next().await { + if let Err(e) = update { + if retry_count > 0 { + retry_count -= 1; + update_stream.retry_latest(); + } else { + assert!( + false, + "fetching next update failed within retry limit: {}", + e + ) + } + } + } +}