Skip to content

Commit 12e7e2b

Browse files
authored
Fixes skupperproject#1446: check for raw read close only after draining read buffers (skupperproject#1447)
When polling the raw connection it is possible for pn_raw_connection_is_read_closed() to return true when there are still read buffers available. This can happen when the read buffers are not completely drained due to flow control. The patch moves the check for raw read closed to after the point where all read buffers have been drained. Closes skupperproject#1446
1 parent 8d767ef commit 12e7e2b

File tree

2 files changed

+36
-18
lines changed

2 files changed

+36
-18
lines changed

src/adaptors/adaptor_tls.c

+24-9
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,10 @@ struct qd_tls_t {
5858
qd_tls_on_secure_cb_t *on_secure_cb;
5959
bool tls_has_output;
6060
bool tls_error;
61-
bool output_eos; // pn_tls_close_output() called
62-
bool input_drained; // no more decrypted output, raw conn read closed
63-
bool output_flushed; // encrypt done, raw conn write closed
61+
bool output_eos; // pn_tls_close_output() called
62+
bool raw_read_drained; // raw conn read closed and all buffer read
63+
bool input_drained; // no more decrypted output, raw conn read closed
64+
bool output_flushed; // encrypt done, raw conn write closed
6465

6566
uint64_t encrypted_output_bytes;
6667
uint64_t encrypted_input_bytes;
@@ -1274,8 +1275,13 @@ int qd_tls_do_io2(qd_tls_t *tls,
12741275
if (capacity > 0 && (!pn_tls_is_secure(tls->tls_session) || input_data != 0)) {
12751276
size_t pushed = 0;
12761277
total_octets = 0;
1277-
while (pushed < capacity && pn_raw_connection_take_read_buffers(raw_conn, &pn_buf_desc, 1) == 1) {
1278-
if (pn_buf_desc.size) {
1278+
while (pushed < capacity) {
1279+
size_t took = pn_raw_connection_take_read_buffers(raw_conn, &pn_buf_desc, 1);
1280+
if (took != 1) {
1281+
// No more read buffers available. Now it is safe to check if the raw connection has closed
1282+
tls->raw_read_drained = pn_raw_connection_is_read_closed(raw_conn);
1283+
break;
1284+
} else if (pn_buf_desc.size) {
12791285
total_octets += pn_buf_desc.size;
12801286
given = pn_tls_give_decrypt_input_buffers(tls->tls_session, &pn_buf_desc, 1);
12811287
assert(given == 1);
@@ -1427,6 +1433,8 @@ int qd_tls_do_io2(qd_tls_t *tls,
14271433
return -1;
14281434
}
14291435

1436+
// check if the output (encrypt) side is done
1437+
//
14301438
if (tls->output_eos && !pn_tls_is_encrypt_output_pending(tls->tls_session)) {
14311439
// We closed the encrypt side of the TLS connection and we've sent all output
14321440
tls->output_flushed = true;
@@ -1437,9 +1445,12 @@ int qd_tls_do_io2(qd_tls_t *tls,
14371445
}
14381446
}
14391447

1440-
if (pn_tls_is_input_closed(tls->tls_session)
1441-
|| (pn_raw_connection_is_read_closed(raw_conn) && !pn_tls_is_decrypt_output_pending(tls->tls_session))) {
1442-
// TLS will not take any more encrypted input - drain the raw conn input
1448+
// check for end of input (decrypt done)
1449+
//
1450+
if (pn_tls_is_input_closed(tls->tls_session)) {
1451+
// TLS clean close signalled by remote. Do not read any more data (prevent truncation attack).
1452+
//
1453+
tls->input_drained = true;
14431454
if (!pn_raw_connection_is_read_closed(raw_conn)) {
14441455
qd_log(tls->log_module, QD_LOG_DEBUG,
14451456
"[C%" PRIu64 "] TLS input closed - closing read side of raw connection", tls->conn_id);
@@ -1448,8 +1459,12 @@ int qd_tls_do_io2(qd_tls_t *tls,
14481459
while (pn_raw_connection_take_read_buffers(raw_conn, &pn_buf_desc, 1) == 1) {
14491460
qd_buffer_free(_pn_buf_desc_take_buffer(&pn_buf_desc));
14501461
}
1451-
1462+
} else if (tls->raw_read_drained && !pn_tls_is_decrypt_output_pending(tls->tls_session)) {
1463+
// Unclean close: remote simply dropped the connection. Done reading remaining decrypted output.
1464+
//
14521465
tls->input_drained = true;
1466+
qd_log(tls->log_module, QD_LOG_DEBUG,
1467+
"[C%" PRIu64 "] TLS input closed", tls->conn_id);
14531468
}
14541469

14551470
return tls->input_drained && tls->output_flushed ? QD_TLS_DONE : 0;

src/adaptors/tcp_lite/tcp_lite.c

+12-9
Original file line numberDiff line numberDiff line change
@@ -713,13 +713,13 @@ static void grant_read_buffers_XSIDE_IO(tcplite_connection_t *conn, const size_t
713713
}
714714

715715

716-
static uint64_t produce_read_buffers_XSIDE_IO(tcplite_connection_t *conn, qd_message_t *stream, bool *blocked)
716+
static uint64_t produce_read_buffers_XSIDE_IO(tcplite_connection_t *conn, qd_message_t *stream, bool *read_closed)
717717
{
718718
ASSERT_RAW_IO;
719719
uint64_t octet_count = 0;
720+
*read_closed = false;
720721

721722
if (qd_message_can_produce_buffers(stream)) {
722-
*blocked = false;
723723
qd_buffer_list_t qd_buffers = DEQ_EMPTY;
724724
pn_raw_buffer_t raw_buffers[RAW_BUFFER_BATCH_SIZE];
725725
size_t count;
@@ -742,12 +742,15 @@ static uint64_t produce_read_buffers_XSIDE_IO(tcplite_connection_t *conn, qd_mes
742742
count = pn_raw_connection_take_read_buffers(conn->raw_conn, raw_buffers, RAW_BUFFER_BATCH_SIZE);
743743
}
744744

745+
// ISSUE-1446: it is only safe to check pn_raw_connection_is_read_closed() after all read buffers are drained since
746+
// the connection can be marked closed while read buffers are still pending in the raw connection.
747+
//
748+
*read_closed = pn_raw_connection_is_read_closed(conn->raw_conn);
749+
745750
if (!DEQ_IS_EMPTY(qd_buffers)) {
746751
//qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] produce_read_buffers_XSIDE_IO - Producing %ld buffers", conn->conn_id, DEQ_SIZE(qd_buffers));
747752
qd_message_produce_buffers(stream, &qd_buffers);
748753
}
749-
} else {
750-
*blocked = true;
751754
}
752755

753756
return octet_count;
@@ -1240,14 +1243,14 @@ static bool manage_flow_XSIDE_IO(tcplite_connection_t *conn)
12401243
//
12411244
// Produce available read buffers into the inbound stream
12421245
//
1246+
bool read_closed = false;
12431247
bool was_blocked = window_full(conn);
1244-
bool blocked;
1245-
uint64_t octet_count = produce_read_buffers_XSIDE_IO(conn, conn->inbound_stream, &blocked);
1248+
uint64_t octet_count = produce_read_buffers_XSIDE_IO(conn, conn->inbound_stream, &read_closed);
12461249
conn->inbound_octets += octet_count;
12471250

12481251
if (octet_count > 0) {
12491252
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] %cSIDE Raw read: Produced %"PRIu64" octets into stream", conn->conn_id, conn->listener_side ? 'L' : 'C', octet_count);
1250-
if (!was_blocked && window_full(conn)) {
1253+
if (!was_blocked && window_full(conn) && !read_closed) {
12511254
conn->window.closed_count += 1;
12521255
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " TCP RX window CLOSED: inbound_bytes=%" PRIu64 " unacked=%" PRIu64,
12531256
DLV_ARGS(conn->inbound_delivery), conn->inbound_octets,
@@ -1271,8 +1274,8 @@ static bool manage_flow_XSIDE_IO(tcplite_connection_t *conn)
12711274
// If the raw connection is read-closed and the last produce did not block, settle and complete
12721275
// the inbound stream/delivery and close out the inbound half of the connection.
12731276
//
1274-
if (pn_raw_connection_is_read_closed(conn->raw_conn) && !blocked) {
1275-
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] Read-closed - close inbound delivery", conn->conn_id);
1277+
if (read_closed) {
1278+
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " Raw conn read-closed - close inbound delivery", DLV_ARGS(conn->inbound_delivery));
12761279
qd_message_set_receive_complete(conn->inbound_stream);
12771280
qdr_delivery_continue(tcplite_context->core, conn->inbound_delivery, false);
12781281
qdr_delivery_set_context(conn->inbound_delivery, 0);

0 commit comments

Comments
 (0)