From 8eeaea3563977aae3291a5f86d32850776ce0827 Mon Sep 17 00:00:00 2001 From: Kees Verruijt Date: Wed, 2 Oct 2024 09:27:08 +0200 Subject: [PATCH] Sync --- src/navico/data.rs | 53 ++++++++++++++++------ src/navico/mod.rs | 1 + src/navico/report.rs | 21 +++++++++ src/radar.rs | 28 +++++++----- src/radar/trail.rs | 70 ++++++++++++++++++++++------- src/settings.rs | 54 ++++++++++++++++++---- src/signalk.rs | 104 ++++++++++++++++++++++++++----------------- 7 files changed, 243 insertions(+), 88 deletions(-) diff --git a/src/navico/data.rs b/src/navico/data.rs index ab4df04..8b0ee58 100644 --- a/src/navico/data.rs +++ b/src/navico/data.rs @@ -2,12 +2,14 @@ use bincode::deserialize; use log::{debug, trace, warn}; use protobuf::Message; use serde::Deserialize; +use std::f64::consts::PI; use std::time::{SystemTime, UNIX_EPOCH}; use std::{io, time::Duration}; use tokio::net::UdpSocket; use tokio::sync::mpsc::Receiver; use tokio::time::sleep; use tokio_graceful_shutdown::SubsystemHandle; +use trail::TrailBuffer; use crate::locator::LocatorId; use crate::navico::NAVICO_SPOKE_LEN; @@ -114,12 +116,13 @@ pub struct NavicoDataReceiver { key: String, statistics: Statistics, info: RadarInfo, - buf: Vec, sock: Option, rx: tokio::sync::mpsc::Receiver, doppler: DopplerMode, pixel_to_blob: [[u8; BYTE_LOOKUP_LENGTH]; LOOKUP_SPOKE_LENGTH], replay: bool, + trails: TrailBuffer, + previous_range: u32, } impl NavicoDataReceiver { @@ -127,17 +130,19 @@ impl NavicoDataReceiver { let key = info.key(); let pixel_to_blob = Self::pixel_to_blob(&info.legend); + let trails = TrailBuffer::new(info.legend.clone(), NAVICO_SPOKES, NAVICO_SPOKE_LEN); NavicoDataReceiver { key, statistics: Statistics { broken_packets: 0 }, info: info, - buf: Vec::with_capacity(size_of::()), sock: None, rx, doppler: DopplerMode::None, pixel_to_blob, replay, + trails, + previous_range: 0, } } @@ -205,11 +210,16 @@ impl NavicoDataReceiver { self.pixel_to_blob = Self::pixel_to_blob(&legend); self.info.legend = legend; } + Some(DataUpdate::RelativeTrail(seconds)) => { + self.trails.set_relative_trails_revolutions(seconds); + } None => {} } } async fn socket_loop(&mut self, subsys: &SubsystemHandle) -> Result<(), RadarError> { + let mut buf = Vec::with_capacity(size_of::()); + loop { tokio::select! { biased; _ = subsys.on_shutdown_requested() => { @@ -218,10 +228,10 @@ impl NavicoDataReceiver { r = self.rx.recv() => { self.handle_data_update(r); }, - r = self.sock.as_ref().unwrap().recv_buf_from(&mut self.buf) => { + r = self.sock.as_ref().unwrap().recv_buf_from(&mut buf) => { match r { Ok(_) => { - self.process_frame(); + self.process_frame(&mut buf); }, Err(e) => { return Err(RadarError::Io(e)); @@ -229,7 +239,7 @@ impl NavicoDataReceiver { } }, } - self.buf.clear(); + buf.clear(); } } @@ -253,9 +263,7 @@ impl NavicoDataReceiver { } } - fn process_frame(&mut self) { - let data = &self.buf; - + fn process_frame(&mut self, data: &mut Vec) { if data.len() < FRAME_HEADER_LENGTH + RADAR_LINE_LENGTH { warn!( "UDP data frame with even less than one spoke, len {} dropped", @@ -305,7 +313,8 @@ impl NavicoDataReceiver { } if mark_full_rotation { - self.info.full_rotation(); + let ms = self.info.full_rotation(); + self.trails.set_rotation_speed(ms); } let mut bytes = Vec::new(); @@ -409,7 +418,7 @@ impl NavicoDataReceiver { } fn process_spoke( - &self, + &mut self, range: u32, angle: SpokeBearing, heading: Option, @@ -453,20 +462,38 @@ impl NavicoDataReceiver { generic_spoke.len() ); + if range != self.previous_range && range != 0 { + if self.previous_range != 0 { + let zoom_factor = self.previous_range as f64 / range as f64; + self.trails.zoom_relative_trails(zoom_factor); + } + self.previous_range = range; + } + // For now, don't send heading in replay mode, signalk-radar-client doesn't // handle it well yet. let heading = if self.replay { None - } else { + } else if heading.is_some() { heading.map(|h| (((h / 2) + angle) % (NAVICO_SPOKES as u16)) as u32) + } else { + let heading = crate::signalk::get_heading_true(); + heading.map(|h| { + (((h * NAVICO_SPOKES as f64 / (2. * PI)) as u16 + angle) % (NAVICO_SPOKES as u16)) + as u32 + }) }; - let mut message = RadarMessage::new(); - message.radar = 1; + self.trails + .update_relative_trails(angle, &mut generic_spoke); + + let mut message: RadarMessage = RadarMessage::new(); + message.radar = self.info.id as u32; let mut spoke = Spoke::new(); spoke.range = range; spoke.angle = angle as u32; spoke.bearing = heading; + (spoke.lat, spoke.lon) = crate::signalk::get_position_i64(); spoke.time = SystemTime::now() .duration_since(UNIX_EPOCH) .map(|d| d.as_millis() as u64) diff --git a/src/navico/mod.rs b/src/navico/mod.rs index 37d992f..c2c0649 100644 --- a/src/navico/mod.rs +++ b/src/navico/mod.rs @@ -42,6 +42,7 @@ const NAVICO_BEACON_ADDRESS: SocketAddr = pub enum DataUpdate { Doppler(DopplerMode), Legend(Legend), + RelativeTrail(u16), } #[derive(Deserialize, Debug, Copy, Clone)] diff --git a/src/navico/report.rs b/src/navico/report.rs index 5d5068c..61daf8e 100644 --- a/src/navico/report.rs +++ b/src/navico/report.rs @@ -397,6 +397,10 @@ impl NavicoReportReceiver { self.radars.update(&self.info); return Ok(()); } + ControlType::TargetTrails => { + self.handle_target_trails_value(cv).await?; + return Ok(()); + } _ => {} // rest is numeric } @@ -422,6 +426,23 @@ impl NavicoReportReceiver { Ok(()) } + async fn handle_target_trails_value(&mut self, cv: &ControlValue) -> Result<(), RadarError> { + let value = cv.value.parse::().unwrap_or(0); + if self + .info + .set(&cv.id, value, cv.auto, ControlState::Manual) + .is_err() + { + log::warn!("Cannot set TargetTrails to {}", value); + } + + self.data_tx + .send(DataUpdate::RelativeTrail(value as u16)) + .await + .map_err(|_| RadarError::CannotSetControlType(cv.id))?; + Ok(()) + } + async fn send_report_requests(&mut self) -> Result<(), RadarError> { self.command_sender.send_report_requests().await?; self.report_request_timeout += self.report_request_interval; diff --git a/src/radar.rs b/src/radar.rs index cd87120..f73df5a 100644 --- a/src/radar.rs +++ b/src/radar.rs @@ -15,7 +15,7 @@ use std::{ use thiserror::Error; use tokio_graceful_shutdown::SubsystemHandle; -mod trail; +pub(crate) mod trail; use crate::config::Persistence; use crate::locator::LocatorId; @@ -58,13 +58,16 @@ impl IntoResponse for RadarError { } } +// +// This order of pixeltypes is also how they are stored in the legend. +// #[derive(Serialize, Clone, Debug)] enum PixelType { - History, + Normal, TargetBorder, DopplerApproaching, DopplerReceding, - Normal, + History, } #[derive(Clone, Debug)] @@ -255,7 +258,7 @@ impl RadarInfo { self.legend = default_legend(doppler, self.pixel_values); } - pub fn full_rotation(&mut self) { + pub fn full_rotation(&mut self) -> u32 { let now = Instant::now(); let diff: Duration = now - self.rotation_timestamp; let diff = diff.as_millis() as f64; @@ -265,12 +268,17 @@ impl RadarInfo { log::debug!("{}: rotation speed {} dRPM", self.key, rpm); - let _ = self - .command_tx - .send(ControlMessage::SetValue(ControlValue::new( - ControlType::RotationSpeed, - rpm, - ))); + if diff < 3000. && diff > 600. { + let _ = self + .command_tx + .send(ControlMessage::SetValue(ControlValue::new( + ControlType::RotationSpeed, + rpm, + ))); + diff as u32 + } else { + 0 + } } /// diff --git a/src/radar/trail.rs b/src/radar/trail.rs index 8ad9d6a..41f3cd7 100644 --- a/src/radar/trail.rs +++ b/src/radar/trail.rs @@ -2,57 +2,97 @@ use super::{Legend, SpokeBearing, BLOB_HISTORY_COLORS}; const MARGIN: usize = 100; -struct TrailBuffer { +pub struct TrailBuffer { legend: Legend, spokes: usize, max_spoke_len: usize, previous_pixels_per_meter: f64, trail_size: usize, true_trails: Vec, - relative_trails: Vec, + relative_trails: Vec, + trail_length_ms: u32, + rotation_speed_ms: u32, } impl TrailBuffer { - pub fn new(legend: Legend, spokes: u32, max_spoke_len: u32) -> Self { - let trail_size: usize = max_spoke_len as usize * 2 + MARGIN * 2; + pub fn new(legend: Legend, spokes: usize, max_spoke_len: usize) -> Self { + let trail_size = max_spoke_len * 2 + MARGIN * 2; TrailBuffer { legend, - spokes: spokes as usize, - max_spoke_len: spokes as usize, + spokes, + max_spoke_len, previous_pixels_per_meter: 0., trail_size, true_trails: vec![0; trail_size * trail_size], - relative_trails: vec![0; spokes as usize * max_spoke_len as usize], + relative_trails: vec![0; spokes * max_spoke_len], + trail_length_ms: 0, + rotation_speed_ms: 0, } } - fn update_relative_trails(&mut self, angle: SpokeBearing, data: &mut Vec, len: usize) { + pub fn set_relative_trails_revolutions(&mut self, seconds: u16) { + self.trail_length_ms = seconds as u32 * 1000; + } + + pub fn set_rotation_speed(&mut self, ms: u32) { + self.rotation_speed_ms = ms; + } + + pub fn update_relative_trails(&mut self, angle: SpokeBearing, data: &mut Vec) { + if self.trail_length_ms == 0 { + return; + } + let max_trail_value = (self.trail_length_ms / self.rotation_speed_ms) as u16; + let trail = &mut self.relative_trails[angle as usize * self.max_spoke_len as usize ..(angle + 1) as usize * self.max_spoke_len]; let mut radius = 0; + if angle == 0 { + log::debug!("Spoke before trails: {:?}", data); + } + let update_relative_motion = true; // motion == TARGET_MOTION_RELATIVE; - while radius < len { - if data[radius] >= self.legend.strong_return { + while radius < data.len() { + if data[radius] >= self.legend.strong_return && data[radius] < self.legend.history_start + { trail[radius] = 1; - } else if trail[radius] > 0 && trail[radius] < BLOB_HISTORY_COLORS { - trail[radius] += 1; + } else if trail[radius] > 0 { + trail[radius] = trail[radius].wrapping_add(1); // Yes, we want overflow here after 65535 rotations } - if update_relative_motion && data[radius] == 0 { - data[radius] = self.legend.history_start + trail[radius]; + if update_relative_motion + && data[radius] == 0 + && trail[radius] > 0 + && trail[radius] < max_trail_value + { + let mut index = + (trail[radius] * BLOB_HISTORY_COLORS as u16 / max_trail_value) as u8; + if index >= BLOB_HISTORY_COLORS { + index = BLOB_HISTORY_COLORS; + } + if index < 1 { + index = 1; + } + + data[radius] = self.legend.history_start + index - 1; } radius += 1; } while radius < self.max_spoke_len { trail[radius] = 0; } + + if angle == 0 { + log::debug!("Trail after trails: {:?}", trail); + log::debug!("Spoke after trails: {:?}", data); + } } // zoom_factor > 1 -> zoom in, enlarge image - fn zoom_relative_trails(&mut self, zoom_factor: f64) { + pub fn zoom_relative_trails(&mut self, zoom_factor: f64) { let mut new_trail = vec![0; self.max_spoke_len]; let mut index_prev = 0; for spoke in 0..self.spokes { diff --git a/src/settings.rs b/src/settings.rs index ae1aeee..5b643ed 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -65,7 +65,23 @@ impl Controls { self.controls.insert(control_type, value); } - pub fn new_base(controls: HashMap) -> Self { + pub fn new_base(mut controls: HashMap) -> Self { + // Add controls that are not radar dependent + + let descriptions = HashMap::from([ + (0, "Off".to_string()), + (15, "15s".to_string()), + (30, "30s".to_string()), + (60, "1 min".to_string()), + (180, "3 min".to_string()), + (300, "5 min".to_string()), + (600, "10 min".to_string()), + ]); + + let control = Control::new_map(ControlType::TargetTrails, descriptions); + + controls.insert(ControlType::TargetTrails, control); + Controls { controls } } @@ -198,7 +214,7 @@ impl Control { control_type, name: control_type.to_string(), automatic: None, - has_off: false, + //has_off: false, default_value: min_value, min_value, max_value, @@ -227,7 +243,7 @@ impl Control { control_type, name: control_type.to_string(), automatic: Some(automatic), - has_off: false, + //has_off: false, default_value: min_value, min_value, max_value, @@ -248,7 +264,7 @@ impl Control { control_type, name: control_type.to_string(), automatic: None, - has_off: false, + //has_off: false, default_value: Some(0), min_value: Some(0), max_value: Some((descriptions.len() as i32) - 1), @@ -270,12 +286,32 @@ impl Control { }) } + pub fn new_map(control_type: ControlType, descriptions: HashMap) -> Self { + Self::new(ControlDefinition { + control_type, + name: control_type.to_string(), + automatic: None, + //has_off: false, + default_value: Some(0), + min_value: Some(0), + max_value: Some((descriptions.len() as i32) - 1), + step_value: Some(1), + wire_scale_factor: Some((descriptions.len() as i32) - 1), + wire_offset: Some(0), + unit: None, + descriptions: Some(descriptions), + valid_values: None, + is_read_only: false, + is_string_value: false, + is_send_always: false, + }) + } pub fn new_string(control_type: ControlType) -> Self { let control = Self::new(ControlDefinition { control_type, name: control_type.to_string(), automatic: None, - has_off: false, + //has_off: false, default_value: None, min_value: None, max_value: None, @@ -451,8 +487,8 @@ pub(crate) struct ControlDefinition { #[serde(skip)] pub(crate) control_type: ControlType, name: String, - #[serde(skip)] - has_off: bool, + //#[serde(skip)] + //has_off: bool, #[serde(flatten, skip_serializing_if = "Option::is_none")] automatic: Option, #[serde(skip_serializing_if = "is_false")] @@ -520,7 +556,7 @@ pub enum ControlType { TargetBoost, TargetExpansion, TargetSeparation, - // TargetTrails, + TargetTrails, // TimedIdle, // TimedRun, // TrailsMotion, @@ -605,7 +641,7 @@ impl Display for ControlType { ControlType::TargetBoost => "Target boost", ControlType::TargetExpansion => "Target expansion", ControlType::TargetSeparation => "Target separation", - // ControlType::TargetTrails => "Target trails", + ControlType::TargetTrails => "Target trails", // ControlType::TimedIdle => "Time idle", // ControlType::TimedRun => "Timed run", // ControlType::TrailsMotion => "Target trails motion", diff --git a/src/signalk.rs b/src/signalk.rs index 2b3ae22..d62b33a 100644 --- a/src/signalk.rs +++ b/src/signalk.rs @@ -10,6 +10,8 @@ use std::{ pin::Pin, sync::atomic::{AtomicBool, Ordering}, }; +use tokio::io::AsyncBufReadExt; +use tokio::io::BufReader; use tokio::{io::AsyncWriteExt, net::TcpStream}; use tokio_graceful_shutdown::SubsystemHandle; @@ -22,12 +24,29 @@ const TCP_SERVICE_NAME: &'static str = "_signalk-tcp._tcp.local."; const SUBSCRIBE: &'static str = "{\"context\": \"vessels.self\",\"subscribe\": [{\"path\": \"navigation.headingTrue\"},{\"path\": \"navigation.position\"}]}\r\n"; -pub(crate) static HEADING_TRUE_VALID: AtomicBool = AtomicBool::new(false); -pub(crate) static POSITION_VALID: AtomicBool = AtomicBool::new(false); -pub(crate) static HEADING_TRUE: AtomicF64 = AtomicF64::new(0.0); -pub(crate) static POSITION_LAT: AtomicF64 = AtomicF64::new(0.0); -pub(crate) static POSITION_LON: AtomicF64 = AtomicF64::new(0.0); +static HEADING_TRUE_VALID: AtomicBool = AtomicBool::new(false); +static POSITION_VALID: AtomicBool = AtomicBool::new(false); +static HEADING_TRUE: AtomicF64 = AtomicF64::new(0.0); +static POSITION_LAT: AtomicF64 = AtomicF64::new(0.0); +static POSITION_LON: AtomicF64 = AtomicF64::new(0.0); +pub(crate) fn get_heading_true() -> Option { + if HEADING_TRUE_VALID.load(Ordering::Acquire) { + return Some(HEADING_TRUE.load(Ordering::Acquire)); + } + return None; +} + +pub(crate) fn get_position_i64() -> (Option, Option) { + if POSITION_VALID.load(Ordering::Acquire) { + let lat = POSITION_LAT.load(Ordering::Acquire); + let lon = POSITION_LON.load(Ordering::Acquire); + let lat = (lat * 1e16) as i64; + let lon = (lon * 1e16) as i64; + return (Some(lat), Some(lon)); + } + return (None, None); +} pub(crate) struct NavigationData {} impl NavigationData { @@ -54,7 +73,7 @@ impl NavigationData { event = tcp_locator.recv_async() => { match event { Ok(ServiceEvent::ServiceResolved(info)) => { - log::info!("Resolved a new service: {}", info.get_fullname()); + log::debug!("Resolved a new service: {}", info.get_fullname()); let addr = info.get_addresses(); let port = info.get_port(); @@ -63,7 +82,7 @@ impl NavigationData { } }, other_event => { - log::info!("Received other event: {:?}", &other_event); + log::trace!("Received other event: {:?}", &other_event); continue; } } @@ -74,6 +93,10 @@ impl NavigationData { let stream = connect_first(known_addresses.clone()).await; match stream { Ok(stream) => { + log::info!( + "Listening to Signal K data from {}", + stream.peer_addr().unwrap() + ); if self.receive_loop(stream, &subsys).await.is_ok() { break; } @@ -82,7 +105,11 @@ impl NavigationData { } } - mdns.shutdown().unwrap(); + if let Ok(r) = mdns.shutdown() { + if let Ok(r) = r.recv() { + log::debug!("mdns_shutdown: {:?}", r); + } + } return Ok(()); } @@ -90,35 +117,31 @@ impl NavigationData { // or Ok if we are to shutdown. async fn receive_loop( &self, - mut stream: TcpStream, + stream: TcpStream, subsys: &SubsystemHandle, ) -> Result<(), RadarError> { + let mut buffered = BufReader::new(stream); + loop { - let mut buf = [0; 4096]; + let mut line = String::new(); tokio::select! { biased; _ = subsys.on_shutdown_requested() => { + log::info!("SK receive_loop done"); return Ok(()); }, - _ = stream.readable() => { - match stream.try_read(&mut buf) { - Ok(n) => { - if n > 2 { - - // Try to convert to String - if let Ok(s) = std::str::from_utf8(&buf[0..n]) { - log::info!("SK <- {}", s); - let s = s.to_string(); - if s.starts_with("{\"name\":") { - self.subscribe(&mut stream).await?; - log::info!("SK -> {}", SUBSCRIBE); - } - else { - match parse_signalk(&s) { - Err(e) => { log::warn!("{}", e)} - Ok(_) => { } - } - } + r = buffered.read_line(&mut line) => { + match r { + Ok(_) => { + log::trace!("SK <- {}", line); + if line.starts_with("{\"name\":") { + self.send_subscription(&mut buffered).await?; + log::trace!("SK -> {}", SUBSCRIBE); + } + else { + match parse_signalk(&line) { + Err(e) => { log::warn!("{}", e)} + Ok(_) => { } } } } @@ -134,7 +157,7 @@ impl NavigationData { } } - async fn subscribe(&self, stream: &mut TcpStream) -> Result<(), RadarError> { + async fn send_subscription(&self, stream: &mut BufReader) -> Result<(), RadarError> { let bytes: &[u8] = SUBSCRIBE.as_bytes(); stream.write_all(bytes).await.map_err(|e| RadarError::Io(e)) @@ -149,13 +172,12 @@ impl NavigationData { fn parse_signalk(s: &str) -> Result<(), RadarError> { match serde_json::from_str::(s) { Ok(v) => { - let values = &v["values"]; + let updates = &v["updates"][0]; + let values = &updates["values"][0]; { - log::info!("{:?}", values); + log::trace!("values: {:?}", values); - if let (Some(path), Some(value)) = - (values["path"].as_str(), values["value"].as_object()) - { + if let (Some(path), value) = (values["path"].as_str(), &values["value"]) { match path { "navigation.position" => { if let (Some(longitude), Some(latitude)) = @@ -167,8 +189,8 @@ fn parse_signalk(s: &str) -> Result<(), RadarError> { return Ok(()); } } - "navigation.trueHeading" => { - if let Some(heading) = value["heading"].as_f64() { + "navigation.headingTrue" => { + if let Some(heading) = value.as_f64() { HEADING_TRUE_VALID.store(true, Ordering::Release); HEADING_TRUE.store(heading, Ordering::Release); return Ok(()); @@ -196,7 +218,7 @@ async fn connect_to_socket(address: SocketAddr) -> Result let stream = TcpStream::connect(address) .await .map_err(|e| RadarError::Io(e))?; - log::info!("Connected to {}", address); + log::debug!("Connected to {}", address); Ok(stream) } @@ -220,7 +242,7 @@ where addresses .into_iter() .map(|address| { - log::info!("Connecting to {}", address); + log::debug!("Connecting to {}", address); Box::pin(connect_to_socket(address)) as Pin + Send>> }) .collect(); @@ -228,11 +250,11 @@ where // Use select_ok to return the first successful connection match select_ok(futures).await { Ok((stream, _)) => { - log::info!("First successful connection: {:?}", stream); + log::debug!("First successful connection: {:?}", stream); Ok(stream) } Err(e) => { - log::info!("All connections failed: {}", e); + log::debug!("All connections failed: {}", e); Err(e) } }