From 4871168388bbb1003af22c31ee4d4f443aea37cf Mon Sep 17 00:00:00 2001 From: pdoerner <122412190+pdoerner@users.noreply.github.com> Date: Mon, 5 Aug 2024 10:36:09 -0700 Subject: [PATCH] Test server Nexus endpoint operator apis (#2162) * Bump API version to v1.36.0 * Nexus endpoint test server CRUD API implementation * cleanup * functional tests * test operator service external setup * test environment setup * test environment setup * skip functional tests with external server --- temporal-serviceclient/src/main/proto | 2 +- .../testservice/TestNexusEndpointStore.java | 44 ++ .../TestNexusEndpointStoreImpl.java | 189 +++++++++ .../testservice/TestOperatorService.java | 86 +++- .../testservice/TestServicesStarter.java | 3 +- .../functional/NexusEndpointTest.java | 400 ++++++++++++++++++ .../testing/TestWorkflowEnvironment.java | 6 + .../TestWorkflowEnvironmentInternal.java | 5 + 8 files changed, 732 insertions(+), 3 deletions(-) create mode 100644 temporal-test-server/src/main/java/io/temporal/internal/testservice/TestNexusEndpointStore.java create mode 100644 temporal-test-server/src/main/java/io/temporal/internal/testservice/TestNexusEndpointStoreImpl.java create mode 100644 temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusEndpointTest.java diff --git a/temporal-serviceclient/src/main/proto b/temporal-serviceclient/src/main/proto index 2227a14f4..39b0f69d1 160000 --- a/temporal-serviceclient/src/main/proto +++ b/temporal-serviceclient/src/main/proto @@ -1 +1 @@ -Subproject commit 2227a14f482ae48fc440a5e9829cf6797009d5b8 +Subproject commit 39b0f69d19b67731e1f35fd2d231f2c871091359 diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestNexusEndpointStore.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestNexusEndpointStore.java new file mode 100644 index 000000000..dcb9b9c23 --- /dev/null +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestNexusEndpointStore.java @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.internal.testservice; + +import io.temporal.api.nexus.v1.Endpoint; +import io.temporal.api.nexus.v1.EndpointSpec; +import java.io.Closeable; +import java.util.List; + +public interface TestNexusEndpointStore extends Closeable { + + Endpoint createEndpoint(EndpointSpec spec); + + Endpoint updateEndpoint(String id, long version, EndpointSpec spec); + + void deleteEndpoint(String id, long version); + + Endpoint getEndpoint(String id); + + List listEndpoints(long pageSize, byte[] nextPageToken, String name); + + void validateEndpointSpec(EndpointSpec spec); + + @Override + void close(); +} diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestNexusEndpointStoreImpl.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestNexusEndpointStoreImpl.java new file mode 100644 index 000000000..dbaa2ddf3 --- /dev/null +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestNexusEndpointStoreImpl.java @@ -0,0 +1,189 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.internal.testservice; + +import io.grpc.Status; +import io.temporal.api.nexus.v1.Endpoint; +import io.temporal.api.nexus.v1.EndpointSpec; +import java.util.*; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * TestNexusEndpointStoreImpl is an in-memory implementation of Nexus endpoint CRUD operations for + * use with the test server. Because conflict resolution is not required, there is no handling for + * created or updated timestamps. + */ +public class TestNexusEndpointStoreImpl implements TestNexusEndpointStore { + + private static final Pattern ENDPOINT_NAME_REGEX = Pattern.compile("^[a-zA-Z_][a-zA-Z0-9_]*$"); + + private final SortedMap endpoints = new ConcurrentSkipListMap<>(); + private final Set endpointNames = new HashSet<>(); + + @Override + public Endpoint createEndpoint(EndpointSpec spec) { + validateEndpointSpec(spec); + + if (!endpointNames.add(spec.getName())) { + throw Status.ALREADY_EXISTS + .withDescription("Nexus endpoint already registered with name: " + spec.getName()) + .asRuntimeException(); + } + + String id = UUID.randomUUID().toString(); + Endpoint endpoint = Endpoint.newBuilder().setId(id).setVersion(1).setSpec(spec).build(); + + if (endpoints.putIfAbsent(id, endpoint) != null) { + // This should never happen in practice + throw Status.ALREADY_EXISTS + .withDescription("Nexus endpoint already exists with ID: " + id) + .asRuntimeException(); + } + + return endpoint; + } + + @Override + public Endpoint updateEndpoint(String id, long version, EndpointSpec spec) { + validateEndpointSpec(spec); + + Endpoint prev = endpoints.get(id); + + if (prev == null) { + throw Status.NOT_FOUND + .withDescription("Could not find Nexus endpoint with ID: " + id) + .asRuntimeException(); + } + + if (prev.getVersion() != version) { + throw Status.INVALID_ARGUMENT + .withDescription( + "Error updating Nexus endpoint: version mismatch." + + " Expected: " + + prev.getVersion() + + " Received: " + + version) + .asRuntimeException(); + } + + if (!prev.getSpec().getName().equals(spec.getName()) && !endpointNames.add(spec.getName())) { + throw Status.ALREADY_EXISTS + .withDescription( + "Error updating Nexus endpoint: " + + "endpoint already registered with updated name: " + + spec.getName()) + .asRuntimeException(); + } else { + endpointNames.remove(prev.getSpec().getName()); + } + + Endpoint updated = Endpoint.newBuilder(prev).setVersion(version + 1).setSpec(spec).build(); + + endpoints.put(id, updated); + return updated; + } + + @Override + public void deleteEndpoint(String id, long version) { + Endpoint existing = endpoints.get(id); + + if (existing == null) { + throw Status.NOT_FOUND + .withDescription("Could not find Nexus endpoint with ID: " + id) + .asRuntimeException(); + } + + if (existing.getVersion() != version) { + throw Status.INVALID_ARGUMENT + .withDescription( + "Error deleting Nexus endpoint: version mismatch." + + " Expected " + + existing.getVersion() + + " Received: " + + version) + .asRuntimeException(); + } + + endpoints.remove(id); + } + + @Override + public Endpoint getEndpoint(String id) { + Endpoint endpoint = endpoints.get(id); + if (endpoint == null) { + throw Status.NOT_FOUND + .withDescription("Could not find Nexus endpoint with ID: " + id) + .asRuntimeException(); + } + return endpoint; + } + + @Override + public List listEndpoints(long pageSize, byte[] nextPageToken, String name) { + if (name != null && !name.isEmpty()) { + return endpoints.values().stream() + .filter(ep -> ep.getSpec().getName().equals(name)) + .limit(1) + .collect(Collectors.toList()); + } + + if (nextPageToken.length > 0) { + return endpoints.tailMap(new String(nextPageToken)).values().stream() + .skip(1) + .limit(pageSize) + .collect(Collectors.toList()); + } + return endpoints.values().stream().limit(pageSize).collect(Collectors.toList()); + } + + @Override + public void validateEndpointSpec(EndpointSpec spec) { + if (spec.getName().isEmpty()) { + throw Status.INVALID_ARGUMENT + .withDescription("Nexus endpoint name cannot be empty") + .asRuntimeException(); + } + if (!ENDPOINT_NAME_REGEX.matcher(spec.getName()).matches()) { + throw Status.INVALID_ARGUMENT + .withDescription( + "Nexus endpoint name (" + + spec.getName() + + ") does not match expected pattern: " + + ENDPOINT_NAME_REGEX.pattern()) + .asRuntimeException(); + } + if (!spec.hasTarget()) { + throw Status.INVALID_ARGUMENT + .withDescription("Nexus endpoint spec must have a target") + .asRuntimeException(); + } + if (!spec.getTarget().hasWorker()) { + throw Status.INVALID_ARGUMENT + .withDescription("Test server only supports Nexus endpoints with worker targets") + .asRuntimeException(); + } + } + + @Override + public void close() {} +} diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestOperatorService.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestOperatorService.java index 3dc328aac..feb0ef084 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestOperatorService.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestOperatorService.java @@ -20,12 +20,15 @@ package io.temporal.internal.testservice; +import com.google.protobuf.ByteString; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import io.temporal.api.enums.v1.IndexedValueType; +import io.temporal.api.nexus.v1.Endpoint; import io.temporal.api.operatorservice.v1.*; import java.io.Closeable; +import java.util.List; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,9 +43,12 @@ final class TestOperatorService extends OperatorServiceGrpc.OperatorServiceImplB private static final Logger log = LoggerFactory.getLogger(TestOperatorService.class); private final TestVisibilityStore visibilityStore; + private final TestNexusEndpointStore nexusEndpointStore; - public TestOperatorService(TestVisibilityStore visibilityStore) { + public TestOperatorService( + TestVisibilityStore visibilityStore, TestNexusEndpointStore nexusEndpointStore) { this.visibilityStore = visibilityStore; + this.nexusEndpointStore = nexusEndpointStore; } @Override @@ -93,6 +99,84 @@ public void removeSearchAttributes( } } + @Override + public void getNexusEndpoint( + GetNexusEndpointRequest request, StreamObserver responseObserver) { + try { + Endpoint endpoint = nexusEndpointStore.getEndpoint(request.getId()); + responseObserver.onNext(GetNexusEndpointResponse.newBuilder().setEndpoint(endpoint).build()); + responseObserver.onCompleted(); + } catch (StatusRuntimeException e) { + handleStatusRuntimeException(e, responseObserver); + } + } + + @Override + public void createNexusEndpoint( + CreateNexusEndpointRequest request, + StreamObserver responseObserver) { + try { + Endpoint created = nexusEndpointStore.createEndpoint(request.getSpec()); + responseObserver.onNext( + CreateNexusEndpointResponse.newBuilder().setEndpoint(created).build()); + responseObserver.onCompleted(); + } catch (StatusRuntimeException e) { + handleStatusRuntimeException(e, responseObserver); + } + } + + @Override + public void updateNexusEndpoint( + UpdateNexusEndpointRequest request, + StreamObserver responseObserver) { + try { + Endpoint updated = + nexusEndpointStore.updateEndpoint( + request.getId(), request.getVersion(), request.getSpec()); + responseObserver.onNext( + UpdateNexusEndpointResponse.newBuilder().setEndpoint(updated).build()); + responseObserver.onCompleted(); + } catch (StatusRuntimeException e) { + handleStatusRuntimeException(e, responseObserver); + } + } + + @Override + public void deleteNexusEndpoint( + DeleteNexusEndpointRequest request, + StreamObserver responseObserver) { + try { + nexusEndpointStore.deleteEndpoint(request.getId(), request.getVersion()); + responseObserver.onNext(DeleteNexusEndpointResponse.newBuilder().build()); + responseObserver.onCompleted(); + } catch (StatusRuntimeException e) { + handleStatusRuntimeException(e, responseObserver); + } + } + + @Override + public void listNexusEndpoints( + ListNexusEndpointsRequest request, + StreamObserver responseObserver) { + try { + List endpoints = + nexusEndpointStore.listEndpoints( + request.getPageSize(), request.getNextPageToken().toByteArray(), request.getName()); + ByteString nextPageToken = + (!endpoints.isEmpty() && endpoints.size() == request.getPageSize()) + ? endpoints.get(endpoints.size() - 1).getIdBytes() + : ByteString.empty(); + responseObserver.onNext( + ListNexusEndpointsResponse.newBuilder() + .addAllEndpoints(endpoints) + .setNextPageToken(nextPageToken) + .build()); + responseObserver.onCompleted(); + } catch (StatusRuntimeException e) { + handleStatusRuntimeException(e, responseObserver); + } + } + private void handleStatusRuntimeException( StatusRuntimeException e, StreamObserver responseObserver) { if (e.getStatus().getCode() == Status.Code.INTERNAL) { diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestServicesStarter.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestServicesStarter.java index 1162c92da..1ade1ff7c 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestServicesStarter.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestServicesStarter.java @@ -30,6 +30,7 @@ public class TestServicesStarter implements Closeable { private final SelfAdvancingTimerImpl selfAdvancingTimer; private final TestVisibilityStore visibilityStore = new TestVisibilityStoreImpl(); + private final TestNexusEndpointStore nexusEndpointStore = new TestNexusEndpointStoreImpl(); private final TestWorkflowStore workflowStore; private final TestOperatorService operatorService; private final TestWorkflowService workflowService; @@ -46,7 +47,7 @@ public TestServicesStarter(boolean lockTimeSkipping, long initialTimeMillis) { this.selfAdvancingTimer = new SelfAdvancingTimerImpl(initialTimeMillis, Clock.systemDefaultZone()); this.workflowStore = new TestWorkflowStoreImpl(this.selfAdvancingTimer); - this.operatorService = new TestOperatorService(this.visibilityStore); + this.operatorService = new TestOperatorService(this.visibilityStore, this.nexusEndpointStore); this.testService = new TestService(this.workflowStore, this.selfAdvancingTimer, lockTimeSkipping); this.workflowService = diff --git a/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusEndpointTest.java b/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusEndpointTest.java new file mode 100644 index 000000000..0a80308e8 --- /dev/null +++ b/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusEndpointTest.java @@ -0,0 +1,400 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.testserver.functional; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assume.assumeFalse; + +import com.google.protobuf.ByteString; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.temporal.api.common.v1.Payload; +import io.temporal.api.nexus.v1.Endpoint; +import io.temporal.api.nexus.v1.EndpointSpec; +import io.temporal.api.nexus.v1.EndpointTarget; +import io.temporal.api.operatorservice.v1.*; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.UUID; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +public class NexusEndpointTest { + @Rule public SDKTestWorkflowRule testWorkflowRule = SDKTestWorkflowRule.newBuilder().build(); + + @Before + public void checkExternal() { + // TODO: remove this skip once 1.25.0 is officially released and + // https://github.com/temporalio/sdk-java/issues/2165 is resolved + assumeFalse( + "Nexus APIs are not supported for server versions < 1.25.0", + testWorkflowRule.isUseExternalService()); + } + + @Test + public void testValidateEndpointSpec() { + // Create and Update use same validation logic, so just test once + EndpointSpec.Builder specBuilder = getTestEndpointSpecBuilder("valid_name_01"); + + // Valid + Endpoint testEndpoint = createTestEndpoint(specBuilder); + assertEquals(1, testEndpoint.getVersion()); + assertEquals(specBuilder.build(), testEndpoint.getSpec()); + + // Missing name + specBuilder.setName(""); + StatusRuntimeException ex = + assertThrows(StatusRuntimeException.class, () -> createTestEndpoint(specBuilder)); + assertEquals(Status.Code.INVALID_ARGUMENT, ex.getStatus().getCode()); + assertEquals("Nexus endpoint name cannot be empty", ex.getStatus().getDescription()); + + // Name contains invalid characters + specBuilder.setName("*(test)_- :invalid"); + ex = assertThrows(StatusRuntimeException.class, () -> createTestEndpoint(specBuilder)); + assertEquals(Status.Code.INVALID_ARGUMENT, ex.getStatus().getCode()); + assertEquals( + "Nexus endpoint name (" + + specBuilder.getName() + + ") does not match expected pattern: ^[a-zA-Z_][a-zA-Z0-9_]*$", + ex.getStatus().getDescription()); + + // Missing target + specBuilder.setName("valid_name_02"); + specBuilder.clearTarget(); + ex = assertThrows(StatusRuntimeException.class, () -> createTestEndpoint(specBuilder)); + assertEquals(Status.Code.INVALID_ARGUMENT, ex.getStatus().getCode()); + assertEquals("Nexus endpoint spec must have a target", ex.getStatus().getDescription()); + + // External target (test server only supports worker targets) + specBuilder.setTarget( + EndpointTarget.newBuilder() + .setExternal(EndpointTarget.External.newBuilder().setUrl("localhost:8080"))); + ex = assertThrows(StatusRuntimeException.class, () -> createTestEndpoint(specBuilder)); + assertEquals(Status.Code.INVALID_ARGUMENT, ex.getStatus().getCode()); + assertEquals( + "Test server only supports Nexus endpoints with worker targets", + ex.getStatus().getDescription()); + } + + @Test + public void testCreate() { + EndpointSpec.Builder specBuilder = getTestEndpointSpecBuilder("valid_create_test_endpoint"); + + // Valid create + Endpoint testEndpoint = createTestEndpoint(specBuilder); + assertEquals(1, testEndpoint.getVersion()); + assertEquals(specBuilder.build(), testEndpoint.getSpec()); + + // Name already registered + StatusRuntimeException ex = + assertThrows(StatusRuntimeException.class, () -> createTestEndpoint(specBuilder)); + assertEquals(Status.Code.ALREADY_EXISTS, ex.getStatus().getCode()); + assertEquals( + "Nexus endpoint already registered with name: " + specBuilder.getName(), + ex.getStatus().getDescription()); + } + + @Test + public void testUpdate() { + // Setup + Endpoint testEndpoint = createTestEndpoint(getTestEndpointSpecBuilder("update_test_endpoint")); + assertEquals(1, testEndpoint.getVersion()); + EndpointSpec updatedSpec = + EndpointSpec.newBuilder(testEndpoint.getSpec()) + .setDescription( + Payload.newBuilder().setData(ByteString.copyFromUtf8("updated description"))) + .build(); + + // Not found + String missingID = UUID.randomUUID().toString(); + StatusRuntimeException ex = + assertThrows( + StatusRuntimeException.class, + () -> + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .updateNexusEndpoint( + UpdateNexusEndpointRequest.newBuilder() + .setId(missingID) + .setVersion(testEndpoint.getVersion()) + .setSpec(updatedSpec) + .build())); + assertEquals(Status.Code.NOT_FOUND, ex.getStatus().getCode()); + assertEquals( + "Could not find Nexus endpoint with ID: " + missingID, ex.getStatus().getDescription()); + + // Version mismatch + ex = + assertThrows( + StatusRuntimeException.class, + () -> + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .updateNexusEndpoint( + UpdateNexusEndpointRequest.newBuilder() + .setId(testEndpoint.getId()) + .setVersion(15) + .setSpec(updatedSpec) + .build())); + assertEquals(Status.Code.INVALID_ARGUMENT, ex.getStatus().getCode()); + assertEquals( + "Error updating Nexus endpoint: version mismatch." + + " Expected: " + + testEndpoint.getVersion() + + " Received: " + + 15, + ex.getStatus().getDescription()); + + // Updated name already registered + EndpointSpec.Builder otherSpec = getTestEndpointSpecBuilder("other_test_endpoint"); + createTestEndpoint(otherSpec); + ex = + assertThrows( + StatusRuntimeException.class, + () -> + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .updateNexusEndpoint( + UpdateNexusEndpointRequest.newBuilder() + .setId(testEndpoint.getId()) + .setVersion(testEndpoint.getVersion()) + .setSpec(otherSpec.build()) + .build())); + assertEquals(Status.Code.ALREADY_EXISTS, ex.getStatus().getCode()); + assertEquals( + "Error updating Nexus endpoint: " + + "endpoint already registered with updated name: " + + otherSpec.getName(), + ex.getStatus().getDescription()); + + // Valid update + UpdateNexusEndpointResponse resp = + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .updateNexusEndpoint( + UpdateNexusEndpointRequest.newBuilder() + .setId(testEndpoint.getId()) + .setVersion(testEndpoint.getVersion()) + .setSpec(updatedSpec) + .build()); + assertEquals(2, resp.getEndpoint().getVersion()); + assertEquals(updatedSpec, resp.getEndpoint().getSpec()); + } + + @Test + public void testDelete() { + // Setup + Endpoint testEndpoint = createTestEndpoint(getTestEndpointSpecBuilder("delete_test_endpoint")); + assertEquals(1, testEndpoint.getVersion()); + + // Not found + String missingID = UUID.randomUUID().toString(); + StatusRuntimeException ex = + assertThrows( + StatusRuntimeException.class, + () -> + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .deleteNexusEndpoint( + DeleteNexusEndpointRequest.newBuilder() + .setId(missingID) + .setVersion(testEndpoint.getVersion()) + .build())); + assertEquals(Status.Code.NOT_FOUND, ex.getStatus().getCode()); + assertEquals( + "Could not find Nexus endpoint with ID: " + missingID, ex.getStatus().getDescription()); + + // Version mismatch + ex = + assertThrows( + StatusRuntimeException.class, + () -> + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .deleteNexusEndpoint( + DeleteNexusEndpointRequest.newBuilder() + .setId(testEndpoint.getId()) + .setVersion(15) + .build())); + assertEquals(Status.Code.INVALID_ARGUMENT, ex.getStatus().getCode()); + assertEquals( + "Error deleting Nexus endpoint: version mismatch." + + " Expected " + + testEndpoint.getVersion() + + " Received: " + + 15, + ex.getStatus().getDescription()); + + // Valid delete + DeleteNexusEndpointResponse resp = + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .deleteNexusEndpoint( + DeleteNexusEndpointRequest.newBuilder() + .setId(testEndpoint.getId()) + .setVersion(testEndpoint.getVersion()) + .build()); + assertEquals(DeleteNexusEndpointResponse.newBuilder().build(), resp); + } + + @Test + public void testGet() { + // Setup + Endpoint testEndpoint = createTestEndpoint(getTestEndpointSpecBuilder("get_test_endpoint")); + assertEquals(1, testEndpoint.getVersion()); + + // Not found + String missingID = UUID.randomUUID().toString(); + StatusRuntimeException ex = + assertThrows( + StatusRuntimeException.class, + () -> + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .getNexusEndpoint( + GetNexusEndpointRequest.newBuilder().setId(missingID).build())); + assertEquals(Status.Code.NOT_FOUND, ex.getStatus().getCode()); + assertEquals( + "Could not find Nexus endpoint with ID: " + missingID, ex.getStatus().getDescription()); + + // Valid get + GetNexusEndpointResponse resp = + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .getNexusEndpoint( + GetNexusEndpointRequest.newBuilder().setId(testEndpoint.getId()).build()); + assertEquals(testEndpoint, resp.getEndpoint()); + } + + @Test + public void testList() { + // Setup + List testEndpoints = new ArrayList<>(3); + for (int i = 0; i < 3; i++) { + testEndpoints.add(createTestEndpoint(getTestEndpointSpecBuilder("list_test_endpoint_" + i))); + } + testEndpoints.sort(Comparator.comparing(Endpoint::getId)); + + // List with filter for non-existent name + ListNexusEndpointsResponse resp = + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .listNexusEndpoints( + ListNexusEndpointsRequest.newBuilder().setName("some_missing_name").build()); + assertEquals(0, resp.getEndpointsCount()); + + // List with filter for existing name + resp = + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .listNexusEndpoints( + ListNexusEndpointsRequest.newBuilder() + .setName(testEndpoints.get(1).getSpec().getName()) + .build()); + assertEquals(1, resp.getEndpointsCount()); + assertEquals(testEndpoints.get(1), resp.getEndpoints(0)); + + // List all + resp = + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .listNexusEndpoints(ListNexusEndpointsRequest.newBuilder().setPageSize(10).build()); + assertEquals(testEndpoints.size(), resp.getEndpointsCount()); + assertEquals(ByteString.empty(), resp.getNextPageToken()); + assertEquals(testEndpoints, resp.getEndpointsList()); + + // List page 1 + resp = + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .listNexusEndpoints(ListNexusEndpointsRequest.newBuilder().setPageSize(2).build()); + assertEquals(2, resp.getEndpointsCount()); + assertEquals(testEndpoints.get(1).getIdBytes(), resp.getNextPageToken()); + assertEquals(testEndpoints.subList(0, 2), resp.getEndpointsList()); + + // List page 2 + resp = + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .listNexusEndpoints( + ListNexusEndpointsRequest.newBuilder() + .setPageSize(2) + .setNextPageToken(resp.getNextPageToken()) + .build()); + assertEquals(1, resp.getEndpointsCount()); + assertEquals(ByteString.empty(), resp.getNextPageToken()); + assertEquals(testEndpoints.subList(2, testEndpoints.size()), resp.getEndpointsList()); + } + + private EndpointSpec.Builder getTestEndpointSpecBuilder(String name) { + return EndpointSpec.newBuilder() + .setName(name) + .setDescription(Payload.newBuilder().setData(ByteString.copyFromUtf8("test endpoint"))) + .setTarget( + EndpointTarget.newBuilder() + .setWorker( + EndpointTarget.Worker.newBuilder() + .setNamespace(testWorkflowRule.getTestEnvironment().getNamespace()) + .setTaskQueue(testWorkflowRule.getTaskQueue()))); + } + + private Endpoint createTestEndpoint(EndpointSpec.Builder spec) { + CreateNexusEndpointResponse resp = + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .createNexusEndpoint(CreateNexusEndpointRequest.newBuilder().setSpec(spec).build()); + return resp.getEndpoint(); + } +} diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironment.java b/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironment.java index 7bffd57a7..b03e21cd4 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironment.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironment.java @@ -24,6 +24,7 @@ import io.temporal.api.enums.v1.IndexedValueType; import io.temporal.client.WorkflowClient; import io.temporal.common.WorkflowExecutionHistory; +import io.temporal.serviceclient.OperatorServiceStubs; import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.worker.Worker; import io.temporal.worker.WorkerFactory; @@ -170,6 +171,11 @@ static TestWorkflowEnvironment newInstance(TestEnvironmentOptions options) { */ WorkflowServiceStubs getWorkflowServiceStubs(); + /** + * @return {@link io.temporal.serviceclient.OperatorServiceStubs} connected to the test server + */ + OperatorServiceStubs getOperatorServiceStubs(); + String getNamespace(); /** diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java b/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java index fdc4b964f..0c0868584 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java @@ -229,6 +229,11 @@ public WorkflowServiceStubs getWorkflowServiceStubs() { return workflowServiceStubs; } + @Override + public OperatorServiceStubs getOperatorServiceStubs() { + return operatorServiceStubs; + } + @Override public String getNamespace() { return workflowClientOptions.getNamespace();