diff --git a/drive-deposits-cal-types/src/math/engine.rs b/drive-deposits-cal-types/src/math/engine.rs index 920ceb1..7046cca 100644 --- a/drive-deposits-cal-types/src/math/engine.rs +++ b/drive-deposits-cal-types/src/math/engine.rs @@ -84,7 +84,10 @@ async fn build_from_new_bank( ) -> Result { // using spawn blocking for synchronous calculation code let bank_with_outcome = spawn_blocking(move || -> Result { - info!("task spawned for new_bank: {:?}", new_bank.name); + info!( + "task spawned for actual calculation for new_bank: {:?}", + new_bank.name + ); let deposits = build_from_new_deposits(new_bank.new_deposits, new_delta.clone())?; let outcome = build_outcome_from_deposits(&deposits, new_delta.clone().as_ref()); let bank = Bank { diff --git a/drive-deposits-logs-lambda-target/src/bin/by_level_lambda_writer.rs b/drive-deposits-logs-lambda-target/src/bin/by_level_lambda_writer.rs index 6de3f2b..44756bd 100644 --- a/drive-deposits-logs-lambda-target/src/bin/by_level_lambda_writer.rs +++ b/drive-deposits-logs-lambda-target/src/bin/by_level_lambda_writer.rs @@ -4,11 +4,12 @@ use drive_deposits_logs_lambda_target::dynamodb::DriveDepositsDb; use drive_deposits_rest_types::rest_types::CalculatePortfolioResponse; use lambda_runtime::{ run, service_fn, - tracing::{debug, error, info_span, init_default_subscriber}, + tracing::{debug, error, info_span, init_default_subscriber, instrument, Instrument}, Error, LambdaEvent, }; use serde_json::from_value; +#[instrument(skip(db_handler, event))] async fn banks_level_handler( db_handler: &DriveDepositsDb, event: LambdaEvent, @@ -33,6 +34,7 @@ async fn banks_level_handler( db_handler.table_name.as_str(), event_target_response, ) + .instrument(span) .await?; Ok(()) @@ -49,7 +51,9 @@ async fn main() -> Result<(), Error> { // service_fn returns ServiceFn that implements FnMut so cannot pass ownership of db_handler // closure is `FnOnce` if it moves the variable `db_handler` out of its environment - run(service_fn(|event| banks_level_handler(&db_handler, event))).await?; + run(service_fn(|event| banks_level_handler(&db_handler, event))) + .instrument(span) + .await?; Ok(()) } diff --git a/drive-deposits-logs-lambda-target/src/dynamodb/add.rs b/drive-deposits-logs-lambda-target/src/dynamodb/add.rs index d16bb8b..08541f4 100644 --- a/drive-deposits-logs-lambda-target/src/dynamodb/add.rs +++ b/drive-deposits-logs-lambda-target/src/dynamodb/add.rs @@ -9,7 +9,8 @@ use drive_deposits_lambda_db_types::{ use drive_deposits_rest_types::rest_types::CalculatePortfolioResponse; use std::collections::HashMap; use thiserror::Error; -use tracing::debug; +use tokio::try_join; +use tracing::{debug, info, instrument}; #[derive(Error, Debug)] pub enum AddItemError { @@ -18,38 +19,87 @@ pub enum AddItemError { #[error("AddItemError with DynamoDbSdkPutItemError error: {0}")] DynamoDbSdkPutItemError(#[from] SdkError), + + #[error("AddItemError with tokio::task::JoinError error: {0}")] + JoinError(#[from] tokio::task::JoinError), } +#[instrument] pub async fn add_item( client: &Client, table: &str, rest: CalculatePortfolioResponse, ) -> Result<(), AddItemError> { + info!("inside add_item"); + let portfolio_level_item = PortfolioLevelItem::try_from(&rest)?; - add_portfolio_level_item(client, table, portfolio_level_item).await?; + let (client_clone, table_clone) = clone_db_connection_details(client, table); + info!("spawn add_portfolio_level_item"); + let portfolio_level_item_handle = tokio::spawn(async move { + add_portfolio_level_item(&client_clone, &table_clone, portfolio_level_item).await + }); + info!("spawn add_bank_level_items"); let banks_level_items_wrapper = BankLevelItemsWrapper::try_from(&rest)?; - add_bank_level_items(client, table, banks_level_items_wrapper).await?; + let (client_clone, table_clone) = clone_db_connection_details(client, table); + let bank_level_items_handle = tokio::spawn(async move { + add_bank_level_items(&client_clone, &table_clone, banks_level_items_wrapper).await + }); + info!("spawn add_deposit_level_items for growth criteria"); let rest_wrapper_growth_criteria = CalculatePortfolioRestWrapper { calculate_portfolio_response: rest.clone(), deposit_sort_criteria: DepositSortCriteria::DeltaPeriodGrowth, }; let deposit_level_items_growth_criteria = DepositLevelItemsWrapper::try_from(&rest_wrapper_growth_criteria)?; - add_deposit_level_items(client, table, deposit_level_items_growth_criteria).await?; - + let (client_clone, table_clone) = clone_db_connection_details(client, table); + let deposit_level_items_growth_handle = tokio::spawn(async move { + add_deposit_level_items( + &client_clone, + &table_clone, + deposit_level_items_growth_criteria, + ) + .await + }); + + info!("spawn add_deposit_level_items for date criteria"); let rest_wrapper_date_criteria = CalculatePortfolioRestWrapper { calculate_portfolio_response: rest, deposit_sort_criteria: DepositSortCriteria::MaturityDate, }; let deposit_level_items_date_criteria = DepositLevelItemsWrapper::try_from(&rest_wrapper_date_criteria)?; - add_deposit_level_items(client, table, deposit_level_items_date_criteria).await?; + let (client_clone, table_clone) = clone_db_connection_details(client, table); + let deposit_level_items_date_handle = tokio::spawn(async move { + add_deposit_level_items( + &client_clone, + &table_clone, + deposit_level_items_date_criteria, + ) + .await + }); + + info!("try_join! for all the spawned tasks"); + let (portfolio_result, bank_level_result, deposit_growth_result, deposit_date_result) = try_join!( + portfolio_level_item_handle, + bank_level_items_handle, + deposit_level_items_growth_handle, + deposit_level_items_date_handle + )?; + + portfolio_result?; + bank_level_result?; + deposit_growth_result?; + deposit_date_result?; Ok(()) } +fn clone_db_connection_details(client: &Client, table: &str) -> (Client, String) { + (client.clone(), table.to_string()) +} + pub async fn add_portfolio_level_item( client: &Client, table: &str,