Skip to content

Commit

Permalink
Close channel on errors (#16)
Browse files Browse the repository at this point in the history
Motivation:

NIO channel pipeline's should, in most cases, be closed when an error is
caught. This is not currently the case for the client and server
connection channels.

Modifications:

- Close on error unless it's safe to ignore the error (such as if it's a
stream-level error)

Result:

Connections are closed if something unrecoverable happens
  • Loading branch information
glbrntt authored Oct 15, 2024
1 parent a0ff9bb commit fec325b
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,25 @@ package final class ClientConnectionHandler: ChannelInboundHandler, ChannelOutbo
}

package func errorCaught(context: ChannelHandlerContext, error: any Error) {
// Store the error and close, this will result in the final close event being fired down
// the pipeline with an appropriate close reason and appropriate error. (This avoids
// the async channel just throwing the error.)
self.state.receivedError(error)
context.close(mode: .all, promise: nil)
if self.closeConnectionOnError(error) {
// Store the error and close, this will result in the final close event being fired down
// the pipeline with an appropriate close reason and appropriate error. (This avoids
// the async channel just throwing the error.)
self.state.receivedError(error)
context.close(mode: .all, promise: nil)
}
}

private func closeConnectionOnError(_ error: any Error) -> Bool {
switch error {
case is NIOHTTP2Errors.StreamError:
// Stream errors occur in streams, they are only propagated down the connection channel
// pipeline for vestigial reasons.
return false
default:
// Everything else is considered terminal for the connection until we know better.
return true
}
}

package func channelRead(context: ChannelHandlerContext, data: NIOAny) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ extension ServerConnectionManagementHandler {
/// as part of graceful shutdown.
private let goAwayPingData: HTTP2PingData

/// Whether the connection is currently closing.
var isClosing: Bool {
self.state.isClosing
}

/// Create a new state machine.
///
/// - Parameters:
Expand Down Expand Up @@ -391,5 +396,14 @@ extension ServerConnectionManagementHandler.StateMachine {
case closing(Closing)
case closed
case _modifying

var isClosing: Bool {
switch self {
case .closing:
return true
case .active, .closed, ._modifying:
return false
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,37 @@ package final class ServerConnectionManagementHandler: ChannelDuplexHandler {
context.fireUserInboundEventTriggered(event)
}

package func errorCaught(context: ChannelHandlerContext, error: any Error) {
if self.closeConnectionOnError(error) {
context.close(mode: .all, promise: nil)
}
}

private func closeConnectionOnError(_ error: any Error) -> Bool {
switch error {
case is NIOHTTP2Errors.NoSuchStream:
// In most cases this represents incorrect client behaviour. However, NIOHTTP2 currently
// emits this error if a server receives a HEADERS frame for a new stream after having sent
// a GOAWAY frame. This can happen when a client opening a stream races with a server
// shutting down.
//
// This should be resolved in NIOHTTP2: https://github.com/apple/swift-nio-http2/issues/466
//
// Only close the connection if it's not already closing (as this is the state in which the
// error can be safely ignored).
return !self.state.isClosing

case is NIOHTTP2Errors.StreamError:
// Stream errors occur in streams, they are only propagated down the connection channel
// pipeline for vestigial reasons.
return false

default:
// Everything else is considered terminal for the connection until we know better.
return true
}
}

package func channelRead(context: ChannelHandlerContext, data: NIOAny) {
self.inReadLoop = true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,54 @@ struct ClientConnectionHandlerTests {
connection.channel.close(mode: .all, promise: nil)
#expect(try connection.readEvent() == .closing(.unexpected(nil, isIdle: false)))
}

@Test("Closes on error")
func closesOnError() throws {
let connection = try Connection()
try connection.activate()

let streamError = NIOHTTP2Errors.noSuchStream(streamID: 42)
connection.channel.pipeline.fireErrorCaught(streamError)

// Closing is completed on the next loop tick, so run the loop.
connection.channel.embeddedEventLoop.run()
try connection.channel.closeFuture.wait()
}

@Test("Doesn't close on stream error")
func doesNotCloseOnStreamError() throws {
let connection = try Connection(maxIdleTime: .minutes(1))
try connection.activate()

let streamError = NIOHTTP2Errors.streamError(
streamID: 42,
baseError: NIOHTTP2Errors.streamIDTooSmall()
)
connection.channel.pipeline.fireErrorCaught(streamError)

// Now do a normal shutdown to make sure the connection is still working as normal.
//
// Write the initial settings to ready the connection.
try connection.settings([])
#expect(try connection.readEvent() == .ready)

// Idle with no streams open we should:
// - read out a closing event,
// - write a GOAWAY frame,
// - close.
connection.loop.advanceTime(by: .minutes(5))

#expect(try connection.readEvent() == .closing(.idle))

let frame = try #require(try connection.readFrame())
#expect(frame.streamID == .rootStream)
let (lastStreamID, error, data) = try #require(frame.payload.goAway)
#expect(lastStreamID == .rootStream)
#expect(error == .noError)
#expect(data == ByteBuffer(string: "idle"))

try connection.waitUntilClosed()
}
}

extension ClientConnectionHandlerTests {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,6 @@

import NIOCore
import NIOHTTP2
import XCTest

func XCTAssertGoAway(
_ payload: HTTP2Frame.FramePayload,
verify: (HTTP2StreamID, HTTP2ErrorCode, ByteBuffer?) throws -> Void = { _, _, _ in }
) rethrows {
switch payload {
case .goAway(let lastStreamID, let errorCode, let opaqueData):
try verify(lastStreamID, errorCode, opaqueData)
default:
XCTFail("Expected '.goAway' got '\(payload)'")
}
}

func XCTAssertPing(
_ payload: HTTP2Frame.FramePayload,
verify: (HTTP2PingData, Bool) throws -> Void = { _, _ in }
) rethrows {
switch payload {
case .ping(let data, ack: let ack):
try verify(data, ack)
default:
XCTFail("Expected '.ping' got '\(payload)'")
}
}

extension HTTP2Frame.FramePayload {
var goAway: (lastStreamID: HTTP2StreamID, errorCode: HTTP2ErrorCode, opaqueData: ByteBuffer?)? {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,40 @@ struct ServerConnectionManagementHandlerTests {
// The server should close the connection.
try connection.waitUntilClosed()
}

@Test("Closes on error")
func closesOnError() throws {
let connection = try Connection()
try connection.activate()

let streamError = NIOHTTP2Errors.noSuchStream(streamID: 42)
connection.channel.pipeline.fireErrorCaught(streamError)

// Closing is completed on the next loop tick, so run the loop.
connection.channel.embeddedEventLoop.run()
try connection.channel.closeFuture.wait()
}

@Test("Doesn't close on stream error")
func doesNotCloseOnStreamError() throws {
let connection = try Connection(maxIdleTime: .minutes(1))
try connection.activate()

let streamError = NIOHTTP2Errors.streamError(
streamID: 42,
baseError: NIOHTTP2Errors.streamIDTooSmall()
)
connection.channel.pipeline.fireErrorCaught(streamError)

// Follow a normal flow to check the connection wasn't closed.
//
// Hit the max idle time.
connection.advanceTime(by: .minutes(1))
// Follow the graceful shutdown flow.
try self.testGracefulShutdown(connection: connection, lastStreamID: 0)
// Closed because no streams were open.
try connection.waitUntilClosed()
}
}

extension ServerConnectionManagementHandlerTests {
Expand Down

0 comments on commit fec325b

Please sign in to comment.