Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug: Fix multi-lines printing issue for datafusion-cli #14954

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions datafusion-cli/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use datafusion::common::instant::Instant;
use datafusion::error::{DataFusionError, Result};
use std::fs::File;
use std::io::BufReader;
use std::io::Write;
use std::str::FromStr;
use std::sync::Arc;

Expand Down Expand Up @@ -62,9 +63,24 @@ impl Command {
Self::Help => {
let now = Instant::now();
let command_batch = all_commands_info();
let schema = command_batch.schema();
let num_rows = command_batch.num_rows();
print_options.print_batches(schema, &[command_batch], now, num_rows)
let schema = command_batch.schema();
let stdout = std::io::stdout();
let writer = &mut stdout.lock();
// Help is using the default format Automatic
print_options.format.print_no_table_batches(
writer,
schema,
&[command_batch],
true,
)?;
let formatted_exec_details =
print_options.get_execution_details_formatted(num_rows, now);

if !print_options.quiet {
writeln!(writer, "{formatted_exec_details}")?;
}
Ok(())
}
Self::ListTables => {
exec_and_print(ctx, print_options, "SHOW TABLES".into()).await
Expand Down
57 changes: 15 additions & 42 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,6 @@ use crate::{
object_storage::get_object_store,
print_options::{MaxRows, PrintOptions},
};
use futures::StreamExt;
use std::collections::HashMap;
use std::fs::File;
use std::io::prelude::*;
use std::io::BufReader;

use datafusion::common::instant::Instant;
use datafusion::common::{plan_datafusion_err, plan_err};
use datafusion::config::ConfigFileType;
Expand All @@ -41,13 +35,15 @@ use datafusion::logical_expr::{DdlStatement, LogicalPlan};
use datafusion::physical_plan::execution_plan::EmissionType;
use datafusion::physical_plan::{execute_stream, ExecutionPlanProperties};
use datafusion::sql::parser::{DFParser, Statement};
use datafusion::sql::sqlparser::dialect::dialect_from_str;

use datafusion::execution::memory_pool::MemoryConsumer;
use datafusion::physical_plan::spill::get_record_batch_memory_size;
use datafusion::sql::sqlparser;
use datafusion::sql::sqlparser::dialect::dialect_from_str;
use rustyline::error::ReadlineError;
use rustyline::Editor;
use std::collections::HashMap;
use std::fs::File;
use std::io::prelude::*;
use std::io::BufReader;
use std::sync::Arc;
use tokio::signal;

/// run and execute SQL statements and commands, against a context with the given print options
Expand Down Expand Up @@ -229,57 +225,34 @@ pub(super) async fn exec_and_print(
for statement in statements {
let adjusted =
AdjustedPrintOptions::new(print_options.clone()).with_statement(&statement);

let plan = create_plan(ctx, statement).await?;
let adjusted = adjusted.with_plan(&plan);

let df = ctx.execute_logical_plan(plan).await?;
let physical_plan = df.create_physical_plan().await?;

// Track memory usage for the query result if it's bounded
let mut reservation =
MemoryConsumer::new("DataFusion-Cli").register(task_ctx.memory_pool());
let is_unbounded = physical_plan.boundedness().is_unbounded();
let stream = execute_stream(Arc::clone(&physical_plan), task_ctx.clone())?;
let schema = physical_plan.schema();

if physical_plan.boundedness().is_unbounded() {
// Both bounded and unbounded streams are streaming prints
if is_unbounded {
if physical_plan.pipeline_behavior() == EmissionType::Final {
return plan_err!(
"The given query can generate a valid result only once \
the source finishes, but the source is unbounded"
);
}
// As the input stream comes, we can generate results.
// However, memory safety is not guaranteed.
let stream = execute_stream(physical_plan, task_ctx.clone())?;
print_options.print_stream(stream, now).await?;
print_options.print_stream(schema, stream, now).await?;
} else {
// Bounded stream; collected results size is limited by the maxrows option
let schema = physical_plan.schema();
let mut stream = execute_stream(physical_plan, task_ctx.clone())?;
let mut results = vec![];
let mut row_count = 0_usize;
let max_rows = match print_options.maxrows {
MaxRows::Unlimited => usize::MAX,
MaxRows::Limited(n) => n,
};
while let Some(batch) = stream.next().await {
let batch = batch?;
let curr_num_rows = batch.num_rows();
// Stop collecting results if the number of rows exceeds the limit
// results batch should include the last batch that exceeds the limit
if row_count < max_rows + curr_num_rows {
// Try to grow the reservation to accommodate the batch in memory
reservation.try_grow(get_record_batch_memory_size(&batch))?;
results.push(batch);
}
row_count += curr_num_rows;
}
// We need to finalize and return the inner `PrintOptions` for unbounded streams
adjusted
.into_inner()
.print_batches(schema, &results, now, row_count)?;
reservation.free();
.print_stream(schema, stream, now)
.await?;
}
}

Ok(())
}

Expand Down
Loading