Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add upsertStreaming and upsertBatch (fix #115) #117

Merged
merged 3 commits into from
Jul 13, 2024
Merged

Conversation

oyvindberg
Copy link
Owner

@oyvindberg oyvindberg commented Jul 10, 2024

Let's give it a shot; this looks very useful. Example of generated code:

anorm

  /** Upserts every row of `unsaved` with one batched statement keyed on "unitmeasurecode".
    *
    * On conflict, "name" and "modifieddate" are overwritten with the incoming values. The rows
    * coming back from the `returning` clause are parsed with `UnitmeasureRow.rowParser` and returned.
    * An empty input short-circuits to `Nil` without building a batch.
    */
  override def upsertBatch(unsaved: Iterable[UnitmeasureRow])(implicit c: Connection): List[UnitmeasureRow] = {
    // one named parameter per `{placeholder}` in the statement below
    def toNamedParameter(row: UnitmeasureRow): List[NamedParameter] =
      List(
        NamedParameter("unitmeasurecode", ParameterValue(row.unitmeasurecode, null, UnitmeasureId.toStatement)),
        NamedParameter("name", ParameterValue(row.name, null, Name.toStatement)),
        NamedParameter("modifieddate", ParameterValue(row.modifieddate, null, TypoLocalDateTime.toStatement))
      )

    unsaved.toList match {
      case first :: others =>
        val upsertSql =
            s"""insert into production.unitmeasure("unitmeasurecode", "name", "modifieddate")
                values ({unitmeasurecode}::bpchar, {name}::varchar, {modifieddate}::timestamp)
                on conflict ("unitmeasurecode")
                do update set
                  "name" = EXCLUDED."name",
                  "modifieddate" = EXCLUDED."modifieddate"
                returning "unitmeasurecode", "name", "modifieddate"::text
             """
        // BatchSql takes the first parameter list separately from the remaining ones
        val batch = BatchSql(upsertSql, toNamedParameter(first), others.map(toNamedParameter)*)
        new anorm.adventureworks.ExecuteReturningSyntax.Ops(batch)
          .executeReturning(UnitmeasureRow.rowParser(1).*)
      case Nil =>
        Nil
    }
  }
  /* NOTE: not safe under auto-commit! The work is spread over 3 separately executed SQL statements:
   *   1. create a temporary staging table shaped like production.unitmeasure (`on commit drop`)
   *   2. `copy` the incoming rows into the staging table, `batchSize` rows at a time
   *   3. upsert from the staging table into production.unitmeasure, then drop the staging table
   * The returned Int is whatever `executeUpdate()` reports for step 3.
   */
  override def upsertStreaming(unsaved: Iterator[UnitmeasureRow], batchSize: Int = 10000)(implicit c: Connection): Int = {
    // step 1: staging table
    val createStaging = SQL"create temporary table unitmeasure_TEMP (like production.unitmeasure) on commit drop"
    createStaging.execute(): @nowarn

    // step 2: bulk-load the rows through COPY
    val copySql = s"""copy unitmeasure_TEMP("unitmeasurecode", "name", "modifieddate") from stdin"""
    streamingInsert(copySql, batchSize, unsaved)(UnitmeasureRow.text, c): @nowarn

    // step 3: merge into the real table and clean up
    val mergeAndDrop =
      SQL"""insert into production.unitmeasure("unitmeasurecode", "name", "modifieddate")
          select * from unitmeasure_TEMP
          on conflict ("unitmeasurecode")
          do update set
            "name" = EXCLUDED."name",
            "modifieddate" = EXCLUDED."modifieddate"
          ;
          drop table unitmeasure_TEMP;"""
    mergeAndDrop.executeUpdate()
  }

doobie

  /** Upserts all rows of `unsaved`, keyed on "unitmeasurecode".
    *
    * On conflict, "name" and "modifieddate" are overwritten with the incoming values. The result is
    * the stream built by `updateManyWithGeneratedKeys`, reading back the three listed columns as
    * `UnitmeasureRow`s.
    */
  override def upsertBatch(unsaved: List[UnitmeasureRow]): Stream[ConnectionIO, UnitmeasureRow] = {
    val upsertSql =
      s"""insert into production.unitmeasure("unitmeasurecode", "name", "modifieddate")
          values (?::bpchar,?::varchar,?::timestamp)
          on conflict ("unitmeasurecode")
          do update set
            "name" = EXCLUDED."name",
            "modifieddate" = EXCLUDED."modifieddate"
          returning "unitmeasurecode", "name", "modifieddate"::text"""

    // rows are written with UnitmeasureRow.write and read back with UnitmeasureRow.read
    val upsert = Update[UnitmeasureRow](upsertSql)(using UnitmeasureRow.write)
    val returningAllColumns = upsert.updateManyWithGeneratedKeys[UnitmeasureRow]("unitmeasurecode", "name", "modifieddate")
    returningAllColumns(unsaved)(using catsStdInstancesForList, UnitmeasureRow.read)
  }
  /* NOTE: not safe under auto-commit! The work is spread over 3 separately run SQL statements:
   *   1. create a temporary staging table shaped like production.unitmeasure (`on commit drop`)
   *   2. `copy` the incoming stream into the staging table, `batchSize` rows at a time
   *   3. upsert from the staging table into production.unitmeasure, then drop the staging table
   * The resulting Int is the one produced by running step 3.
   */
  override def upsertStreaming(unsaved: Stream[ConnectionIO, UnitmeasureRow], batchSize: Int = 10000): ConnectionIO[Int] = {
    // step 1: staging table
    val createStaging = sql"create temporary table unitmeasure_TEMP (like production.unitmeasure) on commit drop".update.run
    createStaging.flatMap { _ =>
      // step 2: bulk-load the rows through COPY
      val copyRows = new FragmentOps(sql"""copy unitmeasure_TEMP("unitmeasurecode", "name", "modifieddate") from stdin""").copyIn(unsaved, batchSize)(using UnitmeasureRow.text)
      copyRows.flatMap { _ =>
        // step 3: merge into the real table and clean up; its result is the overall result
        sql"""insert into production.unitmeasure("unitmeasurecode", "name", "modifieddate")
                   select * from unitmeasure_TEMP
                   on conflict ("unitmeasurecode")
                   do update set
                     "name" = EXCLUDED."name",
                     "modifieddate" = EXCLUDED."modifieddate"
                   ;
                   drop table unitmeasure_TEMP;""".update.run
      }
    }
  }

zio-jdbc

  // Not implementable for zio-jdbc: upsertBatch

  /* NOTE: not safe under auto-commit! The work is spread over 3 separately run SQL statements:
   *   1. create a temporary staging table shaped like production.unitmeasure (`on commit drop`)
   *   2. `copy` the incoming stream into the staging table, `batchSize` rows at a time
   *   3. upsert from the staging table into production.unitmeasure, then drop the staging table
   * The resulting Long is the one produced by step 3.
   */
  override def upsertStreaming(unsaved: ZStream[ZConnection, Throwable, UnitmeasureRow], batchSize: Int = 10000): ZIO[ZConnection, Throwable, Long] = {
    // the three effects are only described here; they run in sequence in the for-comprehension below
    val createStaging = sql"create temporary table unitmeasure_TEMP (like production.unitmeasure) on commit drop".execute
    val copyRows = streamingInsert(s"""copy unitmeasure_TEMP("unitmeasurecode", "name", "modifieddate") from stdin""", batchSize, unsaved)(UnitmeasureRow.text)
    val mergeAndDrop = sql"""insert into production.unitmeasure("unitmeasurecode", "name", "modifieddate")
                       select * from unitmeasure_TEMP
                       on conflict ("unitmeasurecode")
                       do update set
                         "name" = EXCLUDED."name",
                         "modifieddate" = EXCLUDED."modifieddate"
                       ;
                       drop table unitmeasure_TEMP;""".update

    for {
      _ <- createStaging
      _ <- copyRows
      affected <- mergeAndDrop
    } yield affected
  }

@oyvindberg oyvindberg force-pushed the add-streaming-upsert branch from 3ba2290 to c6a023a Compare July 10, 2024 11:42
@oyvindberg oyvindberg force-pushed the add-streaming-upsert branch 4 times, most recently from 22c396a to c377b16 Compare July 13, 2024 14:48
@oyvindberg oyvindberg changed the title Add upsertStreaming (fix #115) Add upsertStreaming and upsertBatch (fix #115) Jul 13, 2024
caveats:
- anorm didn't support returning rows for batch queries, so it's monkey-patched into the `anorm` package
- zio-jdbc cannot express batch updates at all. For this reason it was necessary to add support for not implementing a repo method for a dblib.
@oyvindberg oyvindberg force-pushed the add-streaming-upsert branch from c377b16 to d81be74 Compare July 13, 2024 15:07
@oyvindberg oyvindberg merged commit 098a1bc into main Jul 13, 2024
7 checks passed
@oyvindberg oyvindberg deleted the add-streaming-upsert branch July 13, 2024 15:12
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants