diff --git a/source-postgres/database.go b/source-postgres/database.go index c5ca4f6526..c84b1fd71a 100644 --- a/source-postgres/database.go +++ b/source-postgres/database.go @@ -18,7 +18,7 @@ type replicationSlotInfo struct { SlotType string Active bool RestartLSN *pglogrepl.LSN - ConfirmedFlushLSN pglogrepl.LSN + ConfirmedFlushLSN *pglogrepl.LSN WALStatus string } diff --git a/source-postgres/replication.go b/source-postgres/replication.go index 7dda76d688..f319939daa 100644 --- a/source-postgres/replication.go +++ b/source-postgres/replication.go @@ -89,7 +89,9 @@ func (db *postgresDatabase) ReplicationStream(ctx context.Context, startCursor s logrus.WithField("slot", slot).Warn("replication slot is already active (is another capture already running against this database?)") } else if slotInfo.WALStatus == "lost" { logrus.WithField("slot", slot).Warn("replication slot was invalidated by the server, it must be deleted and all bindings backfilled") - } else if startLSN < slotInfo.ConfirmedFlushLSN { + } else if slotInfo.ConfirmedFlushLSN == nil { + logrus.WithField("slot", slot).Warn("replication slot has no confirmed_flush_lsn and is likely still being created (but waiting on a long-running transaction)") + } else if startLSN < *slotInfo.ConfirmedFlushLSN { logrus.WithFields(logrus.Fields{ "slot": slot, "confirmedLSN": slotInfo.ConfirmedFlushLSN.String(),