Skip to content

Commit

Permalink
feat: Low level flight interface (#19239)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Oct 15, 2024
1 parent 9376ce3 commit bd126d5
Show file tree
Hide file tree
Showing 10 changed files with 522 additions and 287 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions crates/polars-arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ arrow-array = { workspace = true, optional = true }
arrow-buffer = { workspace = true, optional = true }
arrow-data = { workspace = true, optional = true }
arrow-schema = { workspace = true, optional = true }
tokio = { workspace = true, optional = true }
async-stream = { version = "0.3", optional = true }
tokio = { workspace = true, optional = true, features = ["io-util"] }

[dev-dependencies]
criterion = "0.5"
Expand Down Expand Up @@ -116,7 +117,7 @@ full = [
arrow_rs = ["arrow-buffer", "arrow-schema", "arrow-data", "arrow-array"]
io_ipc = ["arrow-format", "polars-error/arrow-format"]
io_ipc_compression = ["lz4", "zstd", "io_ipc"]
io_flight = ["io_ipc", "arrow-format/flight-data"]
io_flight = ["io_ipc", "arrow-format/flight-data", "async-stream", "futures", "tokio"]

io_avro = ["avro-schema", "polars-error/avro-schema"]
io_avro_compression = [
Expand Down
1 change: 1 addition & 0 deletions crates/polars-arrow/src/ffi/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ impl ArrowArray {
ArrowDataType::BinaryView | ArrowDataType::Utf8View
);

#[allow(unused_mut)]
let (offset, mut buffers, children, dictionary) =
offset_buffers_children_dictionary(array.as_ref());

Expand Down
241 changes: 0 additions & 241 deletions crates/polars-arrow/src/io/flight/mod.rs

This file was deleted.

81 changes: 58 additions & 23 deletions crates/polars-arrow/src/io/ipc/read/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,7 @@ pub fn read_file_dictionaries<R: Read + Seek>(
Ok(dictionaries)
}

/// Reads the footer's length and magic number in footer
fn read_footer_len<R: Read + Seek>(reader: &mut R) -> PolarsResult<(u64, usize)> {
// read footer length and magic number in footer
let end = reader.seek(SeekFrom::End(-10))? + 10;

let mut footer: [u8; 10] = [0; 10];

reader.read_exact(&mut footer)?;
pub(super) fn decode_footer_len(footer: [u8; 10], end: u64) -> PolarsResult<(u64, usize)> {
let footer_len = i32::from_le_bytes(footer[..4].try_into().unwrap());

if footer[4..] != ARROW_MAGIC_V2 {
Expand All @@ -152,6 +145,17 @@ fn read_footer_len<R: Read + Seek>(reader: &mut R) -> PolarsResult<(u64, usize)>
Ok((end, footer_len))
}

/// Reads the last 10 bytes of `reader` and hands them to `decode_footer_len`.
///
/// Returns whatever `decode_footer_len` yields for that trailer together with
/// the stream's end position. Any I/O failure from seeking or reading is
/// propagated to the caller.
fn read_footer_len<R: Read + Seek>(reader: &mut R) -> PolarsResult<(u64, usize)> {
    // Seeking to 10 bytes before the end returns that absolute position;
    // adding the 10 bytes back gives the end of the stream.
    let trailer_start = reader.seek(SeekFrom::End(-10))?;
    let end = trailer_start + 10;

    let mut trailer = [0u8; 10];
    reader.read_exact(&mut trailer)?;

    decode_footer_len(trailer, end)
}

fn read_footer<R: Read + Seek>(reader: &mut R, footer_len: usize) -> PolarsResult<Vec<u8>> {
// read footer
reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
Expand Down Expand Up @@ -187,29 +191,60 @@ fn deserialize_footer_blocks(
Ok((footer, blocks))
}

pub fn deserialize_footer(footer_data: &[u8], size: u64) -> PolarsResult<FileMetadata> {
let (footer, blocks) = deserialize_footer_blocks(footer_data)?;
/// Parses `footer_data` as the root of a flatbuffer-encoded IPC footer.
///
/// # Errors
/// Returns an out-of-spec error (`OutOfSpecKind::InvalidFlatbufferFooter`)
/// when the bytes cannot be read as a footer root.
pub(super) fn deserialize_footer_ref(footer_data: &[u8]) -> PolarsResult<FooterRef> {
    let parsed = arrow_format::ipc::FooterRef::read_as_root(footer_data);
    parsed.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferFooter(err)))
}

let ipc_schema = footer
pub(super) fn deserialize_schema_ref_from_footer(
footer: arrow_format::ipc::FooterRef,
) -> PolarsResult<arrow_format::ipc::SchemaRef> {
footer
.schema()
.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferSchema(err)))?
.ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingSchema))?;
let (schema, ipc_schema) = fb_to_schema(ipc_schema)?;
.ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingSchema))
}

/// Iterates over the record-batch blocks listed in an IPC `footer`.
///
/// Each yielded item is the result of converting one footer entry into an
/// owned `arrow_format::ipc::Block`; conversion happens as the iterator is
/// advanced.
///
/// # Errors
/// Fails up front with `OutOfSpecKind::InvalidFlatbufferRecordBatches` when the
/// record-batch list cannot be read, and with
/// `OutOfSpecKind::MissingRecordBatches` when the footer has no such list.
/// Individual items are `Err` (again `InvalidFlatbufferRecordBatches`) when a
/// single entry fails to convert.
pub(super) fn iter_recordbatch_blocks_from_footer(
    footer: arrow_format::ipc::FooterRef,
) -> PolarsResult<impl Iterator<Item = PolarsResult<arrow_format::ipc::Block>> + '_> {
    // First make sure the field itself is readable ...
    let maybe_batches = footer
        .record_batches()
        .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferRecordBatches(err)))?;
    // ... then insist that it is actually present.
    let batches =
        maybe_batches.ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingRecordBatches))?;

    Ok(batches.iter().map(|raw| {
        raw.try_into()
            .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferRecordBatches(err)))
    }))
}

pub(super) fn iter_dictionary_blocks_from_footer(
footer: arrow_format::ipc::FooterRef,
) -> PolarsResult<Option<impl Iterator<Item = PolarsResult<arrow_format::ipc::Block>> + '_>> {
let dictionaries = footer
.dictionaries()
.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferDictionaries(err)))?
.map(|dictionaries| {
dictionaries
.into_iter()
.map(|block| {
block.try_into().map_err(|err| {
polars_err!(oos = OutOfSpecKind::InvalidFlatbufferRecordBatches(err))
})
})
.collect::<PolarsResult<Vec<_>>>()
.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferDictionaries(err)))?;

Ok(dictionaries.map(|dicts| {
dicts.into_iter().map(|block| {
block.try_into().map_err(|err| {
polars_err!(oos = OutOfSpecKind::InvalidFlatbufferRecordBatches(err))
})
})
}))
}

pub fn deserialize_footer(footer_data: &[u8], size: u64) -> PolarsResult<FileMetadata> {
let footer = deserialize_footer_ref(footer_data)?;
let blocks = iter_recordbatch_blocks_from_footer(footer)?.collect::<PolarsResult<Vec<_>>>()?;
let dictionaries = iter_dictionary_blocks_from_footer(footer)?
.map(|dicts| dicts.collect::<PolarsResult<Vec<_>>>())
.transpose()?;
let ipc_schema = deserialize_schema_ref_from_footer(footer)?;
let (schema, ipc_schema) = fb_to_schema(ipc_schema)?;

Ok(FileMetadata {
schema: Arc::new(schema),
Expand Down
Loading

0 comments on commit bd126d5

Please sign in to comment.