From 019d70793d68e47040d02e7f8a273fda0f13e237 Mon Sep 17 00:00:00 2001 From: Will Donnelly Date: Thu, 20 Feb 2025 17:16:12 -0600 Subject: [PATCH] source-postgres: Handle NULL confirmed_flush_lsn In normal operation, a replication slot will always have a non-null `confirmed_flush_lsn`, but we saw the other day that it _is_ actually possible to observe a null value for that field if the replication slot is stuck in the middle of being created because it has to wait for a long-running transaction to complete. Since one major cause of replication slot recreation is when the old slot gets invalidated, and one major cause of invalidation is when a long-running transaction forces excessive WAL retention, this is actually less rare than it seems. It will happen any time a long-running transaction causes slot invalidation and the user just hits "Backfill All" without killing the transaction (assuming it didn't end on its own, of course). Since I would really like to make these `queryReplicationSlotInfo` checks fatal errors in the near future, this logic needs to be bulletproof, so we need to handle that situation. 
--- source-postgres/database.go | 2 +- source-postgres/replication.go | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/source-postgres/database.go b/source-postgres/database.go index c5ca4f6526..c84b1fd71a 100644 --- a/source-postgres/database.go +++ b/source-postgres/database.go @@ -18,7 +18,7 @@ type replicationSlotInfo struct { SlotType string Active bool RestartLSN *pglogrepl.LSN - ConfirmedFlushLSN pglogrepl.LSN + ConfirmedFlushLSN *pglogrepl.LSN WALStatus string } diff --git a/source-postgres/replication.go b/source-postgres/replication.go index 7dda76d688..f319939daa 100644 --- a/source-postgres/replication.go +++ b/source-postgres/replication.go @@ -89,7 +89,9 @@ func (db *postgresDatabase) ReplicationStream(ctx context.Context, startCursor s logrus.WithField("slot", slot).Warn("replication slot is already active (is another capture already running against this database?)") } else if slotInfo.WALStatus == "lost" { logrus.WithField("slot", slot).Warn("replication slot was invalidated by the server, it must be deleted and all bindings backfilled") - } else if startLSN < slotInfo.ConfirmedFlushLSN { + } else if slotInfo.ConfirmedFlushLSN == nil { + logrus.WithField("slot", slot).Warn("replication slot has no confirmed_flush_lsn and is likely still being created (but waiting on a long-running transaction)") + } else if startLSN < *slotInfo.ConfirmedFlushLSN { logrus.WithFields(logrus.Fields{ "slot": slot, "confirmedLSN": slotInfo.ConfirmedFlushLSN.String(),