Skip to content

Commit

Permalink
feat: implement submit pipeline (#150)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmhrpr authored and scarmuega committed May 17, 2024
1 parent 2abe1a3 commit ecdc454
Show file tree
Hide file tree
Showing 19 changed files with 1,196 additions and 25 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions examples/sync-preprod/dolos.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,9 @@ path = "./shelley.json"

[logging]
max_level = "debug"

[submit.grpc]
listen_address = "[::]:50052"
prune_after_confirmations = 10000
peer_addresses = ["preprod-node.world.dev.cardano.org:30000"]
peer_magic = 1
3 changes: 3 additions & 0 deletions src/bin/dolos/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub async fn run(config: super::Config, _args: &Args) -> miette::Result<()> {
chain.clone(),
));

let submitter = tokio::spawn(dolos::submit::serve(config.submit, wal.clone(), false));

dolos::sync::pipeline(
&config.upstream,
wal,
Expand All @@ -37,6 +39,7 @@ pub async fn run(config: super::Config, _args: &Args) -> miette::Result<()> {
.block();

server.abort();
submitter.abort();

Ok(())
}
4 changes: 4 additions & 0 deletions src/bin/dolos/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mod daemon;
mod data;
mod eval;
mod serve;
mod submit;
mod sync;

#[cfg(feature = "mithril")]
Expand All @@ -21,6 +22,7 @@ enum Command {
Data(data::Args),
Serve(serve::Args),
Eval(eval::Args),
Submit(submit::Args),

#[cfg(feature = "mithril")]
Bootstrap(bootstrap::Args),
Expand Down Expand Up @@ -69,6 +71,7 @@ pub struct Config {
pub rolldb: RolldbConfig,
pub upstream: dolos::sync::Config,
pub serve: dolos::serve::Config,
pub submit: dolos::submit::Config,
pub retries: Option<gasket::retries::Policy>,
pub byron: GenesisFileRef,
pub shelley: GenesisFileRef,
Expand Down Expand Up @@ -110,6 +113,7 @@ fn main() -> Result<()> {
Command::Data(x) => data::run(&config, &x)?,
Command::Serve(x) => serve::run(config, &x)?,
Command::Eval(x) => eval::run(&config, &x)?,
Command::Submit(x) => submit::run(config, &x)?,

#[cfg(feature = "mithril")]
Command::Bootstrap(x) => bootstrap::run(&config, &x)?,
Expand Down
13 changes: 13 additions & 0 deletions src/bin/dolos/submit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#[derive(Debug, clap::Args)]
pub struct Args {}

#[tokio::main]
pub async fn run(config: super::Config, _args: &Args) -> miette::Result<()> {
crate::common::setup_tracing(&config.logging)?;

let (wal, _, _) = crate::common::open_data_stores(&config)?;

dolos::submit::serve(config.submit, wal, true).await?;

Ok(())
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod model;
pub mod prelude;
pub mod serve;
pub mod storage;
pub mod submit;
pub mod sync;

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions src/model.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use pallas::{crypto::hash::Hash, network::miniprotocols::Point};

pub type BlockSlot = u64;
pub type BlockHeight = u64;
pub type BlockHash = Hash<32>;
pub type RawBlock = Vec<u8>;
pub type TxHash = Hash<32>;
Expand Down
269 changes: 269 additions & 0 deletions src/submit/grpc/endpoints.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
use futures_core::Stream;
use gasket::framework::*;
use gasket::messaging::{tokio::ChannelSendAdapter, SendAdapter};
use pallas::crypto::hash::Hash;
use pallas::ledger::traverse::MultiEraTx;
use std::collections::HashMap;
use std::ops::Deref;
use std::path::PathBuf;
use std::{pin::Pin, sync::Arc};
use tokio::sync::{Notify, RwLock};
use tonic::transport::{Certificate, Server, ServerTlsConfig};
use tonic::{Request, Response, Status};
use tracing::info;
use utxorpc_spec::utxorpc;
use utxorpc_spec::utxorpc::v1alpha::submit::submit_service_server::SubmitServiceServer;
use utxorpc_spec::utxorpc::v1alpha::submit::{Stage as SubmitStage, WaitForTxResponse, *};

use crate::prelude::Error;

use super::mempool::Monitor;
use super::Transaction;

pub struct SubmitServiceImpl {
channel: ChannelSendAdapter<Vec<Transaction>>,
mempool_view: Arc<RwLock<Monitor>>,
change_notify: Arc<Notify>,
}

impl SubmitServiceImpl {
pub fn new(
channel: ChannelSendAdapter<Vec<Transaction>>,
mempool_view: Arc<RwLock<Monitor>>,
change_notify: Arc<Notify>,
) -> Self {
Self {
channel,
mempool_view: mempool_view,
change_notify: change_notify,
}
}
}

#[async_trait::async_trait]
impl submit_service_server::SubmitService for SubmitServiceImpl {
type WaitForTxStream =
Pin<Box<dyn Stream<Item = Result<WaitForTxResponse, tonic::Status>> + Send + 'static>>;

type WatchMempoolStream =
Pin<Box<dyn Stream<Item = Result<WatchMempoolResponse, tonic::Status>> + Send + 'static>>;

async fn submit_tx(
&self,
request: Request<SubmitTxRequest>,
) -> Result<Response<SubmitTxResponse>, Status> {
let message = request.into_inner();

info!("received new grpc submit tx request: {:?}", message);

let mut received = vec![];

for (idx, tx_bytes) in message.tx.into_iter().flat_map(|x| x.r#type).enumerate() {
match tx_bytes {
any_chain_tx::Type::Raw(bytes) => {
let decoded = MultiEraTx::decode(&bytes).map_err(|e| {
Status::invalid_argument(
format! {"could not decode tx at index {idx}: {e}"},
)
})?;

let hash = decoded.hash();

// TODO: we don't phase-2 validate txs before propagating so we could
// propagate p2 invalid transactions resulting in collateral loss
if !decoded.redeemers().is_empty() {
return Err(Status::invalid_argument(
"txs interacting with plutus scripts not yet supported",
));
}

received.push(Transaction {
hash,
era: u16::from(decoded.era()) - 1, // TODO: pallas Era is 1-indexed so maybe that is the reason this works
bytes: bytes.into(),
})
}
}
}

let hashes = received.iter().map(|x| x.hash.to_vec().into()).collect();

self.channel
.clone()
.send(received.into())
.await
.map_err(|_| Status::internal("couldn't add txs to mempool"))?;

Ok(Response::new(SubmitTxResponse { r#ref: hashes }))
}

async fn wait_for_tx(
&self,
request: Request<WaitForTxRequest>,
) -> Result<Response<Self::WaitForTxStream>, Status> {
let mempool_view_rwlock = self.mempool_view.clone();
let change_notifier = self.change_notify.clone();

Ok(Response::new(Box::pin(async_stream::stream! {
let tx_refs = request.into_inner().r#ref;

let mut last_update: HashMap<&[u8; 32], Option<SubmitStage>> = HashMap::new();

let mut tx_hashes = vec![];

for tx_ref in tx_refs {
let tx_hash: [u8; 32] = tx_ref
.deref()
.try_into()
.map_err(|_| Status::invalid_argument("tx hash malformed"))?;

tx_hashes.push(tx_hash)
}

last_update.extend(tx_hashes.iter().map(|x| (x, None)));

info!("starting wait_for_tx async stream for tx hashes: {:?}", tx_hashes.iter().map(|x| Hash::new(*x)).collect::<Vec<_>>());

loop {
change_notifier.notified().await;

for hash in tx_hashes.iter() {
let mempool_view = mempool_view_rwlock.read().await;

let stage = if let Some(maybe_inclusion) = mempool_view.txs.get(&(*hash).into()) {
if let Some(inclusion) = maybe_inclusion {
// TODO: spec does not have way to detail number of confirmations
let _confirmations = mempool_view.tip_height - inclusion;

// tx is included on chain
SubmitStage::Confirmed
} else {
// tx has been propagated but not included on chain
SubmitStage::Mempool
}
} else {
// tx hash provided has not been passed to propagators
SubmitStage::Unspecified
};

// if stage changed since we last informed user, send user update
match last_update.get(&hash).unwrap() {
Some(last_stage) if (*last_stage == stage) => (),
_ => {
let response = WaitForTxResponse {
r#ref: hash.to_vec().into(),
stage: stage.into()
};

yield Ok(response);

last_update.insert(hash, Some(stage));
}
}
}
}
})))
}

async fn read_mempool(
&self,
_request: tonic::Request<ReadMempoolRequest>,
) -> Result<tonic::Response<ReadMempoolResponse>, tonic::Status> {
todo!()
}

async fn watch_mempool(
&self,
_request: tonic::Request<WatchMempoolRequest>,
) -> Result<tonic::Response<Self::WatchMempoolStream>, tonic::Status> {
todo!()
}
}

#[derive(Stage)]
#[stage(name = "endpoints", unit = "()", worker = "Worker")]
pub struct Stage {
listen_address: String,
tls_client_ca_root: Option<PathBuf>,
send_channel: ChannelSendAdapter<Vec<Transaction>>,
mempool_view: Arc<RwLock<Monitor>>,
change_notify: Arc<Notify>,
// #[metric]
// received_txs: gasket::metrics::Counter,
}

impl Stage {
pub fn new(
listen_address: String,
tls_client_ca_root: Option<PathBuf>,
send_channel: ChannelSendAdapter<Vec<Transaction>>,
mempool_view: Arc<RwLock<Monitor>>,
change_notify: Arc<Notify>,
) -> Self {
Self {
listen_address,
tls_client_ca_root,
send_channel,
mempool_view,
change_notify,
}
}
}

pub struct Worker {}

impl Worker {}

#[async_trait::async_trait(?Send)]
impl gasket::framework::Worker<Stage> for Worker {
async fn bootstrap(_stage: &Stage) -> Result<Self, WorkerError> {
Ok(Self {})
}

async fn schedule(&mut self, _stage: &mut Stage) -> Result<WorkSchedule<()>, WorkerError> {
Ok(WorkSchedule::Unit(()))
}

async fn execute(&mut self, _unit: &(), stage: &mut Stage) -> Result<(), WorkerError> {
let addr = stage.listen_address.parse().or_panic()?;

let service = SubmitServiceImpl::new(
stage.send_channel.clone(),
stage.mempool_view.clone(),
stage.change_notify.clone(),
);
let service = SubmitServiceServer::new(service);

let reflection = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(utxorpc::v1alpha::cardano::FILE_DESCRIPTOR_SET)
.register_encoded_file_descriptor_set(utxorpc::v1alpha::submit::FILE_DESCRIPTOR_SET)
.register_encoded_file_descriptor_set(protoc_wkt::google::protobuf::FILE_DESCRIPTOR_SET)
.build()
.unwrap();

let mut server = Server::builder().accept_http1(true);

if let Some(pem) = stage.tls_client_ca_root.clone() {
let pem = std::env::current_dir().unwrap().join(pem);
let pem = std::fs::read_to_string(pem)
.map_err(Error::config)
.or_panic()?;
let pem = Certificate::from_pem(pem);

let tls = ServerTlsConfig::new().client_ca_root(pem);

server = server.tls_config(tls).map_err(Error::config).or_panic()?;
}

info!("serving via gRPC on address: {}", stage.listen_address);

let _ = server
// GrpcWeb is over http1 so we must enable it.
.add_service(tonic_web::enable(service))
.add_service(reflection)
.serve(addr)
.await;

Ok(())
}
}
Loading

0 comments on commit ecdc454

Please sign in to comment.