composable streams of query results #446

Open

wants to merge 19 commits into base: main
Conversation

@ldanilek ldanilek (Contributor) commented Feb 8, 2025

As described in the README, this adds helpers for merging and filtering streams of query results.

It extracts the implementations of `paginator` and `streamQuery` into new helpers that allow better composition and more patterns.

First there's `reflect`, which lets you construct queries with the normal syntax, e.g. `reflect(ctx.db, schema).query(table).withIndex(index, indexRange).order("desc")`, and then get at their internal details, such as which index the query is looking at. The naming is based on https://golangbot.com/reflection/, but it's mostly an internal library.

Then once you have a reflectable query, it can be used as a "stream": an async iterable of documents ordered by an index. For this reason, `stream(ctx.db, schema)` is another way of writing `reflect(ctx.db, schema)`.

Once you have streams, you can merge them with `mergeStreams` or filter them with `filterStream`, producing more streams.
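Conceptually, merging works because every input stream is already sorted by the same index key, so the merged stream is a k-way merge. A simplified standalone sketch of that technique, with plain arrays and numeric keys standing in for real query streams (illustration only, not the convex-helpers implementation):

```typescript
// Simplified model of merging streams: each input is already sorted by
// an index key, and the merged output is still sorted by that key.
type Keyed = { key: number; value: string };

function* mergeSorted(...streams: Keyed[][]): Generator<Keyed> {
  // One cursor per input stream.
  const cursors = streams.map(() => 0);
  for (;;) {
    // Pick the stream whose next item has the smallest key.
    let best = -1;
    for (let i = 0; i < streams.length; i++) {
      if (cursors[i] >= streams[i].length) continue;
      if (best === -1 || streams[i][cursors[i]].key < streams[best][cursors[best]].key) {
        best = i;
      }
    }
    if (best === -1) return; // every stream is exhausted
    yield streams[best][cursors[best]++];
  }
}

const merged = [...mergeSorted(
  [{ key: 1, value: "a" }, { key: 4, value: "d" }],
  [{ key: 2, value: "b" }, { key: 3, value: "c" }],
)];
```

Real streams are async and keyed by full index keys rather than numbers, but the invariant is the same: inputs sorted by the index, output sorted by the index.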

See the README for more details and motivating examples.


By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@ldanilek ldanilek requested a review from ianmacartney February 8, 2025 00:51
Collaborator

@ianmacartney ianmacartney left a comment

I think I get it, but the language around queries, query streams, index streams, and merging is a bit murky even for me after skimming the code. Can you add some bullets to the README on how each term relates to the others?

I'm starting to wonder if all the query helpers could get bundled into a @convex-dev/query-utils library that you own. It might be a good organization, and more "official-feeling" than helpers. I've already pulled migrations and rate limiting into components, and maybe I should pull validators into validator-utils or something, then have helpers be for developing APIs before crystallizing them into separate packages... one downside, of course, is discoverability.


With the `stream` helper, you can construct a stream with the same syntax as
you would use `DatabaseReader`. Once you have a stream, you can compose them
to get more streams (still ordered by the same index) with `mergeStreams`, and
Collaborator

What does "get more streams" mean here? Reading this for the first time, I'm not sure as a developer (yet).

const authorStreams = authors.map(author =>
stream(ctx.db, schema).query("messages").withIndex("by_author", q => q.eq("author", author))
);
const allAuthorsStream = mergeStreams(authorStreams);
Collaborator

I'm not sure what "mergeStreams" means here. Will each be concatenated? I'll try to think of other names as I go, but maybe we drop some comments in here that walk through what each step does.

}
// Otherwise, it's a POJO.
const keys = Object.keys(v).sort();
const pojo: Value[] = keys.map((k) => [k, v[k]!]);
Collaborator

Suggested change
const pojo: Value[] = keys.map((k) => [k, v[k]!]);
const pojo: (Value | undefined)[] = keys.map((k) => [k, v[k]]);

Contributor Author

I don't think they're the same thing.
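For context on the normalization being discussed: to give plain objects a total order, you can sort their keys and compare the resulting entry lists lexicographically, so key insertion order doesn't affect the result. A simplified standalone sketch with string/number leaves only (the real compare.ts handles all Convex values):

```typescript
type Leaf = string | number;

// Total order on leaf values: all numbers sort before all strings
// (an arbitrary but consistent rule for this sketch).
function compareLeaf(a: Leaf, b: Leaf): number {
  if (typeof a === "number" && typeof b === "number") return a - b;
  if (typeof a === "string" && typeof b === "string")
    return a < b ? -1 : a > b ? 1 : 0;
  return typeof a === "number" ? -1 : 1;
}

// Compare plain objects by their entries sorted by key, so
// {a: 1, b: 2} and {b: 2, a: 1} compare equal.
function comparePojo(a: Record<string, Leaf>, b: Record<string, Leaf>): number {
  const ka = Object.keys(a).sort();
  const kb = Object.keys(b).sort();
  const n = Math.min(ka.length, kb.length);
  for (let i = 0; i < n; i++) {
    // Compare keys first, then values under equal keys.
    if (ka[i] !== kb[i]) return ka[i] < kb[i] ? -1 : 1;
    const cmp = compareLeaf(a[ka[i]], b[kb[i]]);
    if (cmp !== 0) return cmp;
  }
  return ka.length - kb.length; // an object missing a key sorts first
}
```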

@@ -0,0 +1,86 @@
import { Value } from "convex/values";
Collaborator

FYI this will make an entrypoint at convex-helpers/server/compare

Contributor Author

I thought that to do that I had to make an export in package.json.
(I don't mind making this an entrypoint, but it shouldn't be necessary for now.)

}
lt(field: string, value: Value) {
if (!this.canUpperBound(field)) {
throw new Error(`Cannot use lt on field '${field}'`);
Collaborator

What's an example of this?

Contributor Author

This is like if you have an index on `["a", "b"]` and you do `q => q.lt("b", 1)`, or `q => q.lt("a", 1).lt("a", 1)`, or `q => q.eq("a", 1).lt("a", 1)`. There are tests :)
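A toy model of that check, assuming an index on `["a", "b"]`: `eq` must fix index fields in order, and `lt` may bound only the first field after the `eq` prefix, and only once. Field names only, since values don't affect validity; this is an illustration, not the stream.ts code:

```typescript
// Toy model of index-range validation. An index range is an eq-prefix
// of the index fields, optionally followed by one bound on the next field.
class RangeBuilder {
  private eqPrefixLength = 0;
  private bounded = false;
  constructor(private readonly indexFields: string[]) {}

  eq(field: string): this {
    if (this.bounded || this.indexFields[this.eqPrefixLength] !== field) {
      throw new Error(`Cannot use eq on field '${field}'`);
    }
    this.eqPrefixLength++;
    return this;
  }

  lt(field: string): this {
    // Only the next unfixed index field may be bounded, exactly once.
    if (this.bounded || this.indexFields[this.eqPrefixLength] !== field) {
      throw new Error(`Cannot use lt on field '${field}'`);
    }
    this.bounded = true;
    return this;
  }
}
```

Under this model, `eq("a").lt("b")` is valid, while all three examples above throw.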

}

/**
* Merge multiple streams, provided in any order, into a single stream.
Collaborator

Document what order the data will be returned in, and add an example like you do for `concatStream`.

/**
* Apply a filter to a stream.
*
* Watch out for sparse filters, as they may read unbounded amounts of data.
Collaborator

Could we pass in a rowReadLimit or something so folks can bound the scan? Like "give me 10 non-deleted users' messages, but cap the search at 1k users." Would that end up with surprising results later, where pagination might think it's done? Maybe throw a catchable error with the results so far? Or do you have other ideas for how to gracefully fail there? Maybe it's "usually" totally fine, but there was a burst of user deletions, and now a query is stuck in a failing state?
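To make the rowReadLimit idea concrete: a filter that gives up, carrying its partial results, once it has scanned too many rows. This is a hypothetical sketch of the reviewer's suggestion (neither `rowReadLimit` nor `ScanLimitError` exists in convex-helpers), written over a plain iterable instead of an async stream:

```typescript
// Thrown when the scan budget is exhausted; carries whatever matched so far.
class ScanLimitError<T> extends Error {
  constructor(public readonly partial: T[]) {
    super(`scan limit reached with ${partial.length} results so far`);
  }
}

// Filter `items`, but examine at most `maxScanned` of them.
function filterWithLimit<T>(
  items: Iterable<T>,
  predicate: (item: T) => boolean,
  maxScanned: number,
): T[] {
  const out: T[] = [];
  let scanned = 0;
  for (const item of items) {
    if (++scanned > maxScanned) throw new ScanLimitError(out);
    if (predicate(item)) out.push(item);
  }
  return out;
}
```

A caller could catch `ScanLimitError` and decide whether partial results are acceptable, which sidesteps the "pagination thinks it's done" ambiguity by making the failure explicit.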

@ianmacartney
Collaborator

tbh I hit some review fatigue when I got to the huge stream.ts and compare.ts logic. I can look more closely at whatever feels most important to you, or go deep into it once I have self-hosting things under control

DataModel extends GenericDataModel,
T extends TableNamesInDataModel<DataModel>,
> {
iterWithKeys(): AsyncIterable<[DocumentByName<DataModel, T> | null, IndexKey]>;
Contributor Author

TODO: IndexKey can contain undefined, make sure we're not serializing it as a convex value

- e.g. `stream(ctx.db, schema).query("messages").withIndex("by_author", (q) => q.eq("author", "user1"))`
- `mergeStreams` combines multiple streams into a new stream, ordered by the same index.
- `filterStream` filters out documents from a stream based on a TypeScript predicate.
- `queryStream` converts a stream into a query, so you can call `.first()`, `.collect()`, `.paginate()`, etc.
Contributor Author

necessary?
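One way to picture a pagination layer over a sorted stream: the cursor is simply the last index key returned, and resuming skips past it. A simplified sketch with numeric keys, not the actual convex-helpers API:

```typescript
// Toy pagination over a stream already sorted by `key`. The cursor is
// the last key returned, so pages resume deterministically.
type Doc = { key: number; value: string };

function paginate(
  sorted: Doc[],
  pageSize: number,
  cursor: number | null,
): { page: Doc[]; continueCursor: number | null; isDone: boolean } {
  // Skip everything at or before the cursor key.
  const rest = cursor === null ? sorted : sorted.filter((d) => d.key > cursor);
  const page = rest.slice(0, pageSize);
  return {
    page,
    continueCursor: page.length > 0 ? page[page.length - 1].key : cursor,
    isDone: rest.length <= pageSize,
  };
}
```

Because the cursor is an index key rather than a per-source position, the same scheme works whether the sorted input came from one index range or from several merged ones.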

## Composable streams of query results

These are helper functions for constructing and composing streams of query results.

Contributor Author

add a motivating example here

stream(ctx.db, schema)
.query("messages")
.withIndex("by_author", (q) =>
q.eq("author", author).eq("unread", unread),
Contributor Author

Does this still order by (author, unread), or by _creationTime?

);
// Merge the two streams into a single stream of all messages authored by
// `args.author`, ordered by _creationTime descending.
const allMessagesByCreationTime = mergeStreams(...messagesForUnreadStatus);
Contributor Author

Separate cursor for each merged stream?
