From 1683ea1bfeaf3941717f3ba433e3e2564ead5c0d Mon Sep 17 00:00:00 2001 From: Marco Edward Gorelli Date: Tue, 30 Jan 2024 14:29:38 +0000 Subject: [PATCH] fix: fix pivot when multiple columns are passed. Output is now aligned with what tidyverse / pandas.pivot_table would do (#14048) Co-authored-by: ritchie --- .../src/chunked_array/logical/struct_/mod.rs | 6 + crates/polars-ops/src/frame/pivot/mod.rs | 224 ++++++++++-------- .../polars-ops/src/frame/pivot/positioning.rs | 102 ++++++-- crates/polars/tests/it/core/pivot.rs | 10 +- py-polars/polars/dataframe/frame.py | 20 +- py-polars/tests/unit/operations/test_pivot.py | 87 ++++++- 6 files changed, 309 insertions(+), 140 deletions(-) diff --git a/crates/polars-core/src/chunked_array/logical/struct_/mod.rs b/crates/polars-core/src/chunked_array/logical/struct_/mod.rs index b3a3c792afcb..81ddf41cd746 100644 --- a/crates/polars-core/src/chunked_array/logical/struct_/mod.rs +++ b/crates/polars-core/src/chunked_array/logical/struct_/mod.rs @@ -9,6 +9,7 @@ use arrow::legacy::trusted_len::TrustedLenPush; use arrow::offset::OffsetsBuffer; use smartstring::alias::String as SmartString; +use self::sort::arg_sort_multiple::_get_rows_encoded_ca; use super::*; use crate::datatypes::*; use crate::utils::index_to_chunked_index; @@ -411,6 +412,11 @@ impl StructChunked { } self.cast_impl(dtype, true) } + + pub fn rows_encode(&self) -> PolarsResult { + let descending = vec![false; self.fields.len()]; + _get_rows_encoded_ca(self.name(), &self.fields, &descending, false) + } } impl LogicalType for StructChunked { diff --git a/crates/polars-ops/src/frame/pivot/mod.rs b/crates/polars-ops/src/frame/pivot/mod.rs index 89857eea6d3d..d9cc3bb38367 100644 --- a/crates/polars-ops/src/frame/pivot/mod.rs +++ b/crates/polars-ops/src/frame/pivot/mod.rs @@ -187,117 +187,155 @@ fn pivot_impl( // used as separator/delimiter in generated column names. separator: Option<&str>, ) -> PolarsResult { - let sep = separator.unwrap_or("_"); polars_ensure!(!index.is_empty(), ComputeError: "index cannot be zero length"); + polars_ensure!(!columns.is_empty(), ComputeError: "columns cannot be zero length"); + if !stable { + println!("unstable pivot not yet supported, using stable pivot"); + }; + if columns.len() > 1 { + let schema = Arc::new(pivot_df.schema()); + let binding = pivot_df.select_with_schema(columns, &schema)?; + let fields = binding.get_columns(); + let column = format!("{{\"{}\"}}", columns.join("\",\"")); + if schema.contains(column.as_str()) { + polars_bail!(ComputeError: "cannot use column name {column} that \ + already exists in the DataFrame. Please rename it prior to calling `pivot`.") + } + let columns_struct = StructChunked::new(&column, fields).unwrap().into_series(); + let mut binding = pivot_df.clone(); + let pivot_df = unsafe { binding.with_column_unchecked(columns_struct) }; + pivot_impl_single_column( + pivot_df, + &column, + values, + index, + agg_fn, + sort_columns, + separator, + ) + } else { + pivot_impl_single_column( + pivot_df, + unsafe { columns.get_unchecked(0) }, + values, + index, + agg_fn, + sort_columns, + separator, + ) + } +} +fn pivot_impl_single_column( + pivot_df: &DataFrame, + column: &str, + values: &[String], + index: &[String], + agg_fn: Option, + sort_columns: bool, + separator: Option<&str>, +) -> PolarsResult { + let sep = separator.unwrap_or("_"); let mut final_cols = vec![]; - let mut count = 0; let out: PolarsResult<()> = POOL.install(|| { - for column_column_name in columns { - let mut group_by = index.to_vec(); - group_by.push(column_column_name.clone()); + let mut group_by = index.to_vec(); + group_by.push(column.to_string()); - let groups = pivot_df.group_by_stable(group_by)?.take_groups(); + let groups = pivot_df.group_by_stable(group_by)?.take_groups(); - // these are the row locations - if !stable { - println!("unstable pivot not yet supported, using stable pivot"); - }; - - let (col, row) = POOL.join( - || positioning::compute_col_idx(pivot_df, column_column_name, &groups), - || positioning::compute_row_idx(pivot_df, index, &groups, count), - ); - let (col_locations, column_agg) = col?; - let (row_locations, n_rows, mut row_index) = row?; + let (col, row) = POOL.join( + || positioning::compute_col_idx(pivot_df, column, &groups), + || positioning::compute_row_idx(pivot_df, index, &groups, count), + ); + let (col_locations, column_agg) = col?; + let (row_locations, n_rows, mut row_index) = row?; - for value_col_name in values { - let value_col = pivot_df.column(value_col_name)?; + for value_col_name in values { + let value_col = pivot_df.column(value_col_name)?; - use PivotAgg::*; - let value_agg = unsafe { - match &agg_fn { - None => match value_col.len() > groups.len() { - true => polars_bail!(ComputeError: "found multiple elements in the same group, please specify an aggregation function"), - false => value_col.agg_first(&groups), - } - Some(agg_fn) => match agg_fn { - Sum => value_col.agg_sum(&groups), - Min => value_col.agg_min(&groups), - Max => value_col.agg_max(&groups), - Last => value_col.agg_last(&groups), - First => value_col.agg_first(&groups), - Mean => value_col.agg_mean(&groups), - Median => value_col.agg_median(&groups), - Count => groups.group_count().into_series(), - Expr(ref expr) => { - let name = expr.root_name()?; - let mut value_col = value_col.clone(); - value_col.rename(name); - let tmp_df = DataFrame::new_no_checks(vec![value_col]); - let mut aggregated = expr.evaluate(&tmp_df, &groups)?; - aggregated.rename(value_col_name); - aggregated - } - }, + use PivotAgg::*; + let value_agg = unsafe { + match &agg_fn { + None => match value_col.len() > groups.len() { + true => polars_bail!(ComputeError: "found multiple elements in the same group, please specify an aggregation function"), + false => value_col.agg_first(&groups), } - }; - - let headers = column_agg.unique_stable()?.cast(&DataType::String)?; - let mut headers = headers.str().unwrap().clone(); - if values.len() > 1 { - headers = headers.apply_values(|v| Cow::from(format!("{value_col_name}{sep}{column_column_name}{sep}{v}"))) + Some(agg_fn) => match agg_fn { + Sum => value_col.agg_sum(&groups), + Min => value_col.agg_min(&groups), + Max => value_col.agg_max(&groups), + Last => value_col.agg_last(&groups), + First => value_col.agg_first(&groups), + Mean => value_col.agg_mean(&groups), + Median => value_col.agg_median(&groups), + Count => groups.group_count().into_series(), + Expr(ref expr) => { + let name = expr.root_name()?; + let mut value_col = value_col.clone(); + value_col.rename(name); + let tmp_df = DataFrame::new_no_checks(vec![value_col]); + let mut aggregated = expr.evaluate(&tmp_df, &groups)?; + aggregated.rename(value_col_name); + aggregated + } + }, } + }; - let n_cols = headers.len(); - let value_agg_phys = value_agg.to_physical_repr(); - let logical_type = value_agg.dtype(); + let headers = column_agg.unique_stable()?.cast(&DataType::String)?; + let mut headers = headers.str().unwrap().clone(); + if values.len() > 1 { + headers = headers.apply_values(|v| Cow::from(format!("{value_col_name}{sep}{v}"))) + } - debug_assert_eq!(row_locations.len(), col_locations.len()); - debug_assert_eq!(value_agg_phys.len(), row_locations.len()); + let n_cols = headers.len(); + let value_agg_phys = value_agg.to_physical_repr(); + let logical_type = value_agg.dtype(); - let mut cols = if value_agg_phys.dtype().is_numeric() { - macro_rules! dispatch { - ($ca:expr) => {{ - positioning::position_aggregates_numeric( - n_rows, - n_cols, - &row_locations, - &col_locations, - $ca, - logical_type, - &headers, - ) - }}; - } - downcast_as_macro_arg_physical!(value_agg_phys, dispatch) - } else { - positioning::position_aggregates( - n_rows, - n_cols, - &row_locations, - &col_locations, - &value_agg_phys, - logical_type, - &headers, - ) - }; + debug_assert_eq!(row_locations.len(), col_locations.len()); + debug_assert_eq!(value_agg_phys.len(), row_locations.len()); - if sort_columns { - cols.sort_unstable_by(|a, b| a.name().partial_cmp(b.name()).unwrap()); + let mut cols = if value_agg_phys.dtype().is_numeric() { + macro_rules! dispatch { + ($ca:expr) => {{ + positioning::position_aggregates_numeric( + n_rows, + n_cols, + &row_locations, + &col_locations, + $ca, + logical_type, + &headers, + ) + }}; } + downcast_as_macro_arg_physical!(value_agg_phys, dispatch) + } else { + positioning::position_aggregates( + n_rows, + n_cols, + &row_locations, + &col_locations, + &value_agg_phys, + logical_type, + &headers, + ) + }; - let cols = if count == 0 { - let mut final_cols = row_index.take().unwrap(); - final_cols.extend(cols); - final_cols - } else { - cols - }; - count += 1; - final_cols.extend_from_slice(&cols); + if sort_columns { + cols.sort_unstable_by(|a, b| a.name().partial_cmp(b.name()).unwrap()); } + + let cols = if count == 0 { + let mut final_cols = row_index.take().unwrap(); + final_cols.extend(cols); + final_cols + } else { + cols + }; + count += 1; + final_cols.extend_from_slice(&cols); } Ok(()) }); diff --git a/crates/polars-ops/src/frame/pivot/positioning.rs b/crates/polars-ops/src/frame/pivot/positioning.rs index e7eae52fbaeb..151d1317712e 100644 --- a/crates/polars-ops/src/frame/pivot/positioning.rs +++ b/crates/polars-ops/src/frame/pivot/positioning.rs @@ -1,5 +1,6 @@ use std::hash::Hash; +use arrow::legacy::trusted_len::TrustedLenPush; use polars_core::prelude::*; use polars_utils::sync::SyncPtr; @@ -178,17 +179,46 @@ where { let mut col_to_idx = PlHashMap::with_capacity(HASHMAP_INIT_SIZE); let mut idx = 0 as IdxSize; - column_agg_physical - .into_iter() - .map(|v| { - let idx = *col_to_idx.entry(v).or_insert_with(|| { + let mut out = Vec::with_capacity(column_agg_physical.len()); + + for arr in column_agg_physical.downcast_iter() { + for opt_v in arr.into_iter() { + let idx = *col_to_idx.entry(opt_v).or_insert_with(|| { let old_idx = idx; idx += 1; old_idx }); - idx - }) - .collect() + // SAFETY: + // we pre-allocated + unsafe { out.push_unchecked(idx) }; + } + } + out +} + +fn compute_col_idx_gen<'a, T>(column_agg_physical: &'a ChunkedArray) -> Vec +where + T: PolarsDataType, + &'a T::Array: IntoIterator>>, + T::Physical<'a>: Hash + Eq, +{ + let mut col_to_idx = PlHashMap::with_capacity(HASHMAP_INIT_SIZE); + let mut idx = 0 as IdxSize; + let mut out = Vec::with_capacity(column_agg_physical.len()); + + for arr in column_agg_physical.downcast_iter() { + for opt_v in arr.into_iter() { + let idx = *col_to_idx.entry(opt_v).or_insert_with(|| { + let old_idx = idx; + idx += 1; + old_idx + }); + // SAFETY: + // we pre-allocated + unsafe { out.push_unchecked(idx) }; + } + } + out } pub(super) fn compute_col_idx( @@ -210,6 +240,24 @@ pub(super) fn compute_col_idx( let ca = column_agg_physical.bit_repr_large(); compute_col_idx_numeric(&ca) }, + Struct(_) => { + let ca = column_agg_physical.struct_().unwrap(); + let ca = ca.rows_encode()?; + compute_col_idx_gen(&ca) + }, + String => { + let ca = column_agg_physical.str().unwrap(); + let ca = ca.as_binary(); + compute_col_idx_gen(&ca) + }, + Binary => { + let ca = column_agg_physical.binary().unwrap(); + compute_col_idx_gen(ca) + }, + Boolean => { + let ca = column_agg_physical.bool().unwrap(); + compute_col_idx_gen(ca) + }, _ => { let mut col_to_idx = PlHashMap::with_capacity(HASHMAP_INIT_SIZE); let mut idx = 0 as IdxSize; @@ -230,32 +278,38 @@ pub(super) fn compute_col_idx( Ok((col_locations, column_agg)) } -fn compute_row_idx_numeric( +fn compute_row_index<'a, T>( index: &[String], - index_agg_physical: &ChunkedArray, + index_agg_physical: &'a ChunkedArray, count: usize, logical_type: &DataType, ) -> (Vec, usize, Option>) where - T: PolarsNumericType, - T::Native: Hash + Eq, + T: PolarsDataType, + T::Physical<'a>: Hash + Eq + Copy, + ChunkedArray: FromIterator>>, ChunkedArray: IntoSeries, { let mut row_to_idx = PlIndexMap::with_capacity_and_hasher(HASHMAP_INIT_SIZE, Default::default()); let mut idx = 0 as IdxSize; - let row_locations = index_agg_physical - .into_iter() - .map(|v| { - let idx = *row_to_idx.entry(v).or_insert_with(|| { + + let mut row_locations = Vec::with_capacity(index_agg_physical.len()); + for arr in index_agg_physical.downcast_iter() { + for opt_v in arr.iter() { + let idx = *row_to_idx.entry(opt_v).or_insert_with(|| { let old_idx = idx; idx += 1; old_idx }); - idx - }) - .collect::>(); + // SAFETY: + // we pre-allocated + unsafe { + row_locations.push_unchecked(idx); + } + } + } let row_index = match count { 0 => { let mut s = row_to_idx @@ -289,11 +343,19 @@ pub(super) fn compute_row_idx( match index_agg_physical.dtype() { Int32 | UInt32 | Float32 => { let ca = index_agg_physical.bit_repr_small(); - compute_row_idx_numeric(index, &ca, count, index_s.dtype()) + compute_row_index(index, &ca, count, index_s.dtype()) }, Int64 | UInt64 | Float64 => { let ca = index_agg_physical.bit_repr_large(); - compute_row_idx_numeric(index, &ca, count, index_s.dtype()) + compute_row_index(index, &ca, count, index_s.dtype()) + }, + Boolean => { + let ca = index_agg_physical.bool().unwrap(); + compute_row_index(index, ca, count, index_s.dtype()) + }, + String => { + let ca = index_agg_physical.str().unwrap(); + compute_row_index(index, ca, count, index_s.dtype()) }, _ => { let mut row_to_idx = diff --git a/crates/polars/tests/it/core/pivot.rs b/crates/polars/tests/it/core/pivot.rs index ea54aa02e16d..ce1bee178557 100644 --- a/crates/polars/tests/it/core/pivot.rs +++ b/crates/polars/tests/it/core/pivot.rs @@ -177,11 +177,11 @@ fn test_pivot_new() -> PolarsResult<()> { let expected = df![ "A" => ["foo", "foo", "bar", "bar"], "B" => ["one", "two", "one", "two"], - "large" => [Some(4), None, Some(4), Some(7)], - "small" => [1, 6, 5, 6], - "egg" => [Some(4), Some(3), None, None], - "jam" => [1, 3, 4, 13], - "potato" => [None, None, Some(5), None] + "{\"large\",\"egg\"}" => [Some(4), None, None, None], + "{\"large\",\"jam\"}" => [None, None, Some(4), Some(7)], + "{\"small\",\"egg\"}" => [None, Some(3), None, None], + "{\"small\",\"jam\"}" => [Some(1), Some(3), None, Some(6)], + "{\"small\",\"potato\"}" => [None, None, Some(5), None], ]?; assert!(out.equals_missing(&expected)); diff --git a/py-polars/polars/dataframe/frame.py b/py-polars/polars/dataframe/frame.py index aa7e05ca61e8..d004c4665bd8 100644 --- a/py-polars/polars/dataframe/frame.py +++ b/py-polars/polars/dataframe/frame.py @@ -7349,16 +7349,16 @@ def pivot( ... by=cs.string(), ... ) shape: (4, 6) - ┌─────┬─────┬──────┬──────┬──────┬──────┐ - │ foo ┆ bar ┆ one ┆ two ┆ x ┆ y │ - │ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │ - │ str ┆ str ┆ i64 ┆ i64 ┆ i64 ┆ i64 │ - ╞═════╪═════╪══════╪══════╪══════╪══════╡ - │ one ┆ x ┆ 5 ┆ null ┆ 5 ┆ null │ - │ one ┆ y ┆ 3 ┆ null ┆ null ┆ 3 │ - │ two ┆ x ┆ null ┆ 10 ┆ 10 ┆ null │ - │ two ┆ y ┆ null ┆ 3 ┆ null ┆ 3 │ - └─────┴─────┴──────┴──────┴──────┴──────┘ + ┌─────┬─────┬─────────────┬─────────────┬─────────────┬─────────────┐ + │ foo ┆ bar ┆ {"one","x"} ┆ {"one","y"} ┆ {"two","x"} ┆ {"two","y"} │ + │ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │ + │ str ┆ str ┆ i64 ┆ i64 ┆ i64 ┆ i64 │ + ╞═════╪═════╪═════════════╪═════════════╪═════════════╪═════════════╡ + │ one ┆ x ┆ 5 ┆ null ┆ null ┆ null │ + │ one ┆ y ┆ null ┆ 3 ┆ null ┆ null │ + │ two ┆ x ┆ null ┆ null ┆ 10 ┆ null │ + │ two ┆ y ┆ null ┆ null ┆ null ┆ 3 │ + └─────┴─────┴─────────────┴─────────────┴─────────────┴─────────────┘ Run an expression as aggregation function diff --git a/py-polars/tests/unit/operations/test_pivot.py b/py-polars/tests/unit/operations/test_pivot.py index 097a9f93a453..8b8e01d89308 100644 --- a/py-polars/tests/unit/operations/test_pivot.py +++ b/py-polars/tests/unit/operations/test_pivot.py @@ -162,10 +162,10 @@ def test_pivot_multiple_values_column_names_5116() -> None: ) expected = { "c1": ["A", "B"], - "x1|c2|C": [1, 2], - "x1|c2|D": [3, 4], - "x2|c2|C": [8, 7], - "x2|c2|D": [6, 5], + "x1|C": [1, 2], + "x1|D": [3, 4], + "x2|C": [8, 7], + "x2|D": [6, 5], } assert result.to_dict(as_series=False) == expected @@ -180,20 +180,83 @@ def test_pivot_duplicate_names_7731() -> None: "e": ["x", "y"], } ) - assert df.pivot( + result = df.pivot( values=cs.integer(), index=cs.float(), columns=cs.string(), aggregate_function="first", - ).to_dict(as_series=False) == { + ).to_dict(as_series=False) + expected = { "b": [1.5, 2.5], - "a_c_x": [1, 4], - "d_c_x": [7, 8], - "a_e_x": [1, None], - "a_e_y": [None, 4], - "d_e_x": [7, None], - "d_e_y": [None, 8], + 'a_{"x","x"}': [1, None], + 'a_{"x","y"}': [None, 4], + 'd_{"x","x"}': [7, None], + 'd_{"x","y"}': [None, 8], } + assert result == expected + + +def test_pivot_duplicate_names_11663() -> None: + df = pl.DataFrame({"a": [1, 2], "b": [1, 2], "c": ["x", "x"], "d": ["x", "y"]}) + result = df.pivot(values="a", index="b", columns=["c", "d"]).to_dict( + as_series=False + ) + expected = {"b": [1, 2], '{"x","x"}': [1, None], '{"x","y"}': [None, 2]} + assert result == expected + + +def test_pivot_multiple_columns_12407() -> None: + df = pl.DataFrame( + { + "a": ["beep", "bop"], + "b": ["a", "b"], + "c": ["s", "f"], + "d": [7, 8], + "e": ["x", "y"], + } + ) + result = df.pivot( + values=["a"], index="b", columns=["c", "e"], aggregate_function="len" + ).to_dict(as_series=False) + expected = {"b": ["a", "b"], '{"s","x"}': [1, None], '{"f","y"}': [None, 1]} + assert result == expected + + +def test_pivot_struct_13120() -> None: + df = pl.DataFrame( + { + "index": [1, 2, 3, 1, 2, 3], + "item_type": ["a", "a", "a", "b", "b", "b"], + "item_id": [123, 123, 123, 456, 456, 456], + "values": [4, 5, 6, 7, 8, 9], + } + ) + df = df.with_columns(pl.struct(["item_type", "item_id"]).alias("columns")).drop( + "item_type", "item_id" + ) + result = df.pivot(index="index", columns="columns", values="values").to_dict( + as_series=False + ) + expected = {"index": [1, 2, 3], '{"a",123}': [4, 5, 6], '{"b",456}': [7, 8, 9]} + assert result == expected + + +def test_pivot_name_already_exists() -> None: + # This should be extremely rare...but still, good to check it + df = pl.DataFrame( + { + "a": ["a", "b"], + "b": ["a", "b"], + '{"a","b"}': [1, 2], + } + ) + with pytest.raises(ComputeError, match="already exists in the DataFrame"): + df.pivot( + values='{"a","b"}', + index="a", + columns=["a", "b"], + aggregate_function="first", + ) def test_pivot_floats() -> None: