Skip to content

Commit

Permalink
Writable warm replica replication/recovery (opensearch-project#17390)
Browse files Browse the repository at this point in the history
Signed-off-by: Sandeep Kumawat <skumwt@amazon.com>
  • Loading branch information
skumawat2025 authored Mar 7, 2025
1 parent 7388205 commit cb869c0
Show file tree
Hide file tree
Showing 13 changed files with 1,773 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@
import java.util.stream.Collectors;

import static java.util.Arrays.asList;
import static org.opensearch.test.OpenSearchIntegTestCase.client;
import static org.opensearch.test.OpenSearchTestCase.assertBusy;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

public class SegmentReplicationBaseIT extends OpenSearchIntegTestCase {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

import java.nio.file.Path;

import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;

/**
* This class runs Segment Replication Integ test suite with remote store enabled.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ private IndexMetadata.Builder updateInSyncAllocations(
allocationId = RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID;
} else {
assert (recoverySource instanceof RecoverySource.SnapshotRecoverySource
|| recoverySource instanceof RecoverySource.RemoteStoreRecoverySource) : recoverySource;
|| recoverySource instanceof RecoverySource.RemoteStoreRecoverySource
|| recoverySource instanceof RecoverySource.ExistingStoreRecoverySource) : recoverySource;
allocationId = updates.initializedPrimary.allocationId().getId();
}
// forcing a stale primary resets the in-sync allocations to the singleton set with the stale id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.util.Queue;
import java.util.Set;

import static org.opensearch.action.admin.indices.tiering.TieringUtils.isPartialIndex;

/**
* A {@link RemoteShardsBalancer} used by the {@link BalancedShardsAllocator} to perform allocation operations
* for remote shards within the cluster.
Expand Down Expand Up @@ -345,7 +347,8 @@ private void unassignIgnoredRemoteShards(RoutingAllocation routingAllocation) {
// Remote shards do not have an existing store to recover from and can be recovered from an empty source
// to re-fetch any shard blocks from the repository.
if (shard.primary()) {
if (RecoverySource.Type.SNAPSHOT.equals(shard.recoverySource().getType()) == false) {
if (RecoverySource.Type.SNAPSHOT.equals(shard.recoverySource().getType()) == false
&& isPartialIndex(allocation.metadata().getIndexSafe(shard.index())) == false) {
unassignedShard = shard.updateUnassigned(shard.unassignedInfo(), RecoverySource.EmptyStoreRecoverySource.INSTANCE);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,11 @@ public boolean shouldPeriodicallyFlush() {
@Override
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
ensureOpen();
// Skip flushing for indices with partial locality (warm indices)
// For these indices, we don't need to commit as we will sync from the remote store on re-open
if (engineConfig.getIndexSettings().isStoreLocalityPartial()) {
return;
}
// readLock is held here to wait/block any concurrent close that acquires the writeLock.
try (final ReleasableLock lock = readLock.acquire()) {
ensureOpen();
Expand Down Expand Up @@ -442,7 +447,9 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
latestSegmentInfos.changed();
}
try {
commitSegmentInfos(latestSegmentInfos);
if (engineConfig.getIndexSettings().isStoreLocalityPartial() == false) {
commitSegmentInfos(latestSegmentInfos);
}
} catch (IOException e) {
// mark the store corrupted unless we are closing as result of engine failure.
// in this case Engine#failShard will handle store corruption.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5142,7 +5142,9 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
} else {
storeDirectory = store.directory();
}
copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, onFileSync);
if (indexSettings.isStoreLocalityPartial() == false) {
copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, onFileSync);
}

if (remoteSegmentMetadata != null) {
final SegmentInfos infosSnapshot = store.buildSegmentInfos(
Expand All @@ -5158,7 +5160,7 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
}
}
assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
: "There should not be any segments file in the dir";
|| indexSettings.isStoreLocalityPartial() : "There should not be any segments file in the dir";
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
}
syncSegmentSuccess = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,12 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.lucene.index.IndexFileNames.SEGMENTS;

/**
* Composite Directory will contain both local and remote directory
Expand Down Expand Up @@ -74,12 +78,37 @@ public CompositeDirectory(Directory localDirectory, Directory remoteDirectory, F
);
}

/**
 * Returns the names of all files stored in the local directory,
 * including locally cached block files and temp files.
 *
 * @return names of every file currently present in the local directory
 * @throws IOException in case of I/O error
 */
private String[] listLocalFiles() throws IOException {
    ensureOpen();
    // Fixed trace message: it previously referenced a non-existent "listLocalOnly()" method.
    logger.trace("Composite Directory[{}]: listLocalFiles() called", this::toString);
    return localDirectory.listAll();
}

/**
 * Collects the names of all local files belonging to the given file:
 * the original file itself (when present locally) plus every block file
 * derived from it.
 *
 * @param fileName the base file name whose block files should be located
 * @return names of the base file (if present) and all of its block files
 * @throws IOException in case of I/O error while listing files
 */
private List<String> listBlockFiles(String fileName) throws IOException {
    final String blockPrefix = fileName + FileTypeUtils.BLOCK_FILE_IDENTIFIER;
    return Arrays.stream(listLocalFiles())
        .filter(localFile -> localFile.equals(fileName) || localFile.startsWith(blockPrefix))
        .collect(Collectors.toList());
}

/**
* Returns names of all files stored in this directory in sorted order
* Does not include locally stored block files (having _block_ in their names) and files pending deletion
*
* @throws IOException in case of I/O error
*/
// TODO: https://github.com/opensearch-project/OpenSearch/issues/17527
@Override
public String[] listAll() throws IOException {
ensureOpen();
Expand All @@ -105,6 +134,7 @@ public String[] listAll() throws IOException {
* Currently deleting only from local directory as files from remote should not be deleted as that is taken care by garbage collection logic of remote directory
* @param name the name of an existing file.
* @throws IOException in case of I/O error
* @throws NoSuchFileException when file does not exist in the directory
*/
@Override
public void deleteFile(String name) throws IOException {
Expand All @@ -115,7 +145,21 @@ public void deleteFile(String name) throws IOException {
} else if (Arrays.asList(listAll()).contains(name) == false) {
throw new NoSuchFileException("File " + name + " not found in directory");
} else {
fileCache.remove(getFilePath(name));
List<String> blockFiles = listBlockFiles(name);
if (blockFiles.isEmpty()) {
// Remove this condition when this issue is addressed.
// TODO: https://github.com/opensearch-project/OpenSearch/issues/17526
logger.debug("The file [{}] or its block files do not exist in local directory", name);
} else {
for (String blockFile : blockFiles) {
if (fileCache.get(getFilePath(blockFile)) == null) {
logger.debug("The file [{}] exists in local but not part of FileCache, deleting it from local", blockFile);
localDirectory.deleteFile(blockFile);
} else {
fileCache.remove(getFilePath(blockFile));
}
}
}
}
}

Expand Down Expand Up @@ -254,6 +298,15 @@ public IndexInput openInput(String name, IOContext context) throws IOException {
public void close() throws IOException {
    ensureOpen();
    logger.trace("Composite Directory[{}]: close() called", this::toString);
    // Evict any segments_N entries from the file cache before closing. Such entries
    // are created with ref count 1 during index creation on replica shards.
    // TODO: https://github.com/opensearch-project/OpenSearch/issues/17534
    for (String localFile : listLocalFiles()) {
        if (localFile.startsWith(SEGMENTS) == false) {
            continue;
        }
        fileCache.remove(getFilePath(localFile));
    }
    fileCache.prune();
    localDirectory.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
@ExperimentalApi
public class FileTypeUtils {

public static String BLOCK_FILE_IDENTIFIER = "_block_";

/** Returns true when the given file name carries the ".tmp" suffix used for temporary files. */
public static boolean isTempFile(String name) {
    final String suffix = ".tmp";
    return name.regionMatches(name.length() - suffix.length(), suffix, 0, suffix.length());
}

/**
 * Returns true when the given file name is a block file, i.e. contains the
 * shared block-file identifier token.
 */
public static boolean isBlockFile(String name) {
    // Scrape/diff residue left two return statements here; the constant-based
    // check is the intended (post-change) implementation.
    return name.contains(BLOCK_FILE_IDENTIFIER);
}

public static boolean isExtraFSFile(String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,13 +310,20 @@ public CacheStats stats() {
public void logCurrentState() {
lock.lock();
try {
String allFiles = "\n";
final StringBuilder allFiles = new StringBuilder("\n");
for (Map.Entry<K, Node<K, V>> entry : data.entrySet()) {
String path = entry.getKey().toString();
String file = path.substring(path.lastIndexOf('/'));
allFiles += file + " [RefCount: " + entry.getValue().refCount + " , Weight: " + entry.getValue().weight + " ]\n";
allFiles.append(file)
.append(" [RefCount: ")
.append(entry.getValue().refCount)
.append(" , Weight: ")
.append(entry.getValue().weight)
.append(" ]\n");
}
if (allFiles.length() > 1) {
logger.trace(() -> "Cache entries : " + allFiles);
}
logger.trace("Cache entries : " + allFiles);
} finally {
lock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,11 @@ public CacheStats stats() {
/**
 * Logs the contents of every non-empty cache segment at TRACE level.
 * Diff residue (the unguarded pre-change logging lines duplicated beside the
 * guarded version) is resolved here to the guarded post-change form.
 */
public void logCurrentState() {
    int i = 0;
    for (RefCountedCache<K, V> cache : table) {
        // Skip empty segments so the trace output only covers segments holding entries.
        if (cache.size() > 0) {
            final int segmentIndex = i; // effectively-final copy for use in the lambda below
            logger.trace(() -> "SegmentedCache " + segmentIndex);
            ((LRUCache<K, V>) cache).logCurrentState();
        }
        i++;
    }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Set;
Expand Down Expand Up @@ -202,6 +203,12 @@ public void startReplication(ActionListener<Void> listener, BiConsumer<Replicati
private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo) throws IOException {
cancellableThreads.checkForCancel();
state.setStage(SegmentReplicationState.Stage.FILE_DIFF);

// Return an empty list for warm indices, In this case, replica shards don't require downloading files from remote storage
// as replicas will sync all files from remote in case of failure.
if (indexShard.indexSettings().isStoreLocalityPartial()) {
return Collections.emptyList();
}
final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), indexShard.getSegmentMetadataMap());
// local files
final Set<String> localFiles = Set.of(indexShard.store().directory().listAll());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,16 @@ public class CompositeDirectoryTests extends BaseRemoteSegmentStoreDirectoryTest
private FSDirectory localDirectory;
private CompositeDirectory compositeDirectory;

// Files seeded into the local directory for these tests, including block files
// (partial downloads) and a temp file. The scrape retained both the pre- and
// post-change declarations of LOCAL_FILES; only the updated one is kept here,
// since a duplicate field declaration would not compile.
private final static String[] LOCAL_FILES = new String[] {
    "_1.cfe",
    "_1.cfe_block_0",
    "_1.cfe_block_1",
    "_2.cfe",
    "_0.cfe_block_7",
    "_0.cfs_block_7",
    "temp_file.tmp" };
private final static String FILE_PRESENT_LOCALLY = "_1.cfe";
private final static String BLOCK_FILE_PRESENT_LOCALLY = "_1.cfe_block_0";
private final static String FILE_PRESENT_IN_REMOTE_ONLY = "_0.si";
private final static String NON_EXISTENT_FILE = "non_existent_file";
private final static String NEW_FILE = "new_file";
Expand All @@ -67,9 +75,11 @@ public void testListAll() throws IOException {

public void testDeleteFile() throws IOException {
// Precondition: both the base file and one of its block files exist locally.
assertTrue(existsInCompositeDirectory(FILE_PRESENT_LOCALLY));
assertTrue(existsInLocalDirectory(BLOCK_FILE_PRESENT_LOCALLY));
// Delete the file and assert that it no more is a part of the directory
compositeDirectory.deleteFile(FILE_PRESENT_LOCALLY);
assertFalse(existsInCompositeDirectory(FILE_PRESENT_LOCALLY));
// Deleting the base file must also remove its derived block files.
assertFalse(existsInCompositeDirectory(BLOCK_FILE_PRESENT_LOCALLY));
// Reading deleted file from directory should result in NoSuchFileException
assertThrows(NoSuchFileException.class, () -> compositeDirectory.openInput(FILE_PRESENT_LOCALLY, IOContext.DEFAULT));
}
Expand Down

0 comments on commit cb869c0

Please sign in to comment.