Skip to content

Commit

Permalink
using tokio::spawn for concurrent tasks for writer
Browse files Browse the repository at this point in the history
Tracing enabled.
  • Loading branch information
rsachdeva committed Feb 4, 2025
1 parent a4123d9 commit 66abfbe
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 9 deletions.
5 changes: 4 additions & 1 deletion drive-deposits-cal-types/src/math/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ async fn build_from_new_bank(
) -> Result<Bank, CalculationHaltError> {
// using spawn blocking for synchronous calculation code
let bank_with_outcome = spawn_blocking(move || -> Result<Bank, CalculationHaltError> {
info!("task spawned for new_bank: {:?}", new_bank.name);
info!(
"task spawned for actual calculation for new_bank: {:?}",
new_bank.name
);
let deposits = build_from_new_deposits(new_bank.new_deposits, new_delta.clone())?;
let outcome = build_outcome_from_deposits(&deposits, new_delta.clone().as_ref());
let bank = Bank {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ use drive_deposits_logs_lambda_target::dynamodb::DriveDepositsDb;
use drive_deposits_rest_types::rest_types::CalculatePortfolioResponse;
use lambda_runtime::{
run, service_fn,
tracing::{debug, error, info_span, init_default_subscriber},
tracing::{debug, error, info_span, init_default_subscriber, instrument, Instrument},
Error, LambdaEvent,
};
use serde_json::from_value;

#[instrument(skip(db_handler, event))]
async fn banks_level_handler(
db_handler: &DriveDepositsDb,
event: LambdaEvent<EventBridgeEvent>,
Expand All @@ -33,6 +34,7 @@ async fn banks_level_handler(
db_handler.table_name.as_str(),
event_target_response,
)
.instrument(span)
.await?;

Ok(())
Expand All @@ -49,7 +51,9 @@ async fn main() -> Result<(), Error> {

// service_fn returns ServiceFn that implements FnMut so cannot pass ownership of db_handler
// closure is `FnOnce` if it moves the variable `db_handler` out of its environment
run(service_fn(|event| banks_level_handler(&db_handler, event))).await?;
run(service_fn(|event| banks_level_handler(&db_handler, event)))
.instrument(span)
.await?;

Ok(())
}
62 changes: 56 additions & 6 deletions drive-deposits-logs-lambda-target/src/dynamodb/add.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use drive_deposits_lambda_db_types::{
use drive_deposits_rest_types::rest_types::CalculatePortfolioResponse;
use std::collections::HashMap;
use thiserror::Error;
use tracing::debug;
use tokio::try_join;
use tracing::{debug, info, instrument};

#[derive(Error, Debug)]
pub enum AddItemError {
Expand All @@ -18,38 +19,87 @@ pub enum AddItemError {

#[error("AddItemError with DynamoDbSdkPutItemError error: {0}")]
DynamoDbSdkPutItemError(#[from] SdkError<PutItemError>),

#[error("AddItemError with tokio::task::JoinError error: {0}")]
JoinError(#[from] tokio::task::JoinError),
}

#[instrument]
pub async fn add_item(
client: &Client,
table: &str,
rest: CalculatePortfolioResponse,
) -> Result<(), AddItemError> {
info!("inside add_item");

let portfolio_level_item = PortfolioLevelItem::try_from(&rest)?;
add_portfolio_level_item(client, table, portfolio_level_item).await?;
let (client_clone, table_clone) = clone_db_connection_details(client, table);
info!("spawn add_portfolio_level_item");
let portfolio_level_item_handle = tokio::spawn(async move {
add_portfolio_level_item(&client_clone, &table_clone, portfolio_level_item).await
});

info!("spawn add_bank_level_items");
let banks_level_items_wrapper = BankLevelItemsWrapper::try_from(&rest)?;
add_bank_level_items(client, table, banks_level_items_wrapper).await?;
let (client_clone, table_clone) = clone_db_connection_details(client, table);
let bank_level_items_handle = tokio::spawn(async move {
add_bank_level_items(&client_clone, &table_clone, banks_level_items_wrapper).await
});

info!("spawn add_deposit_level_items for growth criteria");
let rest_wrapper_growth_criteria = CalculatePortfolioRestWrapper {
calculate_portfolio_response: rest.clone(),
deposit_sort_criteria: DepositSortCriteria::DeltaPeriodGrowth,
};
let deposit_level_items_growth_criteria =
DepositLevelItemsWrapper::try_from(&rest_wrapper_growth_criteria)?;
add_deposit_level_items(client, table, deposit_level_items_growth_criteria).await?;

let (client_clone, table_clone) = clone_db_connection_details(client, table);
let deposit_level_items_growth_handle = tokio::spawn(async move {
add_deposit_level_items(
&client_clone,
&table_clone,
deposit_level_items_growth_criteria,
)
.await
});

info!("spawn add_deposit_level_items for date criteria");
let rest_wrapper_date_criteria = CalculatePortfolioRestWrapper {
calculate_portfolio_response: rest,
deposit_sort_criteria: DepositSortCriteria::MaturityDate,
};
let deposit_level_items_date_criteria =
DepositLevelItemsWrapper::try_from(&rest_wrapper_date_criteria)?;
add_deposit_level_items(client, table, deposit_level_items_date_criteria).await?;
let (client_clone, table_clone) = clone_db_connection_details(client, table);
let deposit_level_items_date_handle = tokio::spawn(async move {
add_deposit_level_items(
&client_clone,
&table_clone,
deposit_level_items_date_criteria,
)
.await
});

info!("try_join! for all the spawned tasks");
let (portfolio_result, bank_level_result, deposit_growth_result, deposit_date_result) = try_join!(
portfolio_level_item_handle,
bank_level_items_handle,
deposit_level_items_growth_handle,
deposit_level_items_date_handle
)?;

portfolio_result?;
bank_level_result?;
deposit_growth_result?;
deposit_date_result?;

Ok(())
}

fn clone_db_connection_details(client: &Client, table: &str) -> (Client, String) {
(client.clone(), table.to_string())
}

pub async fn add_portfolio_level_item(
client: &Client,
table: &str,
Expand Down

0 comments on commit 66abfbe

Please sign in to comment.