diff --git a/Cargo.lock b/Cargo.lock
index 584fb6a3f0..a37352ad84 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2562,6 +2562,7 @@ dependencies = [
  "futures",
  "indexmap 2.7.0",
  "itertools 0.11.0",
+ "lazy_static",
  "parquet2",
  "pyo3",
  "serde",
@@ -2688,7 +2689,6 @@ dependencies = [
  "daft-recordbatch",
  "flate2",
  "futures",
- "rayon",
  "serde_json",
  "snafu",
  "tokio",
diff --git a/daft/daft/__init__.pyi b/daft/daft/__init__.pyi
index 04477b38b6..b8321b5a65 100644
--- a/daft/daft/__init__.pyi
+++ b/daft/daft/__init__.pyi
@@ -1582,6 +1582,13 @@ class PyMicroPartition:
         io_config: IOConfig | None = None,
         multithreaded_io: bool | None = None,
     ): ...
+    @classmethod
+    def read_warc(
+        cls,
+        uri: str,
+        io_config: IOConfig | None = None,
+        multithreaded_io: bool | None = None,
+    ): ...
 
 class PhysicalPlanScheduler:
     """A work scheduler for physical query plans."""
diff --git a/daft/io/_warc.py b/daft/io/_warc.py
index 6f44d1b7fc..d39312b583 100644
--- a/daft/io/_warc.py
+++ b/daft/io/_warc.py
@@ -40,8 +40,9 @@ def read_warc(
             Defaults to None, which will let Daft decide based on the runner it is currently using.
 
     returns:
-        DataFrame: parsed DataFrame with mandatory metadata columns: "WARC-Record-ID", "WARC-Type", "WARC-Date", "Content-Length", one column "warc_content"
-        with the raw byte content of the WARC record, and one column "warc_headers" with the remaining headers of the WARC record stored as a JSON string.
+        DataFrame: parsed DataFrame with mandatory metadata columns ("WARC-Record-ID", "WARC-Type", "WARC-Date", "Content-Length"), one optional
+        metadata column ("WARC-Identified-Payload-Type"), one column "warc_content" with the raw byte content of the WARC record,
+        and one column "warc_headers" with the remaining headers of the WARC record stored as a JSON string.
""" io_config = context.get_context().daft_planning_config.default_io_config if io_config is None else io_config @@ -60,6 +61,7 @@ def read_warc( "WARC-Type": DataType.string(), "WARC-Date": DataType.timestamp(TimeUnit.ns(), timezone="Etc/UTC"), "Content-Length": DataType.int64(), + "WARC-Identified-Payload-Type": DataType.string(), "warc_content": DataType.binary(), "warc_headers": DataType.string(), } diff --git a/daft/recordbatch/micropartition.py b/daft/recordbatch/micropartition.py index 7263c5c7ae..1442c36efa 100644 --- a/daft/recordbatch/micropartition.py +++ b/daft/recordbatch/micropartition.py @@ -524,3 +524,18 @@ def read_json( multithreaded_io=multithreaded_io, ) ) + + @classmethod + def read_warc( + cls, + path: str, + io_config: IOConfig | None = None, + multithreaded_io: bool | None = None, + ) -> MicroPartition: + return MicroPartition._from_pymicropartition( + _PyMicroPartition.read_warc( + uri=path, + io_config=io_config, + multithreaded_io=multithreaded_io, + ) + ) diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs index 0f4828a966..9f6c92a839 100644 --- a/src/daft-micropartition/src/micropartition.rs +++ b/src/daft-micropartition/src/micropartition.rs @@ -794,6 +794,41 @@ pub fn read_json_into_micropartition( } } +pub fn read_warc_into_micropartition( + uris: &[&str], + schema: SchemaRef, + io_config: Arc, + multithreaded_io: bool, + io_stats: Option, +) -> DaftResult { + let io_client = daft_io::get_io_client(multithreaded_io, io_config)?; + let convert_options = WarcConvertOptions { + limit: None, + include_columns: None, + schema: schema.clone(), + predicate: None, + }; + + match uris { + [] => Ok(MicroPartition::empty(None)), + uris => { + // Perform a bulk read of URIs, materializing a table per URI. 
+            let tables = daft_warc::read_warc_bulk(
+                uris,
+                convert_options,
+                io_client,
+                io_stats,
+                multithreaded_io,
+                None,
+                8,
+            )
+            .context(DaftCoreComputeSnafu)?;
+
+            // Construct MicroPartition from tables and unioned schema
+            Ok(MicroPartition::new_loaded(schema, Arc::new(tables), None))
+        }
+    }
+}
+
 fn get_file_column_names<'a>(
     columns: Option<&'a [&'a str]>,
     partition_spec: Option<&PartitionSpec>,
diff --git a/src/daft-micropartition/src/python.rs b/src/daft-micropartition/src/python.rs
index d17abdb16c..0097837ecc 100644
--- a/src/daft-micropartition/src/python.rs
+++ b/src/daft-micropartition/src/python.rs
@@ -761,6 +761,48 @@ impl PyMicroPartition {
         Ok(mp.into())
     }
 
+    #[staticmethod]
+    #[pyo3(signature = (
+        uri,
+        io_config=None,
+        multithreaded_io=None
+    ))]
+    pub fn read_warc(
+        py: Python,
+        uri: &str,
+        io_config: Option<IOConfig>,
+        multithreaded_io: Option<bool>,
+    ) -> PyResult<Self> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("WARC-Record-ID", daft_core::prelude::DataType::Utf8),
+            Field::new("WARC-Type", daft_core::prelude::DataType::Utf8),
+            Field::new(
+                "WARC-Date",
+                daft_core::prelude::DataType::Timestamp(
+                    TimeUnit::Nanoseconds,
+                    Some("Etc/UTC".to_string()),
+                ),
+            ),
+            Field::new("Content-Length", daft_core::prelude::DataType::Int64),
+            Field::new(
+                "WARC-Identified-Payload-Type",
+                daft_core::prelude::DataType::Utf8,
+            ),
+            Field::new("warc_content", daft_core::prelude::DataType::Binary),
+            Field::new("warc_headers", daft_core::prelude::DataType::Utf8),
+        ])?);
+        let mp = py.allow_threads(|| {
+            crate::micropartition::read_warc_into_micropartition(
+                &[uri],
+                schema.into(),
+                io_config.unwrap_or_default().config.into(),
+                multithreaded_io.unwrap_or(true),
+                None,
+            )
+        })?;
+        Ok(mp.into())
+    }
+
     #[staticmethod]
     pub fn _from_unloaded_table_state(
         schema_bytes: &[u8],
diff --git a/src/daft-scan/Cargo.toml b/src/daft-scan/Cargo.toml
index 7448f24f4f..a1e90713d8 100644
--- a/src/daft-scan/Cargo.toml
+++ b/src/daft-scan/Cargo.toml
@@ -22,6 +22,7 @@ daft-stats = {path = "../daft-stats", default-features = false}
 futures = {workspace = true}
 indexmap = {workspace = true}
 itertools = {workspace = true}
+lazy_static = "1.5.0"
 parquet2 = {workspace = true}
 pyo3 = {workspace = true, optional = true}
 serde = {workspace = true}
diff --git a/src/daft-scan/src/lib.rs b/src/daft-scan/src/lib.rs
index e5a9a09bb1..1ea4259954 100644
--- a/src/daft-scan/src/lib.rs
+++ b/src/daft-scan/src/lib.rs
@@ -3,6 +3,7 @@
 use std::{
     any::Any,
     borrow::Cow,
+    collections::HashMap,
     fmt::Debug,
     hash::{Hash, Hasher},
     sync::Arc,
@@ -15,6 +16,7 @@ use common_scan_info::{Pushdowns, ScanTaskLike, ScanTaskLikeRef};
 use daft_schema::schema::{Schema, SchemaRef};
 use daft_stats::{PartitionSpec, TableMetadata, TableStatistics};
 use itertools::Itertools;
+use lazy_static::lazy_static;
 use parquet2::metadata::FileMetaData;
 use serde::{Deserialize, Serialize};
 
@@ -500,6 +502,21 @@ impl From<ScanTask> for ScanTaskLikeRef {
 
 pub type ScanTaskRef = Arc<ScanTask>;
 
+lazy_static! {
+    static ref WARC_COLUMN_SIZES: HashMap<&'static str, usize> = {
+        let mut m = HashMap::new();
+        // Average sizes based on analysis of Common Crawl WARC files.
+        m.insert("WARC-Record-ID", 36); // UUID-style identifiers.
+        m.insert("WARC-Type", 8); // e.g. "response".
+        m.insert("WARC-Date", 8); // Timestamp stored as i64 nanoseconds.
+        m.insert("Content-Length", 8); // i64.
+        m.insert("WARC-Identified-Payload-Type", 5); // e.g. "text/html". Typically null.
+        m.insert("warc_content", 27282); // Average content size.
+ m.insert("warc_headers", 350); // Average headers size. + m + }; +} + impl ScanTask { #[must_use] pub fn new( @@ -671,15 +688,26 @@ impl ScanTask { FileFormatConfig::Csv(_) | FileFormatConfig::Json(_) => { config.csv_inflation_factor } - // TODO(desmond): We can do a lot better here. - FileFormatConfig::Warc(_) => 1.0, + FileFormatConfig::Warc(_) => { + if self.is_gzipped() { + 5.0 + } else { + 1.0 + } + } #[cfg(feature = "python")] FileFormatConfig::Database(_) => 1.0, #[cfg(feature = "python")] FileFormatConfig::PythonFunction => 1.0, }; let in_mem_size: f64 = (file_size as f64) * inflation_factor; - let read_row_size = self.schema.estimate_row_size_bytes(); + let read_row_size = if self.is_warc() { + // Across 100 Common Crawl WARC files, the average record size is 470 (metadata) + 27282 (content) bytes. + // This is 27752 bytes per record. + 27752.0 + } else { + self.schema.estimate_row_size_bytes() + }; in_mem_size / read_row_size }) }); @@ -717,30 +745,68 @@ impl ScanTask { self.size_bytes_on_disk.map(|s| s as usize) } + fn is_warc(&self) -> bool { + matches!(self.file_format_config.as_ref(), FileFormatConfig::Warc(_)) + } + + fn is_gzipped(&self) -> bool { + self.sources + .first() + .and_then(|s| match s { + DataSource::File { path, .. } => { + let filename = std::path::Path::new(path); + Some( + filename + .extension() + .is_some_and(|ext| ext.eq_ignore_ascii_case("gz")), + ) + } + _ => None, + }) + .unwrap_or(false) + } + #[must_use] pub fn estimate_in_memory_size_bytes( &self, config: Option<&DaftExecutionConfig>, ) -> Option { - let mat_schema = self.materialized_schema(); - self.statistics - .as_ref() - .and_then(|s| { - // Derive in-memory size estimate from table stats. - self.num_rows().and_then(|num_rows| { - let row_size = s.estimate_row_size(Some(mat_schema.as_ref())).ok()?; - let estimate = (num_rows as f64) * row_size; - Some(estimate as usize) + // WARC files that are gzipped are often 5x smaller than the uncompressed size. + // For example, see this blog post by Common Crawl: https://commoncrawl.org/blog/february-2025-crawl-archive-now-available + if self.is_warc() { + let approx_num_rows = self.approx_num_rows(config)?; + let mat_schema = self.materialized_schema(); + + // Calculate size based on materialized schema and WARC column sizes + let row_size: usize = mat_schema + .fields + .iter() + .map(|(name, _)| WARC_COLUMN_SIZES.get(name.as_str()).copied().unwrap_or(8)) + .sum(); + + let estimate = (approx_num_rows * row_size as f64) as usize; + Some(estimate) + } else { + let mat_schema = self.materialized_schema(); + self.statistics + .as_ref() + .and_then(|s| { + // Derive in-memory size estimate from table stats. 
+                    self.num_rows().and_then(|num_rows| {
+                        let row_size = s.estimate_row_size(Some(mat_schema.as_ref())).ok()?;
+                        let estimate = (num_rows as f64) * row_size;
+                        Some(estimate as usize)
+                    })
                 })
-            })
-            .or_else(|| {
-                // use approximate number of rows multiplied by an approximate bytes-per-row
-                self.approx_num_rows(config).map(|approx_num_rows| {
-                    let row_size = mat_schema.estimate_row_size_bytes();
-                    let estimate = approx_num_rows * row_size;
-                    estimate as usize
+                .or_else(|| {
+                    // use approximate number of rows multiplied by an approximate bytes-per-row
+                    self.approx_num_rows(config).map(|approx_num_rows| {
+                        let row_size = mat_schema.estimate_row_size_bytes();
+                        let estimate = approx_num_rows * row_size;
+                        estimate as usize
+                    })
                 })
-            })
+        }
     }
 
     #[must_use]
diff --git a/src/daft-warc/Cargo.toml b/src/daft-warc/Cargo.toml
index eadb944fe7..0caf99fd1b 100644
--- a/src/daft-warc/Cargo.toml
+++ b/src/daft-warc/Cargo.toml
@@ -10,7 +10,6 @@ daft-io = {path = "../daft-io", default-features = false}
 daft-recordbatch = {path = "../daft-recordbatch", default-features = false}
 flate2 = {version = "1.1", features = ["zlib-rs"], default-features = false}
 futures = {workspace = true}
-rayon = {workspace = true}
 serde_json = {workspace = true}
 snafu = {workspace = true}
 tokio = {workspace = true}
diff --git a/src/daft-warc/src/lib.rs b/src/daft-warc/src/lib.rs
index 880114bd7a..1a3613e588 100644
--- a/src/daft-warc/src/lib.rs
+++ b/src/daft-warc/src/lib.rs
@@ -11,7 +11,6 @@ use daft_dsl::ExprRef;
 use daft_io::{GetResult, IOClient, IOStatsRef};
 use daft_recordbatch::RecordBatch;
 use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt};
-use rayon::prelude::{IntoParallelIterator, ParallelIterator};
 use snafu::{futures::try_future::TryFutureExt, Snafu};
 use tokio::{
     fs::File,
@@ -92,6 +91,7 @@ struct WarcHeaderState {
     record_id: Option<String>,
     warc_date: Option<DateTime<Utc>>,
     warc_type: Option<String>,
+    warc_identified_payload_type: Option<String>,
     header_lines: Vec<(String, String)>,
 }
 
@@ -101,6 +101,7 @@ impl WarcHeaderState {
         self.record_id = None;
         self.warc_date = None;
         self.warc_type = None;
+        self.warc_identified_payload_type = None;
         self.header_lines.clear();
     }
 }
@@ -112,6 +113,7 @@ struct WarcRecordBatchBuilder {
     record_id_array: MutableUtf8Array<i64>,
     warc_type_array: MutableUtf8Array<i64>,
     warc_date_array: MutablePrimitiveArray<i64>,
     warc_content_length_array: MutablePrimitiveArray<i64>,
+    warc_identified_payload_type_array: MutableUtf8Array<i64>,
     content_array: MutableBinaryArray<i64>,
     header_array: MutableUtf8Array<i64>,
     rows_processed: usize,
@@ -123,7 +125,7 @@ struct WarcRecordBatchBuilder {
 
 impl WarcRecordBatchBuilder {
     const DEFAULT_STRING_LENGTH: usize = 20;
-    const DEFAULT_CONTENT_LENGTH: usize = 100;
+    const DEFAULT_CONTENT_LENGTH: usize = 27282; // 27282 is the average record content length, sampled from 100 Common Crawl WARC files.
 
     fn new(chunk_size: usize, schema: SchemaRef) -> Self {
         Self {
@@ -139,6 +141,10 @@ impl WarcRecordBatchBuilder {
             ),
             warc_date_array: MutablePrimitiveArray::with_capacity(chunk_size),
             warc_content_length_array: MutablePrimitiveArray::with_capacity(chunk_size),
+            warc_identified_payload_type_array: MutableUtf8Array::with_capacities(
+                chunk_size,
+                Self::DEFAULT_STRING_LENGTH * chunk_size,
+            ),
             content_array: MutableBinaryArray::with_capacities(
                 chunk_size,
                 Self::DEFAULT_CONTENT_LENGTH * chunk_size,
@@ -161,12 +167,15 @@ impl WarcRecordBatchBuilder {
         warc_type: Option<&str>,
         warc_date: Option<i64>,
         warc_content_length: Option<i64>,
+        warc_identified_payload_type: Option<&str>,
         header: Option<&str>,
     ) {
         self.record_id_array.push(record_id);
         self.warc_type_array.push(warc_type);
         self.warc_date_array.push(warc_date);
         self.warc_content_length_array.push(warc_content_length);
+        self.warc_identified_payload_type_array
+            .push(warc_identified_payload_type);
         self.header_array.push(header);
         // book keeping
         self.rows_processed += 1;
@@ -192,6 +201,7 @@ impl WarcRecordBatchBuilder {
                 self.warc_type_array.as_box(),
                 self.warc_date_array.as_box(),
                 self.warc_content_length_array.as_box(),
+                self.warc_identified_payload_type_array.as_box(),
                 self.content_array.as_box(),
                 self.header_array.as_box(),
             ],
@@ -210,6 +220,8 @@ impl WarcRecordBatchBuilder {
             MutableUtf8Array::with_capacities(chunk_size, avg_warc_type_size * chunk_size);
         self.warc_date_array = MutablePrimitiveArray::with_capacity(chunk_size);
         self.warc_content_length_array = MutablePrimitiveArray::with_capacity(chunk_size);
+        self.warc_identified_payload_type_array =
+            MutableUtf8Array::with_capacities(chunk_size, avg_warc_type_size * chunk_size);
 
         let avg_content_size = self.content_bytes_so_far / rows_processed;
 
@@ -248,6 +260,7 @@ impl WarcRecordBatchIterator {
                 record_id: None,
                 warc_date: None,
                 warc_type: None,
+                warc_identified_payload_type: None,
                 header_lines: Vec::new(),
             },
             rb_builder: WarcRecordBatchBuilder::new(chunk_size, schema),
@@ -300,6 +313,7 @@ impl WarcRecordBatchIterator {
                         .warc_date
                         .and_then(|d| d.timestamp_nanos_opt()),
                     self.header_state.content_length.map(|len| len as i64),
+                    self.header_state.warc_identified_payload_type.as_deref(),
                     Some(&header_json),
                 );
 
@@ -354,6 +368,9 @@ impl WarcRecordBatchIterator {
                         self.header_state.warc_date = Some(date.with_timezone(&Utc));
                     }
                 }
+                "WARC-Identified-Payload-Type" => {
+                    self.header_state.warc_identified_payload_type = Some(value);
+                }
                 _ => {
                     // Store non-mandatory headers.
                     self.header_state.header_lines.push((key, value));
@@ -404,7 +421,7 @@ pub fn read_warc_bulk(
                     io_stats.clone(),
                 );
                 tokio::task::spawn(async move {
-                    read_warc_single_into_table(
+                    read_warc_single_into_tables(
                         uri.as_str(),
                         convert_options,
                         io_client,
@@ -426,8 +443,10 @@ pub fn read_warc_bulk(
                     // Limit has been met, early-terminate.
                     (_, Some(rows_left)) if rows_left <= 0 => futures::future::ready(Ok(false)),
                     // Limit has not yet been met, update remaining limit slack and continue.
-                    (Ok(table), Some(rows_left)) => {
-                        remaining_rows = Some(rows_left - table.len() as i64);
+                    (Ok(tables), Some(rows_left)) => {
+                        for table in tables {
+                            remaining_rows = Some(rows_left - table.len() as i64);
+                        }
                         futures::future::ready(Ok(true))
                     }
                     // (1) No limit, never early-terminate.
@@ -435,22 +454,21 @@ pub fn read_warc_bulk(
                     (_, None) | (Err(_), _) => futures::future::ready(Ok(true)),
                 }
             })
+            .map_ok(|tables| tables.into_iter().flatten().collect::<Vec<_>>())
             .try_collect::<Vec<_>>()
             .await
     })?;
 
-    tables.into_iter().collect::<DaftResult<Vec<_>>>()
+    Ok(tables.into_iter().flatten().collect::<Vec<_>>())
 }
 
-async fn read_warc_single_into_table(
+async fn read_warc_single_into_tables(
     uri: &str,
     convert_options: WarcConvertOptions,
     io_client: Arc<IOClient>,
     io_stats: Option<IOStatsRef>,
     max_chunks_in_flight: Option<usize>,
-) -> DaftResult<RecordBatch> {
-    let limit = convert_options.limit;
-    let schema = convert_options.schema.clone();
+) -> DaftResult<Vec<RecordBatch>> {
     let record_batch_stream = stream_warc(
         uri,
         io_client,
@@ -459,24 +477,12 @@ async fn read_warc_single_into_table(
     )
     .await?;
 
-    let tables = Box::pin(record_batch_stream);
-    let collected_tables = tables
+    let collected_tables = record_batch_stream
        .try_collect::<Vec<_>>()
         .await?
         .into_iter()
         .collect::<Vec<_>>();
-    if collected_tables.is_empty() {
-        RecordBatch::empty(Some(schema))
-    } else {
-        let concated_table = tables_concat(collected_tables)?;
-        if let Some(limit) = limit
-            && concated_table.len() > limit
-        {
-            concated_table.head(limit)
-        } else {
-            Ok(concated_table)
-        }
-    }
+    Ok(collected_tables)
 }
 
 pub async fn stream_warc(
@@ -606,42 +612,3 @@ fn combine_stream(
         }
     })
 }
-
-// TODO(desmond): This can be deduped for all the different file format readers.
-fn tables_concat(mut tables: Vec<RecordBatch>) -> DaftResult<RecordBatch> {
-    if tables.is_empty() {
-        return Err(DaftError::ValueError(
-            "Need at least 1 Table to perform concat".to_string(),
-        ));
-    }
-    if tables.len() == 1 {
-        return Ok(tables.pop().unwrap());
-    }
-    let first_table = tables.as_slice().first().unwrap();
-
-    let first_schema = &first_table.schema;
-    for tab in tables.iter().skip(1) {
-        if tab.schema.as_ref() != first_schema.as_ref() {
-            return Err(DaftError::SchemaMismatch(format!(
-                "Table concat requires all schemas to match, {} vs {}",
-                first_schema, tab.schema
-            )));
-        }
-    }
-    let num_columns = first_table.num_columns();
-    let new_series = (0..num_columns)
-        .into_par_iter()
-        .map(|i| {
-            let series_to_cat: Vec<&Series> = tables
-                .iter()
-                .map(|s| s.as_ref().get_column_by_index(i).unwrap())
-                .collect();
-            Series::concat(series_to_cat.as_slice())
-        })
-        .collect::<DaftResult<Vec<_>>>()?;
-    RecordBatch::new_with_size(
-        first_table.schema.clone(),
-        new_series,
-        tables.iter().map(daft_recordbatch::RecordBatch::len).sum(),
-    )
-}
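
A minimal usage sketch of the reader wired up in this patch, not part of the diff itself. It assumes read_warc is re-exported at the top-level daft namespace like the other readers (it is defined in daft/io/_warc.py above); the S3 path is hypothetical, and the column names come from the schema in that file.

import daft

# Hypothetical path; any local or remote .warc / .warc.gz URI should work.
df = daft.read_warc("s3://commoncrawl/path/to/example.warc.gz")

# Mandatory metadata columns, the optional payload-type column, plus content and headers.
df = df.select(
    "WARC-Record-ID",
    "WARC-Type",
    "WARC-Date",
    "Content-Length",
    "WARC-Identified-Payload-Type",
    "warc_content",
    "warc_headers",
)

# Keep only response records.
df.where(daft.col("WARC-Type") == "response").show()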
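
The daft-scan changes above estimate WARC scan sizes from two constants: a 5x inflation factor for gzipped WARC files and an average record size of 27,752 bytes (470 bytes of metadata plus 27,282 bytes of content), with per-column byte weights applied once the materialized schema is known. The sketch below mirrors that arithmetic outside the planner purely for illustration; the function names and the 1 GiB example input are made up, and only the constants come from the patch.

# Per-column byte weights copied from WARC_COLUMN_SIZES in src/daft-scan/src/lib.rs.
WARC_COLUMN_SIZES = {
    "WARC-Record-ID": 36,
    "WARC-Type": 8,
    "WARC-Date": 8,
    "Content-Length": 8,
    "WARC-Identified-Payload-Type": 5,
    "warc_content": 27282,
    "warc_headers": 350,
}

AVG_RECORD_SIZE_BYTES = 27752.0  # 470 bytes of metadata + 27282 bytes of content.


def approx_num_rows(file_size_bytes: int, is_gzipped: bool) -> float:
    # Mirrors the WARC branch of ScanTask::approx_num_rows: gzipped files are
    # assumed to inflate ~5x, then the total is divided by the average record size.
    inflation_factor = 5.0 if is_gzipped else 1.0
    return (file_size_bytes * inflation_factor) / AVG_RECORD_SIZE_BYTES


def estimate_in_memory_size_bytes(file_size_bytes: int, is_gzipped: bool, columns: list[str]) -> int:
    # Mirrors the WARC branch of ScanTask::estimate_in_memory_size_bytes;
    # unknown columns fall back to 8 bytes, matching the Rust lookup.
    row_size = sum(WARC_COLUMN_SIZES.get(col, 8) for col in columns)
    return int(approx_num_rows(file_size_bytes, is_gzipped) * row_size)


# Worked example: a 1 GiB gzipped WARC file with only "warc_content" materialized.
rows = approx_num_rows(1 << 30, is_gzipped=True)                       # ~193,000 records
size = estimate_in_memory_size_bytes(1 << 30, True, ["warc_content"])  # ~5.3 GB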
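
Because the non-mandatory WARC headers land in the warc_headers column as a JSON string (per the docstring change in daft/io/_warc.py), they can be unpacked after the read with ordinary JSON parsing. A small sketch under the same assumptions as the usage example above; "WARC-IP-Address" is just one commonly present optional header, not something the reader guarantees.

import json

import daft

df = daft.read_warc("s3://commoncrawl/path/to/example.warc.gz")  # hypothetical path

# Pull a small projection into Python and decode the JSON header blob per record.
records = df.select("WARC-Record-ID", "warc_headers").to_pydict()
optional_headers = [json.loads(h) if h else {} for h in records["warc_headers"]]
ip_addresses = [h.get("WARC-IP-Address") for h in optional_headers]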