Dekaf: Fixes for release #1965
@@ -494,17 +494,17 @@ impl<W: TaskWriter + 'static> TaskForwarder<W> {
    pub fn send_stats(&self, collection_name: String, stats: ops::stats::Binding) {
        if stats
            .left
            .is_some_and(|s| s.bytes_total == 0 || s.docs_total == 0)
I realized that this was too restrictive and was complaining too much -- it's fine to send_stats() stats where both bytes_total and docs_total are 0 because of the filter above in StatsAggregator::take
Why is this filtering required in the first place? How does it come to be that bytes_total != 0 but docs_total == 0, or vice versa? I don't see a satisfying explanation here of how we observed such documents in production in the first place and how this change ensures they can never happen again.
Okay, I now see that this PR addressed the root cause.
Can you instead make this an assert()-ion rather than a tracing::error?
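For reference, a minimal sketch of what such an assertion could look like. `DocsAndBytes`, `is_consistent`, and `check_stats` are hypothetical names used only for illustration; the field names mirror the diff above, but this is a stand-in and not the real `ops::stats::Binding` type. The assumed invariant is that the two counters are either both zero or both nonzero.

```rust
/// Hypothetical stand-in for one side of a binding's stats
/// (not the real `ops::stats` type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DocsAndBytes {
    pub docs_total: u64,
    pub bytes_total: u64,
}

/// Returns true when the counters agree: both zero, or both nonzero.
/// Absent stats are treated as consistent.
pub fn is_consistent(stats: Option<DocsAndBytes>) -> bool {
    match stats {
        None => true,
        Some(s) => (s.bytes_total == 0) == (s.docs_total == 0),
    }
}

/// Panics instead of logging when the invariant is violated.
pub fn check_stats(stats: Option<DocsAndBytes>) {
    assert!(is_consistent(stats), "inconsistent stats: {:?}", stats);
}
```

Under this assumption, stats where both counters are 0 pass the check, while a nonzero byte count paired with zero documents (or the reverse) panics.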
e18dd24 to e9a7266
crates/dekaf/src/utils.rs
Outdated
        None => 0,
    };

    let mut full_doc = doc.to_debug_json_value();
The alternative is to figure out how to plumb through an allocator here and turn doc into a HeapNode... but unless I'm missing something, this seems fine as a stop-gap -- this codepath goes away once everyone migrates over to materializations and we turn off support for that connection method.
The conversion into / out of serde_json::Value is ... fine, especially given its short expected life-cycle, but did you look at what to_debug_json_value does / why it's a thing? Its behavior is not appropriate here.
Ah I see -- I looked at the function and saw it was serializing to a serde_json::Value, but didn't look into what SerPolicy::debug() does. I think I want SerPolicy::noop() instead to disable any truncation of the output
e9a7266 to 1799a0d
I'm going to push back on moving all of this, and introducing
I think this needs to be split up. What is 100% necessary to land for tomorrow? I'd like to see a focused PR just on those essential changes.
1799a0d to 791ae6e
Okay cool, will do. In the meantime, I rewrote the commits here to omit all of the testing stuff and will land that in a separate PR that doesn't mess with
LGTM % comments
I'm not seeing the sops timeout change we discussed yesterday. Where's that?
That's in the k8s pod spec; it'll be in the ops PR to deploy this.
Since we're now piping `tracing` logs into the task logs, which are user-facing, we'd like to make the logs we send as meaningful as possible. To do that, we need to be able to specify more narrowly which logs get sent, so I've swapped the simple log level filter for a slightly more advanced `tracing_subscriber::filter::Targets`, which allows us to specify log filters such as `debug,simple_crypt=warn,aws_config=warn,h2=warn`.
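To illustrate how a directive string like the one above is interpreted, here is a simplified, std-only model. It is not the `tracing_subscriber` implementation: `Level`, `TargetFilter`, `parse`, and `enabled` are hypothetical names. The model assumes that a bare level sets the default, that `target=level` sets a per-target maximum, and that the longest matching target prefix wins. Unlike the real filter, it simply rejects anything that is not a known level.

```rust
/// Verbosity levels, ordered from least to most verbose.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Level {
    Error,
    Warn,
    Info,
    Debug,
    Trace,
}

fn parse_level(s: &str) -> Option<Level> {
    match s.trim().to_ascii_lowercase().as_str() {
        "error" => Some(Level::Error),
        "warn" => Some(Level::Warn),
        "info" => Some(Level::Info),
        "debug" => Some(Level::Debug),
        "trace" => Some(Level::Trace),
        _ => None,
    }
}

/// Simplified model of a per-target level filter.
pub struct TargetFilter {
    default: Option<Level>,
    overrides: Vec<(String, Level)>,
}

impl TargetFilter {
    /// Parses a comma-separated list of `level` and `target=level` directives.
    pub fn parse(spec: &str) -> Option<Self> {
        let mut default = None;
        let mut overrides = Vec::new();
        for part in spec.split(',').map(str::trim).filter(|p| !p.is_empty()) {
            match part.split_once('=') {
                Some((target, level)) => {
                    overrides.push((target.trim().to_string(), parse_level(level)?))
                }
                None => default = Some(parse_level(part)?),
            }
        }
        Some(Self { default, overrides })
    }

    /// An event is enabled if its level is no more verbose than the maximum
    /// level of the longest matching target prefix, or of the default.
    pub fn enabled(&self, target: &str, level: Level) -> bool {
        let max = self
            .overrides
            .iter()
            .filter(|(prefix, _)| target.starts_with(prefix.as_str()))
            .max_by_key(|(prefix, _)| prefix.len())
            .map(|(_, lvl)| *lvl)
            .or(self.default);
        max.map_or(false, |max| level <= max)
    }
}
```

With the filter string from the commit message, this model lets `debug` events through for most targets while limiting `simple_crypt`, `aws_config`, and `h2` to `warn` and above.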
We weren't correctly narrowing the default of intersected shapes. For example, if we intersected one shape like

```json
{ "type": ["string", "null"], "default": null }
```

with another shape like

```json
{ "type": "string" }
```

we'd end up with a shape of

```json
{ "type": "string", "default": null }
```

which isn't correct: when we try to emit a document for this shape that's missing the field in question and provide its default value of null, that document will subsequently fail to validate against the schema.

Fixes #1944
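A minimal sketch of the narrowing idea, using a toy model rather than the real shape types. `Value`, `Type`, `Shape`, and `intersect` are hypothetical names, and dropping a default that no longer validates is an assumption about the intended behavior, not a description of the actual fix.

```rust
/// Toy JSON value: just enough to express the example above.
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Null,
    Str(String),
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Type {
    Null,
    String,
}

/// Toy shape: a set of allowed types plus an optional default.
#[derive(Debug, Clone, PartialEq)]
pub struct Shape {
    pub types: Vec<Type>,
    pub default: Option<Value>,
}

fn type_of(v: &Value) -> Type {
    match v {
        Value::Null => Type::Null,
        Value::Str(_) => Type::String,
    }
}

/// Intersects two shapes, keeping a default only if it still validates
/// against the narrowed set of types.
pub fn intersect(lhs: &Shape, rhs: &Shape) -> Shape {
    let types: Vec<Type> = lhs
        .types
        .iter()
        .copied()
        .filter(|t| rhs.types.contains(t))
        .collect();
    let default = lhs
        .default
        .clone()
        .or_else(|| rhs.default.clone())
        .filter(|d| types.contains(&type_of(d)));
    Shape { types, default }
}
```

In this model, intersecting the nullable string shape (default `null`) with the plain string shape yields a string-only shape with no default, so a missing field is never filled with a value the schema would reject.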
…ctions

This got lost in the shuffle of introducing field selection, so this temporarily adds it back until we can get everyone using new style tasks and we can get rid of this logic for good.

---

Also fix CDC deletions mode for new style connections

The existing behavior would override `_meta` with an object just containing `{"is_deleted": 0}`. Instead, let's not try to retain backwards compatibility here and just add a new `_is_deleted` field.
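A rough sketch of the new-style behavior described above, using a toy document model. `Json` and `add_is_deleted` are hypothetical names, and passing the deletion flag in as a parameter is an assumption made to keep the example self-contained. The point is only that `_meta` is left untouched and a separate top-level `_is_deleted` field is added.

```rust
use std::collections::BTreeMap;

/// Toy JSON value, sufficient for this illustration.
#[derive(Debug, Clone, PartialEq)]
pub enum Json {
    Int(i64),
    Str(String),
    Object(BTreeMap<String, Json>),
}

/// Adds a top-level `_is_deleted` field (1 or 0) without modifying `_meta`.
pub fn add_is_deleted(
    mut doc: BTreeMap<String, Json>,
    deleted: bool,
) -> BTreeMap<String, Json> {
    let flag = if deleted { 1 } else { 0 };
    doc.insert("_is_deleted".to_string(), Json::Int(flag));
    doc
}
```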
791ae6e to 14bc520
Ok, turned the stats error into an assertion, and swapped
I decided to roll all of my remaining changes into one PR for easier reviewing. We have:
… /_meta/is_deleted for refresh-token-based connections in order to avoid breaking them.
… master because it was on top of the validation-test refactor