Skip to content

Commit

Permalink
Handle initial-response for event stream operations with RPC-bound pr…
Browse files Browse the repository at this point in the history
…otocols (#4004)

## Motivation and Context
Handles
[initial-response](https://smithy.io/2.0/spec/streaming.html#initial-response)
for event stream operations with an RPC-bound protocol in client SDKs.
This makes event stream operations like
[SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html)
in Kinesis and
[StartLiveTrail](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_StartLiveTail.html)
in CloudWatchLogs available in the Rust SDK, both of which are currently
unavailable
([RemoveEventStreamOperations.kt](https://github.com/smithy-lang/smithy-rs/pull/4004/files#diff-b94081365137754f8674df177705a472ede6d3c21da608fff68a450fec5abd26)
removes them via model transformation as unsupported).

Note that this PR is the first in a series and merged into a feature
branch, not into the main branch. It intentionally does not address
`TODO(EventStream)` or does not disable stalled stream protection for
newly enabled event stream operations. It just focuses on implementing
handling `initial-response` messages. The next PR will most likely focus
on handling `initial-request` message, and the last PR will focus on
code cleanup (hence if code review feedback is about code cleanup, I
happily accept it but might defer it to the last PR).

## Description
At its core, handling initial-response for event stream operations is
boiled down to using
[try_recv_initial](https://github.com/smithy-lang/smithy-rs/blob/ec4292005728cec42ca2f9f8ba40f6b5bb563b05/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs#L208-L232)
during deserialization phase within the orchestrator. Here is the issue,
though. `try_recv_initial` is an async method but functions/methods in
the deserializer in client SDKs are all synchronous, causing an
impedance mismatch between async and sync. This means we need to find an
async context somewhere in which to be able to call the
`try_recv_initial` method.

To sidestep this design limitation, we customize the `send` method for
event stream operations with RPC-bound protocols whose operation output
contains an event stream member. The customization makes the epilogue of
the `send` method look like the following (using `unwrap` for
simplicity):
```
    pub async fn send(
        self,
    ) -> Result<crate::operation::some_op::SomeOp, SdkError<...>> {
        let input = ...;
        let runtime_plugins = ...;

        // new epilogue of the send method
        let output = crate::operation::some_op::SomeOp::orchestrate(&runtime_plugins, input).await?;

        let message = output.response_stream.try_recv_initial().await.map_err(response_error).unwrap();

        match message {
            Some(message) => {
                // If operation output contains non-event stream members, we will populate them here
                let mut builder = output.into_builder(); // codegen needs to render this new `pub(crate)` conversion method on the output struct
                builder = crate::protocol_serde::shape_some_op::de_some_op(message.payload(), output).unwrap();
                Ok(builder.build().unwrap())
            },
            // In this `None` branch, `try_recv_initial` will set aside the initial frame it has read from event stream if it's not initial-response.
            // The next time the user calls `recv` on the `EventReceiver`, the user correctly receives the frame that was read off.
            None => Ok(output),
        }
    }
```
This provides us with an async context where we can call
`try_recv_initial` on an event stream member of an operation output
struct and populate the non-event stream members of that struct. The
downside though is that the operation output struct will be half-baked
until the end of `send`, with non-event stream members uninitialized, so
interceptors after deserialization won't have access to complete the
output struct. (it's todo to document somewhere for operations in
question).

The rest of the changes are for testing. Specifically
- added `cloudwatchlogs` as a new service integration test (to test
`start_live_trail` event stream operation)
- as part of this, the local helpers in
`aws/sdk/integration-tests/transcribestreaming/tests/test.rs` have been
moved to `aws_smithy_eventstream` and gated behind the `test-util` cargo
feature.
- added client SDK codegen test for event stream to verify the
functionality in the cases a) an `initial-request` message is present
and b) it is not
- [The event stream test
model](https://github.com/smithy-lang/smithy-rs/blob/ec4292005728cec42ca2f9f8ba40f6b5bb563b05/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/testutil/EventStreamTestModels.kt#L20)
has been modified for the client SDK to include non-event stream member
in the operation output struct

## Testing
- Existing CI
- Added a service integration test for `cloudwatchlogs`
- Added client SDK codegen tests for verifying initial-response message
handling

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._
  • Loading branch information
ysaito1001 authored Feb 11, 2025
1 parent 8039247 commit 68ad62a
Show file tree
Hide file tree
Showing 29 changed files with 13,734 additions and 345 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Compani
import software.amazon.smithy.rust.codegen.core.rustlang.DependencyScope
import software.amazon.smithy.rust.codegen.core.rustlang.Writable
import software.amazon.smithy.rust.codegen.core.rustlang.writable
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeConfig
import software.amazon.smithy.rust.codegen.core.smithy.generators.LibRsCustomization
import software.amazon.smithy.rust.codegen.core.smithy.generators.LibRsSection
import software.amazon.smithy.rust.codegen.core.testutil.testDependenciesOnly
import software.amazon.smithy.rust.codegen.core.util.hasEventStreamOperations
import software.amazon.smithy.rustsdk.AwsCargoDependency.awsConfig
import software.amazon.smithy.rustsdk.AwsCargoDependency.awsRuntime
import java.nio.file.Files
Expand Down Expand Up @@ -76,29 +78,31 @@ class IntegrationTestDecorator : ClientCodegenDecorator {
}

class IntegrationTestDependencies(
private val codegenContext: ClientCodegenContext,
codegenContext: ClientCodegenContext,
private val moduleName: String,
private val hasTests: Boolean,
private val hasBenches: Boolean,
) : LibRsCustomization() {
private val runtimeConfig = codegenContext.runtimeConfig
private val serviceShape = codegenContext.serviceShape
private val model = codegenContext.model

override fun section(section: LibRsSection) =
when (section) {
is LibRsSection.Body ->
testDependenciesOnly {
if (hasTests) {
val smithyAsync =
CargoDependency.smithyAsync(codegenContext.runtimeConfig)
CargoDependency.smithyAsync(runtimeConfig)
.copy(features = setOf("test-util"), scope = DependencyScope.Dev)
val smithyTypes =
CargoDependency.smithyTypes(codegenContext.runtimeConfig)
CargoDependency.smithyTypes(runtimeConfig)
.copy(features = setOf("test-util"), scope = DependencyScope.Dev)
addDependency(awsRuntime(runtimeConfig).toDevDependency().withFeature("test-util"))
addDependency(FuturesUtil)
addDependency(SerdeJson)
addDependency(smithyAsync)
addDependency(smithyProtocolTestHelpers(codegenContext.runtimeConfig))
addDependency(smithyProtocolTestHelpers(runtimeConfig))
addDependency(smithyRuntime(runtimeConfig).copy(features = setOf("test-util", "wire-mock"), scope = DependencyScope.Dev))
addDependency(smithyRuntimeApiTestUtil(runtimeConfig))
addDependency(smithyTypes)
Expand All @@ -109,6 +113,12 @@ class IntegrationTestDependencies(
if (hasBenches) {
addDependency(Criterion)
}
if (serviceShape.hasEventStreamOperations(model)) {
addDependency(
CargoDependency.smithyEventStream(runtimeConfig)
.copy(features = setOf("test-util"), scope = DependencyScope.Dev),
)
}
for (serviceSpecific in serviceSpecificCustomizations()) {
serviceSpecific.section(section)(this)
}
Expand All @@ -120,7 +130,7 @@ class IntegrationTestDependencies(
private fun serviceSpecificCustomizations(): List<LibRsCustomization> =
when (moduleName) {
"transcribestreaming" -> listOf(TranscribeTestDependencies())
"s3" -> listOf(S3TestDependencies(codegenContext))
"s3" -> listOf(S3TestDependencies(runtimeConfig))
"dynamodb" -> listOf(DynamoDbTestDependencies())
else -> emptyList()
}
Expand All @@ -142,11 +152,11 @@ class DynamoDbTestDependencies : LibRsCustomization() {
}
}

class S3TestDependencies(private val codegenContext: ClientCodegenContext) : LibRsCustomization() {
class S3TestDependencies(private val runtimeConfig: RuntimeConfig) : LibRsCustomization() {
override fun section(section: LibRsSection): Writable =
writable {
addDependency(awsConfig(codegenContext.runtimeConfig).toDevDependency().withFeature("behavior-version-latest"))
addDependency(smithyExperimental(codegenContext.runtimeConfig).toDevDependency())
addDependency(awsConfig(runtimeConfig).toDevDependency().withFeature("behavior-version-latest"))
addDependency(smithyExperimental(runtimeConfig).toDevDependency())
addDependency(AsyncStd)
addDependency(BytesUtils.toDevDependency())
addDependency(FastRand.toDevDependency())
Expand Down
Loading

0 comments on commit 68ad62a

Please sign in to comment.