Analytics Indexer Operational Refactor #21486

Open · wants to merge 6 commits into main
2 changes: 1 addition & 1 deletion Cargo.lock

(generated file; diff not rendered)

2 changes: 1 addition & 1 deletion crates/sui-analytics-indexer/Cargo.toml
@@ -15,7 +15,6 @@ bcs.workspace = true
 byteorder.workspace = true
 bytes.workspace = true
 chrono.workspace = true
-clap.workspace = true
 csv.workspace = true
 move-core-types.workspace = true
 object_store.workspace = true
@@ -54,6 +53,7 @@ arrow.workspace = true
 gcp-bigquery-client = "0.25.0"
 snowflake-api.workspace = true
 tap.workspace = true
+serde_yaml.workspace = true

 [dev-dependencies]
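
The swap of clap for serde_yaml in this manifest suggests per-task settings now arrive as a YAML document rather than as command-line flags. A minimal sketch of that loading pattern, assuming a hypothetical `TaskConfig` with the threshold fields this diff reads (`checkpoint_interval`, `time_interval_s`, `max_file_size_mb`); the crate's real config types define the full field set:

```rust
use serde::Deserialize;

// Hypothetical shape for illustration only; the crate defines the real fields.
#[derive(Debug, Deserialize)]
struct TaskConfig {
    checkpoint_interval: u64,
    time_interval_s: u64,
    max_file_size_mb: u64,
}

fn load_task_config(path: &std::path::Path) -> anyhow::Result<TaskConfig> {
    let raw = std::fs::read_to_string(path)?;
    // serde_yaml deserializes the YAML document into the typed config.
    Ok(serde_yaml::from_str(&raw)?)
}
```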
47 changes: 23 additions & 24 deletions crates/sui-analytics-indexer/src/analytics_processor.rs
@@ -25,8 +25,7 @@ use crate::analytics_metrics::AnalyticsMetrics;
 use crate::handlers::AnalyticsHandler;
 use crate::writers::AnalyticsWriter;
 use crate::{
-    join_paths, AnalyticsIndexerConfig, FileMetadata, MaxCheckpointReader, ParquetSchema,
-    EPOCH_DIR_PREFIX,
+    join_paths, FileMetadata, MaxCheckpointReader, ParquetSchema, TaskContext, EPOCH_DIR_PREFIX,
 };

 struct State<S: Serialize + ParquetSchema> {
@@ -40,8 +39,7 @@ struct State<S: Serialize + ParquetSchema> {
 pub struct AnalyticsProcessor<S: Serialize + ParquetSchema> {
     handler: Box<dyn AnalyticsHandler<S>>,
     state: Mutex<State<S>>,
-    metrics: AnalyticsMetrics,
-    config: AnalyticsIndexerConfig,
+    task_context: TaskContext,
     sender: mpsc::Sender<FileMetadata>,
     #[allow(dead_code)]
     kill_sender: oneshot::Sender<()>,
@@ -75,16 +73,19 @@ impl<S: Serialize + ParquetSchema + 'static> Worker for AnalyticsProcessor<S> {

         let num_checkpoints_processed =
             state.current_checkpoint_range.end - state.current_checkpoint_range.start;
-        let cut_new_files = (num_checkpoints_processed >= self.config.checkpoint_interval)
-            || (state.last_commit_instant.elapsed().as_secs() > self.config.time_interval_s)
+        let cut_new_files = (num_checkpoints_processed
+            >= self.task_context.config.checkpoint_interval)
+            || (state.last_commit_instant.elapsed().as_secs()
+                > self.task_context.config.time_interval_s)
             || (state.num_checkpoint_iterations % CHECK_FILE_SIZE_ITERATION_CYCLE == 0
                 && state.writer.file_size()?.unwrap_or(0)
-                    > self.config.max_file_size_mb * 1024 * 1024);
+                    > self.task_context.config.max_file_size_mb * 1024 * 1024);
         if cut_new_files {
             self.cut(&mut state).await?;
             self.reset(&mut state)?;
         }
-        self.metrics
+        self.task_context
+            .metrics
             .total_received
             .with_label_values(&[self.name()])
             .inc();
@@ -107,26 +108,25 @@ impl<S: Serialize + ParquetSchema + 'static> AnalyticsProcessor<S> {
         writer: Box<dyn AnalyticsWriter<S>>,
         max_checkpoint_reader: Box<dyn MaxCheckpointReader>,
         next_checkpoint_seq_num: CheckpointSequenceNumber,
-        metrics: AnalyticsMetrics,
-        config: AnalyticsIndexerConfig,
+        task_context: TaskContext,
     ) -> Result<Self> {
         let local_store_config = ObjectStoreConfig {
-            directory: Some(config.checkpoint_dir.clone()),
+            directory: Some(task_context.checkpoint_dir_path().to_path_buf()),
             object_store: Some(ObjectStoreType::File),
             ..Default::default()
         };
         let local_object_store = local_store_config.make()?;
-        let remote_object_store = config.remote_store_config.make()?;
+        let remote_object_store = task_context.job_config.remote_store_config.make()?;
         let (kill_sender, kill_receiver) = oneshot::channel::<()>();
         let (sender, receiver) = mpsc::channel::<FileMetadata>(100);
         let name: String = handler.name().parse()?;
-        let checkpoint_dir = config.checkpoint_dir.clone();
-        let cloned_metrics = metrics.clone();
+        let checkpoint_dir = task_context.checkpoint_dir_path();
+        let cloned_metrics = task_context.metrics.clone();
         tokio::task::spawn(Self::start_syncing_with_remote(
             remote_object_store,
             local_object_store.clone(),
-            checkpoint_dir,
-            config.remote_store_path_prefix.clone(),
+            checkpoint_dir.to_path_buf(),
+            task_context.config.remote_store_path_prefix()?,
             receiver,
             kill_receiver,
             cloned_metrics,
@@ -135,7 +135,7 @@ impl<S: Serialize + ParquetSchema + 'static> AnalyticsProcessor<S> {
         let (max_checkpoint_sender, max_checkpoint_receiver) = oneshot::channel::<()>();
         tokio::task::spawn(Self::setup_max_checkpoint_metrics_updates(
             max_checkpoint_reader,
-            metrics.clone(),
+            task_context.metrics.clone(),
             max_checkpoint_receiver,
             name,
         ));
@@ -152,8 +152,7 @@ impl<S: Serialize + ParquetSchema + 'static> AnalyticsProcessor<S> {
             kill_sender,
             sender,
             max_checkpoint_sender,
-            metrics,
-            config,
+            task_context,
         })
     }

@@ -166,8 +165,8 @@ impl<S: Serialize + ParquetSchema + 'static> AnalyticsProcessor<S> {
             && state.writer.flush(state.current_checkpoint_range.end)?
         {
             let file_metadata = FileMetadata::new(
-                self.config.file_type,
-                self.config.file_format,
+                self.task_context.config.file_type,
+                self.task_context.config.file_format,
                 state.current_epoch,
                 state.current_checkpoint_range.clone(),
             );
@@ -183,8 +182,8 @@ impl<S: Serialize + ParquetSchema + 'static> AnalyticsProcessor<S> {

     fn epoch_dir(&self, state: &State<S>) -> Result<PathBuf> {
         let path = path_to_filesystem(
-            self.config.checkpoint_dir.to_path_buf(),
-            &self.config.file_type.dir_prefix(),
+            self.task_context.checkpoint_dir_path().to_path_buf(),
+            &self.task_context.config.file_type.dir_prefix(),
         )?
         .join(format!("{}{}", EPOCH_DIR_PREFIX, state.current_epoch));
         Ok(path)
@@ -288,7 +287,7 @@ impl<S: Serialize + ParquetSchema + 'static> AnalyticsProcessor<S> {
         from: Arc<DynObjectStore>,
         to: Arc<DynObjectStore>,
     ) -> Result<()> {
-        let remote_dest = join_paths(prefix, &path);
+        let remote_dest = join_paths(prefix.as_ref(), &path);
         info!("Syncing file to remote: {:?}", &remote_dest);
         copy_file(&path, &remote_dest, &from, &to).await?;
         fs::remove_file(path_to_filesystem(dir, &path)?)?;
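
The repeated rewrite from `self.config.*` / `self.metrics` to `self.task_context.*` shows the refactor's shape: per-task config, job-wide config, and metrics now travel together behind one `TaskContext` handle, so `AnalyticsProcessor::new` takes a single argument. The file-rotation predicate threaded through it is worth restating in isolation; a self-contained sketch mirroring the `cut_new_files` logic above, with illustrative parameter names and an assumed value for the size-check cycle constant:

```rust
/// Mirrors the cut_new_files predicate above: rotate ("cut") output files
/// when the checkpoint count, the elapsed time since the last commit, or
/// (checked every Nth iteration) the on-disk file size crosses its
/// configured threshold.
fn should_cut_files(
    checkpoints_in_range: u64,
    secs_since_last_commit: u64,
    iteration: u64,
    file_size_bytes: u64,
    checkpoint_interval: u64,
    time_interval_s: u64,
    max_file_size_mb: u64,
) -> bool {
    // Assumed value; the real CHECK_FILE_SIZE_ITERATION_CYCLE lives in the crate.
    const CHECK_FILE_SIZE_ITERATION_CYCLE: u64 = 50;

    checkpoints_in_range >= checkpoint_interval
        || secs_since_last_commit > time_interval_s
        || (iteration % CHECK_FILE_SIZE_ITERATION_CYCLE == 0
            && file_size_bytes > max_file_size_mb * 1024 * 1024)
}
```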
4 changes: 1 addition & 3 deletions crates/sui-analytics-indexer/src/handlers/df_handler.rs
@@ -4,7 +4,6 @@
 use anyhow::Result;
 use fastcrypto::encoding::{Base64, Encoding};
 use std::collections::HashMap;
-use std::path::Path;
 use sui_data_ingestion_core::Worker;
 use sui_indexer::errors::IndexerError;
 use sui_types::object::bounded_visitor::BoundedVisitor;
@@ -90,8 +89,7 @@ impl AnalyticsHandler<DynamicFieldEntry> for DynamicFieldHandler {
 }

 impl DynamicFieldHandler {
-    pub fn new(store_path: &Path, rest_uri: &str) -> Self {
-        let package_store = LocalDBPackageStore::new(&store_path.join("dynamic_field"), rest_uri);
+    pub fn new(package_store: LocalDBPackageStore) -> Self {
         let state = State {
             dynamic_fields: vec![],
             package_store: package_store.clone(),
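
This constructor change repeats in the event, object, and wrapped-object handlers below: instead of each handler carving a private subdirectory out of a store path, the caller builds the `LocalDBPackageStore` and injects it. A sketch of the resulting call-site wiring, assuming the crate-internal types from this diff are in scope and using placeholder path, URI, and filter values:

```rust
// Placeholder wiring; the store path, REST URI, and package filter are
// illustrative. The handlers clone the store internally, so one store can
// back all four (or the caller can still build one store per handler).
fn build_handlers(store_path: &std::path::Path, rest_uri: &str) {
    let package_store = LocalDBPackageStore::new(store_path, rest_uri);
    let _df = DynamicFieldHandler::new(package_store.clone());
    let _events = EventHandler::new(package_store.clone());
    let _objects = ObjectHandler::new(package_store.clone(), &Some("0xabc".to_string()));
    let _wrapped = WrappedObjectHandler::new(package_store);
}
```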
4 changes: 1 addition & 3 deletions crates/sui-analytics-indexer/src/handlers/event_handler.rs
@@ -6,7 +6,6 @@ use fastcrypto::encoding::{Base64, Encoding};
 use move_core_types::annotated_value::MoveValue;
 use sui_types::SYSTEM_PACKAGE_ADDRESSES;

-use std::path::Path;
 use sui_data_ingestion_core::Worker;
 use tokio::sync::Mutex;

@@ -87,8 +86,7 @@ impl AnalyticsHandler<EventEntry> for EventHandler {
 }

 impl EventHandler {
-    pub fn new(store_path: &Path, rest_uri: &str) -> Self {
-        let package_store = LocalDBPackageStore::new(&store_path.join("event"), rest_uri);
+    pub fn new(package_store: LocalDBPackageStore) -> Self {
         let state = State {
             events: vec![],
             package_store: package_store.clone(),
11 changes: 3 additions & 8 deletions crates/sui-analytics-indexer/src/handlers/object_handler.rs
@@ -3,7 +3,6 @@

 use anyhow::Result;
 use fastcrypto::encoding::{Base64, Encoding};
-use std::path::Path;
 use sui_data_ingestion_core::Worker;
 use sui_types::{TypeTag, SYSTEM_PACKAGE_ADDRESSES};
 use tokio::sync::Mutex;
@@ -89,8 +88,7 @@ impl AnalyticsHandler<ObjectEntry> for ObjectHandler {
 }

 impl ObjectHandler {
-    pub fn new(store_path: &Path, rest_uri: &str, package_filter: &Option<String>) -> Self {
-        let package_store = LocalDBPackageStore::new(&store_path.join("object"), rest_uri);
+    pub fn new(package_store: LocalDBPackageStore, package_filter: &Option<String>) -> Self {
         let state = State {
             objects: vec![],
             package_store: package_store.clone(),
@@ -311,11 +309,8 @@ mod tests {
     #[tokio::test]
     async fn test_check_type_hierarchy() {
         let temp_dir = tempfile::tempdir().unwrap();
-        let handler = ObjectHandler::new(
-            temp_dir.path(),
-            "http://localhost:9000",
-            &Some("0xabc".to_string()),
-        );
+        let package_store = LocalDBPackageStore::new(temp_dir.path(), "http://localhost:9000");
+        let handler = ObjectHandler::new(package_store, &Some("0xabc".to_string()));
         let mut state = handler.state.lock().await;

         // 1. Direct match
4 changes: 1 addition & 3 deletions crates/sui-analytics-indexer/src/handlers/wrapped_object_handler.rs
@@ -3,7 +3,6 @@

 use anyhow::Result;
 use std::collections::BTreeMap;
-use std::path::Path;
 use sui_data_ingestion_core::Worker;
 use sui_types::SYSTEM_PACKAGE_ADDRESSES;
 use tokio::sync::Mutex;
@@ -81,8 +80,7 @@ impl AnalyticsHandler<WrappedObjectEntry> for WrappedObjectHandler {
 }

 impl WrappedObjectHandler {
-    pub fn new(store_path: &Path, rest_uri: &str) -> Self {
-        let package_store = LocalDBPackageStore::new(&store_path.join("wrapped_object"), rest_uri);
+    pub fn new(package_store: LocalDBPackageStore) -> Self {
         let state = Mutex::new(State {
             wrapped_objects: vec![],
             package_store: package_store.clone(),