diff --git a/protocol/activity_center.go b/protocol/activity_center.go index 6c7d32600c..0ed9fe1e07 100644 --- a/protocol/activity_center.go +++ b/protocol/activity_center.go @@ -42,6 +42,10 @@ const ( ActivityCenterNotificationTypeCommunityUnbanned ActivityCenterNotificationTypeNewInstallationReceived ActivityCenterNotificationTypeNewInstallationCreated + ActivityCenterNotificationTypeBackupSyncingFetching + ActivityCenterNotificationTypeBackupSyncingSuccess + ActivityCenterNotificationTypeBackupSyncingPartialFailure + ActivityCenterNotificationTypeBackupSyncingFailure ) type ActivityCenterMembershipStatus int diff --git a/protocol/messenger.go b/protocol/messenger.go index 2402b0c6ea..fb7616123c 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -195,6 +195,8 @@ type Messenger struct { peersyncingRequests map[string]uint64 mvdsStatusChangeEvent chan datasyncnode.PeerStatusChangeEvent + + backedUpFetchingStatus *BackupFetchingStatus } type EnvelopeEventsInterceptor struct { @@ -917,6 +919,18 @@ func (m *Messenger) Start() (*MessengerResponse, error) { } } + if m.processBackedupMessages { + m.backedUpFetchingStatus = &BackupFetchingStatus{ + dataProgress: make(map[string]FetchingBackedUpDataTracking), + lastKnownMsgClock: 0, + fetchingCompleted: false, + } + err = m.startBackupFetchingTracking(response) + if err != nil { + return nil, err + } + } + return response, nil } diff --git a/protocol/messenger_backup_handler.go b/protocol/messenger_backup_handler.go index 009e6d83d0..b11a9ddc52 100644 --- a/protocol/messenger_backup_handler.go +++ b/protocol/messenger_backup_handler.go @@ -2,12 +2,15 @@ package protocol import ( "database/sql" + "sync" + "time" "go.uber.org/zap" "github.com/golang/protobuf/proto" utils "github.com/status-im/status-go/common" + "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/images" "github.com/status-im/status-go/multiaccounts/errors" "github.com/status-im/status-go/multiaccounts/settings" @@ -28,6 +31,20 @@ const ( SyncWakuSectionKeyWatchOnlyAccounts = "watchOnlyAccounts" ) +const backupSyncingNotificationID = "BACKUP_SYNCING" + +type FetchingBackedUpDataTracking struct { + LoadedItems map[uint32]bool + TotalNumber uint32 +} + +type BackupFetchingStatus struct { + dataProgress map[string]FetchingBackedUpDataTracking + lastKnownMsgClock uint64 + fetchingCompleted bool + fetchingCompletedMutex sync.Mutex +} + func (m *Messenger) HandleBackup(state *ReceivedMessageState, message *protobuf.Backup, statusMessage *v1protocol.StatusMessage) error { if !m.processBackedupMessages { return nil @@ -96,6 +113,11 @@ func (m *Messenger) handleBackup(state *ReceivedMessageState, message *protobuf. response.AddFetchingBackedUpDataDetails(SyncWakuSectionKeyKeypairs, message.KeypairDetails) response.AddFetchingBackedUpDataDetails(SyncWakuSectionKeyWatchOnlyAccounts, message.WatchOnlyAccountDetails) + err = m.updateBackupFetchProgress(message, &response, state) + if err != nil { + errors = append(errors, err) + } + m.config.messengerSignalsHandler.SendWakuFetchingBackupProgress(&response) } @@ -104,6 +126,123 @@ func (m *Messenger) handleBackup(state *ReceivedMessageState, message *protobuf. return errors } +func (m *Messenger) updateBackupFetchProgress(message *protobuf.Backup, response *wakusync.WakuBackedUpDataResponse, state *ReceivedMessageState) error { + m.backedUpFetchingStatus.fetchingCompletedMutex.Lock() + defer m.backedUpFetchingStatus.fetchingCompletedMutex.Unlock() + + if m.backedUpFetchingStatus.fetchingCompleted { + return nil + } + + if m.backedUpFetchingStatus.lastKnownMsgClock > message.Clock { + return nil + } + + if m.backedUpFetchingStatus.lastKnownMsgClock < message.Clock { + // Reset the progress tracker because we have access to a more recent copy of the backup + m.backedUpFetchingStatus.lastKnownMsgClock = message.Clock + m.backedUpFetchingStatus.dataProgress = make(map[string]FetchingBackedUpDataTracking) + for backupName, details := range response.FetchingBackedUpDataDetails() { + m.backedUpFetchingStatus.dataProgress[backupName] = FetchingBackedUpDataTracking{ + LoadedItems: make(map[uint32]bool), + TotalNumber: details.TotalNumber, + } + } + + if len(m.backedUpFetchingStatus.dataProgress) == 0 { + return nil + } + } + + // Evaluate the progress of the backup + + // Set the new items before evaluating + for backupName, details := range response.FetchingBackedUpDataDetails() { + m.backedUpFetchingStatus.dataProgress[backupName].LoadedItems[details.DataNumber] = true + } + + for _, tracker := range m.backedUpFetchingStatus.dataProgress { + if len(tracker.LoadedItems)-1 < int(tracker.TotalNumber) { + // have not received everything yet + return nil + } + } + + m.backedUpFetchingStatus.fetchingCompleted = true + + // Update the AC notification and add it to the response + notification, err := m.persistence.GetActivityCenterNotificationByID(types.FromHex(backupSyncingNotificationID)) + if err != nil { + return err + } + + if notification == nil { + return nil + } + + notification.UpdatedAt = m.GetCurrentTimeInMillis() + notification.Type = ActivityCenterNotificationTypeBackupSyncingSuccess + _, err = m.persistence.SaveActivityCenterNotification(notification, true) + if err != nil { + return err + } + + state.Response.AddActivityCenterNotification(notification) + return nil +} + +func (m *Messenger) startBackupFetchingTracking(response *MessengerResponse) error { + // Add an acivity center notification to show that we are fetching back up messages + notification := &ActivityCenterNotification{ + ID: types.FromHex(backupSyncingNotificationID), + Type: ActivityCenterNotificationTypeBackupSyncingFetching, + Timestamp: m.getTimesource().GetCurrentTime(), + Read: false, + Deleted: false, + UpdatedAt: m.GetCurrentTimeInMillis(), + } + err := m.addActivityCenterNotification(response, notification, nil) + + if err != nil { + return err + } + + // Add a timeout to mark the backup syncing as failed after 1 minute and 30 seconds + time.AfterFunc(1*time.Minute+30*time.Second, func() { + m.backedUpFetchingStatus.fetchingCompletedMutex.Lock() + defer m.backedUpFetchingStatus.fetchingCompletedMutex.Unlock() + + if m.backedUpFetchingStatus.fetchingCompleted { + // Nothing to do, the fetching has completed successfully + return + } + // Update the AC notification to the failure state + notification, err := m.persistence.GetActivityCenterNotificationByID(types.FromHex(backupSyncingNotificationID)) + if err != nil { + m.logger.Error("failed to get activity center notification", zap.Error(err)) + } else if notification != nil { + notification.UpdatedAt = m.GetCurrentTimeInMillis() + if m.backedUpFetchingStatus.dataProgress == nil || len(m.backedUpFetchingStatus.dataProgress) == 0 { + notification.Type = ActivityCenterNotificationTypeBackupSyncingFailure + } else { + notification.Type = ActivityCenterNotificationTypeBackupSyncingPartialFailure + } + _, err = m.persistence.SaveActivityCenterNotification(notification, true) + if err != nil { + m.logger.Error("failed to save activity center notification", zap.Error(err)) + } else { + if m.config.messengerSignalsHandler != nil { + resp := &MessengerResponse{} + resp.AddActivityCenterNotification(notification) + m.config.messengerSignalsHandler.MessengerResponse(resp) + } + } + } + }) + + return nil +} + func (m *Messenger) handleBackedUpProfile(message *protobuf.BackedUpProfile, backupTime uint64) error { if message == nil { return nil diff --git a/protocol/messenger_backup_test.go b/protocol/messenger_backup_test.go index 5139a2c525..c6879baf3a 100644 --- a/protocol/messenger_backup_test.go +++ b/protocol/messenger_backup_test.go @@ -260,6 +260,160 @@ func (s *MessengerBackupSuite) TestBackupProfileWithInvalidDisplayName() { s.Require().Equal("", storedBob1DisplayName) } +func (s *MessengerBackupSuite) TestFetchingDuringBackup() { + bob1 := s.m + bob1.config.messengerSignalsHandler = &MessengerSignalsHandlerMock{} + + state := ReceivedMessageState{ + Response: &MessengerResponse{}, + } + + backup := &protobuf.Backup{ + Clock: 1, + ContactsDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(0), + TotalNumber: uint32(1), + }, + CommunitiesDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(0), + TotalNumber: uint32(1), + }, + ProfileDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(0), + TotalNumber: uint32(1), + }, + } + + err := bob1.HandleBackup( + &state, + backup, + &v1protocol.StatusMessage{}, + ) + s.Require().NoError(err) + // The backup is not done, so no signal should be sent + s.Require().Len(state.Response.ActivityCenterNotifications(), 0) + s.Require().Len(bob1.backedUpFetchingStatus.dataProgress, 3) + s.Require().Equal(uint32(1), bob1.backedUpFetchingStatus.dataProgress[SyncWakuSectionKeyContacts].TotalNumber) + + // Parse a backup with a higher clock so reset the fetching + backup = &protobuf.Backup{ + Clock: 2, + ContactsDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(0), + TotalNumber: uint32(2), + }, + CommunitiesDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(0), + TotalNumber: uint32(1), + }, + ProfileDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(0), + TotalNumber: uint32(1), + }, + SettingsDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(0), + TotalNumber: uint32(1), + }, + KeypairDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(0), + TotalNumber: uint32(1), + }, + WatchOnlyAccountDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(0), + TotalNumber: uint32(1), + }, + } + err = bob1.HandleBackup( + &state, + backup, + &v1protocol.StatusMessage{}, + ) + s.Require().NoError(err) + // The backup is not done, so no signal should be sent + s.Require().Len(state.Response.ActivityCenterNotifications(), 0) + s.Require().Len(bob1.backedUpFetchingStatus.dataProgress, 6) + s.Require().Equal(uint32(2), bob1.backedUpFetchingStatus.dataProgress[SyncWakuSectionKeyContacts].TotalNumber) + + // Backup with a smaller clock is ignored + backup = &protobuf.Backup{ + Clock: 2, + ContactsDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(0), + TotalNumber: uint32(5), + }, + CommunitiesDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(0), + TotalNumber: uint32(1), + }, + } + err = bob1.HandleBackup( + &state, + backup, + &v1protocol.StatusMessage{}, + ) + s.Require().NoError(err) + // The backup is not done, so no signal should be sent + s.Require().Len(state.Response.ActivityCenterNotifications(), 0) + // The values are gonna be the same as before as the backup was ignored + s.Require().Len(bob1.backedUpFetchingStatus.dataProgress, 6) + s.Require().Equal(uint32(2), bob1.backedUpFetchingStatus.dataProgress[SyncWakuSectionKeyContacts].TotalNumber) + + // Parse the backup with almost all the correct data numbers + backup = &protobuf.Backup{ + Clock: 2, + ContactsDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(1), + TotalNumber: uint32(2), + }, + CommunitiesDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(1), + TotalNumber: uint32(1), + }, + ProfileDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(1), + TotalNumber: uint32(1), + }, + SettingsDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(1), + TotalNumber: uint32(1), + }, + KeypairDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(1), + TotalNumber: uint32(1), + }, + WatchOnlyAccountDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(1), + TotalNumber: uint32(1), + }, + } + err = bob1.HandleBackup( + &state, + backup, + &v1protocol.StatusMessage{}, + ) + s.Require().NoError(err) + // The backup is not done, so no signal should be sent + s.Require().Len(state.Response.ActivityCenterNotifications(), 0) + + // Parse the remaining backup so the notification should be sent now + backup = &protobuf.Backup{ + Clock: 2, + ContactsDetails: &protobuf.FetchingBackedUpDataDetails{ + DataNumber: uint32(2), + TotalNumber: uint32(2), + }, + } + err = bob1.HandleBackup( + &state, + backup, + &v1protocol.StatusMessage{}, + ) + s.Require().NoError(err) + // The backup is done, so the signal should be sent + s.Require().Len(state.Response.ActivityCenterNotifications(), 1) + s.Require().Equal(ActivityCenterNotificationTypeBackupSyncingSuccess, state.Response.ActivityCenterNotifications()[0].Type) +} + func (s *MessengerBackupSuite) TestBackupSettings() { s.T().Skip("flaky test") const ( diff --git a/protocol/messenger_mailserver.go b/protocol/messenger_mailserver.go index 1679baaabd..42d8854d3f 100644 --- a/protocol/messenger_mailserver.go +++ b/protocol/messenger_mailserver.go @@ -322,7 +322,13 @@ func (m *Messenger) RequestAllHistoricMessages(forceFetchingBackup, withRetries return nil, nil } + allResponses := &MessengerResponse{} if forceFetchingBackup || !backupFetched { + err = m.startBackupFetchingTracking(allResponses) + if err != nil { + return nil, err + } + m.logger.Info("fetching backup") err := m.syncBackup() if err != nil { @@ -336,7 +342,6 @@ func (m *Messenger) RequestAllHistoricMessages(forceFetchingBackup, withRetries defer m.resetFiltersPriority(filters) filtersByMs := m.SplitFiltersByStoreNode(filters) - allResponses := &MessengerResponse{} for communityID, filtersForMs := range filtersByMs { peerID := m.getCommunityStorenode(communityID) if withRetries { diff --git a/protocol/messenger_testing_utils.go b/protocol/messenger_testing_utils.go index 96d2f0d3a1..246725955c 100644 --- a/protocol/messenger_testing_utils.go +++ b/protocol/messenger_testing_utils.go @@ -63,7 +63,10 @@ type MessengerSignalsHandlerMock struct { } func (m *MessengerSignalsHandlerMock) SendWakuFetchingBackupProgress(response *wakusync.WakuBackedUpDataResponse) { - m.wakuBackedUpDataResponseChan <- response + select { + case m.wakuBackedUpDataResponseChan <- response: + default: + } } func (m *MessengerSignalsHandlerMock) SendWakuBackedUpProfile(*wakusync.WakuBackedUpDataResponse) {} func (m *MessengerSignalsHandlerMock) SendWakuBackedUpSettings(*wakusync.WakuBackedUpDataResponse) {}