diff --git a/src/main/scala/algoliasearch/extension/internal/Iterable.scala b/src/main/scala/algoliasearch/extension/internal/Iterable.scala new file mode 100644 index 00000000..b4c35460 --- /dev/null +++ b/src/main/scala/algoliasearch/extension/internal/Iterable.scala @@ -0,0 +1,46 @@ +package algoliasearch.extension.internal + +import scala.concurrent.{ExecutionContext, Future, blocking} +import scala.concurrent.duration.Duration + +private[algoliasearch] object Iterable { + case class Error[T]( + validate: T => Boolean, + message: Option[T => String] = None + ) + + def createIterable[T]( + execute: Option[T] => Future[T], + validate: T => Boolean, + aggregator: Option[T => Unit] = None, + timeout: () => Duration = () => Duration.Zero, + error: Option[Iterable.Error[T]] = None + )(implicit ec: ExecutionContext): Future[T] = { + def executor(previousResponse: Option[T] = None): Future[T] = { + execute(previousResponse).flatMap { response => + // Call aggregator if defined + aggregator.foreach(agg => agg(response)) + + // Validate the response + if (validate(response)) { + Future.successful(response) + } else { + // Check for error validation + error match { + case Some(err) if err.validate(response) => + err.message match { + case Some(errMsg) => Future.failed(new Exception(errMsg(response))) + case None => Future.failed(new Exception("An error occurred")) + } + case _ => + // Sleep for timeout duration, then retry + blocking(Thread.sleep(timeout().toMillis)) + executor(Some(response)) + } + } + } + } + + executor() + } +} diff --git a/src/main/scala/algoliasearch/extension/package.scala b/src/main/scala/algoliasearch/extension/package.scala index b9166062..f8c6e04a 100644 --- a/src/main/scala/algoliasearch/extension/package.scala +++ b/src/main/scala/algoliasearch/extension/package.scala @@ -3,6 +3,7 @@ package algoliasearch import algoliasearch.api.SearchClient import algoliasearch.config.RequestOptions import algoliasearch.exception.AlgoliaApiException +import algoliasearch.extension.internal.Iterable.createIterable import algoliasearch.extension.internal.RetryUntil.{DEFAULT_DELAY, retryUntil} import algoliasearch.search._ @@ -354,9 +355,6 @@ package object extension { batchSize: Int = 1000, requestOptions: Option[RequestOptions] = None )(implicit ec: ExecutionContext): Future[ReplaceAllObjectsResponse] = { - val requests = objects.map { record => - BatchRequest(action = Action.AddObject, body = record) - } val tmpIndexName = s"${indexName}_tmp_${scala.util.Random.nextInt(100)}" for { @@ -405,6 +403,12 @@ package object extension { ) } + /** Check if an index exists. + * @param indexName + * The index name to check. + * @return + * A future containing a boolean indicating if the index exists. + */ def indexExists(indexName: String)(implicit ec: ExecutionContext): Future[Boolean] = { try { client.getSettings(indexName) @@ -415,5 +419,128 @@ package object extension { Future.successful(true) } + + /** Browse objects in an index. + * @param indexName + * The index name to browse. + * @param browseParams + * The browse parameters. + * @param validate + * The validation function. Default is to check if the cursor is defined. + * @param aggregator + * The aggregation function. This is where you can aggregate the results. + * @param requestOptions + * Additional request configuration. + * @return + * A future containing the last browse response. + */ + def browseObjects( + indexName: String, + browseParams: BrowseParamsObject, + validate: BrowseResponse => Boolean = response => response.cursor.isEmpty, + aggregator: BrowseResponse => Unit, + requestOptions: Option[RequestOptions] = None + )(implicit ec: ExecutionContext): Future[BrowseResponse] = { + createIterable( + execute = (previousResponse: Option[BrowseResponse]) => + client.browse( + indexName, + Some( + browseParams.copy( + hitsPerPage = previousResponse.flatMap(_.hitsPerPage.orElse(Some(1000))), + cursor = previousResponse.flatMap(_.cursor) + ) + ), + requestOptions + ), + validate = validate, + aggregator = Some(aggregator) + ) + } + + /** Browse rules in an index. + * @param indexName + * The index name to browse. + * @param searchRulesParams + * The search rules parameters. + * @param validate + * The validation function. Default is to check if the number of hits is less than the hits per page. + * @param aggregator + * The aggregation function. This is where you can aggregate the results. + * @param requestOptions + * Additional request configuration. + * @return + * A future containing the last search rules response. + */ + def browseRules( + indexName: String, + searchRulesParams: SearchRulesParams, + validate: Option[SearchRulesResponse => Boolean] = None, + aggregator: SearchRulesResponse => Unit, + requestOptions: Option[RequestOptions] = None + )(implicit ec: ExecutionContext): Future[SearchRulesResponse] = { + val hitsPerPage = 1000 + + createIterable( + execute = (previousResponse: Option[SearchRulesResponse]) => + client.searchRules( + indexName, + Some( + searchRulesParams.copy( + page = previousResponse.map(_.page + 1).orElse(Some(0)), + hitsPerPage = Some(hitsPerPage) + ) + ), + requestOptions + ), + validate = validate.getOrElse((response: SearchRulesResponse) => response.hits.length < hitsPerPage), + aggregator = Some(aggregator) + ) + } + + /** Browse synonyms in an index. + * @param indexName + * The index name to browse. + * @param searchSynonymsParams + * The search synonyms parameters. + * @param validate + * The validation function. Default is to check if the number of hits is less than the hits per page. + * @param aggregator + * The aggregation function. This is where you can aggregate the results. + * @param requestOptions + * Additional request configuration. + * @return + * A future containing the last search synonyms response. + */ + def browseSynonyms( + indexName: String, + searchSynonymsParams: SearchSynonymsParams, + validate: Option[SearchSynonymsResponse => Boolean] = None, + aggregator: SearchSynonymsResponse => Unit, + requestOptions: Option[RequestOptions] = None + )(implicit ec: ExecutionContext): Future[SearchSynonymsResponse] = { + val hitsPerPage = 1000 + var page = searchSynonymsParams.page.getOrElse(0) + + createIterable( + execute = (_: Option[SearchSynonymsResponse]) => + try { + client.searchSynonyms( + indexName, + Some( + searchSynonymsParams.copy( + page = Some(page), + hitsPerPage = Some(hitsPerPage) + ) + ), + requestOptions + ) + } finally { + page += 1 + }, + validate = validate.getOrElse((response: SearchSynonymsResponse) => response.hits.length < hitsPerPage), + aggregator = Some(aggregator) + ) + } } }