diff --git a/modules/azure/src/main/scala/com.snowplowanalytics.snowplow.lakes/AzureApp.scala b/modules/azure/src/main/scala/com.snowplowanalytics.snowplow.lakes/AzureApp.scala
index 56f83089..24141502 100644
--- a/modules/azure/src/main/scala/com.snowplowanalytics.snowplow.lakes/AzureApp.scala
+++ b/modules/azure/src/main/scala/com.snowplowanalytics.snowplow.lakes/AzureApp.scala
@@ -10,8 +10,12 @@
 
 package com.snowplowanalytics.snowplow.lakes
 
+import java.net.UnknownHostException
+
 import scala.reflect._
 
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException
+
 import com.snowplowanalytics.snowplow.sources.kafka.{KafkaSource, KafkaSourceConfig}
 import com.snowplowanalytics.snowplow.sinks.kafka.{KafkaSink, KafkaSinkConfig}
 import com.snowplowanalytics.snowplow.azure.AzureAuthenticationCallbackHandler
@@ -35,5 +39,23 @@ object AzureApp extends LoaderApp[KafkaSourceConfig, KafkaSinkConfig](BuildInfo)
 
   override def badSink: SinkProvider = KafkaSink.resource(_, classTag[SinkAuthHandler])
 
-  override def isDestinationSetupError: DestinationSetupErrorCheck = TableFormatSetupError.check
+  override def isDestinationSetupError: DestinationSetupErrorCheck = {
+    // Authentication issue (wrong OAuth endpoint, wrong client id, wrong secret..)
+    case e: AbfsRestOperationException if e.getStatusCode == -1 =>
+      e.getErrorMessage()
+    // Wrong container name
+    case e: AbfsRestOperationException if e.getStatusCode() == 404 =>
+      s"${e.getErrorMessage()} (e.g. wrong container name)"
+    // Service principal missing permissions for container (role assignement missing or wrong role)
+    case e: AbfsRestOperationException if e.getStatusCode() == 403 =>
+      s"Missing permissions for the destination (needs \"Storage Blob Data Contributor\" assigned to the service principal for the container)"
+    // Soft delete not disabled
+    case e: AbfsRestOperationException if e.getStatusCode() == 409 =>
+      "Blob soft delete must be disabled on the storage account"
+    case _: UnknownHostException =>
+      "Wrong storage name"
+    // Exceptions common to the table format - Delta/Iceberg/Hudi
+    case TableFormatSetupError.check(t) =>
+      t
+  }
 }
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/DeltaWriter.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/DeltaWriter.scala
index d0bfdbdb..497d6796 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/DeltaWriter.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/DeltaWriter.scala
@@ -10,6 +10,8 @@
 
 package com.snowplowanalytics.snowplow.lakes.tables
 
+import java.net.InetAddress
+
 import cats.implicits._
 import cats.effect.Sync
 import org.typelevel.log4cats.Logger
@@ -55,7 +57,16 @@
         .build()
     }: Unit
 
+    // For Azure a wrong storage name means an invalid hostname and infinite retries when creating the Delta table
+    // If the hostname is invalid, UnknownHostException gets thrown
+    val checkHostname =
+      if (List("abfs", "abfss").contains(config.location.getScheme))
+        Sync[F].blocking(InetAddress.getByName(config.location.getHost()))
+      else
+        Sync[F].unit
+
     Logger[F].info(s"Creating Delta table ${config.location} if it does not already exist...") >>
+      checkHostname >>
       Sync[F]
         .blocking(builder.execute())
         .void