Skip to content

Commit

Permalink
event feed streams
Browse files Browse the repository at this point in the history
  • Loading branch information
leo-lox committed Nov 4, 2024
1 parent 249bb35 commit c655857
Showing 1 changed file with 29 additions and 17 deletions.
46 changes: 29 additions & 17 deletions lib/domain_layer/usecases/event_feed.dart
Original file line number Diff line number Diff line change
@@ -1,35 +1,35 @@
import 'dart:async';
import 'dart:developer';

import 'package:rxdart/rxdart.dart';

import '../entities/nostr_note.dart';
import '../entities/tree_node.dart';
import '../repositories/note_repository.dart';
import 'follow.dart';

///
/// idea is to combine multiple streams here into the feed stream
/// the feed stream gets then sorted on the ui in an intervall to prevent huge layout shifts
///
/// there could be one update stream and one for scrolling
///
///
class EventFeed {
final NoteRepository _noteRepository;
final Follow _follow;

final String rootNoteFetchId = "event-root";
final String repliesFetchId = "event-replies";
final String repliesFetchId = "replies";

// root streams
final StreamController<NostrNote> _rootNoteController =
StreamController<NostrNote>();
Stream<NostrNote> get rootNoteStream => _rootNoteController.stream;
/// the complete build tree of replies => new updates = new state
final StreamController<List<TreeNode<NostrNote>>> _repliesTreeController =
StreamController<List<TreeNode<NostrNote>>>();
Stream<List<TreeNode<NostrNote>>> get repliesTreeStream =>
_repliesTreeController.stream;

/// raw replies
final StreamController<NostrNote> _replyNotesController =
StreamController<NostrNote>();
Stream<NostrNote> get replyNotesStream => _replyNotesController.stream;

/// root note
final StreamController<NostrNote> _rootNoteController =
StreamController<NostrNote>();
Stream<NostrNote> get rootNoteStream => _rootNoteController.stream;

EventFeed(this._noteRepository, this._follow);

Future<void> subscribeToReplyNotes({
Expand All @@ -41,8 +41,20 @@ class EventFeed {
rootNoteId: rootNoteId,
);

replyNotes.listen((event) {
_replyNotesController.add(event);
replyNotes
.bufferTime(const Duration(milliseconds: 500))
.where((events) => events.isNotEmpty)
.listen((events) {
for (final event in events) {
_replyNotesController.add(event);
}

final tree = buildRepliesTree(
rootNoteId: rootNoteId,
replies: events,
);

_repliesTreeController.add(tree);
});
}

Expand All @@ -61,7 +73,6 @@ class EventFeed {
/// build a tree from the replies \
/// [returns] a list of first level replies \
/// the cildren are replies of replies
static List<TreeNode<NostrNote>> buildRepliesTree({
required String rootNoteId,
required List<NostrNote> replies,
Expand Down Expand Up @@ -114,6 +125,7 @@ class EventFeed {
futures.add(_noteRepository.closeSubscription(repliesFetchId));
futures.add(_rootNoteController.close());
futures.add(_replyNotesController.close());
futures.add(_repliesTreeController.close());

await Future.wait(futures);
}
Expand Down

0 comments on commit c655857

Please sign in to comment.