diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index dc9207da51cb..fc295b078c74 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -277,6 +277,11 @@ impl DataSource for FileScanConfig { file_scan.new_exec() as _ })) } + + fn collect_info(&self) -> HashMap { + // TODO: collect explain info + HashMap::new() + } } impl FileScanConfig { diff --git a/datafusion/physical-plan/src/display.rs b/datafusion/physical-plan/src/display.rs index 6f61c8623e97..f209b8529a7e 100644 --- a/datafusion/physical-plan/src/display.rs +++ b/datafusion/physical-plan/src/display.rs @@ -18,6 +18,7 @@ //! Implementation of physical plan display. See //! [`crate::displayable`] for examples of how to format +use std::collections::HashMap; use std::fmt::Formatter; use std::{fmt, str::FromStr}; @@ -28,6 +29,8 @@ use datafusion_common::DataFusionError; use datafusion_expr::display_schema; use datafusion_physical_expr::LexOrdering; +use crate::render_tree::RenderTree; + use super::{accept, ExecutionPlan, ExecutionPlanVisitor}; /// Options for controlling how each [`ExecutionPlan`] should format itself @@ -243,36 +246,16 @@ impl<'a> DisplayableExecutionPlan<'a> { } pub fn tree_render(&self) -> impl fmt::Display + 'a { - let format_type = DisplayFormatType::TreeRender; struct Wrapper<'a> { - format_type: DisplayFormatType, plan: &'a dyn ExecutionPlan, - show_metrics: ShowMetrics, - show_statistics: bool, - show_schema: bool, } impl fmt::Display for Wrapper<'_> { fn fmt(&self, f: &mut Formatter) -> fmt::Result { - let mut visitor = TreeRenderVisitor { - t: self.format_type, - f, - indent: 0, - show_metrics: self.show_metrics, - show_statistics: self.show_statistics, - show_schema: self.show_schema, - height: 0, - weight: 0, - }; - accept(self.plan, &mut visitor) + let mut visitor = TreeRenderVisitor { f }; + visitor.visit(self.plan) } } - Wrapper { - format_type, - plan: self.inner, - show_metrics: self.show_metrics, - show_statistics: self.show_statistics, - show_schema: self.show_schema, - } + Wrapper { plan: self.inner } } /// Return a single-line summary of the root of the plan @@ -315,13 +298,11 @@ impl<'a> DisplayableExecutionPlan<'a> { plan_type: PlanType, explain_format: DisplayFormatType, ) -> StringifiedPlan { - match explain_format { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - StringifiedPlan::new(plan_type, self.indent(verbose).to_string()) - } - DisplayFormatType::TreeRender => { + match (&explain_format, &plan_type) { + (DisplayFormatType::TreeRender, PlanType::FinalPhysicalPlan) => { StringifiedPlan::new(plan_type, self.tree_render().to_string()) } + _ => StringifiedPlan::new(plan_type, self.indent(verbose).to_string()), } } } @@ -512,36 +493,369 @@ impl ExecutionPlanVisitor for GraphvizVisitor<'_, '_> { } struct TreeRenderVisitor<'a, 'b> { - /// How to format each node - t: DisplayFormatType, /// Write to this formatter f: &'a mut Formatter<'b>, - /// Indent size - indent: usize, - /// How to show metrics - show_metrics: ShowMetrics, - /// If statistics should be displayed - show_statistics: bool, - /// If schema should be displayed - show_schema: bool, - height: usize, - weight: usize, } -impl ExecutionPlanVisitor for TreeRenderVisitor<'_, '_> { - type Error = fmt::Error; +impl TreeRenderVisitor<'_, '_> { + // TODO: Make these variables configurable. + const LTCORNER: &'static str = "┌"; + const RTCORNER: &'static str = "┐"; + const LDCORNER: &'static str = "└"; + const RDCORNER: &'static str = "┘"; + + const TMIDDLE: &'static str = "┬"; + const LMIDDLE: &'static str = "├"; + const DMIDDLE: &'static str = "┴"; + + const VERTICAL: &'static str = "│"; + const HORIZONTAL: &'static str = "─"; + + const MAXIMUM_RENDER_WIDTH: usize = 240; + const NODE_RENDER_WIDTH: usize = 29; + const MAX_EXTRA_LINES: usize = 30; + + pub fn visit(&mut self, plan: &dyn ExecutionPlan) -> Result<(), fmt::Error> { + let root = RenderTree::create_tree(plan); + + for y in 0..root.height { + // Start by rendering the top layer. + self.render_top_layer(&root, y)?; + // Now we render the content of the boxes + self.render_box_content(&root, y)?; + // Render the bottom layer of each of the boxes + self.render_bottom_layer(&root, y)?; + } - fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result { + Ok(()) + } - self.height += 1; - self.weight += 1; - todo!() + fn render_top_layer( + &mut self, + root: &RenderTree, + y: usize, + ) -> Result<(), fmt::Error> { + for x in 0..root.width { + if root.has_node(x, y) { + write!(self.f, "{}", Self::LTCORNER)?; + write!( + self.f, + "{}", + Self::HORIZONTAL.repeat(Self::NODE_RENDER_WIDTH / 2 - 1) + )?; + if y == 0 { + // top level node: no node above this one + write!(self.f, "{}", Self::HORIZONTAL)?; + } else { + // render connection to node above this one + write!(self.f, "{}", Self::DMIDDLE)?; + } + write!( + self.f, + "{}", + Self::HORIZONTAL.repeat(Self::NODE_RENDER_WIDTH / 2 - 1) + )?; + write!(self.f, "{}", Self::RTCORNER)?; + } else { + let mut has_adjacent_nodes = false; + for i in 0..(root.width - x) { + has_adjacent_nodes = has_adjacent_nodes || root.has_node(x + i, y); + } + if !has_adjacent_nodes { + // There are no nodes to the right side of this position + // no need to fill the empty space + continue; + } + // there are nodes next to this, fill the space + write!(self.f, "{}", &" ".repeat(Self::NODE_RENDER_WIDTH))?; + } + } + writeln!(self.f)?; + + Ok(()) } - fn post_visit(&mut self, _plan: &dyn ExecutionPlan) -> Result { + fn render_box_content( + &mut self, + root: &RenderTree, + y: usize, + ) -> Result<(), fmt::Error> { + let mut extra_info: Vec> = vec![vec![]; root.width]; + let mut extra_height = 0; + + for (x, extra_info_item) in extra_info.iter_mut().enumerate().take(root.width) { + if let Some(node) = root.get_node(x, y) { + Self::split_up_extra_info( + &node.extra_text, + extra_info_item, + Self::MAX_EXTRA_LINES, + ); + if extra_info_item.len() > extra_height { + extra_height = extra_info_item.len(); + } + } + } - self.weight -= 1; - Ok(true) + let halfway_point = (extra_height + 1) / 2; + + // Render the actual node. + for render_y in 0..=extra_height { + for (x, node) in root.nodes.iter().enumerate().take(root.width) { + if x * Self::NODE_RENDER_WIDTH >= Self::MAXIMUM_RENDER_WIDTH { + break; + } + + let has_adjacent_nodes = + (0..root.width - x).any(|i| root.has_node(x + i, y)); + + if let Some(node) = node { + write!(self.f, "{}", Self::VERTICAL)?; + + // Rigure out what to render. + let mut render_text = String::new(); + if render_y == 0 { + render_text = node.name.clone(); + } else if render_y <= extra_info[x].len() { + render_text = extra_info[x][render_y - 1].clone(); + } + + render_text = Self::adjust_text_for_rendering( + &render_text, + Self::NODE_RENDER_WIDTH - 2, + ); + write!(self.f, "{}", render_text)?; + + if render_y == halfway_point && node.child_positions.len() > 1 { + write!(self.f, "{}", Self::LMIDDLE)?; + } else { + write!(self.f, "{}", Self::VERTICAL)?; + } + } else if render_y == halfway_point { + let has_child_to_the_right = + Self::should_render_whitespace(root, x, y); + if root.has_node(x, y + 1) { + // Node right below this one. + write!( + self.f, + "{}", + Self::HORIZONTAL.repeat(Self::NODE_RENDER_WIDTH / 2) + )?; + if has_child_to_the_right { + write!(self.f, "{}", Self::TMIDDLE)?; + // Have another child to the right, Keep rendering the line. + write!( + self.f, + "{}", + Self::HORIZONTAL.repeat(Self::NODE_RENDER_WIDTH / 2) + )?; + } else { + write!(self.f, "{}", Self::RTCORNER)?; + if has_adjacent_nodes { + // Only a child below this one: fill the reset with spaces. + write!( + self.f, + "{}", + " ".repeat(Self::NODE_RENDER_WIDTH / 2) + )?; + } + } + } else if has_child_to_the_right { + // Child to the right, but no child right below this one: render a full + // line. + write!( + self.f, + "{}", + Self::HORIZONTAL.repeat(Self::NODE_RENDER_WIDTH) + )?; + } else if has_adjacent_nodes { + // Empty spot: render spaces. + write!(self.f, "{}", " ".repeat(Self::NODE_RENDER_WIDTH))?; + } + } else if render_y >= halfway_point { + if root.has_node(x, y + 1) { + // Have a node below this empty spot: render a vertical line. + write!( + self.f, + "{}{}", + " ".repeat(Self::NODE_RENDER_WIDTH / 2), + Self::VERTICAL + )?; + if has_adjacent_nodes + || Self::should_render_whitespace(root, x, y) + { + write!( + self.f, + "{}", + " ".repeat(Self::NODE_RENDER_WIDTH / 2) + )?; + } + } else if has_adjacent_nodes + || Self::should_render_whitespace(root, x, y) + { + // Empty spot: render spaces. + write!(self.f, "{}", " ".repeat(Self::NODE_RENDER_WIDTH))?; + } + } else if has_adjacent_nodes { + // Empty spot: render spaces. + write!(self.f, "{}", " ".repeat(Self::NODE_RENDER_WIDTH))?; + } + } + writeln!(self.f)?; + } + + Ok(()) + } + + fn render_bottom_layer( + &mut self, + root: &RenderTree, + y: usize, + ) -> Result<(), fmt::Error> { + for x in 0..=root.width { + if x * Self::NODE_RENDER_WIDTH >= Self::MAXIMUM_RENDER_WIDTH { + break; + } + let mut has_adjacent_nodes = false; + for i in 0..(root.width - x) { + has_adjacent_nodes = has_adjacent_nodes || root.has_node(x + i, y); + } + if root.get_node(x, y).is_some() { + write!(self.f, "{}", Self::LDCORNER)?; + write!( + self.f, + "{}", + Self::HORIZONTAL.repeat(Self::NODE_RENDER_WIDTH / 2 - 1) + )?; + if root.has_node(x, y + 1) { + // node below this one: connect to that one + write!(self.f, "{}", Self::TMIDDLE)?; + } else { + // no node below this one: end the box + write!(self.f, "{}", Self::HORIZONTAL)?; + } + write!( + self.f, + "{}", + Self::HORIZONTAL.repeat(Self::NODE_RENDER_WIDTH / 2 - 1) + )?; + write!(self.f, "{}", Self::RDCORNER)?; + } else if root.has_node(x, y + 1) { + write!(self.f, "{}", &" ".repeat(Self::NODE_RENDER_WIDTH / 2))?; + write!(self.f, "{}", Self::VERTICAL)?; + if has_adjacent_nodes || Self::should_render_whitespace(root, x, y) { + write!(self.f, "{}", &" ".repeat(Self::NODE_RENDER_WIDTH / 2))?; + } + } else if has_adjacent_nodes || Self::should_render_whitespace(root, x, y) { + write!(self.f, "{}", &" ".repeat(Self::NODE_RENDER_WIDTH))?; + } + } + writeln!(self.f)?; + + Ok(()) + } + + fn extra_info_separator() -> String { + "-".repeat(Self::NODE_RENDER_WIDTH - 9) + } + + fn remove_padding(s: &str) -> String { + s.trim().to_string() + } + + fn split_up_extra_info( + extra_info: &HashMap, + result: &mut Vec, + max_lines: usize, + ) { + if extra_info.is_empty() { + return; + } + + result.push(Self::extra_info_separator()); + + let mut requires_padding = false; + let mut was_inlined = false; + for (key, value) in extra_info { + let mut str = Self::remove_padding(value); + if str.is_empty() { + continue; + } + let mut is_inlined = false; + let available_width = Self::NODE_RENDER_WIDTH - 7; + let total_size = key.len() + str.len() + 2; + let is_multiline = str.contains('\n'); + if !is_multiline && total_size < available_width { + str = format!("{}: {}", key, str); + is_inlined = true; + } else { + str = format!("{}:\n{}", key, str); + } + if is_inlined && was_inlined { + requires_padding = false; + } + if requires_padding { + result.push(String::new()); + } + + let mut splits: Vec = str.split('\n').map(String::from).collect(); + if splits.len() > max_lines { + let mut truncated_splits = Vec::new(); + for split in splits.iter().take(max_lines / 2) { + truncated_splits.push(split.clone()); + } + truncated_splits.push("...".to_string()); + for split in splits.iter().skip(splits.len() - max_lines / 2) { + truncated_splits.push(split.clone()); + } + splits = truncated_splits; + } + for split in splits { + // TODO: check every line is less than MAX_LINE_RENDER_SIZE. + result.push(split); + } + requires_padding = true; + was_inlined = is_inlined; + } + } + + fn adjust_text_for_rendering(source: &str, max_render_width: usize) -> String { + let render_width = source.chars().count(); + if render_width > max_render_width { + let truncated = &source[..max_render_width - 3]; + format!("{}...", truncated) + } else { + let total_spaces = max_render_width - render_width; + let half_spaces = total_spaces / 2; + let extra_left_space = if total_spaces % 2 == 0 { 0 } else { 1 }; + format!( + "{}{}{}", + " ".repeat(half_spaces + extra_left_space), + source, + " ".repeat(half_spaces) + ) + } + } + + fn should_render_whitespace(root: &RenderTree, x: usize, y: usize) -> bool { + let mut found_children = 0; + + for i in (0..=x).rev() { + let node = root.get_node(i, y); + if root.has_node(i, y + 1) { + found_children += 1; + } + if let Some(node) = node { + if node.child_positions.len() > 1 + && found_children < node.child_positions.len() + { + return true; + } + + return false; + } + } + + false } } @@ -552,6 +866,10 @@ pub trait DisplayAs { /// /// Should not include a newline fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result; + + fn collect_info(&self) -> HashMap { + HashMap::new() + } } /// A new type wrapper to display `T` implementing`DisplayAs` using the `Default` mode diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 9210e3b0273c..bdbde643c724 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -47,6 +47,7 @@ pub use crate::topk::TopK; pub use crate::visitor::{accept, visit_execution_plan, ExecutionPlanVisitor}; mod ordering; +mod render_tree; mod topk; mod visitor; diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index 0c0049787e1c..f1b5cf121738 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -18,6 +18,7 @@ //! Execution plan for reading in-memory batches of data use std::any::Any; +use std::collections::HashMap; use std::fmt; use std::sync::Arc; use std::task::{Context, Poll}; @@ -436,6 +437,34 @@ impl DataSource for MemorySourceConfig { } } + fn collect_info(&self) -> HashMap { + let mut result = HashMap::new(); + + let partition_sizes: Vec<_> = self.partitions.iter().map(|b| b.len()).collect(); + result.insert( + "partition_sizes".to_string(), + format!("{:?}", partition_sizes), + ); + + if let Some(output_ordering) = self.sort_information.first() { + result.insert("output_ordering".to_string(), output_ordering.to_string()); + } + + let eq_properties = self.eq_properties(); + let constraints = eq_properties.constraints(); + if !constraints.is_empty() { + result.insert("constraints".to_string(), constraints.to_string()); + } + + if let Some(limit) = self.fetch { + result.insert("fetch".to_string(), limit.to_string()); + } + + result.insert("partitions".to_string(), partition_sizes.len().to_string()); + + result + } + fn output_partitioning(&self) -> Partitioning { Partitioning::UnknownPartitioning(self.partitions.len()) } diff --git a/datafusion/physical-plan/src/render_tree.rs b/datafusion/physical-plan/src/render_tree.rs new file mode 100644 index 000000000000..95e28232bea9 --- /dev/null +++ b/datafusion/physical-plan/src/render_tree.rs @@ -0,0 +1,141 @@ +use std::cmp; +use std::collections::HashMap; +use std::sync::Arc; + +use crate::ExecutionPlan; + +#[allow(dead_code)] +pub struct Coordinate { + pub x: usize, + pub y: usize, +} + +impl Coordinate { + pub fn new(x: usize, y: usize) -> Self { + Coordinate { x, y } + } +} + +pub struct RenderTreeNode { + pub name: String, + pub extra_text: HashMap, + pub child_positions: Vec, +} + +impl RenderTreeNode { + pub fn new(name: String, extra_text: HashMap) -> Self { + RenderTreeNode { + name, + extra_text, + child_positions: vec![], + } + } + + fn add_child_position(&mut self, x: usize, y: usize) { + self.child_positions.push(Coordinate::new(x, y)); + } +} + +pub struct RenderTree { + pub nodes: Vec>>, + pub width: usize, + pub height: usize, +} + +impl RenderTree { + pub fn create_tree(plan: &dyn ExecutionPlan) -> Self { + let (width, height) = get_tree_width_height(plan); + + let mut result = Self::new(width, height); + + create_tree_recursive(&mut result, plan, 0, 0); + + result + } + + fn new(width: usize, height: usize) -> Self { + RenderTree { + nodes: vec![None; (width + 1) * (height + 1)], + width, + height, + } + } + + pub fn get_node(&self, x: usize, y: usize) -> Option> { + if x >= self.width || y >= self.height { + return None; + } + + let pos = self.get_position(x, y); + self.nodes.get(pos).and_then(|node| node.clone()) + } + + pub fn set_node(&mut self, x: usize, y: usize, node: Arc) { + let pos = self.get_position(x, y); + if let Some(slot) = self.nodes.get_mut(pos) { + *slot = Some(node); + } + } + + pub fn has_node(&self, x: usize, y: usize) -> bool { + if x >= self.width || y >= self.height { + return false; + } + + let pos = self.get_position(x, y); + self.nodes.get(pos).is_some_and(|node| node.is_some()) + } + + fn get_position(&self, x: usize, y: usize) -> usize { + y * self.width + x + } +} + +fn get_tree_width_height(plan: &dyn ExecutionPlan) -> (usize, usize) { + let children = plan.children(); + + if children.is_empty() { + return (1, 1); + } + + let mut width = 0; + let mut height = 0; + + for child in children { + let (child_width, child_height) = get_tree_width_height(child.as_ref()); + width += child_width; + height = cmp::max(height, child_height); + } + + height += 1; + + (width, height) +} + +fn create_tree_recursive( + result: &mut RenderTree, + plan: &dyn ExecutionPlan, + x: usize, + y: usize, +) -> usize { + let mut node = RenderTreeNode::new(plan.name().to_string(), plan.collect_info()); + + let children = plan.children(); + + if children.is_empty() { + result.set_node(x, y, Arc::new(node)); + return 1; + } + + let mut width = 0; + for child in children { + let child_x = x + width; + let child_y = y + 1; + node.add_child_position(child_x, child_y); + width += create_tree_recursive(result, child.as_ref(), child_x, child_y); + } + + result.set_node(x, y, Arc::new(node)); + + width +} diff --git a/datafusion/physical-plan/src/source.rs b/datafusion/physical-plan/src/source.rs index 0c1dfddd2678..fe45ce2760e4 100644 --- a/datafusion/physical-plan/src/source.rs +++ b/datafusion/physical-plan/src/source.rs @@ -16,6 +16,7 @@ // under the License. use std::any::Any; +use std::collections::HashMap; use std::fmt; use std::fmt::{Debug, Formatter}; use std::sync::Arc; @@ -41,6 +42,7 @@ pub trait DataSource: Send + Sync { ) -> datafusion_common::Result; fn as_any(&self) -> &dyn Any; fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result; + fn collect_info(&self) -> HashMap; fn repartitioned( &self, _target_partitions: usize, @@ -82,6 +84,10 @@ impl DisplayAs for DataSourceExec { write!(f, "DataSourceExec: ")?; self.source.fmt_as(t, f) } + + fn collect_info(&self) -> HashMap { + self.source.collect_info() + } } impl ExecutionPlan for DataSourceExec {