From f0b1d266e9edf07ad4e2d78e1bff1587173535c3 Mon Sep 17 00:00:00 2001
From: Maksym Rymar
Date: Fri, 17 May 2024 17:00:10 +0300
Subject: [PATCH] DRILL-8495: Tried to remove unmanaged buffer (#2913)

---
 .../hive/readers/HiveDefaultRecordReader.java |  6 +++---
 .../hive/writers/HiveValueWriterFactory.java  | 20 ++++++++++----------
 .../drill/exec/hive/TestHiveStorage.java      | 19 +++++++++++++++++++
 .../hive/TestInfoSchemaOnHiveStorage.java     |  2 ++
 .../store/hive/HiveTestDataGenerator.java     | 22 ++++++++++++++++++++++
 5 files changed, 56 insertions(+), 13 deletions(-)

diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveDefaultRecordReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveDefaultRecordReader.java
index 8598643a75f..640d95698ce 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveDefaultRecordReader.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveDefaultRecordReader.java
@@ -168,7 +168,7 @@ public class HiveDefaultRecordReader extends AbstractRecordReader {
   protected boolean empty;
 
   /**
-   * Buffer used for population of partition vectors and to fill in data into vectors via writers
+   * Buffer used for population of partition vectors
    */
   private final DrillBuf drillBuf;
 
@@ -238,7 +238,7 @@ public HiveDefaultRecordReader(HiveTableWithColumnCache table, HivePartition par
     this.proxyUserGroupInfo = proxyUgi;
     this.empty = inputSplits == null || inputSplits.isEmpty();
     this.inputSplitsIterator = empty ? Collections.emptyIterator() : inputSplits.iterator();
-    this.drillBuf = context.getManagedBuffer().reallocIfNeeded(256);
+    this.drillBuf = context.getManagedBuffer();
     this.partitionVectors = new ValueVector[0];
     this.partitionValues = new Object[0];
     setColumns(projectedColumns);
@@ -333,7 +333,7 @@ private Callable<Void> getInitTask(OutputMutator output) {
       this.selectedStructFieldRefs = new StructField[selectedColumnNames.size()];
       this.columnValueWriters = new HiveValueWriter[selectedColumnNames.size()];
       this.outputWriter = new VectorContainerWriter(output, /*enabled union*/ false);
-      HiveValueWriterFactory hiveColumnValueWriterFactory = new HiveValueWriterFactory(drillBuf, outputWriter.getWriter());
+      HiveValueWriterFactory hiveColumnValueWriterFactory = new HiveValueWriterFactory(fragmentContext.getManagedBufferManager(), outputWriter.getWriter());
       for (int refIdx = 0; refIdx < selectedStructFieldRefs.length; refIdx++) {
         String columnName = selectedColumnNames.get(refIdx);
         StructField fieldRef = finalObjInspector.getStructFieldRef(columnName);
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/HiveValueWriterFactory.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/HiveValueWriterFactory.java
index fcb89ce7a40..d430b8370ae 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/HiveValueWriterFactory.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/writers/HiveValueWriterFactory.java
@@ -21,8 +21,8 @@
 import java.util.function.BiFunction;
 import java.util.function.Function;
 
-import io.netty.buffer.DrillBuf;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ops.BufferManager;
 import org.apache.drill.exec.store.hive.writers.complex.HiveListWriter;
 import org.apache.drill.exec.store.hive.writers.complex.HiveMapWriter;
 import org.apache.drill.exec.store.hive.writers.complex.HiveStructWriter;
@@ -97,18 +97,18 @@ public final class HiveValueWriterFactory {
   private static final Logger logger = LoggerFactory.getLogger(HiveValueWriterFactory.class);
 
   /**
-   * Buffer shared across created Hive writers. May be used by writer for reading data
-   * to buffer than from buffer to vector.
+   * Buffer manager used to create buffers for Hive writers that first read data
+   * into a buffer and then copy it from the buffer into a vector, if needed.
    */
-  private final DrillBuf drillBuf;
+  private final BufferManager bufferManager;
 
   /**
    * Used to manage and create column writers.
    */
   private final SingleMapWriter rootWriter;
 
-  public HiveValueWriterFactory(DrillBuf drillBuf, SingleMapWriter rootWriter) {
-    this.drillBuf = drillBuf;
+  public HiveValueWriterFactory(BufferManager bufferManager, SingleMapWriter rootWriter) {
+    this.bufferManager = bufferManager;
     this.rootWriter = rootWriter;
   }
 
@@ -200,7 +200,7 @@ private HiveValueWriter createPrimitiveHiveValueWriter(String name, PrimitiveTyp
       case BINARY: {
         VarBinaryWriter writer = extractWriter(name, parentWriter,
             MapWriter::varBinary, ListWriter::varBinary, UnionVectorWriter::varBinary);
-        return new HiveBinaryWriter((BinaryObjectInspector) inspector, writer, drillBuf);
+        return new HiveBinaryWriter((BinaryObjectInspector) inspector, writer, bufferManager.getManagedBuffer());
       }
       case BOOLEAN: {
         BitWriter writer = extractWriter(name, parentWriter,
@@ -240,12 +240,12 @@ private HiveValueWriter createPrimitiveHiveValueWriter(String name, PrimitiveTyp
       case STRING: {
         VarCharWriter writer = extractWriter(name, parentWriter,
             MapWriter::varChar, ListWriter::varChar, UnionVectorWriter::varChar);
-        return new HiveStringWriter((StringObjectInspector) inspector, writer, drillBuf);
+        return new HiveStringWriter((StringObjectInspector) inspector, writer, bufferManager.getManagedBuffer());
       }
       case VARCHAR: {
         VarCharWriter writer = extractWriter(name, parentWriter,
             MapWriter::varChar, ListWriter::varChar, UnionVectorWriter::varChar);
-        return new HiveVarCharWriter((HiveVarcharObjectInspector) inspector, writer, drillBuf);
+        return new HiveVarCharWriter((HiveVarcharObjectInspector) inspector, writer, bufferManager.getManagedBuffer());
       }
       case TIMESTAMP: {
         TimeStampWriter writer = extractWriter(name, parentWriter,
@@ -260,7 +260,7 @@ private HiveValueWriter createPrimitiveHiveValueWriter(String name, PrimitiveTyp
       case CHAR: {
         VarCharWriter writer = extractWriter(name, parentWriter,
             MapWriter::varChar, ListWriter::varChar, UnionVectorWriter::varChar);
-        return new HiveCharWriter((HiveCharObjectInspector) inspector, writer, drillBuf);
+        return new HiveCharWriter((HiveCharObjectInspector) inspector, writer, bufferManager.getManagedBuffer());
       }
       case DECIMAL: {
         DecimalTypeInfo decimalType = (DecimalTypeInfo) typeInfo;
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
index ecfda55df9b..d8d8ef0e9dc 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.hive;
 
+import static org.apache.drill.shaded.guava.com.google.common.base.Strings.repeat;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -462,6 +463,24 @@ public void testTableWithEmptyParquet() throws Exception {
     .go();
   }
 
+  @Test // see DRILL-8495
+  public void testReadingHiveDataBiggerThan256Bytes() throws Exception {
+    testBuilder()
+        .sqlQuery("select * from hive.`256_bytes_plus_table`")
+        .unOrdered()
+        .baselineColumns(
+            "char_col",
+            "varchar_col",
+            "binary_col",
+            "string_col")
+        .baselineValues(
+            repeat("A", 255),
+            repeat("B", 1200),
+            repeat("C", 320).getBytes(),
+            repeat("D", 2200))
+        .go();
+  }
+
   private void verifyColumnsMetadata(List<UserProtos.ResultColumnMetadata> columnsList, Map<String, String> expectedResult) {
     for (UserProtos.ResultColumnMetadata columnMetadata : columnsList) {
       assertTrue("Column should be present in result set", expectedResult.containsKey(columnMetadata.getColumnName()));
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
index a34a05a32d1..e5df7628676 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
@@ -56,6 +56,7 @@ public void showTablesFromDb() throws Exception{
         .baselineValues("hive.default", "hive_view_m")
         .baselineValues("hive.default", "view_over_hive_view")
         .baselineValues("hive.default", "table_with_empty_parquet")
+        .baselineValues("hive.default", "256_bytes_plus_table")
         .go();
 
     testBuilder()
@@ -268,6 +269,7 @@ public void showInfoSchema() throws Exception {
         .baselineValues("DRILL", "hive.default", "hive_view_m", "TABLE")
         .baselineValues("DRILL", "hive.default", "view_over_hive_view", "VIEW")
         .baselineValues("DRILL", "hive.default", "table_with_empty_parquet", "TABLE")
+        .baselineValues("DRILL", "hive.default", "256_bytes_plus_table", "TABLE")
         .baselineValues("DRILL", "hive.skipper", "kv_text_small", "TABLE")
         .baselineValues("DRILL", "hive.skipper", "kv_text_large", "TABLE")
         .baselineValues("DRILL", "hive.skipper", "kv_incorrect_skip_header", "TABLE")
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
index 376b49d59f6..3be0d0357e8 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
@@ -23,6 +23,7 @@
 import java.sql.Date;
 import java.sql.Timestamp;
 
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.exec.hive.HiveTestUtilities;
@@ -99,6 +100,9 @@ private void generateDataInternal(Driver hiveDriver) throws Exception {
       FileUtils.forceDelete(emptyTableLocation);
     }
 
+    // generate a table with variable-length columns and populate it with data of different sizes
+    generateTableWithVariableLengthColumns(hiveDriver);
+
     // create a Hive table that has columns with data types which are supported for reading in Drill.
     testDataFile = generateAllTypesDataFile();
     executeQuery(hiveDriver,
@@ -609,4 +613,22 @@ private String generateTestDataWithHeadersAndFooters(String tableName, int rowCo
 
     return sb.toString();
   }
+
+  private void generateTableWithVariableLengthColumns(Driver hiveDriver) {
+    executeQuery(hiveDriver, "CREATE TABLE IF NOT EXISTS 256_bytes_plus_table (" +
+        " char_col CHAR(255)," +
+        " varchar_col VARCHAR(1500)," +
+        " binary_col BINARY," +
+        " string_col STRING" +
+        ")");
+
+    String insertQuery = String.format("INSERT INTO 256_bytes_plus_table VALUES\n" +
+        " ('%s', '%s', '%s', '%s')",
+        Strings.repeat("A", 255),
+        Strings.repeat("B", 1200),
+        Strings.repeat("C", 320),
+        Strings.repeat("D", 2200));
+
+    executeQuery(hiveDriver, insertQuery);
+  }
 }
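
Why passing a BufferManager fixes the 256-byte limit: each variable-length
column writer now obtains its own managed buffer from the fragment's
BufferManager instead of every writer sharing the reader's single DrillBuf,
which was pre-allocated to 256 bytes. A writer can then grow its private
buffer without invalidating buffers held by other writers, and the grown
buffer stays tracked by the BufferManager, which DRILL-8495 reports as the
source of the "Tried to remove unmanaged buffer" failure. The following is a
minimal sketch of that per-writer pattern, not the Drill source itself:
HiveStringWriterSketch is a hypothetical stand-in for writers such as
HiveStringWriter, and only the buffer handling is shown.

    import io.netty.buffer.DrillBuf;
    import org.apache.drill.exec.ops.BufferManager;
    import org.apache.drill.exec.vector.complex.writer.VarCharWriter;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
    import org.apache.hadoop.io.Text;

    public class HiveStringWriterSketch {
      private final StringObjectInspector inspector;
      private final VarCharWriter writer;
      private DrillBuf drillBuf; // writer-private managed buffer, not shared

      public HiveStringWriterSketch(StringObjectInspector inspector,
                                    VarCharWriter writer,
                                    BufferManager bufferManager) {
        this.inspector = inspector;
        this.writer = writer;
        // Each writer asks the BufferManager for its own buffer, so growing
        // the buffer for one column cannot affect another column's writer.
        this.drillBuf = bufferManager.getManagedBuffer();
      }

      public void write(Object hiveValue) {
        Text text = inspector.getPrimitiveWritableObject(hiveValue);
        int length = text.getLength();
        // reallocIfNeeded may return a different, larger buffer; keeping the
        // returned reference (instead of the original) is what lets values
        // longer than the initial 256 bytes be copied safely.
        drillBuf = drillBuf.reallocIfNeeded(length);
        drillBuf.setBytes(0, text.getBytes(), 0, length);
        writer.writeVarChar(0, length, drillBuf);
      }
    }

The new testReadingHiveDataBiggerThan256Bytes test exercises exactly this
path with CHAR, VARCHAR, BINARY, and STRING values between 255 and 2200
bytes, sizes chosen to straddle the old 256-byte pre-allocation.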