Skip to content

Commit

Permalink
Test server Nexus endpoint operator apis (#2162)
Browse files Browse the repository at this point in the history
* Bump API version to v1.36.0

* Nexus endpoint test server CRUD API implementation

* cleanup

* functional tests

* test operator service external setup

* test environment setup

* test environment setup

* skip functional tests with external server
  • Loading branch information
pdoerner authored Aug 5, 2024
1 parent 5d22bb5 commit 4871168
Show file tree
Hide file tree
Showing 8 changed files with 732 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.testservice;

import io.temporal.api.nexus.v1.Endpoint;
import io.temporal.api.nexus.v1.EndpointSpec;
import java.io.Closeable;
import java.util.List;

public interface TestNexusEndpointStore extends Closeable {

Endpoint createEndpoint(EndpointSpec spec);

Endpoint updateEndpoint(String id, long version, EndpointSpec spec);

void deleteEndpoint(String id, long version);

Endpoint getEndpoint(String id);

List<Endpoint> listEndpoints(long pageSize, byte[] nextPageToken, String name);

void validateEndpointSpec(EndpointSpec spec);

@Override
void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.testservice;

import io.grpc.Status;
import io.temporal.api.nexus.v1.Endpoint;
import io.temporal.api.nexus.v1.EndpointSpec;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
* TestNexusEndpointStoreImpl is an in-memory implementation of Nexus endpoint CRUD operations for
* use with the test server. Because conflict resolution is not required, there is no handling for
* created or updated timestamps.
*/
public class TestNexusEndpointStoreImpl implements TestNexusEndpointStore {

private static final Pattern ENDPOINT_NAME_REGEX = Pattern.compile("^[a-zA-Z_][a-zA-Z0-9_]*$");

private final SortedMap<String, Endpoint> endpoints = new ConcurrentSkipListMap<>();
private final Set<String> endpointNames = new HashSet<>();

@Override
public Endpoint createEndpoint(EndpointSpec spec) {
validateEndpointSpec(spec);

if (!endpointNames.add(spec.getName())) {
throw Status.ALREADY_EXISTS
.withDescription("Nexus endpoint already registered with name: " + spec.getName())
.asRuntimeException();
}

String id = UUID.randomUUID().toString();
Endpoint endpoint = Endpoint.newBuilder().setId(id).setVersion(1).setSpec(spec).build();

if (endpoints.putIfAbsent(id, endpoint) != null) {
// This should never happen in practice
throw Status.ALREADY_EXISTS
.withDescription("Nexus endpoint already exists with ID: " + id)
.asRuntimeException();
}

return endpoint;
}

@Override
public Endpoint updateEndpoint(String id, long version, EndpointSpec spec) {
validateEndpointSpec(spec);

Endpoint prev = endpoints.get(id);

if (prev == null) {
throw Status.NOT_FOUND
.withDescription("Could not find Nexus endpoint with ID: " + id)
.asRuntimeException();
}

if (prev.getVersion() != version) {
throw Status.INVALID_ARGUMENT
.withDescription(
"Error updating Nexus endpoint: version mismatch."
+ " Expected: "
+ prev.getVersion()
+ " Received: "
+ version)
.asRuntimeException();
}

if (!prev.getSpec().getName().equals(spec.getName()) && !endpointNames.add(spec.getName())) {
throw Status.ALREADY_EXISTS
.withDescription(
"Error updating Nexus endpoint: "
+ "endpoint already registered with updated name: "
+ spec.getName())
.asRuntimeException();
} else {
endpointNames.remove(prev.getSpec().getName());
}

Endpoint updated = Endpoint.newBuilder(prev).setVersion(version + 1).setSpec(spec).build();

endpoints.put(id, updated);
return updated;
}

@Override
public void deleteEndpoint(String id, long version) {
Endpoint existing = endpoints.get(id);

if (existing == null) {
throw Status.NOT_FOUND
.withDescription("Could not find Nexus endpoint with ID: " + id)
.asRuntimeException();
}

if (existing.getVersion() != version) {
throw Status.INVALID_ARGUMENT
.withDescription(
"Error deleting Nexus endpoint: version mismatch."
+ " Expected "
+ existing.getVersion()
+ " Received: "
+ version)
.asRuntimeException();
}

endpoints.remove(id);
}

@Override
public Endpoint getEndpoint(String id) {
Endpoint endpoint = endpoints.get(id);
if (endpoint == null) {
throw Status.NOT_FOUND
.withDescription("Could not find Nexus endpoint with ID: " + id)
.asRuntimeException();
}
return endpoint;
}

@Override
public List<Endpoint> listEndpoints(long pageSize, byte[] nextPageToken, String name) {
if (name != null && !name.isEmpty()) {
return endpoints.values().stream()
.filter(ep -> ep.getSpec().getName().equals(name))
.limit(1)
.collect(Collectors.toList());
}

if (nextPageToken.length > 0) {
return endpoints.tailMap(new String(nextPageToken)).values().stream()
.skip(1)
.limit(pageSize)
.collect(Collectors.toList());
}
return endpoints.values().stream().limit(pageSize).collect(Collectors.toList());
}

@Override
public void validateEndpointSpec(EndpointSpec spec) {
if (spec.getName().isEmpty()) {
throw Status.INVALID_ARGUMENT
.withDescription("Nexus endpoint name cannot be empty")
.asRuntimeException();
}
if (!ENDPOINT_NAME_REGEX.matcher(spec.getName()).matches()) {
throw Status.INVALID_ARGUMENT
.withDescription(
"Nexus endpoint name ("
+ spec.getName()
+ ") does not match expected pattern: "
+ ENDPOINT_NAME_REGEX.pattern())
.asRuntimeException();
}
if (!spec.hasTarget()) {
throw Status.INVALID_ARGUMENT
.withDescription("Nexus endpoint spec must have a target")
.asRuntimeException();
}
if (!spec.getTarget().hasWorker()) {
throw Status.INVALID_ARGUMENT
.withDescription("Test server only supports Nexus endpoints with worker targets")
.asRuntimeException();
}
}

@Override
public void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@

package io.temporal.internal.testservice;

import com.google.protobuf.ByteString;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import io.temporal.api.enums.v1.IndexedValueType;
import io.temporal.api.nexus.v1.Endpoint;
import io.temporal.api.operatorservice.v1.*;
import java.io.Closeable;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -40,9 +43,12 @@ final class TestOperatorService extends OperatorServiceGrpc.OperatorServiceImplB
private static final Logger log = LoggerFactory.getLogger(TestOperatorService.class);

private final TestVisibilityStore visibilityStore;
private final TestNexusEndpointStore nexusEndpointStore;

public TestOperatorService(TestVisibilityStore visibilityStore) {
public TestOperatorService(
TestVisibilityStore visibilityStore, TestNexusEndpointStore nexusEndpointStore) {
this.visibilityStore = visibilityStore;
this.nexusEndpointStore = nexusEndpointStore;
}

@Override
Expand Down Expand Up @@ -93,6 +99,84 @@ public void removeSearchAttributes(
}
}

@Override
public void getNexusEndpoint(
GetNexusEndpointRequest request, StreamObserver<GetNexusEndpointResponse> responseObserver) {
try {
Endpoint endpoint = nexusEndpointStore.getEndpoint(request.getId());
responseObserver.onNext(GetNexusEndpointResponse.newBuilder().setEndpoint(endpoint).build());
responseObserver.onCompleted();
} catch (StatusRuntimeException e) {
handleStatusRuntimeException(e, responseObserver);
}
}

@Override
public void createNexusEndpoint(
CreateNexusEndpointRequest request,
StreamObserver<CreateNexusEndpointResponse> responseObserver) {
try {
Endpoint created = nexusEndpointStore.createEndpoint(request.getSpec());
responseObserver.onNext(
CreateNexusEndpointResponse.newBuilder().setEndpoint(created).build());
responseObserver.onCompleted();
} catch (StatusRuntimeException e) {
handleStatusRuntimeException(e, responseObserver);
}
}

@Override
public void updateNexusEndpoint(
UpdateNexusEndpointRequest request,
StreamObserver<UpdateNexusEndpointResponse> responseObserver) {
try {
Endpoint updated =
nexusEndpointStore.updateEndpoint(
request.getId(), request.getVersion(), request.getSpec());
responseObserver.onNext(
UpdateNexusEndpointResponse.newBuilder().setEndpoint(updated).build());
responseObserver.onCompleted();
} catch (StatusRuntimeException e) {
handleStatusRuntimeException(e, responseObserver);
}
}

@Override
public void deleteNexusEndpoint(
DeleteNexusEndpointRequest request,
StreamObserver<DeleteNexusEndpointResponse> responseObserver) {
try {
nexusEndpointStore.deleteEndpoint(request.getId(), request.getVersion());
responseObserver.onNext(DeleteNexusEndpointResponse.newBuilder().build());
responseObserver.onCompleted();
} catch (StatusRuntimeException e) {
handleStatusRuntimeException(e, responseObserver);
}
}

@Override
public void listNexusEndpoints(
ListNexusEndpointsRequest request,
StreamObserver<ListNexusEndpointsResponse> responseObserver) {
try {
List<Endpoint> endpoints =
nexusEndpointStore.listEndpoints(
request.getPageSize(), request.getNextPageToken().toByteArray(), request.getName());
ByteString nextPageToken =
(!endpoints.isEmpty() && endpoints.size() == request.getPageSize())
? endpoints.get(endpoints.size() - 1).getIdBytes()
: ByteString.empty();
responseObserver.onNext(
ListNexusEndpointsResponse.newBuilder()
.addAllEndpoints(endpoints)
.setNextPageToken(nextPageToken)
.build());
responseObserver.onCompleted();
} catch (StatusRuntimeException e) {
handleStatusRuntimeException(e, responseObserver);
}
}

private void handleStatusRuntimeException(
StatusRuntimeException e, StreamObserver<?> responseObserver) {
if (e.getStatus().getCode() == Status.Code.INTERNAL) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
public class TestServicesStarter implements Closeable {
private final SelfAdvancingTimerImpl selfAdvancingTimer;
private final TestVisibilityStore visibilityStore = new TestVisibilityStoreImpl();
private final TestNexusEndpointStore nexusEndpointStore = new TestNexusEndpointStoreImpl();
private final TestWorkflowStore workflowStore;
private final TestOperatorService operatorService;
private final TestWorkflowService workflowService;
Expand All @@ -46,7 +47,7 @@ public TestServicesStarter(boolean lockTimeSkipping, long initialTimeMillis) {
this.selfAdvancingTimer =
new SelfAdvancingTimerImpl(initialTimeMillis, Clock.systemDefaultZone());
this.workflowStore = new TestWorkflowStoreImpl(this.selfAdvancingTimer);
this.operatorService = new TestOperatorService(this.visibilityStore);
this.operatorService = new TestOperatorService(this.visibilityStore, this.nexusEndpointStore);
this.testService =
new TestService(this.workflowStore, this.selfAdvancingTimer, lockTimeSkipping);
this.workflowService =
Expand Down
Loading

0 comments on commit 4871168

Please sign in to comment.