Skip to content

Commit

Permalink
Merge branch 'tomasarrachea-stream-sync-state' into tomasarrachea-fil…
Browse files Browse the repository at this point in the history
…ter-nullifiers-by-blocknum
  • Loading branch information
TomasArrachea authored Feb 19, 2025
2 parents 261232a + aa2dc07 commit e1763ad
Show file tree
Hide file tree
Showing 14 changed files with 173 additions and 131 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,4 @@ env/
*.out
node_modules/
*DS_Store
*.iml
*.iml
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@

- [BREAKING] Updated minimum Rust version to 1.84.
- [BREAKING] `Endpoint` configuration simplified to a single string (#654).
- Added `block_num` parameter to `CheckNullifiersByPrefix` endpoint.
- Added `block_num` parameter to `CheckNullifiersByPrefix` endpoint (#707).
- [BREAKING] Changed sync state endpoint to stream the response (#685).

### Enhancements

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ check: ## Check all targets and features for errors without code generation
# --- building ------------------------------------------------------------------------------------

.PHONY: build
build: ## Builds all crates and re-builds ptotobuf bindings for proto crates
build: ## Builds all crates and re-builds protobuf bindings for proto crates
${BUILD_PROTO} cargo build --locked

# --- installing ----------------------------------------------------------------------------------
Expand Down
15 changes: 6 additions & 9 deletions crates/proto/src/generated/responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,27 +42,24 @@ pub struct NullifierUpdate {
/// Represents the result of syncing state request.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SyncStateResponse {
/// Number of the latest block in the chain.
#[prost(fixed32, tag = "1")]
pub chain_tip: u32,
/// Block header of the block with the first note matching the specified criteria.
#[prost(message, optional, tag = "2")]
#[prost(message, optional, tag = "1")]
pub block_header: ::core::option::Option<super::block::BlockHeader>,
/// Data needed to update the partial MMR from `request.block_num + 1` to `response.block_header.block_num`.
#[prost(message, optional, tag = "3")]
#[prost(message, optional, tag = "2")]
pub mmr_delta: ::core::option::Option<super::mmr::MmrDelta>,
/// List of account hashes updated after `request.block_num + 1` but not after `response.block_header.block_num`.
#[prost(message, repeated, tag = "5")]
#[prost(message, repeated, tag = "3")]
pub accounts: ::prost::alloc::vec::Vec<super::account::AccountSummary>,
/// List of transactions executed against requested accounts between `request.block_num + 1` and
/// `response.block_header.block_num`.
#[prost(message, repeated, tag = "6")]
#[prost(message, repeated, tag = "4")]
pub transactions: ::prost::alloc::vec::Vec<super::transaction::TransactionSummary>,
/// List of all notes together with the Merkle paths from `response.block_header.note_root`.
#[prost(message, repeated, tag = "7")]
#[prost(message, repeated, tag = "5")]
pub notes: ::prost::alloc::vec::Vec<super::note::NoteSyncRecord>,
/// List of nullifiers created between `request.block_num + 1` and `response.block_header.block_num`.
#[prost(message, repeated, tag = "8")]
#[prost(message, repeated, tag = "6")]
pub nullifiers: ::prost::alloc::vec::Vec<NullifierUpdate>,
}
/// Represents the result of syncing notes request.
Expand Down
45 changes: 26 additions & 19 deletions crates/proto/src/generated/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,12 +355,11 @@ pub mod api_client {
/// Returns info which can be used by the client to sync up to the latest state of the chain
/// for the objects (accounts, notes, nullifiers) the client is interested in.
///
/// This request returns the next block containing requested data. It also returns `chain_tip`
/// which is the latest block number in the chain. Client is expected to repeat these requests
/// in a loop until `response.block_header.block_num == response.chain_tip`, at which point
/// the client is fully synchronized with the chain.
/// This request returns a stream where multiple update responses will be pushed in order.
/// Client is expected to read the updates from the stream and apply them, and then it will be
/// fully synchronized with the chain.
///
/// Each request also returns info about new notes, nullifiers etc. created. It also returns
/// Each update response also contains info about new notes, nullifiers etc. created. It also returns
/// Chain MMR delta that can be used to update the state of Chain MMR. This includes both chain
/// MMR peaks and chain MMR nodes.
///
Expand All @@ -371,7 +370,9 @@ pub mod api_client {
&mut self,
request: impl tonic::IntoRequest<super::super::requests::SyncStateRequest>,
) -> std::result::Result<
tonic::Response<super::super::responses::SyncStateResponse>,
tonic::Response<
tonic::codec::Streaming<super::super::responses::SyncStateResponse>,
>,
tonic::Status,
> {
self.inner
Expand All @@ -386,7 +387,7 @@ pub mod api_client {
let path = http::uri::PathAndQuery::from_static("/rpc.Api/SyncState");
let mut req = request.into_request();
req.extensions_mut().insert(GrpcMethod::new("rpc.Api", "SyncState"));
self.inner.unary(req, path, codec).await
self.inner.server_streaming(req, path, codec).await
}
}
}
Expand Down Expand Up @@ -501,15 +502,23 @@ pub mod api_server {
tonic::Response<super::super::responses::SyncNoteResponse>,
tonic::Status,
>;
/// Server streaming response type for the SyncState method.
type SyncStateStream: tonic::codegen::tokio_stream::Stream<
Item = std::result::Result<
super::super::responses::SyncStateResponse,
tonic::Status,
>,
>
+ std::marker::Send
+ 'static;
/// Returns info which can be used by the client to sync up to the latest state of the chain
/// for the objects (accounts, notes, nullifiers) the client is interested in.
///
/// This request returns the next block containing requested data. It also returns `chain_tip`
/// which is the latest block number in the chain. Client is expected to repeat these requests
/// in a loop until `response.block_header.block_num == response.chain_tip`, at which point
/// the client is fully synchronized with the chain.
/// This request returns a stream where multiple update responses will be pushed in order.
/// Client is expected to read the updates from the stream and apply them, and then it will be
/// fully synchronized with the chain.
///
/// Each request also returns info about new notes, nullifiers etc. created. It also returns
/// Each update response also contains info about new notes, nullifiers etc. created. It also returns
/// Chain MMR delta that can be used to update the state of Chain MMR. This includes both chain
/// MMR peaks and chain MMR nodes.
///
Expand All @@ -519,10 +528,7 @@ pub mod api_server {
async fn sync_state(
&self,
request: tonic::Request<super::super::requests::SyncStateRequest>,
) -> std::result::Result<
tonic::Response<super::super::responses::SyncStateResponse>,
tonic::Status,
>;
) -> std::result::Result<tonic::Response<Self::SyncStateStream>, tonic::Status>;
}
#[derive(Debug)]
pub struct ApiServer<T> {
Expand Down Expand Up @@ -1087,12 +1093,13 @@ pub mod api_server {
struct SyncStateSvc<T: Api>(pub Arc<T>);
impl<
T: Api,
> tonic::server::UnaryService<
> tonic::server::ServerStreamingService<
super::super::requests::SyncStateRequest,
> for SyncStateSvc<T> {
type Response = super::super::responses::SyncStateResponse;
type ResponseStream = T::SyncStateStream;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Response<Self::ResponseStream>,
tonic::Status,
>;
fn call(
Expand Down Expand Up @@ -1125,7 +1132,7 @@ pub mod api_server {
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
let res = grpc.server_streaming(method, req).await;
Ok(res)
};
Box::pin(fut)
Expand Down
27 changes: 18 additions & 9 deletions crates/proto/src/generated/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,9 @@ pub mod api_client {
&mut self,
request: impl tonic::IntoRequest<super::super::requests::SyncStateRequest>,
) -> std::result::Result<
tonic::Response<super::super::responses::SyncStateResponse>,
tonic::Response<
tonic::codec::Streaming<super::super::responses::SyncStateResponse>,
>,
tonic::Status,
> {
self.inner
Expand All @@ -465,7 +467,7 @@ pub mod api_client {
let path = http::uri::PathAndQuery::from_static("/store.Api/SyncState");
let mut req = request.into_request();
req.extensions_mut().insert(GrpcMethod::new("store.Api", "SyncState"));
self.inner.unary(req, path, codec).await
self.inner.server_streaming(req, path, codec).await
}
}
}
Expand Down Expand Up @@ -602,6 +604,15 @@ pub mod api_server {
tonic::Response<super::super::responses::SyncNoteResponse>,
tonic::Status,
>;
/// Server streaming response type for the SyncState method.
type SyncStateStream: tonic::codegen::tokio_stream::Stream<
Item = std::result::Result<
super::super::responses::SyncStateResponse,
tonic::Status,
>,
>
+ std::marker::Send
+ 'static;
/// Returns info which can be used by the client to sync up to the latest state of the chain
/// for the objects (accounts, notes, nullifiers) the client is interested in.
///
Expand All @@ -620,10 +631,7 @@ pub mod api_server {
async fn sync_state(
&self,
request: tonic::Request<super::super::requests::SyncStateRequest>,
) -> std::result::Result<
tonic::Response<super::super::responses::SyncStateResponse>,
tonic::Status,
>;
) -> std::result::Result<tonic::Response<Self::SyncStateStream>, tonic::Status>;
}
#[derive(Debug)]
pub struct ApiServer<T> {
Expand Down Expand Up @@ -1332,12 +1340,13 @@ pub mod api_server {
struct SyncStateSvc<T: Api>(pub Arc<T>);
impl<
T: Api,
> tonic::server::UnaryService<
> tonic::server::ServerStreamingService<
super::super::requests::SyncStateRequest,
> for SyncStateSvc<T> {
type Response = super::super::responses::SyncStateResponse;
type ResponseStream = T::SyncStateStream;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Response<Self::ResponseStream>,
tonic::Status,
>;
fn call(
Expand Down Expand Up @@ -1370,7 +1379,7 @@ pub mod api_server {
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
let res = grpc.server_streaming(method, req).await;
Ok(res)
};
Box::pin(fut)
Expand Down
15 changes: 6 additions & 9 deletions crates/rpc-proto/proto/responses.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,27 +48,24 @@ message NullifierUpdate {

// Represents the result of syncing state request.
message SyncStateResponse {
// Number of the latest block in the chain.
fixed32 chain_tip = 1;

// Block header of the block with the first note matching the specified criteria.
block.BlockHeader block_header = 2;
block.BlockHeader block_header = 1;

// Data needed to update the partial MMR from `request.block_num + 1` to `response.block_header.block_num`.
mmr.MmrDelta mmr_delta = 3;
mmr.MmrDelta mmr_delta = 2;

// List of account hashes updated after `request.block_num + 1` but not after `response.block_header.block_num`.
repeated account.AccountSummary accounts = 5;
repeated account.AccountSummary accounts = 3;

// List of transactions executed against requested accounts between `request.block_num + 1` and
// `response.block_header.block_num`.
repeated transaction.TransactionSummary transactions = 6;
repeated transaction.TransactionSummary transactions = 4;

// List of all notes together with the Merkle paths from `response.block_header.note_root`.
repeated note.NoteSyncRecord notes = 7;
repeated note.NoteSyncRecord notes = 5;

// List of nullifiers created between `request.block_num + 1` and `response.block_header.block_num`.
repeated NullifierUpdate nullifiers = 8;
repeated NullifierUpdate nullifiers = 6;
}

// Represents the result of syncing notes request.
Expand Down
11 changes: 5 additions & 6 deletions crates/rpc-proto/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,16 @@ service Api {
// Returns info which can be used by the client to sync up to the latest state of the chain
// for the objects (accounts, notes, nullifiers) the client is interested in.
//
// This request returns the next block containing requested data. It also returns `chain_tip`
// which is the latest block number in the chain. Client is expected to repeat these requests
// in a loop until `response.block_header.block_num == response.chain_tip`, at which point
// the client is fully synchronized with the chain.
// This request returns a stream where multiple update responses will be pushed in order.
// Client is expected to read the updates from the stream and apply them, and then it will be
// fully synchronized with the chain.
//
// Each request also returns info about new notes, nullifiers etc. created. It also returns
// Each update response also contains info about new notes, nullifiers etc. created. It also returns
// Chain MMR delta that can be used to update the state of Chain MMR. This includes both chain
// MMR peaks and chain MMR nodes.
//
// For preserving some degree of privacy, note tags and nullifiers filters contain only high
// part of hashes. Thus, returned data contains excessive notes and nullifiers, client can make
// additional filtering of that data on its side.
rpc SyncState(requests.SyncStateRequest) returns (responses.SyncStateResponse) {}
rpc SyncState(requests.SyncStateRequest) returns (stream responses.SyncStateResponse) {}
}
2 changes: 1 addition & 1 deletion crates/rpc-proto/proto/store.proto
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,5 @@ service Api {
// For preserving some degree of privacy, note tags and nullifiers filters contain only high
// part of hashes. Thus, returned data contains excessive notes and nullifiers, client can make
// additional filtering of that data on its side.
rpc SyncState(requests.SyncStateRequest) returns (responses.SyncStateResponse) {}
rpc SyncState(requests.SyncStateRequest) returns (stream responses.SyncStateResponse) {}
}
7 changes: 4 additions & 3 deletions crates/rpc/src/server/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use miden_tx::TransactionVerifier;
use tonic::{
service::interceptor::InterceptedService,
transport::{Channel, Error},
Request, Response, Status,
Request, Response, Status, Streaming,
};
use tracing::{debug, info, instrument};

Expand Down Expand Up @@ -71,6 +71,8 @@ impl RpcApi {

#[tonic::async_trait]
impl api_server::Api for RpcApi {
type SyncStateStream = Streaming<SyncStateResponse>;

#[instrument(
target = COMPONENT,
name = "rpc:check_nullifiers",
Expand Down Expand Up @@ -130,13 +132,12 @@ impl api_server::Api for RpcApi {
target = COMPONENT,
name = "rpc:sync_state",
skip_all,
ret(level = "debug"),
err
)]
async fn sync_state(
&self,
request: Request<SyncStateRequest>,
) -> Result<Response<SyncStateResponse>, Status> {
) -> Result<Response<Self::SyncStateStream>, Status> {
debug!(target: COMPONENT, request = ?request.get_ref());

self.store.clone().sync_state(request).await
Expand Down
Loading

0 comments on commit e1763ad

Please sign in to comment.