diff --git a/applications/minotari_console_wallet/src/init/mod.rs b/applications/minotari_console_wallet/src/init/mod.rs index d5faff3b2a6..636faa2567c 100644 --- a/applications/minotari_console_wallet/src/init/mod.rs +++ b/applications/minotari_console_wallet/src/init/mod.rs @@ -23,6 +23,7 @@ #![allow(dead_code, unused)] use std::{fs, path::PathBuf, str::FromStr, sync::Arc}; +use std::time::Instant; use log::*; use minotari_app_utilities::identity_management::setup_node_identity; @@ -442,6 +443,8 @@ pub async fn init_wallet( .map_err(|e| ExitError::new(ExitCode::WalletError, format!("Error consensus manager. {}", e)))?; let factories = CryptoFactories::default(); + let now = Instant::now(); + let mut wallet = Wallet::start( wallet_config, config.peer_seeds.clone(), @@ -463,12 +466,18 @@ pub async fn init_wallet( WalletError::CommsInitializationError(cie) => cie.to_exit_error(), e => ExitError::new(ExitCode::WalletError, format!("Error creating Wallet Container: {}", e)), })?; - if let Some(hs) = wallet.comms.hidden_service() { - wallet - .db - .set_tor_identity(hs.tor_identity().clone()) - .map_err(|e| ExitError::new(ExitCode::WalletError, format!("Problem writing tor identity. {}", e)))?; - } + // TODO: fix this + // if let Some(hs) = wallet.comms.hidden_service() { + // wallet + // .db + // .set_tor_identity(hs.tor_identity().clone()) + // .map_err(|e| ExitError::new(ExitCode::WalletError, format!("Problem writing tor identity. {}", e)))?; + // } + + error!( + target: LOG_TARGET, + "Wallet started in {}ms", now.elapsed().as_millis() + ); if let Some(file_name) = seed_words_file_name { let seed_words = wallet.get_seed_words(&MnemonicLanguage::English)?.join(" "); diff --git a/applications/minotari_node/src/bootstrap.rs b/applications/minotari_node/src/bootstrap.rs index c2d0ce77536..dce1a5b28c9 100644 --- a/applications/minotari_node/src/bootstrap.rs +++ b/applications/minotari_node/src/bootstrap.rs @@ -177,10 +177,11 @@ where B: BlockchainBackend + 'static .map_err(|e| ExitError::new(ExitCode::IdentityError, e))?; }, }; - if let Some(hs) = comms.hidden_service() { - identity_management::save_as_json(&base_node_config.tor_identity_file, hs.tor_identity()) - .map_err(|e| ExitError::new(ExitCode::IdentityError, e))?; - } + todo!("Fix this"); + // if let Some(hs) = comms.hidden_service() { + // identity_management::save_as_json(&base_node_config.tor_identity_file, hs.tor_identity()) + // .map_err(|e| ExitError::new(ExitCode::IdentityError, e))?; + // } handles.register(comms); diff --git a/base_layer/contacts/src/chat_client/src/networking.rs b/base_layer/contacts/src/chat_client/src/networking.rs index fa84a20e9ff..b42d9a41bea 100644 --- a/base_layer/contacts/src/chat_client/src/networking.rs +++ b/base_layer/contacts/src/chat_client/src/networking.rs @@ -121,10 +121,11 @@ pub async fn start( trace!(target: LOG_TARGET, "save chat identity file"); }, }; - if let Some(hs) = comms.hidden_service() { - identity_management::save_as_json(&config.chat_client.tor_identity_file, hs.tor_identity())?; - trace!(target: LOG_TARGET, "resave the chat tor identity {:?}", hs.tor_identity()); - } + todo!("Fix this"); + // if let Some(hs) = comms.hidden_service() { + // identity_management::save_as_json(&config.chat_client.tor_identity_file, hs.tor_identity())?; + // trace!(target: LOG_TARGET, "resave the chat tor identity {:?}", hs.tor_identity()); + // } handles.register(comms); let comms = handles.expect_handle::(); diff --git a/base_layer/p2p/src/initialization.rs b/base_layer/p2p/src/initialization.rs index ac9ab9b6536..5514e963c6a 100644 --- a/base_layer/p2p/src/initialization.rs +++ b/base_layer/p2p/src/initialization.rs @@ -70,6 +70,7 @@ use tari_storage::{ use thiserror::Error; use tokio::sync::{broadcast, mpsc}; use tower::ServiceBuilder; +use tari_comms::transports::HiddenServiceTransport; use crate::{ comms_connector::{InboundDomainConnector, PubsubDomainConnector}, @@ -251,20 +252,17 @@ pub async fn spawn_comms_using_transport( let listener_address_override = tor_config.listener_address_override.clone(); let mut hidden_service_ctl = initialize_hidden_service(tor_config)?; // Set the listener address to be the address (usually local) to which tor will forward all traffic - let transport = hidden_service_ctl.initialize_transport().await?; + let instant = Instant::now(); + let transport = HiddenServiceTransport::new(hidden_service_ctl); + error!(target: LOG_TARGET, "TOR transport initialized in {:.0?}", instant.elapsed()); + - info!( - target: LOG_TARGET, - "Tor hidden service initialized. proxied_address = '{:?}', listener_override_address = {:?}", - hidden_service_ctl.proxied_address(), - listener_address_override, - ); comms .with_listener_address( listener_address_override.unwrap_or_else(|| multiaddr![Ip4([127, 0, 0, 1]), Tcp(0u16)]), ) - .with_hidden_service_controller(hidden_service_ctl) + // .with_hidden_service_controller(hidden_service_ctl) .spawn_with_transport(transport) .await? }, diff --git a/base_layer/wallet_ffi/src/lib.rs b/base_layer/wallet_ffi/src/lib.rs index 807c68ecf9e..0dd463c0a00 100644 --- a/base_layer/wallet_ffi/src/lib.rs +++ b/base_layer/wallet_ffi/src/lib.rs @@ -5472,11 +5472,11 @@ pub unsafe extern "C" fn wallet_create( match w { Ok(w) => { // lets ensure the wallet tor_id is saved, this could have been changed during wallet startup - if let Some(hs) = w.comms.hidden_service() { - if let Err(e) = w.db.set_tor_identity(hs.tor_identity().clone()) { - warn!(target: LOG_TARGET, "Could not save tor identity to db: {:?}", e); - } - } + // if let Some(hs) = w.comms.hidden_service() { + // if let Err(e) = w.db.set_tor_identity(hs.tor_identity().clone()) { + // warn!(target: LOG_TARGET, "Could not save tor identity to db: {:?}", e); + // } + // } let wallet_address = TariAddress::new(w.comms.node_identity().public_key().clone(), w.network.as_network()); // Start Callback Handler @@ -5512,15 +5512,15 @@ pub unsafe extern "C" fn wallet_create( runtime.spawn(callback_handler.start()); - let mut ts = w.transaction_service.clone(); - runtime.spawn(async move { - if let Err(e) = ts.restart_transaction_protocols().await { - warn!( - target: LOG_TARGET, - "Could not restart transaction negotiation protocols: {:?}", e - ); - } - }); + // let mut ts = w.transaction_service.clone(); + // runtime.spawn(async move { + // if let Err(e) = ts.restart_transaction_protocols().await { + // warn!( + // target: LOG_TARGET, + // "Could not restart transaction negotiation protocols: {:?}", e + // ); + // } + // }); let tari_wallet = TariWallet { wallet: w, diff --git a/comms/core/src/builder/comms_node.rs b/comms/core/src/builder/comms_node.rs index 649497c2c78..3b2d8e4c6cc 100644 --- a/comms/core/src/builder/comms_node.rs +++ b/comms/core/src/builder/comms_node.rs @@ -21,6 +21,7 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use std::{iter, sync::Arc, time::Duration}; +use std::time::Instant; use log::*; use multiaddr::{multiaddr, Protocol}; @@ -218,28 +219,30 @@ impl UnspawnedCommsNode { node_identity.node_id() ); - let listening_info = connection_manager_requester.wait_until_listening().await?; - + // let instant = Instant::now(); + // + // let listening_info = connection_manager_requester.wait_until_listening().await?; + // error!(target: LOG_TARGET, "Waited for {} to connect", instant.elapsed().as_millis()); // Final setup of the hidden service. - let mut hidden_service = None; - if let Some(mut ctl) = hidden_service_ctl { - // Only set the address to the bind address it is set to TCP port 0 - let mut proxied_addr = ctl.proxied_address(); - if proxied_addr.ends_with(&multiaddr!(Tcp(0u16))) { - // Remove the TCP port 0 address and replace it with the actual listener port - if let Some(Protocol::Tcp(port)) = listening_info.bind_address().iter().last() { - proxied_addr.pop(); - proxied_addr.push(Protocol::Tcp(port)); - ctl.set_proxied_addr(&proxied_addr); - } - } - let hs = ctl.create_hidden_service().await?; - let onion_addr = hs.get_onion_address(); - if !node_identity.public_addresses().contains(&onion_addr) { - node_identity.add_public_address(onion_addr); - } - hidden_service = Some(hs); - } + // let mut hidden_service = None; + // if let Some(mut ctl) = hidden_service_ctl { + // // Only set the address to the bind address it is set to TCP port 0 + // let mut proxied_addr = ctl.proxied_address(); + // if proxied_addr.ends_with(&multiaddr!(Tcp(0u16))) { + // // Remove the TCP port 0 address and replace it with the actual listener port + // if let Some(Protocol::Tcp(port)) = listening_info.bind_address().iter().last() { + // proxied_addr.pop(); + // proxied_addr.push(Protocol::Tcp(port)); + // ctl.set_proxied_addr(&proxied_addr); + // } + // } + // let hs = ctl.create_hidden_service().await?; + // let onion_addr = hs.get_onion_address(); + // if !node_identity.public_addresses().contains(&onion_addr) { + // node_identity.add_public_address(onion_addr); + // } + // hidden_service = Some(hs); + // } info!( target: LOG_TARGET, "Your node's public addresses are '{}'", @@ -266,11 +269,10 @@ impl UnspawnedCommsNode { shutdown_signal, connection_manager_requester, connectivity_requester, - listening_info, node_identity, peer_manager, liveness_watch, - hidden_service, + // hidden_service, complete_signals: ext_context.drain_complete_signals(), }) } @@ -313,11 +315,11 @@ pub struct CommsNode { /// Shared PeerManager instance peer_manager: Arc, /// The bind addresses of the listener(s) - listening_info: ListenerInfo, + // listening_info: ListenerInfo, /// Current liveness status liveness_watch: watch::Receiver, /// `Some` if the comms node is configured to run via a hidden service, otherwise `None` - hidden_service: Option, + //hidden_service: Option, /// The 'reciprocal' shutdown signals for each comms service complete_signals: Vec, } @@ -349,25 +351,20 @@ impl CommsNode { } /// Return the Ip/Tcp address that this node is listening on - pub fn listening_address(&self) -> &Multiaddr { - self.listening_info.bind_address() - } + // pub fn listening_address(&self) -> &Multiaddr { + // self.listening_info.bind_address() + // } /// Return [ListenerInfo] - pub fn listening_info(&self) -> &ListenerInfo { - &self.listening_info - } + // pub fn listening_info(&self) -> &ListenerInfo { + // &self.listening_info + // } /// Returns the current liveness status pub fn liveness_status(&self) -> LivenessStatus { *self.liveness_watch.borrow() } - /// Return the Ip/Tcp address that this node is listening on - pub fn hidden_service(&self) -> Option<&tor::HiddenService> { - self.hidden_service.as_ref() - } - /// Return a handle that is used to call the connectivity service. pub fn connectivity(&self) -> ConnectivityRequester { self.connectivity_requester.clone() diff --git a/comms/core/src/tor/hidden_service/controller.rs b/comms/core/src/tor/hidden_service/controller.rs index a706da54dfe..d5a60089cab 100644 --- a/comms/core/src/tor/hidden_service/controller.rs +++ b/comms/core/src/tor/hidden_service/controller.rs @@ -124,7 +124,9 @@ impl HiddenServiceController { } pub async fn initialize_transport(&mut self) -> Result { + dbg!("here3"); self.connect_and_auth().await?; + dbg!("here4"); let socks_addr = self.get_socks_address().await?; Ok(SocksTransport::new(SocksConfig { proxy_address: socks_addr, @@ -235,6 +237,7 @@ impl HiddenServiceController { } fn client_mut(&mut self) -> Result<&mut TorControlPortClient, HiddenServiceControllerError> { + dbg!("here5"); self.client .as_mut() .filter(|c| c.is_connected()) diff --git a/comms/core/src/transports/hidden_service_transport.rs b/comms/core/src/transports/hidden_service_transport.rs new file mode 100644 index 00000000000..d5f94400da2 --- /dev/null +++ b/comms/core/src/transports/hidden_service_transport.rs @@ -0,0 +1,100 @@ +// Copyright 2022. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::io; +use std::sync::Arc; +use log::info; +use multiaddr::Multiaddr; +use tokio::sync::RwLock; +use crate::tor::HiddenServiceController; +use crate::transports::{SocksTransport, TcpTransport, Transport}; +use crate::transports::tcp::TcpInbound; + +const LOG_TARGET: &str = "comms::transports::hidden_service_transport"; + +#[derive(thiserror::Error, Debug)] +pub enum HiddenServiceTransportError { + #[error("Tor hidden service transport error: `{0}`")] + HiddenServiceControllerError(#[from] crate::tor::HiddenServiceControllerError), + #[error("Tor hidden service socks error: `{0}`")] + SocksTransportError(#[from] io::Error), + +} + +struct HiddenServiceTransportInner { + socks_transport: Option, + hidden_service_ctl: HiddenServiceController + +} + +#[derive(Clone)] +pub struct HiddenServiceTransport { + inner: Arc> +} + +impl HiddenServiceTransport { + pub fn new(hidden_service_ctl: HiddenServiceController) -> Self { + Self { + inner : Arc::new(RwLock::new(HiddenServiceTransportInner { + socks_transport: None, + hidden_service_ctl + })) + } + } + + async fn ensure_initialized(&self) -> Result<(), io::Error> { + let inner = self.inner.read().await; + if inner.socks_transport.is_none() { + drop(inner); + let mut mut_inner = self.inner.write().await; + if mut_inner.socks_transport.is_none() { + let transport = mut_inner.hidden_service_ctl.initialize_transport().await.expect("TODO NEED TO MAP THESE ERRORS SOMEHOW"); + mut_inner.socks_transport = Some(transport); + } + } + Ok(()) + } +} +#[crate::async_trait] +impl Transport for HiddenServiceTransport { + type Output = ::Output; + type Error = ::Error; + type Listener = ::Listener; + + async fn listen(&self, addr: &Multiaddr) -> Result<(Self::Listener, Multiaddr), Self::Error> { + self.ensure_initialized().await?; + let inner = self.inner.read().await; + + // info!( + // target: LOG_TARGET, + // "Tor hidden service initialized. proxied_address = '{:?}'", + // inner.proxied_address(), + // ); + Ok(inner.socks_transport.as_ref().unwrap().listen(addr).await?) + } + + async fn dial(&self, addr: &Multiaddr) -> Result { + self.ensure_initialized().await?; + let inner = self.inner.read().await; + Ok(inner.socks_transport.as_ref().unwrap().dial(addr).await?) + } +} diff --git a/comms/core/src/transports/mod.rs b/comms/core/src/transports/mod.rs index 45050f540d8..64199853024 100644 --- a/comms/core/src/transports/mod.rs +++ b/comms/core/src/transports/mod.rs @@ -48,6 +48,9 @@ mod tcp; pub use tcp::TcpTransport; mod tcp_with_tor; +mod hidden_service_transport; +pub use hidden_service_transport::HiddenServiceTransport; + pub use tcp_with_tor::TcpWithTorTransport; /// Defines an abstraction for implementations that can dial and listen for connections over a provided address.