Skip to content

Commit

Permalink
Merge pull request #22 from cowlicks/replication
Browse files Browse the repository at this point in the history
Changes for Replication
  • Loading branch information
ttiurani authored Oct 28, 2024
2 parents 48140bd + 85952b9 commit 5c2982e
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 19 deletions.
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,15 @@ tracing = "0.1"
pretty-hash = "0.4"
futures-timer = "3"
futures-lite = "1"
hypercore = { version = "0.14.0", default-features = false }
sha2 = "0.10"
curve25519-dalek = "4"
crypto_secretstream = "0.2"

[dependencies.hypercore]
version = "0.14.0"
default-features = false


[dev-dependencies]
async-std = { version = "1.12.0", features = ["attributes", "unstable"] }
async-compat = "0.2.1"
Expand Down
49 changes: 38 additions & 11 deletions src/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::Poll;
use tracing::debug;

/// A protocol channel.
///
/// This is the handle that can be sent to other threads.
#[derive(Clone)]
pub struct Channel {
inbound_rx: Option<Receiver<Message>>,
direct_inbound_tx: Sender<Message>,
Expand Down Expand Up @@ -44,6 +46,25 @@ impl fmt::Debug for Channel {
}

impl Channel {
fn new(
inbound_rx: Option<Receiver<Message>>,
direct_inbound_tx: Sender<Message>,
outbound_tx: Sender<Vec<ChannelMessage>>,
discovery_key: DiscoveryKey,
key: Key,
local_id: usize,
closed: Arc<AtomicBool>,
) -> Self {
Self {
inbound_rx,
direct_inbound_tx,
outbound_tx,
key,
discovery_key,
local_id,
closed,
}
}
/// Get the discovery key of this channel.
pub fn discovery_key(&self) -> &[u8; 32] {
&self.discovery_key
Expand Down Expand Up @@ -72,6 +93,7 @@ impl Channel {
"Channel is closed",
));
}
debug!("TX:\n{message:?}\n");
let message = ChannelMessage::new(self.local_id as u64, message);
self.outbound_tx
.send(vec![message])
Expand All @@ -97,9 +119,13 @@ impl Channel {
"Channel is closed",
));
}

let messages = messages
.iter()
.map(|message| ChannelMessage::new(self.local_id as u64, message.clone()))
.map(|message| {
debug!("TX:\n{message:?}\n");
ChannelMessage::new(self.local_id as u64, message.clone())
})
.collect();
self.outbound_tx
.send(messages)
Expand All @@ -120,7 +146,7 @@ impl Channel {
/// you will only want to send a LocalSignal message with this sender to make
/// it clear what event came from the remote peer and what was local
/// signaling.
pub fn local_sender(&mut self) -> Sender<Message> {
pub fn local_sender(&self) -> Sender<Message> {
self.direct_inbound_tx.clone()
}

Expand Down Expand Up @@ -257,15 +283,16 @@ impl ChannelHandle {
.expect("May not open channel that is not locally attached");

let (inbound_tx, inbound_rx) = async_channel::unbounded();
let channel = Channel {
inbound_rx: Some(inbound_rx),
direct_inbound_tx: inbound_tx.clone(),
let channel = Channel::new(
Some(inbound_rx),
inbound_tx.clone(),
outbound_tx,
discovery_key: self.discovery_key,
key: local_state.key,
local_id: local_state.local_id,
closed: self.closed.clone(),
};
self.discovery_key,
local_state.key,
local_state.local_id,
self.closed.clone(),
);

self.inbound_tx = Some(inbound_tx);
channel
}
Expand Down Expand Up @@ -387,7 +414,7 @@ impl ChannelMap {
}
}

pub(crate) fn has_channel(&mut self, discovery_key: &[u8]) -> bool {
pub(crate) fn has_channel(&self, discovery_key: &[u8]) -> bool {
let hdkey = hex::encode(discovery_key);
self.channels.contains_key(&hdkey)
}
Expand Down
4 changes: 2 additions & 2 deletions src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ impl Message {
}

/// Pre-encodes a message to state, returns length
pub(crate) fn preencode(&mut self, state: &mut HypercoreState) -> Result<usize, EncodingError> {
pub(crate) fn preencode(&self, state: &mut HypercoreState) -> Result<usize, EncodingError> {
match self {
Self::Open(ref message) => state.0.preencode(message)?,
Self::Close(ref message) => state.0.preencode(message)?,
Expand All @@ -427,7 +427,7 @@ impl Message {

/// Encodes a message to a given buffer, using preencoded state, results size
pub(crate) fn encode(
&mut self,
&self,
state: &mut HypercoreState,
buf: &mut [u8],
) -> Result<usize, EncodingError> {
Expand Down
23 changes: 21 additions & 2 deletions src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,6 @@ impl fmt::Debug for State {
}

/// A Protocol stream.
///
#[derive(Debug)]
pub struct Protocol<IO> {
write_state: WriteState,
read_state: ReadState,
Expand All @@ -152,6 +150,26 @@ pub struct Protocol<IO> {
queued_events: VecDeque<Event>,
}

impl<IO> std::fmt::Debug for Protocol<IO> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Protocol")
.field("write_state", &self.write_state)
.field("read_state", &self.read_state)
//.field("io", &self.io)
.field("state", &self.state)
.field("options", &self.options)
.field("handshake", &self.handshake)
.field("channels", &self.channels)
.field("command_rx", &self.command_rx)
.field("command_tx", &self.command_tx)
.field("outbound_rx", &self.outbound_rx)
.field("outbound_tx", &self.outbound_tx)
.field("keepalive", &self.keepalive)
.field("queued_events", &self.queued_events)
.finish()
}
}

impl<IO> Protocol<IO>
where
IO: AsyncWrite + AsyncRead + Send + Unpin + 'static,
Expand Down Expand Up @@ -510,6 +528,7 @@ where
}
}

/// Open a Channel with the given key. Adding it to our channel map
fn command_open(&mut self, key: Key) -> Result<()> {
// Create a new channel.
let channel_handle = self.channels.attach_local(key);
Expand Down
5 changes: 3 additions & 2 deletions src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,14 +461,15 @@ impl CompactEncoding<Bitfield> for State {
}

/// Range message. Type 8.
/// Notifies Peer's that the Sender has a range of contiguous blocks.
#[derive(Debug, Clone, PartialEq)]
pub struct Range {
/// If true, notifies that data has been cleared from this range.
/// If false, notifies existing data range.
pub drop: bool,
/// Start index
/// Range starts at this index
pub start: u64,
/// Length
/// Length of the range
pub length: u64,
}

Expand Down
3 changes: 2 additions & 1 deletion tests/js_interop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ async fn js_interop_rcrs_simple_server_writer() -> Result<()> {
}

#[test(async_test)]
#[cfg_attr(not(feature = "js_interop_tests"), ignore)]
//#[cfg_attr(not(feature = "js_interop_tests"), ignore)]
#[ignore] // FIXME this tests hangs sporadically
async fn js_interop_rcrs_simple_client_writer() -> Result<()> {
js_interop_rcrs_simple(false, 8108).await?;
Ok(())
Expand Down

0 comments on commit 5c2982e

Please sign in to comment.