|
| 1 | +package ru.ifmo.se.dating.spring |
| 2 | + |
| 3 | +import io.r2dbc.spi.Connection |
| 4 | +import kotlinx.coroutines.flow.Flow |
| 5 | +import kotlinx.coroutines.reactive.asFlow |
| 6 | +import kotlinx.coroutines.reactive.awaitSingle |
| 7 | +import kotlinx.coroutines.reactor.awaitSingleOrNull |
| 8 | +import org.jooq.Publisher |
| 9 | +import org.jooq.SQLDialect |
| 10 | +import org.jooq.conf.Settings |
| 11 | +import org.jooq.exception.IntegrityConstraintViolationException |
| 12 | +import org.jooq.impl.DSL |
| 13 | +import org.springframework.r2dbc.core.DatabaseClient |
| 14 | +import org.springframework.stereotype.Component |
| 15 | +import reactor.core.publisher.Flux |
| 16 | +import reactor.core.publisher.Mono |
| 17 | +import reactor.kotlin.core.publisher.toFlux |
| 18 | +import ru.ifmo.se.dating.storage.jooq.DSLBlock |
| 19 | +import ru.ifmo.se.dating.storage.jooq.JooqDatabase |
| 20 | +import ru.ifmo.se.dating.storage.jooq.exception.toStorage |
| 21 | + |
| 22 | +@Component |
| 23 | +class SpringJooqDatabase(private val database: DatabaseClient) : JooqDatabase { |
| 24 | + private val settings = Settings() |
| 25 | + .withBindOffsetDateTimeType(true) |
| 26 | + .withBindOffsetTimeType(true) |
| 27 | + |
| 28 | + override fun <T : Any> flow(block: DSLBlock<T>): Flow<T> = |
| 29 | + flux(block).asFlow() |
| 30 | + |
| 31 | + override suspend fun <T : Any> only(block: DSLBlock<T>): T = |
| 32 | + mono(block).awaitSingle() |
| 33 | + |
| 34 | + override suspend fun <T : Any> maybe(block: DSLBlock<T>): T? = |
| 35 | + mono(block).awaitSingleOrNull() |
| 36 | + |
| 37 | + private fun Connection.dsl() = |
| 38 | + DSL.using(this, SQLDialect.POSTGRES, settings) |
| 39 | + |
| 40 | + private fun <T : Any> flux(block: DSLBlock<T>) = database |
| 41 | + .inConnectionMany { block(it.dsl()).toFlux() } |
| 42 | + .onErrorMap(IntegrityConstraintViolationException::class.java) { it.toStorage() } |
| 43 | + |
| 44 | + private fun <T : Any> mono(block: DSLBlock<T>) = database |
| 45 | + .inConnection { block(it.dsl()).toMono() } |
| 46 | + .onErrorMap(IntegrityConstraintViolationException::class.java) { it.toStorage() } |
| 47 | +} |
| 48 | + |
| 49 | +private fun <T> publisher(jooq: Publisher<T>) = |
| 50 | + jooq as org.reactivestreams.Publisher<T> |
| 51 | + |
| 52 | +private fun <T> Publisher<T>.toFlux(): Flux<T> = |
| 53 | + Flux.from(publisher(this)) |
| 54 | + |
| 55 | +private fun <T> Publisher<T>.toMono(): Mono<T> = |
| 56 | + Mono.from(publisher(this)) |
0 commit comments