-
-
Notifications
You must be signed in to change notification settings - Fork 120
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
tests(realtime): increase code coverage
- Loading branch information
Showing
22 changed files
with
5,659 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,208 @@ | ||
import ConcurrencyExtras | ||
import Foundation | ||
import Helpers | ||
|
||
final class CallbackManager: Sendable { | ||
struct MutableState { | ||
var id = 0 | ||
var serverChanges: [PostgresJoinConfig] = [] | ||
var callbacks: [RealtimeCallback] = [] | ||
} | ||
|
||
private let mutableState = LockIsolated(MutableState()) | ||
|
||
var serverChanges: [PostgresJoinConfig] { | ||
mutableState.serverChanges | ||
} | ||
|
||
var callbacks: [RealtimeCallback] { | ||
mutableState.callbacks | ||
} | ||
|
||
deinit { | ||
reset() | ||
} | ||
|
||
@discardableResult | ||
func addBroadcastCallback( | ||
event: String, | ||
callback: @escaping @Sendable (JSONObject) -> Void | ||
) -> Int { | ||
mutableState.withValue { | ||
$0.id += 1 | ||
$0.callbacks.append( | ||
.broadcast( | ||
BroadcastCallback( | ||
id: $0.id, | ||
event: event, | ||
callback: callback | ||
) | ||
) | ||
) | ||
return $0.id | ||
} | ||
} | ||
|
||
@discardableResult | ||
func addPostgresCallback( | ||
filter: PostgresJoinConfig, | ||
callback: @escaping @Sendable (AnyAction) -> Void | ||
) -> Int { | ||
mutableState.withValue { | ||
$0.id += 1 | ||
$0.callbacks.append( | ||
.postgres( | ||
PostgresCallback( | ||
id: $0.id, | ||
filter: filter, | ||
callback: callback | ||
) | ||
) | ||
) | ||
return $0.id | ||
} | ||
} | ||
|
||
@discardableResult | ||
func addPresenceCallback(callback: @escaping @Sendable (any PresenceAction) -> Void) -> Int { | ||
mutableState.withValue { | ||
$0.id += 1 | ||
$0.callbacks.append(.presence(PresenceCallback(id: $0.id, callback: callback))) | ||
return $0.id | ||
} | ||
} | ||
|
||
@discardableResult | ||
func addSystemCallback(callback: @escaping @Sendable (RealtimeMessageV2) -> Void) -> Int { | ||
mutableState.withValue { | ||
$0.id += 1 | ||
$0.callbacks.append(.system(SystemCallback(id: $0.id, callback: callback))) | ||
return $0.id | ||
} | ||
} | ||
|
||
func setServerChanges(changes: [PostgresJoinConfig]) { | ||
mutableState.withValue { | ||
$0.serverChanges = changes | ||
} | ||
} | ||
|
||
func removeCallback(id: Int) { | ||
mutableState.withValue { | ||
$0.callbacks.removeAll { $0.id == id } | ||
} | ||
} | ||
|
||
func triggerPostgresChanges(ids: [Int], data: AnyAction) { | ||
// Read mutableState at start to acquire lock once. | ||
let mutableState = mutableState.value | ||
|
||
let filters = mutableState.serverChanges.filter { | ||
ids.contains($0.id) | ||
} | ||
let postgresCallbacks = mutableState.callbacks.compactMap { | ||
if case let .postgres(callback) = $0 { | ||
return callback | ||
} | ||
return nil | ||
} | ||
|
||
let callbacks = postgresCallbacks.filter { cc in | ||
filters.contains { sc in | ||
cc.filter == sc | ||
} | ||
} | ||
|
||
for item in callbacks { | ||
item.callback(data) | ||
} | ||
} | ||
|
||
func triggerBroadcast(event: String, json: JSONObject) { | ||
let broadcastCallbacks = mutableState.callbacks.compactMap { | ||
if case let .broadcast(callback) = $0 { | ||
return callback | ||
} | ||
return nil | ||
} | ||
let callbacks = broadcastCallbacks.filter { $0.event == event } | ||
callbacks.forEach { $0.callback(json) } | ||
} | ||
|
||
func triggerPresenceDiffs( | ||
joins: [String: PresenceV2], | ||
leaves: [String: PresenceV2], | ||
rawMessage: RealtimeMessageV2 | ||
) { | ||
let presenceCallbacks = mutableState.callbacks.compactMap { | ||
if case let .presence(callback) = $0 { | ||
return callback | ||
} | ||
return nil | ||
} | ||
for presenceCallback in presenceCallbacks { | ||
presenceCallback.callback( | ||
PresenceActionImpl( | ||
joins: joins, | ||
leaves: leaves, | ||
rawMessage: rawMessage | ||
) | ||
) | ||
} | ||
} | ||
|
||
func triggerSystem(message: RealtimeMessageV2) { | ||
let systemCallbacks = mutableState.callbacks.compactMap { | ||
if case .system(let callback) = $0 { | ||
return callback | ||
} | ||
return nil | ||
} | ||
|
||
for systemCallback in systemCallbacks { | ||
systemCallback.callback(message) | ||
} | ||
} | ||
|
||
func reset() { | ||
mutableState.setValue(MutableState()) | ||
} | ||
} | ||
|
||
struct PostgresCallback { | ||
var id: Int | ||
var filter: PostgresJoinConfig | ||
var callback: @Sendable (AnyAction) -> Void | ||
} | ||
|
||
struct BroadcastCallback { | ||
var id: Int | ||
var event: String | ||
var callback: @Sendable (JSONObject) -> Void | ||
} | ||
|
||
struct PresenceCallback { | ||
var id: Int | ||
var callback: @Sendable (any PresenceAction) -> Void | ||
} | ||
|
||
struct SystemCallback { | ||
var id: Int | ||
var callback: @Sendable (RealtimeMessageV2) -> Void | ||
} | ||
|
||
enum RealtimeCallback { | ||
case postgres(PostgresCallback) | ||
case broadcast(BroadcastCallback) | ||
case presence(PresenceCallback) | ||
case system(SystemCallback) | ||
|
||
var id: Int { | ||
switch self { | ||
case let .postgres(callback): callback.id | ||
case let .broadcast(callback): callback.id | ||
case let .presence(callback): callback.id | ||
case let .system(callback): callback.id | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
// Copyright (c) 2021 David Stump <david@davidstump.net> | ||
// | ||
// Permission is hereby granted, free of charge, to any person obtaining a copy | ||
// of this software and associated documentation files (the "Software"), to deal | ||
// in the Software without restriction, including without limitation the rights | ||
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
// copies of the Software, and to permit persons to whom the Software is | ||
// furnished to do so, subject to the following conditions: | ||
// | ||
// The above copyright notice and this permission notice shall be included in | ||
// all copies or substantial portions of the Software. | ||
// | ||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | ||
// THE SOFTWARE. | ||
|
||
import Foundation | ||
|
||
/// A collection of default values and behaviors used across the Client | ||
public enum Defaults { | ||
/// Default timeout when sending messages | ||
public static let timeoutInterval: TimeInterval = 10.0 | ||
|
||
/// Default interval to send heartbeats on | ||
public static let heartbeatInterval: TimeInterval = 30.0 | ||
|
||
/// Default maximum amount of time which the system may delay heartbeat events in order to | ||
/// minimize power usage | ||
public static let heartbeatLeeway: DispatchTimeInterval = .milliseconds(10) | ||
|
||
/// Default reconnect algorithm for the socket | ||
public static let reconnectSteppedBackOff: (Int) -> TimeInterval = { tries in | ||
tries > 9 ? 5.0 : [0.01, 0.05, 0.1, 0.15, 0.2, 0.25, 0.5, 1.0, 2.0][tries - 1] | ||
} | ||
|
||
/** Default rejoin algorithm for individual channels */ | ||
public static let rejoinSteppedBackOff: (Int) -> TimeInterval = { tries in | ||
tries > 3 ? 10 : [1, 2, 5][tries - 1] | ||
} | ||
|
||
public static let vsn = "2.0.0" | ||
|
||
/// Default encode function, utilizing JSONSerialization.data | ||
public static let encode: (Any) -> Data = { json in | ||
try! JSONSerialization | ||
.data( | ||
withJSONObject: json, | ||
options: JSONSerialization.WritingOptions() | ||
) | ||
} | ||
|
||
/// Default decode function, utilizing JSONSerialization.jsonObject | ||
public static let decode: (Data) -> Any? = { data in | ||
guard | ||
let json = | ||
try? JSONSerialization | ||
.jsonObject( | ||
with: data, | ||
options: JSONSerialization.ReadingOptions() | ||
) | ||
else { return nil } | ||
return json | ||
} | ||
|
||
public static let heartbeatQueue: DispatchQueue = .init( | ||
label: "com.phoenix.socket.heartbeat" | ||
) | ||
} | ||
|
||
/// Represents the multiple states that a Channel can be in | ||
/// throughout it's lifecycle. | ||
public enum ChannelState: String { | ||
case closed | ||
case errored | ||
case joined | ||
case joining | ||
case leaving | ||
} | ||
|
||
/// Represents the different events that can be sent through | ||
/// a channel regarding a Channel's lifecycle. | ||
public enum ChannelEvent { | ||
public static let join = "phx_join" | ||
public static let leave = "phx_leave" | ||
public static let close = "phx_close" | ||
public static let error = "phx_error" | ||
public static let reply = "phx_reply" | ||
public static let system = "system" | ||
public static let broadcast = "broadcast" | ||
public static let accessToken = "access_token" | ||
public static let presence = "presence" | ||
public static let presenceDiff = "presence_diff" | ||
public static let presenceState = "presence_state" | ||
public static let postgresChanges = "postgres_changes" | ||
|
||
public static let heartbeat = "heartbeat" | ||
|
||
static func isLifecyleEvent(_ event: String) -> Bool { | ||
switch event { | ||
case join, leave, reply, error, close: true | ||
default: false | ||
} | ||
} | ||
} |
Oops, something went wrong.