Improved subscription caching efficiency (#72)
billkalter authored Dec 8, 2016
1 parent ee7385e commit 0ef403b
Showing 13 changed files with 910 additions and 185 deletions.
DatabusConfiguration.java
@@ -1,6 +1,7 @@
package com.bazaarvoice.emodb.databus;

import com.bazaarvoice.emodb.common.cassandra.CassandraConfiguration;
import com.bazaarvoice.emodb.databus.db.generic.CachingSubscriptionDAO;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;

@@ -27,6 +28,15 @@ public class DatabusConfiguration {
@JsonProperty("longPollPollingThreadCount")
private Optional<Integer> _longPollPollingThreadCount = Optional.absent();

/**
* The following is only necessary during the period while the legacy subscription cache is upgraded to the current
* implementation.
*/
@Valid
@NotNull
@JsonProperty("subscriptionCacheInvalidation")
private CachingSubscriptionDAO.CachingMode _subscriptionCacheInvalidation = CachingSubscriptionDAO.CachingMode.normal;

public CassandraConfiguration getCassandraConfiguration() {
return _cassandraConfiguration;
}
@@ -53,4 +63,13 @@ public DatabusConfiguration setLongPollPollingThreadCount(Integer longPollPollin
_longPollPollingThreadCount = Optional.of(longPollPollingThreadCount);
return this;
}

public CachingSubscriptionDAO.CachingMode getSubscriptionCacheInvalidation() {
return _subscriptionCacheInvalidation;
}

public DatabusConfiguration setSubscriptionCacheInvalidation(CachingSubscriptionDAO.CachingMode subscriptionCacheInvalidation) {
_subscriptionCacheInvalidation = subscriptionCacheInvalidation;
return this;
}
}
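
The new setting is exposed through the @JsonProperty name "subscriptionCacheInvalidation" shown above. A minimal usage sketch, assuming the class's implicit no-arg constructor and using the default mode from the field initializer (other enum values are not shown in this diff):

// Hedged sketch only: programmatically selecting the cache-invalidation mode.
DatabusConfiguration databusConfig = new DatabusConfiguration()
        .setSubscriptionCacheInvalidation(CachingSubscriptionDAO.CachingMode.normal);
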
DatabusModule.java
@@ -29,9 +29,10 @@
import com.bazaarvoice.emodb.databus.core.SubscriptionEvaluator;
import com.bazaarvoice.emodb.databus.core.SystemQueueMonitorManager;
import com.bazaarvoice.emodb.databus.db.SubscriptionDAO;
import com.bazaarvoice.emodb.databus.db.astyanax.AstyanaxSubscriptionDAO;
import com.bazaarvoice.emodb.databus.db.cql.CqlSubscriptionDAO;
import com.bazaarvoice.emodb.databus.db.generic.CachingSubscriptionDAO;
import com.bazaarvoice.emodb.databus.db.generic.CachingSubscriptionDAODelegate;
import com.bazaarvoice.emodb.databus.db.generic.CachingSubscriptionDAOExecutorService;
import com.bazaarvoice.emodb.databus.db.generic.CachingSubscriptionDAORegistry;
import com.bazaarvoice.emodb.databus.repl.DefaultReplicationManager;
import com.bazaarvoice.emodb.databus.repl.DefaultReplicationSource;
@@ -53,16 +54,22 @@
import com.google.common.base.Supplier;
import com.google.common.eventbus.EventBus;
import com.google.common.net.HostAndPort;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Key;
import com.google.inject.PrivateModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.TypeLiteral;
import com.sun.jersey.api.client.Client;
import io.dropwizard.lifecycle.ExecutorServiceManager;
import io.dropwizard.util.Duration;
import org.apache.curator.framework.CuratorFramework;

import java.time.Clock;
import java.util.Map;
import java.util.concurrent.Executors;

import static com.google.common.base.Preconditions.checkArgument;

@@ -111,10 +118,10 @@ public DatabusModule(EmoServiceMode serviceMode, MetricRegistry metricRegistry)

@Override
protected void configure() {
// Chain SubscriptionDAO -> CachingSubscriptionDAO -> AstyanaxSubscriptionDAO.
// Chain SubscriptionDAO -> CachingSubscriptionDAO -> CqlSubscriptionDAO.
bind(SubscriptionDAO.class).to(CachingSubscriptionDAO.class).asEagerSingleton();
bind(SubscriptionDAO.class).annotatedWith(CachingSubscriptionDAODelegate.class).to(AstyanaxSubscriptionDAO.class).asEagerSingleton();
bind(AstyanaxSubscriptionDAO.class).asEagerSingleton();
bind(SubscriptionDAO.class).annotatedWith(CachingSubscriptionDAODelegate.class).to(CqlSubscriptionDAO.class).asEagerSingleton();
bind(CqlSubscriptionDAO.class).asEagerSingleton();
bind(CassandraFactory.class).asEagerSingleton();

// Event Store
@@ -182,4 +189,17 @@ ValueStore<Boolean> provideReplicationEnabled(@DatabusZooKeeper CuratorFramework
return lifeCycle.manage(
new ZkValueStore<>(curator, "/settings/replication-enabled", new ZkBooleanSerializer(), true));
}

@Provides @Singleton @CachingSubscriptionDAOExecutorService
ListeningExecutorService provideCachingSubscriptionDAOExecutorService(LifeCycleRegistry lifeCycleRegistry) {
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(
1, new ThreadFactoryBuilder().setNameFormat("subscription-cache-%d").build()));
lifeCycleRegistry.manage(new ExecutorServiceManager(service, Duration.seconds(1), "subscription-cache"));
return service;
}

@Provides @Singleton
CachingSubscriptionDAO.CachingMode provideCachingSubscriptionDAOCachingMode(DatabusConfiguration configuration) {
return configuration.getSubscriptionCacheInvalidation();
}
}
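
The two providers above supply a dedicated single-thread "subscription-cache" executor and the configured CachingMode to CachingSubscriptionDAO. As a hedged illustration of the general pattern, not the committed implementation, an executor like this is typically combined with a Guava cache via CacheLoader.asyncReloading so a stale entry is refreshed in the background while readers keep getting the previously cached value:

// Sketch under stated assumptions: "delegate" stands in for the @CachingSubscriptionDAODelegate
// binding and "refreshService" for the @CachingSubscriptionDAOExecutorService binding above;
// the 10-minute refresh interval is illustrative. Uses com.google.common.cache.*.
LoadingCache<String, OwnedSubscription> subscriptionCache = CacheBuilder.newBuilder()
        .refreshAfterWrite(10, TimeUnit.MINUTES)
        .build(CacheLoader.asyncReloading(new CacheLoader<String, OwnedSubscription>() {
            @Override
            public OwnedSubscription load(String name) {
                // A real cache would need to handle the @Nullable return from getSubscription.
                return delegate.getSubscription(name);
            }
        }, refreshService));

With asyncReloading, lookups continue to return the prior cached value while the reload runs on the injected executor's "subscription-cache-%d" thread.
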
DefaultDatabus.java
@@ -39,14 +39,12 @@
import com.bazaarvoice.emodb.sortedq.core.ReadOnlyQueueException;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Ticker;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
@@ -73,6 +71,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.StreamSupport;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -254,39 +253,21 @@ public Iterator<Subscription> listSubscriptions(final String ownerId, @Nullable
checkArgument(limit > 0, "Limit must be >0");

// We always have all the subscriptions cached in memory so fetch them all.
Collection<OwnedSubscription> subscriptions = _subscriptionDao.getAllSubscriptions();

// Ignore subscriptions not accessible by the owner.
subscriptions = Collections2.filter(subscriptions,
(subscription) -> _databusAuthorizer.owner(ownerId).canAccessSubscription(subscription));

// Sort them by name. They're stored sorted in Cassandra so this should be a no-op, but
// do the sort anyway so we're not depending on internals of the subscription DAO.
List<? extends Subscription> sorted = new Ordering<Subscription>() {
@Override
public int compare(Subscription left, Subscription right) {
return left.getName().compareTo(right.getName());
}
}.immutableSortedCopy(subscriptions);

// Apply the "from" parameter.
if (fromSubscriptionExclusive != null) {
int start = 0;
for (; start < sorted.size(); start++) {
if (fromSubscriptionExclusive.compareTo(sorted.get(start).getName()) < 0) {
break;
}
}
sorted = sorted.subList(start, sorted.size());
}

// Apply the "limit" parameter (be careful to avoid overflow when limit == Long.MAX_VALUE).
if (sorted.size() > limit) {
sorted = sorted.subList(0, (int) limit);
}

//noinspection unchecked
return (Iterator<Subscription>) sorted.iterator();
Iterable<OwnedSubscription> allSubscriptions = _subscriptionDao.getAllSubscriptions();

return StreamSupport.stream(allSubscriptions.spliterator(), false)
// Ignore subscriptions not accessible by the owner.
.filter((subscription) -> _databusAuthorizer.owner(ownerId).canAccessSubscription(subscription))
// Sort them by name. They're stored sorted in Cassandra so this should be a no-op, but
// do the sort anyway so we're not depending on internals of the subscription DAO.
.sorted((left, right) -> left.getName().compareTo(right.getName()))
// Apply the "from" parameter
.filter(subscription -> fromSubscriptionExclusive == null || subscription.getName().compareTo(fromSubscriptionExclusive) > 0)
// Apply the "limit" parameter (be careful to avoid overflow when limit == Long.MAX_VALUE).
.limit(limit)
// Necessary to make generics work
.map(subscription -> (Subscription) subscription)
.iterator();
}

@Override
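
The stream pipeline above replaces the hand-rolled filter/sort/sublist logic with equivalent steps. A hedged usage sketch of the from/limit paging contract it implements, where "databus" and "ownerId" are illustrative names for an instance of this class and an authorized owner:

// Hypothetical caller, not part of the commit: walk all visible subscriptions 100 at a time.
String from = null;
while (true) {
    List<Subscription> page = Lists.newArrayList(databus.listSubscriptions(ownerId, from, 100));
    if (page.isEmpty()) {
        break;
    }
    for (Subscription subscription : page) {
        // process each subscription here
    }
    // Names are returned in ascending order, so the last name is the next exclusive "from".
    from = page.get(page.size() - 1).getName();
}
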
DefaultFanout.java
@@ -21,7 +21,6 @@
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;

@@ -45,7 +44,7 @@ public class DefaultFanout extends AbstractScheduledService {
private final Function<Multimap<String, ByteBuffer>, Void> _eventSink;
private final boolean _replicateOutbound;
private final Duration _sleepWhenIdle;
private final Supplier<Collection<OwnedSubscription>> _subscriptionsSupplier;
private final Supplier<Iterable<OwnedSubscription>> _subscriptionsSupplier;
private final DataCenter _currentDataCenter;
private final RateLimitedLog _rateLimitedLog;
private final SubscriptionEvaluator _subscriptionEvaluator;
@@ -58,7 +57,7 @@ public DefaultFanout(String name,
Function<Multimap<String, ByteBuffer>, Void> eventSink,
boolean replicateOutbound,
Duration sleepWhenIdle,
Supplier<Collection<OwnedSubscription>> subscriptionsSupplier,
Supplier<Iterable<OwnedSubscription>> subscriptionsSupplier,
DataCenter currentDataCenter,
RateLimitedLogFactory logFactory,
SubscriptionEvaluator subscriptionEvaluator,
@@ -122,7 +121,7 @@ private boolean copyEvents() {
boolean copyEvents(List<EventData> rawEvents) {
// Read the list of subscriptions *after* reading events from the event store to avoid race conditions with
// creating a new subscription.
Collection<OwnedSubscription> subscriptions = _subscriptionsSupplier.get();
Iterable<OwnedSubscription> subscriptions = _subscriptionsSupplier.get();

// Copy the events to all the destination channels.
List<String> eventKeys = Lists.newArrayListWithCapacity(rawEvents.size());
DefaultFanoutManager.java
@@ -27,6 +27,7 @@
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkNotNull;
@@ -83,12 +84,7 @@ public Void apply(@Nullable Multimap<String, ByteBuffer> eventsByChannel) {
return null;
}
};
final Supplier<Collection<OwnedSubscription>> subscriptionsSupplier = new Supplier<Collection<OwnedSubscription>>() {
@Override
public Collection<OwnedSubscription> get() {
return _subscriptionDao.getAllSubscriptions();
}
};
final Supplier<Iterable<OwnedSubscription>> subscriptionsSupplier = () -> _subscriptionDao.getAllSubscriptions();

LeaderService leaderService = new LeaderService(
_curator, ZKPaths.makePath("/leader/fanout", name), _selfId, "LeaderSelector-" + name, 1, TimeUnit.MINUTES,
SubscriptionDAO.java
@@ -5,7 +5,6 @@
import org.joda.time.Duration;

import javax.annotation.Nullable;
import java.util.Collection;

public interface SubscriptionDAO {

@@ -17,5 +16,12 @@ void insertSubscription(String ownerId, String subscription, Condition tableFilt
@Nullable
OwnedSubscription getSubscription(String subscription);

Collection<OwnedSubscription> getAllSubscriptions();
Iterable<OwnedSubscription> getAllSubscriptions();

/**
* Potentially more efficient than {@link #getAllSubscriptions()} when the caller only needs a list of all
* subscription names. If possible the implementation should provide a more efficient implementation than
* actually loading all subscriptions.
*/
Iterable<String> getAllSubscriptionNames();
}
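
This diff does not show an implementation of the new method, so here is a minimal fallback sketch, assuming only Guava's Iterables is available and no cheaper name-only query exists; a real implementation should prefer the more efficient path the javadoc asks for:

// Hedged sketch, not the committed code: derive the names from the full subscriptions.
// Uses com.google.common.collect.Iterables; getName() comes from the Subscription interface.
@Override
public Iterable<String> getAllSubscriptionNames() {
    return Iterables.transform(getAllSubscriptions(), OwnedSubscription::getName);
}
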

