Add stream implementation for retrieving updates

Signed-off-by: trivernis <trivernis@protonmail.com>
main
trivernis 2 years ago
parent 642a6c3b55
commit 2811aeb40d
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -11,6 +11,7 @@ thiserror = "1.0.30"
serde_json = "1.0.79"
flate2 = "1.0.22"
reqwest = "0.11.9"
futures-core = "0.3.21"
[dependencies.serde]
version = "1.0.136"
@ -20,6 +21,7 @@ features = ["derive"]
tracing-subscriber = "0.3.9"
dotenv = "0.15.0"
lazy_static = "1.4.0"
futures-util = "0.3.21"
[dev-dependencies.tokio]
version = "1.17.0"

@ -7,7 +7,7 @@ use std::io::Write;
impl Client {
/// Performs a get request to the given Get Endpoint
#[tracing::instrument(skip(self), level = "trace")]
#[tracing::instrument(skip(self), level = "debug")]
pub(crate) async fn get<E: GetEndpoint, Q: Serialize + Debug>(
&self,
query: &Q,

@ -1,10 +1,13 @@
mod client_core;
mod update_stream;
pub use crate::endpoints::*;
use crate::{ClientBuilder, Result};
pub use client_core::*;
use std::fmt::Debug;
pub use update_stream::*;
#[derive(Clone)]
pub struct Client {
pub(crate) client: reqwest::Client,
pub(crate) base_url: String,

@ -0,0 +1,113 @@
use crate::Result;
use crate::{Client, UpdateResponse};
use futures_core::Stream;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
impl Client {
pub async fn stream_updates(&self, since: u64) -> Result<UpdateStream> {
let entries = self.get_metadata(since).await?.0.entries;
let hashes = entries
.into_iter()
.flat_map(|e| e.update_hashes)
.collect::<Vec<String>>();
let client = self.clone();
Ok(UpdateStream::new(client, hashes))
}
}
/// A stream of update files
/// Used like follows:
///
/// ```
///
/// # use hydrus_ptr_client::{Client};
/// use futures_util::StreamExt;
///
/// # async fn a() {///
/// # let client = Client::new("", "");
/// let mut stream = client.stream_updates(0).await.unwrap();
///
/// while let Some(Ok(update)) = stream.next().await {
/// // do something
/// }
/// # }
/// ```
pub struct UpdateStream {
failed_hashes: Vec<String>,
pending_hash: Option<String>,
hashes: Vec<String>,
client: Option<Client>,
fut: Option<Pin<Box<dyn Future<Output = (Result<UpdateResponse>, Client)>>>>,
}
impl UpdateStream {
pub(crate) fn new(client: Client, mut hashes: Vec<String>) -> Self {
hashes.reverse();
Self {
client: Some(client),
hashes,
fut: None,
failed_hashes: Vec::new(),
pending_hash: None,
}
}
/// Re-enqueues the latest failed hash for retry
pub fn retry_latest(&mut self) {
if let Some(hash) = self.failed_hashes.pop() {
self.hashes.push(hash);
}
}
/// Puts all failed hashes back into the queue for retry
pub fn retry_all(&mut self) {
self.hashes.append(&mut self.failed_hashes);
}
/// Returns a list of all failed hashes
pub fn failed_hashes(&self) -> &Vec<String> {
&self.failed_hashes
}
}
impl Unpin for UpdateStream {}
impl Stream for UpdateStream {
type Item = Result<UpdateResponse>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.fut.is_none() {
if self.hashes.is_empty() {
return Poll::Ready(None);
}
let hash = self.hashes.pop().unwrap();
self.pending_hash = Some(hash.clone());
let client = self.client.take().unwrap();
self.fut = Some(Box::pin(async move {
let update = client.get_update(hash).await;
(update, client)
}));
}
match self.fut.as_mut().unwrap().as_mut().poll(cx) {
Poll::Ready((result, client)) => {
self.client = Some(client);
self.fut = None;
if result.is_err() {
let pending_hash = self.pending_hash.take().unwrap();
self.failed_hashes.push(pending_hash);
}
Poll::Ready(Some(result))
}
Poll::Pending => Poll::Pending,
}
}
}

@ -1,3 +1,4 @@
use crate::constants::{DEFAULT_PTR_ADDRESS, DEFAULT_READONLY_ACCESS_KEY};
use crate::Client;
use crate::{Error, Result};
use std::time::Duration;
@ -12,8 +13,8 @@ impl Default for ClientBuilder {
fn default() -> Self {
Self {
reqwest_builder: reqwest::ClientBuilder::new(),
endpoint: String::from("https://ptr.hydrus.network:45871"),
access_key: None,
endpoint: String::from(DEFAULT_PTR_ADDRESS),
access_key: Some(String::from(DEFAULT_READONLY_ACCESS_KEY)),
}
}
}

@ -1,5 +1,9 @@
#![allow(unused)]
pub const DEFAULT_PTR_ADDRESS: &str = "https://ptr.hydrus.network:45871";
pub const DEFAULT_READONLY_ACCESS_KEY: &str =
"4a285629721ca442541ef2c15ea17d1f7f7578b0c3f4f5f2a05f8f0ab297786f";
// serializable
pub const HYDRUS_TYPE_BASE: u64 = 0;
pub const HYDRUS_TYPE_BASE_NAMED: u64 = 1;

@ -15,11 +15,15 @@ fn setup() {
pub fn get_client() -> Client {
setup();
let ptr_url_env = env::var("PTR_URL").ok();
let ptr_key_env = env::var("PTR_ACCESS_KEY").ok();
Client::builder()
.endpoint(env::var("PTR_URL").unwrap())
.access_key(env::var("PTR_ACCESS_KEY").unwrap())
.accept_invalid_certs(true)
.build()
.unwrap()
let mut builder = Client::builder().accept_invalid_certs(true);
if let Some(url) = ptr_url_env {
builder = builder.endpoint(url);
}
if let Some(key) = ptr_key_env {
builder = builder.access_key(key);
}
builder.build().unwrap()
}

@ -1,3 +1,5 @@
use futures_util::StreamExt;
mod common;
#[tokio::test]
@ -24,3 +26,25 @@ async fn test_update() {
client.get_update(DEFINITIONS_UPDATE_HASH).await.unwrap();
client.get_update(CONTENT_UPDATE_HASH).await.unwrap();
}
#[tokio::test]
async fn test_update_stream() {
let client = common::get_client(); // 3230
let mut update_stream = client.stream_updates(0).await.unwrap();
let mut retry_count = 3;
while let Some(update) = update_stream.next().await {
if let Err(e) = update {
if retry_count > 0 {
retry_count -= 1;
update_stream.retry_latest();
} else {
assert!(
false,
"fetching next update failed within retry limit: {}",
e
)
}
}
}
}

Loading…
Cancel
Save