From 5c1f3ed5b9a4ed81b669cfeab250116a06abf8fa Mon Sep 17 00:00:00 2001 From: Terkwood Date: Sat, 20 Jul 2019 17:26:11 -0400 Subject: [PATCH 01/10] hack out the measurements entirely --- services/sensor_tracker/Cargo.lock | 2 +- services/sensor_tracker/Cargo.toml | 2 +- services/sensor_tracker/README.md | 11 +++--- services/sensor_tracker/src/logic.rs | 5 ++- services/sensor_tracker/src/model.rs | 41 ++------------------- services/sensor_tracker/src/predis.rs | 52 ++++++++------------------- 6 files changed, 26 insertions(+), 87 deletions(-) diff --git a/services/sensor_tracker/Cargo.lock b/services/sensor_tracker/Cargo.lock index 2b6df651..5acc6a59 100644 --- a/services/sensor_tracker/Cargo.lock +++ b/services/sensor_tracker/Cargo.lock @@ -514,7 +514,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "sensor_tracker" -version = "0.3.0" +version = "0.4.0" dependencies = [ "dotenv 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)", "envy 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/services/sensor_tracker/Cargo.toml b/services/sensor_tracker/Cargo.toml index 320a2035..74cf0f85 100644 --- a/services/sensor_tracker/Cargo.toml +++ b/services/sensor_tracker/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sensor_tracker" -version = "0.3.0" +version = "0.4.0" authors = ["Terkwood "] edition = "2018" diff --git a/services/sensor_tracker/README.md b/services/sensor_tracker/README.md index 6930ce25..c22518a5 100644 --- a/services/sensor_tracker/README.md +++ b/services/sensor_tracker/README.md @@ -12,25 +12,24 @@ Additionally, it creates an entry in the Redis `/sensors/ This is useful for sensors generating temperature and/or pH data. -Such data comes into an MQTT topic looking like this: +Such data might come into an MQTT topic looking like this: ``` { "device_id": , "temp_f": 81.71, "temp_c": 23.45, "ph": 7.77, "ph_mv": 453.05 } ``` -If the sensor isn't, it will create the following type of stub record -for the temp sensor based on a UUID V5 ID conversion: +If the device hasn't ever been tracked, it will create the following type of stub record with an internal device ID. The internal device ID is a (namespaced) UUID V5: ``` -HMSET /sensors// create_time +HMSET /devices/ create_time ``` The operator is encouraged to later amend the hash to include -a helpful reference to the tank which the sensor serves, so +a helpful reference to the area which the sensing device serves, so that the LED status utility can properly format messages. ``` -HSET /sensors// tank 0 +HSET /devices/ area 0 ``` ### Sample redis records diff --git a/services/sensor_tracker/src/logic.rs b/services/sensor_tracker/src/logic.rs index 3dceb591..4ec56bc9 100644 --- a/services/sensor_tracker/src/logic.rs +++ b/services/sensor_tracker/src/logic.rs @@ -15,15 +15,14 @@ pub fn receive_updates( if let Some(sensor_message) = prawnqtt::deser_message(paho) { let ext_device_id: &str = &sensor_message.device_id; - sensor_message.measurements().iter().for_each(|measure| { - if let Ok(delta_events) = predis::update(redis_ctx, &measure, ext_device_id) + if let Ok(delta_events) = predis::update(redis_ctx, &sensor_message, ext_device_id) { // emit all changed keys & hash field names to redis // on the appropriate redis pub/sub topic. // these will be processed later by the gcloud_push utility predis::publish_updates(redis_ctx, delta_event_topic, delta_events) } - }); + ; } } Err(_) if !mqtt_cli.is_connected() => { diff --git a/services/sensor_tracker/src/model.rs b/services/sensor_tracker/src/model.rs index 3628ce5d..1bc50363 100644 --- a/services/sensor_tracker/src/model.rs +++ b/services/sensor_tracker/src/model.rs @@ -1,5 +1,8 @@ /// This message is emitted to an MQTT channel by /// some device with access to a temp sensor (DS18B20, etc) +/// `external_device_id` is usually reported as a +/// e.g. "28654597090000e4" + #[derive(Serialize, Deserialize, Debug)] pub struct SensorMessage { pub device_id: String, @@ -13,45 +16,7 @@ pub struct SensorMessage { pub heat_index_f: Option, } -/// `external_device_id` is usually reported as a -/// e.g. "28654597090000e4" -impl SensorMessage { - pub fn measurements(&self) -> Vec { - let mut v: Vec = vec![]; - if let ( - Some(humidity), - Some(status), - Some(temp_f), - Some(temp_c), - Some(heat_index_f), - Some(heat_index_c), - ) = ( - self.humidity, - &self.status, - self.temp_f, - self.temp_c, - self.heat_index_f, - self.heat_index_c, - ) { - v.push(Measurement::DHT { - status: status.to_owned(), - humidity, - temp_f, - temp_c, - heat_index_f, - heat_index_c, - }) - } else if let (Some(temp_f), Some(temp_c)) = (self.temp_f, self.temp_c) { - v.push(Measurement::Temp { temp_f, temp_c }) - } - if let (Some(ph), Some(ph_mv)) = (self.ph, self.ph_mv) { - v.push(Measurement::PH { ph, ph_mv }) - } - - v - } -} #[derive(Debug)] pub enum Measurement { diff --git a/services/sensor_tracker/src/predis.rs b/services/sensor_tracker/src/predis.rs index 3d60ad78..841b6259 100644 --- a/services/sensor_tracker/src/predis.rs +++ b/services/sensor_tracker/src/predis.rs @@ -13,20 +13,20 @@ use uuid::Uuid; /// Will create a new sensor record for this device if one does not already exist. pub fn update<'a, 'b>( redis_ctx: &RedisContext, - measure: &model::Measurement, + sensor_message: &model::SensorMessage, ext_device_id: &str, ) -> Result, redis::RedisError> { let mut delta_events: Vec = vec![]; - println!("Received redis {} update: {:?}", measure.name(), measure); + println!("Received redis update: {:?}", sensor_message); - let ext_device_namespace = &redis_ctx.get_external_device_namespace(measure.name())?; + let ext_device_namespace = &redis_ctx.get_external_device_namespace()?; let device_id = internal_device_id(ext_device_id, ext_device_namespace).unwrap(); println!("\tDevice ID (internal): {}", device_id); let rn = &redis_ctx.namespace; - let sensor_set_event = update_sensor_set(redis_ctx, rn, measure, device_id); + let sensor_set_event = update_sensor_set(redis_ctx, rn, sensor_message, device_id); if let Some(e) = sensor_set_event { delta_events.push(e) } @@ -42,12 +42,8 @@ pub fn update<'a, 'b>( if let Ok(v) = tank_and_area_and_update_count { // Tank associated with this sensor? let revent = match (v.get(0).unwrap_or(&None), v.get(1).unwrap_or(&None)) { - (Some(tank_num), _) => { - update_container_hash(redis_ctx, Container::Tanks, tank_num, &measure) - } - (_, Some(area_num)) => { - update_container_hash(redis_ctx, Container::Areas, area_num, &measure) - } + (Some(tank_num), _) => update_area_hash(redis_ctx, tank_num, &measure), + (_, Some(area_num)) => update_area_hash(redis_ctx, area_num, &measure), (None, None) => ensure_sensor_hash_exists(redis_ctx, sensor_hash_key, ext_device_id), }; @@ -93,39 +89,19 @@ fn update_sensor_set( } } -enum Container { - Tanks, - Areas, -} - -impl Container { - pub fn to_string(self) -> String { - match self { - Container::Tanks => "tanks".to_string(), - Container::Areas => "areas".to_string(), - } - } -} - -fn update_container_hash( +fn update_area_hash( redis_ctx: &RedisContext, - container: Container, container_num: &u64, measure: &model::Measurement, ) -> Option { // We found the area associated with this // sensor ID, so we should update that area's // current reading. - let container_key = format!( - "{}/{}/{}", - redis_ctx.namespace, - container.to_string(), - container_num - ); + let area_key = format!("{}/areas/{}", redis_ctx.namespace, container_num); - let container_measure_count: Result, _> = redis_ctx + let area_measure_count: Result, _> = redis_ctx .conn - .hget(&container_key, &format!("{}_update_count", measure.name())); + .hget(&area_key, &format!("{}_update_count", measure.name())); let uc_name = format!("{}_update_count", measure.name()); let ut_name = format!("{}_update_time", measure.name()); @@ -134,7 +110,7 @@ fn update_container_hash( data.push(( &uc_name, - container_measure_count + area_measure_count .unwrap_or(None) .map(|u| u + 1) .unwrap_or(1) @@ -143,20 +119,20 @@ fn update_container_hash( data.push((&ut_name, epoch_secs().to_string())); ( - redis_ctx.conn.hset_multiple(&container_key, &data[..]), + redis_ctx.conn.hset_multiple(&area_key, &data[..]), data.iter().map(|(a, _)| *a).collect(), ) }; match update { (Err(e), _) => { - println!("update fails for {}: {:?}", container_key, e); + println!("update fails for {}: {:?}", area_key, e); None } (Ok(_), fields) if fields.len() > 0 => { let fs = fields.iter().map(|s| s.to_string()).collect(); Some(REvent::HashUpdated { - key: container_key.to_string(), + key: area_key.to_string(), fields: fs, }) } From 44cec69876aa19b4b06cfed0cb206e8afde22fee Mon Sep 17 00:00:00 2001 From: Terkwood Date: Sat, 20 Jul 2019 17:33:08 -0400 Subject: [PATCH 02/10] hack it up --- services/redis_context/Cargo.lock | 4 +++- services/redis_context/Cargo.toml | 4 ++-- services/redis_context/src/lib.rs | 13 ++++--------- services/sensor_tracker/Cargo.lock | 6 ++---- services/sensor_tracker/Cargo.toml | 2 +- services/sensor_tracker/src/predis.rs | 28 +++++++++++++-------------- 6 files changed, 26 insertions(+), 31 deletions(-) diff --git a/services/redis_context/Cargo.lock b/services/redis_context/Cargo.lock index 5f8415c7..2b8eb013 100644 --- a/services/redis_context/Cargo.lock +++ b/services/redis_context/Cargo.lock @@ -1,3 +1,5 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. [[package]] name = "ascii" version = "0.7.1" @@ -278,7 +280,7 @@ dependencies = [ [[package]] name = "redis_context" -version = "0.1.0" +version = "0.2.0" dependencies = [ "redis 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", "uuid 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/services/redis_context/Cargo.toml b/services/redis_context/Cargo.toml index 0a8cec4f..44849648 100644 --- a/services/redis_context/Cargo.toml +++ b/services/redis_context/Cargo.toml @@ -1,9 +1,9 @@ [package] name = "redis_context" -version = "0.1.0" +version = "0.2.0" authors = ["Terkwood "] edition = "2018" [dependencies] redis = "0.9" -uuid = { version = "0.7", features = ["v4", "v5"] } # v4 is random, v5 is name-based +uuid = { version = "0.7", features = ["v4", "v5"] } diff --git a/services/redis_context/src/lib.rs b/services/redis_context/src/lib.rs index 238a3beb..49fac269 100644 --- a/services/redis_context/src/lib.rs +++ b/services/redis_context/src/lib.rs @@ -24,12 +24,9 @@ impl RedisContext { } /// This is the "name" field that will be used to form a V5 UUID - pub fn get_external_device_namespace( - &self, - device_type: String, - ) -> Result { + pub fn get_external_device_namespace(&self) -> Result { let key = format!("{}/external_device_namespace", self.namespace); - let r: Option = self.conn.hget(&key, device_type)?; + let r: Option = self.conn.get(&key)?; match r { None => { @@ -44,20 +41,18 @@ impl RedisContext { } } - pub enum ExternalDevice { Temp, PH, - Unknown + Unknown, } - impl From for ExternalDevice { fn from(device_type: String) -> Self { match device_type.to_lowercase().trim() { "temp" => ExternalDevice::Temp, "ph" => ExternalDevice::PH, - _ => ExternalDevice::Unknown + _ => ExternalDevice::Unknown, } } } diff --git a/services/sensor_tracker/Cargo.lock b/services/sensor_tracker/Cargo.lock index 5acc6a59..099b1317 100644 --- a/services/sensor_tracker/Cargo.lock +++ b/services/sensor_tracker/Cargo.lock @@ -422,8 +422,7 @@ dependencies = [ [[package]] name = "redis_context" -version = "0.1.0" -source = "git+https://github.com/Terkwood/prawnalith/?branch=unstable#7087164ca6a90be0389e466a216cdc4cc55ae781" +version = "0.2.0" dependencies = [ "redis 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", "uuid 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -520,7 +519,7 @@ dependencies = [ "envy 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "paho-mqtt 0.4.0 (git+https://github.com/Terkwood/paho.mqtt.rust.git?rev=1b72f84)", "redis 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", - "redis_context 0.1.0 (git+https://github.com/Terkwood/prawnalith/?branch=unstable)", + "redis_context 0.2.0", "redis_delta 0.1.2", "serde 1.0.80 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.80 (registry+https://github.com/rust-lang/crates.io-index)", @@ -841,7 +840,6 @@ dependencies = [ "checksum rand_core 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1961a422c4d189dfb50ffa9320bf1f2a9bd54ecb92792fb9477f99a1045f3372" "checksum rand_core 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0905b6b7079ec73b314d4c748701f6931eb79fd97c668caa3f1899b22b32c6db" "checksum redis 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f0c747d743d48233f9bc3ed3fb00cb84c1d98d8c7f54ed2d4cca9adf461a7ef3" -"checksum redis_context 0.1.0 (git+https://github.com/Terkwood/prawnalith/?branch=unstable)" = "" "checksum regex 0.1.80 (registry+https://github.com/rust-lang/crates.io-index)" = "4fd4ace6a8cf7860714a2c2280d6c1f7e6a413486c13298bbc86fd3da019402f" "checksum regex 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "2069749032ea3ec200ca51e4a31df41759190a88edca0d2d86ee8bedf7073341" "checksum regex-syntax 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "f9ec002c35e86791825ed294b50008eea9ddfc8def4420124fbc6b08db834957" diff --git a/services/sensor_tracker/Cargo.toml b/services/sensor_tracker/Cargo.toml index 74cf0f85..0438322c 100644 --- a/services/sensor_tracker/Cargo.toml +++ b/services/sensor_tracker/Cargo.toml @@ -9,7 +9,7 @@ dotenv = "0.13" envy = "0.3" paho-mqtt = { git = "https://github.com/Terkwood/paho.mqtt.rust.git", rev = "1b72f84" } redis = "0.9" -redis_context = { git = "https://github.com/Terkwood/prawnalith/", branch = "unstable" } +redis_context = { path = "../redis_context" } redis_delta = { path = "../redis_delta" } serde = "1.0" serde_derive = "1.0" diff --git a/services/sensor_tracker/src/predis.rs b/services/sensor_tracker/src/predis.rs index 841b6259..9fa351a7 100644 --- a/services/sensor_tracker/src/predis.rs +++ b/services/sensor_tracker/src/predis.rs @@ -32,7 +32,7 @@ pub fn update<'a, 'b>( } // lookup associated tank - let sensor_hash_key = &format!("{}/sensors/{}/{}", rn, measure.name(), device_id).to_string(); + let sensor_hash_key = &format!("{}/devices/{}", rn, device_id).to_string(); let tank_and_area_and_update_count: Result>, _> = redis_ctx.conn.hget( sensor_hash_key, @@ -54,10 +54,10 @@ pub fn update<'a, 'b>( // record a hit on the updates that the sensor has seen // and also record the most recent measurement on the record // for this individual sensor - let sensor_updated = update_sensor_hash( + let sensor_updated = update_device_hash( redis_ctx, sensor_hash_key, - measure, + sensor_message, v.get(2).unwrap_or(&None), ); @@ -72,18 +72,18 @@ pub fn update<'a, 'b>( fn update_sensor_set( redis_ctx: &RedisContext, rn: &str, - measure: &model::Measurement, + sensor_message: &model::SensorMessage, device_id: Uuid, ) -> Option { - let set_sensor_type_key = format!("{}/sensors/{}", rn, measure.name()); + let set_device_key = format!("{}/devices", rn); // add to the member set if it doesn't already exist - let sensors_added: Result = redis_ctx + let devices_added: Result = redis_ctx .conn - .sadd(&set_sensor_type_key, &format!("{}", device_id)); + .sadd(&set_device_key, &format!("{}", device_id)); - match sensors_added { + match devices_added { Ok(n) if n > 0 => Some(REvent::SetUpdated { - key: set_sensor_type_key, + key: set_device_key, }), _ => None, } @@ -179,10 +179,10 @@ fn ensure_sensor_hash_exists( result } -fn update_sensor_hash( +fn update_device_hash( redis_ctx: &RedisContext, - sensor_hash_key: &str, - measure: &model::Measurement, + device_hash_key: &str, + measure: &model::SensorMessage, maybe_sensor_upd_count: &Option, ) -> Option { let upd_c = &format!("{}_update_count", measure.name()); @@ -197,9 +197,9 @@ fn update_sensor_hash( let ut = &format!("{}_update_time", measure.name()); data.push((ut, epoch_secs().to_string())); - let redis_result: Result<(), _> = redis_ctx.conn.hset_multiple(sensor_hash_key, &data[..]); + let redis_result: Result<(), _> = redis_ctx.conn.hset_multiple(device_hash_key, &data[..]); if let Err(e) = redis_result { - println!("couldn't update sensor record {}: {:?}", sensor_hash_key, e); + println!("couldn't update device record {}: {:?}", device_hash_key, e); None } else { let mut fields: Vec = vec![]; From 0df3c52d72ce7101d18acf7310b97caa711aeb0f Mon Sep 17 00:00:00 2001 From: Terkwood Date: Sat, 20 Jul 2019 17:37:15 -0400 Subject: [PATCH 03/10] hack it up --- services/redis_context/src/lib.rs | 16 ----------- services/sensor_tracker/src/predis.rs | 41 +++++++++++++-------------- 2 files changed, 20 insertions(+), 37 deletions(-) diff --git a/services/redis_context/src/lib.rs b/services/redis_context/src/lib.rs index 49fac269..17d6fb18 100644 --- a/services/redis_context/src/lib.rs +++ b/services/redis_context/src/lib.rs @@ -40,19 +40,3 @@ impl RedisContext { } } } - -pub enum ExternalDevice { - Temp, - PH, - Unknown, -} - -impl From for ExternalDevice { - fn from(device_type: String) -> Self { - match device_type.to_lowercase().trim() { - "temp" => ExternalDevice::Temp, - "ph" => ExternalDevice::PH, - _ => ExternalDevice::Unknown, - } - } -} diff --git a/services/sensor_tracker/src/predis.rs b/services/sensor_tracker/src/predis.rs index 9fa351a7..94c76dac 100644 --- a/services/sensor_tracker/src/predis.rs +++ b/services/sensor_tracker/src/predis.rs @@ -31,20 +31,19 @@ pub fn update<'a, 'b>( delta_events.push(e) } - // lookup associated tank + // lookup associated area let sensor_hash_key = &format!("{}/devices/{}", rn, device_id).to_string(); - let tank_and_area_and_update_count: Result>, _> = redis_ctx.conn.hget( + let area_and_sensors_update_count: Result>, _> = redis_ctx.conn.hget( sensor_hash_key, - vec!["tank", "area", &format!("{}_update_count", measure.name())], + vec!["area", "sensors_update_count"], ); - if let Ok(v) = tank_and_area_and_update_count { + if let Ok(v) = area_and_sensors_update_count { // Tank associated with this sensor? - let revent = match (v.get(0).unwrap_or(&None), v.get(1).unwrap_or(&None)) { - (Some(tank_num), _) => update_area_hash(redis_ctx, tank_num, &measure), - (_, Some(area_num)) => update_area_hash(redis_ctx, area_num, &measure), - (None, None) => ensure_sensor_hash_exists(redis_ctx, sensor_hash_key, ext_device_id), + let revent = match v.get(0).unwrap_or(&None) { + Some(area_num) => update_area_hash(redis_ctx, area_num, &sensor_message), + None => ensure_device_hash_exists(redis_ctx, sensor_hash_key, ext_device_id), }; if let Some(ev) = revent { @@ -92,7 +91,7 @@ fn update_sensor_set( fn update_area_hash( redis_ctx: &RedisContext, container_num: &u64, - measure: &model::Measurement, + sensor_message: &model::SensorMessage, ) -> Option { // We found the area associated with this // sensor ID, so we should update that area's @@ -101,12 +100,12 @@ fn update_area_hash( let area_measure_count: Result, _> = redis_ctx .conn - .hget(&area_key, &format!("{}_update_count", measure.name())); + .hget(&area_key, &format!("sensors_update_count")); - let uc_name = format!("{}_update_count", measure.name()); - let ut_name = format!("{}_update_time", measure.name()); + let uc_name = format!("sensors_update_count"); + let ut_name = format!("sensors_update_time"); let update: (Result, Vec<&str>) = { - let mut data: Vec<(&str, String)> = measure.to_redis(); + let mut data: Vec<(&str, String)> = sensor_message.to_redis(); data.push(( &uc_name, @@ -140,9 +139,9 @@ fn update_area_hash( } } -fn ensure_sensor_hash_exists( +fn ensure_device_hash_exists( redis_ctx: &RedisContext, - sensor_hash_key: &str, + device_hash_key: &str, ext_device_id_str: &str, ) -> Option { // We know that there's no associated "tank" @@ -154,7 +153,7 @@ fn ensure_sensor_hash_exists( redis_ctx .conn - .exists(sensor_hash_key) + .exists(device_hash_key) .iter() .for_each(|e: &bool| { if !e { @@ -166,7 +165,7 @@ fn ensure_sensor_hash_exists( ][..]; // new sensor, make note of when it is created let _: Result, _> = - redis_ctx.conn.hset_multiple(sensor_hash_key, field_vals); + redis_ctx.conn.hset_multiple(device_hash_key, field_vals); let fields = vec![cf, ed]; result = Some(REvent::HashUpdated { @@ -182,10 +181,10 @@ fn ensure_sensor_hash_exists( fn update_device_hash( redis_ctx: &RedisContext, device_hash_key: &str, - measure: &model::SensorMessage, + sensor_message: &model::SensorMessage, maybe_sensor_upd_count: &Option, ) -> Option { - let upd_c = &format!("{}_update_count", measure.name()); + let upd_c = &format!("sensors_update_count"); let mut data: Vec<(&str, String)> = vec![( upd_c, maybe_sensor_upd_count @@ -193,8 +192,8 @@ fn update_device_hash( .unwrap_or(1) .to_string(), )]; - data.extend(measure.to_redis()); - let ut = &format!("{}_update_time", measure.name()); + data.extend(sensor_message.to_redis()); + let ut = &format!("sensors_update_time"); data.push((ut, epoch_secs().to_string())); let redis_result: Result<(), _> = redis_ctx.conn.hset_multiple(device_hash_key, &data[..]); From 1e8fa6a97e33ffc71239462fa27eb3c63d395718 Mon Sep 17 00:00:00 2001 From: Terkwood Date: Sat, 20 Jul 2019 17:44:16 -0400 Subject: [PATCH 04/10] drop the hammer --- services/sensor_tracker/src/logic.rs | 16 +++++++------- services/sensor_tracker/src/model.rs | 31 ++++++++++++++++++++++++++- services/sensor_tracker/src/predis.rs | 13 ++++++----- 3 files changed, 44 insertions(+), 16 deletions(-) diff --git a/services/sensor_tracker/src/logic.rs b/services/sensor_tracker/src/logic.rs index 4ec56bc9..09e9f756 100644 --- a/services/sensor_tracker/src/logic.rs +++ b/services/sensor_tracker/src/logic.rs @@ -15,14 +15,14 @@ pub fn receive_updates( if let Some(sensor_message) = prawnqtt::deser_message(paho) { let ext_device_id: &str = &sensor_message.device_id; - if let Ok(delta_events) = predis::update(redis_ctx, &sensor_message, ext_device_id) - { - // emit all changed keys & hash field names to redis - // on the appropriate redis pub/sub topic. - // these will be processed later by the gcloud_push utility - predis::publish_updates(redis_ctx, delta_event_topic, delta_events) - } - ; + if let Ok(delta_events) = + predis::update(redis_ctx, &sensor_message, ext_device_id) + { + // emit all changed keys & hash field names to redis + // on the appropriate redis pub/sub topic. + // these will be processed later by the gcloud_push utility + predis::publish_updates(redis_ctx, delta_event_topic, delta_events) + }; } } Err(_) if !mqtt_cli.is_connected() => { diff --git a/services/sensor_tracker/src/model.rs b/services/sensor_tracker/src/model.rs index 1bc50363..e24cf37c 100644 --- a/services/sensor_tracker/src/model.rs +++ b/services/sensor_tracker/src/model.rs @@ -16,7 +16,36 @@ pub struct SensorMessage { pub heat_index_f: Option, } - +impl SensorMessage { + pub fn to_redis(&self) -> Vec<(&str, String)> { + let mut data = vec![]; + if let Some(s) = &self.status { + data.push(("status", s.to_string())); + } + if let Some(humidity) = self.humidity { + data.push(("humidity", humidity.to_string())); + } + if let Some(tf) = self.temp_f { + data.push(("temp_f", tf.to_string())); + } + if let Some(tc) = self.temp_c { + data.push(("temp_c", tc.to_string())); + } + if let Some(hf) = self.heat_index_f { + data.push(("heat_index_f", hf.to_string())); + } + if let Some(hc) = self.heat_index_c { + data.push(("heat_index_c", hc.to_string())); + } + if let Some(ph) = self.ph { + data.push(("ph", ph.to_string())) + } + if let Some(ph_mv) = self.ph_mv { + data.push(("ph_mv", ph_mv.to_string())) + } + data + } +} #[derive(Debug)] pub enum Measurement { diff --git a/services/sensor_tracker/src/predis.rs b/services/sensor_tracker/src/predis.rs index 94c76dac..ddf65e49 100644 --- a/services/sensor_tracker/src/predis.rs +++ b/services/sensor_tracker/src/predis.rs @@ -34,16 +34,15 @@ pub fn update<'a, 'b>( // lookup associated area let sensor_hash_key = &format!("{}/devices/{}", rn, device_id).to_string(); - let area_and_sensors_update_count: Result>, _> = redis_ctx.conn.hget( - sensor_hash_key, - vec!["area", "sensors_update_count"], - ); + let area_and_sensors_update_count: Result>, _> = redis_ctx + .conn + .hget(sensor_hash_key, vec!["area", "sensors_update_count"]); if let Ok(v) = area_and_sensors_update_count { // Tank associated with this sensor? let revent = match v.get(0).unwrap_or(&None) { Some(area_num) => update_area_hash(redis_ctx, area_num, &sensor_message), - None => ensure_device_hash_exists(redis_ctx, sensor_hash_key, ext_device_id), + None => ensure_device_hash_exists(redis_ctx, sensor_hash_key, ext_device_id), }; if let Some(ev) = revent { @@ -169,7 +168,7 @@ fn ensure_device_hash_exists( let fields = vec![cf, ed]; result = Some(REvent::HashUpdated { - key: sensor_hash_key.to_string(), + key: device_hash_key.to_string(), fields, }) } @@ -205,7 +204,7 @@ fn update_device_hash( data.iter().for_each(|(f, _)| fields.push(f.to_string())); Some(REvent::HashUpdated { - key: sensor_hash_key.to_string(), + key: device_hash_key.to_string(), fields, }) } From c8dfee2b2bc4f6c4b1750667308dbe92b0669cce Mon Sep 17 00:00:00 2001 From: Terkwood Date: Mon, 22 Jul 2019 17:07:50 -0400 Subject: [PATCH 05/10] trim --- services/sensor_tracker/src/model.rs | 68 --------------------------- services/sensor_tracker/src/predis.rs | 2 +- 2 files changed, 1 insertion(+), 69 deletions(-) diff --git a/services/sensor_tracker/src/model.rs b/services/sensor_tracker/src/model.rs index e24cf37c..f6bcb128 100644 --- a/services/sensor_tracker/src/model.rs +++ b/services/sensor_tracker/src/model.rs @@ -46,71 +46,3 @@ impl SensorMessage { data } } - -#[derive(Debug)] -pub enum Measurement { - Temp { - temp_f: f64, - temp_c: f64, - }, - PH { - ph: f64, - ph_mv: f64, - }, - /// Digital humidity and temp, e.g. DHT11 sensor - DHT { - status: String, - humidity: f64, - temp_f: f64, - temp_c: f64, - heat_index_f: f64, - heat_index_c: f64, - }, -} - -impl Measurement { - pub fn name(&self) -> String { - match self { - Measurement::Temp { - temp_f: _, - temp_c: _, - } => "temp".to_string(), - Measurement::PH { ph: _, ph_mv: _ } => "ph".to_string(), - Measurement::DHT { - status: _, - humidity: _, - temp_f: _, - temp_c: _, - heat_index_f: _, - heat_index_c: _, - } => "dht".to_string(), - } - } - - pub fn to_redis(&self) -> Vec<(&str, String)> { - match self { - Measurement::Temp { temp_f, temp_c } => vec![ - ("temp_f", temp_f.to_string()), - ("temp_c", temp_c.to_string()), - ], - Measurement::PH { ph, ph_mv } => { - vec![("ph", ph.to_string()), ("ph_mv", ph_mv.to_string())] - } - Measurement::DHT { - status, - humidity, - temp_f, - temp_c, - heat_index_f, - heat_index_c, - } => vec![ - ("status", status.to_string()), - ("humidity", humidity.to_string()), - ("temp_f", temp_f.to_string()), - ("temp_c", temp_c.to_string()), - ("heat_index_f", heat_index_f.to_string()), - ("heat_index_c", heat_index_c.to_string()), - ], - } - } -} diff --git a/services/sensor_tracker/src/predis.rs b/services/sensor_tracker/src/predis.rs index ddf65e49..55e39c5b 100644 --- a/services/sensor_tracker/src/predis.rs +++ b/services/sensor_tracker/src/predis.rs @@ -70,7 +70,7 @@ pub fn update<'a, 'b>( fn update_sensor_set( redis_ctx: &RedisContext, rn: &str, - sensor_message: &model::SensorMessage, + sensor_message: &model::SensorMessage, // TODO wat ? device_id: Uuid, ) -> Option { let set_device_key = format!("{}/devices", rn); From 2a3a3788c2d46d00985084cdf65c66c3ce0bf00b Mon Sep 17 00:00:00 2001 From: Terkwood Date: Thu, 25 Jul 2019 12:44:14 -0400 Subject: [PATCH 06/10] hack version --- services/sensor_tracker/Cargo.lock | 11 ++++++----- services/sensor_tracker/Cargo.toml | 3 ++- services/sensor_tracker/src/predis.rs | 4 ++-- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/services/sensor_tracker/Cargo.lock b/services/sensor_tracker/Cargo.lock index 1442ddb8..1b762e12 100644 --- a/services/sensor_tracker/Cargo.lock +++ b/services/sensor_tracker/Cargo.lock @@ -679,8 +679,8 @@ dependencies = [ [[package]] name = "redis_context" -version = "0.1.0" -source = "git+https://github.com/Terkwood/prawnalith/?branch=unstable#f92a34261f726b904498ff8b34ee5f1a0e810c09" +version = "0.2.0" +source = "git+https://github.com/Terkwood/prawnalith/?branch=feature/simplified-sensor-tracker-data#c8dfee2b2bc4f6c4b1750667308dbe92b0669cce" dependencies = [ "redis 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", "uuid 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)", @@ -689,8 +689,9 @@ dependencies = [ [[package]] name = "redis_delta" version = "0.1.2" -source = "git+https://github.com/Terkwood/prawnalith/?branch=unstable#f92a34261f726b904498ff8b34ee5f1a0e810c09" +source = "git+https://github.com/Terkwood/prawnalith/?branch=unstable#1a6bf54663c794eb540172b16c2c5a01b674d2a4" dependencies = [ + "rand_core 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.97 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.97 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.40 (registry+https://github.com/rust-lang/crates.io-index)", @@ -838,7 +839,7 @@ dependencies = [ "envy 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "rand_core 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "redis 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", - "redis_context 0.1.0 (git+https://github.com/Terkwood/prawnalith/?branch=unstable)", + "redis_context 0.2.0 (git+https://github.com/Terkwood/prawnalith/?branch=feature/simplified-sensor-tracker-data)", "redis_delta 0.1.2 (git+https://github.com/Terkwood/prawnalith/?branch=unstable)", "rumqtt 0.30.1 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.97 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1332,7 +1333,7 @@ dependencies = [ "checksum rand_xorshift 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cbf7e9e623549b0e21f6e97cf8ecf247c1a8fd2e8a992ae265314300b2455d5c" "checksum rdrand 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2" "checksum redis 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f0c747d743d48233f9bc3ed3fb00cb84c1d98d8c7f54ed2d4cca9adf461a7ef3" -"checksum redis_context 0.1.0 (git+https://github.com/Terkwood/prawnalith/?branch=unstable)" = "" +"checksum redis_context 0.2.0 (git+https://github.com/Terkwood/prawnalith/?branch=feature/simplified-sensor-tracker-data)" = "" "checksum redis_delta 0.1.2 (git+https://github.com/Terkwood/prawnalith/?branch=unstable)" = "" "checksum redox_syscall 0.1.56 (registry+https://github.com/rust-lang/crates.io-index)" = "2439c63f3f6139d1b57529d16bc3b8bb855230c8efcc5d3a896c8bea7c3b1e84" "checksum regex 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6b23da8dfd98a84bd7e08700190a5d9f7d2d38abd4369dd1dae651bc40bfd2cc" diff --git a/services/sensor_tracker/Cargo.toml b/services/sensor_tracker/Cargo.toml index 20fa1a1a..9fc363c1 100644 --- a/services/sensor_tracker/Cargo.toml +++ b/services/sensor_tracker/Cargo.toml @@ -13,7 +13,8 @@ envy = "*" rand_core="0.2.2" rumqtt = "*" redis = "^0.9" -redis_context = { git = "https://github.com/Terkwood/prawnalith/", branch = "unstable" } +# TODO restore branch +redis_context = { git = "https://github.com/Terkwood/prawnalith/", branch = "feature/simplified-sensor-tracker-data" } redis_delta = { git = "https://github.com/Terkwood/prawnalith/", branch = "unstable" } serde = "*" serde_derive = "*" diff --git a/services/sensor_tracker/src/predis.rs b/services/sensor_tracker/src/predis.rs index 55e39c5b..0dcf1daf 100644 --- a/services/sensor_tracker/src/predis.rs +++ b/services/sensor_tracker/src/predis.rs @@ -26,7 +26,7 @@ pub fn update<'a, 'b>( println!("\tDevice ID (internal): {}", device_id); let rn = &redis_ctx.namespace; - let sensor_set_event = update_sensor_set(redis_ctx, rn, sensor_message, device_id); + let sensor_set_event = update_devices_set(redis_ctx, rn, sensor_message, device_id); if let Some(e) = sensor_set_event { delta_events.push(e) } @@ -67,7 +67,7 @@ pub fn update<'a, 'b>( Ok(delta_events) } -fn update_sensor_set( +fn update_devices_set( redis_ctx: &RedisContext, rn: &str, sensor_message: &model::SensorMessage, // TODO wat ? From 7adfe29b0d85f34c0bf3073400aa7bd3d3e4ca02 Mon Sep 17 00:00:00 2001 From: Terkwood Date: Thu, 25 Jul 2019 12:46:43 -0400 Subject: [PATCH 07/10] =?UTF-8?q?Update=20README.md=20=F0=9F=93=9D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- services/sensor_tracker/README.md | 32 +++++++++++-------------------- 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/services/sensor_tracker/README.md b/services/sensor_tracker/README.md index 5ae67d93..7a8ffffb 100644 --- a/services/sensor_tracker/README.md +++ b/services/sensor_tracker/README.md @@ -20,26 +20,16 @@ Such data might come into an MQTT topic looking like this: If the device hasn't ever been tracked, it will create the following type of stub record with an internal device ID. The internal device ID is a (namespaced) UUID V5: -<<<<<<< HEAD -``` -HMSET /devices/ create_time -======= ```text HMSET /sensors// create_time ->>>>>>> unstable ``` The operator is encouraged to later amend the hash to include a helpful reference to the area which the sensing device serves, so that the LED status utility can properly format messages. -<<<<<<< HEAD -``` -HSET /devices/ area 0 -======= ```text HSET /sensors// tank 0 ->>>>>>> unstable ``` ## Docker builds @@ -50,9 +40,9 @@ See `build.sh` and `run.sh` for entry points. #### temp sensor -`> hgetall namespace/sensors/temp/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa` +`> hgetall namespace/devices/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa` -``` +```text 1) "create_time" 2) "1540598539" 3) "ext_device_id" @@ -69,11 +59,11 @@ See `build.sh` and `run.sh` for entry points. 14) "1" ``` -**pH sensor** +#### pH sensor -`> hgetall namespace/sensors/ph/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa` +`> hgetall namespace/devices/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa` -``` +```text 1) "low_ph_ref" 2) "4.00" 3) "low_mv" @@ -96,19 +86,19 @@ See `build.sh` and `run.sh` for entry points. 20) "286cbc98090000bd" ``` -**tank counter** +#### area counter -`> get namespace/tanks` +`> get namespace/areas` -``` +```text "1" ``` -**tank hash** +#### area hash -`> hgetall namespace/tanks/1` +`> hgetall namespace/areas/1` -``` +```text hgetall namespace/tanks/1 1) "temp_f" 2) "81.16" From 8b4a3675432a73001adee140fbc99a6d3c07e5c7 Mon Sep 17 00:00:00 2001 From: Terkwood Date: Thu, 25 Jul 2019 14:04:39 -0400 Subject: [PATCH 08/10] hack cargo branch --- services/sensor_tracker/Cargo.lock | 7 ++++--- services/sensor_tracker/Cargo.toml | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/services/sensor_tracker/Cargo.lock b/services/sensor_tracker/Cargo.lock index 1b762e12..a870e2c4 100644 --- a/services/sensor_tracker/Cargo.lock +++ b/services/sensor_tracker/Cargo.lock @@ -680,8 +680,9 @@ dependencies = [ [[package]] name = "redis_context" version = "0.2.0" -source = "git+https://github.com/Terkwood/prawnalith/?branch=feature/simplified-sensor-tracker-data#c8dfee2b2bc4f6c4b1750667308dbe92b0669cce" +source = "git+https://github.com/Terkwood/prawnalith/?branch=fix/data#7adfe29b0d85f34c0bf3073400aa7bd3d3e4ca02" dependencies = [ + "rand_core 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "redis 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", "uuid 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -839,7 +840,7 @@ dependencies = [ "envy 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "rand_core 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "redis 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", - "redis_context 0.2.0 (git+https://github.com/Terkwood/prawnalith/?branch=feature/simplified-sensor-tracker-data)", + "redis_context 0.2.0 (git+https://github.com/Terkwood/prawnalith/?branch=fix/data)", "redis_delta 0.1.2 (git+https://github.com/Terkwood/prawnalith/?branch=unstable)", "rumqtt 0.30.1 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.97 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1333,7 +1334,7 @@ dependencies = [ "checksum rand_xorshift 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cbf7e9e623549b0e21f6e97cf8ecf247c1a8fd2e8a992ae265314300b2455d5c" "checksum rdrand 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2" "checksum redis 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f0c747d743d48233f9bc3ed3fb00cb84c1d98d8c7f54ed2d4cca9adf461a7ef3" -"checksum redis_context 0.2.0 (git+https://github.com/Terkwood/prawnalith/?branch=feature/simplified-sensor-tracker-data)" = "" +"checksum redis_context 0.2.0 (git+https://github.com/Terkwood/prawnalith/?branch=fix/data)" = "" "checksum redis_delta 0.1.2 (git+https://github.com/Terkwood/prawnalith/?branch=unstable)" = "" "checksum redox_syscall 0.1.56 (registry+https://github.com/rust-lang/crates.io-index)" = "2439c63f3f6139d1b57529d16bc3b8bb855230c8efcc5d3a896c8bea7c3b1e84" "checksum regex 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6b23da8dfd98a84bd7e08700190a5d9f7d2d38abd4369dd1dae651bc40bfd2cc" diff --git a/services/sensor_tracker/Cargo.toml b/services/sensor_tracker/Cargo.toml index 9fc363c1..cd16f6ee 100644 --- a/services/sensor_tracker/Cargo.toml +++ b/services/sensor_tracker/Cargo.toml @@ -14,7 +14,7 @@ rand_core="0.2.2" rumqtt = "*" redis = "^0.9" # TODO restore branch -redis_context = { git = "https://github.com/Terkwood/prawnalith/", branch = "feature/simplified-sensor-tracker-data" } +redis_context = { git = "https://github.com/Terkwood/prawnalith/", branch = "fix/data" } redis_delta = { git = "https://github.com/Terkwood/prawnalith/", branch = "unstable" } serde = "*" serde_derive = "*" From 5615d371b45990ba245fc37a9815a0b329717cd0 Mon Sep 17 00:00:00 2001 From: Terkwood <38859656+Terkwood@users.noreply.github.com> Date: Thu, 1 Aug 2019 18:45:47 -0400 Subject: [PATCH 09/10] Update data model in LED status helper (#113) --- services/led_status_helper/Cargo.lock | 17 +- services/led_status_helper/Cargo.toml | 4 +- services/led_status_helper/src/main.rs | 259 ++++++++---------------- services/led_status_helper/src/model.rs | 11 + services/sensor_tracker/Cargo.lock | 2 +- services/sensor_tracker/Cargo.toml | 2 +- services/sensor_tracker/src/logic.rs | 4 +- services/sensor_tracker/src/model.rs | 4 +- services/sensor_tracker/src/predis.rs | 8 +- 9 files changed, 111 insertions(+), 200 deletions(-) create mode 100644 services/led_status_helper/src/model.rs diff --git a/services/led_status_helper/Cargo.lock b/services/led_status_helper/Cargo.lock index 52c57310..52ea05ed 100644 --- a/services/led_status_helper/Cargo.lock +++ b/services/led_status_helper/Cargo.lock @@ -112,7 +112,7 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "failure 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", - "lazy_static 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "regex 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -211,11 +211,8 @@ dependencies = [ [[package]] name = "lazy_static" -version = "1.1.0" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "version_check 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", -] [[package]] name = "lazycell" @@ -224,7 +221,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "led_status_helper" -version = "0.3.0" +version = "0.3.1" dependencies = [ "dotenv 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)", "envy 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -352,7 +349,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bitflags 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", "foreign-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", - "lazy_static 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.43 (registry+https://github.com/rust-lang/crates.io-index)", "openssl-sys 0.9.36 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -653,7 +650,7 @@ name = "thread_local" version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "lazy_static 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -699,7 +696,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "crossbeam-utils 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.24 (registry+https://github.com/rust-lang/crates.io-index)", - "lazy_static 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.6.16 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -863,7 +860,7 @@ dependencies = [ "checksum idna 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "38f09e0f0b1fb55fdee1f17470ad800da77af5186a1a76c026b679358b7e844e" "checksum iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dbe6e417e7d0975db6512b90796e8ce223145ac4e33c377e4a42882a0e88bb08" "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" -"checksum lazy_static 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ca488b89a5657b0a2ecd45b95609b3e848cf1755da332a0da46e2b2b1cb371a7" +"checksum lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bc5729f27f159ddd61f4df6228e827e86643d4d3e7c32183cb30a1c08f604a14" "checksum lazycell 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ddba4c30a78328befecec92fc94970e53b3ae385827d28620f0f5bb2493081e0" "checksum libc 0.2.43 (registry+https://github.com/rust-lang/crates.io-index)" = "76e3a3ef172f1a0b9a9ff0dd1491ae5e6c948b94479a3021819ba7d860c8645d" "checksum lock_api 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "775751a3e69bde4df9b38dd00a1b5d6ac13791e4223d4a0506577f0dd27cfb7a" diff --git a/services/led_status_helper/Cargo.toml b/services/led_status_helper/Cargo.toml index 89f4c4f5..a46f0222 100644 --- a/services/led_status_helper/Cargo.toml +++ b/services/led_status_helper/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "led_status_helper" -version = "0.3.0" +version = "0.3.1" authors = ["Terkwood "] edition = "2018" @@ -11,4 +11,4 @@ rumqtt = "0.10.1" redis = "0.9.1" serde = "1.0.79" serde_derive = "1.0.79" -uuid = { version = "0.7", features = ["v4"] } \ No newline at end of file +uuid = { version = "0.7", features = ["v4"] } diff --git a/services/led_status_helper/src/main.rs b/services/led_status_helper/src/main.rs index 70fdc306..10e238f8 100644 --- a/services/led_status_helper/src/main.rs +++ b/services/led_status_helper/src/main.rs @@ -5,6 +5,8 @@ extern crate dotenv; extern crate envy; extern crate redis; +mod model; + use std::slice::SliceConcatExt; use std::time; @@ -13,6 +15,8 @@ use rumqtt::{MqttClient, MqttOptions, QoS}; use uuid::Uuid; +use model::SensorReadings; + #[derive(Deserialize, Debug, Clone)] struct Config { redis_auth: Option, @@ -32,53 +36,22 @@ fn generate_mq_client_id() -> String { format!("led_status/{}", Uuid::new_v4()) } -fn get_num_containers( - conn: &redis::Connection, - namespace: &str, - container: Container, -) -> Result { - conn.get(format!("{}/{}", namespace, container.to_string())) -} - -enum Container { - Tanks, - Areas, -} +const AREAS: &str = "areas"; -impl Container { - pub fn to_string(self) -> String { - match self { - Container::Tanks => "tanks".to_string(), - Container::Areas => "areas".to_string(), - } - } +fn get_num_areas(conn: &redis::Connection, namespace: &str) -> Result { + conn.get(format!("{}/{}", namespace, AREAS.to_string())) } struct Temp { f: f64, c: f64, - update_time: Option, -} - -/// Digital humidity and temp, e.g. DHT11 sensor -struct DHT { - humidity: f64, - temp_f: f64, - temp_c: f64, - heat_index_f: f64, - heat_index_c: f64, - update_time: Option, -} - -struct PH { - val: f64, - update_time: Option, } struct Staleness { warning: String, deadline_seconds: u32, } + impl Staleness { fn text(&self, maybe_time: Option) -> String { match maybe_time { @@ -108,11 +81,12 @@ fn c_to_f(temp_c: f64) -> f64 { } const NAN: f64 = -255.0; + fn get_area_data( conn: &redis::Connection, area: i64, namespace: &str, -) -> Result, redis::RedisError> { +) -> Result, redis::RedisError> { let numbers: Vec> = conn.hget( format!("{}/areas/{}", namespace, area), vec![ @@ -121,87 +95,44 @@ fn get_area_data( "temp_c", "heat_index_f", "heat_index_c", + "ph", ], )?; // A redis string let update_time_vec: Option = conn.hget( format!("{}/areas/{}", namespace, area), - vec!["dht_update_time"], + vec!["sensors_update_time"], )?; - let (humidity, init_temp_f, init_temp_c, heat_index_f, heat_index_c) = ( + let (humidity, init_temp_f, init_temp_c, heat_index_f, heat_index_c, ph) = ( numbers.get(0), numbers.get(1), numbers.get(2), numbers.get(3), numbers.get(4), + numbers.get(5), ); let update_time = update_time_vec.map(|s| s.parse::().unwrap_or(0)); - let temp = safe_temp(init_temp_f, init_temp_c, update_time); + let temp = safe_temp(init_temp_f, init_temp_c).map(|t| (t.f, t.c)); - let (temp_f, temp_c) = temp.map(|t| (t.f, t.c)).unwrap_or((NAN, NAN)); - - Ok(Some(DHT { - humidity: unnest_ref(humidity).unwrap_or(NAN), - temp_f, - temp_c, - heat_index_f: unnest_ref(heat_index_f).unwrap_or(NAN), - heat_index_c: unnest_ref(heat_index_c).unwrap_or(NAN), + Ok(Some(SensorReadings { + humidity: unnest_ref(humidity), + temp_f: temp.map(|t| t.0), + temp_c: temp.map(|t| t.1), + heat_index_f: unnest_ref(heat_index_f), + heat_index_c: unnest_ref(heat_index_c), + ph: unnest_ref(ph), update_time, })) } - -fn get_tank_data( - conn: &redis::Connection, - tank: i64, - namespace: &str, -) -> Result<(Option, Option), redis::RedisError> { - let numbers: Vec> = conn.hget( - format!("{}/tanks/{}", namespace, tank), - vec!["temp_f", "temp_c", "ph"], - )?; - let update_times: Vec> = conn.hget( - format!("{}/tanks/{}", namespace, tank), - vec!["temp_update_time", "ph_update_time"], - )?; - let (temp_f, temp_c) = (numbers.get(0), numbers.get(1)); - let (temp_update_time, ph_update_time) = ( - unnest_ref(update_times.get(0)), - unnest_ref(update_times.get(1)), - ); - let temp = safe_temp(temp_f, temp_c, temp_update_time); - let ph = unnest_ref(numbers.get(2)).map(|val| PH { - val, - update_time: ph_update_time, - }); - - Ok((temp, ph)) -} - -fn safe_temp( - temp_f: Option<&Option>, - temp_c: Option<&Option>, - temp_update_time: Option, -) -> Option { +fn safe_temp(temp_f: Option<&Option>, temp_c: Option<&Option>) -> Option { match (temp_f, temp_c) { - (Some(&Some(f)), Some(&Some(c))) => Some(Temp { - f, - c, - update_time: temp_update_time, - }), - (_, Some(&Some(c))) => Some(Temp { - f: c_to_f(c), - c, - update_time: temp_update_time, - }), - (Some(&Some(f)), _) => Some(Temp { - f, - c: f_to_c(f), - update_time: temp_update_time, - }), + (Some(&Some(f)), Some(&Some(c))) => Some(Temp { f, c }), + (_, Some(&Some(c))) => Some(Temp { f: c_to_f(c), c }), + (Some(&Some(f)), _) => Some(Temp { f, c: f_to_c(f) }), _ => None, } } @@ -223,98 +154,70 @@ fn generate_status( namespace: &str, staleness: &Staleness, ) -> Result { - let num_tanks = get_num_containers(&conn, namespace, Container::Tanks)?; - - let tank_statuses: Result, redis::RedisError> = (1..num_tanks + 1) - .map(move |tank| { - get_tank_data(&conn, tank, namespace).map(move |(maybe_temp, maybe_ph)| { - if let (&None, &None) = (&maybe_temp, &maybe_ph) { - return "".to_string(); // nothing to format - } - - let tank_string = format!("T{}:", tank); - let temp_string = maybe_temp - .map(move |t| { - ( - match temp_unit { - 'c' | 'C' => t.c, - _ => t.f, - }, - t.update_time, - ) - }) - .map(|(t, update_time)| { - format!( - " {}°{}{}", - t, - temp_unit.to_ascii_uppercase(), - staleness.text(update_time) - ) - }) - .unwrap_or("".to_string()); - let ph_string: String = maybe_ph - .map(move |ph| format!(" pH {}{}", ph.val, staleness.text(ph.update_time))) - .unwrap_or("".to_string()); - - tank_string + &ph_string + &temp_string - }) - }) - .collect(); - - let tank_portion = tank_statuses.map(|ss| ss.join(" ")); - - let num_areas = get_num_containers(&conn, namespace, Container::Areas)?; + let num_areas = get_num_areas(&conn, namespace)?; let area_statuses: Result, redis::RedisError> = (1..num_areas + 1) .map(move |area| { - get_area_data(&conn, area, namespace).map(move |maybe_dht| { - if let &None = &maybe_dht { + get_area_data(&conn, area, namespace).map(move |maybe_sensor_readings| { + if let Some(sensor_readings) = maybe_sensor_readings { + let area_string = format!("A{}", area); + + let stale = || staleness.text(sensor_readings.update_time); + + let ph_string: String = sensor_readings + .ph + .map(move |ph| format!(" pH {}{}", ph, stale())) + .unwrap_or("".to_string()); + + let humidity_string: String = sensor_readings + .humidity + .map(move |h| format!(" {}%H{}", h, stale())) + .unwrap_or("".to_string()); + + let temp = match (sensor_readings.temp_c, sensor_readings.temp_f) { + (None, None) => None, + _ => Some(Temp { + f: sensor_readings.temp_f.unwrap_or(NAN), + c: sensor_readings.temp_c.unwrap_or(NAN), + }), + }; + + let heat_index = + match (sensor_readings.heat_index_c, sensor_readings.heat_index_f) { + (None, None) => None, + _ => Some(Temp { + f: sensor_readings.heat_index_f.unwrap_or(NAN), + c: sensor_readings.heat_index_c.unwrap_or(NAN), + }), + }; + + let temp_letter = temp_unit.to_ascii_uppercase(); + let heat_index_string = heat_index + .map(move |hi| match temp_unit { + 'c' | 'C' => hi.c, + _ => hi.f, + }) + .map(|hi| format!(" {}h{}{}", hi, temp_letter, stale())) + .unwrap_or("".to_string()); + + let temp_string = temp + .map(move |t| match temp_unit { + 'c' | 'C' => t.c, + _ => t.f, + }) + .map(|t| format!(" {}°{}{}", t, temp_letter, stale())) + .unwrap_or("".to_string()); + + area_string + &ph_string + &humidity_string + &heat_index_string + &temp_string + } else { return "".to_string(); // nothing to format } - - let area_string = format!("A{}: ", area); - - let data_string = maybe_dht - .map(move |dht| { - ( - dht.humidity, - match temp_unit { - 'c' | 'C' => dht.temp_c, - _ => dht.temp_f, - }, - match temp_unit { - 'c' | 'C' => dht.heat_index_c, - _ => dht.heat_index_f, - }, - dht.update_time, - ) - }) - .map(|(humidity, temp, heat_index, update_time)| { - let stale = staleness.text(update_time); - let temp_letter = temp_unit.to_ascii_uppercase(); - format!( - "{}%H{} {}°{}{} {}h{}{}", - humidity, - stale, - temp, - temp_letter, - stale, - heat_index, - temp_letter, - stale, - ) - }) - .unwrap_or("".to_string()); - - area_string + &data_string }) }) .collect(); - let area_portion = area_statuses.map(|ss| ss.join(" ")); - - tank_portion - .and_then(|tp| area_portion.map(|ap| ap + " " + &tp)) // join areas and tanks + area_statuses + .map(|ss| ss.join(" ")) .map(|msg| right_align(&msg)) // lay out the message nicely } diff --git a/services/led_status_helper/src/model.rs b/services/led_status_helper/src/model.rs new file mode 100644 index 00000000..212a5665 --- /dev/null +++ b/services/led_status_helper/src/model.rs @@ -0,0 +1,11 @@ +/// This data is stored on a redis device record +#[derive(Serialize, Deserialize, Debug, Copy, Clone)] +pub struct SensorReadings { + pub temp_f: Option, + pub temp_c: Option, + pub ph: Option, + pub humidity: Option, + pub heat_index_c: Option, + pub heat_index_f: Option, + pub update_time: Option, +} diff --git a/services/sensor_tracker/Cargo.lock b/services/sensor_tracker/Cargo.lock index a870e2c4..bc5db332 100644 --- a/services/sensor_tracker/Cargo.lock +++ b/services/sensor_tracker/Cargo.lock @@ -832,7 +832,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "sensor_tracker" -version = "0.4.0" +version = "0.4.1" dependencies = [ "crossbeam 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", "crossbeam-channel 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/services/sensor_tracker/Cargo.toml b/services/sensor_tracker/Cargo.toml index cd16f6ee..5edb8538 100644 --- a/services/sensor_tracker/Cargo.toml +++ b/services/sensor_tracker/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sensor_tracker" -version = "0.4.0" +version = "0.4.1" authors = ["Terkwood "] edition = "2018" diff --git a/services/sensor_tracker/src/logic.rs b/services/sensor_tracker/src/logic.rs index 098616e0..44f2c027 100644 --- a/services/sensor_tracker/src/logic.rs +++ b/services/sensor_tracker/src/logic.rs @@ -2,7 +2,7 @@ use crossbeam_channel::{select, Receiver}; use redis_context::RedisContext; use rumqtt::Notification; -use crate::model::SensorMessage; +use crate::model::SensorReadings; use crate::predis; pub fn receive_updates( @@ -35,7 +35,7 @@ pub fn receive_updates( } } -fn deser_message(payload: &[u8]) -> Option { +fn deser_message(payload: &[u8]) -> Option { let r = std::str::from_utf8(&payload); r.ok() .and_then(|s| serde_json::from_str(s).map(|r| Some(r)).unwrap_or(None)) diff --git a/services/sensor_tracker/src/model.rs b/services/sensor_tracker/src/model.rs index f6bcb128..c4e9bece 100644 --- a/services/sensor_tracker/src/model.rs +++ b/services/sensor_tracker/src/model.rs @@ -4,7 +4,7 @@ /// e.g. "28654597090000e4" #[derive(Serialize, Deserialize, Debug)] -pub struct SensorMessage { +pub struct SensorReadings { pub device_id: String, pub temp_f: Option, pub temp_c: Option, @@ -16,7 +16,7 @@ pub struct SensorMessage { pub heat_index_f: Option, } -impl SensorMessage { +impl SensorReadings { pub fn to_redis(&self) -> Vec<(&str, String)> { let mut data = vec![]; if let Some(s) = &self.status { diff --git a/services/sensor_tracker/src/predis.rs b/services/sensor_tracker/src/predis.rs index 0dcf1daf..d0aefc38 100644 --- a/services/sensor_tracker/src/predis.rs +++ b/services/sensor_tracker/src/predis.rs @@ -13,7 +13,7 @@ use uuid::Uuid; /// Will create a new sensor record for this device if one does not already exist. pub fn update<'a, 'b>( redis_ctx: &RedisContext, - sensor_message: &model::SensorMessage, + sensor_message: &model::SensorReadings, ext_device_id: &str, ) -> Result, redis::RedisError> { let mut delta_events: Vec = vec![]; @@ -70,7 +70,7 @@ pub fn update<'a, 'b>( fn update_devices_set( redis_ctx: &RedisContext, rn: &str, - sensor_message: &model::SensorMessage, // TODO wat ? + sensor_message: &model::SensorReadings, // TODO wat ? device_id: Uuid, ) -> Option { let set_device_key = format!("{}/devices", rn); @@ -90,7 +90,7 @@ fn update_devices_set( fn update_area_hash( redis_ctx: &RedisContext, container_num: &u64, - sensor_message: &model::SensorMessage, + sensor_message: &model::SensorReadings, ) -> Option { // We found the area associated with this // sensor ID, so we should update that area's @@ -180,7 +180,7 @@ fn ensure_device_hash_exists( fn update_device_hash( redis_ctx: &RedisContext, device_hash_key: &str, - sensor_message: &model::SensorMessage, + sensor_message: &model::SensorReadings, maybe_sensor_upd_count: &Option, ) -> Option { let upd_c = &format!("sensors_update_count"); From b2da24db8317c5d757301179fcfe43462188fcaf Mon Sep 17 00:00:00 2001 From: Terkwood Date: Thu, 1 Aug 2019 19:07:23 -0400 Subject: [PATCH 10/10] rename various, extract consts --- services/sensor_tracker/src/predis.rs | 56 +++++++++++++++------------ 1 file changed, 32 insertions(+), 24 deletions(-) diff --git a/services/sensor_tracker/src/predis.rs b/services/sensor_tracker/src/predis.rs index d0aefc38..4f22a1ac 100644 --- a/services/sensor_tracker/src/predis.rs +++ b/services/sensor_tracker/src/predis.rs @@ -7,6 +7,14 @@ use serde_json; use std::time::SystemTime; use uuid::Uuid; +const SENSORS_UPDATE_COUNT: &str = "sensors_update_count"; +const SENSORS_UPDATE_TIME: &str = "sensors_update_time"; + +const AREA_HASH_FIELD: &str = "area"; + +const AREAS: &str = "areas"; +const DEVICES: &str = "devices"; + /// Updates redis so that the individual measurement is applied to the correct tank. /// Also records the measurement to a record associated with the sensor itself. /// Keeps track of how many updates have been applied to each tank and sensor record. @@ -32,11 +40,11 @@ pub fn update<'a, 'b>( } // lookup associated area - let sensor_hash_key = &format!("{}/devices/{}", rn, device_id).to_string(); + let sensor_hash_key = &format!("{}/{}/{}", rn, DEVICES, device_id).to_string(); let area_and_sensors_update_count: Result>, _> = redis_ctx .conn - .hget(sensor_hash_key, vec!["area", "sensors_update_count"]); + .hget(sensor_hash_key, vec![AREA_HASH_FIELD, SENSORS_UPDATE_COUNT]); if let Ok(v) = area_and_sensors_update_count { // Tank associated with this sensor? @@ -73,7 +81,7 @@ fn update_devices_set( sensor_message: &model::SensorReadings, // TODO wat ? device_id: Uuid, ) -> Option { - let set_device_key = format!("{}/devices", rn); + let set_device_key = format!("{}/{}", rn, DEVICES); // add to the member set if it doesn't already exist let devices_added: Result = redis_ctx .conn @@ -89,25 +97,22 @@ fn update_devices_set( fn update_area_hash( redis_ctx: &RedisContext, - container_num: &u64, - sensor_message: &model::SensorReadings, + area_num: &u64, + sensor_readings: &model::SensorReadings, ) -> Option { // We found the area associated with this // sensor ID, so we should update that area's // current reading. - let area_key = format!("{}/areas/{}", redis_ctx.namespace, container_num); + let area_key = format!("{}/{}/{}", redis_ctx.namespace, AREAS, area_num); - let area_measure_count: Result, _> = redis_ctx - .conn - .hget(&area_key, &format!("sensors_update_count")); + let area_measure_count: Result, _> = + redis_ctx.conn.hget(&area_key, SENSORS_UPDATE_COUNT); - let uc_name = format!("sensors_update_count"); - let ut_name = format!("sensors_update_time"); let update: (Result, Vec<&str>) = { - let mut data: Vec<(&str, String)> = sensor_message.to_redis(); + let mut data: Vec<(&str, String)> = sensor_readings.to_redis(); data.push(( - &uc_name, + SENSORS_UPDATE_COUNT, area_measure_count .unwrap_or(None) .map(|u| u + 1) @@ -115,7 +120,7 @@ fn update_area_hash( .to_string(), )); - data.push((&ut_name, epoch_secs().to_string())); + data.push((SENSORS_UPDATE_TIME, epoch_secs().to_string())); ( redis_ctx.conn.hset_multiple(&area_key, &data[..]), data.iter().map(|(a, _)| *a).collect(), @@ -138,6 +143,9 @@ fn update_area_hash( } } +const CREATE_TIME_FIELD: &str = "create_time"; +const EXT_DEVICE_ID_FIELD: &str = "ext_device_id"; + fn ensure_device_hash_exists( redis_ctx: &RedisContext, device_hash_key: &str, @@ -156,17 +164,18 @@ fn ensure_device_hash_exists( .iter() .for_each(|e: &bool| { if !e { - let cf = "create_time".to_string(); - let ed = "ext_device_id".to_string(); let field_vals = &vec![ - (&cf, format!("{}", epoch_secs())), - (&ed, ext_device_id_str.to_string()), + (CREATE_TIME_FIELD, format!("{}", epoch_secs())), + (EXT_DEVICE_ID_FIELD, ext_device_id_str.to_string()), ][..]; - // new sensor, make note of when it is created + // new device, make note of when it is created let _: Result, _> = redis_ctx.conn.hset_multiple(device_hash_key, field_vals); - let fields = vec![cf, ed]; + let fields = vec![ + CREATE_TIME_FIELD.to_string(), + EXT_DEVICE_ID_FIELD.to_string(), + ]; result = Some(REvent::HashUpdated { key: device_hash_key.to_string(), fields, @@ -183,17 +192,16 @@ fn update_device_hash( sensor_message: &model::SensorReadings, maybe_sensor_upd_count: &Option, ) -> Option { - let upd_c = &format!("sensors_update_count"); let mut data: Vec<(&str, String)> = vec![( - upd_c, + SENSORS_UPDATE_COUNT, maybe_sensor_upd_count .map(|u| u + 1) .unwrap_or(1) .to_string(), )]; data.extend(sensor_message.to_redis()); - let ut = &format!("sensors_update_time"); - data.push((ut, epoch_secs().to_string())); + + data.push((SENSORS_UPDATE_TIME, epoch_secs().to_string())); let redis_result: Result<(), _> = redis_ctx.conn.hset_multiple(device_hash_key, &data[..]); if let Err(e) = redis_result {