Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle initial-response for event stream operations with RPC-bound protocols #4004

Conversation

ysaito1001
Copy link
Contributor

@ysaito1001 ysaito1001 commented Feb 8, 2025

Motivation and Context

Handles initial-response for event stream operations with an RPC-bound protocol in client SDKs. This makes event stream operations like SubscribeToShard in Kinesis and StartLiveTrail in CloudWatchLogs available in the Rust SDK, both of which are currently unavailable (RemoveEventStreamOperations.kt removes them via model transformation as unsupported).

Note that this PR is the first in a series and merged into a feature branch, not into the main branch. It intentionally does not address TODO(EventStream) or does not disable stalled stream protection for newly enabled event stream operations. It just focuses on implementing handling initial-response messages. The next PR will most likely focus on handling initial-request message, and the last PR will focus on code cleanup (hence if code review feedback is about code cleanup, I happily accept it but might defer it to the last PR).

Description

At its core, handling initial-response for event stream operations is boiled down to using try_recv_initial during deserialization phase within the orchestrator. Here is the issue, though. try_recv_initial is an async method but functions/methods in the deserializer in client SDKs are all synchronous, causing an impedance mismatch between async and sync. This means we need to find an async context somewhere in which to be able to call the try_recv_initial method.

To sidestep this design limitation, we customize the send method for event stream operations with RPC-bound protocols whose operation output contains an event stream member. The customization makes the epilogue of the send method look like the following (using unwrap for simplicity):

    pub async fn send(
        self,
    ) -> Result<crate::operation::some_op::SomeOp, SdkError<...>> {
        let input = ...;
        let runtime_plugins = ...;

        // new epilogue of the send method
        let output = crate::operation::some_op::SomeOp::orchestrate(&runtime_plugins, input).await?;

        let message = output.response_stream.try_recv_initial().await.map_err(response_error).unwrap();

        match message {
            Some(message) => {
                // If operation output contains non-event stream members, we will populate them here
                let mut builder = output.into_builder(); // codegen needs to render this new `pub(crate)` conversion method on the output struct
                builder = crate::protocol_serde::shape_some_op::de_some_op(message.payload(), output).unwrap();
                Ok(builder.build().unwrap())
            },
            // In this `None` branch, `try_recv_initial` will set aside the initial frame it has read from event stream if it's not initial-response.
            // The next time the user calls `recv` on the `EventReceiver`, the user correctly receives the frame that was read off.
            None => Ok(output),
        }
    }

This provides us with an async context where we can call try_recv_initial on an event stream member of an operation output struct and populate the non-event stream members of that struct. The downside though is that the operation output struct will be half-baked until the end of send, with non-event stream members uninitialized, so interceptors after deserialization won't have access to complete the output struct. (it's todo to document somewhere for operations in question).

The rest of the changes are for testing. Specifically

  • added cloudwatchlogs as a new service integration test (to test start_live_trail event stream operation)
    • as part of this, the local helpers in aws/sdk/integration-tests/transcribestreaming/tests/test.rs have been moved to aws_smithy_eventstream and gated behind the test-util cargo feature.
  • added client SDK codegen test for event stream to verify the functionality in the cases a) an initial-request message is present and b) it is not

Testing

  • Existing CI
  • Added a service integration test for cloudwatchlogs
  • Added client SDK codegen tests for verifying initial-response message handling

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

Copy link

github-actions bot commented Feb 8, 2025

A new generated diff is ready to view.

A new doc preview is ready to view.

@smithy-lang smithy-lang deleted a comment from github-actions bot Feb 9, 2025
@ysaito1001 ysaito1001 marked this pull request as ready for review February 10, 2025 19:14
@ysaito1001 ysaito1001 requested review from a team as code owners February 10, 2025 19:14
Copy link
Contributor

@landonxjames landonxjames left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Couple of questions, but no blockers

@@ -150,6 +162,74 @@ class FluentBuilderGenerator(
}
""",
*scope,
"epilogue" to
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might be saving this for the clean up PR at the end, but would it be worth adding a customization to the documentation of these operations detailing how they will work differently (or not at all?) with some of the interceptor hooks?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might be saving this for the clean up PR at the end

Good point, and I am. We need to first decide where to put that documentation so our customers can easily find it.

Copy link
Contributor

@aajtodd aajtodd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couple small questions/suggestions but overall looks good.

Copy link

A new generated diff is ready to view.

A new doc preview is ready to view.

@ysaito1001 ysaito1001 merged commit 68ad62a into ysaito/support-event-stream-for-rpc-bound-protocols Feb 11, 2025
43 of 44 checks passed
@ysaito1001 ysaito1001 deleted the ysaito/handle-initial-response branch February 11, 2025 22:04
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants