Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Return content range data immediately #60

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"@ipld/dag-pb": "^2.1.18",
"@multiformats/blake2": "^1.0.11",
"browser-readablestream-to-it": "^2.0.4",
"content-range": "^2.0.2",
"hashring": "^3.2.0",
"idb": "^7.1.1",
"ipfs-unixfs-exporter": "https://gitpkg.now.sh/filecoin-saturn/js-ipfs-unixfs/packages/ipfs-unixfs-exporter?build",
Expand Down
65 changes: 52 additions & 13 deletions src/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import { CID } from 'multiformats'
import pLimit from 'p-limit'

import { extractVerifiedContent } from './utils/car.js'
import { extractVerifiedContent, extractVerifiedEntity, normalizeContentRange } from './utils/car.js'
import { asAsyncIterable, asyncIteratorToBuffer } from './utils/itr.js'
import { randomUUID } from './utils/uuid.js'
import { memoryStorage } from './storage/index.js'
Expand All @@ -12,11 +12,13 @@ import { parseUrl, addHttpPrefix } from './utils/url.js'
import { isBrowserContext } from './utils/runtime.js'
import HashRing from 'hashring'
import { isErrorUnavoidable } from './utils/errors.js'
import { parse as parseRange } from "content-range"

const MAX_NODE_WEIGHT = 100
/**
* @typedef {import('./types.js').Node} Node
* @typedef {import('./types.js').FetchOptions} FetchOptions
* @typedef {import('./types.js').Response} Response
*/

export class Saturn {
Expand Down Expand Up @@ -250,7 +252,7 @@ export class Saturn {
*
* @param {string} cidPath
* @param {FetchOptions} [opts={}]
* @returns {Promise<AsyncIterable<Uint8Array>>}
* @returns {Promise<AsyncIterable<Uint8Array>}
*/
async * fetchContentWithFallback (cidPath, opts = {}) {
const upstreamController = opts.controller
Expand All @@ -276,8 +278,8 @@ export class Saturn {
}
let byteCount = 0
const fetchOptions = Object.assign(opts, options)
const byteChunks = await this.fetchContent(cidPath, fetchOptions)
for await (const chunk of byteChunks) {
const response = await this.fetchContent(cidPath, fetchOptions)
for await (const chunk of response.body) {
// avoid sending duplicate chunks
if (byteCount < byteCountCheckpoint) {
// checks for overlapping chunks
Expand Down Expand Up @@ -366,9 +368,9 @@ export class Saturn {
*
* @param {string} cidPath
* @param {FetchOptions} [opts={}]
* @returns {Promise<AsyncIterable<Uint8Array>>}
* @returns {Promise<Response>}
*/
async * fetchContent (cidPath, opts = {}) {
async fetchContent (cidPath, opts = {}) {
let res, controller, log
opts = Object.assign({}, this.config, opts)

Expand All @@ -386,21 +388,57 @@ export class Saturn {
yield chunk
}
}

const self = this
async function * withErrorHandling(itr) {
return async function * () {
try {
yield * itr
} catch (err) {
log.error = err.message
controller.abort()
throw err
} finally {
self._finalizeLog(log)
}
}
}

try {
const itr = metricsIterable(asAsyncIterable(res.body))
const self = this
if (!opts.format) {
yield * itr
const body = withErrorHandling(itr)
let totalSize = parseInt(res.headers.get('Content-Length'))
let range
if (res.headers.has('Content-Range')) {
const parsed = parseRange(res.headers.get('Content-Range'))
totalSize = parsed?.size || totalSize
range = {
rangeStart: parsed?.start,
rangeEnd: parsed?.end
}
}
return {
totalSize, range, body
}
} else {
yield * extractVerifiedContent(cidPath, itr, opts.range || {})
const node = await extractVerifiedEntity(cidPath, itr)
let range
if (opts.range) {
range = normalizeContentRange(node, opts.range || {})
}
return {
totalSize: Number(node.size),
range,
body: withErrorHandling(extractVerifiedContent(node, opts.range || {}))
}
}
} catch (err) {
log.error = err.message
controller.abort()

throw err
} finally {
this._finalizeLog(log)
throw err
}
}

Expand All @@ -411,7 +449,8 @@ export class Saturn {
* @returns {Promise<Uint8Array>}
*/
async fetchContentBuffer (cidPath, opts = {}) {
return await asyncIteratorToBuffer(this.fetchContent(cidPath, opts))
const response = await this.fetchContent(cidPath, opts)
return await asyncIteratorToBuffer(response.body)
}

/**
Expand Down Expand Up @@ -646,4 +685,4 @@ export class Saturn {
this.nodes = nodes
this.storage.set(Saturn.nodesListKey, nodes)
}
}
}
12 changes: 10 additions & 2 deletions src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,16 @@
* Options for a range request
*
* @typedef {object} ContentRange
* @property {number} [rangeStart]
* @property {number} [rangeEnd]
* @property {number | null } [rangeStart]
* @property {number | null } [rangeEnd]
*/

/**
* Response to fetchContent
* @typedef {object} Response
* @property {number} totalSize
* @property {ContentRange | undefined} range
* @property {AsyncIterable<Uint8Array>} body
*/

export {}
59 changes: 40 additions & 19 deletions src/utils/car.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,49 +85,70 @@ export async function verifyBlock (cid, bytes) {
}

/**
* Verifies and extracts the raw content from a CAR stream.
* Verifies a car stream represents a valid IPLD entity and returns it
*
* @param {string} cidPath
* @param {ReadableStream|AsyncIterable} carStream
* @param {import('../types.js').ContentRange} options
*/
export async function * extractVerifiedContent (cidPath, carStream, options = {}) {
export async function extractVerifiedEntity(cidPath, carStream) {
const getter = await CarBlockGetter.fromStream(carStream)
const node = await unixfs.exporter(cidPath, getter)
return await unixfs.exporter(cidPath, getter)
}

for await (const chunk of contentGenerator(node, options)) {
/**
* Extracts raw content of a verified IPLD entity
*
* @param {unixfs.UnixFSEntry} node
* @param {import('../types.js').ContentRange} options
*/
export async function * extractVerifiedContent(node, options = {}) {
const normalizedRange = normalizeContentRange(node, options)
const length = normalizedRange.rangeEnd + 1 - normalizedRange.rangeStart
for await (const chunk of node.content({ })) {
yield chunk
}
}

/**
*
* Returns the request content range normalized to the file size
* @param {unixfs.UnixFSEntry} node
* @param {import('../types.js').ContentRange} options
* @returns {import('../types.js').ContentRange}
*/
function contentGenerator(node, options = {}) {

export function normalizeContentRange(node, options = {}) {
let rangeStart = options.rangeStart ?? 0
if (rangeStart < 0) {
rangeStart = rangeStart + Number(node.size)
if (rangeStart < 0) {
rangeStart = 0
}

} else if (rangeStart > 0) {
if (rangeStart >= Number(node.size)) {
throw new Error("range start greater than content length")
}
}

if (options.rangeEnd === null || options.rangeEnd === undefined) {
return node.content({offset: rangeStart})
return {
rangeStart,
rangeEnd: Number(node.size) - 1
}
}

let rangeEnd = options.rangeEnd
let rangeEnd = options.rangeEnd
if (rangeEnd < 0) {
rangeEnd = rangeEnd + Number(node.size) - 1
if (rangeEnd < 0) {
rangeEnd = rangeEnd + Number(node.size)
} else {
rangeEnd = rangeEnd+1
throw new Error("range end is too small")
}
const toRead = rangeEnd - rangeStart
if (toRead < 0) {
throw new Error("range end must be greater than range start")
} else {
if (rangeEnd >= Number(node.size)) {
rangeEnd = Number(node.size) - 1
}
return node.content({offset: rangeStart, length: toRead})
}
}

if (rangeEnd - rangeStart < 0) {
throw new Error("range end must be greater than or equal to range start")
}
return { rangeStart, rangeEnd }
}
Loading
Loading