Skip to content

Commit

Permalink
feat: improve monerod switchover (#6794)
Browse files Browse the repository at this point in the history
Description
---
Improved monerod switchover logic when another process is busy acquiring
a new monerod server. Preference will be given to the currently ongoing
monerod server qualifying round.

Motivation and Context
---
A new monerod timeout or error response interfered with a current
monerod server qualifying round.

How Has This Been Tested?
---
System-level testing.

Typical change-over from one server to the next for **get_heightt** -
note the response times. In this case the change over was from
`xmr-01.tari.com` to `83.217.209.212:18089`, with the former having a
lot faster response with `get_height`.
```rust
2025-02-13 02:36:47.911666300 [minotari_mm_proxy::proxy::inner] DEBUG [monerod] 'get_height' response status = 200 OK, trace_id: 9449507401888018260, response time: 37ms
2025-02-13 02:36:48.939471000 [minotari_mm_proxy::proxy::inner] DEBUG [monerod] 'get_height' response status = 200 OK, trace_id: 13575053716280068238, response time: 63ms
2025-02-13 02:36:49.952704000 [minotari_mm_proxy::proxy::inner] DEBUG [monerod] 'get_height' response status = 200 OK, trace_id: 5611195404276155000, response time: 67ms
2025-02-13 02:36:50.932377400 [minotari_mm_proxy::proxy::inner] DEBUG [monerod] 'get_height' response status = 200 OK, trace_id: 10283440367075556454, response time: 42ms
2025-02-13 02:36:52.016915900 [minotari_mm_proxy::proxy::inner] DEBUG [monerod] 'get_height' response status = 200 OK, trace_id: 12963264473660460967, response time: 61ms
2025-02-13 02:36:53.017457500 [minotari_mm_proxy::proxy::inner] DEBUG [monerod] 'get_height' response status = 200 OK, trace_id: 7705492877366699726, response time: 47ms
2025-02-13 02:36:54.020946200 [minotari_mm_proxy::proxy::inner] DEBUG [monerod] 'get_height' response status = 200 OK, trace_id: 3351472723537365142, response time: 41ms
2025-02-13 02:36:55.054276400 [minotari_mm_proxy::proxy::inner] DEBUG [monerod] 'get_height' response status = 200 OK, trace_id: 14654389803029846950, response time: 67ms
2025-02-13 02:36:57.084860800 [minotari_mm_proxy::proxy::inner] DEBUG [monerod] 'get_height' response status = 200 OK, trace_id: 16581611060531486049, response time: 1078ms
2025-02-13 02:36:58.069733300 [minotari_mm_proxy::proxy::inner] DEBUG [monerod] 'get_height' response status = 200 OK, trace_id: 11200975234993943966, response time: 1049ms
2025-02-13 02:37:00.037884400 [minotari_mm_proxy::proxy::inner] DEBUG [monerod] 'get_height' response status = 200 OK, trace_id: 16367810207953566192, response time: 2008ms
2025-02-13 02:37:00.857839600 [minotari_mm_proxy::proxy::inner] DEBUG [monerod] 'get_height' response status = 200 OK, trace_id: 14763931288495226594, response time: 787ms
2025-02-13 02:37:01.080669500 [minotari_mm_proxy::proxy::inner] DEBUG [monerod] 'get_height' response status = 200 OK, trace_id: 13876100441960084867, response time: 2014ms
2025-02-13 02:37:01.444369800 [minotari_mm_proxy::proxy::inner] DEBUG [monerod] 'get_height' response status = 200 OK, trace_id: 889014338068691509, response time: 362ms
2025-02-13 02:37:02.463123200 [minotari_mm_proxy::proxy::inner] DEBUG [monerod] 'get_height' response status = 200 OK, trace_id: 11511042497828797364, response time: 370ms
2025-02-13 02:37:03.471015500 [minotari_mm_proxy::proxy::inner] DEBUG [monerod] 'get_height' response status = 200 OK, trace_id: 12745839339869933821, response time: 369ms
2025-02-13 02:37:04.550573600 [minotari_mm_proxy::proxy::inner] DEBUG [monerod] 'get_height' response status = 200 OK, trace_id: 5407531915096230694, response time: 384ms
2025-02-13 02:37:05.565108800 [minotari_mm_proxy::proxy::inner] DEBUG [monerod] 'get_height' response status = 200 OK, trace_id: 15144540318823463949, response time: 386ms
2025-02-13 02:37:06.575400700 [minotari_mm_proxy::proxy::inner] DEBUG [monerod] 'get_height' response status = 200 OK, trace_id: 5465604504835685969, response time: 384ms
```
Typical change-over from one server to the next for
**get_block_template** during the same time frame. Because this request
happens less frequently, we do not notice any time-outs, just an
increase in average response times.
```rust
2025-02-13 02:35:40.635512300 [minotari_mm_proxy::proxy::inner] DEBUG [monerod] 'get_block_template' response status = 200 OK, trace_id: 7102650860353685516, response time: 209ms
2025-02-13 02:35:56.862736500 [minotari_mm_proxy::proxy::inner] DEBUG [monerod] 'get_block_template' response status = 200 OK, trace_id: 5002503581636467590, response time: 309ms
2025-02-13 02:36:12.882713600 [minotari_mm_proxy::proxy::inner] DEBUG [monerod] 'get_block_template' response status = 200 OK, trace_id: 6305477133685166358, response time: 217ms
2025-02-13 02:36:29.013004600 [minotari_mm_proxy::proxy::inner] DEBUG [monerod] 'get_block_template' response status = 200 OK, trace_id: 3734157611122817821, response time: 224ms
2025-02-13 02:36:45.113786200 [minotari_mm_proxy::proxy::inner] DEBUG [monerod] 'get_block_template' response status = 200 OK, trace_id: 17453492759535002385, response time: 233ms
2025-02-13 02:37:01.236324200 [minotari_mm_proxy::proxy::inner] DEBUG [monerod] 'get_block_template' response status = 200 OK, trace_id: 379651289171152797, response time: 376ms
2025-02-13 02:37:17.127726700 [minotari_mm_proxy::proxy::inner] DEBUG [monerod] 'get_block_template' response status = 200 OK, trace_id: 13522787336401775792, response time: 378ms
2025-02-13 02:37:33.211018900 [minotari_mm_proxy::proxy::inner] DEBUG [monerod] 'get_block_template' response status = 200 OK, trace_id: 1796862900003369865, response time: 384ms
2025-02-13 02:37:49.320177300 [minotari_mm_proxy::proxy::inner] DEBUG [monerod] 'get_block_template' response status = 200 OK, trace_id: 3402644666467065627, response time: 365ms
2025-02-13 02:38:05.478493600 [minotari_mm_proxy::proxy::inner] DEBUG [monerod] 'get_block_template' response status = 200 OK, trace_id: 7165471818726006683, response time: 393ms
```


What process can a PR reviewer use to test or verify this change?
---
Code review

<!-- Checklist -->
<!-- 1. Is the title of your PR in the form that would make nice release
notes? The title, excluding the conventional commit
tag, will be included exactly as is in the CHANGELOG, so please think
about it carefully. -->


Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify

<!-- Does this include a breaking change? If so, include this line as a
footer -->
<!-- BREAKING CHANGE: Description what the user should do, e.g. delete a
database, resync the chain -->


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **Chores**
- Integrated a new dependency for advanced random number generation to
enhance backend performance.

- **New Features**
- Enhanced error handling with robust detection for invalid requests,
offering more informative feedback.
- Improved reliability in server communications with refined fallback
mechanisms and better request tracking.

- **Refactor**
- Streamlined logging by updating verbosity levels for clearer and more
concise debug outputs.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
  • Loading branch information
hansieodendaal authored Feb 17, 2025
1 parent d3ea51f commit 47dc1d5
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 43 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions applications/minotari_merge_mining_proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ tracing = "0.1"
url = "2.1.1"
scraper = "0.19.0"
toml = "0.8.19"
rand = "0.8"
regex = "1.11.1"

[build-dependencies]
Expand Down
2 changes: 2 additions & 0 deletions applications/minotari_merge_mining_proxy/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ pub enum MmProxyError {
MaxSizeVecError(#[from] MaxSizeVecError),
#[error("Monerod timeout: {0}")]
MonerodTimeout(String),
#[error("Monerod request could not be parsed: {0}")]
InvalidMonerodRequest(String),
}

impl From<tonic::Status> for MmProxyError {
Expand Down
126 changes: 86 additions & 40 deletions applications/minotari_merge_mining_proxy/src/proxy/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use log::error;
use minotari_app_grpc::{tari_rpc, tari_rpc::SubmitBlockRequest};
use minotari_app_utilities::parse_miner_input::{BaseNodeGrpcClient, ShaP2PoolGrpcClient};
use monero::Hash;
use rand::random;
use serde_json as json;
use serde_json::json;
use tari_common_types::tari_address::TariAddress;
Expand Down Expand Up @@ -567,9 +568,24 @@ impl InnerService {
Ok(proxy::into_response(parts, &resp))
}

fn clear_current_monerod_server_lock(&self, last_assigned_server: Option<&str>) {
fn clear_current_monerod_server_lock(&self, last_assigned_server: Option<&str>, host_with_error: Option<&str>) {
// Current
let mut lock = self.current_monerod_server.write().expect("Write lock should not fail");
let current = lock.clone();
if let Some(host) = host_with_error {
if let Some(server) = current.clone() {
// If the error was reported on a previously assigned server, we do not clear the lock. This happens on
// requests that timed out after a new server has been assigned.
if !server.contains(host) {
trace!(
target: LOG_TARGET, "A new monerod server has already been assigned. Current: '{}', host with \
error: '{}'",
server, host
);
return;
}
}
}
*lock = None;
// Last assigned
if let Some(server) = last_assigned_server {
Expand Down Expand Up @@ -614,22 +630,25 @@ impl InnerService {
return Ok(None);
}
// Return the previously qualified monerod URL if it exists
let mut parse_error = None;
let mut busy_qualifying = 0;
let start_reading_lock_time = Instant::now();
loop {
let lock = {
let lock_contents = {
self.current_monerod_server
.read()
.expect("Read lock should not fail")
.clone()
};
if let Some(server) = lock {
if let Some(server) = lock_contents {
// Give some time for the server to be qualified
if server == BUSY_QUALIFYING {
let time_lapsed = start_reading_lock_time.elapsed();
if time_lapsed > self.config.monerod_connection_timeout {
return Err(MmProxyError::ServersUnavailable(BUSY_QUALIFYING.to_string()));
return if self.config.monerod_fallback == MonerodFallback::StaticWhenMonerodFails {
Ok(None)
} else {
Err(MmProxyError::ServersUnavailable(BUSY_QUALIFYING.to_string()))
};
}
trace!(
target: LOG_TARGET,
Expand All @@ -642,24 +661,24 @@ impl InnerService {
// Parse the URL if qualifying is done
match format!("{}{}", server, request_uri.path()).parse::<Url>() {
Ok(url) => return Ok(Some(url)),
Err(e) => {
parse_error = Some(e);
break;
Err(err) => {
return if format!("{}/getheight", server).parse::<Url>().is_ok() {
Err(MmProxyError::InvalidMonerodRequest(request_uri.path().to_string()))
} else {
self.clear_current_monerod_server_lock(None, None);
Err(err.into())
};
},
}
}
// If no server is qualified, proceed with qualifying
break;
}
if let Some(e) = parse_error {
self.clear_current_monerod_server_lock(None);
return Err(e.into());
}

// Set the "busy qualifying" state
self.set_current_monerod_server_lock_busy();

// Create an iterator to query the list twice before giving up, starting after the last used entry
// Create an iterator to query the list, starting after the last used entry
let last_used_url = {
let lock = self
.last_assigned_monerod_url
Expand All @@ -676,21 +695,25 @@ impl InnerService {
.unwrap_or(0);
pos = (pos + 1) % self.config.monerod_url.len();
let (left, right) = self.config.monerod_url.split_at_checked(pos).ok_or_else(|| {
self.clear_current_monerod_server_lock(None);
MmProxyError::ConversionError("Invalid utf 8 url".to_string())
self.clear_current_monerod_server_lock(Some(self.config.monerod_url[0].as_str()), None);
MmProxyError::ConversionError("last_used_url".to_string())
})?;
let left = left.to_vec();
let right = right.to_vec();
let iter = right.iter().chain(left.iter()).chain(right.iter()).chain(left.iter());
let iter = right.iter().chain(left.iter());

// Lock the current and last monerod server into the first available server
for server in iter {
let start = Instant::now();
let url = match format!("{}{}", server, request_uri.path()).parse::<Url>() {
Ok(val) => val,
Err(e) => {
self.clear_current_monerod_server_lock(Some(server));
return Err(e.into());
self.clear_current_monerod_server_lock(Some(server), None);
return if format!("{}/getheight", server).parse::<Url>().is_ok() {
Err(MmProxyError::InvalidMonerodRequest(request_uri.path().to_string()))
} else {
Err(e.into())
};
},
};
let pos = self.config.monerod_url.iter().position(|x| x == server).unwrap_or(0);
Expand All @@ -712,14 +735,15 @@ impl InnerService {
// This approach is used to verify the server's availability without needing a valid request body.
Ok(response) => {
self.update_monerod_server_locks(server);
let data_len = match response {
Ok(data) => data.content_length().unwrap_or_default(),
Err(_) => 0,
};
info!(
target: LOG_TARGET,
"Monerod server available (response in {:.2?}, {} bytes): {}",
start.elapsed(), data_len, url.as_str()
start.elapsed(),
match response {
Ok(data) => data.content_length().unwrap_or_default(),
Err(_) => 0,
},
url.as_str()
);
return Ok(Some(url));
},
Expand All @@ -729,26 +753,28 @@ impl InnerService {
"Monerod server unavailable (timeout in {:.2?}): {}",
start.elapsed(), url.as_str()
);
self.clear_current_monerod_server_lock(Some(server));
if self.config.monerod_fallback == MonerodFallback::StaticWhenMonerodFails {
self.clear_current_monerod_server_lock(Some(server), None);
return Ok(None);
}
},
}
}

// Clear the "busy qualifying" state
self.clear_current_monerod_server_lock(None);
self.clear_current_monerod_server_lock(None, None);
Err(MmProxyError::ServersUnavailable(format!("{}", self.config.monerod_url)))
}

/// Proxy a request received by this server to Monerod
#[allow(clippy::too_many_lines)]
async fn proxy_request_to_monerod(
&self,
request: Request<Bytes>,
monerod_method: MonerodMethod,
) -> Result<(Request<Bytes>, Response<json::Value>), MmProxyError> {
trace!(target: LOG_TARGET, "proxy_request_to_monerod: '{}'", monerod_method);
let trace_id = random::<u64>();
trace!(target: LOG_TARGET, "proxy_request_to_monerod: '{}' (trace_id: {})", monerod_method, trace_id);

// This is a cheap clone of the request body
let body: Bytes = request.body().clone();
Expand All @@ -767,7 +793,11 @@ impl InnerService {
None => host.parse()?,
};
headers.insert("host", host);
debug!(target: LOG_TARGET, "Host header updated to match monerod_uri. Request headers: {:?}", headers);
debug!(
target: LOG_TARGET,
"Host header updated to match monerod_uri. Request headers: {:?} (trace_id: {})",
headers, trace_id
);
}
let mut builder = self
.http_client
Expand All @@ -780,17 +810,20 @@ impl InnerService {
builder = builder.basic_auth(&self.config.monerod_username, Some(&self.config.monerod_password));
}

debug!(target: LOG_TARGET, "[monerod] '{}' request: {} {}", monerod_method, request.method(), monerod_url);
debug!(
target: LOG_TARGET,
"[monerod] '{}' request: {} {} (trace_id: {})",
monerod_method, request.method(), monerod_url, trace_id
);

if self_select_response {
let accept_response = self_select_submit_block_monerod_response(request_id);
convert_json_to_hyper_json_response(accept_response, StatusCode::OK, monerod_url.clone()).await?
} else {
// Send the request to the current monerod server
match timeout(
self.config.monerod_connection_timeout,
builder.body(body.clone()).send(),
)
match timeout(self.config.monerod_connection_timeout, async {
builder.body(body.clone()).send().await
})
.await
{
Ok(response) => match response.map_err(MmProxyError::MonerodRequestFailed) {
Expand All @@ -800,14 +833,22 @@ impl InnerService {
hyper_json_response
},
Err(e) => {
warn!(target: LOG_TARGET, "[monerod] '{}' request response '{}'", monerod_method, e);
self.handle_monerod_error_response(monerod_method, request_id, e)?
warn!(
target: LOG_TARGET,
"[monerod] '{}' request response '{}' (trace_id: {})",
monerod_method, e, trace_id
);
self.handle_monerod_error_response(monerod_method, request_id, monerod_url.host_str(), e)?
},
},
Err(e) => {
let err = MmProxyError::MonerodTimeout(e.to_string());
warn!(target: LOG_TARGET, "[monerod] '{}' request response '{}'", monerod_method, err);
self.handle_monerod_error_response(monerod_method, request_id, err)?
warn!(
target: LOG_TARGET,
"[monerod] '{}' request response '{}' (trace_id: {})",
monerod_method, err, trace_id
);
self.handle_monerod_error_response(monerod_method, request_id, monerod_url.host_str(), err)?
},
}
}
Expand All @@ -825,7 +866,7 @@ impl InnerService {

debug!(
target: LOG_TARGET,
"[monerod] '{}' response status = {},{} response time: {}ms",
"[monerod] '{}' response status = {},{} trace_id: {}, response time: {}ms",
monerod_method,
json_response.status(),
if json_response.body()["error"].is_null() {
Expand All @@ -835,19 +876,24 @@ impl InnerService {
.as_str()
.unwrap_or("unknown error"))
},
start.elapsed().as_millis(),
trace_id, start.elapsed().as_millis(),
);
trace!(
target: LOG_TARGET,
"[monerod] '{}' response '{:?}' (trace_id: {})",
monerod_method, json_response, trace_id
);
trace!(target: LOG_TARGET, "[monerod] '{}' response '{:?}'", monerod_method, json_response);
Ok((request, json_response))
}

fn handle_monerod_error_response(
&self,
monerod_method: MonerodMethod,
request_id: Option<i64>,
host_with_error: Option<&str>,
err: MmProxyError,
) -> Result<Response<serde_json::Value>, MmProxyError> {
self.clear_current_monerod_server_lock(None);
self.clear_current_monerod_server_lock(None, host_with_error);
if self.config.monerod_fallback == MonerodFallback::MonerodOnly {
Err(err)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

use std::str::FromStr;

use log::trace;
use monero::{
blockdata::{
transaction,
Expand Down Expand Up @@ -361,7 +360,7 @@ pub(crate) fn convert_static_monerod_response_to_hyper_response(
monerod_cache_values: Option<MonerodCacheValues>,
) -> Result<hyper::Response<serde_json::Value>, MmProxyError> {
if let Some(cache_values) = monerod_cache_values.clone() {
trace!(
debug!(
target: LOG_TARGET,
"[monerod] use static response for {}, req_id: {:?}, height: {:?}, prev_hash: {:?}, timestamp: {:?}, \
seed_height: {:?}, seed_hash: {:?}",
Expand All @@ -373,7 +372,7 @@ pub(crate) fn convert_static_monerod_response_to_hyper_response(
cache_values.seed_hash.map(hex::encode),
);
} else {
trace!(target: LOG_TARGET, "[monerod] use static response for {}, req_id: {:?}", method, req_id);
debug!(target: LOG_TARGET, "[monerod] use static response for {}, req_id: {:?}", method, req_id);
}
let static_response = get_static_monerod_response(method, req_id, monerod_cache_values)?;

Expand Down

0 comments on commit 47dc1d5

Please sign in to comment.