perf: Reduce memory consumption for WARC reads and improve estimates (#3935)

This PR makes the following changes to `read_warc`:
- Reduces memory consumption
- Adds `WARC-Identified-Payload-Type` as an extracted metadata column
- Improves stats estimation for scan tasks that read WARC

## Reduced memory consumption

When reading a single Common Crawl file, the file on disk is typically 1GB
and decompresses to roughly 5GB of data.

Before this PR, the Resident Set Size peaks at `5.15GB` while the heap size
peaks at `10.98GB`:


![8D239774-4801-4350-A560-05CEA1202CB5_1_201_a](https://github.com/user-attachments/assets/5931ec32-bdce-4b2b-8838-a72e5f7ad6f4)

After this PR, the Resident Set Size peaks at `4.3GB` while the heap size
peaks at `6.6GB`, which is more in line with expectations:


![D4676719-F780-4745-8E72-A269F07B6D05_1_201_a](https://github.com/user-attachments/assets/defc2a24-38a2-4adf-9d5f-529f0a895212)
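
For context, here is a minimal sketch of one way to observe peak RSS when fully
materializing a single WARC file (the screenshots above come from a memory
profiler; the file name below is a placeholder, and `ru_maxrss` is reported in
KiB on Linux):

```python
import resource

import daft

# Read and fully materialize one Common Crawl WARC file (placeholder path).
df = daft.read_warc("CC-MAIN-example.warc.gz").collect()

# Peak Resident Set Size of this process so far (KiB on Linux, bytes on macOS).
peak_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
print(f"peak RSS: {peak_rss / (1024 * 1024):.2f} GiB")
```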

## Additional `WARC-Identified-Payload-Type` metadata column

To make it easier to filter WARC records, we extract
`WARC-Identified-Payload-Type` from the record metadata into its own column.
Since this header is optional in WARC records, the column is often NULL.
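
As a rough usage sketch (this assumes Daft's `col`/`where` expression API, and
`"text/html"` is just an example value):

```python
import daft

df = daft.read_warc("cc-original.warc.gz")

# Keep only records whose identified payload type is HTML. Records where the
# optional header is absent have a NULL value and are filtered out here.
html_records = df.where(daft.col("WARC-Identified-Payload-Type") == "text/html")
```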

## Stats estimation

A single Common Crawl .warc.gz file is typically 1GB in size, but takes
up ~5GB of memory once decompressed.

For a .warc.gz file with `145,717` records, before this PR we would
estimate:

```
Stats = { Approx num rows = 9,912,769, Approx size bytes = 914.63 MiB,
Accumulated selectivity = 1.00 }
```

After this PR, we now estimate:

```
Stats = { Approx num rows = 167,773, Approx size bytes = 4.34 GiB, Accumulated
selectivity = 1.00 }
```

which is much closer to reality.
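
Roughly, the new row-count estimate divides the decompressed size (a fixed 5x
inflation factor for gzipped WARC files) by an average record size of 27,752
bytes (470 bytes of metadata plus 27,282 bytes of content). A sketch of the
arithmetic, assuming an on-disk size of roughly 931 MB for this file (the exact
size isn't stated above):

```python
file_size_bytes = 931_000_000   # assumed on-disk size of the .warc.gz file
inflation_factor = 5.0          # gzipped WARC typically decompresses ~5x
avg_record_size = 27_752        # 470 bytes of metadata + 27,282 bytes of content

in_mem_size = file_size_bytes * inflation_factor
approx_num_rows = in_mem_size / avg_record_size  # ~167,700 rows
approx_size_gib = in_mem_size / 2**30            # ~4.3 GiB

print(f"Approx num rows = {approx_num_rows:,.0f}, Approx size bytes = {approx_size_gib:.2f} GiB")
```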

### Estimations with pushdowns

When doing `daft.read_warc("file.warc.gz").select("Content-Length")`, we
estimate `1.32 MiB`, while the materialized data actually takes `1.13 MiB`.

When doing
`daft.read_warc("cc-original.warc.gz").select("warc_content")`, we
estimate `4.39 GiB`, while the materialized data actually takes `3.82 GiB`.
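
Under the hood, the pushdown-aware estimate sums per-column average sizes over
the columns that survive projection and multiplies by the approximate row count
(see the `WARC_COLUMN_SIZES` table added in `src/daft-scan/src/lib.rs` below).
A Python sketch of that logic; the row count used here is illustrative:

```python
# Average per-column sizes in bytes, mirroring WARC_COLUMN_SIZES in daft-scan.
WARC_COLUMN_SIZES = {
    "WARC-Record-ID": 36,
    "WARC-Type": 8,
    "WARC-Date": 8,
    "Content-Length": 8,
    "WARC-Identified-Payload-Type": 5,
    "warc_content": 27_282,
    "warc_headers": 350,
}

def estimate_in_memory_size(columns: list[str], approx_num_rows: float) -> int:
    # Columns not in the table fall back to 8 bytes, as in the Rust code.
    row_size = sum(WARC_COLUMN_SIZES.get(c, 8) for c in columns)
    return int(approx_num_rows * row_size)

# With only "Content-Length" selected and ~172,800 estimated rows,
# the estimate comes out to about 1.32 MiB.
print(estimate_in_memory_size(["Content-Length"], 172_800))
```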
desmondcheongzx authored Mar 8, 2025
1 parent 8163982 commit c7df611
Showing 10 changed files with 221 additions and 87 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock


7 changes: 7 additions & 0 deletions daft/daft/__init__.pyi
@@ -1582,6 +1582,13 @@ class PyMicroPartition:
io_config: IOConfig | None = None,
multithreaded_io: bool | None = None,
): ...
@classmethod
def read_warc(
cls,
uri: str,
io_config: IOConfig | None = None,
multithreaded_io: bool | None = None,
): ...

class PhysicalPlanScheduler:
"""A work scheduler for physical query plans."""
6 changes: 4 additions & 2 deletions daft/io/_warc.py
@@ -40,8 +40,9 @@ def read_warc(
Defaults to None, which will let Daft decide based on the runner it is currently using.
returns:
DataFrame: parsed DataFrame with mandatory metadata columns: "WARC-Record-ID", "WARC-Type", "WARC-Date", "Content-Length", one column "warc_content"
with the raw byte content of the WARC record, and one column "warc_headers" with the remaining headers of the WARC record stored as a JSON string.
DataFrame: parsed DataFrame with mandatory metadata columns ("WARC-Record-ID", "WARC-Type", "WARC-Date", "Content-Length"), one optional
metadata column ("WARC-Identified-Payload-Type"), one column "warc_content" with the raw byte content of the WARC record,
and one column "warc_headers" with the remaining headers of the WARC record stored as a JSON string.
"""
io_config = context.get_context().daft_planning_config.default_io_config if io_config is None else io_config

@@ -60,6 +61,7 @@
"WARC-Type": DataType.string(),
"WARC-Date": DataType.timestamp(TimeUnit.ns(), timezone="Etc/UTC"),
"Content-Length": DataType.int64(),
"WARC-Identified-Payload-Type": DataType.string(),
"warc_content": DataType.binary(),
"warc_headers": DataType.string(),
}
15 changes: 15 additions & 0 deletions daft/recordbatch/micropartition.py
@@ -524,3 +524,18 @@ def read_json(
multithreaded_io=multithreaded_io,
)
)

@classmethod
def read_warc(
cls,
path: str,
io_config: IOConfig | None = None,
multithreaded_io: bool | None = None,
) -> MicroPartition:
return MicroPartition._from_pymicropartition(
_PyMicroPartition.read_warc(
uri=path,
io_config=io_config,
multithreaded_io=multithreaded_io,
)
)
35 changes: 35 additions & 0 deletions src/daft-micropartition/src/micropartition.rs
@@ -794,6 +794,41 @@ pub fn read_json_into_micropartition(
}
}

pub fn read_warc_into_micropartition(
uris: &[&str],
schema: SchemaRef,
io_config: Arc<IOConfig>,
multithreaded_io: bool,
io_stats: Option<IOStatsRef>,
) -> DaftResult<MicroPartition> {
let io_client = daft_io::get_io_client(multithreaded_io, io_config)?;
let convert_options = WarcConvertOptions {
limit: None,
include_columns: None,
schema: schema.clone(),
predicate: None,
};

match uris {
[] => Ok(MicroPartition::empty(None)),
uris => {
// Perform a bulk read of URIs, materializing a table per URI.
let tables = daft_warc::read_warc_bulk(
uris,
convert_options,
io_client,
io_stats,
multithreaded_io,
None,
8,
)
.context(DaftCoreComputeSnafu)?;

// Construct MicroPartition from tables and unioned schema
Ok(MicroPartition::new_loaded(schema, Arc::new(tables), None))
}
}
}
fn get_file_column_names<'a>(
columns: Option<&'a [&'a str]>,
partition_spec: Option<&PartitionSpec>,
42 changes: 42 additions & 0 deletions src/daft-micropartition/src/python.rs
@@ -761,6 +761,48 @@ impl PyMicroPartition {
Ok(mp.into())
}

#[staticmethod]
#[pyo3(signature = (
uri,
io_config=None,
multithreaded_io=None
))]
pub fn read_warc(
py: Python,
uri: &str,
io_config: Option<IOConfig>,
multithreaded_io: Option<bool>,
) -> PyResult<Self> {
let schema = Arc::new(Schema::new(vec![
Field::new("WARC-Record-ID", daft_core::prelude::DataType::Utf8),
Field::new("WARC-Type", daft_core::prelude::DataType::Utf8),
Field::new(
"WARC-Date",
daft_core::prelude::DataType::Timestamp(
TimeUnit::Nanoseconds,
Some("Etc/UTC".to_string()),
),
),
Field::new("Content-Length", daft_core::prelude::DataType::Int64),
Field::new(
"WARC-Identified-Payload-Type",
daft_core::prelude::DataType::Utf8,
),
Field::new("warc_content", daft_core::prelude::DataType::Binary),
Field::new("warc_headers", daft_core::prelude::DataType::Utf8),
])?);
let mp = py.allow_threads(|| {
crate::micropartition::read_warc_into_micropartition(
&[uri],
schema.into(),
io_config.unwrap_or_default().config.into(),
multithreaded_io.unwrap_or(true),
None,
)
})?;
Ok(mp.into())
}

#[staticmethod]
pub fn _from_unloaded_table_state(
schema_bytes: &[u8],
1 change: 1 addition & 0 deletions src/daft-scan/Cargo.toml
@@ -22,6 +22,7 @@ daft-stats = {path = "../daft-stats", default-features = false}
futures = {workspace = true}
indexmap = {workspace = true}
itertools = {workspace = true}
lazy_static = "1.5.0"
parquet2 = {workspace = true}
pyo3 = {workspace = true, optional = true}
serde = {workspace = true}
106 changes: 86 additions & 20 deletions src/daft-scan/src/lib.rs
@@ -3,6 +3,7 @@
use std::{
any::Any,
borrow::Cow,
collections::HashMap,
fmt::Debug,
hash::{Hash, Hasher},
sync::Arc,
@@ -15,6 +16,7 @@ use common_scan_info::{Pushdowns, ScanTaskLike, ScanTaskLikeRef};
use daft_schema::schema::{Schema, SchemaRef};
use daft_stats::{PartitionSpec, TableMetadata, TableStatistics};
use itertools::Itertools;
use lazy_static::lazy_static;
use parquet2::metadata::FileMetaData;
use serde::{Deserialize, Serialize};

@@ -500,6 +502,21 @@ impl From<ScanTask> for ScanTaskLikeRef {

pub type ScanTaskRef = Arc<ScanTask>;

lazy_static! {
static ref WARC_COLUMN_SIZES: HashMap<&'static str, usize> = {
let mut m = HashMap::new();
// Average sizes based on analysis of Common Crawl WARC files.
m.insert("WARC-Record-ID", 36); // UUID-style identifiers.
m.insert("WARC-Type", 8); // e.g. "response".
m.insert("WARC-Date", 8); // Timestamp stored as i64 nanoseconds.
m.insert("Content-Length", 8); // i64.
m.insert("WARC-Identified-Payload-Type", 5); // e.g. "text/html". Typically null.
m.insert("warc_content", 27282); // Average content size.
m.insert("warc_headers", 350); // Average headers size.
m
};
}

impl ScanTask {
#[must_use]
pub fn new(
@@ -671,15 +688,26 @@ impl ScanTask {
FileFormatConfig::Csv(_) | FileFormatConfig::Json(_) => {
config.csv_inflation_factor
}
// TODO(desmond): We can do a lot better here.
FileFormatConfig::Warc(_) => 1.0,
FileFormatConfig::Warc(_) => {
if self.is_gzipped() {
5.0
} else {
1.0
}
}
#[cfg(feature = "python")]
FileFormatConfig::Database(_) => 1.0,
#[cfg(feature = "python")]
FileFormatConfig::PythonFunction => 1.0,
};
let in_mem_size: f64 = (file_size as f64) * inflation_factor;
let read_row_size = self.schema.estimate_row_size_bytes();
let read_row_size = if self.is_warc() {
// Across 100 Common Crawl WARC files, the average record size is 470 (metadata) + 27282 (content) bytes.
// This is 27752 bytes per record.
27752.0
} else {
self.schema.estimate_row_size_bytes()
};
in_mem_size / read_row_size
})
});
@@ -717,30 +745,68 @@ impl ScanTask {
self.size_bytes_on_disk.map(|s| s as usize)
}

fn is_warc(&self) -> bool {
matches!(self.file_format_config.as_ref(), FileFormatConfig::Warc(_))
}

fn is_gzipped(&self) -> bool {
self.sources
.first()
.and_then(|s| match s {
DataSource::File { path, .. } => {
let filename = std::path::Path::new(path);
Some(
filename
.extension()
.is_some_and(|ext| ext.eq_ignore_ascii_case("gz")),
)
}
_ => None,
})
.unwrap_or(false)
}

#[must_use]
pub fn estimate_in_memory_size_bytes(
&self,
config: Option<&DaftExecutionConfig>,
) -> Option<usize> {
let mat_schema = self.materialized_schema();
self.statistics
.as_ref()
.and_then(|s| {
// Derive in-memory size estimate from table stats.
self.num_rows().and_then(|num_rows| {
let row_size = s.estimate_row_size(Some(mat_schema.as_ref())).ok()?;
let estimate = (num_rows as f64) * row_size;
Some(estimate as usize)
// WARC files that are gzipped are often 5x smaller than the uncompressed size.
// For example, see this blog post by Common Crawl: https://commoncrawl.org/blog/february-2025-crawl-archive-now-available
if self.is_warc() {
let approx_num_rows = self.approx_num_rows(config)?;
let mat_schema = self.materialized_schema();

// Calculate size based on materialized schema and WARC column sizes
let row_size: usize = mat_schema
.fields
.iter()
.map(|(name, _)| WARC_COLUMN_SIZES.get(name.as_str()).copied().unwrap_or(8))
.sum();

let estimate = (approx_num_rows * row_size as f64) as usize;
Some(estimate)
} else {
let mat_schema = self.materialized_schema();
self.statistics
.as_ref()
.and_then(|s| {
// Derive in-memory size estimate from table stats.
self.num_rows().and_then(|num_rows| {
let row_size = s.estimate_row_size(Some(mat_schema.as_ref())).ok()?;
let estimate = (num_rows as f64) * row_size;
Some(estimate as usize)
})
})
})
.or_else(|| {
// use approximate number of rows multiplied by an approximate bytes-per-row
self.approx_num_rows(config).map(|approx_num_rows| {
let row_size = mat_schema.estimate_row_size_bytes();
let estimate = approx_num_rows * row_size;
estimate as usize
.or_else(|| {
// use approximate number of rows multiplied by an approximate bytes-per-row
self.approx_num_rows(config).map(|approx_num_rows| {
let row_size = mat_schema.estimate_row_size_bytes();
let estimate = approx_num_rows * row_size;
estimate as usize
})
})
})
}
}

#[must_use]
1 change: 0 additions & 1 deletion src/daft-warc/Cargo.toml
@@ -10,7 +10,6 @@ daft-io = {path = "../daft-io", default-features = false}
daft-recordbatch = {path = "../daft-recordbatch", default-features = false}
flate2 = {version = "1.1", features = ["zlib-rs"], default-features = false}
futures = {workspace = true}
rayon = {workspace = true}
serde_json = {workspace = true}
snafu = {workspace = true}
tokio = {workspace = true}