source-mongodb: select projections of change stream documents
Rather than including all available fields in change stream documents, apply filtering based on
projections to get only the fields we need. This prevents issues such as the `updateDescription` of a
document re-stating a huge field that is also in the `fullDocument` and causing errors because the
change stream document is over the 16MB limit with the doubly enumerated field.
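
As a rough illustration of the size problem described above, the sketch below does the arithmetic for a single large updated field. The `eventSize` helper and the byte counts are hypothetical and ignore BSON framing overhead; only the 16MB limit comes from the text.

```go
package main

import "fmt"

// maxBSONSize is the 16MB document limit referred to above.
const maxBSONSize = 16 * 1024 * 1024

// eventSize is a hypothetical estimate of a change event's size in bytes.
// It assumes the large field is counted once in fullDocument and, when the
// event is not projected, once more in updateDescription.
func eventSize(largeFieldBytes, otherBytes int, withUpdateDescription bool) int {
	size := otherBytes + largeFieldBytes
	if withUpdateDescription {
		size += largeFieldBytes
	}
	return size
}

func main() {
	const largeField = 9 * 1024 * 1024 // assumed 9MB field, for illustration only
	const other = 1024                 // assumed size of all remaining fields

	for _, withUD := range []bool{true, false} {
		size := eventSize(largeField, other, withUD)
		fmt.Printf("updateDescription=%v size=%d overLimit=%v\n",
			withUD, size, size > maxBSONSize)
	}
}
```

Under these assumptions the document itself fits comfortably within the limit, and it is only the second copy of the field that pushes the event over it.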

A slightly more thorough solution would also involve the `$changeStreamSplitLargeEvent` pipeline
filter, although that option has some limitations: It is only supported on relatively new versions
of MongoDB, and it is not completely bulletproof in that it does not traverse and split nested
fields. So we would almost certainly want to use these projections to select only the necessary
fields even if we used the `$changeStreamSplitLargeEvent` filter too.
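
A minimal sketch of how the two approaches might be combined, should that ever be wanted. This is not part of this commit: the `buildPipeline` helper is hypothetical, and plain maps stand in for the driver's `mongo.Pipeline` and `bson.M` types so that the example has no dependencies. It assumes, per my reading of the MongoDB documentation, that `$changeStreamSplitLargeEvent` must be the last stage of the pipeline.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// buildPipeline returns the change stream pipeline stages as plain maps.
// The $project stage mirrors the one added in this commit. The optional
// split stage is appended last, on the assumption that MongoDB requires
// $changeStreamSplitLargeEvent to be the final stage.
func buildPipeline(splitLargeEvents bool) []map[string]any {
	pipeline := []map[string]any{
		{"$project": map[string]any{
			"documentKey":   1,
			"operationType": 1,
			"fullDocument":  1,
			"ns":            1,
		}},
	}
	if splitLargeEvents {
		pipeline = append(pipeline, map[string]any{
			"$changeStreamSplitLargeEvent": map[string]any{},
		})
	}
	return pipeline
}

func main() {
	out, err := json.Marshal(buildPipeline(true))
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```

Gating the split stage behind a flag reflects the version limitation mentioned above: servers that do not support the stage would still get the projection.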
williamhbaker committed Feb 27, 2024
1 parent 5d413ef commit 350504d
Showing 1 changed file with 16 additions and 2 deletions.
18 changes: 16 additions & 2 deletions source-mongodb/pull.go
@@ -409,12 +409,26 @@ func (c *capture) ChangeStream(ctx context.Context, client *mongo.Client, bindin
opts = opts.SetStartAtOperationTime(startAt)
}

+ // Use a pipeline to project the fields we need from the change stream. Importantly, this
+ // suppresses fields like `updateDescription`, which contain a list of fields in the document
+ // that were updated. If a large field is updated, it will appear both in the `fullDocument` and
+ // `updateDescription`, which can cause the change stream total BSON document size to exceed the
+ // 16MB limit.
+ pl := mongo.Pipeline{
+ {{Key: "$project", Value: bson.M{
+ "documentKey": 1,
+ "operationType": 1,
+ "fullDocument": 1,
+ "ns": 1,
+ }}},
+ }

// First attempt to do an instance-wide change stream, this is possible if we
// have readAnyDatabase access on the instance. If this fails for permission
// issues, we then attempt to do a database-level change stream. In the case
// of the database-level change stream we expect only a single database to be
// provided and do not support multiple databases in this mode.
- cursor, err := client.Watch(ctx, mongo.Pipeline{}, opts)
+ cursor, err := client.Watch(ctx, pl, opts)
if err != nil {
// This error means events from the starting point are no longer available,
// which means we have hit a gap in between last run of the connector and
@@ -481,7 +495,7 @@ func (c *capture) ChangeStream(ctx context.Context, client *mongo.Client, bindin
}

// this event is from a collection that we have no binding for, skip it, but
- // still emit a checkpoint with the resule token updated so that we can stay
+ // still emit a checkpoint with the resume token updated so that we can stay
// on top of oplog events across the whole instance / database
if !ok {
if err = c.Output.Checkpoint(checkpointJson, true); err != nil {