Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-7730] HoodieStorage.openSeekable should not have wrapStream param #12774

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ private static HFileReader createReader(String hFilePath, HoodieStorage storage)
LOG.info("Opening HFile for reading :" + hFilePath);
StoragePath path = new StoragePath(hFilePath);
long fileSize = storage.getPathInfo(path).getLength();
SeekableDataInputStream stream = storage.openSeekable(path, false);
SeekableDataInputStream stream = storage.openSeekable(path);
return new HFileReaderImpl(stream, fileSize);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ public static SeekableDataInputStream getDataInputStream(HoodieStorage storage,
HoodieLogFile logFile,
int bufferSize) {
try {
return storage.openSeekable(logFile.getPath(), bufferSize, true);
return storage.openSeekable(logFile.getPath(), bufferSize);
} catch (IOException e) {
throw new HoodieIOException("Unable to get seekable input stream for " + logFile, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ private HFileReader newHFileReader() throws IOException {
long fileSize;
if (path.isPresent()) {
fileSize = storage.getPathInfo(path.get()).getLength();
inputStream = storage.openSeekable(path.get(), false);
inputStream = storage.openSeekable(path.get());
} else {
fileSize = bytesContent.get().length;
inputStream = new ByteArraySeekableDataInputStream(new ByteBufferBackedInputStream(bytesContent.get()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.InvalidHoodiePathException;
import org.apache.hudi.io.SeekableDataInputStream;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
Expand Down Expand Up @@ -247,6 +248,10 @@ public static FSDataInputStream getFSDataInputStream(FileSystem fs,
return fsDataInputStream;
}

public static SeekableDataInputStream createHadoopSeekableStream(FileSystem fs, StoragePath filePath, int bufferSize, boolean wrapStream) {
return new HadoopSeekableDataInputStream(HadoopFSUtils.getFSDataInputStream(fs, filePath, bufferSize, wrapStream));
}

/**
* GCS FileSystem needs some special handling for seek and hence this method assists to fetch the right {@link FSDataInputStream} to be
* used by wrapping with required input streams.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.hudi.common.fs.ConsistencyGuard;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hadoop.fs.HadoopSeekableDataInputStream;
import org.apache.hudi.hadoop.fs.HoodieRetryWrapperFileSystem;
import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem;
import org.apache.hudi.io.SeekableDataInputStream;
Expand Down Expand Up @@ -150,9 +149,13 @@ public InputStream open(StoragePath path) throws IOException {
}

@Override
public SeekableDataInputStream openSeekable(StoragePath path, int bufferSize, boolean wrapStream) throws IOException {
return new HadoopSeekableDataInputStream(
HadoopFSUtils.getFSDataInputStream(fs, path, bufferSize, wrapStream));
public SeekableDataInputStream openSeekable(StoragePath path, int bufferSize) throws IOException {
return HadoopFSUtils.createHadoopSeekableStream(fs, path, bufferSize, true);
}

@Override
public SeekableDataInputStream openSeekable(StoragePath path) throws IOException {
return HadoopFSUtils.createHadoopSeekableStream(fs, path, getDefaultBufferSize(), false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,11 @@ public abstract HoodieStorage newInstance(StoragePath path,
*
* @param path the file to open.
* @param bufferSize buffer size to use.
* @param wrapStream true if we want to wrap the inputstream based on filesystem specific criteria
* @return the InputStream to read from.
* @throws IOException IO error.
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public abstract SeekableDataInputStream openSeekable(StoragePath path, int bufferSize, boolean wrapStream) throws IOException;
public abstract SeekableDataInputStream openSeekable(StoragePath path, int bufferSize) throws IOException;

/**
* Appends to an existing file (optional operation).
Expand Down Expand Up @@ -426,14 +425,11 @@ public boolean createNewFile(StoragePath path) throws IOException {
* Opens an SeekableDataInputStream at the indicated path with seeks supported.
*
* @param path the file to open.
* @param wrapStream true if we want to wrap the inputstream based on filesystem specific criteria
* @return the InputStream to read from.
* @throws IOException IO error.
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public SeekableDataInputStream openSeekable(StoragePath path, boolean wrapStream) throws IOException {
return openSeekable(path, getDefaultBlockSize(path), wrapStream);
}
public abstract SeekableDataInputStream openSeekable(StoragePath path) throws IOException;

/**
* Lists the file info of the direct files/directories in the given list of paths,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,11 @@ public void testSeekable() throws IOException {
stream.flush();
}

try (SeekableDataInputStream seekableStream = storage.openSeekable(path, true)) {
try (SeekableDataInputStream seekableStream = storage.openSeekable(path)) {
validateSeekableDataInputStream(seekableStream, data);
}

try (SeekableDataInputStream seekableStream = storage.openSeekable(path, 2, true)) {
try (SeekableDataInputStream seekableStream = storage.openSeekable(path, 2)) {
validateSeekableDataInputStream(seekableStream, data);
}
}
Expand Down
Loading