Skip to content

Commit

Permalink
sync profile feed
Browse files Browse the repository at this point in the history
  • Loading branch information
leo-lox committed Nov 1, 2024
1 parent 5f307e7 commit 231e05a
Show file tree
Hide file tree
Showing 5 changed files with 429 additions and 9 deletions.
118 changes: 118 additions & 0 deletions lib/domain_layer/usecases/profile_feed.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import 'dart:async';

import '../entities/nostr_note.dart';
import '../repositories/note_repository.dart';

/// used to get the feeds for a user profile
class ProfileFeed {
final NoteRepository _noteRepository;

final String userFeedFreshId = "profile-fresh";
final String userFeedTimelineFetchId = "profile-timeline";

// root streams
final StreamController<NostrNote> _rootNotesController =
StreamController<NostrNote>();
Stream<NostrNote> get rootNotesStream => _rootNotesController.stream;

final StreamController<NostrNote> _newRootNotesController =
StreamController<NostrNote>();
Stream<NostrNote> get newRootNotesStream => _newRootNotesController.stream;

// root and reply streams
final StreamController<NostrNote> _rootAndReplyNotesController =
StreamController<NostrNote>();
Stream<NostrNote> get rootAndReplyNotesStream =>
_rootAndReplyNotesController.stream;

final StreamController<NostrNote> _newRootAndReplyNotesController =
StreamController<NostrNote>();
Stream<NostrNote> get newRootAndReplyNotesStream =>
_newRootAndReplyNotesController.stream;

ProfileFeed(
this._noteRepository,
);

Future<void> subscribeToFreshNotes({
required String npub,
required int since,
}) async {
final newNotesStream = _noteRepository.subscribeTextNotesByAuthors(
authors: [npub],
requestId: userFeedFreshId,
since: since,
);

newNotesStream.listen((event) {
_newRootAndReplyNotesController.add(event);
if (event.isRoot) {
_newRootNotesController.add(event);
}
});
}

/// load later timelineevents then
void loadMore({
required int oltherThen,
required String pubkey,
}) {
fetchFeedEvents(
npub: pubkey,
requestId: "loadMore-profile-",
limit: 20,
until: oltherThen - 1, // -1 to not get dublicates
);
}

void fetchFeedEvents({
required String npub,
required String requestId,
int? since,
int? until,
int? limit,
List<String>? eTags,
}) async {
// get contacts of user
final mynotesStream = _noteRepository.getTextNotesByAuthors(
authors: [npub],
requestId: requestId,
since: since,
until: until,
limit: limit,
eTags: eTags,
);

mynotesStream.listen((event) {
_rootAndReplyNotesController.add(event);
if (event.isRoot) {
_rootNotesController.add(event);
}
});
}

/// integrate new root notes into main feed
void integrateRootNotes(List<NostrNote> events) {
for (final event in events) {
_rootNotesController.add(event);
}
}

void integrateRootAndReplyNotes(List<NostrNote> events) {
for (final event in events) {
_rootAndReplyNotesController.add(event);
}
}

/// clean up everything including closing subscriptions
Future<void> dispose() async {
final List<Future> futures = [];
futures.add(_noteRepository.closeSubscription(userFeedTimelineFetchId));
futures.add(_rootNotesController.close());
futures.add(_newRootNotesController.close());
futures.add(_rootAndReplyNotesController.close());
futures.add(_newRootAndReplyNotesController.close());

await Future.wait(futures);
}
}
12 changes: 6 additions & 6 deletions lib/presentation_layer/providers/main_feed_provider.dart
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ final mainFeedStateProvider =
);

class MainFeedState extends FamilyNotifier<FeedViewModel, String> {
StreamSubscription? _mainFeedSub;
StreamSubscription? _newNotesSub;
StreamSubscription? _rootNotesSub;
StreamSubscription? _newRootNotesSub;
StreamSubscription? _rootAndReplySub;
StreamSubscription? _newRootAndReplySub;

Expand All @@ -55,8 +55,8 @@ class MainFeedState extends FamilyNotifier<FeedViewModel, String> {
newRootAndReplyNotes: [],
);

_mainFeedSub?.cancel();
_newNotesSub?.cancel();
_rootNotesSub?.cancel();
_newRootNotesSub?.cancel();
_rootAndReplySub?.cancel();
_newRootAndReplySub?.cancel();
await mainFeed.dispose();
Expand Down Expand Up @@ -91,13 +91,13 @@ class MainFeedState extends FamilyNotifier<FeedViewModel, String> {
appDbP.save(key: 'main_feed_cache_cutoff', value: now.toString());

// Timeline subscription
_mainFeedSub = mainFeed.rootNotesStream
_rootNotesSub = mainFeed.rootNotesStream
.bufferTime(const Duration(milliseconds: 500))
.where((events) => events.isNotEmpty)
.listen(_addRootTimelineEvents);

// New notes subscription
_newNotesSub = mainFeed.newRootNotesStream
_newRootNotesSub = mainFeed.newRootNotesStream
.bufferTime(const Duration(seconds: 1))
.where((events) => events.isNotEmpty)
.listen(_addNewRootEvents);
Expand Down
148 changes: 148 additions & 0 deletions lib/presentation_layer/providers/profile_feed_provider.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import 'dart:async';

import 'package:riverpod/riverpod.dart';
import 'package:rxdart/rxdart.dart';

import '../../data_layer/data_sources/dart_ndk_source.dart';
import '../../data_layer/repositories/note_repository_impl.dart';
import '../../domain_layer/entities/feed_view_model.dart';
import '../../domain_layer/entities/nostr_note.dart';
import '../../domain_layer/repositories/note_repository.dart';
import '../../domain_layer/usecases/profile_feed.dart';
import 'db_app_provider.dart';
import 'event_verifier.dart';
import 'ndk_provider.dart';

final profileFeedProvider = Provider<ProfileFeed>((ref) {
final ndk = ref.watch(ndkProvider);

final eventVerifier = ref.watch(eventVerifierProvider);

final DartNdkSource dartNdkSource = DartNdkSource(ndk);

final NoteRepository noteRepository = NoteRepositoryImpl(
dartNdkSource: dartNdkSource,
eventVerifier: eventVerifier,
);

final ProfileFeed profileFeed = ProfileFeed(noteRepository);

return profileFeed;
});

final profileFeedStateProvider =
NotifierProvider.family<ProfileFeedState, FeedViewModel, String>(
ProfileFeedState.new,
);

class ProfileFeedState extends FamilyNotifier<FeedViewModel, String> {
StreamSubscription? _rootNotesSub;
StreamSubscription? _newRootNotesSub;
StreamSubscription? _rootAndReplySub;
StreamSubscription? _newRootAndReplySub;

/// closes everthing and resets the state
Future<void> resetStateDispose() async {
final profileFeed = ref.read(profileFeedProvider);
state = FeedViewModel(
timelineRootNotes: [],
newRootNotes: [],
timelineRootAndReplyNotes: [],
newRootAndReplyNotes: [],
);

_rootNotesSub?.cancel();
_newRootNotesSub?.cancel();
_rootAndReplySub?.cancel();
_newRootAndReplySub?.cancel();
await profileFeed.dispose();
}

@override
FeedViewModel build(String arg) {
_initSubscriptions(arg);
return FeedViewModel(
timelineRootNotes: [],
newRootNotes: [],
timelineRootAndReplyNotes: [],
newRootAndReplyNotes: [],
);
}

void _initSubscriptions(String pubkey) async {
final profileFeed = ref.read(profileFeedProvider);
final appDbP = ref.read(dbAppProvider);

final dbCutOffKey = 'profile_feed_cache_cutoff_$pubkey';

// [cutoff] is seperates the feed into old and new notes
// basically marking the cache point
final lastFetch = await appDbP.read(dbCutOffKey);
int cutoff = 0;
final int now = DateTime.now().millisecondsSinceEpoch ~/ 1000;
if (lastFetch != null) {
cutoff = int.parse(lastFetch);
} else {
cutoff = now;
}
// Save the current time as the new cutoff
appDbP.save(key: dbCutOffKey, value: now.toString());

// Timeline subscription
_rootNotesSub = profileFeed.rootNotesStream
.bufferTime(const Duration(milliseconds: 500))
.where((events) => events.isNotEmpty)
.listen(_addRootTimelineEvents);

// New notes subscription
_newRootNotesSub = profileFeed.newRootNotesStream
.bufferTime(const Duration(seconds: 1))
.where((events) => events.isNotEmpty)
.listen(_addNewRootEvents);

_rootAndReplySub = profileFeed.rootAndReplyNotesStream
.bufferTime(const Duration(milliseconds: 500))
.where((events) => events.isNotEmpty)
.listen(_addRootAndReplyTimelineEvents);

_newRootAndReplySub = profileFeed.newRootAndReplyNotesStream
.bufferTime(const Duration(seconds: 1))
.where((events) => events.isNotEmpty)
.listen(_addNewRootAndReplyEvents);

// Initial fetch
profileFeed.fetchFeedEvents(
npub: pubkey,
requestId: "startup-profile",
limit: 20,
until: cutoff,
);
profileFeed.subscribeToFreshNotes(npub: pubkey, since: cutoff);
}

void _addRootTimelineEvents(List<NostrNote> events) {
state = state.copyWith(
timelineRootNotes: [...state.timelineRootNotes, ...events]
..sort((a, b) => b.created_at.compareTo(a.created_at)));
}

void _addNewRootEvents(List<NostrNote> events) {
state = state.copyWith(
newRootNotes: [...state.newRootNotes, ...events]
..sort((a, b) => b.created_at.compareTo(a.created_at)));
}

void _addRootAndReplyTimelineEvents(List<NostrNote> events) {
state = state.copyWith(
timelineRootAndReplyNotes: [
...state.timelineRootAndReplyNotes,
...events
]..sort((a, b) => b.created_at.compareTo(a.created_at)));
}

void _addNewRootAndReplyEvents(List<NostrNote> events) {
state = state.copyWith(
newRootAndReplyNotes: [...state.newRootAndReplyNotes, ...events]
..sort((a, b) => b.created_at.compareTo(a.created_at)));
}
}
Loading

0 comments on commit 231e05a

Please sign in to comment.