Skip to content

Commit

Permalink
chore: Improve the robustness of the firehose consumer example (#294)
Browse files Browse the repository at this point in the history
  • Loading branch information
DrChat authored Mar 9, 2025
1 parent 2e03ad2 commit f162f81
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 18 deletions.
49 changes: 32 additions & 17 deletions examples/firehose/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,32 +23,47 @@ impl RepoSubscription {
Ok(RepoSubscription { stream })
}
async fn run(&mut self, handler: impl CommitHandler) -> Result<(), Box<dyn std::error::Error>> {
while let Some(result) = self.next().await {
if let Ok(Frame::Message(Some(t), message)) = result {
if t.as_str() == "#commit" {
let commit = serde_ipld_dagcbor::from_reader(message.body.as_slice())?;
loop {
let message = match self.next().await {
Some(msg) => msg,
None => continue, // Could be a websocket control message, etc.
};

// Handle the commit on the websocket reader thread.
//
// N.B: You should ensure that your commit handler either executes as quickly as
// possible or offload processing to a separate thread. If you run too far behind,
// the firehose server _will_ terminate the connection!
if let Err(err) = handler.handle_commit(&commit).await {
eprintln!("FAILED: {err:?}");
match message {
Ok(Frame::Message(Some(t), message)) => {
if t.as_str() == "#commit" {
let commit = serde_ipld_dagcbor::from_reader(message.body.as_slice())?;

// Handle the commit on the websocket reader thread.
//
// N.B: You should ensure that your commit handler either executes as quickly as
// possible or offload processing to a separate thread. If you run too far behind,
// the firehose server _will_ terminate the connection!
if let Err(err) = handler.handle_commit(&commit).await {
eprintln!("FAILED: {err:?}");
}
}
}
Ok(Frame::Message(None, _msg)) => (),
Ok(Frame::Error(_e)) => {
println!("received error frame");
break;
}
Err(e) => {
println!("error {e}");
}
}
}
Ok(())
}
}

impl Subscription for RepoSubscription {
async fn next(&mut self) -> Option<Result<Frame, <Frame as TryFrom<&[u8]>>::Error>> {
if let Some(Ok(Message::Binary(data))) = self.stream.next().await {
Some(Frame::try_from(data.as_slice()))
} else {
None
async fn next(&mut self) -> Option<anyhow::Result<Frame>> {
match self.stream.next().await {
Some(Ok(Message::Binary(data))) => Some(Frame::try_from(data.as_slice())),
Some(Ok(_)) | None => None,
Some(Err(e)) => Some(Err(anyhow::Error::new(e))),
}
}
}
Expand All @@ -60,7 +75,7 @@ impl CommitHandler for Firehose {
let mut repo = Repository::open(
CarStore::open(std::io::Cursor::new(commit.blocks.as_slice())).await?,
// N.B: This same CID is also specified inside of the `CarStore`, accessible
// via `car.header().roots[0]`.
// via `car.roots().next().unwrap()`.
commit.commit.0,
)
.await?;
Expand Down
2 changes: 1 addition & 1 deletion examples/firehose/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::future::Future;

#[trait_variant::make(HttpService: Send)]
pub trait Subscription {
async fn next(&mut self) -> Option<Result<Frame, <Frame as TryFrom<&[u8]>>::Error>>;
async fn next(&mut self) -> Option<anyhow::Result<Frame>>;
}

pub trait CommitHandler {
Expand Down

0 comments on commit f162f81

Please sign in to comment.