Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release/0.5.0 #93

Merged
merged 23 commits into from
Nov 1, 2024
Merged

Release/0.5.0 #93

merged 23 commits into from
Nov 1, 2024

Conversation

oguzhanunlu
Copy link
Contributor

@oguzhanunlu oguzhanunlu commented Oct 31, 2024

Jira ref: PDP-1492

istreeter and others added 21 commits July 12, 2024 09:03
In #62 we added a feature so that bad rows are re-sized if their
serialized length exceeds the maximum size allowed by the sink.

This fixes a bug which meant the resizing was not working properly.
The health probe now reports unhealthy under these extra scenarios:

- Bad row sink is unhealthy – cannot write to the sink
- Fatal error happens when trying to write events to the lake
…mns (#66)

Some query engines dislike Iceberg tables in which a STRUCT field is
nullable and a nested field of the STRUCT is non-nullable. For example,
in Snowflake we have seen errors like "SQL execution internal error"
when the nested field contains a null.

This PR adds a config option `respectIgluNullability`.  When set to `false`,
the Lake Loader will make all nested fields nullable. This is slightly
less ideal for data storage, but it brings back compatibility with query
engines like Snowflake.

The default value of the new config option is `true` which maintains the
behaviour of previous versions of the loader.
As part of this change we also bump spark to 3.5.x when using Hudi. And
bump scala to 2.13.x. Previously we were pinned to earlier versions
because of compatibility with Hudi 0.14.0.

This PR is implemented in a way that we retain the flexibility of
easily supporting a different version of Spark for the Hudi docker
image. I anticipate we might need this flexibility if Iceberg/Delta are
faster to add support for Spark 4.x.
Although we remain on Hudi 0.15.0, this bumps the AWS module of Hudi to
version 1.x. This is safe because the AWS module is backwards compatible
to 0.15.0. This change lets us use features only in 1.x like assumed
role credentials provider.
Two features has been added in this commit: alerting and retrying

For alerting, webhook method is used similar to other Snowplow apps. Alert message is sent to URL given in the config. Alerts are sent for some error cases, not for all of them. It is implemented such that it is sent only for setup errors. The error cases where alert sent can be extended in the future, of course.

For retrying, two retry policies can be defined similar to Snowflake Loader. One of them is for setup errors and other one is for transient errors. Alert would be sent only for setup errors, not for transient errors.

Also, possible setup error cases for Iceberg/Glue/S3 are added in this commit as well. Error cases for other destinations/table formats will be added later.
…72)

It is possible to run the Hudi Lake Loader enabling the hudi option
`"hoodie.datasource.hive_sync.enable": "true"` to register/sync the
table to a Hive Metastore or Glue.

However, with that setting enabled, the Hudi delays syncing until the
first time events are committed.  For use case, it is more helpful if
the loader connects to Glue/Hive during startup, so we more quickly get
an alert if the loader is missing permissions.

This PR works my making the loader add an empty commit during startup.
It does not add any parquet file, but it triggers the loader to sync the
table to Glue/Hive.
The webhook alert should contain a short helpful message explaining why
an error is caused by the destination setup.  In other snowplow loaders
we get the message simply by serializing the Exception.  But in Lake
Loader I found the exception messages to be very messy.

In a related problem, for Hudi setup errors I needed to traverse the
Exception's `getCause` in order to check if it was a setup error.

This PR takes more explicit control of setting short friendly error
messages, and traversing the `getCause` to get all relevant messages.

E.g. an alert message before this change:

> Failed to create events table: s3a://<REDACTED/events/_delta_log: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials provided by V1ToV2AwsCredentialProviderAdapter : software.amazon.awssdk.services.sts.model.StsException: User: arn:aws:iam::<REDACTED>:user/<REDACTED> is not authorized to perform: sts:AssumeRole on resource: arn:aws:iam::<REDACTED>:role/<REDACTED> (Service: Sts, Status Code: 403, Request ID: 00000000-0000-0000-0000-000000000000)

The corresponding alert after this change:

> Failed to create events table: s3a://<REDACTED/events/_delta_log: Failed to initialize AWS access credentials: Missing permissions to assume the AWS IAM role

**Other small changes I snuck into this commit:**

- Added specific webhook alerts for Hudi.
- Removed the AssumedRoleCredentialsProvider for aws sdk v1.  This is no
  longer needed now that Hadoop is fully using aws sdk v2.
- Fixed minor bug with retrying creating a database in Hudi Writer
The loader has a feature where it sends alerts to a webhook if the
destination is mis-configured.

However, with Iceberg it was possible for the initialization to be
successful, and yet the loader might still fail later while committing
events, e.g. due to a permissions error updating the Glue catalog.

Here we add an extra step so the loader is forced to make an empty
commit early during initialization.  If there is an error committing,
then the loader sends a webhook alert and never becomes healthy.
For KCL apps, we want the KCL to get initialized as early as possible,
so the worker can claim shard leases before they get stolen by other
workers.

Before this PR, the loader initialized the destination table first, and
then subscribed to the stream afterwards.  Initializing the destination
table can be fairly slow, especially because we do things like syncing
to the external catalog, and possibly cleaning up aborted commits.

After this PR, the loader subscribes to the stream concurrently with
initializing the destination table. This lets the KCL claim leases
before they get stolen.
In common-streams 0.8.x we shifted alerting / retrying / webhook out of
the applications and into the common library.  It also adds new features
like heartbeat webhooks starting when the loader first becomes healthy.
Before this PR, the loader would generate a failed event if it failed to
fetch a required schema from Iglu.  However, all events have already
passed validation in Enrich, so it is completely unexpected to have an
Iglu failure.  An Iglu error _probably_ means some type of configuration
error or service outage.

After this PR, the loader will crash and exit on an Iglu error, instead
of creating a failed event.  This is probably the preferred behaviour,
while the pipeline operator addresses the underlying infrastructure
problem.

If an Iglu schema is genuinely now unavailable, then the pipeline
operator can override the default behaviour by setting
`exitOnMissingIgluSchema: false` in the configuration file or by listing
the missing schema in `skipschemas`.
On rare occasions I have seen errors where Spark complains about
creating two temporary tables with the same name. In the loader we
create table names based on the window's start time. The error was
unexpected because each window should have a different start time.

I believe this is the fix. It ensures view name is computed right at the
start of the window, and not after waiting for the table to be
initialized. It prevents consecutive windows from picking the same
timestamp in the case when the table is very slow to initialize.
The following improvements are introduced via common-streams 0.8.0-M4:

- Fields starting with a digit are now prefixed with an underscore `_`.
  This is needed for Hudi, which does not allow fields starting with a
  digit (snowplow/schema-ddl#209)
- New kinesis source implementation without fs2-kinesis
  (snowplow-incubator/common-streams#84)
- Iglu schemas are resolved in parallel, for short pause times during
  event processing (snowplow-incubator/common-streams#85)
- Common http client configured with restricted max connections per
  server (snowplow-incubator/common-streams#87)
- Iglu scala client 3.2.0 no longer relies on the "list" schemas
  endpoint (snowplow/iglu-scala-client#255)
The Open Table Formats occasionally need to delete files as part of
routine maintenance. For example, Delta deletes old log files,
configured via table property `delta.logRetentionDuration`.

For the Snowplow Lake Loader, this can mean deleting a very large number
of files; bearing in mind this is a streaming loader that commits
frequently. And deleting files from cloud storage is relatively slow. I
observed that the loader could pause for several minutes on a single
commit waiting for files to be deleted.

Here I implement a customized Hadoop FileSystem where `delete` returns
immediately and then operates asynchronously. This means deleting never
blocks the loader's main fiber of execution.  It is safe to run `delete`
tasks asynchronously because the Open Table Formats do not have a hard
requirement that files are deleted immediately.
common-streams 0.8.0-M5 includes the configuration option
`maxPullsPerTransportChannel` for the pubsub source.
Delta has a "S3 multi-cluster" mode, which allows safe cocurrent writes
by multiple loaders.

To enable this mode, users can set the spark options documented by
Delta:

- `spark.delta.logStore.s3.impl=io.delta.storage.S3DynamoDBLogStore`
- `spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName=???`
- `spark.io.delta.storage.S3DynamoDBLogStore.ddb.region=???`
- (and some others)

This commit adds the necessary runtime dependencies for this mode.
@oguzhanunlu oguzhanunlu self-assigned this Oct 31, 2024
In #82 we added a feature to delete files asynchronously. This has
worked great for Delta, which is tolerant to deletes that happen
eventually not immediately. But it is not working great for Hudi, which
does delete-and-replace of the `hoodie.properties` file.

This commit disables the feature again for Hudi only.
CHANGELOG Outdated
Comment on lines 3 to 24
Disable asynchronous deletes for Hudi
Upgrade dependencies
DynamoDB runtime dependencies for Delta S3 multi-writer
PDP-1323 Handle initialization errors for GCP
PDP-1324 - Handle initialization errors for lake loader Delta Azure
Upgrade common-streams to 0.8.0-M5
Delete files asynchronously
Upgrade common-streams 0.8.0-M4
Avoid error on duplicate view name
Add option to exit on missing Iglu schemas
common-streams 0.8.x with refactored health monitoring
Create table concurrently with subscribing to stream of events
Iceberg fail fast if missing permissions on the catalog
Make alert messages more human-readable
Hudi loader should fail early if missing permissions on Glue catalog
Add alert & retry for delta/s3 initialization
Implement alerting and retrying mechanisms
Bump aws-hudi to 1.0.0-beta2
Bump hudi to 0.15.0
Allow disregarding Iglu field's nullability when creating output columns
Extend health probe to report unhealthy on more error scenarios
Fix bad rows resizing
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For all earlier changelogs we have references to the relevant issue/PR, e.g.

Disable asynchronous deletes for Hudi (#94)

I think we should keep the references.

@oguzhanunlu oguzhanunlu merged commit c1e275e into main Nov 1, 2024
2 checks passed
@oguzhanunlu oguzhanunlu deleted the release/0.5.0 branch November 1, 2024 09:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants