From f8fbf9f98e1069f2b587155b9bfbbaddce9f37da Mon Sep 17 00:00:00 2001 From: taylorswift Date: Fri, 28 Jun 2024 20:32:17 +0000 Subject: [PATCH] refine the Mongo.Session synchronization API, and enable synchronizing across concurrency domains --- .../CausalConsistency/CausalConsistency.swift | 4 +- Sources/MongoDBTests/Cursors/Cursors.swift | 7 ++- .../MongoDriver/Sessions/Mongo.Session.swift | 51 ++++++++++++++----- .../MongoDriverTests/TestSessionPool.swift | 2 +- 4 files changed, 43 insertions(+), 21 deletions(-) diff --git a/Sources/MongoDBTests/CausalConsistency/CausalConsistency.swift b/Sources/MongoDBTests/CausalConsistency/CausalConsistency.swift index 7c2bf32d..ce84304a 100644 --- a/Sources/MongoDBTests/CausalConsistency/CausalConsistency.swift +++ b/Sources/MongoDBTests/CausalConsistency/CausalConsistency.swift @@ -41,7 +41,7 @@ struct CausalConsistency:MongoTestBattery tests.expect(response ==? .init(inserted: 1)) } - let other:Mongo.Session = try await .init(from: pool, forking: session) + let other:Mongo.Session = try await session.fork() // We should be able to observe a precondition time after performing the // initialization. @@ -267,7 +267,7 @@ struct CausalConsistency:MongoTestBattery // current session, however. await (tests ! "timeout-forked").do(catching: AnyTimeoutError.self) { - let forked:Mongo.Session = try await .init(from: pool, forking: session) + let forked:Mongo.Session = try await session.fork() // A forked session should initially share the same precondition // time as the session it was forked from. diff --git a/Sources/MongoDBTests/Cursors/Cursors.swift b/Sources/MongoDBTests/Cursors/Cursors.swift index bf0ac40d..b6785b7d 100644 --- a/Sources/MongoDBTests/Cursors/Cursors.swift +++ b/Sources/MongoDBTests/Cursors/Cursors.swift @@ -48,7 +48,7 @@ struct Cursors:MongoTestBattery where Configuration:CursorTestCon { // We should be using a session that is causally-consistent with the // insertion operation at the beginning of this test. - let session:Mongo.Session = try await .init(from: pool, forking: initializer) + let session:Mongo.Session = try await initializer.fork() // We should be reusing session identifiers. tests.expect(await pool.count ==? 2) // We should be able to query the collection for results in batches of @@ -161,8 +161,7 @@ struct Cursors:MongoTestBattery where Configuration:CursorTestCon { // We should be using a session that is causally-consistent with the // insertion operation at the beginning of this test. - let session:Mongo.Session = try await .init(from: pool, - forking: initializer) + let session:Mongo.Session = try await initializer.fork() let cursor:Mongo.CursorIdentifier? = try await session.run( command: Mongo.Find>>.init(collection, @@ -224,7 +223,7 @@ struct Cursors:MongoTestBattery where Configuration:CursorTestCon await tests.do { - let session:Mongo.Session = try await .init(from: pool, forking: initializer) + let session:Mongo.Session = try await initializer.fork() try await session.run( command: Mongo.Find>>.init(collection, stride: 10), diff --git a/Sources/MongoDriver/Sessions/Mongo.Session.swift b/Sources/MongoDriver/Sessions/Mongo.Session.swift index 68daf7a6..2d7256bd 100644 --- a/Sources/MongoDriver/Sessions/Mongo.Session.swift +++ b/Sources/MongoDriver/Sessions/Mongo.Session.swift @@ -71,10 +71,11 @@ extension Mongo private init(allocation:SessionPool.Allocation, pool:SessionPool, - fork:__shared Mongo.Session?) + preconditionTime:BSON.Timestamp? = nil, + notarizedTime:ClusterTime? = nil) { - self.preconditionTime = fork?.preconditionTime - self.notarizedTime = fork?.notarizedTime + self.preconditionTime = preconditionTime + self.notarizedTime = notarizedTime self.transaction = allocation.transaction self.touched = allocation.touched @@ -99,24 +100,37 @@ extension Mongo.Session /// Creates a session from a session pool. Do not escape the session /// from the scope that yielded the pool, because doing so will prevent /// the pool from draining on scope exit. - /// - /// If `original` is non-nil, operations on the newly-created session will - /// reflect writes performed using the original session at the time of - /// session creation, but the two sessions will be otherwise unrelated. - /// - /// If `original` is non-nil, calling this initializer is roughly - /// equivalent to creating an unforked session and immediately calling - /// ``synchronize(with:)``. public convenience - init(from pool:Mongo.SessionPool, forking original:__shared Mongo.Session? = nil, + init(from pool:Mongo.SessionPool, by deadline:ContinuousClock.Instant? = nil) async throws { self.init(allocation: await pool.create( capabilities: try await pool.deployment.capabilities( by: deadline ?? pool.deployment.timeout.deadline())), - pool: pool, - fork: original) + pool: pool) } + + /// Forks this session, returning a new session. Operations on the newly-created session + /// will reflect writes performed using the original session at the time of session + /// creation, but the two sessions will be otherwise unrelated. + /// + /// Calling this initializer is roughly equivalent to creating an unforked session and + /// immediately calling ``synchronize(with:)``, although the unforked session will not + /// gossip the cluster time from the original session. + /// + /// Do not escape the session from the scope that yielded the pool the original session was + /// created in, because doing so will prevent the pool from draining on scope exit. + public + func fork(by deadline:ContinuousClock.Instant? = nil) async throws -> Self + { + .init(allocation: await self.pool.create( + capabilities: try await self.pool.deployment.capabilities( + by: deadline ?? self.pool.deployment.timeout.deadline())), + pool: self.pool, + preconditionTime: self.preconditionTime, + notarizedTime: self.notarizedTime) + } + /// Fast-forwards this session’s precondition time to the other session’s /// precondition time, if it is non-nil and greater than this /// session’s precondition time. The other session’s precondition time @@ -126,6 +140,15 @@ extension Mongo.Session { other.preconditionTime?.combine(into: &self.preconditionTime) } + + /// Similar to ``synchronize(with:)``, but accepts a ``BSON.Timestamp`` instead of a full + /// session instance. This is useful for synchronizing sessions across concurrency domains, + /// as sessions themselves are non-``Sendable``. + public + func synchronize(to preconditionTime:BSON.Timestamp) + { + preconditionTime.combine(into: &self.preconditionTime) + } } @available(*, unavailable, message: "sessions have reference semantics") extension Mongo.Session:Sendable diff --git a/Sources/MongoDriverTests/TestSessionPool.swift b/Sources/MongoDriverTests/TestSessionPool.swift index e5395781..4e4592d2 100644 --- a/Sources/MongoDriverTests/TestSessionPool.swift +++ b/Sources/MongoDriverTests/TestSessionPool.swift @@ -120,7 +120,7 @@ func TestSessionPool(_ tests:TestGroup, bootstrap:Mongo.DriverBootstrap, try await a.refresh() - let b:Mongo.Session = try await .init(from: $0, forking: a) + let b:Mongo.Session = try await a.fork() tests.expect(await $0.count ==? 2)