diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/BodyIsExceptionMessage.java b/zipkin-server/src/main/java/zipkin2/server/internal/BodyIsExceptionMessage.java index 2934b11bb1..db1860bc02 100644 --- a/zipkin-server/src/main/java/zipkin2/server/internal/BodyIsExceptionMessage.java +++ b/zipkin-server/src/main/java/zipkin2/server/internal/BodyIsExceptionMessage.java @@ -11,6 +11,7 @@ import com.linecorp.armeria.server.annotation.ExceptionHandlerFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import zipkin2.internal.ClosedComponentException; import static com.linecorp.armeria.common.HttpStatus.BAD_REQUEST; import static com.linecorp.armeria.common.HttpStatus.INTERNAL_SERVER_ERROR; @@ -29,7 +30,10 @@ public HttpResponse handleException(ServiceRequestContext ctx, HttpRequest req, if (cause instanceof IllegalArgumentException) { return HttpResponse.of(BAD_REQUEST, ANY_TEXT_TYPE, message); } else { - LOGGER.warn("Unexpected error handling request.", cause); + // Don't fill logs with exceptions about closed components. + if (!(cause instanceof ClosedComponentException)) { + LOGGER.warn("Unexpected error handling {} {}", req.method(), req.path()); + } return HttpResponse.of(INTERNAL_SERVER_ERROR, ANY_TEXT_TYPE, message); } diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/health/ComponentHealth.java b/zipkin-server/src/main/java/zipkin2/server/internal/health/ComponentHealth.java index a4f3b9fafd..a979e30129 100644 --- a/zipkin-server/src/main/java/zipkin2/server/internal/health/ComponentHealth.java +++ b/zipkin-server/src/main/java/zipkin2/server/internal/health/ComponentHealth.java @@ -4,12 +4,16 @@ */ package zipkin2.server.internal.health; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import zipkin2.Call; import zipkin2.CheckResult; import zipkin2.Component; +import zipkin2.internal.ClosedComponentException; import zipkin2.internal.Nullable; final class ComponentHealth { + static final Logger LOGGER = LoggerFactory.getLogger(ComponentHealth.class); static final String STATUS_UP = "UP", STATUS_DOWN = "DOWN"; static ComponentHealth ofComponent(Component component) { @@ -23,7 +27,13 @@ static ComponentHealth ofComponent(Component component) { } if (t == null) return new ComponentHealth(component.toString(), STATUS_UP, null); String message = t.getMessage(); - String error = t.getClass().getName() + (message != null ? ": " + message : ""); + String error; + if (t instanceof ClosedComponentException) { + error = message; + } else { + error = t.getClass().getSimpleName() + (message != null ? ": " + message : ""); + } + LOGGER.debug(error); return new ComponentHealth(component.toString(), STATUS_DOWN, error); } diff --git a/zipkin-server/src/main/resources/zipkin-server-shared.yml b/zipkin-server/src/main/resources/zipkin-server-shared.yml index 54de6903d7..f8f1b128a7 100644 --- a/zipkin-server/src/main/resources/zipkin-server-shared.yml +++ b/zipkin-server/src/main/resources/zipkin-server-shared.yml @@ -251,6 +251,8 @@ logging: com.datastax.oss.driver.internal.core.DefaultMavenCoordinates: 'WARN' # We exclude Geo codec and Graph extensions to keep size down com.datastax.oss.driver.internal.core.context.InternalDriverContext: 'WARN' + # Avoid logs about adding handlers + com.datastax.oss.driver.internal.core.cql.CqlPrepareAsyncProcessor: 'WARN' # Use of native clocks in Cassandra is not insightful com.datastax.oss.driver.internal.core.time.Clock: 'WARN' # Unless it's serious we don't want to know diff --git a/zipkin-server/src/test/java/zipkin2/server/internal/health/ComponentHealthTest.java b/zipkin-server/src/test/java/zipkin2/server/internal/health/ComponentHealthTest.java index 3288a122c8..a5fa3b1481 100644 --- a/zipkin-server/src/test/java/zipkin2/server/internal/health/ComponentHealthTest.java +++ b/zipkin-server/src/test/java/zipkin2/server/internal/health/ComponentHealthTest.java @@ -20,8 +20,7 @@ class ComponentHealthTest { } }); - assertThat(health.error) - .isEqualTo("java.io.IOException: socket disconnect"); + assertThat(health.error).isEqualTo("IOException: socket disconnect"); } @Test void doesntAddNullMessageToDetails() { @@ -31,7 +30,6 @@ class ComponentHealthTest { } }); - assertThat(health.error) - .isEqualTo("com.linecorp.armeria.common.ClosedSessionException"); + assertThat(health.error).isEqualTo("ClosedSessionException"); } } diff --git a/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraSpanStore.java b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraSpanStore.java index 98018bc966..86cad639ea 100644 --- a/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraSpanStore.java +++ b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraSpanStore.java @@ -51,13 +51,19 @@ class CassandraSpanStore implements SpanStore, Traces, ServiceAndSpanNames { //n @Nullable final SelectTraceIdsFromServiceRemoteService.Factory traceIdsFromServiceRemoteService; CassandraSpanStore(CassandraStorage storage) { - CqlSession session = storage.session(); - Schema.Metadata metadata = storage.metadata(); - int maxTraceCols = storage.maxTraceCols; - indexFetchMultiplier = storage.indexFetchMultiplier; - boolean strictTraceId = storage.strictTraceId; - searchEnabled = storage.searchEnabled; + this(storage.session(), + storage.metadata(), + Schema.ensureKeyspaceMetadata(storage.session(), storage.keyspace), + storage.maxTraceCols, + storage.indexFetchMultiplier, + storage.strictTraceId, + storage.searchEnabled); + } + CassandraSpanStore(CqlSession session, Schema.Metadata metadata, KeyspaceMetadata keyspace, + int maxTraceCols, int indexFetchMultiplier, boolean strictTraceId, boolean searchEnabled) { + this.indexFetchMultiplier = indexFetchMultiplier; + this.searchEnabled = searchEnabled; spans = new SelectFromSpan.Factory(session, strictTraceId, maxTraceCols); dependencies = new SelectDependencies.Factory(session); @@ -72,8 +78,7 @@ class CassandraSpanStore implements SpanStore, Traces, ServiceAndSpanNames { //n return; } - KeyspaceMetadata md = Schema.ensureKeyspaceMetadata(session, storage.keyspace); - indexTtl = KeyspaceMetadataUtil.getDefaultTtl(md, TABLE_TRACE_BY_SERVICE_SPAN); + indexTtl = KeyspaceMetadataUtil.getDefaultTtl(keyspace, TABLE_TRACE_BY_SERVICE_SPAN); serviceNames = new SelectServiceNames.Factory(session).create(); if (metadata.hasRemoteService) { remoteServiceNames = new SelectRemoteServiceNames.Factory(session); diff --git a/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraStorage.java b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraStorage.java index 5c596b05c3..a43e9b8802 100644 --- a/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraStorage.java +++ b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraStorage.java @@ -12,6 +12,7 @@ import java.util.Set; import zipkin2.Call; import zipkin2.CheckResult; +import zipkin2.internal.ClosedComponentException; import zipkin2.internal.Nullable; import zipkin2.storage.AutocompleteTags; import zipkin2.storage.ServiceAndSpanNames; @@ -19,7 +20,6 @@ import zipkin2.storage.SpanStore; import zipkin2.storage.StorageComponent; import zipkin2.storage.Traces; -import zipkin2.storage.cassandra.internal.CassandraStorageBuilder; import zipkin2.storage.cassandra.internal.call.ResultSetFutureCall; /** @@ -47,8 +47,6 @@ public static Builder newBuilder() { } public static final class Builder extends CassandraStorageBuilder { - SessionFactory sessionFactory = SessionFactory.DEFAULT; - Builder() { super(Schema.DEFAULT_KEYSPACE); } @@ -70,22 +68,8 @@ public static final class Builder extends CassandraStorageBuilder { return super.ensureSchema(ensureSchema); } - /** Override to control how sessions are created. */ - public Builder sessionFactory(SessionFactory sessionFactory) { - if (sessionFactory == null) throw new NullPointerException("sessionFactory == null"); - this.sessionFactory = sessionFactory; - return this; - } - @Override public CassandraStorage build() { - AuthProvider authProvider = null; - if (username != null) { - authProvider = new ProgrammaticPlainTextAuthProvider(username, password); - } - return new CassandraStorage(strictTraceId, searchEnabled, autocompleteKeys, autocompleteTtl, - autocompleteCardinality, contactPoints, localDc, poolingOptions(), authProvider, useSsl, - sslHostnameValidation, sessionFactory, keyspace, ensureSchema, maxTraceCols, - indexFetchMultiplier); + return new CassandraStorage(this); } } @@ -99,39 +83,36 @@ autocompleteCardinality, contactPoints, localDc, poolingOptions(), authProvider, final boolean useSsl; final boolean sslHostnameValidation; final String keyspace; - final boolean ensureSchema; - final int maxTraceCols, indexFetchMultiplier; final LazySession session; - CassandraStorage(boolean strictTraceId, boolean searchEnabled, Set autocompleteKeys, - int autocompleteTtl, int autocompleteCardinality, String contactPoints, String localDc, - Map poolingOptions, AuthProvider authProvider, boolean useSsl, - boolean sslHostnameValidation, SessionFactory sessionFactory, String keyspace, - boolean ensureSchema, int maxTraceCols, int indexFetchMultiplier) { + CassandraStorage(CassandraStorageBuilder builder) { // Assign generic configuration for all storage components - this.strictTraceId = strictTraceId; - this.searchEnabled = searchEnabled; - this.autocompleteKeys = autocompleteKeys; - this.autocompleteTtl = autocompleteTtl; - this.autocompleteCardinality = autocompleteCardinality; + this.strictTraceId = builder.strictTraceId; + this.searchEnabled = builder.searchEnabled; + this.autocompleteKeys = builder.autocompleteKeys; + this.autocompleteTtl = builder.autocompleteTtl; + this.autocompleteCardinality = builder.autocompleteCardinality; // Assign configuration used to create a session - this.contactPoints = contactPoints; - this.localDc = localDc; - this.poolingOptions = poolingOptions; - this.authProvider = authProvider; - this.useSsl = useSsl; - this.sslHostnameValidation = sslHostnameValidation; - this.ensureSchema = ensureSchema; - this.keyspace = keyspace; + this.contactPoints = builder.contactPoints; + this.localDc = builder.localDc; + this.poolingOptions = builder.poolingOptions(); + if (builder.username != null) { + this.authProvider = new ProgrammaticPlainTextAuthProvider(builder.username, builder.password); + } else { + this.authProvider = null; + } + this.useSsl = builder.useSsl; + this.sslHostnameValidation = builder.sslHostnameValidation; + this.keyspace = builder.keyspace; // Assign configuration used to control queries - this.maxTraceCols = maxTraceCols; - this.indexFetchMultiplier = indexFetchMultiplier; + this.maxTraceCols = builder.maxTraceCols; + this.indexFetchMultiplier = builder.indexFetchMultiplier; - this.session = new LazySession(sessionFactory, this); + this.session = new LazySession(this, builder.sessionFactory, builder.ensureSchema); } /** close is typically called from a different thread */ @@ -202,7 +183,7 @@ Schema.Metadata metadata() { } @Override public CheckResult check() { - if (closeCalled) throw new IllegalStateException("closed"); + if (closeCalled) throw new ClosedComponentException(); try { session.healthCheck(); } catch (Throwable e) { diff --git a/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/internal/CassandraStorageBuilder.java b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraStorageBuilder.java similarity index 80% rename from zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/internal/CassandraStorageBuilder.java rename to zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraStorageBuilder.java index 9ddf1a2297..d191339013 100644 --- a/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/internal/CassandraStorageBuilder.java +++ b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/CassandraStorageBuilder.java @@ -2,17 +2,17 @@ * Copyright The OpenZipkin Authors * SPDX-License-Identifier: Apache-2.0 */ -package zipkin2.storage.cassandra.internal; +package zipkin2.storage.cassandra; +import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverOption; -import java.util.Collections; import java.util.LinkedHashMap; -import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; import zipkin2.internal.Nullable; import zipkin2.storage.QueryRequest; import zipkin2.storage.StorageComponent; @@ -20,25 +20,27 @@ import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.CONNECTION_MAX_REQUESTS; import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE; -public abstract class CassandraStorageBuilder> +abstract class CassandraStorageBuilder> extends StorageComponent.Builder { - protected boolean strictTraceId = true, searchEnabled = true; - protected Set autocompleteKeys = Set.of(); - protected int autocompleteTtl = (int) TimeUnit.HOURS.toMillis(1); - protected int autocompleteCardinality = 5 * 4000; // Ex. 5 site tags with cardinality 4000 each - protected String contactPoints = "localhost"; + CassandraStorage.SessionFactory sessionFactory = CassandraStorage.SessionFactory.DEFAULT; + boolean strictTraceId = true, searchEnabled = true; + Set autocompleteKeys = Set.of(); + int autocompleteTtl = (int) TimeUnit.HOURS.toMillis(1); + int autocompleteCardinality = 5 * 4000; // Ex. 5 site tags with cardinality 4000 each + + String contactPoints = "localhost"; // Driver v4 requires this, so take a guess! When we are wrong, the user can override anyway - protected String localDc = "datacenter1"; - @Nullable protected String username, password; - protected boolean useSsl = false; - protected boolean sslHostnameValidation = true; + String localDc = "datacenter1"; + @Nullable String username, password; + boolean useSsl = false; + boolean sslHostnameValidation = true; - protected String keyspace; - protected boolean ensureSchema = true; + String keyspace; + BiFunction ensureSchema = Schema::ensure; - protected int maxTraceCols = 100_000; - protected int indexFetchMultiplier = 3; + int maxTraceCols = 100_000; + int indexFetchMultiplier = 3; // Zipkin collectors can create out a lot of async requests in bursts, so we // increase some properties beyond the norm. @@ -49,14 +51,14 @@ public abstract class CassandraStorageBuilder poolingOptions() { + Map poolingOptions() { Map result = new LinkedHashMap<>(); result.put(CONNECTION_POOL_LOCAL_SIZE, poolLocalSize); result.put(CONNECTION_MAX_REQUESTS, maxRequestsPerConnection); return result; } - protected CassandraStorageBuilder(String defaultKeyspace) { + CassandraStorageBuilder(String defaultKeyspace) { keyspace = defaultKeyspace; } @@ -148,8 +150,19 @@ public B keyspace(String keyspace) { return (B) this; } + /** Override to control how sessions are created. */ + public B sessionFactory(CassandraStorage.SessionFactory sessionFactory) { + if (sessionFactory == null) throw new NullPointerException("sessionFactory == null"); + this.sessionFactory = sessionFactory; + return (B) this; + } + public B ensureSchema(boolean ensureSchema) { - this.ensureSchema = ensureSchema; + if (ensureSchema) { + this.ensureSchema = Schema::ensure; + } else { + this.ensureSchema = Schema::validate; + } return (B) this; } diff --git a/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/DefaultSessionFactory.java b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/DefaultSessionFactory.java index ac11d8d6cd..c0a482ed59 100644 --- a/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/DefaultSessionFactory.java +++ b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/DefaultSessionFactory.java @@ -5,48 +5,11 @@ package zipkin2.storage.cassandra; import com.datastax.oss.driver.api.core.CqlSession; -import com.datastax.oss.driver.api.core.data.UdtValue; -import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; -import com.datastax.oss.driver.api.core.type.codec.TypeCodec; -import com.datastax.oss.driver.api.core.type.codec.registry.MutableCodecRegistry; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import zipkin2.storage.cassandra.internal.SessionBuilder; -import static zipkin2.Call.propagateIfFatal; - -/** - * Creates a session and ensures schema if configured. Closes the cluster and session if any - * exception occurred. - */ final class DefaultSessionFactory implements CassandraStorage.SessionFactory { - static final Logger LOG = LoggerFactory.getLogger(DefaultSessionFactory.class); - - /** - * Creates a session and ensures schema if configured. Closes the cluster and session if any - * exception occurred. - */ @Override public CqlSession create(CassandraStorage cassandra) { - CqlSession session = null; - try { - session = buildSession(cassandra); - - String keyspace = cassandra.keyspace; - if (cassandra.ensureSchema) { - Schema.ensureExists(keyspace, cassandra.searchEnabled, session); - } else { - LOG.debug("Skipping schema check on keyspace {} as ensureSchema was false", keyspace); - } - - session.execute("USE " + keyspace); - initializeUDTs(session, keyspace); - - return session; - } catch (RuntimeException | Error e) { // don't leak on unexpected exception! - propagateIfFatal(e); - if (session != null) session.close(); - throw e; - } + return buildSession(cassandra); } static CqlSession buildSession(CassandraStorage cassandra) { @@ -59,19 +22,4 @@ static CqlSession buildSession(CassandraStorage cassandra) { cassandra.sslHostnameValidation ); } - - static void initializeUDTs(CqlSession session, String keyspace) { - KeyspaceMetadata ks = session.getMetadata().getKeyspace(keyspace).get(); - MutableCodecRegistry codecRegistry = - (MutableCodecRegistry) session.getContext().getCodecRegistry(); - - TypeCodec annotationUDTCodec = - codecRegistry.codecFor(ks.getUserDefinedType("annotation").get()); - codecRegistry.register(new AnnotationCodec(annotationUDTCodec)); - - LOG.debug("Registering endpoint and annotation UDTs to keyspace {}", keyspace); - TypeCodec endpointUDTCodec = - codecRegistry.codecFor(ks.getUserDefinedType("endpoint").get()); - codecRegistry.register(new EndpointCodec(endpointUDTCodec)); - } } diff --git a/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/LazySession.java b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/LazySession.java index f34a2e5855..db9e7a6c81 100644 --- a/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/LazySession.java +++ b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/LazySession.java @@ -6,33 +6,53 @@ import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import java.util.function.BiFunction; +import zipkin2.internal.ClosedComponentException; import zipkin2.storage.cassandra.CassandraStorage.SessionFactory; +import static zipkin2.Call.propagateIfFatal; import static zipkin2.storage.cassandra.Schema.TABLE_SPAN; final class LazySession { - final SessionFactory sessionFactory; final CassandraStorage storage; + final SessionFactory sessionFactory; + final BiFunction ensureSchema; + volatile CqlSession session; volatile PreparedStatement healthCheck; // guarded by session volatile Schema.Metadata metadata; // guarded by session - LazySession(SessionFactory sessionFactory, CassandraStorage storage) { + LazySession(CassandraStorage storage, SessionFactory sessionFactory, + BiFunction ensureSchema) { this.sessionFactory = sessionFactory; + this.ensureSchema = ensureSchema; this.storage = storage; } + /** Creates a session and ensures schema if configured. */ CqlSession get() { - if (session == null) { - synchronized (this) { - if (session == null) { - session = sessionFactory.create(storage); - // cached here to warn only once when schema problems exist - metadata = Schema.readMetadata(session, storage.keyspace); - healthCheck = session.prepare("SELECT trace_id FROM " + TABLE_SPAN + " limit 1"); - } + if (session != null) return session; + synchronized (this) { + if (session != null) return session; // lost race + session = sessionFactory.create(storage); + + // If we got this far, the session is healthy. So, everything below only happens once. + try { + metadata = ensureSchema.apply(storage, session); + session.execute("USE " + storage.keyspace); + Schema.initializeUDTs(session, storage.keyspace); + healthCheck = session.prepare("SELECT trace_id FROM " + TABLE_SPAN + " limit 1"); + } catch (RuntimeException | Error e) { + propagateIfFatal(e); + // An error here was from installing or validating the schema. To ensure we don't repeat + // failed commands, close, but don't null the session. For example, repeating may look like + // an upgrade due to the first failure, and distract from the original problem. + session.close(); } } + if (session.isClosed()) { + throw new ClosedComponentException("Session initialization failed. See server logs"); + } return session; } diff --git a/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/Schema.java b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/Schema.java index 56e53d366f..50ffc07100 100644 --- a/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/Schema.java +++ b/zipkin-storage/cassandra/src/main/java/zipkin2/storage/cassandra/Schema.java @@ -6,8 +6,12 @@ import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.Version; +import com.datastax.oss.driver.api.core.data.UdtValue; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; +import com.datastax.oss.driver.api.core.servererrors.InvalidQueryException; +import com.datastax.oss.driver.api.core.type.codec.TypeCodec; +import com.datastax.oss.driver.api.core.type.codec.registry.MutableCodecRegistry; import java.util.Map; import java.util.UUID; import org.slf4j.Logger; @@ -32,7 +36,14 @@ final class Schema { static final String UPGRADE_1 = "/zipkin2-schema-upgrade-1.cql"; static final String UPGRADE_2 = "/zipkin2-schema-upgrade-2.cql"; - static Metadata readMetadata(CqlSession session, String keyspace) { + static Metadata ensure(CassandraStorage cassandra, CqlSession session) { + String keyspace = cassandra.keyspace; + Schema.ensureExists(session, keyspace, cassandra.searchEnabled); + return validate(cassandra, session); + } + + static Metadata validate(CassandraStorage cassandra, CqlSession session) { + String keyspace = cassandra.keyspace; KeyspaceMetadata keyspaceMetadata = ensureKeyspaceMetadata(session, keyspace); Map replication = keyspaceMetadata.getReplication(); @@ -43,20 +54,61 @@ static Metadata readMetadata(CqlSession session, String keyspace) { } boolean hasAutocompleteTags = hasUpgrade1_autocompleteTags(keyspaceMetadata); + boolean hasRemoteService = hasUpgrade2_remoteService(keyspaceMetadata); + + Metadata md = new Metadata(hasAutocompleteTags, hasRemoteService); + // Begin validation of externally provided schema. + if (!has_schema(keyspaceMetadata)) { + logAndThrow("schema not installed: apply %s, or set CASSANDRA_ENSURE_SCHEMA=true", + SCHEMA_RESOURCE); + } + + if (!cassandra.searchEnabled) { + return md; + } + + if (!has_indexing(keyspaceMetadata)) { + logAndThrow( + "schema lacks indexing: apply %s, or set CASSANDRA_ENSURE_SCHEMA=true", INDEX_RESOURCE); + } + + // Don't throw on more esoteric features if (!hasAutocompleteTags) { LOG.warn( - "schema lacks autocomplete indexing: apply {}, or set CassandraStorage.ensureSchema=true", + "schema lacks autocomplete indexing: apply {}, or set CASSANDRA_ENSURE_SCHEMA=true", UPGRADE_1); } - boolean hasRemoteService = hasUpgrade2_remoteService(keyspaceMetadata); if (!hasRemoteService) { LOG.warn( - "schema lacks remote service indexing: apply {}, or set CassandraStorage.ensureSchema=true", + "schema lacks remote service indexing: apply {}, or set CASSANDRA_ENSURE_SCHEMA=true", UPGRADE_2); } - return new Metadata(hasAutocompleteTags, hasRemoteService); + return md; + } + + static void logAndThrow(String messageFormat, Object... args) { + String message = String.format(messageFormat, args); + // Ensure we can look at logs to see the problem. Otherwise, it may only + // be visible in API error responses, such as /health or /api/v2/traces. + LOG.error(message); + throw new RuntimeException(message); + } + + static void initializeUDTs(CqlSession session, String keyspace) { + KeyspaceMetadata ks = session.getMetadata().getKeyspace(keyspace).get(); + MutableCodecRegistry codecRegistry = + (MutableCodecRegistry) session.getContext().getCodecRegistry(); + + TypeCodec annotationUDTCodec = + codecRegistry.codecFor(ks.getUserDefinedType("annotation").get()); + codecRegistry.register(new AnnotationCodec(annotationUDTCodec)); + + LOG.debug("Registering endpoint and annotation UDTs to keyspace {}", keyspace); + TypeCodec endpointUDTCodec = + codecRegistry.codecFor(ks.getUserDefinedType("endpoint").get()); + codecRegistry.register(new EndpointCodec(endpointUDTCodec)); } static final class Metadata { @@ -69,7 +121,6 @@ static final class Metadata { } static KeyspaceMetadata ensureKeyspaceMetadata(CqlSession session, String keyspace) { - ensureVersion(session.getMetadata()); KeyspaceMetadata keyspaceMetadata = session.getMetadata().getKeyspace(keyspace).orElse(null); if (keyspaceMetadata == null) { throw new IllegalStateException( @@ -96,28 +147,36 @@ static Version ensureVersion(com.datastax.oss.driver.api.core.metadata.Metadata return version; } - static void ensureExists(String keyspace, boolean searchEnabled, CqlSession session) { + static void ensureExists(CqlSession session, String keyspace, boolean searchEnabled) { KeyspaceMetadata result = session.getMetadata().getKeyspace(keyspace).orElse(null); + Version version = ensureVersion(session.getMetadata()); if (result == null || result.getTable(Schema.TABLE_SPAN).isEmpty()) { LOG.info("Installing schema {} for keyspace {}", SCHEMA_RESOURCE, keyspace); - applyCqlFile(keyspace, session, SCHEMA_RESOURCE); + applyCqlFile(version, keyspace, session, SCHEMA_RESOURCE); if (searchEnabled) { LOG.info("Installing indexes {} for keyspace {}", INDEX_RESOURCE, keyspace); - applyCqlFile(keyspace, session, INDEX_RESOURCE); + applyCqlFile(version, keyspace, session, INDEX_RESOURCE); + } + } else if (searchEnabled) { // prior installation + if (!hasUpgrade1_autocompleteTags(result)) { + LOG.info("Upgrading schema {}", UPGRADE_1); + applyCqlFile(version, keyspace, session, UPGRADE_1); + } + if (!hasUpgrade2_remoteService(result)) { + LOG.info("Upgrading schema {}", UPGRADE_2); + applyCqlFile(version, keyspace, session, UPGRADE_2); } - // refresh metadata since we've installed the schema - result = ensureKeyspaceMetadata(session, keyspace); - } - if (searchEnabled && !hasUpgrade1_autocompleteTags(result)) { - LOG.info("Upgrading schema {}", UPGRADE_1); - applyCqlFile(keyspace, session, UPGRADE_1); - } - if (searchEnabled && !hasUpgrade2_remoteService(result)) { - LOG.info("Upgrading schema {}", UPGRADE_2); - applyCqlFile(keyspace, session, UPGRADE_2); } } + static boolean has_schema(KeyspaceMetadata keyspaceMetadata) { + return keyspaceMetadata.getTable(TABLE_SPAN).isPresent(); + } + + static boolean has_indexing(KeyspaceMetadata keyspaceMetadata) { + return keyspaceMetadata.getTable(TABLE_SERVICE_SPANS).isPresent(); + } + static boolean hasUpgrade1_autocompleteTags(KeyspaceMetadata keyspaceMetadata) { return keyspaceMetadata.getTable(TABLE_AUTOCOMPLETE_TAGS).isPresent(); } @@ -126,18 +185,26 @@ static boolean hasUpgrade2_remoteService(KeyspaceMetadata keyspaceMetadata) { return keyspaceMetadata.getTable(TABLE_SERVICE_REMOTE_SERVICES).isPresent(); } - static void applyCqlFile(String keyspace, CqlSession session, String resource) { - Version version = ensureVersion(session.getMetadata()); + static void applyCqlFile(Version version, String keyspace, CqlSession session, String resource) { for (String cmd : resourceToString(resource).split(";", 100)) { cmd = cmd.trim().replace(" " + DEFAULT_KEYSPACE, " " + keyspace); if (cmd.isEmpty()) continue; cmd = reviseCQL(version, cmd); - session.execute(cmd); + try { + session.execute(cmd); + } catch (InvalidQueryException e) { + // Add context so it is obvious which line was wrong + String message = String.format("Failed to execute [%s]: %s", cmd, e.getMessage()); + // Ensure we can look at logs to see the problem. Otherwise, it may only + // be visible in API error responses, such as /health or /api/v2/traces. + LOG.error(message); + throw new RuntimeException(message, e); + } } } static String reviseCQL(Version version, String cql) { - if (version.getMajor() == 4) { + if (version.getMajor() >= 4) { // read_repair_chance options were removed and make Cassandra crash starting in v4 // See https://cassandra.apache.org/doc/latest/operating/read_repair.html#background-read-repair cql = cql.replaceAll(" *AND [^\\s]*read_repair_chance = 0\n", ""); diff --git a/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/CassandraSpanConsumerTest.java b/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/CassandraSpanConsumerTest.java index aee9fce837..181f661e55 100644 --- a/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/CassandraSpanConsumerTest.java +++ b/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/CassandraSpanConsumerTest.java @@ -4,8 +4,13 @@ */ package zipkin2.storage.cassandra; +import com.datastax.oss.driver.api.core.CqlSession; import java.util.List; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; import zipkin2.Call; import zipkin2.Span; import zipkin2.internal.AggregateCall; @@ -18,10 +23,16 @@ import static zipkin2.TestObjects.BACKEND; import static zipkin2.TestObjects.FRONTEND; import static zipkin2.TestObjects.TODAY; -import static zipkin2.storage.cassandra.InternalForTests.mockSession; +@ExtendWith(MockitoExtension.class) class CassandraSpanConsumerTest { - CassandraSpanConsumer consumer = spanConsumer(CassandraStorage.newBuilder()); + @Mock CqlSession session; + Schema.Metadata metadata = new Schema.Metadata(true, true); + CassandraSpanConsumer consumer; + + @BeforeEach void setup() { + consumer = spanConsumer(CassandraStorage.newBuilder()); + } Span spanWithoutAnnotationsOrTags = Span.newBuilder() @@ -208,7 +219,9 @@ class CassandraSpanConsumerTest { .isInstanceOf(ResultSetFutureCall.class); } - static CassandraSpanConsumer spanConsumer(CassandraStorage.Builder builder) { - return new CassandraSpanConsumer(builder.sessionFactory(storage -> mockSession()).build()); + CassandraSpanConsumer spanConsumer(CassandraStorage.Builder builder) { + return new CassandraSpanConsumer(session, metadata, builder.strictTraceId, + builder.searchEnabled, builder.autocompleteKeys, builder.autocompleteTtl, + builder.autocompleteCardinality); } } diff --git a/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/CassandraSpanStoreTest.java b/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/CassandraSpanStoreTest.java index 7957b8f013..e9e1639649 100644 --- a/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/CassandraSpanStoreTest.java +++ b/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/CassandraSpanStoreTest.java @@ -4,9 +4,16 @@ */ package zipkin2.storage.cassandra; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; import java.util.List; import java.util.Map; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; import zipkin2.Call; import zipkin2.Span; import zipkin2.storage.QueryRequest; @@ -15,13 +22,21 @@ import static org.assertj.core.api.Assertions.assertThat; import static zipkin2.TestObjects.DAY; import static zipkin2.TestObjects.TODAY; -import static zipkin2.storage.cassandra.InternalForTests.mockSession; // TODO: tests use toString because the call composition chain is complex (includes flat mapping) // This could be made a little less complex if we scrub out map=>map to a list of transformations, // or possibly special-casing common transformations. +@ExtendWith(MockitoExtension.class) class CassandraSpanStoreTest { - CassandraSpanStore spanStore = spanStore(CassandraStorage.newBuilder()); + @Mock CqlSession session; + Schema.Metadata metadata = new Schema.Metadata(true, true); + @Mock KeyspaceMetadata keyspace; + CassandraSpanStore spanStore; + + @BeforeEach void setup() { + spanStore = spanStore(CassandraStorage.newBuilder().ensureSchema(false)); + } + QueryRequest.Builder queryBuilder = QueryRequest.newBuilder().endTs(TODAY).lookback(DAY).limit(5); @Test void timestampRange_withIndexTtlProvidedAvoidsOverflow() { @@ -113,7 +128,8 @@ class CassandraSpanStoreTest { assertThat(spanStore.getSpanNames("icecream")).hasToString("ConstantCall{value=[]}"); } - static CassandraSpanStore spanStore(CassandraStorage.Builder builder) { - return new CassandraSpanStore(builder.sessionFactory(storage -> mockSession()).build()); + CassandraSpanStore spanStore(CassandraStorage.Builder builder) { + return new CassandraSpanStore(session, metadata, keyspace, builder.maxTraceCols, + builder.indexFetchMultiplier, builder.strictTraceId, builder.searchEnabled); } } diff --git a/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/internal/CassandraStorageBuilderTest.java b/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/CassandraStorageBuilderTest.java similarity index 97% rename from zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/internal/CassandraStorageBuilderTest.java rename to zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/CassandraStorageBuilderTest.java index 2b9240a404..703eb9e0a5 100644 --- a/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/internal/CassandraStorageBuilderTest.java +++ b/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/CassandraStorageBuilderTest.java @@ -2,7 +2,7 @@ * Copyright The OpenZipkin Authors * SPDX-License-Identifier: Apache-2.0 */ -package zipkin2.storage.cassandra.internal; +package zipkin2.storage.cassandra; import java.util.List; import java.util.function.Function; diff --git a/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/ITEnsureSchema.java b/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/ITEnsureSchema.java index 296a15d5de..4652462e0b 100644 --- a/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/ITEnsureSchema.java +++ b/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/ITEnsureSchema.java @@ -5,6 +5,7 @@ package zipkin2.storage.cassandra; import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.Version; import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; import java.util.List; import org.junit.jupiter.api.Test; @@ -44,14 +45,14 @@ abstract class ITEnsureSchema extends ITStorage { abstract CqlSession session(); @Test void installsKeyspaceWhenMissing() { - Schema.ensureExists(storage.keyspace, true, session()); + Schema.ensureExists(session(), storage.keyspace, true); KeyspaceMetadata metadata = session().getMetadata().getKeyspace(storage.keyspace).get(); assertThat(metadata).isNotNull(); } @Test void installsKeyspaceWhenMissing_searchDisabled() { - Schema.ensureExists(storage.keyspace, false, session()); + Schema.ensureExists(session(), storage.keyspace, false); KeyspaceMetadata metadata = session().getMetadata().getKeyspace(storage.keyspace).get(); assertThat(metadata).isNotNull(); @@ -61,7 +62,7 @@ abstract class ITEnsureSchema extends ITStorage { session().execute("CREATE KEYSPACE " + storage.keyspace + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};"); - Schema.ensureExists(storage.keyspace, false, session()); + Schema.ensureExists(session(), storage.keyspace, false); KeyspaceMetadata metadata = session().getMetadata().getKeyspace(storage.keyspace).get(); assertThat(metadata.getTable(TABLE_SPAN)).isNotNull(); @@ -77,7 +78,7 @@ abstract class ITEnsureSchema extends ITStorage { session().execute("CREATE KEYSPACE " + storage.keyspace + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};"); - Schema.ensureExists(storage.keyspace, true, session()); + Schema.ensureExists(session(), storage.keyspace, true); KeyspaceMetadata metadata = session().getMetadata().getKeyspace(storage.keyspace).get(); @@ -88,21 +89,25 @@ abstract class ITEnsureSchema extends ITStorage { } @Test void upgradesOldSchema_autocomplete() { - Schema.applyCqlFile(storage.keyspace, session(), "/zipkin2-schema.cql"); - Schema.applyCqlFile(storage.keyspace, session(), "/zipkin2-schema-indexes-original.cql"); + Version version = Schema.ensureVersion(session().getMetadata()); + Schema.applyCqlFile(version, storage.keyspace, session(), "/zipkin2-schema.cql"); + Schema.applyCqlFile(version, storage.keyspace, session(), + "/zipkin2-schema-indexes-original.cql"); - Schema.ensureExists(storage.keyspace, true, session()); + Schema.ensureExists(session(), storage.keyspace, true); KeyspaceMetadata metadata = session().getMetadata().getKeyspace(storage.keyspace).get(); assertThat(Schema.hasUpgrade1_autocompleteTags(metadata)).isTrue(); } @Test void upgradesOldSchema_remoteService() { - Schema.applyCqlFile(storage.keyspace, session(), "/zipkin2-schema.cql"); - Schema.applyCqlFile(storage.keyspace, session(), "/zipkin2-schema-indexes-original.cql"); - Schema.applyCqlFile(storage.keyspace, session(), "/zipkin2-schema-upgrade-1.cql"); + Version version = Schema.ensureVersion(session().getMetadata()); + Schema.applyCqlFile(version, storage.keyspace, session(), "/zipkin2-schema.cql"); + Schema.applyCqlFile(version, storage.keyspace, session(), + "/zipkin2-schema-indexes-original.cql"); + Schema.applyCqlFile(version, storage.keyspace, session(), "/zipkin2-schema-upgrade-1.cql"); - Schema.ensureExists(storage.keyspace, true, session()); + Schema.ensureExists(session(), storage.keyspace, true); KeyspaceMetadata metadata = session().getMetadata().getKeyspace(storage.keyspace).get(); assertThat(Schema.hasUpgrade2_remoteService(metadata)).isTrue(); @@ -110,9 +115,11 @@ abstract class ITEnsureSchema extends ITStorage { /** This tests we don't accidentally rely on new indexes such as autocomplete tags */ @Test void worksWithOldSchema(TestInfo testInfo) throws Exception { + Version version = Schema.ensureVersion(session().getMetadata()); String testSuffix = testSuffix(testInfo); - Schema.applyCqlFile(storage.keyspace, session(), "/zipkin2-schema.cql"); - Schema.applyCqlFile(storage.keyspace, session(), "/zipkin2-schema-indexes-original.cql"); + Schema.applyCqlFile(version, storage.keyspace, session(), "/zipkin2-schema.cql"); + Schema.applyCqlFile(version, storage.keyspace, session(), + "/zipkin2-schema-indexes-original.cql"); // Ensure the storage component is functional before proceeding CheckResult check = storage.check(); diff --git a/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/SchemaTest.java b/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/SchemaTest.java index 6637eeed9b..36874e04ca 100644 --- a/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/SchemaTest.java +++ b/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/SchemaTest.java @@ -21,25 +21,22 @@ import static org.mockito.Mockito.when; class SchemaTest { - @Test void ensureKeyspaceMetadata_failsWhenVersionLessThan3_11_3() { - CqlSession session = mock(CqlSession.class); + @Test void ensureVersion_failsWhenVersionLessThan3_11_3() { Metadata metadata = mock(Metadata.class); Node node = mock(Node.class); - when(session.getMetadata()).thenReturn(metadata); when(metadata.getNodes()).thenReturn(Map.of( UUID.fromString("11111111-1111-1111-1111-111111111111"), node )); when(node.getCassandraVersion()).thenReturn(Version.parse("3.11.2")); - assertThatThrownBy(() -> Schema.ensureKeyspaceMetadata(session, "zipkin2")) + assertThatThrownBy(() -> Schema.ensureVersion(metadata)) .isInstanceOf(RuntimeException.class) .hasMessage( "Node 11111111-1111-1111-1111-111111111111 is running Cassandra 3.11.2, but minimum version is 3.11.3"); } - @Test void ensureKeyspaceMetadata_failsWhenOneVersionLessThan3_11_3() { - CqlSession session = mock(CqlSession.class); + @Test void ensureVersion_failsWhenOneVersionLessThan3_11_3() { Metadata metadata = mock(Metadata.class); Node node1 = mock(Node.class); Node node2 = mock(Node.class); @@ -47,61 +44,58 @@ class SchemaTest { nodes.put(UUID.fromString("11111111-1111-1111-1111-111111111111"), node1); nodes.put(UUID.fromString("22222222-2222-2222-2222-222222222222"), node2); - when(session.getMetadata()).thenReturn(metadata); when(metadata.getNodes()).thenReturn(nodes); when(node1.getCassandraVersion()).thenReturn(Version.parse("3.11.3")); when(node2.getCassandraVersion()).thenReturn(Version.parse("3.11.2")); - assertThatThrownBy(() -> Schema.ensureKeyspaceMetadata(session, "zipkin2")) + assertThatThrownBy(() -> Schema.ensureVersion(metadata)) .isInstanceOf(RuntimeException.class) .hasMessage( "Node 22222222-2222-2222-2222-222222222222 is running Cassandra 3.11.2, but minimum version is 3.11.3"); } - @Test void ensureKeyspaceMetadata_passesWhenVersion3_11_3AndKeyspaceMetadataIsNotNull() { - CqlSession session = mock(CqlSession.class); + @Test void ensureVersion_passesWhenVersion3_11_3() { Metadata metadata = mock(Metadata.class); Node node = mock(Node.class); - KeyspaceMetadata keyspaceMetadata = mock(KeyspaceMetadata.class); - when(session.getMetadata()).thenReturn(metadata); when(metadata.getNodes()).thenReturn(Map.of( UUID.fromString("11111111-1111-1111-1111-111111111111"), node )); when(node.getCassandraVersion()).thenReturn(Version.parse("3.11.3")); - when(metadata.getKeyspace("zipkin2")).thenReturn(Optional.of(keyspaceMetadata)); - assertThat(Schema.ensureKeyspaceMetadata(session, "zipkin2")) - .isSameAs(keyspaceMetadata); + assertThat(Schema.ensureVersion(metadata)) + .isEqualTo(Version.parse("3.11.3")); } - @Test void ensureKeyspaceMetadata_passesWhenVersion3_11_4AndKeyspaceMetadataIsNotNull() { - CqlSession session = mock(CqlSession.class); + @Test void ensureVersion_passesWhenVersion3_11_4() { Metadata metadata = mock(Metadata.class); Node node = mock(Node.class); - KeyspaceMetadata keyspaceMetadata = mock(KeyspaceMetadata.class); - when(session.getMetadata()).thenReturn(metadata); when(metadata.getNodes()).thenReturn(Map.of( UUID.fromString("11111111-1111-1111-1111-111111111111"), node )); when(node.getCassandraVersion()).thenReturn(Version.parse("3.11.4")); + + assertThat(Schema.ensureVersion(metadata)) + .isEqualTo(Version.parse("3.11.4")); + } + + @Test void ensureKeyspaceMetadata() { + CqlSession session = mock(CqlSession.class); + Metadata metadata = mock(Metadata.class); + when(session.getMetadata()).thenReturn(metadata); + KeyspaceMetadata keyspaceMetadata = mock(KeyspaceMetadata.class); when(metadata.getKeyspace("zipkin2")).thenReturn(Optional.of(keyspaceMetadata)); assertThat(Schema.ensureKeyspaceMetadata(session, "zipkin2")) .isSameAs(keyspaceMetadata); } - @Test void ensureKeyspaceMetadata_failsWhenKeyspaceMetadataIsNotNull() { + @Test void ensureKeyspaceMetadata_failsWhenKeyspaceMetadataIsNull() { CqlSession session = mock(CqlSession.class); Metadata metadata = mock(Metadata.class); - Node node = mock(Node.class); when(session.getMetadata()).thenReturn(metadata); - when(metadata.getNodes()).thenReturn(Map.of( - UUID.fromString("11111111-1111-1111-1111-111111111111"), node - )); - when(node.getCassandraVersion()).thenReturn(Version.parse("3.11.3")); assertThatThrownBy(() -> Schema.ensureKeyspaceMetadata(session, "zipkin2")) .isInstanceOf(RuntimeException.class) diff --git a/zipkin/src/main/java/zipkin2/internal/ClosedComponentException.java b/zipkin/src/main/java/zipkin2/internal/ClosedComponentException.java new file mode 100644 index 0000000000..22d02bb843 --- /dev/null +++ b/zipkin/src/main/java/zipkin2/internal/ClosedComponentException.java @@ -0,0 +1,18 @@ +/* + * Copyright The OpenZipkin Authors + * SPDX-License-Identifier: Apache-2.0 + */ +package zipkin2.internal; + +public final class ClosedComponentException extends IllegalStateException { + static final long serialVersionUID = -4636520624634625689L; + + /** Convenience constructor that ensures the message is never null. */ + public ClosedComponentException() { + this(null); + } + + public ClosedComponentException(String message) { + super(message != null ? message : "closed"); + } +}