Skip to content

Commit

Permalink
Merge pull request #3868 from PsiACE/bump-arrow2
Browse files Browse the repository at this point in the history
[deps] bump arrow2 to d14ae86
  • Loading branch information
sundy-li authored Jan 24, 2022
2 parents dc8bfed + 6e75e08 commit 60bba3d
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 21 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion common/arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ simd = ["arrow/simd"]
# Workspace dependencies

# Github dependencies
arrow = { package = "arrow2", git = "https://github.com/datafuse-extras/arrow2", default-features = false, rev = "f07cc2c"}
arrow = { package = "arrow2", git = "https://github.com/datafuse-extras/arrow2", default-features = false, rev = "d14ae86"}
arrow-format = { version = "0.3.0", features = ["flight-data", "flight-service"] }
parquet2 = { version = "0.8.1", default_features = false }
# Crates.io dependencies
Expand Down
36 changes: 26 additions & 10 deletions query/src/api/rpc/flight_client_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,21 @@ impl FlightDataStream {
}

let arrow_schema = Arc::new(schema.to_arrow());
Ok(
deserialize_batch(&flight_data, arrow_schema, true, &Default::default())
.map(create_data_block)?,
let ipc_fields = common_arrow::arrow::io::ipc::write::default_ipc_fields(
&arrow_schema.fields,
);
let ipc_schema = common_arrow::arrow::io::ipc::IpcSchema {
fields: ipc_fields,
is_little_endian: true,
};

Ok(deserialize_batch(
&flight_data,
arrow_schema,
&ipc_schema,
&Default::default(),
)
.map(create_data_block)?)
}
}
})
Expand All @@ -66,7 +77,7 @@ impl FlightDataStream {
#[inline]
#[allow(dead_code)]
pub fn from_receiver(
schema_ref: DataSchemaRef,
schema: DataSchemaRef,
inner: Receiver<Result<FlightData, ErrorCode>>,
) -> impl Stream<Item = Result<DataBlock, ErrorCode>> {
ReceiverStream::new(inner).map(move |flight_data| match flight_data {
Expand All @@ -83,13 +94,18 @@ impl FlightDataStream {
DataBlock::create(Arc::new(schema), columns)
}

Ok(deserialize_batch(
&flight_data,
Arc::new(schema_ref.to_arrow()),
true,
&Default::default(),
let arrow_schema = Arc::new(schema.to_arrow());
let ipc_fields =
common_arrow::arrow::io::ipc::write::default_ipc_fields(&arrow_schema.fields);
let ipc_schema = common_arrow::arrow::io::ipc::IpcSchema {
fields: ipc_fields,
is_little_endian: true,
};

Ok(
deserialize_batch(&flight_data, arrow_schema, &ipc_schema, &Default::default())
.map(create_data_block)?,
)
.map(create_data_block)?)
}
})
}
Expand Down
7 changes: 5 additions & 2 deletions query/src/api/rpc/flight_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,18 @@ impl DatabendQueryFlightDispatcher {
}

#[tracing::instrument(level = "debug", skip_all)]
pub fn get_stream(&self, ticket: &StreamTicket) -> Result<mpsc::Receiver<Result<DataBlock>>> {
pub fn get_stream(
&self,
ticket: &StreamTicket,
) -> Result<(mpsc::Receiver<Result<DataBlock>>, DataSchemaRef)> {
let stage_name = format!("{}/{}", ticket.query_id, ticket.stage_id);
if let Some(notify) = self.stages_notify.write().remove(&stage_name) {
notify.notify_waiters();
}

let stream_name = format!("{}/{}", stage_name, ticket.stream);
match self.streams.write().remove(&stream_name) {
Some(stream_info) => Ok(stream_info.rx),
Some(stream_info) => Ok((stream_info.rx, stream_info.schema)),
None => Err(ErrorCode::NotFoundStream("Stream is not found")),
}
}
Expand Down
11 changes: 9 additions & 2 deletions query/src/api/rpc/flight_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use std::convert::TryInto;
use std::pin::Pin;
use std::sync::Arc;

use common_arrow::arrow::io::flight::serialize_schema;
use common_arrow::arrow::io::ipc::write::default_ipc_fields;
use common_arrow::arrow_format::flight::data::Action;
use common_arrow::arrow_format::flight::data::ActionType;
use common_arrow::arrow_format::flight::data::Criteria;
Expand Down Expand Up @@ -106,10 +108,15 @@ impl FlightService for DatabendQueryFlightService {

match ticket {
FlightTicket::StreamTicket(steam_ticket) => {
let receiver = self.dispatcher.get_stream(&steam_ticket)?;
let (receiver, data_schema) = self.dispatcher.get_stream(&steam_ticket)?;
let arrow_schema = data_schema.to_arrow();
let ipc_fields = default_ipc_fields(arrow_schema.fields());

serialize_schema(&arrow_schema, &ipc_fields);

Ok(RawResponse::new(
Box::pin(FlightDataStream::create(receiver)) as FlightStream<FlightData>,
Box::pin(FlightDataStream::create(receiver, ipc_fields))
as FlightStream<FlightData>,
))
}
}
Expand Down
11 changes: 9 additions & 2 deletions query/src/api/rpc/flight_service_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::convert::TryInto;

use common_arrow::arrow::io::flight::serialize_batch;
use common_arrow::arrow::io::ipc::write::WriteOptions;
use common_arrow::arrow::io::ipc::IpcField;
use common_arrow::arrow_format::flight::data::FlightData;
use common_base::tokio::macros::support::Pin;
use common_base::tokio::macros::support::Poll;
Expand All @@ -27,13 +28,18 @@ use tonic::Status;

pub struct FlightDataStream {
input: Receiver<common_exception::Result<DataBlock>>,
ipc_fields: Vec<IpcField>,
options: WriteOptions,
}

impl FlightDataStream {
pub fn create(input: Receiver<common_exception::Result<DataBlock>>) -> FlightDataStream {
pub fn create(
input: Receiver<common_exception::Result<DataBlock>>,
ipc_fields: Vec<IpcField>,
) -> FlightDataStream {
FlightDataStream {
input,
ipc_fields,
options: WriteOptions { compression: None },
}
}
Expand All @@ -49,7 +55,8 @@ impl Stream for FlightDataStream {
Some(Ok(block)) => match block.try_into() {
Err(error) => Some(Err(Status::from(error))),
Ok(record_batch) => {
let (dicts, values) = serialize_batch(&record_batch, &self.options);
let (dicts, values) =
serialize_batch(&record_batch, &self.ipc_fields, &self.options);

match dicts.is_empty() {
true => Some(Ok(values)),
Expand Down
6 changes: 3 additions & 3 deletions query/tests/it/api/rpc/flight_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ async fn test_run_shuffle_action_with_no_scatters() -> Result<()> {
.await?;

let stream = stream_ticket(&query_id, &stage_id, &stream_id);
let receiver = flight_dispatcher.get_stream(&stream)?;
let (receiver, _data_scheme) = flight_dispatcher.get_stream(&stream)?;
let receiver_stream = ReceiverStream::new(receiver);
let collect_data_blocks = receiver_stream.collect::<Result<Vec<_>>>();

Expand Down Expand Up @@ -114,7 +114,7 @@ async fn test_run_shuffle_action_with_scatter() -> Result<()> {
.await?;

let stream_1 = stream_ticket(&query_id, &stage_id, "stream_1");
let receiver = flight_dispatcher.get_stream(&stream_1)?;
let (receiver, _data_scheme) = flight_dispatcher.get_stream(&stream_1)?;
let receiver_stream = ReceiverStream::new(receiver);
let collect_data_blocks = receiver_stream.collect::<Result<Vec<_>>>();

Expand All @@ -131,7 +131,7 @@ async fn test_run_shuffle_action_with_scatter() -> Result<()> {
assert_blocks_eq(expect, &collect_data_blocks.await?);

let stream_2 = stream_ticket(&query_id, &stage_id, "stream_2");
let receiver = flight_dispatcher.get_stream(&stream_2)?;
let (receiver, _data_scheme) = flight_dispatcher.get_stream(&stream_2)?;
let receiver_stream = ReceiverStream::new(receiver);
let collect_data_blocks = receiver_stream.collect::<Result<Vec<_>>>();

Expand Down

1 comment on commit 60bba3d

@vercel
Copy link

@vercel vercel bot commented on 60bba3d Jan 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend-flaneur2021.vercel.app
databend-git-main-flaneur2021.vercel.app
databend-opal.vercel.app

Please sign in to comment.