From e0f4e0d85f280e247b80671dea948eecc25f167b Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Wed, 22 Jan 2025 18:10:48 +0100 Subject: [PATCH 01/18] Lazy initialization added for ParquetTableLocation --- .../locations/impl/AbstractTableLocation.java | 15 +++++++ .../table/location/ParquetTableLocation.java | 44 ++++++++++++++----- 2 files changed, 47 insertions(+), 12 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocation.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocation.java index e1c0fed476b..0f952734731 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocation.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocation.java @@ -73,6 +73,14 @@ protected void destroy() { }; } + /** + * This method is called before every public method so that the child classes can initialize the location lazily, if + * necessary. + */ + protected void initialize() { + // Do nothing + } + @Override public final String toString() { return toStringHelper(); @@ -90,21 +98,25 @@ public LivenessReferent asLivenessReferent() { @Override @NotNull public final Object getStateLock() { + initialize(); return state.getStateLock(); } @Override public final RowSet getRowSet() { + initialize(); return state.getRowSet(); } @Override public final long getSize() { + initialize(); return state.getSize(); } @Override public final long getLastModifiedTimeMillis() { + initialize(); return state.getLastModifiedTimeMillis(); } @@ -137,6 +149,7 @@ protected final void deliverInitialSnapshot(@NotNull final Listener listener) { * @param lastModifiedTimeMillis The new lastModificationTimeMillis */ public final void handleUpdate(final RowSet rowSet, final long lastModifiedTimeMillis) { + initialize(); if (state.setValues(rowSet, lastModifiedTimeMillis) && supportsSubscriptions()) { deliverUpdateNotification(); } @@ -149,6 +162,7 @@ public final void handleUpdate(final RowSet rowSet, final long lastModifiedTimeM * @param source The source to copy state values from */ public void handleUpdate(@NotNull final TableLocationState source) { + initialize(); if (source.copyStateValuesTo(state) && supportsSubscriptions()) { deliverUpdateNotification(); } @@ -165,6 +179,7 @@ private void deliverUpdateNotification() { @Override @NotNull public final ColumnLocation getColumnLocation(@NotNull final CharSequence name) { + initialize(); return columnLocations.putIfAbsent(name, n -> makeColumnLocation(n.toString())); } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java index 7c8f47636e5..3a941853379 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java @@ -12,6 +12,7 @@ import io.deephaven.engine.table.BasicDataIndex; import io.deephaven.engine.table.ColumnSource; import io.deephaven.engine.table.Table; +import io.deephaven.engine.table.TableDefinition; import io.deephaven.engine.table.impl.dataindex.StandaloneDataIndex; import io.deephaven.engine.table.impl.locations.ColumnLocation; import io.deephaven.engine.table.impl.locations.TableDataException; @@ -56,21 +57,23 @@ public class ParquetTableLocation extends AbstractTableLocation { private static final String IMPLEMENTATION_NAME = ParquetColumnLocation.class.getSimpleName(); + private boolean isInitialized; + private final ParquetInstructions readInstructions; - private final ParquetFileReader parquetFileReader; - private final int[] rowGroupIndices; + private ParquetFileReader parquetFileReader; + private int[] rowGroupIndices; - private final RowGroup[] rowGroups; - private final RegionedPageStore.Parameters regionParameters; - private final Map parquetColumnNameToPath; + private RowGroup[] rowGroups; + private RegionedPageStore.Parameters regionParameters; + private Map parquetColumnNameToPath; - private final TableInfo tableInfo; - private final Map groupingColumns; - private final List dataIndexes; - private final Map columnTypes; - private final List sortingColumns; + private TableInfo tableInfo; + private Map groupingColumns; + private List dataIndexes; + private Map columnTypes; + private List sortingColumns; - private final String version; + private String version; private volatile RowGroupReader[] rowGroupReaders; @@ -79,8 +82,16 @@ public ParquetTableLocation(@NotNull final TableKey tableKey, @NotNull final ParquetInstructions readInstructions) { super(tableKey, tableLocationKey, false); this.readInstructions = readInstructions; + isInitialized = false; + } + + protected void initialize() { + if (isInitialized) { + return; + } + isInitialized = true; final ParquetMetadata parquetMetadata; - // noinspection SynchronizationOnLocalVariableOrMethodParameter + final ParquetTableLocationKey tableLocationKey = getParquetKey(); synchronized (tableLocationKey) { // Following methods are internally synchronized, we synchronize them together here to minimize lock/unlock // calls @@ -145,18 +156,22 @@ ParquetInstructions getReadInstructions() { } SeekableChannelsProvider getChannelProvider() { + initialize(); return parquetFileReader.getChannelsProvider(); } RegionedPageStore.Parameters getRegionParameters() { + initialize(); return regionParameters; } public Map getColumnTypes() { + initialize(); return columnTypes; } private RowGroupReader[] getRowGroupReaders() { + initialize(); RowGroupReader[] local; if ((local = rowGroupReaders) != null) { return local; @@ -175,12 +190,14 @@ private RowGroupReader[] getRowGroupReaders() { @Override @NotNull public List getSortedColumns() { + initialize(); return sortingColumns; } @Override @NotNull protected ColumnLocation makeColumnLocation(@NotNull final String columnName) { + initialize(); final String parquetColumnName = readInstructions.getParquetColumnNameFromColumnNameOrDefault(columnName); final String[] columnPath = parquetColumnNameToPath.get(parquetColumnName); final List nameList = @@ -211,6 +228,7 @@ private RowSet computeIndex() { @Override @NotNull public List getDataIndexColumns() { + initialize(); if (dataIndexes.isEmpty() && groupingColumns.isEmpty()) { return List.of(); } @@ -224,6 +242,7 @@ public List getDataIndexColumns() { @Override public boolean hasDataIndex(@NotNull final String... columns) { + initialize(); // Check if the column name matches any of the grouping columns if (columns.length == 1 && groupingColumns.containsKey(columns[0])) { // Validate the index file exists (without loading and parsing it) @@ -252,6 +271,7 @@ private static boolean parquetFileExists(@NotNull final URI fileURI) { @Override @Nullable public BasicDataIndex loadDataIndex(@NotNull final String... columns) { + initialize(); if (tableInfo == null) { return null; } From f9b09163eac5b50285a2cb2f42218e27024be9bd Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Wed, 22 Jan 2025 20:16:41 +0100 Subject: [PATCH 02/18] Added lazy initialization for ParquetColumnLocation --- .../locations/impl/AbstractTableLocation.java | 3 +- .../table/location/ParquetColumnLocation.java | 46 ++++++++++++++++--- .../table/location/ParquetTableLocation.java | 23 ++++------ 3 files changed, 49 insertions(+), 23 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocation.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocation.java index 0f952734731..7117cafc476 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocation.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocation.java @@ -138,6 +138,7 @@ public final ImmutableTableLocationKey getKey() { @Override protected final void deliverInitialSnapshot(@NotNull final Listener listener) { + initialize(); listener.handleUpdate(); } @@ -179,7 +180,6 @@ private void deliverUpdateNotification() { @Override @NotNull public final ColumnLocation getColumnLocation(@NotNull final CharSequence name) { - initialize(); return columnLocations.putIfAbsent(name, n -> makeColumnLocation(n.toString())); } @@ -238,6 +238,7 @@ private BasicDataIndex getDataIndex() { @Override @Nullable public final BasicDataIndex getDataIndex(@NotNull final String... columns) { + initialize(); final List columnNames = new ArrayList<>(columns.length); Collections.addAll(columnNames, columns); columnNames.sort(String::compareTo); diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java index 4ddebb33685..8a969adf360 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java @@ -39,6 +39,8 @@ import java.math.BigInteger; import java.time.Instant; import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.Optional; import java.util.function.Function; import java.util.function.LongFunction; @@ -58,7 +60,12 @@ final class ParquetColumnLocation extends AbstractColumnLoc private static final int MAX_PAGE_CACHE_SIZE = Configuration.getInstance() .getIntegerForClassWithDefault(ParquetColumnLocation.class, "maxPageCacheSize", 8192); + private final ParquetTableLocation parquetTableLocation; + private final String columnName; private final String parquetColumnName; + + private boolean isInitialized; + /** * Factory object needed for deferred initialization of the remaining fields. Reference serves as a barrier to * ensure visibility of the derived fields. @@ -77,16 +84,31 @@ final class ParquetColumnLocation extends AbstractColumnLoc * * @param tableLocation The table location enclosing this column location * @param parquetColumnName The Parquet file column name - * @param columnChunkReaders The {@link ColumnChunkReader column chunk readers} for this location */ ParquetColumnLocation( @NotNull final ParquetTableLocation tableLocation, @NotNull final String columnName, - @NotNull final String parquetColumnName, - @Nullable final ColumnChunkReader[] columnChunkReaders) { + @NotNull final String parquetColumnName) { super(tableLocation, columnName); + this.parquetTableLocation = tableLocation; + this.columnName = columnName; this.parquetColumnName = parquetColumnName; - this.columnChunkReaders = columnChunkReaders; + this.isInitialized = false; + } + + private void initialize() { + if (isInitialized) { + return; + } + isInitialized = true; + tl().initialize(); + final String[] columnPath = tl().getParquetColumnNameToPath().get(parquetColumnName); + final List nameList = + columnPath == null ? Collections.singletonList(parquetColumnName) : Arrays.asList(columnPath); + final ColumnChunkReader[] columnChunkReaders = Arrays.stream(tl().getRowGroupReaders()) + .map(rgr -> rgr.getColumnChunk(columnName, nameList)).toArray(ColumnChunkReader[]::new); + final boolean exists = Arrays.stream(columnChunkReaders).anyMatch(ccr -> ccr != null && ccr.numRows() > 0); + this.columnChunkReaders = exists ? columnChunkReaders : null; } private PageCache ensurePageCache() { @@ -114,14 +136,14 @@ public String getImplementationName() { @Override public boolean exists() { + initialize(); // If we see a null columnChunkReaders array, either we don't exist or we are guaranteed to - // see a non-null - // pageStores array + // see a non-null pageStores array return columnChunkReaders != null || pageStores != null; } private ParquetTableLocation tl() { - return (ParquetTableLocation) getTableLocation(); + return parquetTableLocation; } private REGION_TYPE makeColumnRegion( @@ -147,6 +169,7 @@ private REGION_TYPE makeSingleColumnRegion(final SOURCE so @Override public ColumnRegionChar makeColumnRegionChar( @NotNull final ColumnDefinition columnDefinition) { + initialize(); // noinspection unchecked return (ColumnRegionChar) makeColumnRegion(this::getPageStores, columnDefinition, ColumnRegionChar::createNull, ParquetColumnRegionChar::new, @@ -157,6 +180,7 @@ public ColumnRegionChar makeColumnRegionChar( @Override public ColumnRegionByte makeColumnRegionByte( @NotNull final ColumnDefinition columnDefinition) { + initialize(); // noinspection unchecked return (ColumnRegionByte) makeColumnRegion(this::getPageStores, columnDefinition, ColumnRegionByte::createNull, ParquetColumnRegionByte::new, @@ -167,6 +191,7 @@ public ColumnRegionByte makeColumnRegionByte( @Override public ColumnRegionShort makeColumnRegionShort( @NotNull final ColumnDefinition columnDefinition) { + initialize(); // noinspection unchecked return (ColumnRegionShort) makeColumnRegion(this::getPageStores, columnDefinition, ColumnRegionShort::createNull, ParquetColumnRegionShort::new, @@ -177,6 +202,7 @@ public ColumnRegionShort makeColumnRegionShort( @Override public ColumnRegionInt makeColumnRegionInt( @NotNull final ColumnDefinition columnDefinition) { + initialize(); // noinspection unchecked return (ColumnRegionInt) makeColumnRegion(this::getPageStores, columnDefinition, ColumnRegionInt::createNull, ParquetColumnRegionInt::new, @@ -187,6 +213,7 @@ public ColumnRegionInt makeColumnRegionInt( @Override public ColumnRegionLong makeColumnRegionLong( @NotNull final ColumnDefinition columnDefinition) { + initialize(); // noinspection unchecked return (ColumnRegionLong) makeColumnRegion(this::getPageStores, columnDefinition, ColumnRegionLong::createNull, ParquetColumnRegionLong::new, @@ -197,6 +224,7 @@ public ColumnRegionLong makeColumnRegionLong( @Override public ColumnRegionFloat makeColumnRegionFloat( @NotNull final ColumnDefinition columnDefinition) { + initialize(); // noinspection unchecked return (ColumnRegionFloat) makeColumnRegion(this::getPageStores, columnDefinition, ColumnRegionFloat::createNull, ParquetColumnRegionFloat::new, @@ -207,6 +235,7 @@ public ColumnRegionFloat makeColumnRegionFloat( @Override public ColumnRegionDouble makeColumnRegionDouble( @NotNull final ColumnDefinition columnDefinition) { + initialize(); // noinspection unchecked return (ColumnRegionDouble) makeColumnRegion(this::getPageStores, columnDefinition, ColumnRegionDouble::createNull, ParquetColumnRegionDouble::new, @@ -217,6 +246,7 @@ public ColumnRegionDouble makeColumnRegionDouble( @Override public ColumnRegionObject makeColumnRegionObject( @NotNull final ColumnDefinition columnDefinition) { + initialize(); final Class dataType = columnDefinition.getDataType(); final ColumnChunkPageStore[] sources = getPageStores(columnDefinition); final ColumnChunkPageStore[] dictKeySources = @@ -260,6 +290,7 @@ private ColumnRegionObject makeSingleColumnRegionObject( @NotNull public ColumnChunkPageStore[] getPageStores( @NotNull final ColumnDefinition columnDefinition) { + initialize(); fetchValues(columnDefinition); return pageStores; } @@ -272,6 +303,7 @@ public ColumnChunkPageStore[] getPageStores( */ public Supplier>[] getDictionaryChunkSuppliers( @NotNull final ColumnDefinition columnDefinition) { + initialize(); fetchValues(columnDefinition); return dictionaryChunkSuppliers; } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java index 3a941853379..07c9f2cc94b 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java @@ -155,11 +155,6 @@ ParquetInstructions getReadInstructions() { return readInstructions; } - SeekableChannelsProvider getChannelProvider() { - initialize(); - return parquetFileReader.getChannelsProvider(); - } - RegionedPageStore.Parameters getRegionParameters() { initialize(); return regionParameters; @@ -170,7 +165,7 @@ public Map getColumnTypes() { return columnTypes; } - private RowGroupReader[] getRowGroupReaders() { + RowGroupReader[] getRowGroupReaders() { initialize(); RowGroupReader[] local; if ((local = rowGroupReaders) != null) { @@ -194,19 +189,17 @@ public List getSortedColumns() { return sortingColumns; } + @NotNull + Map getParquetColumnNameToPath() { + initialize(); + return parquetColumnNameToPath; + } + @Override @NotNull protected ColumnLocation makeColumnLocation(@NotNull final String columnName) { - initialize(); final String parquetColumnName = readInstructions.getParquetColumnNameFromColumnNameOrDefault(columnName); - final String[] columnPath = parquetColumnNameToPath.get(parquetColumnName); - final List nameList = - columnPath == null ? Collections.singletonList(parquetColumnName) : Arrays.asList(columnPath); - final ColumnChunkReader[] columnChunkReaders = Arrays.stream(getRowGroupReaders()) - .map(rgr -> rgr.getColumnChunk(columnName, nameList)).toArray(ColumnChunkReader[]::new); - final boolean exists = Arrays.stream(columnChunkReaders).anyMatch(ccr -> ccr != null && ccr.numRows() > 0); - return new ParquetColumnLocation<>(this, columnName, parquetColumnName, - exists ? columnChunkReaders : null); + return new ParquetColumnLocation<>(this, columnName, parquetColumnName); } private RowSet computeIndex() { From 24a544af7dd7a7613268630b7d19ec417797c9d4 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Wed, 22 Jan 2025 21:51:37 +0100 Subject: [PATCH 03/18] Synchronized the initialization logic --- .../table/location/ParquetColumnLocation.java | 25 +++-- .../table/location/ParquetTableLocation.java | 105 ++++++++++-------- 2 files changed, 71 insertions(+), 59 deletions(-) diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java index 8a969adf360..0e8c08a28ed 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java @@ -64,7 +64,7 @@ final class ParquetColumnLocation extends AbstractColumnLoc private final String columnName; private final String parquetColumnName; - private boolean isInitialized; + private volatile boolean isInitialized; /** * Factory object needed for deferred initialization of the remaining fields. Reference serves as a barrier to @@ -100,15 +100,20 @@ private void initialize() { if (isInitialized) { return; } - isInitialized = true; - tl().initialize(); - final String[] columnPath = tl().getParquetColumnNameToPath().get(parquetColumnName); - final List nameList = - columnPath == null ? Collections.singletonList(parquetColumnName) : Arrays.asList(columnPath); - final ColumnChunkReader[] columnChunkReaders = Arrays.stream(tl().getRowGroupReaders()) - .map(rgr -> rgr.getColumnChunk(columnName, nameList)).toArray(ColumnChunkReader[]::new); - final boolean exists = Arrays.stream(columnChunkReaders).anyMatch(ccr -> ccr != null && ccr.numRows() > 0); - this.columnChunkReaders = exists ? columnChunkReaders : null; + synchronized (this) { + if (isInitialized) { + return; + } + tl().initialize(); + final String[] columnPath = tl().getParquetColumnNameToPath().get(parquetColumnName); + final List nameList = + columnPath == null ? Collections.singletonList(parquetColumnName) : Arrays.asList(columnPath); + final ColumnChunkReader[] columnChunkReaders = Arrays.stream(tl().getRowGroupReaders()) + .map(rgr -> rgr.getColumnChunk(columnName, nameList)).toArray(ColumnChunkReader[]::new); + final boolean exists = Arrays.stream(columnChunkReaders).anyMatch(ccr -> ccr != null && ccr.numRows() > 0); + this.columnChunkReaders = exists ? columnChunkReaders : null; + isInitialized = true; + } } private PageCache ensurePageCache() { diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java index 07c9f2cc94b..c00d7651804 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java @@ -12,7 +12,6 @@ import io.deephaven.engine.table.BasicDataIndex; import io.deephaven.engine.table.ColumnSource; import io.deephaven.engine.table.Table; -import io.deephaven.engine.table.TableDefinition; import io.deephaven.engine.table.impl.dataindex.StandaloneDataIndex; import io.deephaven.engine.table.impl.locations.ColumnLocation; import io.deephaven.engine.table.impl.locations.TableDataException; @@ -23,7 +22,6 @@ import io.deephaven.engine.table.impl.select.SourceColumn; import io.deephaven.engine.table.impl.sources.regioned.RegionedColumnSource; import io.deephaven.engine.table.impl.sources.regioned.RegionedPageStore; -import io.deephaven.parquet.base.ColumnChunkReader; import io.deephaven.parquet.base.ParquetFileReader; import io.deephaven.parquet.base.RowGroupReader; import io.deephaven.parquet.table.ParquetInstructions; @@ -34,7 +32,6 @@ import io.deephaven.parquet.table.metadata.GroupingColumnInfo; import io.deephaven.parquet.table.metadata.SortColumnInfo; import io.deephaven.parquet.table.metadata.TableInfo; -import io.deephaven.util.channel.SeekableChannelsProvider; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.format.RowGroup; import org.apache.parquet.hadoop.metadata.ParquetMetadata; @@ -57,7 +54,8 @@ public class ParquetTableLocation extends AbstractTableLocation { private static final String IMPLEMENTATION_NAME = ParquetColumnLocation.class.getSimpleName(); - private boolean isInitialized; + // TODO Maybe I should add a local non-volatile + private volatile boolean isInitialized; private final ParquetInstructions readInstructions; private ParquetFileReader parquetFileReader; @@ -82,60 +80,69 @@ public ParquetTableLocation(@NotNull final TableKey tableKey, @NotNull final ParquetInstructions readInstructions) { super(tableKey, tableLocationKey, false); this.readInstructions = readInstructions; - isInitialized = false; + this.isInitialized = false; } protected void initialize() { if (isInitialized) { return; } - isInitialized = true; - final ParquetMetadata parquetMetadata; - final ParquetTableLocationKey tableLocationKey = getParquetKey(); - synchronized (tableLocationKey) { - // Following methods are internally synchronized, we synchronize them together here to minimize lock/unlock - // calls - parquetFileReader = tableLocationKey.getFileReader(); - parquetMetadata = tableLocationKey.getMetadata(); - rowGroupIndices = tableLocationKey.getRowGroupIndices(); - } + synchronized (this) { + if (isInitialized) { + return; + } + final ParquetMetadata parquetMetadata; + final ParquetTableLocationKey tableLocationKey = getParquetKey(); + synchronized (tableLocationKey) { + // Following methods are internally synchronized, we synchronize them together here to minimize + // lock/unlock calls + parquetFileReader = tableLocationKey.getFileReader(); + parquetMetadata = tableLocationKey.getMetadata(); + rowGroupIndices = tableLocationKey.getRowGroupIndices(); + } - final int rowGroupCount = rowGroupIndices.length; - rowGroups = IntStream.of(rowGroupIndices) - .mapToObj(rgi -> parquetFileReader.fileMetaData.getRow_groups().get(rgi)) - .sorted(Comparator.comparingInt(RowGroup::getOrdinal)) - .toArray(RowGroup[]::new); - final long maxRowCount = Arrays.stream(rowGroups).mapToLong(RowGroup::getNum_rows).max().orElse(0L); - regionParameters = new RegionedPageStore.Parameters( - RegionedColumnSource.ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK, rowGroupCount, maxRowCount); - - parquetColumnNameToPath = new HashMap<>(); - for (final ColumnDescriptor column : parquetFileReader.getSchema().getColumns()) { - final String[] path = column.getPath(); - if (path.length > 1) { - parquetColumnNameToPath.put(path[0], path); + final int rowGroupCount = rowGroupIndices.length; + rowGroups = IntStream.of(rowGroupIndices) + .mapToObj(rgi -> parquetFileReader.fileMetaData.getRow_groups().get(rgi)) + .sorted(Comparator.comparingInt(RowGroup::getOrdinal)) + .toArray(RowGroup[]::new); + final long maxRowCount = Arrays.stream(rowGroups).mapToLong(RowGroup::getNum_rows).max().orElse(0L); + regionParameters = new RegionedPageStore.Parameters( + RegionedColumnSource.ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK, rowGroupCount, maxRowCount); + + parquetColumnNameToPath = new HashMap<>(); + for (final ColumnDescriptor column : parquetFileReader.getSchema().getColumns()) { + final String[] path = column.getPath(); + if (path.length > 1) { + parquetColumnNameToPath.put(path[0], path); + } } - } - // TODO (https://github.com/deephaven/deephaven-core/issues/958): - // When/if we support _metadata files for Deephaven-written Parquet tables, we may need to revise this - // in order to read *this* file's metadata, rather than inheriting file metadata from the _metadata file. - // Obvious issues included data index table paths, codecs, etc. - // Presumably, we could store per-file instances of the metadata in the _metadata file's map. - tableInfo = ParquetSchemaReader - .parseMetadata(parquetMetadata.getFileMetaData().getKeyValueMetaData()) - .orElse(TableInfo.builder().build()); - version = tableInfo.version(); - groupingColumns = tableInfo.groupingColumnMap(); - dataIndexes = tableInfo.dataIndexes(); - columnTypes = tableInfo.columnTypeMap(); - sortingColumns = SortColumnInfo.sortColumns(tableInfo.sortingColumns()); - - if (!FILE_URI_SCHEME.equals(tableLocationKey.getURI().getScheme())) { - // We do not have the last modified time for non-file URIs - handleUpdate(computeIndex(), TableLocationState.NULL_TIME); - } else { - handleUpdate(computeIndex(), new File(tableLocationKey.getURI()).lastModified()); + // TODO (https://github.com/deephaven/deephaven-core/issues/958): + // When/if we support _metadata files for Deephaven-written Parquet tables, we may need to revise this + // in order to read *this* file's metadata, rather than inheriting file metadata from the _metadata file. + // Obvious issues included data index table paths, codecs, etc. + // Presumably, we could store per-file instances of the metadata in the _metadata file's map. + tableInfo = ParquetSchemaReader + .parseMetadata(parquetMetadata.getFileMetaData().getKeyValueMetaData()) + .orElse(TableInfo.builder().build()); + version = tableInfo.version(); + groupingColumns = tableInfo.groupingColumnMap(); + dataIndexes = tableInfo.dataIndexes(); + columnTypes = tableInfo.columnTypeMap(); + sortingColumns = SortColumnInfo.sortColumns(tableInfo.sortingColumns()); + + isInitialized = true; + + // The following calls might internally call initialize() again, but that's fine because we're already + // initialized at this point. + + if (!FILE_URI_SCHEME.equals(tableLocationKey.getURI().getScheme())) { + // We do not have the last modified time for non-file URIs + handleUpdate(computeIndex(), TableLocationState.NULL_TIME); + } else { + handleUpdate(computeIndex(), new File(tableLocationKey.getURI()).lastModified()); + } } } From dcb84eade08e94ba839b05d776ef9bea4c94a81a Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Wed, 22 Jan 2025 22:48:55 +0100 Subject: [PATCH 04/18] Updated some comments --- .../deephaven/engine/table/impl/locations/TableLocation.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/TableLocation.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/TableLocation.java index 63ab9e03d1b..6b9d06c7243 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/TableLocation.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/TableLocation.java @@ -127,7 +127,8 @@ interface Listener extends BasicTableDataListener { /** * @param name The column name - * @return The ColumnLocation for the defined column under this table location + * @return The ColumnLocation for the defined column under this table location. The exact same ColumnLocation object + * should be returned for the same column name. */ @NotNull ColumnLocation getColumnLocation(@NotNull CharSequence name); From 6b4f80328d18ed0bf1bc2e5b1bcc65d567c5d9a4 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Thu, 23 Jan 2025 22:31:08 +0100 Subject: [PATCH 05/18] Added unit tests --- .../table/location/ParquetColumnLocation.java | 6 +- .../table/location/ParquetTableLocation.java | 3 - .../table/ParquetTableReadWriteTest.java | 124 ++++++++++++++++++ 3 files changed, 126 insertions(+), 7 deletions(-) diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java index 0e8c08a28ed..c1167b580ed 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java @@ -293,9 +293,8 @@ private ColumnRegionObject makeSingleColumnRegionObject( * @return The page stores */ @NotNull - public ColumnChunkPageStore[] getPageStores( + private ColumnChunkPageStore[] getPageStores( @NotNull final ColumnDefinition columnDefinition) { - initialize(); fetchValues(columnDefinition); return pageStores; } @@ -306,9 +305,8 @@ public ColumnChunkPageStore[] getPageStores( * @param columnDefinition The {@link ColumnDefinition} used to lookup type information * @return The dictionary values chunk suppliers, or null if none exist */ - public Supplier>[] getDictionaryChunkSuppliers( + private Supplier>[] getDictionaryChunkSuppliers( @NotNull final ColumnDefinition columnDefinition) { - initialize(); fetchValues(columnDefinition); return dictionaryChunkSuppliers; } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java index c00d7651804..d21586bca22 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java @@ -163,7 +163,6 @@ ParquetInstructions getReadInstructions() { } RegionedPageStore.Parameters getRegionParameters() { - initialize(); return regionParameters; } @@ -173,7 +172,6 @@ public Map getColumnTypes() { } RowGroupReader[] getRowGroupReaders() { - initialize(); RowGroupReader[] local; if ((local = rowGroupReaders) != null) { return local; @@ -198,7 +196,6 @@ public List getSortedColumns() { @NotNull Map getParquetColumnNameToPath() { - initialize(); return parquetColumnNameToPath; } diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index 530be1c5d6b..b230f800767 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -23,6 +23,7 @@ import io.deephaven.engine.primitive.iterator.CloseablePrimitiveIteratorOfInt; import io.deephaven.engine.primitive.iterator.CloseablePrimitiveIteratorOfLong; import io.deephaven.engine.primitive.iterator.CloseablePrimitiveIteratorOfShort; +import io.deephaven.engine.rowset.impl.TrackingWritableRowSetImpl; import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.engine.table.ColumnSource; import io.deephaven.engine.table.PartitionedTable; @@ -33,6 +34,7 @@ import io.deephaven.engine.table.impl.QueryTable; import io.deephaven.engine.table.impl.dataindex.DataIndexUtils; import io.deephaven.engine.table.impl.indexer.DataIndexer; +import io.deephaven.engine.table.impl.locations.ColumnLocation; import io.deephaven.engine.table.impl.locations.impl.StandaloneTableKey; import io.deephaven.engine.table.impl.select.FormulaEvaluationException; import io.deephaven.engine.table.impl.select.FunctionalColumn; @@ -86,6 +88,7 @@ import java.io.File; import java.io.IOException; import java.io.Serializable; +import java.io.UncheckedIOException; import java.math.BigDecimal; import java.math.BigInteger; import java.net.URI; @@ -104,6 +107,7 @@ import java.util.Map; import java.util.Set; import java.util.Objects; +import java.util.function.Consumer; import java.util.function.DoubleConsumer; import java.util.function.Function; import java.util.function.IntConsumer; @@ -132,6 +136,7 @@ import static io.deephaven.parquet.table.ParquetTools.writeTables; import static io.deephaven.util.QueryConstants.*; import static org.junit.Assert.*; +import static org.junit.Assert.assertNotNull; @Category(OutOfBandTest.class) public final class ParquetTableReadWriteTest { @@ -3651,6 +3656,125 @@ public void overflowingCodecsTest() { assertEquals(columnMetadata.getEncodingStats().getNumDataPagesEncodedAs(Encoding.PLAIN), 2); } + private static void verifyMakeHandleException(final Runnable throwingRunnable) { + try { + throwingRunnable.run(); + fail("Expected UncheckedIOException"); + } catch (final UncheckedIOException e) { + assertTrue(e.getMessage().contains("makeHandle encountered exception")); + } + } + + private static void makeNewTableLocationAndVerifyNoMakeHandleException( + final Consumer parquetTableLocationConsumer) { + final File dest = new File(rootFile, "real.parquet"); + final Table table = TableTools.emptyTable(5).update("A=(int)i", "B=(long)i", "C=(double)i"); + writeTable(table, dest.getPath()); + + final ParquetTableLocationKey tableLocationKey = + new ParquetTableLocationKey(dest.toURI(), 0, null, ParquetInstructions.EMPTY); + final ParquetTableLocation tableLocation = + new ParquetTableLocation(StandaloneTableKey.getInstance(), tableLocationKey, EMPTY); + try { + parquetTableLocationConsumer.accept(tableLocation); + } catch (final Exception e) { + if (e instanceof UncheckedIOException && e.getMessage().contains("makeHandle encountered exception")) { + fail("Unexpected exception: " + e); + } + } + dest.delete(); + } + + @Test + public void testTableLocationReading() { + final File nonExistentParquetFile = new File(rootFile, "non-existent.parquet"); + + final ParquetTableLocationKey nonExistentTableLocationKey = + new ParquetTableLocationKey(nonExistentParquetFile.toURI(), 0, null, ParquetInstructions.EMPTY); + final ParquetTableLocation nonExistentTableLocation = + new ParquetTableLocation(StandaloneTableKey.getInstance(), nonExistentTableLocationKey, EMPTY); + + // Make sure the file does not exist + assertFalse(nonExistentParquetFile.exists()); + + // Verify that we can perform all these operations without actually touching the file + assertEquals(nonExistentTableLocation.getTableKey(), StandaloneTableKey.getInstance()); + assertEquals(nonExistentTableLocation.getKey(), nonExistentTableLocationKey); + assertNotNull(nonExistentTableLocation.toString()); + assertNotNull(nonExistentTableLocation.asLivenessReferent()); + nonExistentTableLocation.refresh(); + + // Verify that we can get a column location for a non-existent column + final ColumnLocation nonExistentColumnLocation = nonExistentTableLocation.getColumnLocation("A"); + assertNotNull(nonExistentColumnLocation); + assertEquals("A", nonExistentColumnLocation.getName()); + assertEquals(nonExistentTableLocation, nonExistentColumnLocation.getTableLocation()); + assertNotNull(nonExistentColumnLocation.toString()); + assertNotNull(nonExistentColumnLocation.getImplementationName()); + + // Verify that all the following operations will fail when the file does not exist and pass when it does + + // APIs from TableLocation + verifyMakeHandleException(nonExistentTableLocation::getDataIndexColumns); + makeNewTableLocationAndVerifyNoMakeHandleException(ParquetTableLocation::getDataIndexColumns); + + verifyMakeHandleException(nonExistentTableLocation::getSortedColumns); + makeNewTableLocationAndVerifyNoMakeHandleException(ParquetTableLocation::getSortedColumns); + + verifyMakeHandleException(nonExistentTableLocation::getColumnTypes); + makeNewTableLocationAndVerifyNoMakeHandleException(ParquetTableLocation::getColumnTypes); + + verifyMakeHandleException(nonExistentTableLocation::getDataIndex); + makeNewTableLocationAndVerifyNoMakeHandleException(ParquetTableLocation::getDataIndex); + + verifyMakeHandleException(nonExistentTableLocation::loadDataIndex); + makeNewTableLocationAndVerifyNoMakeHandleException(ParquetTableLocation::loadDataIndex); + + verifyMakeHandleException(nonExistentTableLocation::hasDataIndex); + makeNewTableLocationAndVerifyNoMakeHandleException(ParquetTableLocation::hasDataIndex); + + // APIs from TableLocationState + verifyMakeHandleException(nonExistentTableLocation::getStateLock); + makeNewTableLocationAndVerifyNoMakeHandleException(ParquetTableLocation::getStateLock); + + verifyMakeHandleException(nonExistentTableLocation::getRowSet); + makeNewTableLocationAndVerifyNoMakeHandleException(ParquetTableLocation::getRowSet); + + verifyMakeHandleException(nonExistentTableLocation::getSize); + makeNewTableLocationAndVerifyNoMakeHandleException(ParquetTableLocation::getSize); + + verifyMakeHandleException(nonExistentTableLocation::getLastModifiedTimeMillis); + makeNewTableLocationAndVerifyNoMakeHandleException(ParquetTableLocation::getLastModifiedTimeMillis); + + // APIs from AbstractTableLocation + verifyMakeHandleException(() -> nonExistentTableLocation.handleUpdate(nonExistentTableLocation)); + makeNewTableLocationAndVerifyNoMakeHandleException( + (parquetTableLocation) -> parquetTableLocation.handleUpdate(parquetTableLocation)); + + verifyMakeHandleException(() -> nonExistentTableLocation.handleUpdate(new TrackingWritableRowSetImpl(), 0)); + makeNewTableLocationAndVerifyNoMakeHandleException( + (parquetTableLocation) -> parquetTableLocation.handleUpdate(new TrackingWritableRowSetImpl(), 0)); + + // APIs from ColumnLocation + verifyMakeHandleException(nonExistentColumnLocation::exists); + verifyMakeHandleException(() -> nonExistentColumnLocation.makeColumnRegionChar( + ColumnDefinition.fromGenericType("A", char.class, Character.class))); + verifyMakeHandleException(() -> nonExistentColumnLocation.makeColumnRegionByte( + ColumnDefinition.fromGenericType("A", byte.class, Byte.class))); + verifyMakeHandleException(() -> nonExistentColumnLocation.makeColumnRegionShort( + ColumnDefinition.fromGenericType("A", short.class, Short.class))); + verifyMakeHandleException(() -> nonExistentColumnLocation.makeColumnRegionInt( + ColumnDefinition.fromGenericType("A", int.class, Integer.class))); + verifyMakeHandleException(() -> nonExistentColumnLocation.makeColumnRegionLong( + ColumnDefinition.fromGenericType("A", long.class, Long.class))); + verifyMakeHandleException(() -> nonExistentColumnLocation.makeColumnRegionFloat( + ColumnDefinition.fromGenericType("A", float.class, Float.class))); + verifyMakeHandleException(() -> nonExistentColumnLocation.makeColumnRegionDouble( + ColumnDefinition.fromGenericType("A", double.class, Double.class))); + verifyMakeHandleException(() -> nonExistentColumnLocation.makeColumnRegionObject( + ColumnDefinition.fromGenericType("A", String.class, String.class))); + } + @Test public void readWriteStatisticsTest() { // Test simple structured table. From 047896ec05d192bac00ae10ca4bc6cbf562238e7 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Fri, 24 Jan 2025 18:45:17 +0100 Subject: [PATCH 06/18] Added a local variable along with the volatile --- .../parquet/table/location/ParquetColumnLocation.java | 6 +++++- .../parquet/table/location/ParquetTableLocation.java | 7 +++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java index c1167b580ed..2178ced51db 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java @@ -64,7 +64,8 @@ final class ParquetColumnLocation extends AbstractColumnLoc private final String columnName; private final String parquetColumnName; - private volatile boolean isInitialized; + private boolean isInitialized; + private volatile boolean isInitializedVolatile; /** * Factory object needed for deferred initialization of the remaining fields. Reference serves as a barrier to @@ -94,6 +95,7 @@ final class ParquetColumnLocation extends AbstractColumnLoc this.columnName = columnName; this.parquetColumnName = parquetColumnName; this.isInitialized = false; + this.isInitializedVolatile = false; } private void initialize() { @@ -101,6 +103,7 @@ private void initialize() { return; } synchronized (this) { + isInitialized = isInitializedVolatile; if (isInitialized) { return; } @@ -113,6 +116,7 @@ private void initialize() { final boolean exists = Arrays.stream(columnChunkReaders).anyMatch(ccr -> ccr != null && ccr.numRows() > 0); this.columnChunkReaders = exists ? columnChunkReaders : null; isInitialized = true; + isInitializedVolatile = true; } } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java index d21586bca22..4cc1ec5ddab 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java @@ -54,8 +54,8 @@ public class ParquetTableLocation extends AbstractTableLocation { private static final String IMPLEMENTATION_NAME = ParquetColumnLocation.class.getSimpleName(); - // TODO Maybe I should add a local non-volatile - private volatile boolean isInitialized; + private boolean isInitialized; + private volatile boolean isInitializedVolatile; private final ParquetInstructions readInstructions; private ParquetFileReader parquetFileReader; @@ -81,6 +81,7 @@ public ParquetTableLocation(@NotNull final TableKey tableKey, super(tableKey, tableLocationKey, false); this.readInstructions = readInstructions; this.isInitialized = false; + this.isInitializedVolatile = false; } protected void initialize() { @@ -88,6 +89,7 @@ protected void initialize() { return; } synchronized (this) { + isInitialized = isInitializedVolatile; if (isInitialized) { return; } @@ -133,6 +135,7 @@ protected void initialize() { sortingColumns = SortColumnInfo.sortColumns(tableInfo.sortingColumns()); isInitialized = true; + isInitializedVolatile = true; // The following calls might internally call initialize() again, but that's fine because we're already // initialized at this point. From cc7ca0941703476ad062d74e474cc92ebff5579c Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Mon, 27 Jan 2025 18:52:59 +0100 Subject: [PATCH 07/18] Review with Devin --- .../locations/impl/AbstractTableLocation.java | 22 ++-------- .../table/location/ParquetTableLocation.java | 21 +++++++++- .../table/ParquetTableReadWriteTest.java | 42 +++++++++---------- 3 files changed, 44 insertions(+), 41 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocation.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocation.java index 7117cafc476..95a6866b752 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocation.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocation.java @@ -73,14 +73,6 @@ protected void destroy() { }; } - /** - * This method is called before every public method so that the child classes can initialize the location lazily, if - * necessary. - */ - protected void initialize() { - // Do nothing - } - @Override public final String toString() { return toStringHelper(); @@ -98,25 +90,21 @@ public LivenessReferent asLivenessReferent() { @Override @NotNull public final Object getStateLock() { - initialize(); return state.getStateLock(); } @Override - public final RowSet getRowSet() { - initialize(); + public RowSet getRowSet() { return state.getRowSet(); } @Override - public final long getSize() { - initialize(); + public long getSize() { return state.getSize(); } @Override public final long getLastModifiedTimeMillis() { - initialize(); return state.getLastModifiedTimeMillis(); } @@ -138,7 +126,6 @@ public final ImmutableTableLocationKey getKey() { @Override protected final void deliverInitialSnapshot(@NotNull final Listener listener) { - initialize(); listener.handleUpdate(); } @@ -150,7 +137,6 @@ protected final void deliverInitialSnapshot(@NotNull final Listener listener) { * @param lastModifiedTimeMillis The new lastModificationTimeMillis */ public final void handleUpdate(final RowSet rowSet, final long lastModifiedTimeMillis) { - initialize(); if (state.setValues(rowSet, lastModifiedTimeMillis) && supportsSubscriptions()) { deliverUpdateNotification(); } @@ -163,7 +149,6 @@ public final void handleUpdate(final RowSet rowSet, final long lastModifiedTimeM * @param source The source to copy state values from */ public void handleUpdate(@NotNull final TableLocationState source) { - initialize(); if (source.copyStateValuesTo(state) && supportsSubscriptions()) { deliverUpdateNotification(); } @@ -237,8 +222,7 @@ private BasicDataIndex getDataIndex() { @Override @Nullable - public final BasicDataIndex getDataIndex(@NotNull final String... columns) { - initialize(); + public BasicDataIndex getDataIndex(@NotNull final String... columns) { final List columnNames = new ArrayList<>(columns.length); Collections.addAll(columnNames, columns); columnNames.sort(String::compareTo); diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java index 4cc1ec5ddab..c4f47f8a73a 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java @@ -84,7 +84,7 @@ public ParquetTableLocation(@NotNull final TableKey tableKey, this.isInitializedVolatile = false; } - protected void initialize() { + final void initialize() { if (isInitialized) { return; } @@ -202,6 +202,18 @@ Map getParquetColumnNameToPath() { return parquetColumnNameToPath; } + @Override + public final RowSet getRowSet() { + initialize(); + return super.getRowSet(); + } + + @Override + public final long getSize() { + initialize(); + return super.getSize(); + } + @Override @NotNull protected ColumnLocation makeColumnLocation(@NotNull final String columnName) { @@ -268,6 +280,13 @@ private static boolean parquetFileExists(@NotNull final URI fileURI) { return !fileURI.getScheme().equals(FILE_URI_SCHEME) || Files.exists(Path.of(fileURI)); } + @Override + @Nullable + public final BasicDataIndex getDataIndex(@NotNull final String... columns) { + initialize(); + return super.getDataIndex(columns); + } + @Override @Nullable public BasicDataIndex loadDataIndex(@NotNull final String... columns) { diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index b230f800767..b368532719d 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -3687,22 +3687,38 @@ private static void makeNewTableLocationAndVerifyNoMakeHandleException( @Test public void testTableLocationReading() { + // Make a new ParquetTableLocation for a non-existent parquet file final File nonExistentParquetFile = new File(rootFile, "non-existent.parquet"); - + assertFalse(nonExistentParquetFile.exists()); final ParquetTableLocationKey nonExistentTableLocationKey = new ParquetTableLocationKey(nonExistentParquetFile.toURI(), 0, null, ParquetInstructions.EMPTY); final ParquetTableLocation nonExistentTableLocation = new ParquetTableLocation(StandaloneTableKey.getInstance(), nonExistentTableLocationKey, EMPTY); - // Make sure the file does not exist - assertFalse(nonExistentParquetFile.exists()); - - // Verify that we can perform all these operations without actually touching the file + // Ensure operations don't touch the file or throw exceptions assertEquals(nonExistentTableLocation.getTableKey(), StandaloneTableKey.getInstance()); assertEquals(nonExistentTableLocation.getKey(), nonExistentTableLocationKey); assertNotNull(nonExistentTableLocation.toString()); assertNotNull(nonExistentTableLocation.asLivenessReferent()); + assertNotNull(nonExistentTableLocation.getStateLock()); + nonExistentTableLocation.getLastModifiedTimeMillis(); nonExistentTableLocation.refresh(); + nonExistentTableLocation.handleUpdate(new TrackingWritableRowSetImpl(), 0); + { + // To test the following make a new temporary table location + final File tempDest = new File(rootFile, "testLocation.parquet"); + final Table table = TableTools.emptyTable(5).update("A=(int)i", "B=(long)i", "C=(double)i"); + writeTable(table, tempDest.getPath()); + + final ParquetTableLocationKey tableLocationKey = + new ParquetTableLocationKey(tempDest.toURI(), 0, null, ParquetInstructions.EMPTY); + final ParquetTableLocation tableLocation = + new ParquetTableLocation(StandaloneTableKey.getInstance(), tableLocationKey, EMPTY); + + // Test this operation works without throwing an exception + nonExistentTableLocation.handleUpdate(tableLocation); + tempDest.delete(); + } // Verify that we can get a column location for a non-existent column final ColumnLocation nonExistentColumnLocation = nonExistentTableLocation.getColumnLocation("A"); @@ -3713,7 +3729,6 @@ public void testTableLocationReading() { assertNotNull(nonExistentColumnLocation.getImplementationName()); // Verify that all the following operations will fail when the file does not exist and pass when it does - // APIs from TableLocation verifyMakeHandleException(nonExistentTableLocation::getDataIndexColumns); makeNewTableLocationAndVerifyNoMakeHandleException(ParquetTableLocation::getDataIndexColumns); @@ -3734,27 +3749,12 @@ public void testTableLocationReading() { makeNewTableLocationAndVerifyNoMakeHandleException(ParquetTableLocation::hasDataIndex); // APIs from TableLocationState - verifyMakeHandleException(nonExistentTableLocation::getStateLock); - makeNewTableLocationAndVerifyNoMakeHandleException(ParquetTableLocation::getStateLock); - verifyMakeHandleException(nonExistentTableLocation::getRowSet); makeNewTableLocationAndVerifyNoMakeHandleException(ParquetTableLocation::getRowSet); verifyMakeHandleException(nonExistentTableLocation::getSize); makeNewTableLocationAndVerifyNoMakeHandleException(ParquetTableLocation::getSize); - verifyMakeHandleException(nonExistentTableLocation::getLastModifiedTimeMillis); - makeNewTableLocationAndVerifyNoMakeHandleException(ParquetTableLocation::getLastModifiedTimeMillis); - - // APIs from AbstractTableLocation - verifyMakeHandleException(() -> nonExistentTableLocation.handleUpdate(nonExistentTableLocation)); - makeNewTableLocationAndVerifyNoMakeHandleException( - (parquetTableLocation) -> parquetTableLocation.handleUpdate(parquetTableLocation)); - - verifyMakeHandleException(() -> nonExistentTableLocation.handleUpdate(new TrackingWritableRowSetImpl(), 0)); - makeNewTableLocationAndVerifyNoMakeHandleException( - (parquetTableLocation) -> parquetTableLocation.handleUpdate(new TrackingWritableRowSetImpl(), 0)); - // APIs from ColumnLocation verifyMakeHandleException(nonExistentColumnLocation::exists); verifyMakeHandleException(() -> nonExistentColumnLocation.makeColumnRegionChar( From 1e0e36f4f6bdbcbb4f660832f8bdcd1a4a7e63c9 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Mon, 27 Jan 2025 19:12:57 +0100 Subject: [PATCH 08/18] Minor tweaks --- .../parquet/table/location/ParquetTableLocation.java | 9 +++------ .../parquet/table/ParquetTableReadWriteTest.java | 9 +++++---- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java index c4f47f8a73a..cb8e2297bb4 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java @@ -134,18 +134,15 @@ final void initialize() { columnTypes = tableInfo.columnTypeMap(); sortingColumns = SortColumnInfo.sortColumns(tableInfo.sortingColumns()); - isInitialized = true; - isInitializedVolatile = true; - - // The following calls might internally call initialize() again, but that's fine because we're already - // initialized at this point. - if (!FILE_URI_SCHEME.equals(tableLocationKey.getURI().getScheme())) { // We do not have the last modified time for non-file URIs handleUpdate(computeIndex(), TableLocationState.NULL_TIME); } else { handleUpdate(computeIndex(), new File(tableLocationKey.getURI()).lastModified()); } + + isInitialized = true; + isInitializedVolatile = true; } } diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index b368532719d..dbe3c8b57f1 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -3671,12 +3671,12 @@ private static void makeNewTableLocationAndVerifyNoMakeHandleException( final Table table = TableTools.emptyTable(5).update("A=(int)i", "B=(long)i", "C=(double)i"); writeTable(table, dest.getPath()); - final ParquetTableLocationKey tableLocationKey = + final ParquetTableLocationKey newTableLocationKey = new ParquetTableLocationKey(dest.toURI(), 0, null, ParquetInstructions.EMPTY); - final ParquetTableLocation tableLocation = - new ParquetTableLocation(StandaloneTableKey.getInstance(), tableLocationKey, EMPTY); + final ParquetTableLocation newTableLocation = + new ParquetTableLocation(StandaloneTableKey.getInstance(), newTableLocationKey, EMPTY); try { - parquetTableLocationConsumer.accept(tableLocation); + parquetTableLocationConsumer.accept(newTableLocation); } catch (final Exception e) { if (e instanceof UncheckedIOException && e.getMessage().contains("makeHandle encountered exception")) { fail("Unexpected exception: " + e); @@ -3704,6 +3704,7 @@ public void testTableLocationReading() { nonExistentTableLocation.getLastModifiedTimeMillis(); nonExistentTableLocation.refresh(); nonExistentTableLocation.handleUpdate(new TrackingWritableRowSetImpl(), 0); + makeNewTableLocationAndVerifyNoMakeHandleException(nonExistentTableLocation::handleUpdate); { // To test the following make a new temporary table location final File tempDest = new File(rootFile, "testLocation.parquet"); From b67b4f14ea7800b5913a6591816eeb2f59f8f50d Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Mon, 27 Jan 2025 19:13:34 +0100 Subject: [PATCH 09/18] Removing redundant tests --- .../parquet/table/ParquetTableReadWriteTest.java | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index dbe3c8b57f1..f8c74b98891 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -3705,21 +3705,6 @@ public void testTableLocationReading() { nonExistentTableLocation.refresh(); nonExistentTableLocation.handleUpdate(new TrackingWritableRowSetImpl(), 0); makeNewTableLocationAndVerifyNoMakeHandleException(nonExistentTableLocation::handleUpdate); - { - // To test the following make a new temporary table location - final File tempDest = new File(rootFile, "testLocation.parquet"); - final Table table = TableTools.emptyTable(5).update("A=(int)i", "B=(long)i", "C=(double)i"); - writeTable(table, tempDest.getPath()); - - final ParquetTableLocationKey tableLocationKey = - new ParquetTableLocationKey(tempDest.toURI(), 0, null, ParquetInstructions.EMPTY); - final ParquetTableLocation tableLocation = - new ParquetTableLocation(StandaloneTableKey.getInstance(), tableLocationKey, EMPTY); - - // Test this operation works without throwing an exception - nonExistentTableLocation.handleUpdate(tableLocation); - tempDest.delete(); - } // Verify that we can get a column location for a non-existent column final ColumnLocation nonExistentColumnLocation = nonExistentTableLocation.getColumnLocation("A"); From 70788aba9fc0eb9941e91ee2c271175f502137b0 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Wed, 29 Jan 2025 18:15:29 +0100 Subject: [PATCH 10/18] Review with Devin --- .../locations/impl/AbstractTableLocation.java | 2 +- .../table/location/ParquetColumnLocation.java | 48 ++++++++----------- .../table/location/ParquetTableLocation.java | 41 ++++++---------- 3 files changed, 36 insertions(+), 55 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocation.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocation.java index 95a6866b752..7fed3f273d5 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocation.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocation.java @@ -222,7 +222,7 @@ private BasicDataIndex getDataIndex() { @Override @Nullable - public BasicDataIndex getDataIndex(@NotNull final String... columns) { + public final BasicDataIndex getDataIndex(@NotNull final String... columns) { final List columnNames = new ArrayList<>(columns.length); Collections.addAll(columnNames, columns); columnNames.sort(String::compareTo); diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java index 2178ced51db..771207941c2 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java @@ -60,19 +60,25 @@ final class ParquetColumnLocation extends AbstractColumnLoc private static final int MAX_PAGE_CACHE_SIZE = Configuration.getInstance() .getIntegerForClassWithDefault(ParquetColumnLocation.class, "maxPageCacheSize", 8192); - private final ParquetTableLocation parquetTableLocation; private final String columnName; private final String parquetColumnName; - private boolean isInitialized; - private volatile boolean isInitializedVolatile; + private volatile boolean readersInitialized; + + // Access to following variables must be guarded by initializeReaders() /** * Factory object needed for deferred initialization of the remaining fields. Reference serves as a barrier to - * ensure visibility of the derived fields. + * ensure visibility of the derived fields. We delay initializing this field till we need to read the column data. */ private volatile ColumnChunkReader[] columnChunkReaders; + /** + * Whether the column location actually exists. + */ + private boolean exists; + // ----------------------------------------------------------------------- + // We should consider moving this to column level if needed. Column-location level likely allows more parallelism. private volatile PageCache pageCache; @@ -91,32 +97,27 @@ final class ParquetColumnLocation extends AbstractColumnLoc @NotNull final String columnName, @NotNull final String parquetColumnName) { super(tableLocation, columnName); - this.parquetTableLocation = tableLocation; this.columnName = columnName; this.parquetColumnName = parquetColumnName; - this.isInitialized = false; - this.isInitializedVolatile = false; + this.readersInitialized = false; } - private void initialize() { - if (isInitialized) { + private void initializeReaders() { + if (readersInitialized) { return; } synchronized (this) { - isInitialized = isInitializedVolatile; - if (isInitialized) { + if (readersInitialized) { return; } - tl().initialize(); final String[] columnPath = tl().getParquetColumnNameToPath().get(parquetColumnName); final List nameList = columnPath == null ? Collections.singletonList(parquetColumnName) : Arrays.asList(columnPath); final ColumnChunkReader[] columnChunkReaders = Arrays.stream(tl().getRowGroupReaders()) .map(rgr -> rgr.getColumnChunk(columnName, nameList)).toArray(ColumnChunkReader[]::new); - final boolean exists = Arrays.stream(columnChunkReaders).anyMatch(ccr -> ccr != null && ccr.numRows() > 0); + exists = Arrays.stream(columnChunkReaders).anyMatch(ccr -> ccr != null && ccr.numRows() > 0); this.columnChunkReaders = exists ? columnChunkReaders : null; - isInitialized = true; - isInitializedVolatile = true; + readersInitialized = true; } } @@ -145,14 +146,12 @@ public String getImplementationName() { @Override public boolean exists() { - initialize(); - // If we see a null columnChunkReaders array, either we don't exist or we are guaranteed to - // see a non-null pageStores array - return columnChunkReaders != null || pageStores != null; + initializeReaders(); + return exists; } private ParquetTableLocation tl() { - return parquetTableLocation; + return (ParquetTableLocation) getTableLocation(); } private REGION_TYPE makeColumnRegion( @@ -178,7 +177,6 @@ private REGION_TYPE makeSingleColumnRegion(final SOURCE so @Override public ColumnRegionChar makeColumnRegionChar( @NotNull final ColumnDefinition columnDefinition) { - initialize(); // noinspection unchecked return (ColumnRegionChar) makeColumnRegion(this::getPageStores, columnDefinition, ColumnRegionChar::createNull, ParquetColumnRegionChar::new, @@ -189,7 +187,6 @@ public ColumnRegionChar makeColumnRegionChar( @Override public ColumnRegionByte makeColumnRegionByte( @NotNull final ColumnDefinition columnDefinition) { - initialize(); // noinspection unchecked return (ColumnRegionByte) makeColumnRegion(this::getPageStores, columnDefinition, ColumnRegionByte::createNull, ParquetColumnRegionByte::new, @@ -200,7 +197,6 @@ public ColumnRegionByte makeColumnRegionByte( @Override public ColumnRegionShort makeColumnRegionShort( @NotNull final ColumnDefinition columnDefinition) { - initialize(); // noinspection unchecked return (ColumnRegionShort) makeColumnRegion(this::getPageStores, columnDefinition, ColumnRegionShort::createNull, ParquetColumnRegionShort::new, @@ -211,7 +207,6 @@ public ColumnRegionShort makeColumnRegionShort( @Override public ColumnRegionInt makeColumnRegionInt( @NotNull final ColumnDefinition columnDefinition) { - initialize(); // noinspection unchecked return (ColumnRegionInt) makeColumnRegion(this::getPageStores, columnDefinition, ColumnRegionInt::createNull, ParquetColumnRegionInt::new, @@ -222,7 +217,6 @@ public ColumnRegionInt makeColumnRegionInt( @Override public ColumnRegionLong makeColumnRegionLong( @NotNull final ColumnDefinition columnDefinition) { - initialize(); // noinspection unchecked return (ColumnRegionLong) makeColumnRegion(this::getPageStores, columnDefinition, ColumnRegionLong::createNull, ParquetColumnRegionLong::new, @@ -233,7 +227,6 @@ public ColumnRegionLong makeColumnRegionLong( @Override public ColumnRegionFloat makeColumnRegionFloat( @NotNull final ColumnDefinition columnDefinition) { - initialize(); // noinspection unchecked return (ColumnRegionFloat) makeColumnRegion(this::getPageStores, columnDefinition, ColumnRegionFloat::createNull, ParquetColumnRegionFloat::new, @@ -244,7 +237,6 @@ public ColumnRegionFloat makeColumnRegionFloat( @Override public ColumnRegionDouble makeColumnRegionDouble( @NotNull final ColumnDefinition columnDefinition) { - initialize(); // noinspection unchecked return (ColumnRegionDouble) makeColumnRegion(this::getPageStores, columnDefinition, ColumnRegionDouble::createNull, ParquetColumnRegionDouble::new, @@ -255,7 +247,6 @@ public ColumnRegionDouble makeColumnRegionDouble( @Override public ColumnRegionObject makeColumnRegionObject( @NotNull final ColumnDefinition columnDefinition) { - initialize(); final Class dataType = columnDefinition.getDataType(); final ColumnChunkPageStore[] sources = getPageStores(columnDefinition); final ColumnChunkPageStore[] dictKeySources = @@ -330,6 +321,7 @@ private ColumnChunkPageStore[] getDictionaryKeysPageStores( @SuppressWarnings("unchecked") private void fetchValues(@NotNull final ColumnDefinition columnDefinition) { + initializeReaders(); if (columnChunkReaders == null) { return; } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java index cb8e2297bb4..dab4c86c80d 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java @@ -54,26 +54,25 @@ public class ParquetTableLocation extends AbstractTableLocation { private static final String IMPLEMENTATION_NAME = ParquetColumnLocation.class.getSimpleName(); - private boolean isInitialized; - private volatile boolean isInitializedVolatile; - private final ParquetInstructions readInstructions; + + private volatile boolean isInitialized; + + // Access to all the following variables must be guarded by initialize() private ParquetFileReader parquetFileReader; private int[] rowGroupIndices; - private RowGroup[] rowGroups; private RegionedPageStore.Parameters regionParameters; private Map parquetColumnNameToPath; private TableInfo tableInfo; private Map groupingColumns; - private List dataIndexes; private Map columnTypes; private List sortingColumns; - private String version; private volatile RowGroupReader[] rowGroupReaders; + // ----------------------------------------------------------------------- public ParquetTableLocation(@NotNull final TableKey tableKey, @NotNull final ParquetTableLocationKey tableLocationKey, @@ -81,15 +80,13 @@ public ParquetTableLocation(@NotNull final TableKey tableKey, super(tableKey, tableLocationKey, false); this.readInstructions = readInstructions; this.isInitialized = false; - this.isInitializedVolatile = false; } - final void initialize() { + private void initialize() { if (isInitialized) { return; } synchronized (this) { - isInitialized = isInitializedVolatile; if (isInitialized) { return; } @@ -104,7 +101,7 @@ final void initialize() { } final int rowGroupCount = rowGroupIndices.length; - rowGroups = IntStream.of(rowGroupIndices) + final RowGroup[] rowGroups = IntStream.of(rowGroupIndices) .mapToObj(rgi -> parquetFileReader.fileMetaData.getRow_groups().get(rgi)) .sorted(Comparator.comparingInt(RowGroup::getOrdinal)) .toArray(RowGroup[]::new); @@ -130,19 +127,17 @@ final void initialize() { .orElse(TableInfo.builder().build()); version = tableInfo.version(); groupingColumns = tableInfo.groupingColumnMap(); - dataIndexes = tableInfo.dataIndexes(); columnTypes = tableInfo.columnTypeMap(); sortingColumns = SortColumnInfo.sortColumns(tableInfo.sortingColumns()); if (!FILE_URI_SCHEME.equals(tableLocationKey.getURI().getScheme())) { // We do not have the last modified time for non-file URIs - handleUpdate(computeIndex(), TableLocationState.NULL_TIME); + handleUpdate(computeIndex(rowGroups), TableLocationState.NULL_TIME); } else { - handleUpdate(computeIndex(), new File(tableLocationKey.getURI()).lastModified()); + handleUpdate(computeIndex(rowGroups), new File(tableLocationKey.getURI()).lastModified()); } isInitialized = true; - isInitializedVolatile = true; } } @@ -163,6 +158,7 @@ ParquetInstructions getReadInstructions() { } RegionedPageStore.Parameters getRegionParameters() { + initialize(); return regionParameters; } @@ -180,6 +176,7 @@ RowGroupReader[] getRowGroupReaders() { if ((local = rowGroupReaders) != null) { return local; } + initialize(); return rowGroupReaders = IntStream.of(rowGroupIndices) .mapToObj(idx -> parquetFileReader.getRowGroup(idx, version)) .sorted(Comparator.comparingInt(rgr -> rgr.getRowGroup().getOrdinal())) @@ -196,6 +193,7 @@ public List getSortedColumns() { @NotNull Map getParquetColumnNameToPath() { + initialize(); return parquetColumnNameToPath; } @@ -218,7 +216,7 @@ protected ColumnLocation makeColumnLocation(@NotNull final String columnName) { return new ParquetColumnLocation<>(this, columnName, parquetColumnName); } - private RowSet computeIndex() { + private RowSet computeIndex(@NotNull final RowGroup[] rowGroups) { final RowSetBuilderSequential sequentialBuilder = RowSetFactory.builderSequential(); for (int rgi = 0; rgi < rowGroups.length; ++rgi) { @@ -238,6 +236,7 @@ private RowSet computeIndex() { @NotNull public List getDataIndexColumns() { initialize(); + final List dataIndexes = tableInfo.dataIndexes(); if (dataIndexes.isEmpty() && groupingColumns.isEmpty()) { return List.of(); } @@ -259,7 +258,7 @@ public boolean hasDataIndex(@NotNull final String... columns) { return metadata != null && parquetFileExists(metadata.fileURI); } // Check if the column names match any of the data indexes - for (final DataIndexInfo dataIndex : dataIndexes) { + for (final DataIndexInfo dataIndex : tableInfo.dataIndexes()) { if (dataIndex.matchesColumns(columns)) { // Validate the index file exists (without loading and parsing it) final IndexFileMetadata metadata = getIndexFileMetadata( @@ -277,20 +276,10 @@ private static boolean parquetFileExists(@NotNull final URI fileURI) { return !fileURI.getScheme().equals(FILE_URI_SCHEME) || Files.exists(Path.of(fileURI)); } - @Override - @Nullable - public final BasicDataIndex getDataIndex(@NotNull final String... columns) { - initialize(); - return super.getDataIndex(columns); - } - @Override @Nullable public BasicDataIndex loadDataIndex(@NotNull final String... columns) { initialize(); - if (tableInfo == null) { - return null; - } final IndexFileMetadata indexFileMetaData = getIndexFileMetadata(getParquetKey().getURI(), tableInfo, columns); if (indexFileMetaData == null) { throw new TableDataException( From 45272738899b05e27ea7216e9d22a27c075dd50c Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Wed, 29 Jan 2025 18:36:37 +0100 Subject: [PATCH 11/18] Removed ColumnLocationState --- .../regioned/RegionedColumnSourceManager.java | 42 ++++--------------- 1 file changed, 9 insertions(+), 33 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceManager.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceManager.java index f33e85514ec..85fec72346f 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceManager.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceManager.java @@ -603,7 +603,9 @@ private class IncludedTableLocationEntry implements Comparable> columnLocationStates = new ArrayList<>(); + + // Collection of column sources for which we have added a region, useful for invalidating together + private final Collection> regionedColumnSources = new ArrayList<>(); /** * RowSet in the region's space, not the table's space. @@ -631,13 +633,11 @@ private void processInitial(final RowSetBuilderSequential addedRowSetBuilder, fi .appendRange(regionFirstKey + subRegionFirstKey, regionFirstKey + subRegionLastKey)); for (final ColumnDefinition columnDefinition : columnDefinitions) { - // noinspection unchecked,rawtypes - final ColumnLocationState state = new ColumnLocationState( - columnDefinition, - columnSources.get(columnDefinition.getName()), - location.getColumnLocation(columnDefinition.getName())); - columnLocationStates.add(state); - state.regionAllocated(regionIndex); + final RegionedColumnSource regionedColumnSource = columnSources.get(columnDefinition.getName()); + final ColumnLocation columnLocation = location.getColumnLocation(columnDefinition.getName()); + Assert.eq(regionIndex, "regionIndex", regionedColumnSource.addRegion(columnDefinition, columnLocation), + "regionedColumnSource.addRegion((definition, location)"); + regionedColumnSources.add(regionedColumnSource); } rowSetAtLastUpdate = initialRowSet; @@ -710,7 +710,7 @@ private boolean pollUpdates(final RowSetBuilderSequential addedRowSetBuilder) { } private void invalidate() { - columnLocationStates.forEach(cls -> cls.source.invalidateRegion(regionIndex)); + regionedColumnSources.forEach(source -> source.invalidateRegion(regionIndex)); } @Override @@ -734,30 +734,6 @@ public ImmutableTableLocationKey getKey( } }; - /** - * Batches up a definition, source, and location for ease of use. Implements grouping maintenance. - */ - private static class ColumnLocationState { - - protected final ColumnDefinition definition; - protected final RegionedColumnSource source; - protected final ColumnLocation location; - - private ColumnLocationState( - ColumnDefinition definition, - RegionedColumnSource source, - ColumnLocation location) { - this.definition = definition; - this.source = source; - this.location = location; - } - - private void regionAllocated(final int regionIndex) { - Assert.eq(regionIndex, "regionIndex", source.addRegion(definition, location), - "source.addRegion((definition, location)"); - } - } - public Map getTableAttributes( @NotNull TableUpdateMode tableUpdateMode, @NotNull TableUpdateMode tableLocationUpdateMode) { From a8330b701c84fb50960a11153f27f5cb3dcead49 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Wed, 29 Jan 2025 18:59:32 +0100 Subject: [PATCH 12/18] Minor optimizations --- .../table/location/ParquetTableLocation.java | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java index dab4c86c80d..f9e79157b9f 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java @@ -125,10 +125,10 @@ private void initialize() { tableInfo = ParquetSchemaReader .parseMetadata(parquetMetadata.getFileMetaData().getKeyValueMetaData()) .orElse(TableInfo.builder().build()); - version = tableInfo.version(); groupingColumns = tableInfo.groupingColumnMap(); columnTypes = tableInfo.columnTypeMap(); sortingColumns = SortColumnInfo.sortColumns(tableInfo.sortingColumns()); + version = tableInfo.version(); if (!FILE_URI_SCHEME.equals(tableLocationKey.getURI().getScheme())) { // We do not have the last modified time for non-file URIs @@ -254,17 +254,14 @@ public boolean hasDataIndex(@NotNull final String... columns) { // Check if the column name matches any of the grouping columns if (columns.length == 1 && groupingColumns.containsKey(columns[0])) { // Validate the index file exists (without loading and parsing it) - final IndexFileMetadata metadata = getIndexFileMetadata(getParquetKey().getURI(), tableInfo, columns); + final IndexFileMetadata metadata = getIndexFileMetadata(getParquetKey().getURI(), columns); return metadata != null && parquetFileExists(metadata.fileURI); } // Check if the column names match any of the data indexes for (final DataIndexInfo dataIndex : tableInfo.dataIndexes()) { if (dataIndex.matchesColumns(columns)) { // Validate the index file exists (without loading and parsing it) - final IndexFileMetadata metadata = getIndexFileMetadata( - getParquetKey().getURI(), - tableInfo, - columns); + final IndexFileMetadata metadata = getIndexFileMetadata(getParquetKey().getURI(), columns); return metadata != null && parquetFileExists(metadata.fileURI); } } @@ -280,7 +277,7 @@ private static boolean parquetFileExists(@NotNull final URI fileURI) { @Nullable public BasicDataIndex loadDataIndex(@NotNull final String... columns) { initialize(); - final IndexFileMetadata indexFileMetaData = getIndexFileMetadata(getParquetKey().getURI(), tableInfo, columns); + final IndexFileMetadata indexFileMetaData = getIndexFileMetadata(getParquetKey().getURI(), columns); if (indexFileMetaData == null) { throw new TableDataException( String.format( @@ -326,13 +323,12 @@ private static URI makeRelativeURI(@NotNull final URI parentFileURI, @NotNull fi } } - private static IndexFileMetadata getIndexFileMetadata( + private IndexFileMetadata getIndexFileMetadata( @NotNull final URI parentFileURI, - @NotNull final TableInfo info, @NotNull final String... keyColumnNames) { if (keyColumnNames.length == 1) { // If there's only one key column, there might be (legacy) grouping info - final GroupingColumnInfo groupingColumnInfo = info.groupingColumnMap().get(keyColumnNames[0]); + final GroupingColumnInfo groupingColumnInfo = groupingColumns.get(keyColumnNames[0]); if (groupingColumnInfo != null) { return new IndexFileMetadata( makeRelativeURI(parentFileURI, groupingColumnInfo.groupingTablePath()), @@ -343,7 +339,7 @@ private static IndexFileMetadata getIndexFileMetadata( // Either there are more than 1 key columns, or there was no grouping info, so lets see if there was a // DataIndex. - final DataIndexInfo dataIndexInfo = info.dataIndexes().stream() + final DataIndexInfo dataIndexInfo = tableInfo.dataIndexes().stream() .filter(item -> item.matchesColumns(keyColumnNames)) .findFirst() .orElse(null); From 80cd3df6dce06011ed48b778a33400c6378c60b1 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Thu, 30 Jan 2025 19:16:38 +0100 Subject: [PATCH 13/18] Review with Devin contd. --- .../locations/impl/AbstractTableLocation.java | 13 ++++- .../table/location/ParquetColumnLocation.java | 44 +++++++------- .../table/location/ParquetTableLocation.java | 58 ++++++++++++------- .../table/ParquetTableReadWriteTest.java | 8 ++- 4 files changed, 74 insertions(+), 49 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocation.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocation.java index 7fed3f273d5..ab4ebeec5ee 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocation.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocation.java @@ -87,6 +87,10 @@ public LivenessReferent asLivenessReferent() { // TableLocationState implementation // ------------------------------------------------------------------------------------------------------------------ + protected void initializeState() { + // No-op by default, can be overridden by subclasses to initialize state on first access + } + @Override @NotNull public final Object getStateLock() { @@ -94,17 +98,20 @@ public final Object getStateLock() { } @Override - public RowSet getRowSet() { + public final RowSet getRowSet() { + initializeState(); return state.getRowSet(); } @Override - public long getSize() { + public final long getSize() { + initializeState(); return state.getSize(); } @Override public final long getLastModifiedTimeMillis() { + initializeState(); return state.getLastModifiedTimeMillis(); } @@ -137,6 +144,7 @@ protected final void deliverInitialSnapshot(@NotNull final Listener listener) { * @param lastModifiedTimeMillis The new lastModificationTimeMillis */ public final void handleUpdate(final RowSet rowSet, final long lastModifiedTimeMillis) { + initializeState(); if (state.setValues(rowSet, lastModifiedTimeMillis) && supportsSubscriptions()) { deliverUpdateNotification(); } @@ -149,6 +157,7 @@ public final void handleUpdate(final RowSet rowSet, final long lastModifiedTimeM * @param source The source to copy state values from */ public void handleUpdate(@NotNull final TableLocationState source) { + initializeState(); if (source.copyStateValuesTo(state) && supportsSubscriptions()) { deliverUpdateNotification(); } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java index 771207941c2..43440495d93 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java @@ -67,6 +67,7 @@ final class ParquetColumnLocation extends AbstractColumnLoc private volatile boolean readersInitialized; // Access to following variables must be guarded by initializeReaders() + // ----------------------------------------------------------------------- /** * Factory object needed for deferred initialization of the remaining fields. Reference serves as a barrier to * ensure visibility of the derived fields. We delay initializing this field till we need to read the column data. @@ -79,12 +80,14 @@ final class ParquetColumnLocation extends AbstractColumnLoc private boolean exists; // ----------------------------------------------------------------------- - // We should consider moving this to column level if needed. Column-location level likely allows more parallelism. - private volatile PageCache pageCache; + private volatile boolean pagesInitialized; + // Access to following variables must be guarded by initializePages() + // ----------------------------------------------------------------------- private ColumnChunkPageStore[] pageStores; private Supplier>[] dictionaryChunkSuppliers; private ColumnChunkPageStore[] dictionaryKeysPageStores; + // ----------------------------------------------------------------------- /** * Construct a new {@link ParquetColumnLocation} for the specified {@link ParquetTableLocation} and column name. @@ -100,6 +103,7 @@ final class ParquetColumnLocation extends AbstractColumnLoc this.columnName = columnName; this.parquetColumnName = parquetColumnName; this.readersInitialized = false; + this.pagesInitialized = false; } private void initializeReaders() { @@ -121,20 +125,6 @@ private void initializeReaders() { } } - private PageCache ensurePageCache() { - PageCache localPageCache; - if ((localPageCache = pageCache) != null) { - return localPageCache; - } - - synchronized (this) { - if ((localPageCache = pageCache) != null) { - return localPageCache; - } - return pageCache = new PageCache<>(INITIAL_PAGE_CACHE_SIZE, MAX_PAGE_CACHE_SIZE); - } - } - // ----------------------------------------------------------------------------------------------------------------- // AbstractColumnLocation implementation // ----------------------------------------------------------------------------------------------------------------- @@ -290,7 +280,7 @@ private ColumnRegionObject makeSingleColumnRegionObject( @NotNull private ColumnChunkPageStore[] getPageStores( @NotNull final ColumnDefinition columnDefinition) { - fetchValues(columnDefinition); + initializePages(columnDefinition); return pageStores; } @@ -302,7 +292,7 @@ private ColumnChunkPageStore[] getPageStores( */ private Supplier>[] getDictionaryChunkSuppliers( @NotNull final ColumnDefinition columnDefinition) { - fetchValues(columnDefinition); + initializePages(columnDefinition); return dictionaryChunkSuppliers; } @@ -315,18 +305,18 @@ private Supplier>[] getDictionaryChunkSuppliers( */ private ColumnChunkPageStore[] getDictionaryKeysPageStores( @NotNull final ColumnDefinition columnDefinition) { - fetchValues(columnDefinition); + initializePages(columnDefinition); return dictionaryKeysPageStores; } @SuppressWarnings("unchecked") - private void fetchValues(@NotNull final ColumnDefinition columnDefinition) { - initializeReaders(); - if (columnChunkReaders == null) { + private void initializePages(@NotNull final ColumnDefinition columnDefinition) { + if (pagesInitialized) { return; } + initializeReaders(); synchronized (this) { - if (columnChunkReaders == null) { + if (pagesInitialized) { return; } @@ -334,12 +324,17 @@ private void fetchValues(@NotNull final ColumnDefinition columnDefinition) { pageStores = new ColumnChunkPageStore[pageStoreCount]; dictionaryChunkSuppliers = new Supplier[pageStoreCount]; dictionaryKeysPageStores = new ColumnChunkPageStore[pageStoreCount]; + + // We should consider moving this page-cache to column level if needed. + // Column-location level likely allows more parallelism. + final PageCache pageCache = new PageCache<>(INITIAL_PAGE_CACHE_SIZE, MAX_PAGE_CACHE_SIZE); + for (int psi = 0; psi < pageStoreCount; ++psi) { final ColumnChunkReader columnChunkReader = columnChunkReaders[psi]; try { final ColumnChunkPageStore.CreatorResult creatorResult = ColumnChunkPageStore.create( - ensurePageCache(), + pageCache, columnChunkReader, tl().getRegionParameters().regionMask, makeToPage(tl().getColumnTypes().get(parquetColumnName), @@ -356,6 +351,7 @@ private void fetchValues(@NotNull final ColumnDefinition columnDefinition) { } columnChunkReaders = null; + pagesInitialized = true; } } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java index f9e79157b9f..9f05f462609 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java @@ -59,6 +59,7 @@ public class ParquetTableLocation extends AbstractTableLocation { private volatile boolean isInitialized; // Access to all the following variables must be guarded by initialize() + // ----------------------------------------------------------------------- private ParquetFileReader parquetFileReader; private int[] rowGroupIndices; @@ -66,6 +67,8 @@ public class ParquetTableLocation extends AbstractTableLocation { private Map parquetColumnNameToPath; private TableInfo tableInfo; + + // Following are initialized on first access, so should only be accessed via their getters private Map groupingColumns; private Map columnTypes; private List sortingColumns; @@ -125,10 +128,8 @@ private void initialize() { tableInfo = ParquetSchemaReader .parseMetadata(parquetMetadata.getFileMetaData().getKeyValueMetaData()) .orElse(TableInfo.builder().build()); - groupingColumns = tableInfo.groupingColumnMap(); - columnTypes = tableInfo.columnTypeMap(); - sortingColumns = SortColumnInfo.sortColumns(tableInfo.sortingColumns()); - version = tableInfo.version(); + + isInitialized = true; if (!FILE_URI_SCHEME.equals(tableLocationKey.getURI().getScheme())) { // We do not have the last modified time for non-file URIs @@ -136,8 +137,6 @@ private void initialize() { } else { handleUpdate(computeIndex(rowGroups), new File(tableLocationKey.getURI()).lastModified()); } - - isInitialized = true; } } @@ -164,6 +163,9 @@ RegionedPageStore.Parameters getRegionParameters() { public Map getColumnTypes() { initialize(); + if (columnTypes == null) { + columnTypes = Collections.unmodifiableMap(tableInfo.columnTypeMap()); + } return columnTypes; } @@ -177,10 +179,12 @@ RowGroupReader[] getRowGroupReaders() { return local; } initialize(); - return rowGroupReaders = IntStream.of(rowGroupIndices) - .mapToObj(idx -> parquetFileReader.getRowGroup(idx, version)) + local = IntStream.of(rowGroupIndices) + .mapToObj(idx -> parquetFileReader.getRowGroup(idx, getVersion())) .sorted(Comparator.comparingInt(rgr -> rgr.getRowGroup().getOrdinal())) .toArray(RowGroupReader[]::new); + rowGroupReaders = local; + return local; } } @@ -188,25 +192,38 @@ RowGroupReader[] getRowGroupReaders() { @NotNull public List getSortedColumns() { initialize(); + if (sortingColumns == null) { + sortingColumns = SortColumnInfo.sortColumns(tableInfo.sortingColumns()); + } return sortingColumns; } @NotNull - Map getParquetColumnNameToPath() { + private Map getGroupingColumns() { initialize(); - return parquetColumnNameToPath; + if (groupingColumns == null) { + groupingColumns = Collections.unmodifiableMap(tableInfo.groupingColumnMap()); + } + return groupingColumns; } - @Override - public final RowSet getRowSet() { + private String getVersion() { + initialize(); + if (version == null) { + version = tableInfo.version(); + } + return version; + } + + @NotNull + Map getParquetColumnNameToPath() { initialize(); - return super.getRowSet(); + return parquetColumnNameToPath; } @Override - public final long getSize() { + public final void initializeState() { initialize(); - return super.getSize(); } @Override @@ -237,14 +254,15 @@ private RowSet computeIndex(@NotNull final RowGroup[] rowGroups) { public List getDataIndexColumns() { initialize(); final List dataIndexes = tableInfo.dataIndexes(); - if (dataIndexes.isEmpty() && groupingColumns.isEmpty()) { + final Map localGroupingColumns = getGroupingColumns(); + if (dataIndexes.isEmpty() && localGroupingColumns.isEmpty()) { return List.of(); } - final List dataIndexColumns = new ArrayList<>(dataIndexes.size() + groupingColumns.size()); + final List dataIndexColumns = new ArrayList<>(dataIndexes.size() + localGroupingColumns.size()); // Add the data indexes to the list dataIndexes.stream().map(di -> di.columns().toArray(String[]::new)).forEach(dataIndexColumns::add); // Add grouping columns to the list - groupingColumns.keySet().stream().map(colName -> new String[] {colName}).forEach(dataIndexColumns::add); + localGroupingColumns.keySet().stream().map(colName -> new String[] {colName}).forEach(dataIndexColumns::add); return dataIndexColumns; } @@ -252,7 +270,7 @@ public List getDataIndexColumns() { public boolean hasDataIndex(@NotNull final String... columns) { initialize(); // Check if the column name matches any of the grouping columns - if (columns.length == 1 && groupingColumns.containsKey(columns[0])) { + if (columns.length == 1 && getGroupingColumns().containsKey(columns[0])) { // Validate the index file exists (without loading and parsing it) final IndexFileMetadata metadata = getIndexFileMetadata(getParquetKey().getURI(), columns); return metadata != null && parquetFileExists(metadata.fileURI); @@ -328,7 +346,7 @@ private IndexFileMetadata getIndexFileMetadata( @NotNull final String... keyColumnNames) { if (keyColumnNames.length == 1) { // If there's only one key column, there might be (legacy) grouping info - final GroupingColumnInfo groupingColumnInfo = groupingColumns.get(keyColumnNames[0]); + final GroupingColumnInfo groupingColumnInfo = getGroupingColumns().get(keyColumnNames[0]); if (groupingColumnInfo != null) { return new IndexFileMetadata( makeRelativeURI(parentFileURI, groupingColumnInfo.groupingTablePath()), diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index f8c74b98891..a9a0deccd31 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -3701,10 +3701,7 @@ public void testTableLocationReading() { assertNotNull(nonExistentTableLocation.toString()); assertNotNull(nonExistentTableLocation.asLivenessReferent()); assertNotNull(nonExistentTableLocation.getStateLock()); - nonExistentTableLocation.getLastModifiedTimeMillis(); nonExistentTableLocation.refresh(); - nonExistentTableLocation.handleUpdate(new TrackingWritableRowSetImpl(), 0); - makeNewTableLocationAndVerifyNoMakeHandleException(nonExistentTableLocation::handleUpdate); // Verify that we can get a column location for a non-existent column final ColumnLocation nonExistentColumnLocation = nonExistentTableLocation.getColumnLocation("A"); @@ -3741,6 +3738,11 @@ public void testTableLocationReading() { verifyMakeHandleException(nonExistentTableLocation::getSize); makeNewTableLocationAndVerifyNoMakeHandleException(ParquetTableLocation::getSize); + verifyMakeHandleException(nonExistentTableLocation::getLastModifiedTimeMillis); + makeNewTableLocationAndVerifyNoMakeHandleException(ParquetTableLocation::getLastModifiedTimeMillis); + + verifyMakeHandleException(() -> nonExistentTableLocation.handleUpdate(new TrackingWritableRowSetImpl(), 0)); + // APIs from ColumnLocation verifyMakeHandleException(nonExistentColumnLocation::exists); verifyMakeHandleException(() -> nonExistentColumnLocation.makeColumnRegionChar( From 2babf800e2df1821bc08409abbd3e108e73d5677 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Thu, 30 Jan 2025 21:05:10 +0100 Subject: [PATCH 14/18] More review comments with Devin --- .../locations/impl/AbstractTableLocation.java | 20 +++++- .../table/location/ParquetColumnLocation.java | 3 +- .../table/location/ParquetTableLocation.java | 66 +++++-------------- 3 files changed, 35 insertions(+), 54 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocation.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocation.java index ab4ebeec5ee..a51eb640cc3 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocation.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocation.java @@ -87,9 +87,15 @@ public LivenessReferent asLivenessReferent() { // TableLocationState implementation // ------------------------------------------------------------------------------------------------------------------ - protected void initializeState() { - // No-op by default, can be overridden by subclasses to initialize state on first access - } + /** + * No-op by default, can be overridden by subclasses to initialize state on first access. + *

+ * The expectation for static locations that override this is to call {@link #handleUpdateInternal(RowSet, long)} + * instead of {@link #handleUpdate(RowSet, long)}, and {@link #handleUpdateInternal(TableLocationState)} instead of + * {@link #handleUpdate(TableLocationState)} from inside {@link #initializeState()}. Otherwise, the initialization + * logic will recurse infinitely. + */ + protected void initializeState() {} @Override @NotNull @@ -145,6 +151,10 @@ protected final void deliverInitialSnapshot(@NotNull final Listener listener) { */ public final void handleUpdate(final RowSet rowSet, final long lastModifiedTimeMillis) { initializeState(); + handleUpdateInternal(rowSet, lastModifiedTimeMillis); + } + + protected final void handleUpdateInternal(final RowSet rowSet, final long lastModifiedTimeMillis) { if (state.setValues(rowSet, lastModifiedTimeMillis) && supportsSubscriptions()) { deliverUpdateNotification(); } @@ -158,6 +168,10 @@ public final void handleUpdate(final RowSet rowSet, final long lastModifiedTimeM */ public void handleUpdate(@NotNull final TableLocationState source) { initializeState(); + handleUpdateInternal(source); + } + + protected final void handleUpdateInternal(@NotNull final TableLocationState source) { if (source.copyStateValuesTo(state) && supportsSubscriptions()) { deliverUpdateNotification(); } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java index 43440495d93..beadf764910 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java @@ -314,12 +314,11 @@ private void initializePages(@NotNull final ColumnDefinition columnDefinition if (pagesInitialized) { return; } - initializeReaders(); synchronized (this) { if (pagesInitialized) { return; } - + initializeReaders(); final int pageStoreCount = columnChunkReaders.length; pageStores = new ColumnChunkPageStore[pageStoreCount]; dictionaryChunkSuppliers = new Supplier[pageStoreCount]; diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java index 9f05f462609..cc77021a352 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java @@ -67,14 +67,11 @@ public class ParquetTableLocation extends AbstractTableLocation { private Map parquetColumnNameToPath; private TableInfo tableInfo; - - // Following are initialized on first access, so should only be accessed via their getters private Map groupingColumns; private Map columnTypes; private List sortingColumns; - private String version; - private volatile RowGroupReader[] rowGroupReaders; + private RowGroupReader[] rowGroupReaders; // ----------------------------------------------------------------------- public ParquetTableLocation(@NotNull final TableKey tableKey, @@ -128,15 +125,23 @@ private void initialize() { tableInfo = ParquetSchemaReader .parseMetadata(parquetMetadata.getFileMetaData().getKeyValueMetaData()) .orElse(TableInfo.builder().build()); + groupingColumns = tableInfo.groupingColumnMap(); + columnTypes = tableInfo.columnTypeMap(); + sortingColumns = SortColumnInfo.sortColumns(tableInfo.sortingColumns()); - isInitialized = true; + rowGroupReaders = IntStream.of(rowGroupIndices) + .mapToObj(idx -> parquetFileReader.getRowGroup(idx, tableInfo.version())) + .sorted(Comparator.comparingInt(rgr -> rgr.getRowGroup().getOrdinal())) + .toArray(RowGroupReader[]::new); if (!FILE_URI_SCHEME.equals(tableLocationKey.getURI().getScheme())) { // We do not have the last modified time for non-file URIs - handleUpdate(computeIndex(rowGroups), TableLocationState.NULL_TIME); + handleUpdateInternal(computeIndex(rowGroups), TableLocationState.NULL_TIME); } else { - handleUpdate(computeIndex(rowGroups), new File(tableLocationKey.getURI()).lastModified()); + handleUpdateInternal(computeIndex(rowGroups), new File(tableLocationKey.getURI()).lastModified()); } + + isInitialized = true; } } @@ -163,58 +168,21 @@ RegionedPageStore.Parameters getRegionParameters() { public Map getColumnTypes() { initialize(); - if (columnTypes == null) { - columnTypes = Collections.unmodifiableMap(tableInfo.columnTypeMap()); - } return columnTypes; } RowGroupReader[] getRowGroupReaders() { - RowGroupReader[] local; - if ((local = rowGroupReaders) != null) { - return local; - } - synchronized (this) { - if ((local = rowGroupReaders) != null) { - return local; - } - initialize(); - local = IntStream.of(rowGroupIndices) - .mapToObj(idx -> parquetFileReader.getRowGroup(idx, getVersion())) - .sorted(Comparator.comparingInt(rgr -> rgr.getRowGroup().getOrdinal())) - .toArray(RowGroupReader[]::new); - rowGroupReaders = local; - return local; - } + initialize(); + return rowGroupReaders; } @Override @NotNull public List getSortedColumns() { initialize(); - if (sortingColumns == null) { - sortingColumns = SortColumnInfo.sortColumns(tableInfo.sortingColumns()); - } return sortingColumns; } - @NotNull - private Map getGroupingColumns() { - initialize(); - if (groupingColumns == null) { - groupingColumns = Collections.unmodifiableMap(tableInfo.groupingColumnMap()); - } - return groupingColumns; - } - - private String getVersion() { - initialize(); - if (version == null) { - version = tableInfo.version(); - } - return version; - } - @NotNull Map getParquetColumnNameToPath() { initialize(); @@ -254,7 +222,7 @@ private RowSet computeIndex(@NotNull final RowGroup[] rowGroups) { public List getDataIndexColumns() { initialize(); final List dataIndexes = tableInfo.dataIndexes(); - final Map localGroupingColumns = getGroupingColumns(); + final Map localGroupingColumns = groupingColumns; if (dataIndexes.isEmpty() && localGroupingColumns.isEmpty()) { return List.of(); } @@ -270,7 +238,7 @@ public List getDataIndexColumns() { public boolean hasDataIndex(@NotNull final String... columns) { initialize(); // Check if the column name matches any of the grouping columns - if (columns.length == 1 && getGroupingColumns().containsKey(columns[0])) { + if (columns.length == 1 && groupingColumns.containsKey(columns[0])) { // Validate the index file exists (without loading and parsing it) final IndexFileMetadata metadata = getIndexFileMetadata(getParquetKey().getURI(), columns); return metadata != null && parquetFileExists(metadata.fileURI); @@ -346,7 +314,7 @@ private IndexFileMetadata getIndexFileMetadata( @NotNull final String... keyColumnNames) { if (keyColumnNames.length == 1) { // If there's only one key column, there might be (legacy) grouping info - final GroupingColumnInfo groupingColumnInfo = getGroupingColumns().get(keyColumnNames[0]); + final GroupingColumnInfo groupingColumnInfo = groupingColumns.get(keyColumnNames[0]); if (groupingColumnInfo != null) { return new IndexFileMetadata( makeRelativeURI(parentFileURI, groupingColumnInfo.groupingTablePath()), From 1c8d7d041a34f4873e993b9fcc60d0bb6d9bf205 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Thu, 30 Jan 2025 21:43:02 +0100 Subject: [PATCH 15/18] Review contd. --- .../parquet/table/location/ParquetColumnLocation.java | 6 +++--- .../parquet/table/location/ParquetTableLocation.java | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java index beadf764910..c085730ba26 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java @@ -69,10 +69,10 @@ final class ParquetColumnLocation extends AbstractColumnLoc // Access to following variables must be guarded by initializeReaders() // ----------------------------------------------------------------------- /** - * Factory object needed for deferred initialization of the remaining fields. Reference serves as a barrier to - * ensure visibility of the derived fields. We delay initializing this field till we need to read the column data. + * Factory object needed for deferred initialization of the remaining fields. We delay initializing this field + * itself till we need to read the column data. */ - private volatile ColumnChunkReader[] columnChunkReaders; + private ColumnChunkReader[] columnChunkReaders; /** * Whether the column location actually exists. diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java index cc77021a352..198c02857e6 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java @@ -61,7 +61,6 @@ public class ParquetTableLocation extends AbstractTableLocation { // Access to all the following variables must be guarded by initialize() // ----------------------------------------------------------------------- private ParquetFileReader parquetFileReader; - private int[] rowGroupIndices; private RegionedPageStore.Parameters regionParameters; private Map parquetColumnNameToPath; @@ -92,6 +91,7 @@ private void initialize() { } final ParquetMetadata parquetMetadata; final ParquetTableLocationKey tableLocationKey = getParquetKey(); + final int[] rowGroupIndices; synchronized (tableLocationKey) { // Following methods are internally synchronized, we synchronize them together here to minimize // lock/unlock calls @@ -190,7 +190,7 @@ Map getParquetColumnNameToPath() { } @Override - public final void initializeState() { + protected final void initializeState() { initialize(); } From 386038d2d9abca38243de7c1d51bd9ce04a9672c Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Thu, 30 Jan 2025 22:44:18 +0100 Subject: [PATCH 16/18] More review comments --- .../table/location/ParquetTableLocation.java | 10 +++--- .../table/ParquetTableReadWriteTest.java | 34 ++++++++----------- 2 files changed, 21 insertions(+), 23 deletions(-) diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java index 198c02857e6..87c4e9b11bf 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java @@ -221,14 +221,16 @@ private RowSet computeIndex(@NotNull final RowGroup[] rowGroups) { @NotNull public List getDataIndexColumns() { initialize(); - final List dataIndexes = tableInfo.dataIndexes(); final Map localGroupingColumns = groupingColumns; - if (dataIndexes.isEmpty() && localGroupingColumns.isEmpty()) { + if (tableInfo.dataIndexes().isEmpty() && localGroupingColumns.isEmpty()) { return List.of(); } - final List dataIndexColumns = new ArrayList<>(dataIndexes.size() + localGroupingColumns.size()); + final List dataIndexColumns = + new ArrayList<>(tableInfo.dataIndexes().size() + localGroupingColumns.size()); // Add the data indexes to the list - dataIndexes.stream().map(di -> di.columns().toArray(String[]::new)).forEach(dataIndexColumns::add); + tableInfo.dataIndexes().stream() + .map(di -> di.columns().toArray(String[]::new)) + .forEach(dataIndexColumns::add); // Add grouping columns to the list localGroupingColumns.keySet().stream().map(colName -> new String[] {colName}).forEach(dataIndexColumns::add); return dataIndexColumns; diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index a9a0deccd31..a180a46855f 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -3665,23 +3665,18 @@ private static void verifyMakeHandleException(final Runnable throwingRunnable) { } } - private static void makeNewTableLocationAndVerifyNoMakeHandleException( + private static void makeNewTableLocationAndVerifyNoException( final Consumer parquetTableLocationConsumer) { final File dest = new File(rootFile, "real.parquet"); final Table table = TableTools.emptyTable(5).update("A=(int)i", "B=(long)i", "C=(double)i"); + DataIndexer.getOrCreateDataIndex(table, "A"); writeTable(table, dest.getPath()); final ParquetTableLocationKey newTableLocationKey = new ParquetTableLocationKey(dest.toURI(), 0, null, ParquetInstructions.EMPTY); final ParquetTableLocation newTableLocation = new ParquetTableLocation(StandaloneTableKey.getInstance(), newTableLocationKey, EMPTY); - try { - parquetTableLocationConsumer.accept(newTableLocation); - } catch (final Exception e) { - if (e instanceof UncheckedIOException && e.getMessage().contains("makeHandle encountered exception")) { - fail("Unexpected exception: " + e); - } - } + parquetTableLocationConsumer.accept(newTableLocation); dest.delete(); } @@ -3714,32 +3709,33 @@ public void testTableLocationReading() { // Verify that all the following operations will fail when the file does not exist and pass when it does // APIs from TableLocation verifyMakeHandleException(nonExistentTableLocation::getDataIndexColumns); - makeNewTableLocationAndVerifyNoMakeHandleException(ParquetTableLocation::getDataIndexColumns); + makeNewTableLocationAndVerifyNoException(ParquetTableLocation::getDataIndexColumns); verifyMakeHandleException(nonExistentTableLocation::getSortedColumns); - makeNewTableLocationAndVerifyNoMakeHandleException(ParquetTableLocation::getSortedColumns); + makeNewTableLocationAndVerifyNoException(ParquetTableLocation::getSortedColumns); verifyMakeHandleException(nonExistentTableLocation::getColumnTypes); - makeNewTableLocationAndVerifyNoMakeHandleException(ParquetTableLocation::getColumnTypes); + makeNewTableLocationAndVerifyNoException(ParquetTableLocation::getColumnTypes); + verifyMakeHandleException(nonExistentTableLocation::hasDataIndex); + makeNewTableLocationAndVerifyNoException(ParquetTableLocation::hasDataIndex); + + // Assuming here there will be an index on column "A" verifyMakeHandleException(nonExistentTableLocation::getDataIndex); - makeNewTableLocationAndVerifyNoMakeHandleException(ParquetTableLocation::getDataIndex); + makeNewTableLocationAndVerifyNoException(tableLocation -> tableLocation.getDataIndex("A")); verifyMakeHandleException(nonExistentTableLocation::loadDataIndex); - makeNewTableLocationAndVerifyNoMakeHandleException(ParquetTableLocation::loadDataIndex); - - verifyMakeHandleException(nonExistentTableLocation::hasDataIndex); - makeNewTableLocationAndVerifyNoMakeHandleException(ParquetTableLocation::hasDataIndex); + makeNewTableLocationAndVerifyNoException(tableLocation -> tableLocation.loadDataIndex("A")); // APIs from TableLocationState verifyMakeHandleException(nonExistentTableLocation::getRowSet); - makeNewTableLocationAndVerifyNoMakeHandleException(ParquetTableLocation::getRowSet); + makeNewTableLocationAndVerifyNoException(ParquetTableLocation::getRowSet); verifyMakeHandleException(nonExistentTableLocation::getSize); - makeNewTableLocationAndVerifyNoMakeHandleException(ParquetTableLocation::getSize); + makeNewTableLocationAndVerifyNoException(ParquetTableLocation::getSize); verifyMakeHandleException(nonExistentTableLocation::getLastModifiedTimeMillis); - makeNewTableLocationAndVerifyNoMakeHandleException(ParquetTableLocation::getLastModifiedTimeMillis); + makeNewTableLocationAndVerifyNoException(ParquetTableLocation::getLastModifiedTimeMillis); verifyMakeHandleException(() -> nonExistentTableLocation.handleUpdate(new TrackingWritableRowSetImpl(), 0)); From 9dce33d6051ec2ade0942ebc1a3efe8dd2668419 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Thu, 30 Jan 2025 22:47:06 +0100 Subject: [PATCH 17/18] Added a comment --- .../io/deephaven/parquet/table/ParquetTableReadWriteTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index a180a46855f..f4bc2d3ffcf 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -3676,6 +3676,8 @@ private static void makeNewTableLocationAndVerifyNoException( new ParquetTableLocationKey(dest.toURI(), 0, null, ParquetInstructions.EMPTY); final ParquetTableLocation newTableLocation = new ParquetTableLocation(StandaloneTableKey.getInstance(), newTableLocationKey, EMPTY); + + // The following operations should not throw exceptions parquetTableLocationConsumer.accept(newTableLocation); dest.delete(); } From cdd2313d12321f153baac1629824fd099d6c4074 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Thu, 30 Jan 2025 23:19:30 +0100 Subject: [PATCH 18/18] Review contd. --- .../parquet/table/location/ParquetTableLocation.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java index 87c4e9b11bf..ee2fe802710 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java @@ -221,18 +221,17 @@ private RowSet computeIndex(@NotNull final RowGroup[] rowGroups) { @NotNull public List getDataIndexColumns() { initialize(); - final Map localGroupingColumns = groupingColumns; - if (tableInfo.dataIndexes().isEmpty() && localGroupingColumns.isEmpty()) { + if (tableInfo.dataIndexes().isEmpty() && groupingColumns.isEmpty()) { return List.of(); } final List dataIndexColumns = - new ArrayList<>(tableInfo.dataIndexes().size() + localGroupingColumns.size()); + new ArrayList<>(tableInfo.dataIndexes().size() + groupingColumns.size()); // Add the data indexes to the list tableInfo.dataIndexes().stream() .map(di -> di.columns().toArray(String[]::new)) .forEach(dataIndexColumns::add); // Add grouping columns to the list - localGroupingColumns.keySet().stream().map(colName -> new String[] {colName}).forEach(dataIndexColumns::add); + groupingColumns.keySet().stream().map(colName -> new String[] {colName}).forEach(dataIndexColumns::add); return dataIndexColumns; }