Dekaf: Fixes for release #1965

Merged
merged 4 commits into from
Feb 25, 2025

Conversation

Contributor

@jshearer jshearer commented Feb 24, 2025

I decided to roll all of my remaining changes into one PR for easier reviewing. We have:

  • Improvements to log filtering

    Since we're now piping `tracing` logs into the user-facing task logs, we'd like the logs we send to be as meaningful as possible. That requires specifying more narrowly which logs get sent, so I've swapped the simple log level filter for the slightly more advanced `tracing_subscriber::filter::Targets`, which lets us specify log filters such as `debug,simple_crypt=warn,aws_config=warn,h2=warn`.

  • Add back in the original logic around /_meta/is_deleted for refresh-token-based connections in order to avoid breaking them.

    I also realized that since people will have to create new dataflows when they upgrade to Dekaf materializations anyway, we don't have to keep the janky logic that mutates the /_meta location to include is_deleted. We can just add a new extractor for a location like _is_deleted.

    Ideally I'd like for the connector to expose this field during field selection, but we currently have a (perfectly reasonable) assertion that the connector can't return constraints for fields that aren't projections.

  • doc: Fix intersection of shape defaults #1950, which was already approved but wasn't merged into master because it was on top of the validation-test refactor.


@@ -494,17 +494,17 @@ impl<W: TaskWriter + 'static> TaskForwarder<W> {
pub fn send_stats(&self, collection_name: String, stats: ops::stats::Binding) {
if stats
.left
.is_some_and(|s| s.bytes_total == 0 || s.docs_total == 0)
Contributor Author

@jshearer jshearer Feb 24, 2025

I realized that this check was too restrictive and was complaining too often. It's fine to call send_stats() with stats where both bytes_total and docs_total are 0, because of the filter above in StatsAggregator::take.

Member

@jgraettinger jgraettinger Feb 25, 2025

Why is this filtering required in the first place? How does it come to be that bytes_total != 0 but docs_total == 0, or vice versa? I don't see a satisfying explanation here of how we observed such documents in production in the first place, and how this change ensures they can never happen again. Okay, I now see that this PR addressed the root cause.

Can you instead make this an assert()-ion rather than a tracing::error?

@jshearer jshearer force-pushed the jshearer/dekaf_updates_and_fixes branch from e18dd24 to e9a7266 Compare February 24, 2025 17:06
None => 0,
};

let mut full_doc = doc.to_debug_json_value();
Contributor Author

The alternative is to figure out how to plumb through an allocator here and turn doc into a HeapNode... but unless I'm missing something, this seems fine as a stop-gap -- this codepath goes away once everyone migrates over to materializations and we turn off support for that connection method.

Member

The conversion into / out of serde_json::Value is ... fine, especially given its short expected life-cycle, but did you look at what to_debug_json_value does / why it's a thing? Its behavior is not appropriate here.

Contributor Author

@jshearer jshearer Feb 25, 2025

Ah I see -- I looked at the function and saw it was serializing to a serde_json::Value, but didn't look into what SerPolicy::debug() does. I think I want SerPolicy::noop() instead, to disable any truncation of the output.

@jshearer jshearer force-pushed the jshearer/dekaf_updates_and_fixes branch from e9a7266 to 1799a0d Compare February 24, 2025 17:15
@jshearer jshearer marked this pull request as ready for review February 24, 2025 19:01
@jgraettinger
Member

Move validation test infrastructure into shared validation-test crate in order to use it in dekaf. Specifically, I needed to be able to convert an input fixture spec into its corresponding built output, which is exactly what validation_test::run() does.

I'm going to push back on moving all of this, and introducing validation-tests at all.
Instead, look at the activate crate, its managed_build routine, and how that's used in its tests.

@jgraettinger
Member

I think this needs to be split up. What is 100% necessary to land for tomorrow? I'd like to see a focused PR just on those essential changes.

@jshearer jshearer force-pushed the jshearer/dekaf_updates_and_fixes branch from 1799a0d to 791ae6e Compare February 24, 2025 19:43
@jshearer
Contributor Author

look at the activate crate, its managed_build routine, and how that's used in its tests.

Okay cool, will do. In the meantime, I rewrote the commits here to omit all of the testing stuff, and will land that in a separate PR that doesn't mess with validation.

Member

@jgraettinger jgraettinger left a comment

LGTM % comments

@jgraettinger
Member

jgraettinger commented Feb 25, 2025

What was the resolution for the date-time handling observed in the bump from avro 0.16 => 0.17? Did a PR for that land separately? landed here

@jgraettinger
Member

I'm not seeing the sops timeout change we discussed yesterday. Where's that?

@jshearer
Contributor Author

That's in the k8s pod spec, it'll be in the ops PR to deploy this

Since we're now piping `tracing` logs into the user-facing task logs, we'd like the logs we send to be as meaningful as possible. That requires specifying more narrowly which logs get sent, so I've swapped the simple log level filter for the slightly more advanced `tracing_subscriber::filter::Targets`, which lets us specify log filters such as `debug,simple_crypt=warn,aws_config=warn,h2=warn`.

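To illustrate how a filter string like the one above resolves, here is a minimal std-only sketch of the matching rules. It is a simplified model and not the `tracing_subscriber` implementation: `Level`, `Directive`, `parse_directives`, and `would_enable` are hypothetical names written for this example, and the model assumes prefix matching on the target with the longest matching prefix winning. In real code the string would simply be parsed into a `Targets` value.

```rust
/// Log levels ordered from least to most verbose (declaration order drives `Ord`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Level {
    Error,
    Warn,
    Info,
    Debug,
    Trace,
}

fn parse_level(s: &str) -> Option<Level> {
    match s.trim().to_ascii_lowercase().as_str() {
        "error" => Some(Level::Error),
        "warn" => Some(Level::Warn),
        "info" => Some(Level::Info),
        "debug" => Some(Level::Debug),
        "trace" => Some(Level::Trace),
        _ => None,
    }
}

/// One directive: an optional target prefix plus the most verbose level allowed.
/// A directive without a target acts as the default for everything else.
#[derive(Debug)]
struct Directive {
    target: Option<String>,
    max: Level,
}

/// Parse a comma-separated spec such as `debug,h2=warn`.
/// Simplification: unknown level names are rejected with `None`.
fn parse_directives(spec: &str) -> Option<Vec<Directive>> {
    spec.split(',')
        .filter(|part| !part.trim().is_empty())
        .map(|part| match part.split_once('=') {
            Some((target, level)) => Some(Directive {
                target: Some(target.trim().to_string()),
                max: parse_level(level)?,
            }),
            None => Some(Directive {
                target: None,
                max: parse_level(part)?,
            }),
        })
        .collect()
}

/// Would an event with this target and level pass the filter?
fn would_enable(directives: &[Directive], target: &str, level: Level) -> bool {
    directives
        .iter()
        // Keep the default directive and any directive whose prefix matches.
        .filter(|d| d.target.as_deref().map_or(true, |t| target.starts_with(t)))
        // The most specific (longest) prefix wins; the default has length 0.
        .max_by_key(|d| d.target.as_ref().map_or(0, |t| t.len()))
        .map_or(false, |d| level <= d.max)
}
```

Under this model, an event from a target starting with `h2` is dropped at `info` but kept at `warn`, while any other target falls through to the `debug` default.
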
We weren't correctly narrowing the default of intersected shapes. For example, if we intersected one shape like
```json
{
    "type": ["string", "null"],
    "default": null
}
```

with another shape like
```json
{
    "type": "string"
}
```

We'd end up with a shape of
```json
{
    "type": "string",
    "default": null
}
```

This isn't correct: when we emit a document for this shape that's missing the field in question
and fill in its default value of null, the result subsequently fails to validate against the schema.

Fixes #1944
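
The narrowing described in this commit message can be sketched with a small std-only model. This is an illustration under stated assumptions and not the `doc` crate's code: `JsonType`, `Value`, `Shape`, and `intersect` are hypothetical names, only two JSON types are modeled, and the rule shown (keep a default only if it is still valid for the narrowed type set) is one plausible reading of the fix.

```rust
/// JSON types a shape may allow (a tiny subset, for illustration only).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum JsonType {
    Null,
    String,
}

/// A default value, reduced to the cases this example needs.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Null,
    Str(String),
}

impl Value {
    fn json_type(&self) -> JsonType {
        match self {
            Value::Null => JsonType::Null,
            Value::Str(_) => JsonType::String,
        }
    }
}

/// Hypothetical stand-in for an inferred shape: allowed types plus a default.
#[derive(Debug, Clone, PartialEq)]
struct Shape {
    types: Vec<JsonType>,
    default: Option<Value>,
}

/// Intersect two shapes, keeping a default only if it is valid for the result.
fn intersect(lhs: &Shape, rhs: &Shape) -> Shape {
    // The narrowed type set contains only types allowed by both sides.
    let types: Vec<JsonType> = lhs
        .types
        .iter()
        .copied()
        .filter(|t| rhs.types.contains(t))
        .collect();

    // Prefer the left default, then the right, and drop any candidate whose
    // type was narrowed away by the intersection.
    let default = lhs
        .default
        .iter()
        .chain(rhs.default.iter())
        .find(|v| types.contains(&v.json_type()))
        .cloned();

    Shape { types, default }
}
```

With the shapes from the message, intersecting `["string", "null"]` (default `null`) with `"string"` yields type `string` and no default in this model, so a missing field is never filled with a value the schema rejects.
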
…ctions

This got lost in the shuffle of introducing field selection, so this temporarily adds it back
until we can get everyone using new style tasks and we can get rid of this logic for good.

---

Also fix CDC deletions mode for new style connections

The existing behavior would override `_meta` with an object just containing `{"is_deleted": 0}`.
Instead, let's not try to retain backwards compatibility here and just add a new `_is_deleted` field.
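
As a rough sketch of the difference between overriding `_meta` and adding a separate `_is_deleted` field, consider the following std-only model. Everything here is hypothetical: `Json`, `is_deleted`, and `with_is_deleted` are names invented for the example, and the rule that a deletion is signalled by `_meta.op == "d"` is an assumption that the commit message does not state.

```rust
use std::collections::BTreeMap;

/// Minimal JSON-like value, just enough for this illustration.
#[derive(Debug, Clone, PartialEq)]
enum Json {
    Int(i64),
    Str(String),
    Object(BTreeMap<String, Json>),
}

/// ASSUMPTION: a document counts as deleted when `_meta.op` is "d".
/// Returns 1 or 0, matching the integer form shown in the commit message.
fn is_deleted(doc: &BTreeMap<String, Json>) -> i64 {
    match doc.get("_meta") {
        Some(Json::Object(meta)) => match meta.get("op") {
            Some(Json::Str(op)) if op == "d" => 1,
            _ => 0,
        },
        _ => 0,
    }
}

/// Add a top-level `_is_deleted` field and leave `_meta` exactly as it was,
/// instead of replacing `_meta` with an object holding only `is_deleted`.
fn with_is_deleted(mut doc: BTreeMap<String, Json>) -> BTreeMap<String, Json> {
    let flag = is_deleted(&doc);
    doc.insert("_is_deleted".to_string(), Json::Int(flag));
    doc
}
```

The point of the design is that any other keys under `_meta` survive untouched, which the override approach would have discarded.
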
@jshearer jshearer force-pushed the jshearer/dekaf_updates_and_fixes branch from 791ae6e to 14bc520 Compare February 25, 2025 14:13
@jshearer
Contributor Author

Ok, turned the stats error into an assertion, and swapped to_debug_json_value() for serde_json::to_value(&doc::SerPolicy::noop().on(doc)).unwrap();
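
For readers following the stats thread, the assertion might look roughly like the sketch below. It is a guess at the invariant based on the review discussion (the two counters should be zero together or non-zero together) and not the merged code. `DocsAndBytes`, `counters_agree`, and `assert_consistent` are hypothetical stand-ins, and the field types are assumed.

```rust
/// Hypothetical stand-in for one side of a binding's stats.
#[derive(Debug, Clone, Copy, Default, PartialEq)]
struct DocsAndBytes {
    docs_total: u64,
    bytes_total: u64,
}

/// The counters agree when both are zero or both are non-zero.
fn counters_agree(s: &DocsAndBytes) -> bool {
    (s.docs_total == 0) == (s.bytes_total == 0)
}

/// Panic, rather than log an error, if the counters disagree.
/// Absent stats (`None`) are treated as trivially consistent.
fn assert_consistent(left: Option<DocsAndBytes>) {
    assert!(
        left.map_or(true, |s| counters_agree(&s)),
        "docs_total and bytes_total disagree: {:?}",
        left
    );
}
```

Unlike the earlier `bytes_total == 0 || docs_total == 0` check, this form accepts stats where both counters are zero and only rejects a mismatch.
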

@jshearer jshearer merged commit 0f3097d into master Feb 25, 2025
3 checks passed