Skip to content

Commit

Permalink
Merge branch 'signalapp:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
offsoc authored Feb 6, 2025
2 parents 4c3b9d4 + 5d06228 commit 2f579cf
Show file tree
Hide file tree
Showing 51 changed files with 1,895 additions and 1,262 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ jobs:
build:
runs-on: ubuntu-latest
container: ubuntu:22.04
timeout-minutes: 20

steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@
import org.whispersystems.textsecuregcm.filters.RemoteAddressFilter;
import org.whispersystems.textsecuregcm.filters.RemoteDeprecationFilter;
import org.whispersystems.textsecuregcm.filters.RequestStatisticsFilter;
import org.whispersystems.textsecuregcm.filters.RestDeprecationFilter;
import org.whispersystems.textsecuregcm.filters.TimestampResponseFilter;
import org.whispersystems.textsecuregcm.geo.MaxMindDatabaseManager;
import org.whispersystems.textsecuregcm.grpc.AccountsAnonymousGrpcService;
Expand Down Expand Up @@ -227,6 +228,7 @@
import org.whispersystems.textsecuregcm.storage.MessagesDynamoDb;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.storage.OneTimeDonationsManager;
import org.whispersystems.textsecuregcm.storage.PersistentTimer;
import org.whispersystems.textsecuregcm.storage.PhoneNumberIdentifiers;
import org.whispersystems.textsecuregcm.storage.Profiles;
import org.whispersystems.textsecuregcm.storage.ProfilesManager;
Expand Down Expand Up @@ -430,7 +432,7 @@ public void run(WhisperServerConfiguration config, Environment environment) thro
config.getDynamoDbTables().getRemoteConfig().getTableName());
PushChallengeDynamoDb pushChallengeDynamoDb = new PushChallengeDynamoDb(dynamoDbClient,
config.getDynamoDbTables().getPushChallenge().getTableName());
ReportMessageDynamoDb reportMessageDynamoDb = new ReportMessageDynamoDb(dynamoDbClient,
ReportMessageDynamoDb reportMessageDynamoDb = new ReportMessageDynamoDb(dynamoDbClient, dynamoDbAsyncClient,
config.getDynamoDbTables().getReportMessage().getTableName(),
config.getReportMessageConfiguration().getReportTtl());
RegistrationRecoveryPasswords registrationRecoveryPasswords = new RegistrationRecoveryPasswords(
Expand Down Expand Up @@ -617,7 +619,7 @@ public void run(WhisperServerConfiguration config, Environment environment) thro
ReportMessageManager reportMessageManager = new ReportMessageManager(reportMessageDynamoDb, rateLimitersCluster,
config.getReportMessageConfiguration().getCounterTtl());
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, reportMessageManager,
messageDeletionAsyncExecutor);
messageDeletionAsyncExecutor, Clock.systemUTC());
AccountLockManager accountLockManager = new AccountLockManager(dynamoDbClient,
config.getDynamoDbTables().getDeletedAccountsLock().getTableName());
ClientPublicKeysManager clientPublicKeysManager =
Expand Down Expand Up @@ -1000,7 +1002,12 @@ protected void configureServer(final ServerBuilder<?> serverBuilder) {
metricsHttpChannelListener.configure(environment);
final MessageMetrics messageMetrics = new MessageMetrics();

// BufferingInterceptor is needed on the base environment but not the WebSocketEnvironment,
// because we handle serialization of http responses on the websocket on our own and can
// compute content lengths without it
environment.jersey().register(new BufferingInterceptor());
environment.jersey().register(new RestDeprecationFilter(dynamicConfigurationManager, experimentEnrollmentManager));

environment.jersey().register(new VirtualExecutorServiceProvider("managed-async-virtual-thread-"));
environment.jersey().register(new RateLimitByIpFilter(rateLimiters));
environment.jersey().register(new RequestStatisticsFilter(TrafficSource.HTTP));
Expand Down Expand Up @@ -1097,6 +1104,7 @@ protected void configureServer(final ServerBuilder<?> serverBuilder) {
log.info("Registered spam filter: {}", filter.getClass().getName());
});

final PersistentTimer persistentTimer = new PersistentTimer(rateLimitersCluster, clock);

final PhoneVerificationTokenManager phoneVerificationTokenManager = new PhoneVerificationTokenManager(
phoneNumberIdentifiers, registrationServiceClient, registrationRecoveryPasswordsManager, registrationRecoveryChecker);
Expand All @@ -1115,7 +1123,7 @@ protected void configureServer(final ServerBuilder<?> serverBuilder) {
config.getDeliveryCertificate().ecPrivateKey(), config.getDeliveryCertificate().expiresDays()),
zkAuthOperations, callingGenericZkSecretParams, clock),
new ChallengeController(rateLimitChallengeManager, challengeConstraintChecker),
new DeviceController(accountsManager, clientPublicKeysManager, rateLimiters, config.getMaxDevices()),
new DeviceController(accountsManager, clientPublicKeysManager, rateLimiters, persistentTimer, config.getMaxDevices()),
new DeviceCheckController(clock, backupAuthManager, appleDeviceCheckManager, rateLimiters,
config.getDeviceCheck().backupRedemptionLevel(),
config.getDeviceCheck().backupRedemptionDuration()),
Expand All @@ -1126,7 +1134,7 @@ protected void configureServer(final ServerBuilder<?> serverBuilder) {
new KeyTransparencyController(keyTransparencyServiceClient),
new MessageController(rateLimiters, messageByteLimitCardinalityEstimator, messageSender, receiptSender,
accountsManager, messagesManager, phoneNumberIdentifiers, pushNotificationManager, pushNotificationScheduler,
reportMessageManager, multiRecipientMessageExecutor, messageDeliveryScheduler, clientReleaseManager,
reportMessageManager, messageDeliveryScheduler, clientReleaseManager,
dynamicConfigurationManager, zkSecretParams, spamChecker, messageMetrics, messageDeliveryLoopMonitor,
Clock.systemUTC()),
new PaymentsController(currencyManager, paymentsCredentialsGenerator),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,7 @@ public void verifyRegistrationLock(final Account account, @Nullable final String
throw new WebApplicationException(Response.status(FAILURE_HTTP_STATUS)
.entity(new RegistrationLockFailure(
existingRegistrationLock.getTimeRemaining().toMillis(),
svr2FailureCredentials(existingRegistrationLock, updatedAccount),
null))
svr2FailureCredentials(existingRegistrationLock, updatedAccount)))
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

import org.whispersystems.textsecuregcm.storage.Account;
import java.security.MessageDigest;
import java.util.Collection;
import java.util.function.Predicate;
import java.util.stream.IntStream;

public class UnidentifiedAccessUtil {

Expand All @@ -31,4 +34,42 @@ public static boolean checkUnidentifiedAccess(final Account targetAccount, final
.map(targetUnidentifiedAccessKey -> MessageDigest.isEqual(targetUnidentifiedAccessKey, unidentifiedAccessKey))
.orElse(false);
}

/**
* Checks whether an action (e.g. sending a message or retrieving pre-keys) may be taken on the collection of target
* accounts by an actor presenting the given combined unidentified access key.
*
* @param targetAccounts the accounts on which an actor wishes to take an action
* @param combinedUnidentifiedAccessKey the unidentified access key presented by the actor
*
* @return {@code true} if an actor presenting the given unidentified access key has permission to take an action on
* the target accounts or {@code false} otherwise
*/
public static boolean checkUnidentifiedAccess(final Collection<Account> targetAccounts, final byte[] combinedUnidentifiedAccessKey) {
return MessageDigest.isEqual(getCombinedUnidentifiedAccessKey(targetAccounts), combinedUnidentifiedAccessKey);
}

/**
* Calculates a combined unidentified access key for the given collection of accounts.
*
* @param accounts the accounts from which to derive a combined unidentified access key
* @return a combined unidentified access key
*
* @throws IllegalArgumentException if one or more of the given accounts had an unidentified access key with an
* unexpected length
*/
public static byte[] getCombinedUnidentifiedAccessKey(final Collection<Account> accounts) {
return accounts.stream()
.filter(Predicate.not(Account::isUnrestrictedUnidentifiedAccess))
.map(account ->
account.getUnidentifiedAccessKey()
.filter(b -> b.length == UnidentifiedAccessUtil.UNIDENTIFIED_ACCESS_KEY_LENGTH)
.orElseThrow(IllegalArgumentException::new))
.reduce(new byte[UnidentifiedAccessUtil.UNIDENTIFIED_ACCESS_KEY_LENGTH],
(a, b) -> {
final byte[] xor = new byte[UnidentifiedAccessUtil.UNIDENTIFIED_ACCESS_KEY_LENGTH];
IntStream.range(0, UnidentifiedAccessUtil.UNIDENTIFIED_ACCESS_KEY_LENGTH).forEach(i -> xor[i] = (byte) (a[i] ^ b[i]));
return xor;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
package org.whispersystems.textsecuregcm.configuration.dynamic;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.vdurmont.semver4j.Semver;
import jakarta.validation.Valid;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.whispersystems.textsecuregcm.limits.RateLimiterConfig;
import org.whispersystems.textsecuregcm.util.ua.ClientPlatform;

public class DynamicConfiguration {

Expand Down Expand Up @@ -72,6 +74,10 @@ public class DynamicConfiguration {
@Valid
List<String> svrStatusCodesToIgnoreForAccountDeletion = Collections.emptyList();

@JsonProperty
@Valid
Map<ClientPlatform, Semver> minimumRestFreeVersion = Map.of();

public Optional<DynamicExperimentEnrollmentConfiguration> getExperimentEnrollmentConfiguration(
final String experimentName) {
return Optional.ofNullable(experiments.get(experimentName));
Expand Down Expand Up @@ -130,4 +136,8 @@ public List<String> getSvrStatusCodesToIgnoreForAccountDeletion() {
return svrStatusCodesToIgnoreForAccountDeletion;
}

public Map<ClientPlatform, Semver> minimumRestFreeVersion() {
return minimumRestFreeVersion;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.net.HttpHeaders;
import io.dropwizard.auth.Auth;
import io.lettuce.core.RedisException;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
Expand Down Expand Up @@ -81,6 +80,7 @@
import org.whispersystems.textsecuregcm.storage.DeviceCapability;
import org.whispersystems.textsecuregcm.storage.DeviceSpec;
import org.whispersystems.textsecuregcm.storage.LinkDeviceTokenAlreadyUsedException;
import org.whispersystems.textsecuregcm.storage.PersistentTimer;
import org.whispersystems.textsecuregcm.util.DeviceCapabilityAdapter;
import org.whispersystems.textsecuregcm.util.EnumMapUtil;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
Expand All @@ -100,6 +100,7 @@ public class DeviceController {
private final AccountsManager accounts;
private final ClientPublicKeysManager clientPublicKeysManager;
private final RateLimiters rateLimiters;
private final PersistentTimer persistentTimer;
private final Map<String, Integer> maxDeviceConfiguration;

private final EnumMap<ClientPlatform, AtomicInteger> linkedDeviceListenersByPlatform;
Expand All @@ -108,9 +109,15 @@ public class DeviceController {
private static final String LINKED_DEVICE_LISTENER_GAUGE_NAME =
MetricsUtil.name(DeviceController.class, "linkedDeviceListeners");

private static final String WAIT_FOR_LINKED_DEVICE_TIMER_NAMESPACE = "wait_for_linked_device";
private static final String WAIT_FOR_LINKED_DEVICE_TIMER_NAME =
MetricsUtil.name(DeviceController.class, "waitForLinkedDeviceDuration");

private static final String WAIT_FOR_TRANSFER_ARCHIVE_TIMER_NAMESPACE = "wait_for_transfer_archive";
private static final String WAIT_FOR_TRANSFER_ARCHIVE_TIMER_NAME =
MetricsUtil.name(DeviceController.class, "waitForTransferArchiveDuration");


@VisibleForTesting
static final int MIN_TOKEN_IDENTIFIER_LENGTH = 32;

Expand All @@ -120,11 +127,13 @@ public class DeviceController {
public DeviceController(final AccountsManager accounts,
final ClientPublicKeysManager clientPublicKeysManager,
final RateLimiters rateLimiters,
final PersistentTimer persistentTimer,
final Map<String, Integer> maxDeviceConfiguration) {

this.accounts = accounts;
this.clientPublicKeysManager = clientPublicKeysManager;
this.rateLimiters = rateLimiters;
this.persistentTimer = persistentTimer;
this.maxDeviceConfiguration = maxDeviceConfiguration;

linkedDeviceListenersByPlatform =
Expand Down Expand Up @@ -339,7 +348,7 @@ public LinkDeviceResponse linkDevice(@HeaderParam(HttpHeaders.AUTHORIZATION) Bas
@ApiResponse(responseCode = "204", description = "No device was linked to the account before the call completed; clients may repeat the call to continue waiting")
@ApiResponse(responseCode = "400", description = "The given token identifier or timeout was invalid")
@ApiResponse(responseCode = "429", description = "Rate-limited; try again after the prescribed delay")
public CompletableFuture<Response> waitForLinkedDevice(
public CompletionStage<Response> waitForLinkedDevice(
@ReadOnly @Auth final AuthenticatedDevice authenticatedDevice,

@PathParam("tokenIdentifier")
Expand All @@ -359,40 +368,33 @@ The amount of time (in seconds) to wait for a response. If the expected device i
given amount of time, this endpoint will return a status of HTTP/204.
""") final int timeoutSeconds,

@HeaderParam(HttpHeaders.USER_AGENT) String userAgent) throws RateLimitExceededException {

rateLimiters.getWaitForLinkedDeviceLimiter().validate(authenticatedDevice.getAccount().getIdentifier(IdentityType.ACI));

@HeaderParam(HttpHeaders.USER_AGENT) String userAgent) {
final AtomicInteger linkedDeviceListenerCounter = getCounterForLinkedDeviceListeners(userAgent);
linkedDeviceListenerCounter.incrementAndGet();

final Timer.Sample sample = Timer.start();
try {
return accounts.waitForNewLinkedDevice(authenticatedDevice.getAccount().getUuid(),
authenticatedDevice.getAuthenticatedDevice(), tokenIdentifier, Duration.ofSeconds(timeoutSeconds))
.thenApply(maybeDeviceInfo -> maybeDeviceInfo
.map(deviceInfo -> Response.status(Response.Status.OK).entity(deviceInfo).build())
.orElseGet(() -> Response.status(Response.Status.NO_CONTENT).build()))
.exceptionally(ExceptionUtils.exceptionallyHandler(IllegalArgumentException.class,
e -> Response.status(Response.Status.BAD_REQUEST).build()))
.whenComplete((response, throwable) -> {
linkedDeviceListenerCounter.decrementAndGet();

if (response != null) {
sample.stop(Timer.builder(WAIT_FOR_LINKED_DEVICE_TIMER_NAME)
.publishPercentileHistogram(true)
.tags(Tags.of(UserAgentTagUtil.getPlatformTag(userAgent),
io.micrometer.core.instrument.Tag.of("deviceFound",
String.valueOf(response.getStatus() == Response.Status.OK.getStatusCode()))))
.register(Metrics.globalRegistry));
}
});
} catch (final RedisException e) {
// `waitForNewLinkedDevice` could fail synchronously if the Redis circuit breaker is open; prevent counter drift
// if that happens
linkedDeviceListenerCounter.decrementAndGet();
throw e;
}
return rateLimiters.getWaitForLinkedDeviceLimiter()
.validateAsync(authenticatedDevice.getAccount().getIdentifier(IdentityType.ACI))
.thenCompose(ignored -> persistentTimer.start(WAIT_FOR_LINKED_DEVICE_TIMER_NAMESPACE, tokenIdentifier))
.thenCompose(sample -> accounts.waitForNewLinkedDevice(
authenticatedDevice.getAccount().getUuid(),
authenticatedDevice.getAuthenticatedDevice(),
tokenIdentifier,
Duration.ofSeconds(timeoutSeconds))
.thenApply(maybeDeviceInfo -> maybeDeviceInfo
.map(deviceInfo -> Response.status(Response.Status.OK).entity(deviceInfo).build())
.orElseGet(() -> Response.status(Response.Status.NO_CONTENT).build()))
.exceptionally(ExceptionUtils.exceptionallyHandler(IllegalArgumentException.class,
e -> Response.status(Response.Status.BAD_REQUEST).build()))
.whenComplete((response, throwable) -> {
linkedDeviceListenerCounter.decrementAndGet();

if (response != null && response.getStatus() == Response.Status.OK.getStatusCode()) {
sample.stop(Timer.builder(WAIT_FOR_LINKED_DEVICE_TIMER_NAME)
.publishPercentileHistogram(true)
.tags(Tags.of(UserAgentTagUtil.getPlatformTag(userAgent)))
.register(Metrics.globalRegistry));
}
}));
}

private AtomicInteger getCounterForLinkedDeviceListeners(final String userAgent) {
Expand Down Expand Up @@ -530,7 +532,8 @@ The amount of time (in seconds) to wait for a response. If a transfer archive fo
public CompletionStage<Void> recordTransferArchiveUploaded(@ReadOnly @Auth final AuthenticatedDevice authenticatedDevice,
@NotNull @Valid final TransferArchiveUploadedRequest transferArchiveUploadedRequest) {

return rateLimiters.getUploadTransferArchiveLimiter().validateAsync(authenticatedDevice.getAccount().getIdentifier(IdentityType.ACI))
return rateLimiters.getUploadTransferArchiveLimiter()
.validateAsync(authenticatedDevice.getAccount().getIdentifier(IdentityType.ACI))
.thenCompose(ignored -> accounts.recordTransferArchiveUpload(authenticatedDevice.getAccount(),
transferArchiveUploadedRequest.destinationDeviceId(),
Instant.ofEpochMilli(transferArchiveUploadedRequest.destinationDeviceCreated()),
Expand Down Expand Up @@ -565,17 +568,29 @@ public CompletionStage<Response> waitForTransferArchive(@ReadOnly @Auth final Au
description = """
The amount of time (in seconds) to wait for a response. If a transfer archive for the authenticated
device is not available within the given amount of time, this endpoint will return a status of HTTP/204.
""") final int timeoutSeconds) {
""") final int timeoutSeconds,

@HeaderParam(HttpHeaders.USER_AGENT) @Nullable String userAgent) {


final String rateLimiterKey = authenticatedDevice.getAccount().getIdentifier(IdentityType.ACI) +
":" + authenticatedDevice.getAuthenticatedDevice().getId();

return rateLimiters.getWaitForTransferArchiveLimiter().validateAsync(rateLimiterKey)
.thenCompose(ignored -> accounts.waitForTransferArchive(authenticatedDevice.getAccount(),
authenticatedDevice.getAuthenticatedDevice(),
Duration.ofSeconds(timeoutSeconds)))
.thenApply(maybeTransferArchive -> maybeTransferArchive
.map(transferArchive -> Response.status(Response.Status.OK).entity(transferArchive).build())
.orElseGet(() -> Response.status(Response.Status.NO_CONTENT).build()));
.thenCompose(ignored -> persistentTimer.start(WAIT_FOR_TRANSFER_ARCHIVE_TIMER_NAMESPACE, rateLimiterKey))
.thenCompose(sample -> accounts.waitForTransferArchive(authenticatedDevice.getAccount(),
authenticatedDevice.getAuthenticatedDevice(),
Duration.ofSeconds(timeoutSeconds))
.thenApply(maybeTransferArchive -> maybeTransferArchive
.map(transferArchive -> Response.status(Response.Status.OK).entity(transferArchive).build())
.orElseGet(() -> Response.status(Response.Status.NO_CONTENT).build()))
.whenComplete((response, throwable) -> {
if (response != null && response.getStatus() == Response.Status.OK.getStatusCode()) {
sample.stop(Timer.builder(WAIT_FOR_TRANSFER_ARCHIVE_TIMER_NAME)
.publishPercentileHistogram(true)
.tags(Tags.of(UserAgentTagUtil.getPlatformTag(userAgent)))
.register(Metrics.globalRegistry));
}
}));
}
}
Loading

0 comments on commit 2f579cf

Please sign in to comment.