Skip to content

Commit

Permalink
Link spans to Embrace session by id (#183)
Browse files Browse the repository at this point in the history
  • Loading branch information
NachoEmbrace authored Feb 24, 2025
1 parent e1d6bc6 commit 3a12679
Show file tree
Hide file tree
Showing 16 changed files with 343 additions and 20 deletions.
7 changes: 6 additions & 1 deletion Sources/EmbraceCore/Embrace.swift
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,12 @@ To start the SDK you first need to configure it using an `Embrace.Options` insta
logController?.sdkStateProvider = self

// setup otel
EmbraceOTel.setup(spanProcessors: .processors(for: storage, export: options.export, sdkStateProvider: self))
EmbraceOTel.setup(spanProcessors: .processors(
for: storage,
sessionController: sessionController,
export: options.export,
sdkStateProvider: self
))
let logSharedState = DefaultEmbraceLogSharedState.create(
storage: self.storage,
controller: self.logController,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ import OpenTelemetrySdk
extension Collection where Element == SpanProcessor {
static func processors(
for storage: EmbraceStorage,
sessionController: SessionControllable,
export: OpenTelemetryExport?,
sdkStateProvider: EmbraceSDKStateProvider
) -> [SpanProcessor] {
var processors: [SpanProcessor] = [
SingleSpanProcessor(
spanExporter: StorageSpanExporter(
options: .init(storage: storage),
options: .init(storage: storage, sessionController: sessionController),
logger: Embrace.logger
),
sdkStateProvider: sdkStateProvider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,16 @@ extension StorageSpanExporter {
class Options {

let storage: EmbraceStorage
let sessionController: SessionControllable
let validators: [SpanDataValidator]

init(storage: EmbraceStorage, validators: [SpanDataValidator] = .default) {
init(
storage: EmbraceStorage,
sessionController: SessionControllable,
validators: [SpanDataValidator] = .default
) {
self.storage = storage
self.sessionController = sessionController
self.validators = validators
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ import OpenTelemetrySdk
class StorageSpanExporter: SpanExporter {

private(set) weak var storage: EmbraceStorage?
private(set) weak var sessionController: SessionControllable?
private weak var logger: InternalLogger?

let validation: SpanDataValidation

init(options: Options, logger: InternalLogger) {
self.storage = options.storage
self.sessionController = options.sessionController
self.validation = SpanDataValidation(validators: options.validators)
self.logger = logger
}
Expand Down Expand Up @@ -72,6 +74,8 @@ extension StorageSpanExporter {
type: spanData.embType,
data: data,
startTime: spanData.startTime,
endTime: endTime )
endTime: endTime,
sessionIdentifier: sessionController?.currentSession?.id
)
}
}
2 changes: 1 addition & 1 deletion Sources/EmbraceCore/Public/Embrace+OTel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import OpenTelemetrySdk
extension Embrace: EmbraceOpenTelemetry {
private var exporter: SpanExporter {
StorageSpanExporter(
options: .init(storage: storage),
options: .init(storage: storage, sessionController: sessionController),
logger: Embrace.logger
)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
//
// Copyright © 2024 Embrace Mobile, Inc. All rights reserved.
//

import GRDB

struct AddSessionIdentifierToSpanRecordMigration: Migration {
static var identifier = "AddSessionIdentifierToSpanRecord" // DEV: Must not change

private static var tempSpansTableName = "spans_temp_2"

func perform(_ db: GRDB.Database) throws {

// create copy of `spans` table in `spans_temp`
// include new column 'process_identifier'
try db.create(table: Self.tempSpansTableName) { t in

t.column(SpanRecord.Schema.id.name, .text).notNull()
t.column(SpanRecord.Schema.traceId.name, .text).notNull()
t.primaryKey([SpanRecord.Schema.traceId.name, SpanRecord.Schema.id.name])

t.column(SpanRecord.Schema.name.name, .text).notNull()
t.column(SpanRecord.Schema.type.name, .text).notNull()
t.column(SpanRecord.Schema.startTime.name, .datetime).notNull()
t.column(SpanRecord.Schema.endTime.name, .datetime)
t.column(SpanRecord.Schema.data.name, .blob).notNull()

t.column(SpanRecord.Schema.processIdentifier.name, .text).notNull()

// include new column into `spans_temp_2` table
t.column(SpanRecord.Schema.sessionIdentifier.name, .text)
}

// copy all existing data into temp table
// include default value for `process_identifier`
try db.execute(literal: """
INSERT INTO 'spans_temp_2' (
'id',
'trace_id',
'name',
'type',
'start_time',
'end_time',
'data',
'process_identifier'
) SELECT
id,
trace_id,
name,
type,
start_time,
end_time,
data,
process_identifier
FROM 'spans'
""")

// drop original table
try db.drop(table: SpanRecord.databaseTableName)

// rename temp table to be original table
try db.rename(table: Self.tempSpansTableName, to: SpanRecord.databaseTableName)

// Create Trigger on new spans table to prevent endTime from being modified on SpanRecord
try db.execute(sql:
"""
CREATE TRIGGER IF NOT EXISTS prevent_closed_span_modification
BEFORE UPDATE ON \(SpanRecord.databaseTableName)
WHEN OLD.end_time IS NOT NULL
BEGIN
SELECT RAISE(ABORT,'Attempted to modify an already closed span.');
END;
""" )
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ extension Array where Element == Migration {
AddSessionRecordMigration(),
AddMetadataRecordMigration(),
AddLogRecordMigration(),
AddProcessIdentifierToSpanRecordMigration()
AddProcessIdentifierToSpanRecordMigration(),
AddSessionIdentifierToSpanRecordMigration()
]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import Foundation
import GRDB
import EmbraceCommonInternal

extension SpanRecord {
/// Build QueryInterfaceRequest for SpanRecord that will query for:
Expand All @@ -27,6 +28,7 @@ extension SpanRecord {

} else {
return SpanRecord.filter(
matchingSessionId(session.id) ||
overlappingStart(startTime: session.startTime) ||
entirelyWithin(startTime: session.startTime, endTime: sessionEndTime) ||
overlappingEnd(endTime: sessionEndTime) ||
Expand All @@ -35,6 +37,11 @@ extension SpanRecord {
}
}

/// Check sessionId mathcing
private static func matchingSessionId(_ sessionId: SessionIdentifier) -> SQLExpression {
SpanRecord.Schema.sessionIdentifier != nil && SpanRecord.Schema.sessionIdentifier == sessionId
}

/// Where `Span.startTime` occurs before session start and `Span.endTime` occurs after session start or has not ended
private static func overlappingStart(startTime: Date) -> SQLExpression {
SpanRecord.Schema.startTime <= startTime &&
Expand Down
6 changes: 5 additions & 1 deletion Sources/EmbraceStorageInternal/Records/SpanRecord.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public struct SpanRecord: Codable {
public var startTime: Date
public var endTime: Date?
public var processIdentifier: ProcessIdentifier
public var sessionIdentifier: SessionIdentifier?

public init(
id: String,
Expand All @@ -25,7 +26,8 @@ public struct SpanRecord: Codable {
data: Data,
startTime: Date,
endTime: Date? = nil,
processIdentifier: ProcessIdentifier = .current
processIdentifier: ProcessIdentifier = .current,
sessionIdentifier: SessionIdentifier? = nil
) {
self.id = id
self.traceId = traceId
Expand All @@ -35,6 +37,7 @@ public struct SpanRecord: Codable {
self.endTime = endTime
self.name = name
self.processIdentifier = processIdentifier
self.sessionIdentifier = sessionIdentifier
}
}

Expand All @@ -48,6 +51,7 @@ extension SpanRecord {
static var endTime: Column { Column("end_time") }
static var name: Column { Column("name") }
static var processIdentifier: Column { Column("process_identifier") }
static var sessionIdentifier: Column { Column("session_identifier") }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ final class SpanStorageIntegrationTests: IntegrationTestCase {

override func setUpWithError() throws {
storage = try EmbraceStorage.createInMemoryDb()
let exporter = StorageSpanExporter(options: .init(storage: storage), logger: MockLogger())
let sessionController = MockSessionController()
let exporter = StorageSpanExporter(options: .init(storage: storage, sessionController: sessionController), logger: MockLogger())

EmbraceOTel.setup(spanProcessors: [SingleSpanProcessor(spanExporter: exporter, sdkStateProvider: sdkStateProvider)])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ final class EmbraceSpanProcessor_StorageTests: XCTestCase {

func test_spanProcessor_withStorage_usesStorageExporter() throws {
let storage = try EmbraceStorage.createInMemoryDb()
let sessionController = MockSessionController()

defer {
try? storage.teardown()
}
let processor = SingleSpanProcessor(
spanExporter: StorageSpanExporter(
options: .init(storage: storage),
options: .init(storage: storage, sessionController: sessionController),
logger: MockLogger()
),
sdkStateProvider: sdkStateProvider
Expand Down
22 changes: 16 additions & 6 deletions Tests/EmbraceCoreTests/Internal/StorageSpanExporterTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ final class StorageSpanExporterTests: XCTestCase {
func test_DB_preventsClosedSpan_fromUpdatingEndTime() throws {
// Given
let storage = try EmbraceStorage.createInMemoryDb()
let exporter = StorageSpanExporter(options: .init(storage: storage), logger: MockLogger())
let sessionController = MockSessionController()
sessionController.startSession(state: .foreground)
let exporter = StorageSpanExporter(options: .init(storage: storage, sessionController: sessionController), logger: MockLogger())

let traceId = TraceId.random()
let spanId = SpanId.random()
Expand Down Expand Up @@ -43,8 +45,8 @@ final class StorageSpanExporterTests: XCTestCase {
hasEnded: false )

// When spans are exported
exporter.export(spans: [closedSpanData])
exporter.export(spans: [updated_closedSpanData])
_ = exporter.export(spans: [closedSpanData])
_ = exporter.export(spans: [updated_closedSpanData])

let exportedSpans: [SpanRecord] = try storage.fetchAll()
XCTAssertTrue(exportedSpans.count == 1)
Expand All @@ -53,12 +55,17 @@ final class StorageSpanExporterTests: XCTestCase {
XCTAssertEqual(exportedSpan.traceId, traceId.hexString)
XCTAssertEqual(exportedSpan.id, spanId.hexString)
XCTAssertEqual(exportedSpan.endTime!.timeIntervalSince1970, endTime.timeIntervalSince1970, accuracy: 0.01)

XCTAssertNotNil(exportedSpan.sessionIdentifier)
XCTAssertEqual(exportedSpan.sessionIdentifier, sessionController.currentSession?.id)
}

func test_DB_allowsOpenSpan_toUpdateAttributes() throws {
// Given
let storage = try EmbraceStorage.createInMemoryDb()
let exporter = StorageSpanExporter(options: .init(storage: storage), logger: MockLogger())
let sessionController = MockSessionController()
sessionController.startSession(state: .foreground)
let exporter = StorageSpanExporter(options: .init(storage: storage, sessionController: sessionController), logger: MockLogger())

let traceId = TraceId.random()
let spanId = SpanId.random()
Expand Down Expand Up @@ -87,8 +94,8 @@ final class StorageSpanExporterTests: XCTestCase {
hasEnded: false )

// When spans are exported
exporter.export(spans: [openSpanData])
exporter.export(spans: [updated_openSpanData])
_ = exporter.export(spans: [openSpanData])
_ = exporter.export(spans: [updated_openSpanData])

let exportedSpans: [SpanRecord] = try storage.fetchAll()
XCTAssertTrue(exportedSpans.count == 1)
Expand All @@ -97,6 +104,9 @@ final class StorageSpanExporterTests: XCTestCase {
XCTAssertEqual(exportedSpan?.traceId, traceId.hexString)
XCTAssertEqual(exportedSpan?.id, spanId.hexString)

XCTAssertNotNil(exportedSpan!.sessionIdentifier)
XCTAssertEqual(exportedSpan!.sessionIdentifier, sessionController.currentSession?.id)

let spanData = try JSONDecoder().decode(SpanData.self, from: exportedSpan!.data)
XCTAssertEqual(spanData.attributes, ["foo": .string("baz")])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ final class EmbraceStorage_SpanForSessionRecordTests: XCTestCase {
name: String = "example",
processIdentifier: ProcessIdentifier = .current,
startTime: Date,
endTime: Date? = nil
endTime: Date? = nil,
sessionIdentifier: SessionIdentifier? = nil
) throws -> SpanRecord {
let span = SpanRecord(
id: SpanId.random().hexString,
Expand All @@ -42,7 +43,8 @@ final class EmbraceStorage_SpanForSessionRecordTests: XCTestCase {
data: Data(),
startTime: startTime,
endTime: endTime,
processIdentifier: processIdentifier
processIdentifier: processIdentifier,
sessionIdentifier: sessionIdentifier
)
try storage.upsertSpan(span)

Expand Down Expand Up @@ -394,4 +396,23 @@ final class EmbraceStorage_SpanForSessionRecordTests: XCTestCase {
let results = try storage.fetchSpans(for: session, ignoreSessionSpans: false)
XCTAssertTrue(results.contains(span))
}

func test_spansWithSessionId() throws {
// session : ---------------
// span : -a- -b-- -c----
let session = sessionRecord(
startTime: .relative(-20),
endTime: .relative(-10),
coldStart: false
)

let spanA = try addSpanRecord(name: "span-a", startTime: .relative(-28), endTime: .relative(-22), sessionIdentifier: session.id)
let spanB = try addSpanRecord(name: "span-b", startTime: .relative(-16), endTime: .relative(-12), sessionIdentifier: SessionIdentifier.random)
let spanC = try addSpanRecord(name: "span-c", startTime: .relative(-6), endTime: .relative(-2), sessionIdentifier: session.id)
let results = try storage.fetchSpans(for: session)

XCTAssertTrue(results.contains(spanA))
XCTAssertTrue(results.contains(spanB))
XCTAssertTrue(results.contains(spanC))
}
}
Loading

0 comments on commit 3a12679

Please sign in to comment.