Skip to content

Commit

Permalink
PDP-1324 - Handle initialization errors for lake loader Delta Azure
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Oct 18, 2024
1 parent dc7e9a7 commit 4a99726
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@

package com.snowplowanalytics.snowplow.lakes

import java.net.UnknownHostException

import scala.reflect._

import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException

import com.snowplowanalytics.snowplow.sources.kafka.{KafkaSource, KafkaSourceConfig}
import com.snowplowanalytics.snowplow.sinks.kafka.{KafkaSink, KafkaSinkConfig}
import com.snowplowanalytics.snowplow.azure.AzureAuthenticationCallbackHandler
Expand All @@ -35,5 +39,23 @@ object AzureApp extends LoaderApp[KafkaSourceConfig, KafkaSinkConfig](BuildInfo)

// Provider for the bad rows sink: builds a Kafka sink from the supplied sink config,
// passing along the class tag of SinkAuthHandler.
override def badSink: SinkProvider =
  sinkConfig => KafkaSink.resource(sinkConfig, classTag[SinkAuthHandler])

override def isDestinationSetupError: DestinationSetupErrorCheck = TableFormatSetupError.check
// Maps exceptions raised while initializing the Azure destination to a human-readable
// explanation of the likely misconfiguration.
//
// Cases are tried in order. ABFS REST failures are told apart by the status code carried on the
// exception; any throwable that none of the Azure-specific cases recognises is handed to
// TableFormatSetupError.check, and only matches if that check recognises it.
override def isDestinationSetupError: DestinationSetupErrorCheck = {
  // Authentication issue (wrong OAuth endpoint, wrong client id, wrong secret..)
  case e: AbfsRestOperationException if e.getStatusCode() == -1 =>
    e.getErrorMessage()
  // Wrong container name
  case e: AbfsRestOperationException if e.getStatusCode() == 404 =>
    s"${e.getErrorMessage()} (e.g. wrong container name)"
  // Service principal missing permissions for container (role assignment missing or wrong role)
  case e: AbfsRestOperationException if e.getStatusCode() == 403 =>
    // Plain literal on purpose: nothing is interpolated here, and escaped quotes inside an
    // s-interpolated string are not accepted by older Scala compilers.
    "Missing permissions for the destination (needs \"Storage Blob Data Contributor\" assigned to the service principal for the container)"
  // Soft delete not disabled
  case e: AbfsRestOperationException if e.getStatusCode() == 409 =>
    "Blob soft delete must be disabled on the storage account"
  // The storage account name is part of the hostname, so an unresolvable host points at a wrong name
  case _: UnknownHostException =>
    "Wrong storage name"
  // Exceptions common to the table format - Delta/Iceberg/Hudi
  case TableFormatSetupError.check(message) =>
    message
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

package com.snowplowanalytics.snowplow.lakes.tables

import java.net.InetAddress

import cats.implicits._
import cats.effect.Sync
import org.typelevel.log4cats.Logger
Expand Down Expand Up @@ -55,7 +57,16 @@ class DeltaWriter(config: Config.Delta) extends Writer {
.build()
}: Unit

// For Azure a wrong storage name means an invalid hostname and infinite retries when creating the Delta table
// If the hostname is invalid, UnknownHostException gets thrown
val checkHostname =
if (List("abfs", "abfss").contains(config.location.getScheme))
Sync[F].blocking(InetAddress.getByName(config.location.getHost()))
else
Sync[F].unit

Logger[F].info(s"Creating Delta table ${config.location} if it does not already exist...") >>
checkHostname >>
Sync[F]
.blocking(builder.execute())
.void
Expand Down

0 comments on commit 4a99726

Please sign in to comment.