-
Notifications
You must be signed in to change notification settings - Fork 96
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[2.x] Adding replication (CCR) plugin interface and classes (#789)
- Loading branch information
1 parent
72e03b0
commit 81fce39
Showing
5 changed files
with
258 additions
and
0 deletions.
There are no files selected for viewing
68 changes: 68 additions & 0 deletions
68
src/main/kotlin/org/opensearch/commons/replication/ReplicationPluginInterface.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.opensearch.commons.replication | ||
|
||
import org.opensearch.action.support.master.AcknowledgedResponse | ||
import org.opensearch.client.Client | ||
import org.opensearch.client.node.NodeClient | ||
import org.opensearch.commons.replication.action.ReplicationActions.INTERNAL_STOP_REPLICATION_ACTION_TYPE | ||
import org.opensearch.commons.replication.action.StopIndexReplicationRequest | ||
import org.opensearch.commons.utils.recreateObject | ||
import org.opensearch.core.action.ActionListener | ||
import org.opensearch.core.action.ActionResponse | ||
import org.opensearch.core.common.io.stream.Writeable | ||
|
||
/** | ||
* Transport action plugin interfaces for the cross-cluster-replication plugin. | ||
*/ | ||
open class ReplicationPluginInterface { | ||
|
||
/** | ||
* Stop replication. | ||
* @param client Node client for making transport action | ||
* @param request The request object | ||
* @param listener The listener for getting response | ||
*/ | ||
|
||
open fun stopReplication( | ||
client: Client, | ||
request: StopIndexReplicationRequest, | ||
listener: ActionListener<AcknowledgedResponse> | ||
) { | ||
val nodeClient = client as NodeClient | ||
return nodeClient.execute( | ||
INTERNAL_STOP_REPLICATION_ACTION_TYPE, | ||
request, | ||
wrapActionListener(listener) { response -> | ||
recreateObject(response) { | ||
AcknowledgedResponse(it) | ||
} | ||
} | ||
) | ||
} | ||
|
||
/** | ||
* Wrap action listener on concrete response class by a new created one on ActionResponse. | ||
* This is required because the response may be loaded by different classloader across plugins. | ||
* The onResponse(ActionResponse) avoids type cast exception and give a chance to recreate | ||
* the response object. | ||
*/ | ||
@Suppress("UNCHECKED_CAST") | ||
private fun <Response : AcknowledgedResponse> wrapActionListener( | ||
listener: ActionListener<Response>, | ||
recreate: (Writeable) -> Response | ||
): ActionListener<Response> { | ||
return object : ActionListener<ActionResponse> { | ||
override fun onResponse(response: ActionResponse) { | ||
val recreated = response as? Response ?: recreate(response) | ||
listener.onResponse(recreated) | ||
} | ||
|
||
override fun onFailure(exception: java.lang.Exception) { | ||
listener.onFailure(exception) | ||
} | ||
} as ActionListener<Response> | ||
} | ||
} |
30 changes: 30 additions & 0 deletions
30
src/main/kotlin/org/opensearch/commons/replication/action/ReplicationActions.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.opensearch.commons.replication.action | ||
|
||
import org.opensearch.action.ActionType | ||
import org.opensearch.action.support.master.AcknowledgedResponse | ||
|
||
/** | ||
* Information related to the transport stop replication action for the Replication plugin | ||
*/ | ||
object ReplicationActions { | ||
|
||
/** | ||
* Action names for stopping replication | ||
* STOP_REPLICATION_ACTION_NAME: action used for _replication/_stop REST API | ||
* INTERNAL_STOP_REPLICATION_ACTION_NAME: (Internal only) Used by Index Management plugin to invoke stop replication | ||
*/ | ||
const val STOP_REPLICATION_ACTION_NAME = "indices:admin/plugins/replication/index/stop" | ||
const val INTERNAL_STOP_REPLICATION_ACTION_NAME = "indices:internal/plugins/replication/index/stop" | ||
|
||
/** | ||
* Stop replication transport action types | ||
*/ | ||
val STOP_REPLICATION_ACTION_TYPE = | ||
ActionType(STOP_REPLICATION_ACTION_NAME, ::AcknowledgedResponse) | ||
val INTERNAL_STOP_REPLICATION_ACTION_TYPE = | ||
ActionType(INTERNAL_STOP_REPLICATION_ACTION_NAME, ::AcknowledgedResponse) | ||
} |
70 changes: 70 additions & 0 deletions
70
src/main/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequest.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.opensearch.commons.replication.action | ||
|
||
import org.opensearch.action.ActionRequestValidationException | ||
import org.opensearch.action.IndicesRequest | ||
import org.opensearch.action.support.IndicesOptions | ||
import org.opensearch.action.support.master.AcknowledgedRequest | ||
import org.opensearch.core.common.io.stream.StreamInput | ||
import org.opensearch.core.common.io.stream.StreamOutput | ||
import org.opensearch.core.xcontent.ObjectParser | ||
import org.opensearch.core.xcontent.ToXContent | ||
import org.opensearch.core.xcontent.ToXContentObject | ||
import org.opensearch.core.xcontent.XContentBuilder | ||
import org.opensearch.core.xcontent.XContentParser | ||
|
||
class StopIndexReplicationRequest : | ||
AcknowledgedRequest<StopIndexReplicationRequest>, IndicesRequest.Replaceable, ToXContentObject { | ||
lateinit var indexName: String | ||
constructor(indexName: String) { | ||
this.indexName = indexName | ||
} | ||
|
||
private constructor() { | ||
} | ||
|
||
constructor(inp: StreamInput) : super(inp) { | ||
indexName = inp.readString() | ||
} | ||
companion object { | ||
private val PARSER = ObjectParser<StopIndexReplicationRequest, Void>("StopReplicationRequestParser") { | ||
StopIndexReplicationRequest() | ||
} | ||
|
||
fun fromXContent(parser: XContentParser, followerIndex: String): StopIndexReplicationRequest { | ||
val stopIndexReplicationRequest = PARSER.parse(parser, null) | ||
stopIndexReplicationRequest.indexName = followerIndex | ||
return stopIndexReplicationRequest | ||
} | ||
} | ||
|
||
override fun validate(): ActionRequestValidationException? { | ||
return null | ||
} | ||
|
||
override fun indices(vararg indices: String?): IndicesRequest { | ||
return this | ||
} | ||
override fun indices(): Array<String> { | ||
return arrayOf(indexName) | ||
} | ||
|
||
override fun indicesOptions(): IndicesOptions { | ||
return IndicesOptions.strictSingleIndexNoExpandForbidClosed() | ||
} | ||
|
||
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { | ||
builder.startObject() | ||
builder.field("indexName", indexName) | ||
builder.endObject() | ||
return builder | ||
} | ||
|
||
override fun writeTo(out: StreamOutput) { | ||
super.writeTo(out) | ||
out.writeString(indexName) | ||
} | ||
} |
67 changes: 67 additions & 0 deletions
67
src/test/kotlin/org/opensearch/commons/replication/ReplicationPluginInterfaceTests.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.commons.replication | ||
|
||
import com.nhaarman.mockitokotlin2.whenever | ||
import org.junit.jupiter.api.Test | ||
import org.junit.jupiter.api.extension.ExtendWith | ||
import org.mockito.Mockito.any | ||
import org.mockito.Mockito.mock | ||
import org.mockito.Mockito.verify | ||
import org.mockito.junit.jupiter.MockitoExtension | ||
import org.opensearch.action.support.master.AcknowledgedResponse | ||
import org.opensearch.client.node.NodeClient | ||
import org.opensearch.commons.replication.action.StopIndexReplicationRequest | ||
import org.opensearch.core.action.ActionListener | ||
import org.opensearch.core.action.ActionResponse | ||
|
||
@ExtendWith(MockitoExtension::class) | ||
internal class ReplicationPluginInterfaceTests { | ||
|
||
@Test | ||
fun `test stopReplication successful response`() { | ||
// Mock dependencies | ||
val client: NodeClient = mock() | ||
val request: StopIndexReplicationRequest = mock() | ||
val listener: ActionListener<AcknowledgedResponse> = mock() | ||
val acknowledgedResponse = AcknowledgedResponse(true) // Successful response | ||
|
||
// Mock the behavior of NodeClient.execute() | ||
whenever(client.execute(any(), any(), any<ActionListener<ActionResponse>>())) | ||
.thenAnswer { invocation -> | ||
val actionListener = invocation.getArgument<ActionListener<ActionResponse>>(2) | ||
actionListener.onResponse(acknowledgedResponse) // Simulate success | ||
} | ||
|
||
val replicationPluginInterface = ReplicationPluginInterface() | ||
// Call method under test | ||
replicationPluginInterface.stopReplication(client, request, listener) | ||
// Verify that listener.onResponse is called with the correct response | ||
verify(listener).onResponse(acknowledgedResponse) | ||
} | ||
|
||
@Test | ||
fun `test stopReplication failure response`() { | ||
// Mock dependencies | ||
val client: NodeClient = mock() | ||
val request: StopIndexReplicationRequest = mock() | ||
val listener: ActionListener<AcknowledgedResponse> = mock() | ||
val exception = Exception("Test failure") | ||
|
||
// Mock the behavior of NodeClient.execute() | ||
whenever(client.execute(any(), any(), any<ActionListener<ActionResponse>>())) | ||
.thenAnswer { invocation -> | ||
val actionListener = invocation.getArgument<ActionListener<ActionResponse>>(2) | ||
actionListener.onFailure(exception) // Simulate failure | ||
} | ||
|
||
val replicationPluginInterface = ReplicationPluginInterface() | ||
// Call method under test | ||
replicationPluginInterface.stopReplication(client, request, listener) | ||
// Verify that listener.onResponse is called with the correct response | ||
verify(listener).onFailure(exception) | ||
} | ||
} |
23 changes: 23 additions & 0 deletions
23
...test/kotlin/org/opensearch/commons/replication/action/StopIndexReplicationRequestTests.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
package org.opensearch.commons.replication.action | ||
|
||
import org.junit.jupiter.api.Assertions.assertEquals | ||
import org.junit.jupiter.api.Assertions.assertNotNull | ||
import org.junit.jupiter.api.Assertions.assertNull | ||
import org.junit.jupiter.api.Test | ||
import org.opensearch.commons.utils.recreateObject | ||
|
||
internal class StopIndexReplicationRequestTests { | ||
@Test | ||
fun `Stop Replication request serialize and deserialize transport object should be equal`() { | ||
val index = "test-idx" | ||
val request = StopIndexReplicationRequest(index) | ||
val recreatedRequest = recreateObject(request) { StopIndexReplicationRequest(it) } | ||
assertNotNull(recreatedRequest) | ||
assertEquals(request.indexName, recreatedRequest.indexName) | ||
assertNull(recreatedRequest.validate()) | ||
} | ||
} |