PDP-1323 Handle initialization errors for GCP #91

Merged · 1 commit · Oct 25, 2024
@@ -14,6 +14,8 @@ import java.net.UnknownHostException

 import scala.reflect._

+import cats.implicits._
+
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException

 import com.snowplowanalytics.snowplow.sources.kafka.{KafkaSource, KafkaSourceConfig}

@@ -44,13 +44,13 @@ object AzureApp extends LoaderApp[KafkaSourceConfig, KafkaSinkConfig](BuildInfo)
     case AuthenticationError(e) =>
       e
     // Wrong container name
-    case e: AbfsRestOperationException if e.getStatusCode == 404 =>
+    case e: AbfsRestOperationException if e.getStatusCode === 404 =>
       s"The specified filesystem does not exist (e.g. wrong container name)"
     // Service principal missing permissions for container (role assignment missing or wrong role)
-    case e: AbfsRestOperationException if e.getStatusCode == 403 =>
+    case e: AbfsRestOperationException if e.getStatusCode === 403 =>
       s"Missing permissions for the destination (needs \"Storage Blob Data Contributor\" assigned to the service principal for the container)"
     // Soft delete not disabled
-    case e: AbfsRestOperationException if e.getStatusCode == 409 =>
+    case e: AbfsRestOperationException if e.getStatusCode === 409 =>
       "Blob soft delete must be disabled on the storage account"
     case _: UnknownHostException =>
       "Wrong storage name"
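Note on the `==` → `===` change: `===` is the type-safe equality operator from cats' `Eq` typeclass, brought into scope by the new `import cats.implicits._`. Unlike Scala's universal `==`, it only compiles when both sides are the same type with an `Eq` instance. A minimal sketch (names here are illustrative, not from the PR):

```scala
import cats.implicits._

val statusCode: Int = 404

// Resolves through Eq[Int]; both operands must be Int.
val isNotFound: Boolean = statusCode === 404 // true

// Universal equality compiles for mixed types and is always false:
//   statusCode == "404"   // compiles (with a warning), returns false
// Type-safe equality rejects it outright:
//   statusCode === "404"  // does not compile
```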
@@ -10,6 +10,10 @@

 package com.snowplowanalytics.snowplow.lakes

+import cats.implicits._
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException
+
 import com.snowplowanalytics.snowplow.sources.pubsub.{PubsubSource, PubsubSourceConfig}
 import com.snowplowanalytics.snowplow.sinks.pubsub.{PubsubSink, PubsubSinkConfig}

@@ -19,5 +23,15 @@ object GcpApp extends LoaderApp[PubsubSourceConfig, PubsubSinkConfig](BuildInfo)

   override def badSink: SinkProvider = PubsubSink.resource(_)

-  override def isDestinationSetupError: DestinationSetupErrorCheck = TableFormatSetupError.check
+  override def isDestinationSetupError: DestinationSetupErrorCheck = {
+    // Destination bucket doesn't exist
+    case e: GoogleJsonResponseException if e.getDetails.getCode === 404 =>
+      "The specified bucket does not exist"
+    // Permissions missing for Cloud Storage
+    case e: GoogleJsonResponseException if e.getDetails.getCode === 403 =>
+      e.getDetails.getMessage
+    // Exceptions common to the table format - Delta/Iceberg/Hudi
+    case TableFormatSetupError.check(t) =>
+      t
+  }
 }
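`isDestinationSetupError` is now a pattern-match literal that maps known GCP setup failures to human-readable messages. A sketch of how such a check can be consumed, assuming `DestinationSetupErrorCheck` is an alias for `PartialFunction[Throwable, String]` (inferred from the literal above; the alias definition is not part of this diff):

```scala
object ErrorCheckSketch {
  // Assumption: mirrors how the literal in GcpApp is typed; the real
  // alias definition is not shown in this diff.
  type DestinationSetupErrorCheck = PartialFunction[Throwable, String]

  val check: DestinationSetupErrorCheck = {
    case e: IllegalStateException => s"Setup problem: ${e.getMessage}"
  }

  // `lift` converts the partial function to Throwable => Option[String],
  // returning None for errors the check does not recognize.
  def describe(t: Throwable): Option[String] = check.lift(t)
}
```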
@@ -12,6 +12,8 @@ package com.snowplowanalytics.snowplow.lakes

 import org.apache.iceberg.exceptions.{ForbiddenException => IcebergForbiddenException, NotFoundException => IcebergNotFoundException}

+import org.apache.spark.sql.delta.DeltaAnalysisException
+
 object TableFormatSetupError {

   // Check if given exception is specific to iceberg format

@@ -22,5 +24,7 @@ object TableFormatSetupError {
     case e: IcebergForbiddenException =>
       // No permission to create a table in Glue catalog
       e.getMessage
+    case e: DeltaAnalysisException if e.errorClass == Some("DELTA_CREATE_TABLE_WITH_NON_EMPTY_LOCATION") =>
+      "Destination not empty and not a Delta table"
   }
 }
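The `case TableFormatSetupError.check(t) => t` arm in GcpApp uses this check as an extractor inside another pattern match, which requires an `unapply` on the matched identifier. A hypothetical sketch of how an extractor can delegate to a partial function (the actual definition of `check` is not shown in this diff):

```scala
object ExtractorSketch {
  object SetupError {
    private val messages: PartialFunction[Throwable, String] = {
      case e: IllegalArgumentException => e.getMessage
    }

    // This unapply is what enables `case SetupError(msg) => ...`
    // in a caller's own pattern match.
    def unapply(t: Throwable): Option[String] = messages.lift(t)
  }

  def classify(t: Throwable): String = t match {
    case SetupError(msg) => msg
    case _               => "not a recognized setup error"
  }
}
```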
6 changes: 3 additions & 3 deletions project/Dependencies.scala

@@ -132,9 +132,9 @@ object Dependencies {

   val icebergDeltaRuntimeDependencies = Seq(
     iceberg,
-    delta % Runtime,
-    Spark.coreForIcebergDelta % Runtime,
-    Spark.sqlForIcebergDelta % Runtime
+    delta,
+    Spark.coreForIcebergDelta,
+    Spark.sqlForIcebergDelta
   )

   val coreDependencies = Seq(
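Dropping `% Runtime` moves the Delta and Spark artifacts from the runtime-only classpath onto the compile classpath as well, which this PR presumably needs because `TableFormatSetupError` now references `DeltaAnalysisException` at compile time. A hedged sbt sketch of the distinction (coordinates and versions are placeholders, not the project's actual values):

```scala
libraryDependencies ++= Seq(
  // Compile + runtime: source code may reference this artifact's classes.
  "io.delta" %% "delta-spark" % "x.y.z",
  // Runtime only: present when the app runs, invisible to the compiler.
  "org.apache.spark" %% "spark-sql" % "x.y.z" % Runtime
)
```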