diff --git a/packages/convex-helpers/README.md b/packages/convex-helpers/README.md index d1640689..fdb226ee 100644 --- a/packages/convex-helpers/README.md +++ b/packages/convex-helpers/README.md @@ -853,6 +853,151 @@ export const list = query({ }); ``` +## Composable streams of query results + +These are helper functions for constructing and composing streams of query results. + +- A "query" implements the `OrderedQuery` interface from the "convex/server" package, +so it has methods `.first()`, `.collect()`, `.paginate()`, etc. +- A "stream" is an async iterable of documents, ordered by an index on a table. + +The cool thing about a stream is you can merge two streams together +to create a new stream, and you can filter documents out of a stream with a +TypeScript predicate. + +For example, if you have a stream of "messages created by user1" and a stream +of "messages created by user2", you can merge them together to get a stream of +"messages created by user1 or user2". And you can filter the merged stream to +get a stream of "messages created by user1 or user2 that are unread". Then you +can convert the stream into a query and paginate it. + +Concrete functions you can use: + +- `stream` constructs a stream using the same syntax as `DatabaseReader`. + - e.g. `stream(ctx.db, schema).query("messages").withIndex("by_author", (q) => q.eq("author", "user1"))` +- `mergeStreams` combines multiple streams into a new stream, ordered by the same index. +- `filterStream` filters out documents from a stream based on a TypeScript predicate. +- `queryStream` converts a stream into a query, so you can call `.first()`, `.collect()`, `.paginate()`, etc. + +Beware if using `.paginate()` with streams in reactive queries, as it has the +same problems as [`paginator` and `getPage`](#manual-pagination): you need to +pass in `endCursor` to prevent holes or overlaps between the pages. + +**Motivating examples:** + +1. For a fixed set of authors, paginate all messages by those authors. 
+
+```ts
+import { stream, mergeStreams, queryStream } from "convex-helpers/server/stream";
+import schema from "./schema";
+
+export const listForAuthors = query({
+  args: {
+    authors: v.array(v.id("users")),
+    paginationOpts: paginationOptsValidator,
+  },
+  handler: async (ctx, { authors, paginationOpts }) => {
+    const authorStreams = authors.map((author) =>
+      stream(ctx.db, schema)
+        .query("messages")
+        .withIndex("by_author", (q) => q.eq("author", author)),
+    );
+    // Create a new stream of all messages authored by users in `args.authors`,
+    // ordered by the "by_author" index.
+    const allAuthorsStream = mergeStreams(...authorStreams);
+    return await queryStream(allAuthorsStream).paginate(paginationOpts);
+  },
+});
+```
+
+2. Paginate all messages whose authors match a complex predicate.
+
+There are actually two ways to do this. One uses "post-filter" pagination,
+where the filter is applied after fetching a fixed number of documents. To do that, you can
+use the `filter` helper described [above](#filter). The advantage is that the
+queries read bounded data, but the disadvantage is that the returned page might
+be small or empty.
+
+The other does "pre-filter" pagination, where the filter is applied before
+picking the page size. Doing this with a filter that excludes most documents may
+result in slow queries or errors because it's reading too much data, but if the
+predicate often returns true, it's perfectly fine. To avoid edge cases where you
+accidentally read too much data, you can pass `maximumRowsRead` in pagination
+options to limit the number of rows read. Let's see how to do
+pre-filtering with streams.
+
+```ts
+import {
+  stream,
+  mergeStreams,
+  filterStream,
+  queryStream,
+} from "convex-helpers/server/stream";
+import schema from "./schema";
+
+export const list = query({
+  args: { paginationOpts: paginationOptsValidator },
+  handler: async (ctx, { paginationOpts }) => {
+    const allMessagesStream = stream(ctx.db, schema)
+      .query("messages")
+      .order("desc");
+    const messagesByVerifiedAuthors = filterStream(
+      allMessagesStream,
+      async (message) => {
+        const author = await ctx.db.get(message.author);
+        return author !== null && author.verified;
+      },
+    );
+    // The pagination happens after the filtering, so the page should have size
+    // `paginationOpts.numItems`.
+    return await queryStream(messagesByVerifiedAuthors).paginate(
+      { ...paginationOpts, maximumRowsRead: 100 },
+    );
+  },
+});
+```
+
+Again, remember to use `endCursor` in reactive queries to keep pages contiguous
+(see [`paginator`](#paginator-manual-pagination-with-familiar-syntax)).
+
+3. Ignore a boolean field in an index.
+
+Suppose you have an index on `["author", "unread"]` and you want to get the
+most recent 10 messages for an author, ignoring whether a message is unread.
+
+Normally this would require a separate index on `["author"]`, or doing two
+requests and manually picking the 10 most recent. But with streams, it's cleaner:
+
+```ts
+import {
+  stream,
+  mergeStreams,
+  filterStream,
+  queryStream,
+} from "convex-helpers/server/stream";
+import schema from "./schema";
+
+export const latestMessages = query({
+  args: { author: v.id("users") },
+  handler: async (ctx, { author }) => {
+    // These are two streams of messages, ordered by _creationTime descending.
+    // The first has read messages, the second has unread messages.
+ const messagesForUnreadStatus = [false, true].map((unread) => + stream(ctx.db, schema) + .query("messages") + .withIndex("by_author", (q) => + q.eq("author", author).eq("unread", unread), + ) + .order("desc"), + ); + // Merge the two streams into a single stream of all messages authored by + // `args.author`, ordered by _creationTime descending. + const allMessagesByCreationTime = mergeStreams(...messagesForUnreadStatus); + return await queryStream(allMessagesByCreationTime).take(10); + }, +}); +``` + ## Query Caching Utilize a query cache implementation which persists subscriptions to the diff --git a/packages/convex-helpers/package.json b/packages/convex-helpers/package.json index ea38fd2f..b7aaa967 100644 --- a/packages/convex-helpers/package.json +++ b/packages/convex-helpers/package.json @@ -91,6 +91,10 @@ "types": "./server/sessions.d.ts", "default": "./server/sessions.js" }, + "./server/stream": { + "types": "./server/stream.d.ts", + "default": "./server/stream.js" + }, "./server/triggers": { "types": "./server/triggers.d.ts", "default": "./server/triggers.js" diff --git a/packages/convex-helpers/server/compare.ts b/packages/convex-helpers/server/compare.ts new file mode 100644 index 00000000..d2719da5 --- /dev/null +++ b/packages/convex-helpers/server/compare.ts @@ -0,0 +1,86 @@ +import { Value } from "convex/values"; + +// Returns -1 if k1 < k2 +// Returns 0 if k1 === k2 +// Returns 1 if k1 > k2 +export function compareValues(k1: Value | undefined, k2: Value | undefined) { + return compareAsTuples(makeComparable(k1), makeComparable(k2)); +} + +function compareAsTuples(a: [number, T], b: [number, T]): number { + if (a[0] === b[0]) { + return compareSameTypeValues(a[1], b[1]); + } else if (a[0] < b[0]) { + return -1; + } + return 1; +} + +function compareSameTypeValues(v1: T, v2: T): number { + if (v1 === undefined || v1 === null) { + return 0; + } + if ( + typeof v1 === "bigint" || + typeof v1 === "number" || + typeof v1 === "boolean" || + typeof v1 === 
"string"
+  ) {
+    return v1 < v2 ? -1 : v1 === v2 ? 0 : 1;
+  }
+  if (!Array.isArray(v1) || !Array.isArray(v2)) {
+    throw new Error(`Unexpected type ${v1 as any}`);
+  }
+  for (let i = 0; i < v1.length && i < v2.length; i++) {
+    const cmp = compareAsTuples(v1[i], v2[i]);
+    if (cmp !== 0) {
+      return cmp;
+    }
+  }
+  if (v1.length < v2.length) {
+    return -1;
+  }
+  if (v1.length > v2.length) {
+    return 1;
+  }
+  return 0;
+}
+
+// Returns an array which can be compared to other arrays as if they were tuples.
+// For example, [1, null] < [2, 1n] means null sorts before all bigints
+// And [3, 5] < [3, 6] means floats sort as expected
+// And [7, [[5, "a"]]] < [7, [[5, "a"], [5, "b"]]] means arrays sort as expected
+function makeComparable(v: Value | undefined): [number, any] {
+  if (v === undefined) {
+    return [0, undefined];
+  }
+  if (v === null) {
+    return [1, null];
+  }
+  if (typeof v === "bigint") {
+    return [2, v];
+  }
+  if (typeof v === "number") {
+    if (isNaN(v)) {
+      // Consider all NaNs to be equal.
+      return [3.5, 0];
+    }
+    return [3, v];
+  }
+  if (typeof v === "boolean") {
+    return [4, v];
+  }
+  if (typeof v === "string") {
+    return [5, v];
+  }
+  if (v instanceof ArrayBuffer) {
+    return [6, Array.from(new Uint8Array(v)).map(makeComparable)];
+  }
+  if (Array.isArray(v)) {
+    return [7, v.map(makeComparable)];
+  }
+  // Otherwise, it's a POJO.
+ const keys = Object.keys(v).sort(); + const pojo: Value[] = keys.map((k) => [k, v[k]!]); + return [8, pojo.map(makeComparable)]; +} diff --git a/packages/convex-helpers/server/pagination.test.ts b/packages/convex-helpers/server/pagination.test.ts index 3129aaa9..61a65301 100644 --- a/packages/convex-helpers/server/pagination.test.ts +++ b/packages/convex-helpers/server/pagination.test.ts @@ -260,7 +260,7 @@ describe("paginator", () => { { a: 1, b: 2, c: 5 }, ]); expect(result1.isDone).toBe(true); - expect(result1.continueCursor).toBe("endcursor"); + expect(result1.continueCursor).toBe("[]"); }); }); diff --git a/packages/convex-helpers/server/pagination.ts b/packages/convex-helpers/server/pagination.ts index 782de543..d361356a 100644 --- a/packages/convex-helpers/server/pagination.ts +++ b/packages/convex-helpers/server/pagination.ts @@ -1,23 +1,20 @@ -import { Value, convexToJson, jsonToConvex } from "convex/values"; +import { Value } from "convex/values"; import { DataModelFromSchemaDefinition, - DocumentByInfo, DocumentByName, GenericDataModel, GenericDatabaseReader, IndexNames, - IndexRange, - IndexRangeBuilder, - NamedIndex, NamedTableInfo, - OrderedQuery, - PaginationOptions, - PaginationResult, - Query, - QueryInitializer, SchemaDefinition, TableNamesInDataModel, } from "convex/server"; +import { + getIndexFields, + ReflectDatabaseReader, + stream, + streamIndexRange, +} from "./stream.js"; export type IndexKey = Value[]; @@ -132,214 +129,41 @@ export async function* streamQuery< request: Omit, "targetMaxRows" | "absoluteMaxRows">, ): AsyncGenerator<[DocumentByName, IndexKey]> { const index = request.index ?? "by_creation_time"; - const indexFields = getIndexFields(request); + const indexFields = getIndexFields( + request.table, + request.index as any, + request.schema, + ); const startIndexKey = request.startIndexKey ?? []; const endIndexKey = request.endIndexKey ?? []; const startInclusive = request.startInclusive ?? 
false; const order = request.order === "desc" ? "desc" : "asc"; - const startBoundType = - order === "desc" ? ltOr(startInclusive) : gtOr(startInclusive); const endInclusive = request.endInclusive ?? true; - const endBoundType = - order === "desc" ? gtOr(endInclusive) : ltOr(endInclusive); if ( indexFields.length < startIndexKey.length || indexFields.length < endIndexKey.length ) { throw new Error("Index key length exceeds index fields length"); } - const split = splitRange( - indexFields, - startIndexKey, - endIndexKey, - startBoundType, - endBoundType, - ); - for (const range of split) { - const query = ctx.db - .query(request.table) - .withIndex(index, rangeToQuery(range)) - .order(order); - for await (const doc of query) { - yield [doc, getIndexKey(doc, indexFields)]; - } - } -} - -// -// Helper functions -// - -const DEFAULT_TARGET_MAX_ROWS = 100; - -function equalValues(a: Value, b: Value): boolean { - return JSON.stringify(convexToJson(a)) === JSON.stringify(convexToJson(b)); -} - -function exclType(boundType: "gt" | "lt" | "gte" | "lte") { - if (boundType === "gt" || boundType === "gte") { - return "gt"; - } - return "lt"; -} - -const ltOr = (equal: boolean) => (equal ? "lte" : "lt"); -const gtOr = (equal: boolean) => (equal ? "gte" : "gt"); - -type Bound = ["gt" | "lt" | "gte" | "lte" | "eq", string, Value]; - -/** Split a range query between two index keys into a series of range queries - * that should be executed in sequence. This is necessary because Convex only - * supports range queries of the form - * q.eq("f1", v).eq("f2", v).lt("f3", v).gt("f3", v). - * i.e. all fields must be equal except for the last field, which can have - * two inequalities. - * - * For example, the range from >[1, 2, 3] to <=[1, 3, 2] would be split into - * the following queries: - * 1. q.eq("f1", 1).eq("f2", 2).gt("f3", 3) - * 2. q.eq("f1", 1).gt("f2", 2).lt("f2", 3) - * 3. 
q.eq("f1", 1).eq("f2", 3).lte("f3", 2) - */ -function splitRange( - indexFields: string[], - startBound: IndexKey, - endBound: IndexKey, - startBoundType: "gt" | "lt" | "gte" | "lte", - endBoundType: "gt" | "lt" | "gte" | "lte", -): Bound[][] { - // Three parts to the split: - // 1. reduce down from startBound to common prefix - // 2. range with common prefix - // 3. build back up from common prefix to endBound - const commonPrefix: Bound[] = []; - while ( - startBound.length > 0 && - endBound.length > 0 && - equalValues(startBound[0]!, endBound[0]!) - ) { - const indexField = indexFields[0]!; - indexFields = indexFields.slice(1); - const eqBound = startBound[0]!; - startBound = startBound.slice(1); - endBound = endBound.slice(1); - commonPrefix.push(["eq", indexField, eqBound]); - } - const makeCompare = ( - boundType: "gt" | "lt" | "gte" | "lte", - key: IndexKey, - ) => { - const range = commonPrefix.slice(); - let i = 0; - for (; i < key.length - 1; i++) { - range.push(["eq", indexFields[i]!, key[i]!]); - } - if (i < key.length) { - range.push([boundType, indexFields[i]!, key[i]!]); - } - return range; - }; - // Stage 1. - const startRanges: Bound[][] = []; - while (startBound.length > 1) { - startRanges.push(makeCompare(startBoundType, startBound)); - startBoundType = exclType(startBoundType); - startBound = startBound.slice(0, -1); - } - // Stage 3. - const endRanges: Bound[][] = []; - while (endBound.length > 1) { - endRanges.push(makeCompare(endBoundType, endBound)); - endBoundType = exclType(endBoundType); - endBound = endBound.slice(0, -1); - } - endRanges.reverse(); - // Stage 2. 
- let middleRange; - if (endBound.length === 0) { - middleRange = makeCompare(startBoundType, startBound); - } else if (startBound.length === 0) { - middleRange = makeCompare(endBoundType, endBound); - } else { - const startValue = startBound[0]!; - const endValue = endBound[0]!; - middleRange = commonPrefix.slice(); - middleRange.push([startBoundType, indexFields[0]!, startValue]); - middleRange.push([endBoundType, indexFields[0]!, endValue]); - } - return [...startRanges, middleRange, ...endRanges]; -} - -function rangeToQuery(range: Bound[]) { - return (q: any) => { - for (const [boundType, field, value] of range) { - q = q[boundType](field, value); - } - return q; + const bounds = { + lowerBound: order === "asc" ? startIndexKey : endIndexKey, + lowerBoundInclusive: order === "asc" ? startInclusive : endInclusive, + upperBound: order === "asc" ? endIndexKey : startIndexKey, + upperBoundInclusive: order === "asc" ? endInclusive : startInclusive, }; -} - -function getIndexFields< - DataModel extends GenericDataModel, - T extends TableNamesInDataModel, ->( - request: Pick< - PageRequest, - "indexFields" | "schema" | "table" | "index" - >, -): string[] { - const indexDescriptor = String(request.index ?? 
"by_creation_time"); - if (indexDescriptor === "by_creation_time") { - return ["_creationTime", "_id"]; - } - if (indexDescriptor === "by_id") { - return ["_id"]; - } - if (request.indexFields) { - const fields = request.indexFields.slice(); - if (!request.indexFields.includes("_creationTime")) { - fields.push("_creationTime"); - } - if (!request.indexFields.includes("_id")) { - fields.push("_id"); - } - return fields; - } - if (!request.schema) { - throw new Error("schema is required to infer index fields"); - } - const table = request.schema.tables[request.table]; - const index = table.indexes.find( - (index: any) => index.indexDescriptor === indexDescriptor, - ); - if (!index) { - throw new Error( - `Index ${indexDescriptor} not found in table ${request.table}`, - ); - } - const fields = index.fields.slice(); - fields.push("_creationTime"); - fields.push("_id"); - return fields; -} - -function getIndexKey< - DataModel extends GenericDataModel, - T extends TableNamesInDataModel, ->(doc: DocumentByName, indexFields: string[]): IndexKey { - const key: IndexKey = []; - for (const field of indexFields) { - let obj: any = doc; - for (const subfield of field.split(".")) { - obj = obj[subfield]; - } - key.push(obj); + const stream = streamIndexRange( + ctx.db as any, + request.schema as any, + request.table, + index as any, + bounds, + order, + ).iterWithKeys(); + for await (const [doc, indexKey] of stream) { + yield [doc, indexKey]; } - return key; } -const END_CURSOR = "endcursor"; - /** * Simpified version of `getPage` that you can use for one-off queries that * don't need to be reactive. 
@@ -385,365 +209,12 @@ const END_CURSOR = "endcursor"; export function paginator>( db: GenericDatabaseReader>, schema: Schema, -): PaginatorDatabaseReader> { - return new PaginatorDatabaseReader(db, schema); -} - -export class PaginatorDatabaseReader - implements GenericDatabaseReader -{ - // TODO: support system tables - public system: any = null; - - constructor( - public db: GenericDatabaseReader, - public schema: SchemaDefinition, - ) {} - - query>( - tableName: TableName, - ): PaginatorQueryInitializer { - return new PaginatorQueryInitializer(this, tableName); - } - get(_id: any): any { - throw new Error("get() not supported for `paginator`"); - } - normalizeId(_tableName: any, _id: any): any { - throw new Error("normalizeId() not supported for `paginator`."); - } +): ReflectDatabaseReader { + return stream(db, schema); } -export class PaginatorQueryInitializer< - DataModel extends GenericDataModel, - T extends TableNamesInDataModel, -> implements QueryInitializer> -{ - constructor( - public parent: PaginatorDatabaseReader, - public table: T, - ) {} - fullTableScan(): PaginatorQuery { - return this.withIndex("by_creation_time"); - } - withIndex>>( - indexName: IndexName, - indexRange?: ( - q: IndexRangeBuilder< - DocumentByInfo>, - NamedIndex, IndexName> - >, - ) => IndexRange, - ): PaginatorQuery { - const indexFields = getIndexFields({ - table: this.table, - index: indexName, - schema: this.parent.schema, - }); - const q = new PaginatorIndexRange(indexFields); - if (indexRange) { - indexRange(q as any); - } - return new PaginatorQuery(this, indexName, q); - } - withSearchIndex(_indexName: any, _searchFilter: any): any { - throw new Error("Cannot paginate withSearchIndex"); - } - order(order: "asc" | "desc"): OrderedPaginatorQuery { - return this.fullTableScan().order(order); - } - paginate( - opts: PaginationOptions & { endCursor?: string | null }, - ): Promise>>> { - return this.fullTableScan().paginate(opts); - } - filter(_predicate: any): any { - throw 
new Error( - ".filter() not supported for `paginator`. Filter the returned `page` instead.", - ); - } - collect(): any { - throw new Error( - ".collect() not supported for `paginator`. Use .paginate() instead.", - ); - } - first(): any { - throw new Error( - ".first() not supported for `paginator`. Use .paginate() instead.", - ); - } - unique(): any { - throw new Error( - ".unique() not supported for `paginator`. Use .paginate() instead.", - ); - } - take(_n: number): any { - throw new Error( - ".take() not supported for `paginator`. Use .paginate() instead.", - ); - } - [Symbol.asyncIterator](): any { - throw new Error( - "[Symbol.asyncIterator]() not supported for `paginator`. Use .paginate() instead.", - ); - } -} - -export class PaginatorQuery< - DataModel extends GenericDataModel, - T extends TableNamesInDataModel, -> implements Query> -{ - constructor( - public parent: PaginatorQueryInitializer, - public index: IndexNames>, - public q: PaginatorIndexRange, - ) {} - order(order: "asc" | "desc") { - return new OrderedPaginatorQuery(this, order); - } - paginate( - opts: PaginationOptions & { endCursor?: string | null }, - ): Promise>>> { - return this.order("asc").paginate(opts); - } - filter(_predicate: any): this { - throw new Error( - ".filter() not supported for `paginator`. Filter the returned `page` instead.", - ); - } - collect(): any { - throw new Error( - ".collect() not supported for `paginator`. Use .paginate() instead.", - ); - } - first(): any { - throw new Error( - ".first() not supported for `paginator`. Use .paginate() instead.", - ); - } - unique(): any { - throw new Error( - ".unique() not supported for `paginator`. Use .paginate() instead.", - ); - } - take(_n: number): any { - throw new Error( - ".take() not supported for `paginator`. Use .paginate() instead.", - ); - } - [Symbol.asyncIterator](): any { - throw new Error( - "[Symbol.asyncIterator]() not supported for `paginator`. 
Use .paginate() instead.", - ); - } -} - -export class OrderedPaginatorQuery< - DataModel extends GenericDataModel, - T extends TableNamesInDataModel, -> implements OrderedQuery> -{ - public startIndexKey: IndexKey | undefined; - public startInclusive: boolean; - public endIndexKey: IndexKey | undefined; - public endInclusive: boolean; - constructor( - public parent: PaginatorQuery, - public order: "asc" | "desc", - ) { - this.startIndexKey = - order === "asc" - ? parent.q.lowerBoundIndexKey - : parent.q.upperBoundIndexKey; - this.endIndexKey = - order === "asc" - ? parent.q.upperBoundIndexKey - : parent.q.lowerBoundIndexKey; - this.startInclusive = - order === "asc" - ? parent.q.lowerBoundInclusive - : parent.q.upperBoundInclusive; - this.endInclusive = - order === "asc" - ? parent.q.upperBoundInclusive - : parent.q.lowerBoundInclusive; - } - async paginate( - opts: PaginationOptions & { endCursor?: string | null }, - ): Promise>> { - if (opts.cursor === END_CURSOR) { - return { - page: [], - isDone: true, - continueCursor: END_CURSOR, - }; - } - const schema = this.parent.parent.parent.schema; - let startIndexKey = this.startIndexKey; - let startInclusive = this.startInclusive; - if (opts.cursor !== null) { - startIndexKey = jsonToConvex(JSON.parse(opts.cursor)) as IndexKey; - startInclusive = false; - } - let endIndexKey = this.endIndexKey; - let endInclusive = this.endInclusive; - let absoluteMaxRows: number | undefined = opts.numItems; - if (opts.endCursor) { - absoluteMaxRows = undefined; - if (opts.endCursor !== END_CURSOR) { - endIndexKey = jsonToConvex(JSON.parse(opts.endCursor)) as IndexKey; - endInclusive = true; - } - } - const { page, hasMore, indexKeys } = await getPage( - { db: this.parent.parent.parent.db }, - { - table: this.parent.parent.table, - startIndexKey, - startInclusive, - endIndexKey, - endInclusive, - targetMaxRows: opts.numItems, - absoluteMaxRows, - order: this.order, - index: this.parent.index, - schema, - indexFields: 
this.parent.q.indexFields, - }, - ); - let continueCursor = END_CURSOR; - let isDone = !hasMore; - if (opts.endCursor && opts.endCursor !== END_CURSOR) { - continueCursor = opts.endCursor; - isDone = false; - } else if (indexKeys.length > 0 && hasMore) { - continueCursor = JSON.stringify( - convexToJson(indexKeys[indexKeys.length - 1] as Value), - ); - } - return { - page, - isDone, - continueCursor, - }; - } - filter(_predicate: any): any { - throw new Error( - ".filter() not supported for `paginator`. Filter the returned `page` instead.", - ); - } - collect(): any { - throw new Error( - ".collect() not supported for `paginator`. Use .paginate() instead.", - ); - } - first(): any { - throw new Error( - ".first() not supported for `paginator`. Use .paginate() instead.", - ); - } - unique(): any { - throw new Error( - ".unique() not supported for `paginator`. Use .paginate() instead.", - ); - } - take(_n: number): any { - throw new Error( - ".take() not supported for `paginator`. Use .paginate() instead.", - ); - } - [Symbol.asyncIterator](): any { - throw new Error( - "[Symbol.asyncIterator]() not supported for `paginator`. Use .paginate() instead.", - ); - } -} +// +// Helper functions +// -class PaginatorIndexRange { - private hasSuffix = false; - public lowerBoundIndexKey: IndexKey | undefined = undefined; - public lowerBoundInclusive: boolean = true; - public upperBoundIndexKey: IndexKey | undefined = undefined; - public upperBoundInclusive: boolean = true; - constructor(public indexFields: string[]) {} - eq(field: string, value: Value) { - if (!this.canLowerBound(field) || !this.canUpperBound(field)) { - throw new Error(`Cannot use eq on field '${field}'`); - } - this.lowerBoundIndexKey = this.lowerBoundIndexKey ?? []; - this.lowerBoundIndexKey.push(value); - this.upperBoundIndexKey = this.upperBoundIndexKey ?? 
[]; - this.upperBoundIndexKey.push(value); - return this; - } - lt(field: string, value: Value) { - if (!this.canUpperBound(field)) { - throw new Error(`Cannot use lt on field '${field}'`); - } - this.upperBoundIndexKey = this.upperBoundIndexKey ?? []; - this.upperBoundIndexKey.push(value); - this.upperBoundInclusive = false; - this.hasSuffix = true; - return this; - } - lte(field: string, value: Value) { - if (!this.canUpperBound(field)) { - throw new Error(`Cannot use lte on field '${field}'`); - } - this.upperBoundIndexKey = this.upperBoundIndexKey ?? []; - this.upperBoundIndexKey.push(value); - this.hasSuffix = true; - return this; - } - gt(field: string, value: Value) { - if (!this.canLowerBound(field)) { - throw new Error(`Cannot use gt on field '${field}'`); - } - this.lowerBoundIndexKey = this.lowerBoundIndexKey ?? []; - this.lowerBoundIndexKey.push(value); - this.lowerBoundInclusive = false; - this.hasSuffix = true; - return this; - } - gte(field: string, value: Value) { - if (!this.canLowerBound(field)) { - throw new Error(`Cannot use gte on field '${field}'`); - } - this.lowerBoundIndexKey = this.lowerBoundIndexKey ?? []; - this.lowerBoundIndexKey.push(value); - this.hasSuffix = true; - return this; - } - private canLowerBound(field: string) { - const currentLowerBoundLength = this.lowerBoundIndexKey?.length ?? 0; - const currentUpperBoundLength = this.upperBoundIndexKey?.length ?? 0; - if (currentLowerBoundLength > currentUpperBoundLength) { - // Already have a lower bound. - return false; - } - if (currentLowerBoundLength === currentUpperBoundLength && this.hasSuffix) { - // Already have a lower bound and an upper bound. - return false; - } - return ( - currentLowerBoundLength < this.indexFields.length && - this.indexFields[currentLowerBoundLength] === field - ); - } - private canUpperBound(field: string) { - const currentLowerBoundLength = this.lowerBoundIndexKey?.length ?? 0; - const currentUpperBoundLength = this.upperBoundIndexKey?.length ?? 
0; - if (currentUpperBoundLength > currentLowerBoundLength) { - // Already have an upper bound. - return false; - } - if (currentLowerBoundLength === currentUpperBoundLength && this.hasSuffix) { - // Already have a lower bound and an upper bound. - return false; - } - return ( - currentUpperBoundLength < this.indexFields.length && - this.indexFields[currentUpperBoundLength] === field - ); - } -} +const DEFAULT_TARGET_MAX_ROWS = 100; diff --git a/packages/convex-helpers/server/stream.test.ts b/packages/convex-helpers/server/stream.test.ts new file mode 100644 index 00000000..d92afd56 --- /dev/null +++ b/packages/convex-helpers/server/stream.test.ts @@ -0,0 +1,259 @@ +import { defineTable, defineSchema, GenericDocument } from "convex/server"; +import { convexTest } from "convex-test"; +import { expect, test } from "vitest"; +import { + filterStream, + IndexKey, + mergeStreams, + queryStream, + reflect, + stream, +} from "./stream.js"; +import { modules } from "./setup.test.js"; +import { v } from "convex/values"; + +const schema = defineSchema({ + foo: defineTable({ + a: v.number(), + b: v.number(), + c: v.number(), + }).index("abc", ["a", "b", "c"]), +}); + +function stripSystemFields(doc: GenericDocument) { + const { _id, _creationTime, ...rest } = doc; + return rest; +} +function dropSystemFields(indexKey: IndexKey) { + return indexKey.slice(0, -2); +} +function dropAndStripSystemFields( + item: IteratorResult<[GenericDocument, IndexKey]>, +) { + return { + done: item.done, + value: item.value + ? 
[stripSystemFields(item.value[0]), dropSystemFields(item.value[1])] + : undefined, + }; +} + +const MANY_DOCS: { a: number; b: number; c: number }[] = []; +for (let a = 0; a < 3; a++) { + for (let b = 0; b < 3; b++) { + for (let c = 0; c < 3; c++) { + MANY_DOCS.push({ a, b, c }); + } + } +} + +describe("reflect", () => { + test("reflection", async () => { + const t = convexTest(schema, modules); + await t.run(async (ctx) => { + const query = reflect(ctx.db, schema) + .query("foo") + .withIndex("abc", (q) => q.eq("a", 1).gt("b", 2)) + .order("desc"); + const { table, index, bounds, indexFields, order } = query.reflect(); + expect(table).toBe("foo"); + expect(index).toBe("abc"); + expect(bounds.lowerBound).toEqual([1, 2]); + expect(bounds.lowerBoundInclusive).toBe(false); + expect(bounds.upperBound).toEqual([1]); + expect(bounds.upperBoundInclusive).toBe(true); + expect(indexFields).toEqual(["a", "b", "c", "_creationTime", "_id"]); + expect(order).toBe("desc"); + }); + }); + + test("reflection as query", async () => { + const t = convexTest(schema, modules); + await t.run(async (ctx) => { + await ctx.db.insert("foo", { a: 1, b: 2, c: 3 }); + await ctx.db.insert("foo", { a: 1, b: 3, c: 3 }); + await ctx.db.insert("foo", { a: 1, b: 4, c: 3 }); + const query = reflect(ctx.db, schema) + .query("foo") + .withIndex("abc", (q) => q.eq("a", 1).gt("b", 2)) + .order("desc"); + const result = await query.collect(); + expect(result.map(stripSystemFields)).toEqual([ + { a: 1, b: 4, c: 3 }, + { a: 1, b: 3, c: 3 }, + ]); + }); + }); +}); + +describe("stream", () => { + test("reflection as stream", async () => { + const t = convexTest(schema, modules); + await t.run(async (ctx) => { + await ctx.db.insert("foo", { a: 1, b: 2, c: 3 }); + await ctx.db.insert("foo", { a: 1, b: 3, c: 3 }); + await ctx.db.insert("foo", { a: 1, b: 4, c: 3 }); + const query = stream(ctx.db, schema) + .query("foo") + .withIndex("abc", (q) => q.eq("a", 1).gt("b", 2)) + .order("desc"); + 
expect(query.reflectOrder()).toBe("desc"); + const iter = query.iterWithKeys()[Symbol.asyncIterator](); + expect(dropAndStripSystemFields(await iter.next())).toEqual({ + done: false, + value: [{ a: 1, b: 4, c: 3 }, [1, 4, 3]], + }); + expect(dropAndStripSystemFields(await iter.next())).toEqual({ + done: false, + value: [{ a: 1, b: 3, c: 3 }, [1, 3, 3]], + }); + expect(dropAndStripSystemFields(await iter.next())).toEqual({ + done: true, + }); + }); + }); + + test("query round trip", async () => { + const t = convexTest(schema, modules); + await t.run(async (ctx) => { + await ctx.db.insert("foo", { a: 1, b: 2, c: 3 }); + await ctx.db.insert("foo", { a: 1, b: 3, c: 3 }); + await ctx.db.insert("foo", { a: 1, b: 4, c: 3 }); + const initialQuery = stream(ctx.db, schema) + .query("foo") + .withIndex("abc", (q) => q.eq("a", 1).gt("b", 2)) + .order("desc"); + const query = queryStream(initialQuery); + const result = await query.collect(); + expect(result.map(stripSystemFields)).toEqual([ + { a: 1, b: 4, c: 3 }, + { a: 1, b: 3, c: 3 }, + ]); + }); + }); + + test("query round trip with pagination", async () => { + const t = convexTest(schema, modules); + await t.run(async (ctx) => { + await ctx.db.insert("foo", { a: 1, b: 2, c: 3 }); + await ctx.db.insert("foo", { a: 1, b: 3, c: 3 }); + await ctx.db.insert("foo", { a: 1, b: 4, c: 3 }); + await ctx.db.insert("foo", { a: 1, b: 4, c: 4 }); + await ctx.db.insert("foo", { a: 1, b: 4, c: 5 }); + const initialQuery = stream(ctx.db, schema) + .query("foo") + .withIndex("abc", (q) => q.eq("a", 1).gt("b", 2)); + const query = queryStream(initialQuery); + const resultPage1 = await query.paginate({ numItems: 2, cursor: null }); + expect(resultPage1.page.map(stripSystemFields)).toEqual([ + { a: 1, b: 3, c: 3 }, + { a: 1, b: 4, c: 3 }, + ]); + expect(resultPage1.isDone).toBe(false); + const resultPage2 = await query.paginate({ + numItems: 2, + cursor: resultPage1.continueCursor, + }); + 
expect(resultPage2.page.map(stripSystemFields)).toEqual([ + { a: 1, b: 4, c: 4 }, + { a: 1, b: 4, c: 5 }, + ]); + expect(resultPage2.isDone).toBe(false); + const resultPage3 = await query.paginate({ + numItems: 2, + cursor: resultPage2.continueCursor, + }); + expect(resultPage3.page.map(stripSystemFields)).toEqual([]); + expect(resultPage3.isDone).toBe(true); + }); + }); + + test("merge streams", async () => { + const t = convexTest(schema, modules); + await t.run(async (ctx) => { + await ctx.db.insert("foo", { a: 1, b: 2, c: 3 }); + await ctx.db.insert("foo", { a: 1, b: 3, c: 3 }); // excluded + await ctx.db.insert("foo", { a: 1, b: 4, c: 3 }); + await ctx.db.insert("foo", { a: 1, b: 5, c: 4 }); + await ctx.db.insert("foo", { a: 1, b: 6, c: 5 }); + const query1 = stream(ctx.db, schema) + .query("foo") + .withIndex("abc", (q) => q.eq("a", 1).gt("b", 4)); + const query2 = stream(ctx.db, schema) + .query("foo") + .withIndex("abc", (q) => q.eq("a", 1).lt("b", 3)); + const query3 = stream(ctx.db, schema) + .query("foo") + .withIndex("abc", (q) => q.eq("a", 1).eq("b", 4).eq("c", 3)); + const fullQuery = mergeStreams(query1, query2, query3); + const result = await queryStream(fullQuery).collect(); + expect(result.map(stripSystemFields)).toEqual([ + { a: 1, b: 2, c: 3 }, + { a: 1, b: 4, c: 3 }, + { a: 1, b: 5, c: 4 }, + { a: 1, b: 6, c: 5 }, + ]); + const page1 = await queryStream(fullQuery).paginate({ + numItems: 2, + cursor: null, + }); + expect(page1.page.map(stripSystemFields)).toEqual([ + { a: 1, b: 2, c: 3 }, + { a: 1, b: 4, c: 3 }, + ]); + expect(page1.isDone).toBe(false); + const page2 = await queryStream(fullQuery).paginate({ + numItems: 3, + cursor: page1.continueCursor, + }); + expect(page2.page.map(stripSystemFields)).toEqual([ + { a: 1, b: 5, c: 4 }, + { a: 1, b: 6, c: 5 }, + ]); + expect(page2.isDone).toBe(true); + }); + }); + + test("filter stream", async () => { + const t = convexTest(schema, modules); + await t.run(async (ctx) => { + await 
ctx.db.insert("foo", { a: 1, b: 2, c: 3 }); // excluded by index + await ctx.db.insert("foo", { a: 1, b: 3, c: 3 }); // excluded by filter + await ctx.db.insert("foo", { a: 1, b: 4, c: 4 }); + await ctx.db.insert("foo", { a: 1, b: 4, c: 5 }); // excluded by filter + await ctx.db.insert("foo", { a: 1, b: 5, c: 4 }); + await ctx.db.insert("foo", { a: 1, b: 6, c: 4 }); + const query = stream(ctx.db, schema) + .query("foo") + .withIndex("abc", (q) => q.eq("a", 1).gt("b", 2)); + const filteredQuery = filterStream(query, async (doc) => doc.c === 4); + const result = await queryStream(filteredQuery).collect(); + expect(result.map(stripSystemFields)).toEqual([ + { a: 1, b: 4, c: 4 }, + { a: 1, b: 5, c: 4 }, + { a: 1, b: 6, c: 4 }, + ]); + const page1 = await queryStream(filteredQuery).paginate({ + numItems: 2, + cursor: null, + }); + expect(page1.page.map(stripSystemFields)).toEqual([ + { a: 1, b: 4, c: 4 }, + { a: 1, b: 5, c: 4 }, + ]); + expect(page1.isDone).toBe(false); + + const limitedPage1 = await queryStream(filteredQuery).paginate({ + numItems: 2, + cursor: null, + maximumRowsRead: 2, + }); + expect(limitedPage1.page.map(stripSystemFields)).toEqual([ + { a: 1, b: 4, c: 4 }, + ]); + expect(limitedPage1.pageStatus).toBe("SplitRequired"); + expect(dropSystemFields(JSON.parse(limitedPage1.splitCursor!))).toEqual([1, 3, 3]); + expect(dropSystemFields(JSON.parse(limitedPage1.continueCursor))).toEqual([1, 4, 4]); + }); + }); +}); diff --git a/packages/convex-helpers/server/stream.ts b/packages/convex-helpers/server/stream.ts new file mode 100644 index 00000000..b02f2e2a --- /dev/null +++ b/packages/convex-helpers/server/stream.ts @@ -0,0 +1,1117 @@ +import { Value, convexToJson, jsonToConvex } from "convex/values"; +import { + DataModelFromSchemaDefinition, + DocumentByInfo, + DocumentByName, + GenericDataModel, + GenericDatabaseReader, + IndexNames, + IndexRange, + IndexRangeBuilder, + NamedIndex, + NamedTableInfo, + OrderedQuery, + PaginationOptions, + PaginationResult, 
+ Query, + QueryInitializer, + SchemaDefinition, + TableNamesInDataModel, +} from "convex/server"; +import { compareValues } from "./compare.js"; + +export type IndexKey = Value[]; + +// +// Helper functions +// + +function exclType(boundType: "gt" | "lt" | "gte" | "lte") { + if (boundType === "gt" || boundType === "gte") { + return "gt"; + } + return "lt"; +} + +type Bound = ["gt" | "lt" | "gte" | "lte" | "eq", string, Value]; + +/** Split a range query between two index keys into a series of range queries + * that should be executed in sequence. This is necessary because Convex only + * supports range queries of the form + * q.eq("f1", v).eq("f2", v).lt("f3", v).gt("f3", v). + * i.e. all fields must be equal except for the last field, which can have + * two inequalities. + * + * For example, the range from >[1, 2, 3] to <=[1, 3, 2] would be split into + * the following queries: + * 1. q.eq("f1", 1).eq("f2", 2).gt("f3", 3) + * 2. q.eq("f1", 1).gt("f2", 2).lt("f2", 3) + * 3. q.eq("f1", 1).eq("f2", 3).lte("f3", 2) + */ +function splitRange( + indexFields: string[], + startBound: IndexKey, + endBound: IndexKey, + startBoundType: "gt" | "lt" | "gte" | "lte", + endBoundType: "gt" | "lt" | "gte" | "lte", +): Bound[][] { + // Three parts to the split: + // 1. reduce down from startBound to common prefix + // 2. range with common prefix + // 3. build back up from common prefix to endBound + const commonPrefix: Bound[] = []; + while ( + startBound.length > 0 && + endBound.length > 0 && + compareValues(startBound[0]!, endBound[0]!) 
=== 0 + ) { + const indexField = indexFields[0]!; + indexFields = indexFields.slice(1); + const eqBound = startBound[0]!; + startBound = startBound.slice(1); + endBound = endBound.slice(1); + commonPrefix.push(["eq", indexField, eqBound]); + } + const makeCompare = ( + boundType: "gt" | "lt" | "gte" | "lte", + key: IndexKey, + ) => { + const range = commonPrefix.slice(); + let i = 0; + for (; i < key.length - 1; i++) { + range.push(["eq", indexFields[i]!, key[i]!]); + } + if (i < key.length) { + range.push([boundType, indexFields[i]!, key[i]!]); + } + return range; + }; + // Stage 1. + const startRanges: Bound[][] = []; + while (startBound.length > 1) { + startRanges.push(makeCompare(startBoundType, startBound)); + startBoundType = exclType(startBoundType); + startBound = startBound.slice(0, -1); + } + // Stage 3. + const endRanges: Bound[][] = []; + while (endBound.length > 1) { + endRanges.push(makeCompare(endBoundType, endBound)); + endBoundType = exclType(endBoundType); + endBound = endBound.slice(0, -1); + } + endRanges.reverse(); + // Stage 2. + let middleRange; + if (endBound.length === 0) { + middleRange = makeCompare(startBoundType, startBound); + } else if (startBound.length === 0) { + middleRange = makeCompare(endBoundType, endBound); + } else { + const startValue = startBound[0]!; + const endValue = endBound[0]!; + middleRange = commonPrefix.slice(); + middleRange.push([startBoundType, indexFields[0]!, startValue]); + middleRange.push([endBoundType, indexFields[0]!, endValue]); + } + return [...startRanges, middleRange, ...endRanges]; +} + +function rangeToQuery(range: Bound[]) { + return (q: any) => { + for (const [boundType, field, value] of range) { + q = q[boundType](field, value); + } + return q; + }; +} + +export function getIndexFields< + Schema extends SchemaDefinition, + T extends TableNamesInDataModel>, +>( + table: T, + index?: IndexNames, T>>, + schema?: Schema, +): string[] { + const indexDescriptor = String(index ?? 
"by_creation_time"); + if (indexDescriptor === "by_creation_time") { + return ["_creationTime", "_id"]; + } + if (indexDescriptor === "by_id") { + return ["_id"]; + } + if (!schema) { + throw new Error("schema is required to infer index fields"); + } + const tableInfo = schema.tables[table]; + const indexInfo = tableInfo.indexes.find( + (index: any) => index.indexDescriptor === indexDescriptor, + ); + if (!indexInfo) { + throw new Error(`Index ${indexDescriptor} not found in table ${table}`); + } + const fields = indexInfo.fields.slice(); + fields.push("_creationTime"); + fields.push("_id"); + return fields; +} + +function getIndexKey< + DataModel extends GenericDataModel, + T extends TableNamesInDataModel, +>(doc: DocumentByName, indexFields: string[]): IndexKey { + const key: IndexKey = []; + for (const field of indexFields) { + let obj: any = doc; + for (const subfield of field.split(".")) { + obj = obj[subfield]; + } + key.push(obj); + } + return key; +} + +export function reflect>( + db: GenericDatabaseReader>, + schema: Schema, +): ReflectDatabaseReader { + return new ReflectDatabaseReader(db, schema); +} + +/** + * A "stream" is an async iterable of query results, ordered by an index on a table. + * + * Use it as you would use `ctx.db`. + * If using pagination in a reactive query, see the warnings on the `paginator` + * function. TL;DR: you need to pass in `endCursor` to prevent holes or overlaps + * between pages. + * + * Once you have a stream, you can use `mergeStreams` or `filterStream` to make + * more streams. Then use `queryStream` to convert it into an OrderedQuery, + * so you can call `.paginate()`, `.collect()`, etc. + */ +export function stream>( + db: GenericDatabaseReader>, + schema: Schema, +): ReflectDatabaseReader { + return reflect(db, schema); +} + +/** + * A "stream" is an async iterable of query results, ordered by an index on a table. 
+ */ +export interface IndexStream< + DataModel extends GenericDataModel, + T extends TableNamesInDataModel, +> { + iterWithKeys(): AsyncIterable<[DocumentByName | null, IndexKey]>; + reflectOrder(): "asc" | "desc"; + narrow(indexBounds: IndexBounds): IndexStream; +} + +export class ReflectDatabaseReader< + Schema extends SchemaDefinition, +> implements GenericDatabaseReader> +{ + // TODO: support system tables + public system: any = null; + + constructor( + public db: GenericDatabaseReader>, + public schema: Schema, + ) {} + + query>>( + tableName: TableName, + ): ReflectQueryInitializer { + return new ReflectQueryInitializer(this, tableName); + } + get(_id: any): any { + throw new Error("get() not supported for `paginator`"); + } + normalizeId(_tableName: any, _id: any): any { + throw new Error("normalizeId() not supported for `paginator`."); + } +} + +type DM> = + DataModelFromSchemaDefinition; + +export type IndexBounds = { + lowerBound: IndexKey; + lowerBoundInclusive: boolean; + upperBound: IndexKey; + upperBoundInclusive: boolean; +}; + +export type QueryReflection< + Schema extends SchemaDefinition, + T extends TableNamesInDataModel>, + IndexName extends IndexNames, T>>, +> = { + db: GenericDatabaseReader>; + schema: Schema; + table: T; + index: IndexName; + indexFields: string[]; + order: "asc" | "desc"; + bounds: IndexBounds; + indexRange?: ( + q: IndexRangeBuilder< + DocumentByInfo, T>>, + NamedIndex, T>, IndexName> + >, + ) => IndexRange; +}; + +export interface ReflectableQuery< + Schema extends SchemaDefinition, + T extends TableNamesInDataModel>, + IndexName extends IndexNames, T>>, +> extends IndexStream, T> { + reflect(): QueryReflection; +} + +export class ReflectQueryInitializer< + Schema extends SchemaDefinition, + T extends TableNamesInDataModel>, + > + implements + QueryInitializer, T>>, + ReflectableQuery +{ + constructor( + public parent: ReflectDatabaseReader, + public table: T, + ) {} + fullTableScan(): ReflectQuery { + return 
this.withIndex("by_creation_time"); + } + withIndex, T>>>( + indexName: IndexName, + indexRange?: ( + q: IndexRangeBuilder< + DocumentByInfo, T>>, + NamedIndex, T>, IndexName> + >, + ) => IndexRange, + ): ReflectQuery { + const indexFields = getIndexFields( + this.table, + indexName, + this.parent.schema, + ); + const q = new ReflectIndexRange(indexFields); + if (indexRange) { + indexRange(q as any); + } + return new ReflectQuery(this, indexName, q, indexRange); + } + withSearchIndex(_indexName: any, _searchFilter: any): any { + throw new Error("Cannot paginate withSearchIndex"); + } + inner() { + return this.fullTableScan(); + } + order( + order: "asc" | "desc", + ): OrderedReflectQuery { + return this.inner().order(order); + } + paginate(opts: PaginationOptions & { endCursor?: string | null }) { + return this.inner().paginate(opts); + } + filter(_predicate: any): any { + throw new Error( + ".filter() not supported for `paginator`. Filter the returned `page` instead.", + ); + } + collect() { + return this.inner().collect(); + } + first() { + return this.inner().first(); + } + unique() { + return this.inner().unique(); + } + take(n: number) { + return this.inner().take(n); + } + [Symbol.asyncIterator]() { + return this.inner()[Symbol.asyncIterator](); + } + reflect() { + return this.inner().reflect(); + } + iterWithKeys() { + return this.inner().iterWithKeys(); + } + reflectOrder(): "asc" | "desc" { + return this.inner().reflectOrder(); + } + narrow(indexBounds: IndexBounds) { + return this.inner().narrow(indexBounds); + } +} + +export class ReflectQuery< + Schema extends SchemaDefinition, + T extends TableNamesInDataModel>, + IndexName extends IndexNames, T>>, + > + implements + Query, T>>, + ReflectableQuery +{ + constructor( + public parent: ReflectQueryInitializer, + public index: IndexName, + public q: ReflectIndexRange, + public indexRange: + | (( + q: IndexRangeBuilder< + DocumentByInfo, T>>, + NamedIndex, T>, IndexName> + >, + ) => IndexRange) + | 
undefined, + ) {} + order(order: "asc" | "desc") { + return new OrderedReflectQuery(this, order); + } + inner() { + return this.order("asc"); + } + paginate(opts: PaginationOptions & { endCursor?: string | null }) { + return this.inner().paginate(opts); + } + filter(_predicate: any): this { + throw new Error( + ".filter() not supported for `paginator`. Filter the returned `page` instead.", + ); + } + collect() { + return this.inner().collect(); + } + first() { + return this.inner().first(); + } + unique() { + return this.inner().unique(); + } + take(n: number) { + return this.inner().take(n); + } + [Symbol.asyncIterator]() { + return this.inner()[Symbol.asyncIterator](); + } + reflect() { + return this.inner().reflect(); + } + iterWithKeys() { + return this.inner().iterWithKeys(); + } + reflectOrder() { + return this.inner().reflectOrder(); + } + narrow(indexBounds: IndexBounds) { + return this.inner().narrow(indexBounds); + } +} + +export class OrderedReflectQuery< + Schema extends SchemaDefinition, + T extends TableNamesInDataModel>, + IndexName extends IndexNames, T>>, + > + implements + OrderedQuery, T>>, + ReflectableQuery +{ + constructor( + public parent: ReflectQuery, + public order: "asc" | "desc", + ) {} + reflect() { + return { + db: this.parent.parent.parent.db, + schema: this.parent.parent.parent.schema, + table: this.parent.parent.table, + index: this.parent.index, + indexFields: this.parent.q.indexFields, + order: this.order, + bounds: { + lowerBound: this.parent.q.lowerBoundIndexKey ?? [], + lowerBoundInclusive: this.parent.q.lowerBoundInclusive, + upperBound: this.parent.q.upperBoundIndexKey ?? [], + upperBoundInclusive: this.parent.q.upperBoundInclusive, + }, + indexRange: this.parent.indexRange, + }; + } + /** + * inner() is as if you had used ctx.db to construct the query. 
+ */ + inner(): OrderedQuery, T>> { + const { db, table, index, order, indexRange } = this.reflect(); + return db.query(table).withIndex(index, indexRange).order(order); + } + async paginate(opts: PaginationOptions & { endCursor?: string | null }) { + // Note `db.query().paginate()` has additional semantics: it reads from the + // query journal and can only be called once per query. + // Meanwhile `queryStream(stream).paginate()` doesn't have those semantics. + // It would be weird to change semantics so subtly, so we wrap the query + // in a queryStream before paginating. + return queryStream(this).paginate(opts); + } + filter(_predicate: any): any { + throw new Error( + ".filter() not supported for ReflectQuery. Use `filter` or `filterStream` instead.", + ); + } + collect() { + return this.inner().collect(); + } + first() { + return this.inner().first(); + } + unique() { + return this.inner().unique(); + } + take(n: number) { + return this.inner().take(n); + } + [Symbol.asyncIterator]() { + return this.inner()[Symbol.asyncIterator](); + } + iterWithKeys(): AsyncIterable<[DocumentByName, T>, IndexKey]> { + const { indexFields } = this.reflect(); + const iterable = this.inner(); + return { + [Symbol.asyncIterator]() { + const iterator = iterable[Symbol.asyncIterator](); + return { + async next() { + const result = await iterator.next(); + if (result.done) { + return { done: true, value: undefined }; + } + return { + done: false, + value: [result.value, getIndexKey(result.value, indexFields)], + }; + }, + }; + }, + }; + } + reflectOrder() { + return this.order; + } + narrow(indexBounds: IndexBounds): IndexStream, T> { + const { db, table, index, order, bounds, schema } = this.reflect(); + let maxLowerBound = bounds.lowerBound; + let maxLowerBoundInclusive = bounds.lowerBoundInclusive; + if ( + compareKeys( + { + value: indexBounds.lowerBound, + kind: indexBounds.lowerBoundInclusive ? 
"predecessor" : "successor", + }, + { + value: bounds.lowerBound, + kind: bounds.lowerBoundInclusive ? "predecessor" : "successor", + }, + ) > 0 + ) { + maxLowerBound = indexBounds.lowerBound; + maxLowerBoundInclusive = indexBounds.lowerBoundInclusive; + } + let minUpperBound = bounds.upperBound; + let minUpperBoundInclusive = bounds.upperBoundInclusive; + if ( + compareKeys( + { + value: indexBounds.upperBound, + kind: indexBounds.upperBoundInclusive ? "successor" : "predecessor", + }, + { + value: bounds.upperBound, + kind: bounds.upperBoundInclusive ? "successor" : "predecessor", + }, + ) < 0 + ) { + minUpperBound = indexBounds.upperBound; + minUpperBoundInclusive = indexBounds.upperBoundInclusive; + } + return streamIndexRange( + db, + schema, + table, + index, + { + lowerBound: maxLowerBound, + lowerBoundInclusive: maxLowerBoundInclusive, + upperBound: minUpperBound, + upperBoundInclusive: minUpperBoundInclusive, + }, + order, + ); + } +} + +export function streamIndexRange< + Schema extends SchemaDefinition, + T extends TableNamesInDataModel>, + IndexName extends IndexNames, T>>, +>( + db: GenericDatabaseReader>, + schema: Schema, + table: T, + index: IndexName, + bounds: IndexBounds, + order: "asc" | "desc", +): IndexStream, T> { + const indexFields = getIndexFields(table, index, schema); + const splitBounds = splitRange( + indexFields, + bounds.lowerBound, + bounds.upperBound, + bounds.lowerBoundInclusive ? "gte" : "gt", + bounds.upperBoundInclusive ? 
"lte" : "lt", + ); + const subQueries: OrderedReflectQuery[] = []; + for (const splitBound of splitBounds) { + subQueries.push( + reflect(db, schema) + .query(table) + .withIndex(index, rangeToQuery(splitBound)) + .order(order), + ); + } + return concatStreams(...subQueries); +} + +class ReflectIndexRange { + private hasSuffix = false; + public lowerBoundIndexKey: IndexKey | undefined = undefined; + public lowerBoundInclusive: boolean = true; + public upperBoundIndexKey: IndexKey | undefined = undefined; + public upperBoundInclusive: boolean = true; + constructor(public indexFields: string[]) {} + eq(field: string, value: Value) { + if (!this.canLowerBound(field) || !this.canUpperBound(field)) { + throw new Error(`Cannot use eq on field '${field}'`); + } + this.lowerBoundIndexKey = this.lowerBoundIndexKey ?? []; + this.lowerBoundIndexKey.push(value); + this.upperBoundIndexKey = this.upperBoundIndexKey ?? []; + this.upperBoundIndexKey.push(value); + return this; + } + lt(field: string, value: Value) { + if (!this.canUpperBound(field)) { + throw new Error(`Cannot use lt on field '${field}'`); + } + this.upperBoundIndexKey = this.upperBoundIndexKey ?? []; + this.upperBoundIndexKey.push(value); + this.upperBoundInclusive = false; + this.hasSuffix = true; + return this; + } + lte(field: string, value: Value) { + if (!this.canUpperBound(field)) { + throw new Error(`Cannot use lte on field '${field}'`); + } + this.upperBoundIndexKey = this.upperBoundIndexKey ?? []; + this.upperBoundIndexKey.push(value); + this.hasSuffix = true; + return this; + } + gt(field: string, value: Value) { + if (!this.canLowerBound(field)) { + throw new Error(`Cannot use gt on field '${field}'`); + } + this.lowerBoundIndexKey = this.lowerBoundIndexKey ?? 
[]; + this.lowerBoundIndexKey.push(value); + this.lowerBoundInclusive = false; + this.hasSuffix = true; + return this; + } + gte(field: string, value: Value) { + if (!this.canLowerBound(field)) { + throw new Error(`Cannot use gte on field '${field}'`); + } + this.lowerBoundIndexKey = this.lowerBoundIndexKey ?? []; + this.lowerBoundIndexKey.push(value); + this.hasSuffix = true; + return this; + } + private canLowerBound(field: string) { + const currentLowerBoundLength = this.lowerBoundIndexKey?.length ?? 0; + const currentUpperBoundLength = this.upperBoundIndexKey?.length ?? 0; + if (currentLowerBoundLength > currentUpperBoundLength) { + // Already have a lower bound. + return false; + } + if (currentLowerBoundLength === currentUpperBoundLength && this.hasSuffix) { + // Already have a lower bound and an upper bound. + return false; + } + return ( + currentLowerBoundLength < this.indexFields.length && + this.indexFields[currentLowerBoundLength] === field + ); + } + private canUpperBound(field: string) { + const currentLowerBoundLength = this.lowerBoundIndexKey?.length ?? 0; + const currentUpperBoundLength = this.upperBoundIndexKey?.length ?? 0; + if (currentUpperBoundLength > currentLowerBoundLength) { + // Already have an upper bound. + return false; + } + if (currentLowerBoundLength === currentUpperBoundLength && this.hasSuffix) { + // Already have a lower bound and an upper bound. + return false; + } + return ( + currentUpperBoundLength < this.indexFields.length && + this.indexFields[currentUpperBoundLength] === field + ); + } +} + +/** + * Merge multiple streams, provided in any order, into a single stream. + * + * The streams will be merged into a stream of documents ordered by the index keys. + * + * e.g. 
```ts + * mergeStreams( + * stream(db, schema).query("messages").withIndex("by_author", q => q.eq("author", "user3")), + * stream(db, schema).query("messages").withIndex("by_author", q => q.eq("author", "user1")), + * stream(db, schema).query("messages").withIndex("by_author", q => q.eq("author", "user2")), + * ) + * ``` + * + * returns a stream of messages for user1, then user2, then user3. + */ +export function mergeStreams< + DataModel extends GenericDataModel, + T extends TableNamesInDataModel, +>(...streams: IndexStream[]): IndexStream { + if (streams.length === 0) { + throw new Error("Cannot union empty array of streams"); + } + let order = streams[0]!.reflectOrder(); + for (const stream of streams) { + if (stream.reflectOrder() !== order) { + throw new Error("Cannot union streams with different orders"); + } + } + return { + iterWithKeys: () => { + const iterables = streams.map((stream) => stream.iterWithKeys()); + return { + [Symbol.asyncIterator]() { + const iterators = iterables.map((iterable) => + iterable[Symbol.asyncIterator](), + ); + const results = Array.from( + { length: iterators.length }, + (): IteratorResult< + [DocumentByName | null, IndexKey] | undefined + > => ({ done: false, value: undefined }), + ); + return { + async next() { + // Fill results from iterators with no value yet. + await Promise.all( + iterators.map(async (iterator, i) => { + if (!results[i]!.done && !results[i]!.value) { + const result = await iterator.next(); + results[i] = result; + } + }), + ); + // Find index for the value with the lowest index key. 
+ let minIndexKeyAndIndex: [IndexKey, number] | undefined = + undefined; + for (let i = 0; i < results.length; i++) { + const result = results[i]!; + if (result.done || !result.value) { + continue; + } + const [_, resultIndexKey] = result.value; + if (minIndexKeyAndIndex === undefined) { + minIndexKeyAndIndex = [resultIndexKey, i]; + continue; + } + const [prevMin, _prevMinIndex] = minIndexKeyAndIndex; + if ( + compareKeys( + { value: resultIndexKey, kind: "exact" }, + { value: prevMin, kind: "exact" }, + ) < 0 + ) { + minIndexKeyAndIndex = [resultIndexKey, i]; + } + } + if (minIndexKeyAndIndex === undefined) { + return { done: true, value: undefined }; + } + const [_, minIndex] = minIndexKeyAndIndex; + const result = results[minIndex]!.value; + // indicate that we've used this result + results[minIndex]!.value = undefined; + return { done: false, value: result }; + }, + }; + }, + }; + }, + reflectOrder: () => order, + narrow: (indexBounds: IndexBounds) => { + return mergeStreams( + ...streams.map((stream) => stream.narrow(indexBounds)), + ); + }, + }; +} + +/** + * Concatenate multiple streams into a single stream. + * This assumes that the streams correspond to disjoint index ranges, + * and are provided in the same order as the index ranges. + * + * e.g. ```ts + * concatStreams( + * stream(db, schema).query("messages").withIndex("by_author", q => q.eq("author", "user1")), + * stream(db, schema).query("messages").withIndex("by_author", q => q.eq("author", "user2")), + * ) + * ``` + * + * is valid, but if the stream arguments were reversed, or the queries were + * `.order("desc")`, it would be invalid. + * + * It's not recommended to use `concatStreams` directly, since it has the same + * behavior as `mergeStreams`, but with fewer runtime checks. 
+ */ +export function concatStreams< + DataModel extends GenericDataModel, + T extends TableNamesInDataModel, +>(...streams: IndexStream[]): IndexStream { + if (streams.length === 0) { + throw new Error("Cannot concat empty array of streams"); + } + let order = streams[0]!.reflectOrder(); + for (const stream of streams) { + if (stream.reflectOrder() !== order) { + throw new Error("Cannot concat streams with different orders"); + } + } + return { + iterWithKeys: () => { + const iterables = streams.map((stream) => stream.iterWithKeys()); + return { + [Symbol.asyncIterator]() { + const iterators = iterables.map((iterable) => + iterable[Symbol.asyncIterator](), + ); + return { + async next() { + while (iterators.length > 0) { + const result = await iterators[0]!.next(); + if (result.done) { + iterators.shift(); + } else { + return result; + } + } + return { done: true, value: undefined }; + }, + }; + }, + }; + }, + reflectOrder: () => order, + narrow: (indexBounds: IndexBounds) => { + return concatStreams( + ...streams.map((stream) => stream.narrow(indexBounds)), + ); + }, + }; +} + +/** + * Apply a filter to a stream. + * + * Watch out for sparse filters, as they may read unbounded amounts of data. 
+ */ +export function filterStream< + DataModel extends GenericDataModel, + T extends TableNamesInDataModel, +>( + stream: IndexStream, + predicate: ( + doc: DocumentByInfo>, + ) => Promise, +): IndexStream { + return { + iterWithKeys: () => { + const iterable = stream.iterWithKeys(); + return { + [Symbol.asyncIterator]() { + const iterator = iterable[Symbol.asyncIterator](); + return { + async next() { + const result = await iterator.next(); + if (result.done) { + return result; + } + if (result.value[0] === null || (await predicate(result.value[0]))) { + return result; + } + return { done: false, value: [null, result.value[1]] }; + }, + }; + }, + }; + }, + reflectOrder: () => stream.reflectOrder(), + narrow: (indexBounds: IndexBounds) => + filterStream(stream.narrow(indexBounds), predicate), + }; +} + +/** + * A wrapper around an IndexStream that provides a query interface. + */ +export class QueryStream< + DataModel extends GenericDataModel, + T extends TableNamesInDataModel, +> implements OrderedQuery> +{ + constructor(public stream: IndexStream) {} + filter(_predicate: any): never { + throw new Error("Cannot filter query stream. use filterStream instead."); + } + async paginate( + opts: PaginationOptions & { endCursor?: string | null, maximumRowsRead?: number }, + ): Promise>> { + const order = this.stream.reflectOrder(); + let newStartKey = { + key: [] as IndexKey, + inclusive: true, + }; + if (opts.cursor !== null) { + newStartKey = { + key: jsonToConvex(JSON.parse(opts.cursor)) as IndexKey, + inclusive: false, + }; + } + let newEndKey = { + key: [] as IndexKey, + inclusive: true, + }; + const maxRowsToRead = opts.maximumRowsRead; + const softMaxRowsToRead = maxRowsToRead ? 
(3 * maxRowsToRead / 4) : 1000; + let maxRows: number | undefined = opts.numItems; + if (opts.endCursor) { + newEndKey = { + key: jsonToConvex(JSON.parse(opts.endCursor)) as IndexKey, + inclusive: true, + }; + // If there's an endCursor, continue until we get there even if it's more + // than numItems. + maxRows = undefined; + } + const newLowerBound = order === "asc" ? newStartKey : newEndKey; + const newUpperBound = order === "asc" ? newEndKey : newStartKey; + const narrowStream = this.stream.narrow({ + lowerBound: newLowerBound.key, + lowerBoundInclusive: newLowerBound.inclusive, + upperBound: newUpperBound.key, + upperBoundInclusive: newUpperBound.inclusive, + }); + const page: DocumentByInfo>[] = []; + const indexKeys: IndexKey[] = []; + let hasMore = opts.endCursor && opts.endCursor !== "[]"; + let continueCursor = opts.endCursor ?? "[]"; + for await (const [doc, indexKey] of narrowStream.iterWithKeys()) { + if (doc !== null) { + page.push(doc); + } + indexKeys.push(indexKey); + if ((maxRows !== undefined && page.length >= maxRows) || (maxRowsToRead !== undefined && indexKeys.length >= maxRowsToRead)) { + hasMore = true; + continueCursor = JSON.stringify(convexToJson(indexKey as Value)); + break; + } + } + let pageStatus: "SplitRecommended" | "SplitRequired" | undefined = undefined; + let splitCursor: IndexKey | undefined = undefined; + if (indexKeys.length === maxRowsToRead) { + pageStatus = "SplitRequired"; + splitCursor = indexKeys[Math.floor((indexKeys.length - 1) / 2)]; + } else if (indexKeys.length >= softMaxRowsToRead) { + pageStatus = "SplitRecommended"; + splitCursor = indexKeys[Math.floor((indexKeys.length - 1) / 2)]; + } + return { + page, + isDone: !hasMore, + continueCursor, + pageStatus, + splitCursor: splitCursor ? 
JSON.stringify(convexToJson(splitCursor as Value)) : undefined, + }; + } + async collect() { + return await this.take(Infinity); + } + async take(n: number) { + const results: DocumentByInfo>[] = []; + for await (const [doc, _] of this.stream.iterWithKeys()) { + if (doc === null) { + continue; + } + results.push(doc); + if (results.length === n) { + break; + } + } + return results; + } + async unique() { + const docs = await this.take(2); + if (docs.length === 2) { + throw new Error("Query is not unique"); + } + return docs[0] ?? null; + } + async first() { + const docs = await this.take(1); + return docs[0] ?? null; + } + [Symbol.asyncIterator]() { + const iterator = this.stream.iterWithKeys()[Symbol.asyncIterator](); + return { + async next() { + const result = await iterator.next(); + if (result.done) { + return { done: true as const, value: undefined }; + } + return { done: false, value: result.value[0]! }; + }, + }; + } +} + +export function queryStream< + DataModel extends GenericDataModel, + T extends TableNamesInDataModel, +>(stream: IndexStream): QueryStream { + return new QueryStream(stream); +} + +type Key = { + value: IndexKey; + kind: "successor" | "predecessor" | "exact"; +}; + +function getValueAtIndex( + v: Value[], + index: number, +): { kind: "found"; value: Value } | undefined { + if (index >= v.length) { + return undefined; + } + return { kind: "found", value: v[index]! 
}; +} + +function compareDanglingSuffix( + shorterKeyKind: "exact" | "successor" | "predecessor", + longerKeyKind: "exact" | "successor" | "predecessor", + shorterKey: Key, + longerKey: Key, +): number { + if (shorterKeyKind === "exact" && longerKeyKind === "exact") { + throw new Error( + `Exact keys are not the same length: ${JSON.stringify( + shorterKey.value, + )}, ${JSON.stringify(longerKey.value)}`, + ); + } + if (shorterKeyKind === "exact") { + throw new Error( + `Exact key is shorter than prefix: ${JSON.stringify( + shorterKey.value, + )}, ${JSON.stringify(longerKey.value)}`, + ); + } + if (shorterKeyKind === "predecessor" && longerKeyKind === "successor") { + // successor is longer than predecessor, so it is bigger + return -1; + } + if (shorterKeyKind === "successor" && longerKeyKind === "predecessor") { + // successor is shorter than predecessor, so it is larger + return 1; + } + if (shorterKeyKind === "predecessor" && longerKeyKind === "predecessor") { + // predecessor of [2, 3] contains [2, 1] while predecessor of [2] doesn't, so longer predecessors are larger + return -1; + } + if (shorterKeyKind === "successor" && longerKeyKind === "successor") { + // successor of [2, 3] contains [2, 4] while successor of [2] doesn't, so longer successors are smaller + return 1; + } + if (shorterKeyKind === "predecessor" && longerKeyKind === "exact") { + return -1; + } + if (shorterKeyKind === "successor" && longerKeyKind === "exact") { + return 1; + } + throw new Error(`Unexpected key kinds: ${shorterKeyKind}, ${longerKeyKind}`); +} + +function compareKeys(key1: Key, key2: Key): number { + let i = 0; + while (i < Math.max(key1.value.length, key2.value.length)) { + const v1 = getValueAtIndex(key1.value as any, i); + const v2 = getValueAtIndex(key2.value as any, i); + if (v1 === undefined) { + return compareDanglingSuffix(key1.kind, key2.kind, key1, key2); + } + if (v2 === undefined) { + return -1 * compareDanglingSuffix(key2.kind, key1.kind, key2, key1); + } + const 
result = compareValues(v1.value, v2.value); + if (result !== 0) { + return result; + } + // if the prefixes are the same so far, keep going with the comparison + i++; + } + + if (key1.kind === key2.kind) { + return 0; + } + + // keys are the same length and values + if (key1.kind === "exact") { + if (key2.kind === "successor") { + return -1; + } else { + return 1; + } + } + if (key1.kind === "predecessor") { + return -1; + } + if (key1.kind === "successor") { + return 1; + } + throw new Error(`Unexpected key kind: ${key1.kind as any}`); +}