Skip to content

Commit

Permalink
Fixes an issue related to de-duplication of source in PassThroughSegm…
Browse files Browse the repository at this point in the history
…enter.

Signed-off-by: Ralph Gasser <rg@pontius.ch>
  • Loading branch information
ppanopticon committed Dec 21, 2023
1 parent c5183e8 commit fd78174
Showing 1 changed file with 33 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,30 +53,38 @@ class PassThroughSegmenter : SegmenterFactory {
* @param upstream The upstream [Flow] of [Content] that is being segmented.
* @param downstream The [ProducerScope] to hand [Ingested] to the downstream pipeline.
*/
override suspend fun segment(upstream: Flow<ContentElement<*>>, downstream: ProducerScope<Retrievable>) = upstream.collect {
/* Retrievable for data source. */
if (it is SourcedContent) {
val sourceRetrievable = this.findOrCreateRetrievableForSource(it.source)

/* Prepare retrievable (with relationship). */
val retrievable = Ingested(UUID.randomUUID(), "segment", false, content = listOf(it))
retrievable.relationships.add(Relationship(retrievable, "partOf", sourceRetrievable))

/* Persist retrievable and connection. */
this.writer.add(retrievable)
this.writer.connect(retrievable.id, "partOf", sourceRetrievable.id)

/* Send retrievables downstream. */
downstream.send(sourceRetrievable)
downstream.send(retrievable)
} else {
val retrievable = Ingested(UUID.randomUUID(), "segment", false, content = listOf(it))

/* Persist retrievable. */
this.writer.add(retrievable)

/* Send retrievable downstream. */
downstream.send(retrievable)
override suspend fun segment(upstream: Flow<ContentElement<*>>, downstream: ProducerScope<Retrievable>) {
var sourceRetrievable: RetrievableWithSource? = null
upstream.collect {
/* Retrievable for data source. */
if (it is SourcedContent) {
if (it.source != sourceRetrievable?.source) {
val source = this.createRetrievableForSource(it.source)
downstream.send(source)
sourceRetrievable = source
}

/* Prepare retrievable (with relationship). */
val retrievable = Ingested(UUID.randomUUID(), "segment", false, content = listOf(it))
this.writer.add(retrievable)

/* Connect retrievable to source. */
sourceRetrievable?.let { source ->
retrievable.relationships.add(Relationship(retrievable, "partOf", source))
this.writer.connect(retrievable.id, "partOf", source.id)
}

/* Send retrievable downstream. */
downstream.send(retrievable)
} else {
val retrievable = Ingested(UUID.randomUUID(), "segment", false, content = listOf(it))

/* Persist retrievable. */
this.writer.add(retrievable)

/* Send retrievable downstream. */
downstream.send(retrievable)
}
}
}

Expand All @@ -90,7 +98,7 @@ class PassThroughSegmenter : SegmenterFactory {
*
* @param source The [Source] to create [Retrievable] for.
*/
private fun findOrCreateRetrievableForSource(source: Source): RetrievableWithSource {
private fun createRetrievableForSource(source: Source): RetrievableWithSource {
val result = object : RetrievableWithSource {
override val id: RetrievableId = source.sourceId
override val type: String = "source"
Expand All @@ -101,7 +109,6 @@ class PassThroughSegmenter : SegmenterFactory {
/* Persist retrievable. */
this.writer.add(result)


/* Return result. */
return result
}
Expand Down

0 comments on commit fd78174

Please sign in to comment.