diff --git a/tpch/common_utils.py b/tpch/common_utils.py
index 4ab3e7b..56b3cae 100644
--- a/tpch/common_utils.py
+++ b/tpch/common_utils.py
@@ -38,4 +38,7 @@ def print_result_fn(solution: str, result: pd.DataFrame, query: str):
     if not os.path.exists(result_prefix):
         os.makedirs(result_prefix)
     result_path = f"{result_prefix}/{query}.out"
+    if solution == "polars":
+        result.write_csv(result_path)
+        return
     result.to_csv(result_path, index=False)
diff --git a/tpch/cudf_queries/queries.py b/tpch/cudf_queries/queries.py
new file mode 100644
index 0000000..34ccf8f
--- /dev/null
+++ b/tpch/cudf_queries/queries.py
@@ -0,0 +1,1315 @@
+import cudf
+import cudf.pandas
+cudf.pandas.install()
+import pandas as pd
+
+import argparse
+import json
+import os
+import time
+import traceback
+from typing import Dict
+
+import sys
+
+# import pandas as pd
+from common_utils import log_time_fn, parse_common_arguments, print_result_fn
+
+# Per-process cache of loaded TPC-H tables, keyed by table name, so each
+# table is read from parquet at most once per run.
+dataset_dict = {}
+
+
+def load_lineitem(root: str, storage_options: Dict):
+    if "lineitem" not in dataset_dict:
+        data_path = root + "/lineitem"
+        df = pd.read_parquet(data_path, storage_options=storage_options)
+        df.L_SHIPDATE = pd.to_datetime(df.L_SHIPDATE, format="%Y-%m-%d")
+        df.L_RECEIPTDATE = pd.to_datetime(df.L_RECEIPTDATE, format="%Y-%m-%d")
+        df.L_COMMITDATE = pd.to_datetime(df.L_COMMITDATE, format="%Y-%m-%d")
+        result = df
+        dataset_dict["lineitem"] = result
+    else:
+        result = dataset_dict["lineitem"]
+    return result
+
+
+def load_part(root: str, storage_options: Dict):
+    if "part" not in dataset_dict:
+        data_path = root + "/part"
+        df = pd.read_parquet(data_path, storage_options=storage_options)
+        result = df
+        dataset_dict["part"] = result
+    else:
+        result = dataset_dict["part"]
+    return result
+
+
+def load_orders(root: str, storage_options: Dict):
+    if "orders" not in dataset_dict:
+        data_path = root + "/orders"
+        df = pd.read_parquet(data_path, storage_options=storage_options)
+        df.O_ORDERDATE = pd.to_datetime(df.O_ORDERDATE, format="%Y-%m-%d")
+        result = df
+        dataset_dict["orders"] = result
+    else:
+        result = dataset_dict["orders"]
+    return result
+
+
+def load_customer(root: str, storage_options: Dict):
+    if "customer" not in dataset_dict:
+        data_path = root + "/customer"
+        df = pd.read_parquet(data_path, storage_options=storage_options)
+        result = df
+        dataset_dict["customer"] = result
+    else:
+        result = dataset_dict["customer"]
+    return result
+
+
+def load_nation(root: str, storage_options: Dict):
+    if "nation" not in dataset_dict:
+        data_path = root + "/nation"
+        df = pd.read_parquet(data_path, storage_options=storage_options)
+        result = df
+        dataset_dict["nation"] = result
+    else:
+        result = dataset_dict["nation"]
+    return result
+
+
+def load_region(root: str, storage_options: Dict):
+    if "region" not in dataset_dict:
+        data_path = root + "/region"
+        df = pd.read_parquet(data_path, storage_options=storage_options)
+        result = df
+        dataset_dict["region"] = result
+    else:
+        result = dataset_dict["region"]
+    return result
+
+
+def load_supplier(root: str, storage_options: Dict):
+    if "supplier" not in dataset_dict:
+        data_path = root + "/supplier"
+        df = pd.read_parquet(data_path, storage_options=storage_options)
+        result = df
+        dataset_dict["supplier"] = result
+    else:
+        result = dataset_dict["supplier"]
+    return result
+
+
+def load_partsupp(root: str, storage_options: Dict):
+    if "partsupp" not in dataset_dict:
+        data_path = root + "/partsupp"
+        df = pd.read_parquet(data_path, storage_options=storage_options)
+        result = df
+        dataset_dict["partsupp"] = result
+    else:
+        result = dataset_dict["partsupp"]
+    return result
+
+
+def q01(root: str, storage_options: Dict):
+    lineitem = load_lineitem(root, storage_options)
+
+    date = pd.Timestamp("1998-09-02")
+    lineitem_filtered = lineitem.loc[
+        :,
+        [
+            "L_ORDERKEY",
+            "L_QUANTITY",
+            "L_EXTENDEDPRICE",
+            "L_DISCOUNT",
+            "L_TAX",
+            "L_RETURNFLAG",
+            "L_LINESTATUS",
+            "L_SHIPDATE",
+        ],
+    ]
+    sel = lineitem_filtered.L_SHIPDATE <= date
+    lineitem_filtered = lineitem_filtered[sel]
+    lineitem_filtered["AVG_QTY"] = lineitem_filtered.L_QUANTITY
+    lineitem_filtered["AVG_PRICE"] = lineitem_filtered.L_EXTENDEDPRICE
+    lineitem_filtered["DISC_PRICE"] = lineitem_filtered.L_EXTENDEDPRICE * (
+        1 - lineitem_filtered.L_DISCOUNT
+    )
+    lineitem_filtered["CHARGE"] = (
+        lineitem_filtered.L_EXTENDEDPRICE
+        * (1 - lineitem_filtered.L_DISCOUNT)
+        * (1 + lineitem_filtered.L_TAX)
+    )
+    gb = lineitem_filtered.groupby(["L_RETURNFLAG", "L_LINESTATUS"], as_index=False)
+    total = gb.agg(
+        {
+            "L_QUANTITY": "sum",
+            "L_EXTENDEDPRICE": "sum",
+            "DISC_PRICE": "sum",
+            "CHARGE": "sum",
+            "AVG_QTY": "mean",
+            "AVG_PRICE": "mean",
+            "L_DISCOUNT": "mean",
+            "L_ORDERKEY": "count",
+        }
+    )
+    total = total.sort_values(["L_RETURNFLAG", "L_LINESTATUS"]).rename(
+        columns={
+            "L_QUANTITY": "SUM_QTY",
+            "L_EXTENDEDPRICE": "SUM_BASE_PRICE",
+            "DISC_PRICE": "SUM_DISC_PRICE",
+            "CHARGE": "SUM_CHARGE",
+            "L_DISCOUNT": "AVG_DISC",
+            "L_ORDERKEY": "COUNT_ORDER",
+        }
+    )
+
+    return total
+
+
+def q02(root: str, storage_options: Dict):
+    part = load_part(root, storage_options)
+    partsupp = load_partsupp(root, storage_options)
+    supplier = load_supplier(root, storage_options)
+    nation = load_nation(root, storage_options)
+    region = load_region(root, storage_options)
+
+    size = 15
+    p_type = "BRASS"
+    region_name = "EUROPE"
+
+    nation_filtered = nation.loc[:, ["N_NATIONKEY", "N_NAME", "N_REGIONKEY"]]
+    region_filtered = region[(region["R_NAME"] == region_name)]
+    region_filtered = region_filtered.loc[:, ["R_REGIONKEY"]]
+    r_n_merged = nation_filtered.merge(
+        region_filtered, left_on="N_REGIONKEY", right_on="R_REGIONKEY", how="inner"
+    )
+    r_n_merged = r_n_merged.loc[:, ["N_NATIONKEY", "N_NAME"]]
+    supplier_filtered = supplier.loc[
+        :,
+        [
+            "S_SUPPKEY",
+            "S_NAME",
+            "S_ADDRESS",
+            "S_NATIONKEY",
+            "S_PHONE",
+            "S_ACCTBAL",
+            "S_COMMENT",
+        ],
+    ]
+    s_r_n_merged = r_n_merged.merge(
+        supplier_filtered, left_on="N_NATIONKEY", right_on="S_NATIONKEY", how="inner"
+    )
+    s_r_n_merged = s_r_n_merged.loc[
+        :,
+        [
+            "N_NAME",
+            "S_SUPPKEY",
+            "S_NAME",
+            "S_ADDRESS",
+            "S_PHONE",
+            "S_ACCTBAL",
+            "S_COMMENT",
+        ],
+    ]
+    partsupp_filtered = partsupp.loc[:, ["PS_PARTKEY", "PS_SUPPKEY", "PS_SUPPLYCOST"]]
+    ps_s_r_n_merged = s_r_n_merged.merge(
+        partsupp_filtered, left_on="S_SUPPKEY", right_on="PS_SUPPKEY", how="inner"
+    )
+    ps_s_r_n_merged = ps_s_r_n_merged.loc[
+        :,
+        [
+            "N_NAME",
+            "S_NAME",
+            "S_ADDRESS",
+            "S_PHONE",
+            "S_ACCTBAL",
+            "S_COMMENT",
+            "PS_PARTKEY",
+            "PS_SUPPLYCOST",
+        ],
+    ]
+    part_filtered = part.loc[:, ["P_PARTKEY", "P_MFGR", "P_SIZE", "P_TYPE"]]
+    part_filtered = part_filtered[
+        (part_filtered["P_SIZE"] == size)
+        & (part_filtered["P_TYPE"].str.endswith(p_type))
+    ]
+    part_filtered = part_filtered.loc[:, ["P_PARTKEY", "P_MFGR"]]
+    merged_df = part_filtered.merge(
+        ps_s_r_n_merged, left_on="P_PARTKEY", right_on="PS_PARTKEY", how="inner"
+    )
+    merged_df = merged_df.loc[
+        :,
+        [
+            "N_NAME",
+            "S_NAME",
+            "S_ADDRESS",
+            "S_PHONE",
+            "S_ACCTBAL",
+            "S_COMMENT",
+            "PS_SUPPLYCOST",
+            "P_PARTKEY",
+            "P_MFGR",
+        ],
+    ]
+    min_values = merged_df.groupby("P_PARTKEY", as_index=False)["PS_SUPPLYCOST"].min()
+    min_values.columns = ["P_PARTKEY_CPY", "MIN_SUPPLYCOST"]
+    merged_df = merged_df.merge(
+        min_values,
+        left_on=["P_PARTKEY", "PS_SUPPLYCOST"],
+        right_on=["P_PARTKEY_CPY", "MIN_SUPPLYCOST"],
+        how="inner",
+    )
+    total = merged_df.loc[
+        :,
+        [
+            "S_ACCTBAL",
+            "S_NAME",
+            "N_NAME",
+            "P_PARTKEY",
+            "P_MFGR",
+            "S_ADDRESS",
+            "S_PHONE",
+            "S_COMMENT",
+        ],
+    ]
+    total = total.sort_values(
+        by=["S_ACCTBAL", "N_NAME", "S_NAME", "P_PARTKEY"],
+        ascending=[False, True, True, True],
+    )
+    total = total.head(100)
+
+    return total
+
+
+def q03(root: str, storage_options: Dict):
+    lineitem = load_lineitem(root, storage_options)
+    orders = load_orders(root, storage_options)
+    customer = load_customer(root, storage_options)
+
+    mktsegment = "HOUSEHOLD"
+    date = pd.Timestamp("1995-03-04")
+    lineitem_filtered = lineitem.loc[
+        :, ["L_ORDERKEY", "L_EXTENDEDPRICE", "L_DISCOUNT", "L_SHIPDATE"]
+    ]
+    orders_filtered = orders.loc[
+        :, ["O_ORDERKEY", "O_CUSTKEY", "O_ORDERDATE", "O_SHIPPRIORITY"]
+    ]
+    customer_filtered = customer.loc[:, ["C_MKTSEGMENT", "C_CUSTKEY"]]
+    lsel = lineitem_filtered.L_SHIPDATE > date
+    osel = orders_filtered.O_ORDERDATE < date
+    csel = customer_filtered.C_MKTSEGMENT == mktsegment
+    flineitem = lineitem_filtered[lsel]
+    forders = orders_filtered[osel]
+    fcustomer = customer_filtered[csel]
+    jn1 = fcustomer.merge(forders, left_on="C_CUSTKEY", right_on="O_CUSTKEY")
+    jn2 = jn1.merge(flineitem, left_on="O_ORDERKEY", right_on="L_ORDERKEY")
+    jn2["REVENUE"] = jn2.L_EXTENDEDPRICE * (1 - jn2.L_DISCOUNT)
+    total = (
+        jn2.groupby(["L_ORDERKEY", "O_ORDERDATE", "O_SHIPPRIORITY"], as_index=False)[
+            "REVENUE"
+        ]
+        .sum()
+        .sort_values(["REVENUE"], ascending=False)
+    )
+
+    total = total[:10].loc[
+        :, ["L_ORDERKEY", "REVENUE", "O_ORDERDATE", "O_SHIPPRIORITY"]
+    ]
+
+    # [change 1]Convert cudf DataFrame to Pandas DataFrame and format timestamp
+    total["O_ORDERDATE"] = pd.to_datetime(total["O_ORDERDATE"]).dt.strftime("%Y-%m-%d")
+    return total
+
+
+def q04(root: str, storage_options: Dict):
+    lineitem = load_lineitem(root, storage_options)
+    orders = load_orders(root, storage_options)
+
+    date2 = pd.Timestamp("1993-8-01")
+    date1 = date2 + pd.DateOffset(months=3)
+    lsel = lineitem.L_COMMITDATE < lineitem.L_RECEIPTDATE
+    osel = (orders.O_ORDERDATE < date1) & (orders.O_ORDERDATE >= date2)
+    flineitem = lineitem[lsel]
+    forders = orders[osel]
+    jn = forders[forders["O_ORDERKEY"].isin(flineitem["L_ORDERKEY"])]
+    total = (
+        jn.groupby("O_ORDERPRIORITY", as_index=False)["O_ORDERKEY"]
+        .count()
+        .sort_values(["O_ORDERPRIORITY"])
+        .rename(columns={"O_ORDERKEY": "ORDER_COUNT"})
+    )
+
+    return total
+
+
+def q05(root: str, storage_options: Dict):
+    lineitem = load_lineitem(root, storage_options)
+    orders = load_orders(root, storage_options)
+    customer = load_customer(root, storage_options)
+    supplier = load_supplier(root, storage_options)
+    nation = load_nation(root, storage_options)
+    region = load_region(root, storage_options)
+
+    region_name = "ASIA"
+    date1 = pd.Timestamp("1996-01-01")
+    date2 = date1 + pd.DateOffset(years=1)
+    rsel = region.R_NAME == region_name
+    osel = (orders.O_ORDERDATE >= date1) & (orders.O_ORDERDATE < date2)
+
+    forders = orders[osel]
+    fregion = region[rsel]
+    jn1 = fregion.merge(nation, left_on="R_REGIONKEY", right_on="N_REGIONKEY")
+    jn2 = jn1.merge(customer, left_on="N_NATIONKEY", right_on="C_NATIONKEY")
+    jn3 = jn2.merge(forders, left_on="C_CUSTKEY", right_on="O_CUSTKEY")
+    jn4 = jn3.merge(lineitem, left_on="O_ORDERKEY", right_on="L_ORDERKEY")
+
+    jn5 = supplier.merge(
+        jn4, left_on=["S_SUPPKEY", "S_NATIONKEY"], right_on=["L_SUPPKEY", "N_NATIONKEY"]
+    )
+    jn5["REVENUE"] = jn5.L_EXTENDEDPRICE * (1.0 - jn5.L_DISCOUNT)
+    gb = jn5.groupby("N_NAME", as_index=False)["REVENUE"].sum()
+    total = gb.sort_values("REVENUE", ascending=False)
+
+    return total
+
+
+def q06(root: str, storage_options: Dict):
+    lineitem = load_lineitem(root, storage_options)
+
+    date1 = pd.Timestamp("1996-01-01")
+    date2 = date1 + pd.DateOffset(years=1)
+    lineitem_filtered = lineitem.loc[
+        :, ["L_QUANTITY", "L_EXTENDEDPRICE", "L_DISCOUNT", "L_SHIPDATE"]
+    ]
+    sel = (
+        (lineitem_filtered.L_SHIPDATE >= date1)
+        & (lineitem_filtered.L_SHIPDATE < date2)
+        & (lineitem_filtered.L_DISCOUNT >= 0.08)
+        & (lineitem_filtered.L_DISCOUNT <= 0.1)
+        & (lineitem_filtered.L_QUANTITY < 24)
+    )
+    flineitem = lineitem_filtered[sel]
+    result_value = (flineitem.L_EXTENDEDPRICE * flineitem.L_DISCOUNT).sum()
+    result_df = pd.DataFrame({"REVENUE": [result_value]})
+
+    return result_df
+
+
+def q07(root: str, storage_options: Dict):
+    lineitem = load_lineitem(root, storage_options)
+    orders = load_orders(root, storage_options)
+    customer = load_customer(root, storage_options)
+    supplier = load_supplier(root, storage_options)
+    nation = load_nation(root, storage_options)
+
+    nation1 = "FRANCE"
+    nation2 = "GERMANY"
+    lineitem_filtered = lineitem.loc[
+        (lineitem["L_SHIPDATE"] >= pd.Timestamp("1995-01-01"))
+        & (lineitem["L_SHIPDATE"] < pd.Timestamp("1997-01-01"))
+    ]
+    lineitem_filtered["L_YEAR"] = lineitem_filtered["L_SHIPDATE"].dt.year
+    lineitem_filtered["VOLUME"] = lineitem_filtered["L_EXTENDEDPRICE"] * (
+        1.0 - lineitem_filtered["L_DISCOUNT"]
+    )
+    lineitem_filtered = lineitem_filtered.loc[
+        :, ["L_ORDERKEY", "L_SUPPKEY", "L_YEAR", "VOLUME"]
+    ]
+    supplier_filtered = supplier.loc[:, ["S_SUPPKEY", "S_NATIONKEY"]]
+    orders_filtered = orders.loc[:, ["O_ORDERKEY", "O_CUSTKEY"]]
+    customer_filtered = customer.loc[:, ["C_CUSTKEY", "C_NATIONKEY"]]
+    n1 = nation[(nation["N_NAME"] == nation1)].loc[:, ["N_NATIONKEY", "N_NAME"]]
+    n2 = nation[(nation["N_NAME"] == nation2)].loc[:, ["N_NATIONKEY", "N_NAME"]]
+
+    # ----- do nation 1 -----
+    N1_C = customer_filtered.merge(
+        n1, left_on="C_NATIONKEY", right_on="N_NATIONKEY", how="inner"
+    )
+    N1_C = N1_C.drop(columns=["C_NATIONKEY", "N_NATIONKEY"]).rename(
+        columns={"N_NAME": "CUST_NATION"}
+    )
+    N1_C_O = N1_C.merge(
+        orders_filtered, left_on="C_CUSTKEY", right_on="O_CUSTKEY", how="inner"
+    )
+    N1_C_O = N1_C_O.drop(columns=["C_CUSTKEY", "O_CUSTKEY"])
+
+    N2_S = supplier_filtered.merge(
+        n2, left_on="S_NATIONKEY", right_on="N_NATIONKEY", how="inner"
+    )
+    N2_S = N2_S.drop(columns=["S_NATIONKEY", "N_NATIONKEY"]).rename(
+        columns={"N_NAME": "SUPP_NATION"}
+    )
+    N2_S_L = N2_S.merge(
+        lineitem_filtered, left_on="S_SUPPKEY", right_on="L_SUPPKEY", how="inner"
+    )
+    N2_S_L = N2_S_L.drop(columns=["S_SUPPKEY", "L_SUPPKEY"])
+
+    total1 = N1_C_O.merge(
+        N2_S_L, left_on="O_ORDERKEY", right_on="L_ORDERKEY", how="inner"
+    )
+    total1 = total1.drop(columns=["O_ORDERKEY", "L_ORDERKEY"])
+
+    # ----- do nation 2 ----- (same as nation 1 section but with nation 2)
+    N2_C = customer_filtered.merge(
+        n2, left_on="C_NATIONKEY", right_on="N_NATIONKEY", how="inner"
+    )
+    N2_C = N2_C.drop(columns=["C_NATIONKEY", "N_NATIONKEY"]).rename(
+        columns={"N_NAME": "CUST_NATION"}
+    )
+    N2_C_O = N2_C.merge(
+        orders_filtered, left_on="C_CUSTKEY", right_on="O_CUSTKEY", how="inner"
+    )
+    N2_C_O = N2_C_O.drop(columns=["C_CUSTKEY", "O_CUSTKEY"])
+
+    N1_S = supplier_filtered.merge(
+        n1, left_on="S_NATIONKEY", right_on="N_NATIONKEY", how="inner"
+    )
+    N1_S = N1_S.drop(columns=["S_NATIONKEY", "N_NATIONKEY"]).rename(
+        columns={"N_NAME": "SUPP_NATION"}
+    )
+    N1_S_L = N1_S.merge(
+        lineitem_filtered, left_on="S_SUPPKEY", right_on="L_SUPPKEY", how="inner"
+    )
+    N1_S_L = N1_S_L.drop(columns=["S_SUPPKEY", "L_SUPPKEY"])
+
+    total2 = N2_C_O.merge(
+        N1_S_L, left_on="O_ORDERKEY", right_on="L_ORDERKEY", how="inner"
+    )
+    total2 = total2.drop(columns=["O_ORDERKEY", "L_ORDERKEY"])
+
+    # concat results
+    total = pd.concat([total1, total2])
+
+    total = (
+        total.groupby(["SUPP_NATION", "CUST_NATION", "L_YEAR"], as_index=False)
+        .agg(REVENUE=pd.NamedAgg(column="VOLUME", aggfunc="sum"))
+        .sort_values(
+            by=["SUPP_NATION", "CUST_NATION", "L_YEAR"], ascending=[True, True, True]
+        )
+    )
+
+    return total
+
+
+def q08(root: str, storage_options: Dict):
+    part = load_part(root, storage_options)
+    lineitem = load_lineitem(root, storage_options)
+    orders = load_orders(root, storage_options)
+    customer = load_customer(root, storage_options)
+    supplier = load_supplier(root, storage_options)
+    nation = load_nation(root, storage_options)
+    region = load_region(root, storage_options)
+
+    nation_name = "BRAZIL"
+    region_name = "AMERICA"
+    p_type = "ECONOMY ANODIZED STEEL"
+    part_filtered = part[(part["P_TYPE"] == p_type)]
+    part_filtered = part_filtered.loc[:, ["P_PARTKEY"]]
+    lineitem_filtered = lineitem.loc[:, ["L_PARTKEY", "L_SUPPKEY", "L_ORDERKEY"]]
+    lineitem_filtered["VOLUME"] = lineitem["L_EXTENDEDPRICE"] * (
+        1.0 - lineitem["L_DISCOUNT"]
+    )
+    total = part_filtered.merge(
+        lineitem_filtered, left_on="P_PARTKEY", right_on="L_PARTKEY", how="inner"
+    )
+    total = total.loc[:, ["L_SUPPKEY", "L_ORDERKEY", "VOLUME"]]
+    supplier_filtered = supplier.loc[:, ["S_SUPPKEY", "S_NATIONKEY"]]
+    total = total.merge(
+        supplier_filtered, left_on="L_SUPPKEY", right_on="S_SUPPKEY", how="inner"
+    )
+    total = total.loc[:, ["L_ORDERKEY", "VOLUME", "S_NATIONKEY"]]
+    orders_filtered = orders[
+        (orders["O_ORDERDATE"] >= pd.Timestamp("1995-01-01"))
+        & (orders["O_ORDERDATE"] < pd.Timestamp("1997-01-01"))
+    ]
+    orders_filtered["O_YEAR"] = orders_filtered["O_ORDERDATE"].dt.year
+    orders_filtered = orders_filtered.loc[:, ["O_ORDERKEY", "O_CUSTKEY", "O_YEAR"]]
+    total = total.merge(
+        orders_filtered, left_on="L_ORDERKEY", right_on="O_ORDERKEY", how="inner"
+    )
+    total = total.loc[:, ["VOLUME", "S_NATIONKEY", "O_CUSTKEY", "O_YEAR"]]
+    customer_filtered = customer.loc[:, ["C_CUSTKEY", "C_NATIONKEY"]]
+    total = total.merge(
+        customer_filtered, left_on="O_CUSTKEY", right_on="C_CUSTKEY", how="inner"
+    )
+    total = total.loc[:, ["VOLUME", "S_NATIONKEY", "O_YEAR", "C_NATIONKEY"]]
+    n1_filtered = nation.loc[:, ["N_NATIONKEY", "N_REGIONKEY"]]
+    n2_filtered = nation.loc[:, ["N_NATIONKEY", "N_NAME"]].rename(
+        columns={"N_NAME": "NATION"}
+    )
+    total = total.merge(
+        n1_filtered, left_on="C_NATIONKEY", right_on="N_NATIONKEY", how="inner"
+    )
+    total = total.loc[:, ["VOLUME", "S_NATIONKEY", "O_YEAR", "N_REGIONKEY"]]
+    total = total.merge(
+        n2_filtered, left_on="S_NATIONKEY", right_on="N_NATIONKEY", how="inner"
+    )
+    total = total.loc[:, ["VOLUME", "O_YEAR", "N_REGIONKEY", "NATION"]]
+    region_filtered = region[(region["R_NAME"] == region_name)]
+    region_filtered = region_filtered.loc[:, ["R_REGIONKEY"]]
+    total = total.merge(
+        region_filtered, left_on="N_REGIONKEY", right_on="R_REGIONKEY", how="inner"
+    )
+    total = total.loc[:, ["VOLUME", "O_YEAR", "NATION"]]
+
+    def udf(df):
+        # Market share for one year: target nation's volume over total volume.
+        denominator = df["VOLUME"].sum()
+        df = df[df["NATION"] == nation_name]
+        numerator = df["VOLUME"].sum()
+        return numerator / denominator
+
+    total = total.groupby("O_YEAR").apply(udf, include_groups=False).reset_index()
+    total.columns = ["O_YEAR", "MKT_SHARE"]
+    total = total.sort_values(by=["O_YEAR"], ascending=[True])
+
+    return total
+
+
+def q09(root: str, storage_options: Dict):
+    part = load_part(root, storage_options)
+    partsupp = load_partsupp(root, storage_options)
+    lineitem = load_lineitem(root, storage_options)
+    orders = load_orders(root, storage_options)
+    supplier = load_supplier(root, storage_options)
+    nation = load_nation(root, storage_options)
+
+    p_name = "ghost"
+    psel = part.P_NAME.str.contains(p_name)
+    fpart = part[psel]
+    jn1 = lineitem.merge(fpart, left_on="L_PARTKEY", right_on="P_PARTKEY")
+    jn2 = jn1.merge(supplier, left_on="L_SUPPKEY", right_on="S_SUPPKEY")
+    jn3 = jn2.merge(nation, left_on="S_NATIONKEY", right_on="N_NATIONKEY")
+    jn4 = partsupp.merge(
+        jn3, left_on=["PS_PARTKEY", "PS_SUPPKEY"], right_on=["L_PARTKEY", "L_SUPPKEY"]
+    )
+    jn5 = jn4.merge(orders, left_on="L_ORDERKEY", right_on="O_ORDERKEY")
+    jn5["TMP"] = jn5.L_EXTENDEDPRICE * (1 - jn5.L_DISCOUNT) - (
+        (1 * jn5.PS_SUPPLYCOST) * jn5.L_QUANTITY
+    )
+    jn5["O_YEAR"] = jn5.O_ORDERDATE.dt.year
+    gb = jn5.groupby(["N_NAME", "O_YEAR"], as_index=False)["TMP"].sum()
+    total = gb.sort_values(["N_NAME", "O_YEAR"], ascending=[True, False])
+    total = total.rename(columns={"TMP": "SUM_PROFIT"})
+
+    return total
+
+
+def q10(root: str, storage_options: Dict):
+    lineitem = load_lineitem(root, storage_options)
+    orders = load_orders(root, storage_options)
+    nation = load_nation(root, storage_options)
+    customer = load_customer(root, storage_options)
+
+    date1 = pd.Timestamp("1994-11-01")
+    date2 = date1 + pd.DateOffset(months=3)
+    osel = (orders.O_ORDERDATE >= date1) & (orders.O_ORDERDATE < date2)
+    lsel = lineitem.L_RETURNFLAG == "R"
+    forders = orders[osel]
+    flineitem = lineitem[lsel]
+    jn1 = flineitem.merge(forders, left_on="L_ORDERKEY", right_on="O_ORDERKEY")
+    jn2 = jn1.merge(customer, left_on="O_CUSTKEY", right_on="C_CUSTKEY")
+    jn3 = jn2.merge(nation, left_on="C_NATIONKEY", right_on="N_NATIONKEY")
+    jn3["REVENUE"] = jn3.L_EXTENDEDPRICE * (1.0 - jn3.L_DISCOUNT)
+    gb = jn3.groupby(
+        [
+            "C_CUSTKEY",
+            "C_NAME",
+            "C_ACCTBAL",
+            "C_PHONE",
+            "N_NAME",
+            "C_ADDRESS",
+            "C_COMMENT",
+        ],
+        as_index=False,
+    )["REVENUE"].sum()
+    total = gb.sort_values("REVENUE", ascending=False)
+    total = total.head(20)
+    total = total[
+        [
+            "C_CUSTKEY",
+            "C_NAME",
+            "REVENUE",
+            "C_ACCTBAL",
+            "N_NAME",
+            "C_ADDRESS",
+            "C_PHONE",
+            "C_COMMENT",
+        ]
+    ]
+
+    return total
+
+
+def q11(root: str, storage_options: Dict):
+    partsupp = load_partsupp(root, storage_options)
+    supplier = load_supplier(root, storage_options)
+    nation = load_nation(root, storage_options)
+
+    nation_name = "GERMANY"
+    fraction = 0.0001
+
+    partsupp_filtered = partsupp.loc[:, ["PS_PARTKEY", "PS_SUPPKEY"]]
+    partsupp_filtered["TOTAL_COST"] = (
+        partsupp["PS_SUPPLYCOST"] * partsupp["PS_AVAILQTY"]
+    )
+    supplier_filtered = supplier.loc[:, ["S_SUPPKEY", "S_NATIONKEY"]]
+    ps_supp_merge = partsupp_filtered.merge(
+        supplier_filtered, left_on="PS_SUPPKEY", right_on="S_SUPPKEY", how="inner"
+    )
+    ps_supp_merge = ps_supp_merge.loc[:, ["PS_PARTKEY", "S_NATIONKEY", "TOTAL_COST"]]
+    nation_filtered = nation[(nation["N_NAME"] == nation_name)]
+    nation_filtered = nation_filtered.loc[:, ["N_NATIONKEY"]]
+    ps_supp_n_merge = ps_supp_merge.merge(
+        nation_filtered, left_on="S_NATIONKEY", right_on="N_NATIONKEY", how="inner"
+    )
+    ps_supp_n_merge = ps_supp_n_merge.loc[:, ["PS_PARTKEY", "TOTAL_COST"]]
+    sum_val = ps_supp_n_merge["TOTAL_COST"].sum() * fraction
+    total = ps_supp_n_merge.groupby(["PS_PARTKEY"], as_index=False).agg(
+        VALUE=pd.NamedAgg(column="TOTAL_COST", aggfunc="sum")
+    )
+    total = total[total["VALUE"] > sum_val]
+    total = total.sort_values("VALUE", ascending=False)
+
+    return total
+
+
+def q12(root: str, storage_options: Dict):
+    lineitem = load_lineitem(root, storage_options)
+    orders = load_orders(root, storage_options)
+
+    shipmode1 = "MAIL"
+    shipmode2 = "SHIP"
+    date1 = pd.Timestamp("1994-01-01")
+    date2 = date1 + pd.DateOffset(years=1)
+    sel = (
+        (lineitem.L_RECEIPTDATE < date2)
+        & (lineitem.L_COMMITDATE < date2)
+        & (lineitem.L_SHIPDATE < date2)
+        & (lineitem.L_SHIPDATE < lineitem.L_COMMITDATE)
+        & (lineitem.L_COMMITDATE < lineitem.L_RECEIPTDATE)
+        & (lineitem.L_RECEIPTDATE >= date1)
+        & ((lineitem.L_SHIPMODE == shipmode1) | (lineitem.L_SHIPMODE == shipmode2))
+    )
+    flineitem = lineitem[sel]
+    jn = flineitem.merge(orders, left_on="L_ORDERKEY", right_on="O_ORDERKEY")
+
+    def g1(x):
+        return ((x == "1-URGENT") | (x == "2-HIGH")).sum()
+
+    def g2(x):
+        return ((x != "1-URGENT") & (x != "2-HIGH")).sum()
+
+    total = jn.groupby("L_SHIPMODE", as_index=False)["O_ORDERPRIORITY"].agg((g1, g2))
+    total = total.sort_values("L_SHIPMODE").rename(
+        columns={"g1": "HIGH_LINE_COUNT", "g2": "LOW_LINE_COUNT"}
+    )
+
+    # Round the result to one decimal place -- If you use test_result.py to test the results, please uncomment the following two lines.
+    # total["HIGH_LINE_COUNT"] = total["HIGH_LINE_COUNT"].astype(float).round(1)
+    # total["LOW_LINE_COUNT"] = total["LOW_LINE_COUNT"].astype(float).round(1)
+
+    return total
+
+
+def q13(root: str, storage_options: Dict):
+    customer = load_customer(root, storage_options)
+    orders = load_orders(root, storage_options)
+
+    word1 = "special"
+    word2 = "requests"
+    customer_filtered = customer.loc[:, ["C_CUSTKEY"]]
+    orders_filtered = orders[
+        ~orders["O_COMMENT"].str.contains(f"{word1}.*{word2}")
+    ]
+    orders_filtered = orders_filtered.loc[:, ["O_ORDERKEY", "O_CUSTKEY"]]
+    c_o_merged = customer_filtered.merge(
+        orders_filtered, left_on="C_CUSTKEY", right_on="O_CUSTKEY", how="left"
+    )
+    c_o_merged = c_o_merged.loc[:, ["C_CUSTKEY", "O_ORDERKEY"]]
+    count_df = c_o_merged.groupby(["C_CUSTKEY"], as_index=False).agg(
+        C_COUNT=pd.NamedAgg(column="O_ORDERKEY", aggfunc="count")
+    )
+
+    total = count_df.groupby(["C_COUNT"], as_index=False).size()
+    # [change 3] for TypeError: Series.sort_values() got an unexpected keyword argument 'by'
+    # the error is caused here: in cuDF, DataFrameGroupBy.size() Return the size of each group. https://docs.rapids.ai/api/cudf/stable/user_guide/api_docs/api/cudf.core.groupby.groupby.dataframegroupby.size/#
+    # while in pandas, DataFrameGroupBy.size() Returns DataFrame or Series, Number of rows in each group as a Series if as_index is True or a DataFrame if as_index is False. https://pandas.pydata.org/docs/reference/api/pandas.core.groupby.DataFrameGroupBy.size.html#pandas.core.groupby.DataFrameGroupBy.size
+    total = total.reset_index(name='size')
+    total.columns = ["C_COUNT", "CUSTDIST"]
+
+    total = total.sort_values(
+        by=["CUSTDIST", "C_COUNT"],
+        ascending=[False, False],
+    )
+
+    return total
+
+
+def q14(root: str, storage_options: Dict):
+    lineitem = load_lineitem(root, storage_options)
+    part = load_part(root, storage_options)
+
+    startDate = pd.Timestamp("1994-03-01")
+    endDate = startDate + pd.DateOffset(months=1)
+    p_type_like = "PROMO"
+    part_filtered = part.loc[:, ["P_PARTKEY", "P_TYPE"]]
+    lineitem_filtered = lineitem.loc[
+        :, ["L_EXTENDEDPRICE", "L_DISCOUNT", "L_SHIPDATE", "L_PARTKEY"]
+    ]
+    sel = (lineitem_filtered.L_SHIPDATE >= startDate) & (
+        lineitem_filtered.L_SHIPDATE < endDate
+    )
+    flineitem = lineitem_filtered[sel]
+    jn = flineitem.merge(part_filtered, left_on="L_PARTKEY", right_on="P_PARTKEY")
+    jn["PROMO_REVENUE"] = jn.L_EXTENDEDPRICE * (1.0 - jn.L_DISCOUNT)
+    total = (
+        jn[jn.P_TYPE.str.startswith(p_type_like)].PROMO_REVENUE.sum()
+        * 100
+        / jn.PROMO_REVENUE.sum()
+    )
+
+    result_df = pd.DataFrame({"PROMO_REVENUE": [total]})
+    return result_df
+
+
+def q15(root: str, storage_options: Dict):
+    lineitem = load_lineitem(root, storage_options)
+    supplier = load_supplier(root, storage_options)
+
+    lineitem_filtered = lineitem[
+        (lineitem["L_SHIPDATE"] >= pd.Timestamp("1996-01-01"))
+        & (
+            lineitem["L_SHIPDATE"]
+            < (pd.Timestamp("1996-01-01") + pd.DateOffset(months=3))
+        )
+    ]
+    lineitem_filtered["REVENUE_PARTS"] = lineitem_filtered["L_EXTENDEDPRICE"] * (
+        1.0 - lineitem_filtered["L_DISCOUNT"]
+    )
+    lineitem_filtered = lineitem_filtered.loc[:, ["L_SUPPKEY", "REVENUE_PARTS"]]
+    revenue_table = (
+        lineitem_filtered.groupby("L_SUPPKEY", as_index=False)
+        .agg(TOTAL_REVENUE=pd.NamedAgg(column="REVENUE_PARTS", aggfunc="sum"))
+        .rename(columns={"L_SUPPKEY": "SUPPLIER_NO"})
+    )
+    max_revenue = revenue_table["TOTAL_REVENUE"].max()
+    revenue_table = revenue_table[revenue_table["TOTAL_REVENUE"] == max_revenue]
+    supplier_filtered = supplier.loc[:, ["S_SUPPKEY", "S_NAME", "S_ADDRESS", "S_PHONE"]]
+    total = supplier_filtered.merge(
+        revenue_table, left_on="S_SUPPKEY", right_on="SUPPLIER_NO", how="inner"
+    )
+    total = total.loc[
+        :, ["S_SUPPKEY", "S_NAME", "S_ADDRESS", "S_PHONE", "TOTAL_REVENUE"]
+    ]
+
+    return total
+
+
+def q16(root: str, storage_options: Dict):
+    part = load_part(root, storage_options)
+    partsupp = load_partsupp(root, storage_options)
+    supplier = load_supplier(root, storage_options)
+
+    BRAND = "Brand#45"
+    TYPE = "MEDIUM POLISHED"
+    SIZE_LIST = [49, 14, 23, 45, 19, 3, 36, 9]
+
+    # Merge part and partsupp DataFrames
+    merged_df = pd.merge(part, partsupp, left_on="P_PARTKEY", right_on="PS_PARTKEY", how="inner")
+
+    # Apply filters
+    filtered_df = merged_df[
+        (merged_df["P_BRAND"] != BRAND) &
+        (~merged_df["P_TYPE"].str.startswith(TYPE)) &
+        (merged_df["P_SIZE"].isin(SIZE_LIST))
+    ]
+
+    # Exclude unwanted suppliers
+    supplier_filtered = supplier[supplier["S_COMMENT"].str.contains("CUSTOMER.*COMPLAINTS")]
+    filtered_df = pd.merge(filtered_df, supplier_filtered["S_SUPPKEY"], left_on="PS_SUPPKEY", right_on="S_SUPPKEY", how="left")
+    filtered_df = filtered_df[filtered_df["S_SUPPKEY"].isna()]
+
+    # Group by and count unique suppliers
+    total = filtered_df.groupby(["P_BRAND", "P_TYPE", "P_SIZE"]).agg(SUPPLIER_CNT=("PS_SUPPKEY", "nunique")).reset_index()
+
+    # Sort the result
+    total = total.sort_values(by=["SUPPLIER_CNT", "P_BRAND", "P_TYPE", "P_SIZE"], ascending=[False, True, True, True])
+
+    return total
+
+
+def q17(root: str, storage_options: Dict):
+    lineitem = load_lineitem(root, storage_options)
+    part = load_part(root, storage_options)
+
+    brand = "Brand#23"
+    container = "MED BOX"
+
+    left = lineitem.loc[:, ["L_PARTKEY", "L_QUANTITY", "L_EXTENDEDPRICE"]]
+    right = part[((part["P_BRAND"] == brand) & (part["P_CONTAINER"] == container))]
+    right = right.loc[:, ["P_PARTKEY"]]
+    line_part_merge = left.merge(
+        right, left_on="L_PARTKEY", right_on="P_PARTKEY", how="inner"
+    )
+    line_part_merge = line_part_merge.loc[
+        :, ["L_QUANTITY", "L_EXTENDEDPRICE", "P_PARTKEY"]
+    ]
+    lineitem_filtered = lineitem.loc[:, ["L_PARTKEY", "L_QUANTITY"]]
+    lineitem_avg = lineitem_filtered.groupby(["L_PARTKEY"], as_index=False).agg(
+        avg=pd.NamedAgg(column="L_QUANTITY", aggfunc="mean")
+    )
+    lineitem_avg["avg"] = 0.2 * lineitem_avg["avg"]
+    lineitem_avg = lineitem_avg.loc[:, ["L_PARTKEY", "avg"]]
+    total = line_part_merge.merge(
+        lineitem_avg, left_on="P_PARTKEY", right_on="L_PARTKEY", how="inner"
+    )
+    total = total[total["L_QUANTITY"] < total["avg"]]
+    total = pd.DataFrame({"AVG_YEARLY": [total["L_EXTENDEDPRICE"].sum() / 7.0]})
+
+    return total
+
+
+def q18(root: str, storage_options: Dict):
+    lineitem = load_lineitem(root, storage_options)
+    orders = load_orders(root, storage_options)
+    customer = load_customer(root, storage_options)
+
+    quantity = 300
+    gb1 = lineitem.groupby("L_ORDERKEY", as_index=False)["L_QUANTITY"].sum()
+    fgb1 = gb1[gb1.L_QUANTITY > quantity]
+    jn1 = fgb1.merge(orders, left_on="L_ORDERKEY", right_on="O_ORDERKEY")
+    jn2 = jn1.merge(customer, left_on="O_CUSTKEY", right_on="C_CUSTKEY")
+    gb2 = jn2.groupby(
+        ["C_NAME", "C_CUSTKEY", "O_ORDERKEY", "O_ORDERDATE", "O_TOTALPRICE"],
+        as_index=False,
+    )["L_QUANTITY"].sum()
+    total = gb2.sort_values(["O_TOTALPRICE", "O_ORDERDATE"], ascending=[False, True])
+    total = total.head(100)
+
+    # [change 2]Convert cudf DataFrame to Pandas DataFrame and format timestamp
+    total["O_ORDERDATE"] = pd.to_datetime(total["O_ORDERDATE"]).dt.strftime("%Y-%m-%d")
+
+    return total
+
+
+def q19(root: str, storage_options: Dict):
+    lineitem = load_lineitem(root, storage_options)
+    part = load_part(root, storage_options)
+
+    quantity1 = 4
+    quantity2 = 15
+    quantity3 = 26
+    brand1 = "Brand#31"
+    brand2 = "Brand#24"
+    brand3 = "Brand#35"
+
+    lsel = (
+        (
+            (
+                (lineitem.L_QUANTITY <= quantity3 + 10)
+                & (lineitem.L_QUANTITY >= quantity3)
+            )
+            | (
+                (lineitem.L_QUANTITY <= quantity2 + 10)
+                & (lineitem.L_QUANTITY >= quantity2)
+            )
+            | (
+                (lineitem.L_QUANTITY <= quantity1 + 10)
+                & (lineitem.L_QUANTITY >= quantity1)
+            )
+        )
+        & (lineitem.L_SHIPINSTRUCT == "DELIVER IN PERSON")
+        & ((lineitem.L_SHIPMODE == "AIR") | (lineitem.L_SHIPMODE == "AIR REG"))
+    )
+    psel = (part.P_SIZE >= 1) & (
+        (
+            (part.P_SIZE <= 5)
+            & (part.P_BRAND == brand1)
+            & (
+                (part.P_CONTAINER == "SM BOX")
+                | (part.P_CONTAINER == "SM CASE")
+                | (part.P_CONTAINER == "SM PACK")
+                | (part.P_CONTAINER == "SM PKG")
+            )
+        )
+        | (
+            (part.P_SIZE <= 10)
+            & (part.P_BRAND == brand2)
+            & (
+                (part.P_CONTAINER == "MED BAG")
+                | (part.P_CONTAINER == "MED BOX")
+                | (part.P_CONTAINER == "MED PACK")
+                | (part.P_CONTAINER == "MED PKG")
+            )
+        )
+        | (
+            (part.P_SIZE <= 15)
+            & (part.P_BRAND == brand3)
+            & (
+                (part.P_CONTAINER == "LG BOX")
+                | (part.P_CONTAINER == "LG CASE")
+                | (part.P_CONTAINER == "LG PACK")
+                | (part.P_CONTAINER == "LG PKG")
+            )
+        )
+    )
+    flineitem = lineitem[lsel]
+    fpart = part[psel]
+    jn = flineitem.merge(fpart, left_on="L_PARTKEY", right_on="P_PARTKEY")
+    jnsel = (
+        (
+            (jn.P_BRAND == brand1)
+            & (
+                (jn.P_CONTAINER == "SM BOX")
+                | (jn.P_CONTAINER == "SM CASE")
+                | (jn.P_CONTAINER == "SM PACK")
+                | (jn.P_CONTAINER == "SM PKG")
+            )
+            & (jn.L_QUANTITY >= quantity1)
+            & (jn.L_QUANTITY <= quantity1 + 10)
+            & (jn.P_SIZE <= 5)
+        )
+        |
+        (
+            (jn.P_BRAND == brand2)
+            & (
+                (jn.P_CONTAINER == "MED BAG")
+                | (jn.P_CONTAINER == "MED BOX")
+                | (jn.P_CONTAINER == "MED PACK")
+                | (jn.P_CONTAINER == "MED PKG")
+            )
+            & (jn.L_QUANTITY >= quantity2)
+            & (jn.L_QUANTITY <= quantity2 + 10)
+            & (jn.P_SIZE <= 10)
+        )
+        |
+        (
+            (jn.P_BRAND == brand3)
+            & (
+                (jn.P_CONTAINER == "LG BOX")
+                | (jn.P_CONTAINER == "LG CASE")
+                | (jn.P_CONTAINER == "LG PACK")
+                | (jn.P_CONTAINER == "LG PKG")
+            )
+            & (jn.L_QUANTITY >= quantity3)
+            & (jn.L_QUANTITY <= quantity3 + 10)
+            & (jn.P_SIZE <= 15)
+        )
+    )
+    jn = jn[jnsel]
+    result_value = (jn.L_EXTENDEDPRICE * (1.0 - jn.L_DISCOUNT)).sum()
+    result_df = pd.DataFrame({"REVENUE": [result_value]})
+
+    return result_df
+
+
+def q20(root: str, storage_options: Dict):
+    lineitem = load_lineitem(root, storage_options)
+    part = load_part(root, storage_options)
+    nation = load_nation(root, storage_options)
+    partsupp = load_partsupp(root, storage_options)
+    supplier = load_supplier(root, storage_options)
+
+    p_name = "azure"
+    date1 = pd.Timestamp("1996-01-01")
+    date2 = date1 + pd.DateOffset(years=1)
+    psel = part.P_NAME.str.startswith(p_name)
+    nsel = nation.N_NAME == "JORDAN"
+    lsel = (lineitem.L_SHIPDATE >= date1) & (lineitem.L_SHIPDATE < date2)
+    fpart = part[psel]
+    fnation = nation[nsel]
+    flineitem = lineitem[lsel]
+    jn1 = fpart.merge(partsupp, left_on="P_PARTKEY", right_on="PS_PARTKEY")
+    jn2 = jn1.merge(
+        flineitem,
+        left_on=["PS_PARTKEY", "PS_SUPPKEY"],
+        right_on=["L_PARTKEY", "L_SUPPKEY"],
+    )
+    gb = jn2.groupby(["PS_PARTKEY", "PS_SUPPKEY", "PS_AVAILQTY"], as_index=False)[
+        "L_QUANTITY"
+    ].sum()
+    gbsel = gb.PS_AVAILQTY > (0.5 * gb.L_QUANTITY)
+    fgb = gb[gbsel]
+    jn3 = fgb.merge(supplier, left_on="PS_SUPPKEY", right_on="S_SUPPKEY")
+    jn4 = fnation.merge(jn3, left_on="N_NATIONKEY", right_on="S_NATIONKEY")
+    jn4 = jn4.loc[:, ["S_NAME", "S_ADDRESS"]]
+    total = jn4.sort_values("S_NAME").drop_duplicates()
+
+    return total
+
+
+def q21(root: str, storage_options: Dict):
+    lineitem = load_lineitem(root, storage_options)
+    orders = load_orders(root, storage_options)
+    supplier = load_supplier(root, storage_options)
+    nation = load_nation(root, storage_options)
+
+    nation_name = "SAUDI ARABIA"
+    lineitem_filtered = lineitem.loc[
+        :, ["L_ORDERKEY", "L_SUPPKEY", "L_RECEIPTDATE", "L_COMMITDATE"]
+    ]
+
+    # Exists
+    lineitem_orderkeys = (
+        lineitem_filtered.loc[:, ["L_ORDERKEY", "L_SUPPKEY"]]
+        .groupby("L_ORDERKEY", as_index=False)["L_SUPPKEY"]
+        .nunique()
+    )
+    lineitem_orderkeys.columns = ["L_ORDERKEY", "nunique_col"]
+    lineitem_orderkeys = lineitem_orderkeys[lineitem_orderkeys["nunique_col"] > 1]
+    lineitem_orderkeys = lineitem_orderkeys.loc[:, ["L_ORDERKEY"]]
+
+    # Filter
+    lineitem_filtered = lineitem_filtered[
+        lineitem_filtered["L_RECEIPTDATE"] > lineitem_filtered["L_COMMITDATE"]
+    ]
+    lineitem_filtered = lineitem_filtered.loc[:, ["L_ORDERKEY", "L_SUPPKEY"]]
+
+    # Merge Filter + Exists
+    lineitem_filtered = lineitem_filtered.merge(
+        lineitem_orderkeys, on="L_ORDERKEY", how="inner"
+    )
+
+    # Not Exists: Check the exists condition isn't still satisfied on the output.
+    lineitem_orderkeys = lineitem_filtered.groupby("L_ORDERKEY", as_index=False)[
+        "L_SUPPKEY"
+    ].nunique()
+    lineitem_orderkeys.columns = ["L_ORDERKEY", "nunique_col"]
+    lineitem_orderkeys = lineitem_orderkeys[lineitem_orderkeys["nunique_col"] == 1]
+    lineitem_orderkeys = lineitem_orderkeys.loc[:, ["L_ORDERKEY"]]
+
+    # Merge Filter + Not Exists
+    lineitem_filtered = lineitem_filtered.merge(
+        lineitem_orderkeys, on="L_ORDERKEY", how="inner"
+    )
+
+    orders_filtered = orders.loc[:, ["O_ORDERSTATUS", "O_ORDERKEY"]]
+    orders_filtered = orders_filtered[orders_filtered["O_ORDERSTATUS"] == "F"]
+    orders_filtered = orders_filtered.loc[:, ["O_ORDERKEY"]]
+    total = lineitem_filtered.merge(
+        orders_filtered, left_on="L_ORDERKEY", right_on="O_ORDERKEY", how="inner"
+    )
+    total = total.loc[:, ["L_SUPPKEY"]]
+
+    supplier_filtered = supplier.loc[:, ["S_SUPPKEY", "S_NATIONKEY", "S_NAME"]]
+    total = total.merge(
+        supplier_filtered, left_on="L_SUPPKEY", right_on="S_SUPPKEY", how="inner"
+    )
+    total = total.loc[:, ["S_NATIONKEY", "S_NAME"]]
+    nation_filtered = nation.loc[:, ["N_NAME", "N_NATIONKEY"]]
+    nation_filtered = nation_filtered[nation_filtered["N_NAME"] == nation_name]
+    total = total.merge(
+        nation_filtered, left_on="S_NATIONKEY", right_on="N_NATIONKEY", how="inner"
+    )
+    total = total.loc[:, ["S_NAME"]]
+    total = total.groupby("S_NAME", as_index=False).size()
+    # [change 4] add
reset_index for the same error in q13 + total = total.reset_index(name='size') + total.columns = ["S_NAME", "NUMWAIT"] + total = total.sort_values(by=["NUMWAIT", "S_NAME"], ascending=[False, True]) + total = total.head(100) + + return total + + +def q22(root: str, storage_options: Dict): + customer = load_customer(root, storage_options) + orders = load_orders(root, storage_options) + + I1 = "13" + I2 = "31" + I3 = "23" + I4 = "29" + I5 = "30" + I6 = "18" + I7 = "17" + customer_filtered = customer.loc[:, ["C_ACCTBAL", "C_CUSTKEY"]] + customer_filtered["CNTRYCODE"] = customer["C_PHONE"].str.slice(0, 2) + customer_filtered = customer_filtered[ + (customer["C_ACCTBAL"] > 0.00) + & customer_filtered["CNTRYCODE"].isin([I1, I2, I3, I4, I5, I6, I7]) + ] + avg_value = customer_filtered["C_ACCTBAL"].mean() + customer_filtered = customer_filtered[customer_filtered["C_ACCTBAL"] > avg_value] + # Select only the keys that don't match by performing a left join and only selecting columns with an na value + orders_filtered = orders.loc[:, ["O_CUSTKEY"]].drop_duplicates() + customer_keys = customer_filtered.loc[:, ["C_CUSTKEY"]].drop_duplicates() + customer_selected = customer_keys.merge( + orders_filtered, left_on="C_CUSTKEY", right_on="O_CUSTKEY", how="left" + ) + customer_selected = customer_selected[customer_selected["O_CUSTKEY"].isna()] + customer_selected = customer_selected.loc[:, ["C_CUSTKEY"]] + customer_selected = customer_selected.merge( + customer_filtered, on="C_CUSTKEY", how="inner" + ) + customer_selected = customer_selected.loc[:, ["CNTRYCODE", "C_ACCTBAL"]] + agg1 = customer_selected.groupby(["CNTRYCODE"], as_index=False).size() + # [change 5] add reset_index for the same error in q13 + agg1 = agg1.reset_index(name='size') + + agg1.columns = ["CNTRYCODE", "NUMCUST"] + agg2 = customer_selected.groupby(["CNTRYCODE"], as_index=False).agg( + TOTACCTBAL=pd.NamedAgg(column="C_ACCTBAL", aggfunc="sum") + ) + total = agg1.merge(agg2, on="CNTRYCODE", how="inner") + total = 
total.sort_values(by=["CNTRYCODE"], ascending=[True]) + + return total + + +query_to_loaders = { + 1: [load_lineitem], + 2: [load_part, load_partsupp, load_supplier, load_nation, load_region], + 3: [load_lineitem, load_orders, load_customer], + 4: [load_lineitem, load_orders], + 5: [ + load_lineitem, + load_orders, + load_customer, + load_nation, + load_region, + load_supplier, + ], + 6: [load_lineitem], + 7: [load_lineitem, load_supplier, load_orders, load_customer, load_nation], + 8: [ + load_part, + load_lineitem, + load_supplier, + load_orders, + load_customer, + load_nation, + load_region, + ], + 9: [ + load_lineitem, + load_orders, + load_part, + load_nation, + load_partsupp, + load_supplier, + ], + 10: [load_lineitem, load_orders, load_nation, load_customer], + 11: [load_partsupp, load_supplier, load_nation], + 12: [load_lineitem, load_orders], + 13: [load_customer, load_orders], + 14: [load_lineitem, load_part], + 15: [load_lineitem, load_supplier], + 16: [load_part, load_partsupp, load_supplier], + 17: [load_lineitem, load_part], + 18: [load_lineitem, load_orders, load_customer], + 19: [load_lineitem, load_part], + 20: [load_lineitem, load_part, load_nation, load_partsupp, load_supplier], + 21: [load_lineitem, load_orders, load_supplier, load_nation], + 22: [load_customer, load_orders], +} + +query_to_runner = { + 1: q01, + 2: q02, + 3: q03, + 4: q04, + 5: q05, + 6: q06, + 7: q07, + 8: q08, + 9: q09, + 10: q10, + 11: q11, + 12: q12, + 13: q13, + 14: q14, + 15: q15, + 16: q16, + 17: q17, + 18: q18, + 19: q19, + 20: q20, + 21: q21, + 22: q22, +} + + +def run_queries( + path, + storage_options, + queries, + log_time=True, + print_result=False, + include_io=False, +): + version = cudf.__version__ + data_start_time = time.time() + for query in queries: + loaders = query_to_loaders[query] + for loader in loaders: + loader(path, storage_options) + print(f"Total data loading time (s): {time.time() - data_start_time}") + + total_start = time.time() + for query in 
queries: + try: + start_time = time.time() + result = query_to_runner[query](path, storage_options) + without_io_time = time.time() - start_time + success = True + if print_result: + print_result_fn("cudf", result, query) + except Exception as e: + print("".join(traceback.TracebackException.from_exception(e).format())) + without_io_time = 0.0 + success = False + finally: + pass + if log_time: + log_time_fn( + "cudf", + query, + version=version, + without_io_time=without_io_time, + success=success, + ) + print(f"Total query execution time (s): {time.time() - total_start}") + + +def main(): + parser = argparse.ArgumentParser(description="TPC-H benchmark.") + parser.add_argument( + "--storage_options", + type=str, + required=False, + help="storage options json file.", + ) + parser = parse_common_arguments(parser) + args = parser.parse_args() + + # path to TPC-H data in parquet. + path = args.path + print(f"Path: {path}") + + # credentials to access the datasource. + storage_options = {} + if args.storage_options is not None: + with open(args.storage_options, "r") as fp: + storage_options = json.load(fp) + print(f"Storage options: {storage_options}") + + queries = list(range(1, 23)) + if args.queries is not None: + queries = args.queries + print(f"Queries to run: {queries}") + print(f"Include IO: {args.include_io}") + + run_queries( + path, + storage_options, + queries, + args.log_time, + args.print_result, + args.include_io, + ) + + +if __name__ == "__main__": + main() diff --git a/tpch/duckdb_queries/queries.py b/tpch/duckdb_queries/queries.py index a0bd128..caced32 100644 --- a/tpch/duckdb_queries/queries.py +++ b/tpch/duckdb_queries/queries.py @@ -137,9 +137,9 @@ def q02(root: str): part = load_part(root) partsupp = load_partsupp(root) - SIZE = 15 - TYPE = "BRASS" - REGION = "EUROPE" + size = 15 + type = "BRASS" + region_name = "EUROPE" total = duckdb.sql( f"""SELECT S_ACCTBAL, @@ -159,11 +159,11 @@ def q02(root: str): WHERE P_PARTKEY = PS_PARTKEY AND S_SUPPKEY = 
PS_SUPPKEY - AND P_SIZE = {SIZE} - AND P_TYPE LIKE '%{TYPE}' + AND P_SIZE = {size} + AND P_TYPE LIKE '%{type}' AND S_NATIONKEY = N_NATIONKEY AND N_REGIONKEY = R_REGIONKEY - AND R_NAME = '{REGION}' + AND R_NAME = '{region_name}' AND PS_SUPPLYCOST = ( SELECT MIN(PS_SUPPLYCOST) @@ -175,13 +175,14 @@ def q02(root: str): AND S_SUPPKEY = PS_SUPPKEY AND S_NATIONKEY = N_NATIONKEY AND N_REGIONKEY = R_REGIONKEY - AND R_NAME = '{REGION}' + AND R_NAME = '{region_name}' ) ORDER BY S_ACCTBAL DESC, N_NAME, S_NAME, - P_PARTKEY""" + P_PARTKEY + LIMIT 100""" ) return total @@ -192,8 +193,8 @@ def q03(root: str): customer = load_customer(root) orders = load_orders(root) - MKTSEGMENT = "HOUSEHOLD" - DATE = "1995-03-04" + mktsegment = "HOUSEHOLD" + date = "1995-03-04" total = duckdb.sql( f"""SELECT L_ORDERKEY, @@ -205,11 +206,11 @@ def q03(root: str): orders, LINEITEM WHERE - C_MKTSEGMENT = '{MKTSEGMENT}' + C_MKTSEGMENT = '{mktsegment}' AND C_CUSTKEY = O_CUSTKEY AND L_ORDERKEY = O_ORDERKEY - AND O_ORDERDATE < DATE '{DATE}' - AND L_SHIPDATE > DATE '{DATE}' + AND O_ORDERDATE < DATE '{date}' + AND L_SHIPDATE > DATE '{date}' GROUP BY L_ORDERKEY, O_ORDERDATE, @@ -226,7 +227,7 @@ def q04(root: str): line_item = load_lineitem(root) orders = load_orders(root) - DATE = "1993-08-01" + date = "1993-08-01" total = duckdb.sql( f"""SELECT O_ORDERPRIORITY, @@ -234,8 +235,8 @@ def q04(root: str): FROM orders WHERE - O_ORDERDATE >= DATE '{DATE}' - AND O_ORDERDATE < DATE '{DATE}' + INTERVAL '3' MONTH + O_ORDERDATE >= DATE '{date}' + AND O_ORDERDATE < DATE '{date}' + INTERVAL '3' MONTH AND EXISTS ( SELECT * @@ -262,8 +263,8 @@ def q05(root: str): lineitem = load_lineitem(root) customer = load_customer(root) - REGION = "ASIA" - DATE = "1996-01-01" + region_name = "ASIA" + date = "1996-01-01" total = duckdb.sql( f"""SELECT N_NAME, @@ -282,9 +283,9 @@ def q05(root: str): AND C_NATIONKEY = S_NATIONKEY AND S_NATIONKEY = N_NATIONKEY AND N_REGIONKEY = R_REGIONKEY - AND R_NAME = '{REGION}' - AND O_ORDERDATE >= 
DATE '{DATE}' - AND O_ORDERDATE < DATE '{DATE}' + INTERVAL '1' YEAR + AND R_NAME = '{region_name}' + AND O_ORDERDATE >= DATE '{date}' + AND O_ORDERDATE < DATE '{date}' + INTERVAL '1' YEAR GROUP BY N_NAME ORDER BY @@ -296,15 +297,15 @@ def q05(root: str): def q06(root: str): lineitem = load_lineitem(root) - DATE = "1996-01-01" + date = "1996-01-01" total = duckdb.sql( f"""SELECT SUM(L_EXTENDEDPRICE * L_DISCOUNT) AS REVENUE FROM LINEITEM WHERE - L_SHIPDATE >= DATE '{DATE}' - AND L_SHIPDATE < DATE '{DATE}' + INTERVAL '1' YEAR + L_SHIPDATE >= DATE '{date}' + AND L_SHIPDATE < DATE '{date}' + INTERVAL '1' YEAR AND L_DISCOUNT BETWEEN .08 AND .1 AND L_QUANTITY < 24""" ) @@ -318,8 +319,8 @@ def q07(root: str): lineitem = load_lineitem(root) customer = load_customer(root) - NATION1 = "FRANCE" - NATION2 = "GERMANY" + nation1 = "FRANCE" + nation2 = "GERMANY" total = duckdb.sql( f"""SELECT SUPP_NATION, @@ -345,8 +346,8 @@ def q07(root: str): AND S_NATIONKEY = N1.N_NATIONKEY AND C_NATIONKEY = N2.N_NATIONKEY AND ( - (N1.N_name = '{NATION1}' AND N2.N_NAME = '{NATION2}') - OR (N1.N_NAME = '{NATION2}' AND N2.N_NAME = '{NATION1}') + (N1.N_name = '{nation1}' AND N2.N_NAME = '{nation2}') + OR (N1.N_NAME = '{nation2}' AND N2.N_NAME = '{nation1}') ) AND L_SHIPDATE BETWEEN DATE '1995-01-01' AND DATE '1996-12-31' ) AS SHIPPING @@ -371,14 +372,14 @@ def q08(root: str): lineitem = load_lineitem(root) customer = load_customer(root) - NATION = "BRAZIL" - REGION = "AMERICA" - TYPE = "ECONOMY ANODIZED STEEL" + nation_name = "BRAZIL" + region_name = "AMERICA" + type = "ECONOMY ANODIZED STEEL" total = duckdb.sql( f"""SELECT O_YEAR, SUM(CASE - WHEN NAtion = '{NATION}' + WHEN NAtion = '{nation_name}' THEN VOLUME ELSE 0 END) / SUM(VOLUME) AS MKT_SHARE @@ -403,10 +404,10 @@ def q08(root: str): AND O_CUSTKEY = C_CUSTKEY AND C_NATIONKEY = N1.N_NATIONKEY AND N1.N_REGIONKEY = R_REGIONKEY - AND R_NAME = '{REGION}' + AND R_NAME = '{region_name}' AND S_NATIONKEY = N2.N_NATIONKEY AND O_ORDERDATE BETWEEN DATE 
'1995-01-01' AND DATE '1996-12-31' - AND P_TYPE = '{TYPE}' + AND P_TYPE = '{type}' ) AS ALL_NATIONS GROUP BY O_YEAR @@ -425,7 +426,7 @@ def q09(root: str): lineitem = load_lineitem(root) partsupp = load_partsupp(root) - NAME = "ghost" + name = "ghost" total = duckdb.sql( f"""SELECT @@ -452,7 +453,7 @@ def q09(root: str): AND P_PARTKEY = L_PARTKEY AND O_ORDERKEY = L_ORDERKEY AND S_NATIONKEY = N_NATIONKEY - AND P_NAME LIKE '%{NAME}%' + AND P_NAME LIKE '%{name}%' ) AS PROFIT GROUP BY NATION, @@ -470,7 +471,7 @@ def q10(root: str): lineitem = load_lineitem(root) customer = load_customer(root) - DATE = "1994-11-01" + date = "1994-11-01" total = duckdb.sql( f"""SELECT C_CUSTKEY, @@ -489,8 +490,8 @@ def q10(root: str): WHERE C_CUSTKEY = O_CUSTKEY AND L_ORDERKEY = O_ORDERKEY - AND O_ORDERDATE >= DATE '{DATE}' - AND O_ORDERDATE < DATE '{DATE}' + INTERVAL '3' MONTH + AND O_ORDERDATE >= DATE '{date}' + AND O_ORDERDATE < DATE '{date}' + INTERVAL '3' MONTH AND L_RETURNFLAG = 'R' AND C_NATIONKEY = N_NATIONKEY GROUP BY @@ -508,14 +509,14 @@ def q10(root: str): return total -# todo: result is empty + def q11(root: str): partsupp = load_partsupp(root) supplier = load_supplier(root) nation = load_nation(root) - NATION = "GERMANY" - FRACTION = 0.0001 + nation_name = "GERMANY" + fraction = 0.0001 total = duckdb.sql( f"""SELECT @@ -528,12 +529,12 @@ def q11(root: str): WHERE PS_SUPPKEY = S_SUPPKEY AND S_NATIONKEY = N_NATIONKEY - AND N_NAME = '{NATION}' + AND N_NAME = '{nation_name}' GROUP BY PS_PARTKEY HAVING SUM(PS_SUPPLYCOST * PS_AVAILQTY) > ( SELECT - SUM(PS_SUPPLYCOST * PS_AVAILQTY) * {FRACTION} + SUM(PS_SUPPLYCOST * PS_AVAILQTY) * {fraction} FROM PARTSUPP, SUPPLIER, @@ -541,7 +542,7 @@ def q11(root: str): WHERE PS_SUPPKEY = S_SUPPKEY AND S_NATIONKEY = N_NATIONKEY - AND N_NAME = '{NATION}' + AND N_NAME = '{nation_name}' ) ORDER BY VALUE DESC""" @@ -554,9 +555,9 @@ def q12(root): lineitem = load_lineitem(root) orders = load_orders(root) - SHIPMODE1 = "MAIL" - SHIPMODE2 = "SHIP" - 
DATE = "1994-01-01" + shipmode1 = "MAIL" + shipmode2 = "SHIP" + date = "1994-01-01" total = duckdb.sql( f"""SELECT L_SHIPMODE, @@ -577,11 +578,11 @@ def q12(root): LINEITEM WHERE O_ORDERKEY = L_ORDERKEY - AND L_SHIPMODE IN ('{SHIPMODE1}', '{SHIPMODE2}') + AND L_SHIPMODE IN ('{shipmode1}', '{shipmode2}') AND L_COMMITDATE < L_RECEIPTDATE AND L_SHIPDATE < L_COMMITDATE - AND L_RECEIPTDATE >= DATE '{DATE}' - AND L_RECEIPTDATE < DATE '{DATE}' + INTERVAL '1' YEAR + AND L_RECEIPTDATE >= DATE '{date}' + AND L_RECEIPTDATE < DATE '{date}' + INTERVAL '1' YEAR GROUP BY L_SHIPMODE ORDER BY @@ -595,8 +596,8 @@ def q13(root: str): customer = load_customer(root) orders = load_orders(root) - WORD1 = "special" - WORD2 = "requests" + word1 = "special" + word2 = "requests" total = duckdb.sql( f"""SELECT C_COUNT, COUNT(*) AS CUSTDIST @@ -607,7 +608,7 @@ def q13(root: str): FROM CUSTOMER LEFT OUTER JOIN orders ON C_CUSTKEY = O_CUSTKEY - AND O_COMMENT NOT LIKE '%{WORD1}%{WORD2}%' + AND O_COMMENT NOT LIKE '%{word1}%{word2}%' GROUP BY C_CUSTKEY )AS C_orders (C_CUSTKEY, C_COUNT) @@ -624,7 +625,7 @@ def q14(root): lineitem = load_lineitem(root) part = load_part(root) - DATE = "1994-03-01" + date = "1994-03-01" total = duckdb.sql( f"""SELECT 100.00 * SUM(CASE @@ -637,8 +638,8 @@ def q14(root): PART WHERE L_PARTKEY = P_PARTKEY - AND L_SHIPDATE >= DATE '{DATE}' - AND L_SHIPDATE < DATE '{DATE}' + INTERVAL '1' MONTH""" + AND L_SHIPDATE >= DATE '{date}' + AND L_SHIPDATE < DATE '{date}' + INTERVAL '1' MONTH""" ) return total @@ -647,7 +648,7 @@ def q15(root): lineitem = load_lineitem(root) supplier = load_supplier(root) - DATE = "1996-01-01" + date = "1996-01-01" _ = duckdb.execute( f"""CREATE TEMP VIEW REVENUE (SUPPLIER_NO, TOTAL_REVENUE) AS SELECT @@ -656,8 +657,8 @@ def q15(root): FROM LINEITEM WHERE - L_SHIPDATE >= DATE '{DATE}' - AND L_SHIPDATE < DATE '{DATE}' + INTERVAL '3' MONTH + L_SHIPDATE >= DATE '{date}' + AND L_SHIPDATE < DATE '{date}' + INTERVAL '3' MONTH GROUP BY L_SUPPKEY""" ) @@ 
-693,16 +694,16 @@ def q16(root): partsupp = load_partsupp(root) supplier = load_supplier(root) - BRAND = "Brand#45" - TYPE = "MEDIUM POLISHED" - SIZE1 = 49 - SIZE2 = 14 - SIZE3 = 23 - SIZE4 = 45 - SIZE5 = 19 - SIZE6 = 3 - SIZE7 = 36 - SIZE8 = 9 + brand = "Brand#45" + type = "MEDIUM POLISHED" + size1 = 49 + size2 = 14 + size3 = 23 + size4 = 45 + size5 = 19 + size6 = 3 + size7 = 36 + size8 = 9 total = duckdb.sql( f"""SELECT P_BRAND, @@ -714,9 +715,9 @@ def q16(root): PART WHERE P_PARTKEY = PS_PARTKEY - AND P_BRAND <> '{BRAND}' - AND P_TYPE NOT LIKE '{TYPE}%' - AND P_SIZE IN ({SIZE1}, {SIZE2}, {SIZE3}, {SIZE4}, {SIZE5}, {SIZE6}, {SIZE7}, {SIZE8}) + AND P_BRAND <> '{brand}' + AND P_TYPE NOT LIKE '{type}%' + AND P_SIZE IN ({size1}, {size2}, {size3}, {size4}, {size5}, {size6}, {size7}, {size8}) AND PS_SUPPKEY NOT IN ( SELECT S_SUPPKEY @@ -742,8 +743,8 @@ def q17(root): lineitem = load_lineitem(root) part = load_part(root) - BRAND = "Brand#23" - CONTAINER = "MED BOX" + brand = "Brand#23" + container = "MED BOX" total = duckdb.sql( f"""SELECT SUM(L_EXTENDEDPRICE) / 7.0 AS AVG_YEARLY @@ -752,8 +753,8 @@ def q17(root): PART WHERE P_PARTKEY = L_PARTKEY - AND P_BRAND = '{BRAND}' - AND P_CONTAINER = '{CONTAINER}' + AND P_BRAND = '{brand}' + AND P_CONTAINER = '{container}' AND L_QUANTITY < ( SELECT 0.2 * AVG(L_QUANTITY) @@ -771,7 +772,7 @@ def q18(root): orders = load_orders(root) customer = load_customer(root) - QUANTITY = 300 + quantity = 300 total = duckdb.sql( f"""SELECT C_NAME, @@ -792,7 +793,7 @@ def q18(root): LINEITEM GROUP BY L_ORDERKEY HAVING - SUM(L_QUANTITY) > {QUANTITY} + SUM(L_QUANTITY) > {quantity} ) AND C_CUSTKEY = O_CUSTKEY AND O_ORDERKEY = L_ORDERKEY @@ -814,8 +815,10 @@ def q19(root): lineitem = load_lineitem(root) part = load_part(root) - BRAND = "Brand#31" - QUANTITY = 4 + brand1 = "Brand#31" + brand2 = "Brand#24" + brand3 = "Brand#35" + quantity = 4 total = duckdb.sql( f"""SELECT SUM(L_EXTENDEDPRICE* (1 - L_DISCOUNT)) AS REVENUE @@ -825,9 +828,9 @@ def 
q19(root): WHERE ( P_PARTKEY = L_PARTKEY - AND P_BRAND = '{BRAND}' + AND P_BRAND = '{brand1}' AND P_CONTAINER IN ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG') - AND L_QUANTITY >= {QUANTITY} AND L_QUANTITY <= {QUANTITY} + 10 + AND L_QUANTITY >= {quantity} AND L_QUANTITY <= {quantity} + 10 AND P_SIZE BETWEEN 1 AND 5 AND L_SHIPMODE IN ('AIR', 'AIR REG') AND L_SHIPINSTRUCT = 'DELIVER IN PERSON' @@ -835,7 +838,7 @@ def q19(root): OR ( P_PARTKEY = L_PARTKEY - AND P_BRAND = 'BRAND#43' + AND P_BRAND = '{brand2}' AND P_CONTAINER IN ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK') AND L_QUANTITY >= 15 AND L_QUANTITY <= 25 AND P_SIZE BETWEEN 1 AND 10 @@ -845,7 +848,7 @@ def q19(root): OR ( P_PARTKEY = L_PARTKEY - AND P_BRAND = 'BRAND#43' + AND P_BRAND = '{brand3}' AND P_CONTAINER IN ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG') AND L_QUANTITY >= 26 AND L_QUANTITY <= 36 AND P_SIZE BETWEEN 1 AND 15 @@ -864,8 +867,8 @@ def q20(root): partsupp = load_partsupp(root) supplier = load_supplier(root) - NAME = "azure" - DATE = "1996-01-01" + name = "azure" + date = "1996-01-01" total = duckdb.sql( f"""SELECT S_NAME, @@ -886,7 +889,7 @@ def q20(root): FROM PART WHERE - P_NAME LIKE '{NAME}%' + P_NAME LIKE '{name}%' ) AND PS_AVAILQTY > ( SELECT @@ -896,8 +899,8 @@ def q20(root): WHERE L_PARTKEY = PS_PARTKEY AND L_SUPPKEY = PS_SUPPKEY - AND L_SHIPDATE >= DATE '{DATE}' - AND L_SHIPDATE < DATE '{DATE}' + INTERVAL '1' YEAR + AND L_SHIPDATE >= DATE '{date}' + AND L_SHIPDATE < DATE '{date}' + INTERVAL '1' YEAR ) ) AND S_NATIONKEY = N_NATIONKEY @@ -915,7 +918,7 @@ def q21(root): nation = load_nation(root) supplier = load_supplier(root) - NATION = "SAUDI ARABIA" + nation_name = "SAUDI ARABIA" total = duckdb.sql( f"""SELECT S_NAME, @@ -950,12 +953,13 @@ def q21(root): AND L3.L_RECEIPTDATE > L3.L_COMMITDATE ) AND S_NATIONKEY = N_NATIONKEY - AND N_NAME = '{NATION}' + AND N_NAME = '{nation_name}' GROUP BY S_NAME ORDER BY NUMWAIT DESC, - S_NAME""" + S_NAME + LIMIT 100""" ) return total diff --git 
a/tpch/pandas_queries/queries.py b/tpch/pandas_queries/queries.py index cf4fdc5..e24bd99 100644 --- a/tpch/pandas_queries/queries.py +++ b/tpch/pandas_queries/queries.py @@ -121,7 +121,7 @@ def q01(root: str, storage_options: Dict): ], ] sel = lineitem_filtered.L_SHIPDATE <= date - lineitem_filtered = lineitem_filtered[sel] + lineitem_filtered = lineitem_filtered.loc[sel] lineitem_filtered["AVG_QTY"] = lineitem_filtered.L_QUANTITY lineitem_filtered["AVG_PRICE"] = lineitem_filtered.L_EXTENDEDPRICE lineitem_filtered["DISC_PRICE"] = lineitem_filtered.L_EXTENDEDPRICE * ( @@ -399,8 +399,8 @@ def q07(root: str, storage_options: Dict): (lineitem["L_SHIPDATE"] >= pd.Timestamp("1995-01-01")) & (lineitem["L_SHIPDATE"] < pd.Timestamp("1997-01-01")) ] - lineitem_filtered["L_YEAR"] = lineitem_filtered["L_SHIPDATE"].dt.year - lineitem_filtered["VOLUME"] = lineitem_filtered["L_EXTENDEDPRICE"] * ( + lineitem_filtered.loc[:, "L_YEAR"] = lineitem_filtered["L_SHIPDATE"].dt.year + lineitem_filtered.loc[:, "VOLUME"] = lineitem_filtered["L_EXTENDEDPRICE"] * ( 1.0 - lineitem_filtered["L_DISCOUNT"] ) lineitem_filtered = lineitem_filtered.loc[ @@ -549,7 +549,7 @@ def udf(df): numerator = df["VOLUME"].sum() return numerator / demonimator - total = total.groupby("O_YEAR").apply(udf).reset_index() + total = total.groupby("O_YEAR").apply(udf, include_groups=False).reset_index() total.columns = ["O_YEAR", "MKT_SHARE"] total = total.sort_values(by=["O_YEAR"], ascending=[True]) @@ -694,6 +694,11 @@ def g2(x): total = total.sort_values("L_SHIPMODE").rename( columns={"g1": "HIGH_LINE_COUNT", "g2": "LOW_LINE_COUNT"} ) + + # Round the result to one decimal place -- If you use test_result.py to test the results, please uncomment the following two lines. 
+ # total["HIGH_LINE_COUNT"] = total["HIGH_LINE_COUNT"].astype(float).round(1) + # total["LOW_LINE_COUNT"] = total["LOW_LINE_COUNT"].astype(float).round(1) + return total @@ -705,7 +710,7 @@ def q13(root: str, storage_options: Dict): word2 = "requests" customer_filtered = customer.loc[:, ["C_CUSTKEY"]] orders_filtered = orders[ - ~orders["O_COMMENT"].str.contains(f"{word1}(\\S|\\s)*{word2}") + ~orders["O_COMMENT"].str.contains(f"{word1}.*{word2}") ] orders_filtered = orders_filtered.loc[:, ["O_ORDERKEY", "O_CUSTKEY"]] c_o_merged = customer_filtered.merge( @@ -791,44 +796,29 @@ def q16(root: str, storage_options: Dict): supplier = load_supplier(root, storage_options) brand = "Brand#45" - p_type = "MEDIUM POLISHED" - size1 = 49 - size2 = 14 - size3 = 23 - size4 = 45 - size5 = 19 - size6 = 3 - size7 = 36 - size8 = 9 - part_filtered = part[ - (part["P_BRAND"] != brand) - & (~part["P_TYPE"].str.contains(f"^{p_type}")) - & part["P_SIZE"].isin([size1, size2, size3, size4, size5, size6, size7, size8]) - ] - part_filtered = part_filtered.loc[:, ["P_PARTKEY", "P_BRAND", "P_TYPE", "P_SIZE"]] - partsupp_filtered = partsupp.loc[:, ["PS_PARTKEY", "PS_SUPPKEY"]] - total = part_filtered.merge( - partsupp_filtered, left_on="P_PARTKEY", right_on="PS_PARTKEY", how="inner" - ) - total = total.loc[:, ["P_BRAND", "P_TYPE", "P_SIZE", "PS_SUPPKEY"]] - supplier_filtered = supplier[ - supplier["S_COMMENT"].str.contains("Customer(\\S|\\s)*Complaints") + type = "MEDIUM POLISHED" + size_list = [49, 14, 23, 45, 19, 3, 36, 9] + + # Merge part and partsupp DataFrames + merged_df = pd.merge(part, partsupp, left_on="P_PARTKEY", right_on="PS_PARTKEY", how="inner") + + # Apply filters + filtered_df = merged_df[ + (merged_df["P_BRAND"] != brand) & + (~merged_df["P_TYPE"].str.startswith(type)) & + (merged_df["P_SIZE"].isin(size_list)) ] - supplier_filtered = supplier_filtered.loc[:, ["S_SUPPKEY"]].drop_duplicates() - # left merge to select only PS_SUPPKEY values not in supplier_filtered - total = 
total.merge( - supplier_filtered, left_on="PS_SUPPKEY", right_on="S_SUPPKEY", how="left" - ) - total = total[total["S_SUPPKEY"].isna()] - total = total.loc[:, ["P_BRAND", "P_TYPE", "P_SIZE", "PS_SUPPKEY"]] - total = total.groupby(["P_BRAND", "P_TYPE", "P_SIZE"], as_index=False)[ - "PS_SUPPKEY" - ].nunique() - total.columns = ["P_BRAND", "P_TYPE", "P_SIZE", "SUPPLIER_CNT"] - total = total.sort_values( - by=["SUPPLIER_CNT", "P_BRAND", "P_TYPE", "P_SIZE"], - ascending=[False, True, True, True], - ) + + # Exclude unwanted suppliers + supplier_filtered = supplier[supplier["S_COMMENT"].str.contains("Customer.*Complaints")] + filtered_df = pd.merge(filtered_df, supplier_filtered["S_SUPPKEY"], left_on="PS_SUPPKEY", right_on="S_SUPPKEY", how="left") + filtered_df = filtered_df[filtered_df["S_SUPPKEY"].isna()] + + # Group by and count unique suppliers + total = filtered_df.groupby(["P_BRAND", "P_TYPE", "P_SIZE"]).agg(SUPPLIER_CNT=("PS_SUPPKEY", "nunique")).reset_index() + + # Sort the result + total = total.sort_values(by=["SUPPLIER_CNT", "P_BRAND", "P_TYPE", "P_SIZE"], ascending=[False, True, True, True]) return total @@ -892,7 +882,8 @@ def q19(root: str, storage_options: Dict): quantity2 = 15 quantity3 = 26 brand1 = "Brand#31" - brand2 = "Brand#43" + brand2 = "Brand#24" + brand3 = "Brand#35" lsel = ( ( @@ -910,7 +901,7 @@ def q19(root: str, storage_options: Dict): ) ) & (lineitem.L_SHIPINSTRUCT == "DELIVER IN PERSON") - & ((lineitem.L_SHIPMODE == "AIR") | (lineitem.L_SHIPMODE == "AIRREG")) + & ((lineitem.L_SHIPMODE == "AIR") | (lineitem.L_SHIPMODE == "AIR REG")) ) psel = (part.P_SIZE >= 1) & ( ( @@ -935,7 +926,7 @@ def q19(root: str, storage_options: Dict): ) | ( (part.P_SIZE <= 15) - & (part.P_BRAND == brand2) + & (part.P_BRAND == brand3) & ( (part.P_CONTAINER == "LG BOX") | (part.P_CONTAINER == "LG CASE") @@ -948,36 +939,44 @@ def q19(root: str, storage_options: Dict): fpart = part[psel] jn = flineitem.merge(fpart, left_on="L_PARTKEY", right_on="P_PARTKEY") jnsel = ( -
(jn.P_BRAND == brand1) - & ( - (jn.P_CONTAINER == "SM BOX") - | (jn.P_CONTAINER == "SM CASE") - | (jn.P_CONTAINER == "SM PACK") - | (jn.P_CONTAINER == "SM PKG") + ( + (jn.P_BRAND == brand1) + & ( + (jn.P_CONTAINER == "SM BOX") + | (jn.P_CONTAINER == "SM CASE") + | (jn.P_CONTAINER == "SM PACK") + | (jn.P_CONTAINER == "SM PKG") + ) + & (jn.L_QUANTITY >= quantity1) + & (jn.L_QUANTITY <= quantity1 + 10) + & (jn.P_SIZE <= 5) ) - & (jn.L_QUANTITY >= quantity1) - & (jn.L_QUANTITY <= quantity1 + 10) - & (jn.P_SIZE <= 5) - | (jn.P_BRAND == brand2) - & ( - (jn.P_CONTAINER == "MED BAG") - | (jn.P_CONTAINER == "MED BOX") - | (jn.P_CONTAINER == "MED PACK") - | (jn.P_CONTAINER == "MED PKG") + | + ( + (jn.P_BRAND == brand2) + & ( + (jn.P_CONTAINER == "MED BAG") + | (jn.P_CONTAINER == "MED BOX") + | (jn.P_CONTAINER == "MED PACK") + | (jn.P_CONTAINER == "MED PKG") + ) + & (jn.L_QUANTITY >= quantity2) + & (jn.L_QUANTITY <= quantity2 + 10) + & (jn.P_SIZE <= 10) ) - & (jn.L_QUANTITY >= quantity2) - & (jn.L_QUANTITY <= quantity2 + 10) - & (jn.P_SIZE <= 10) - | (jn.P_BRAND == brand2) - & ( - (jn.P_CONTAINER == "LG BOX") - | (jn.P_CONTAINER == "LG CASE") - | (jn.P_CONTAINER == "LG PACK") - | (jn.P_CONTAINER == "LG PKG") + | + ( + (jn.P_BRAND == brand3) + & ( + (jn.P_CONTAINER == "LG BOX") + | (jn.P_CONTAINER == "LG CASE") + | (jn.P_CONTAINER == "LG PACK") + | (jn.P_CONTAINER == "LG PKG") + ) + & (jn.L_QUANTITY >= quantity3) + & (jn.L_QUANTITY <= quantity3 + 10) + & (jn.P_SIZE <= 15) ) - & (jn.L_QUANTITY >= quantity3) - & (jn.L_QUANTITY <= quantity3 + 10) - & (jn.P_SIZE <= 15) ) jn = jn[jnsel] result_value = (jn.L_EXTENDEDPRICE * (1.0 - jn.L_DISCOUNT)).sum() diff --git a/tpch/polars_queries/queries.py b/tpch/polars_queries/queries.py index 4e29a71..2417746 100644 --- a/tpch/polars_queries/queries.py +++ b/tpch/polars_queries/queries.py @@ -99,10 +99,11 @@ def load_partsupp_lazy(root: str, storage_options: Dict): def q01(root: str, storage_options: Dict): - var_1 = datetime(1998, 9, 2) 
lineitem = load_lineitem_lazy(root, storage_options) + + date = datetime(1998, 9, 2) q_final = ( - lineitem.filter(pl.col("L_SHIPDATE") <= var_1) + lineitem.filter(pl.col("L_SHIPDATE") <= date) .group_by(["L_RETURNFLAG", "L_LINESTATUS"]) .agg( [ @@ -121,7 +122,7 @@ def q01(root: str, storage_options: Dict): pl.mean("L_QUANTITY").alias("AVG_QTY"), pl.mean("L_EXTENDEDPRICE").alias("AVG_PRICE"), pl.mean("L_DISCOUNT").alias("AVG_DISC"), - pl.count().alias("COUNT_ORDER"), + pl.len().alias("COUNT_ORDER"), ] ) .sort(["L_RETURNFLAG", "L_LINESTATUS"]) @@ -131,24 +132,24 @@ def q02(root: str, storage_options: Dict): - var_1 = 15 - var_2 = "BRASS" - var_3 = "EUROPE" - part_ds = load_part_lazy(root, storage_options) partsupp_ds = load_partsupp_lazy(root, storage_options) supplier_ds = load_supplier_lazy(root, storage_options) nation_ds = load_nation_lazy(root, storage_options) region_ds = load_region_lazy(root, storage_options) + size = 15 + p_type = "BRASS" + region_name = "EUROPE" + result_q1 = ( part_ds.join(partsupp_ds, left_on="P_PARTKEY", right_on="PS_PARTKEY") .join(supplier_ds, left_on="PS_SUPPKEY", right_on="S_SUPPKEY") .join(nation_ds, left_on="S_NATIONKEY", right_on="N_NATIONKEY") .join(region_ds, left_on="N_REGIONKEY", right_on="R_REGIONKEY") - .filter(pl.col("P_SIZE") == var_1) - .filter(pl.col("P_TYPE").str.ends_with(var_2)) - .filter(pl.col("R_NAME") == var_3) + .filter(pl.col("P_SIZE") == size) + .filter(pl.col("P_TYPE").str.ends_with(p_type)) + .filter(pl.col("R_NAME") == region_name) ).cache() final_cols = [ @@ -175,26 +176,25 @@ def q02(root: str, storage_options: Dict): by=["S_ACCTBAL", "N_NAME", "S_NAME", "P_PARTKEY"], descending=[True, False, False, False], ) - .limit(100) - .with_columns(pl.col(pl.datatypes.Utf8).str.strip_chars().name.keep()) + .with_columns(pl.col(pl.datatypes.Utf8).str.strip_chars().name.keep()).limit(100) ) return q_final def q03(root: str, storage_options: Dict): - var_1 = var_2 = datetime(1995, 3, 15) - var_3
= "BUILDING" + date = datetime(1995, 3, 4) + mktsegment = "HOUSEHOLD" customer_ds = load_customer_lazy(root, storage_options) line_item_ds = load_lineitem_lazy(root, storage_options) orders_ds = load_orders_lazy(root, storage_options) q_final = ( - customer_ds.filter(pl.col("C_MKTSEGMENT") == var_3) + customer_ds.filter(pl.col("C_MKTSEGMENT") == mktsegment) .join(orders_ds, left_on="C_CUSTKEY", right_on="O_CUSTKEY") .join(line_item_ds, left_on="O_ORDERKEY", right_on="L_ORDERKEY") - .filter(pl.col("O_ORDERDATE") < var_2) - .filter(pl.col("L_SHIPDATE") > var_1) + .filter(pl.col("O_ORDERDATE") < date) + .filter(pl.col("L_SHIPDATE") > date) .with_columns( (pl.col("L_EXTENDEDPRICE") * (1 - pl.col("L_DISCOUNT"))).alias("REVENUE") ) @@ -208,27 +208,26 @@ def q03(root: str, storage_options: Dict): "O_SHIPPRIORITY", ] ) - .sort(by=["REVENUE", "O_ORDERDATE"], descending=[True, False]) - .limit(10) + .sort(by=["REVENUE", "O_ORDERDATE"], descending=[True, False]).limit(10) ) return q_final def q04(root: str, storage_options: Dict): - var_1 = datetime(1993, 7, 1) - var_2 = datetime(1993, 10, 1) + date1 = datetime(1993, 8, 1) + date2 = datetime(1993, 11, 1) line_item_ds = load_lineitem_lazy(root, storage_options) orders_ds = load_orders_lazy(root, storage_options) q_final = ( line_item_ds.join(orders_ds, left_on="L_ORDERKEY", right_on="O_ORDERKEY") - .filter(pl.col("O_ORDERDATE").is_between(var_1, var_2, closed="left")) + .filter(pl.col("O_ORDERDATE").is_between(date1, date2, closed="left")) .filter(pl.col("L_COMMITDATE") < pl.col("L_RECEIPTDATE")) .unique(subset=["O_ORDERPRIORITY", "L_ORDERKEY"]) .group_by("O_ORDERPRIORITY") - .agg(pl.count().alias("ORDER_COUNT")) + .agg(pl.len().alias("ORDER_COUNT")) .sort(by="O_ORDERPRIORITY") .with_columns(pl.col("ORDER_COUNT").cast(pl.datatypes.Int64)) ) @@ -237,9 +236,9 @@ def q04(root: str, storage_options: Dict): def q05(root: str, storage_options: Dict): - var_1 = "ASIA" - var_2 = datetime(1994, 1, 1) - var_3 = datetime(1995, 1, 1) + 
region_name = "ASIA" + date1 = datetime(1996, 1, 1) + date2 = datetime(1997, 1, 1) region_ds = load_region_lazy(root, storage_options) nation_ds = load_nation_lazy(root, storage_options) @@ -258,8 +257,8 @@ def q05(root: str, storage_options: Dict): left_on=["L_SUPPKEY", "N_NATIONKEY"], right_on=["S_SUPPKEY", "S_NATIONKEY"], ) - .filter(pl.col("R_NAME") == var_1) - .filter(pl.col("O_ORDERDATE").is_between(var_2, var_3, closed="left")) + .filter(pl.col("R_NAME") == region_name) + .filter(pl.col("O_ORDERDATE").is_between(date1, date2, closed="left")) .with_columns( (pl.col("L_EXTENDEDPRICE") * (1 - pl.col("L_DISCOUNT"))).alias("REVENUE") ) @@ -272,18 +271,18 @@ def q05(root: str, storage_options: Dict): def q06(root: str, storage_options: Dict): - var_1 = datetime(1994, 1, 1) - var_2 = datetime(1995, 1, 1) - var_3 = 24 + date1 = datetime(1996, 1, 1) + date2 = datetime(1997, 1, 1) + quantity = 24 line_item_ds = load_lineitem_lazy(root, storage_options) q_final = ( line_item_ds.filter( - pl.col("L_SHIPDATE").is_between(var_1, var_2, closed="left") + pl.col("L_SHIPDATE").is_between(date1, date2, closed="left") ) - .filter(pl.col("L_DISCOUNT").is_between(0.05, 0.07)) - .filter(pl.col("L_QUANTITY") < var_3) + .filter(pl.col("L_DISCOUNT").is_between(0.08, 1.00)) + .filter(pl.col("L_QUANTITY") < quantity) .with_columns( (pl.col("L_EXTENDEDPRICE") * pl.col("L_DISCOUNT")).alias("REVENUE") ) @@ -302,8 +301,8 @@ def q07(root: str, storage_options: Dict): n1 = nation_ds.filter(pl.col("N_NAME") == "FRANCE") n2 = nation_ds.filter(pl.col("N_NAME") == "GERMANY") - var_1 = datetime(1995, 1, 1) - var_2 = datetime(1996, 12, 31) + date1 = datetime(1995, 1, 1) + date2 = datetime(1996, 12, 31) df1 = ( customer_ds.join(n1, left_on="C_NATIONKEY", right_on="N_NATIONKEY") @@ -327,7 +326,7 @@ def q07(root: str, storage_options: Dict): q_final = ( pl.concat([df1, df2]) - .filter(pl.col("L_SHIPDATE").is_between(var_1, var_2)) + .filter(pl.col("L_SHIPDATE").is_between(date1, date2)) 
.with_columns( (pl.col("L_EXTENDEDPRICE") * (1 - pl.col("L_DISCOUNT"))).alias("VOLUME") ) @@ -340,6 +339,12 @@ def q07(root: str, storage_options: Dict): def q08(root: str, storage_options: Dict): + nation_name = "BRAZIL" + region_name = "AMERICA" + p_type = "ECONOMY ANODIZED STEEL" + date_begin = datetime(1995,1,1) + date_end = datetime(1996,12,31) + part_ds = load_part_lazy(root, storage_options) supplier_ds = load_supplier_lazy(root, storage_options) line_item_ds = load_lineitem_lazy(root, storage_options) @@ -358,14 +363,14 @@ def q08(root: str, storage_options: Dict): .join(customer_ds, left_on="O_CUSTKEY", right_on="C_CUSTKEY") .join(n1, left_on="C_NATIONKEY", right_on="N_NATIONKEY") .join(region_ds, left_on="N_REGIONKEY", right_on="R_REGIONKEY") - .filter(pl.col("R_NAME") == "AMERICA") + .filter(pl.col("R_NAME") == region_name) .join(n2, left_on="S_NATIONKEY", right_on="N_NATIONKEY") .filter( pl.col("O_ORDERDATE").is_between( - datetime(1995, 1, 1), datetime(1996, 12, 31) + date_begin, date_end ) ) - .filter(pl.col("P_TYPE") == "ECONOMY ANODIZED STEEL") + .filter(pl.col("P_TYPE") == p_type) .select( [ pl.col("O_ORDERDATE").dt.year().alias("O_YEAR"), @@ -376,19 +381,21 @@ def q08(root: str, storage_options: Dict): ] ) .with_columns( - pl.when(pl.col("NATION") == "BRAZIL") + pl.when(pl.col("NATION") == nation_name) .then(pl.col("VOLUME")) .otherwise(0) .alias("_tmp") ) .group_by("O_YEAR") - .agg((pl.sum("_tmp") / pl.sum("VOLUME")).round(2).alias("MKT_SHARE")) + .agg((pl.sum("_tmp") / pl.sum("VOLUME")).alias("MKT_SHARE")) .sort("O_YEAR") ) return q_final def q09(root: str, storage_options: Dict): + p_name = "ghost" + part_ds = load_part_lazy(root, storage_options) supplier_ds = load_supplier_lazy(root, storage_options) line_item_ds = load_lineitem_lazy(root, storage_options) @@ -406,7 +413,7 @@ def q09(root: str, storage_options: Dict): .join(part_ds, left_on="L_PARTKEY", right_on="P_PARTKEY") .join(orders_ds, left_on="L_ORDERKEY", right_on="O_ORDERKEY") 
.join(nation_ds, left_on="S_NATIONKEY", right_on="N_NATIONKEY") - .filter(pl.col("P_NAME").str.contains("green")) + .filter(pl.col("P_NAME").str.contains(p_name)) .select( [ pl.col("N_NAME").alias("NATION"), @@ -418,7 +425,7 @@ def q09(root: str, storage_options: Dict): ] ) .group_by(["NATION", "O_YEAR"]) - .agg(pl.sum("AMOUNT").round(2).alias("SUM_PROFIT")) + .agg(pl.sum("AMOUNT").alias("SUM_PROFIT")) .sort(by=["NATION", "O_YEAR"], descending=[False, True]) ) return q_final @@ -430,14 +437,14 @@ def q10(root: str, storage_options: Dict): line_item_ds = load_lineitem_lazy(root, storage_options) nation_ds = load_nation_lazy(root, storage_options) - var_1 = datetime(1993, 10, 1) - var_2 = datetime(1994, 1, 1) + date1 = datetime(1994, 11, 1) + date2 = datetime(1995, 2, 1) q_final = ( customer_ds.join(orders_ds, left_on="C_CUSTKEY", right_on="O_CUSTKEY") .join(line_item_ds, left_on="O_ORDERKEY", right_on="L_ORDERKEY") .join(nation_ds, left_on="C_NATIONKEY", right_on="N_NATIONKEY") - .filter(pl.col("O_ORDERDATE").is_between(var_1, var_2, closed="left")) + .filter(pl.col("O_ORDERDATE").is_between(date1, date2, closed="left")) .filter(pl.col("L_RETURNFLAG") == "R") .group_by( [ @@ -454,7 +461,6 @@ def q10(root: str, storage_options: Dict): [ (pl.col("L_EXTENDEDPRICE") * (1 - pl.col("L_DISCOUNT"))) .sum() - .round(2) .alias("REVENUE") ] ) @@ -485,17 +491,17 @@ def q11(root: str, storage_options: Dict): part_supp_ds = load_partsupp_lazy(root, storage_options) nation_ds = load_nation_lazy(root, storage_options) - var_1 = "GERMANY" - var_2 = 0.0001 + nation_name = "GERMANY" + fraction = 0.0001 res_1 = ( part_supp_ds.join(supplier_ds, left_on="PS_SUPPKEY", right_on="S_SUPPKEY") .join(nation_ds, left_on="S_NATIONKEY", right_on="N_NATIONKEY") - .filter(pl.col("N_NAME") == var_1) + .filter(pl.col("N_NAME") == nation_name) ) res_2 = res_1.select( (pl.col("PS_SUPPLYCOST") * pl.col("PS_AVAILQTY")).sum().round(2).alias("TMP") - * var_2 + * fraction 
).with_columns(pl.lit(1).alias("LIT")) q_final = ( @@ -503,7 +509,6 @@ def q11(root: str, storage_options: Dict): .agg( (pl.col("PS_SUPPLYCOST") * pl.col("PS_AVAILQTY")) .sum() - .round(2) .alias("VALUE") ) .with_columns(pl.lit(1).alias("LIT")) @@ -519,17 +524,17 @@ def q12(root: str, storage_options: Dict): line_item_ds = load_lineitem_lazy(root, storage_options) orders_ds = load_orders_lazy(root, storage_options) - var_1 = "MAIL" - var_2 = "SHIP" - var_3 = datetime(1994, 1, 1) - var_4 = datetime(1995, 1, 1) + shipmode1 = "MAIL" + shipmode2 = "SHIP" + date1 = datetime(1994, 1, 1) + date2 = datetime(1995, 1, 1) q_final = ( orders_ds.join(line_item_ds, left_on="O_ORDERKEY", right_on="L_ORDERKEY") - .filter(pl.col("L_SHIPMODE").is_in([var_1, var_2])) + .filter(pl.col("L_SHIPMODE").is_in([shipmode1, shipmode2])) .filter(pl.col("L_COMMITDATE") < pl.col("L_RECEIPTDATE")) .filter(pl.col("L_SHIPDATE") < pl.col("L_COMMITDATE")) - .filter(pl.col("L_RECEIPTDATE").is_between(var_3, var_4, closed="left")) + .filter(pl.col("L_RECEIPTDATE").is_between(date1, date2, closed="left")) .with_columns( [ pl.when(pl.col("O_ORDERPRIORITY").is_in(["1-URGENT", "2-HIGH"])) @@ -543,19 +548,20 @@ def q12(root: str, storage_options: Dict): ] ) .group_by("L_SHIPMODE") - .agg([pl.col("HIGH_LINE_COUNT").sum(), pl.col("LOW_LINE_COUNT").sum()]) + .agg([pl.col("HIGH_LINE_COUNT").sum().cast(float).round(1), pl.col("LOW_LINE_COUNT").sum().cast(float).round(1)]) .sort("L_SHIPMODE") ) + return q_final def q13(root: str, storage_options: Dict): - var_1 = "special" - var_2 = "requests" + word1 = "special" + word2 = "requests" customer_ds = load_customer_lazy(root, storage_options) orders_ds = load_orders_lazy(root, storage_options).filter( - pl.col("O_COMMENT").str.contains(f"{var_1}.*{var_2}").not_() + pl.col("O_COMMENT").str.contains(f"{word1}.*{word2}").not_() ) q_final = ( customer_ds.join( @@ -564,14 +570,18 @@ def q13(root: str, storage_options: Dict): .group_by("C_CUSTKEY") .agg( [ - 
pl.col("O_ORDERKEY").count().alias("C_COUNT"), + pl.col("O_ORDERKEY").len().alias("C_COUNT"), pl.col("O_ORDERKEY").null_count().alias("NULL_C_COUNT"), ] ) .with_columns((pl.col("C_COUNT") - pl.col("NULL_C_COUNT")).alias("C_COUNT")) .group_by("C_COUNT") - .count() - .select([pl.col("C_COUNT"), pl.col("COUNT").alias("CUSTDIST")]) + .agg( + [ + pl.len().alias("CUSTDIST") + ] + ) + .select([pl.col("C_COUNT"), pl.col("CUSTDIST")]) .sort(["CUSTDIST", "C_COUNT"], descending=[True, True]) ) return q_final @@ -581,12 +591,12 @@ def q14(root: str, storage_options: Dict): line_item_ds = load_lineitem_lazy(root, storage_options) part_ds = load_part_lazy(root, storage_options) - var_1 = datetime(1995, 9, 1) - var_2 = datetime(1995, 10, 1) + date1 = datetime(1994, 3, 1) + date2 = datetime(1994, 4, 1) q_final = ( line_item_ds.join(part_ds, left_on="L_PARTKEY", right_on="P_PARTKEY") - .filter(pl.col("L_SHIPDATE").is_between(var_1, var_2, closed="left")) + .filter(pl.col("L_SHIPDATE").is_between(date1, date2, closed="left")) .select( ( 100.00 @@ -596,7 +606,6 @@ def q14(root: str, storage_options: Dict): .sum() / (pl.col("L_EXTENDEDPRICE") * (1 - pl.col("L_DISCOUNT"))).sum() ) - .round(2) .alias("PROMO_REVENUE") ) ) @@ -607,12 +616,12 @@ def q15(root: str, storage_options: Dict): line_item_ds = load_lineitem_lazy(root, storage_options) supplier_ds = load_supplier_lazy(root, storage_options) - var_1 = datetime(1996, 1, 1) - var_2 = datetime(1996, 4, 1) + date1 = datetime(1996, 1, 1) + date2 = datetime(1996, 4, 1) revenue_ds = ( line_item_ds.filter( - pl.col("L_SHIPDATE").is_between(var_1, var_2, closed="left") + pl.col("L_SHIPDATE").is_between(date1, date2, closed="left") ) .group_by("L_SUPPKEY") .agg( @@ -636,21 +645,25 @@ def q15(root: str, storage_options: Dict): def q16(root: str, storage_options: Dict): part_supp_ds = load_partsupp_lazy(root, storage_options) part_ds = load_part_lazy(root, storage_options) - supplier_ds = ( - load_supplier_lazy(root, storage_options) - 
.filter(pl.col("S_COMMENT").str.contains(".*Customer.*Complaints.*")) - .select(pl.col("S_SUPPKEY"), pl.col("S_SUPPKEY").alias("PS_SUPPKEY")) - ) + supplier_ds = load_supplier_lazy(root, storage_options) - var_1 = "Brand#45" + brand = "Brand#45" + type = "MEDIUM POLISHED" + size_list = [49, 14, 23, 45, 19, 3, 36, 9] q_final = ( part_ds.join(part_supp_ds, left_on="P_PARTKEY", right_on="PS_PARTKEY") - .filter(pl.col("P_BRAND") != var_1) - .filter(pl.col("P_TYPE").str.contains("MEDIUM POLISHED*").not_()) - .filter(pl.col("P_SIZE").is_in([49, 14, 23, 45, 19, 3, 36, 9])) - .join(supplier_ds, left_on="PS_SUPPKEY", right_on="S_SUPPKEY", how="left") - .filter(pl.col("PS_SUPPKEY_RIGHT").is_null()) + .filter(pl.col("P_BRAND") != brand) + .filter(pl.col("P_TYPE").str.contains(f"^{type}").not_()) + .filter(pl.col("P_SIZE").is_in(size_list)) + .join( + supplier_ds.filter( + pl.col("S_COMMENT").str.contains(".*CUSTOMER.*COMPLAINTS.*") + ).select(pl.col("S_SUPPKEY")), + left_on="PS_SUPPKEY", + right_on="S_SUPPKEY", + how="left", + ) .group_by(["P_BRAND", "P_TYPE", "P_SIZE"]) .agg([pl.col("PS_SUPPKEY").n_unique().alias("SUPPLIER_CNT")]) .sort( @@ -658,19 +671,21 @@ def q16(root: str, storage_options: Dict): descending=[True, False, False, False], ) ) + return q_final + def q17(root: str, storage_options: Dict): - var_1 = "Brand#23" - var_2 = "MED BOX" + brand = "Brand#23" + container = "MED BOX" line_item_ds = load_lineitem_lazy(root, storage_options) part_ds = load_part_lazy(root, storage_options) res_1 = ( - part_ds.filter(pl.col("P_BRAND") == var_1) - .filter(pl.col("P_CONTAINER") == var_2) + part_ds.filter(pl.col("P_BRAND") == brand) + .filter(pl.col("P_CONTAINER") == container) .join(line_item_ds, how="left", left_on="P_PARTKEY", right_on="L_PARTKEY") ).cache() @@ -680,7 +695,7 @@ def q17(root: str, storage_options: Dict): .select([pl.col("P_PARTKEY").alias("KEY"), pl.col("AVG_QUANTITY")]) .join(res_1, left_on="KEY", right_on="P_PARTKEY") .filter(pl.col("L_QUANTITY") < 
pl.col("AVG_QUANTITY")) - .select((pl.col("L_EXTENDEDPRICE").sum() / 7.0).round(2).alias("AVG_YEARLY")) + .select((pl.col("L_EXTENDEDPRICE").sum() / 7.0).alias("AVG_YEARLY")) ) return q_final @@ -690,12 +705,12 @@ def q18(root: str, storage_options: Dict): line_item_ds = load_lineitem_lazy(root, storage_options) orders_ds = load_orders_lazy(root, storage_options) - var_1 = 300 + quantity = 300 q_final = ( line_item_ds.group_by("L_ORDERKEY") .agg(pl.col("L_QUANTITY").sum().alias("SUM_QUANTITY")) - .filter(pl.col("SUM_QUANTITY") > var_1) + .filter(pl.col("SUM_QUANTITY") > quantity) .select([pl.col("L_ORDERKEY").alias("KEY"), pl.col("SUM_QUANTITY")]) .join(orders_ds, left_on="KEY", right_on="O_ORDERKEY") .join(line_item_ds, left_on="KEY", right_on="L_ORDERKEY") @@ -722,40 +737,45 @@ def q19(root: str, storage_options: Dict): line_item_ds = load_lineitem_lazy(root, storage_options) part_ds = load_part_lazy(root, storage_options) + quantity1 = 4 + quantity2 = 15 + quantity3 = 26 + brand1 = "Brand#31" + brand2 = "Brand#24" + brand3 = "Brand#35" q_final = ( part_ds.join(line_item_ds, left_on="P_PARTKEY", right_on="L_PARTKEY") .filter(pl.col("L_SHIPMODE").is_in(["AIR", "AIR REG"])) .filter(pl.col("L_SHIPINSTRUCT") == "DELIVER IN PERSON") .filter( ( - (pl.col("P_BRAND") == "Brand#12") + (pl.col("P_BRAND") == brand1) & pl.col("P_CONTAINER").is_in( ["SM CASE", "SM BOX", "SM PACK", "SM PKG"] ) - & (pl.col("L_QUANTITY").is_between(1, 11)) + & (pl.col("L_QUANTITY").is_between(quantity1, quantity1+10)) & (pl.col("P_SIZE").is_between(1, 5)) ) | ( - (pl.col("P_BRAND") == "Brand#23") + (pl.col("P_BRAND") == brand2) & pl.col("P_CONTAINER").is_in( ["MED BAG", "MED BOX", "MED PKG", "MED PACK"] ) - & (pl.col("L_QUANTITY").is_between(10, 20)) + & (pl.col("L_QUANTITY").is_between(quantity2, quantity2+10)) & (pl.col("P_SIZE").is_between(1, 10)) ) | ( - (pl.col("P_BRAND") == "Brand#34") + (pl.col("P_BRAND") == brand3) & pl.col("P_CONTAINER").is_in( ["LG CASE", "LG BOX", "LG PACK", "LG PKG"] 
) - & (pl.col("L_QUANTITY").is_between(20, 30)) + & (pl.col("L_QUANTITY").is_between(quantity3, quantity3+10)) & (pl.col("P_SIZE").is_between(1, 15)) ) ) .select( (pl.col("L_EXTENDEDPRICE") * (1 - pl.col("L_DISCOUNT"))) .sum() - .round(2) .alias("REVENUE") ) ) @@ -769,23 +789,23 @@ def q20(root: str, storage_options: Dict): part_ds = load_part_lazy(root, storage_options) part_supp_ds = load_partsupp_lazy(root, storage_options) - var_1 = datetime(1994, 1, 1) - var_2 = datetime(1995, 1, 1) - var_3 = "CANADA" - var_4 = "forest" + date1 = datetime(1996, 1, 1) + date2 = datetime(1997, 1, 1) + name = "JORDAN" + p_name = "azure" res_1 = ( line_item_ds.filter( - pl.col("L_SHIPDATE").is_between(var_1, var_2, closed="left") + pl.col("L_SHIPDATE").is_between(date1, date2, closed="left") ) .group_by("L_PARTKEY", "L_SUPPKEY") .agg((pl.col("L_QUANTITY").sum() * 0.5).alias("SUM_QUANTITY")) ) - res_2 = nation_ds.filter(pl.col("N_NAME") == var_3) + res_2 = nation_ds.filter(pl.col("N_NAME") == name) res_3 = supplier_ds.join(res_2, left_on="S_NATIONKEY", right_on="N_NATIONKEY") q_final = ( - part_ds.filter(pl.col("P_NAME").str.starts_with(var_4)) + part_ds.filter(pl.col("P_NAME").str.starts_with(p_name)) .select(pl.col("P_PARTKEY").unique()) .join(part_supp_ds, left_on="P_PARTKEY", right_on="PS_PARTKEY") .join( @@ -809,7 +829,7 @@ def q21(root: str, storage_options: Dict): nation_ds = load_nation_lazy(root, storage_options) orders_ds = load_orders_lazy(root, storage_options) - var_1 = "SAUDI ARABIA" + nation_name = "SAUDI ARABIA" res_1 = ( ( @@ -831,10 +851,10 @@ def q21(root: str, storage_options: Dict): .join(nation_ds, left_on="S_NATIONKEY", right_on="N_NATIONKEY") .join(orders_ds, left_on="L_ORDERKEY", right_on="O_ORDERKEY") .filter(pl.col("NUNIQUE_COL") == 1) - .filter(pl.col("N_NAME") == var_1) + .filter(pl.col("N_NAME") == nation_name) .filter(pl.col("O_ORDERSTATUS") == "F") .group_by("S_NAME") - .agg(pl.count().alias("NUMWAIT")) + .agg(pl.len().alias("NUMWAIT")) 
.sort(by=["NUMWAIT", "S_NAME"], descending=[True, False]) .limit(100) ) @@ -994,7 +1014,7 @@ def run_queries( without_io_time = time.time() - start_time success = True if print_result: - print_result_fn("pandas", result, query) + print_result_fn("polars", result.collect(), query) except Exception as e: print("".join(traceback.TracebackException.from_exception(e).format())) without_io_time = 0.0 diff --git a/tpch/test_result.py b/tpch/test_result.py index 3aa5dd6..43db127 100644 --- a/tpch/test_result.py +++ b/tpch/test_result.py @@ -62,6 +62,17 @@ def main(): print(f"current result: {args.cur_result_path}") print(f"answer path: {args.answer_path}") + # Check if answer_path exists + if not os.path.exists(args.answer_path): + print(f"Error: answer_path '{args.answer_path}' does not exist.") + return + + # Check if cur_result_path exists + if not os.path.exists(args.cur_result_path): + print(f"Error: cur_result_path '{args.cur_result_path}' does not exist.") + return + + queries = list(range(1, 23)) if args.queries is not None: queries = args.queries