forked from skupperproject/skupper-router
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtcp_adaptor.c
2565 lines (2184 loc) · 103 KB
/
tcp_adaptor.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/amqp.h>
#include <qpid/dispatch/enum.h>
#include <qpid/dispatch/alloc_pool.h>
#include <qpid/dispatch/io_module.h>
#include <qpid/dispatch/protocol_adaptor.h>
#include <qpid/dispatch/server.h>
#include <qpid/dispatch/log.h>
#include <qpid/dispatch/platform.h>
#include <qpid/dispatch/connection_counters.h>
#include <proton/proactor.h>
#include <proton/raw_connection.h>
#include <proton/listener.h>
#include "tcp_adaptor.h"
#include <stdatomic.h>
//
// Function suffixes in this module:
//
// _CSIDE_IO - Raw-connection IO thread, connector-side
// _LSIDE_IO - Raw-connection IO thread, listener-side
// _XSIDE_IO - Raw-connection IO thread, either-side
// _TIMER_IO - Connectionless IO thread
// _CSIDE - Timer IO thread, connector-side
// _IO - Any IO thread
//
ALLOC_DEFINE(qd_tcp_listener_t);
ALLOC_DEFINE(qd_tcp_connector_t);
ALLOC_DEFINE_SAFE(qd_tcp_connection_t);
static const char *const state_names[] =
{
[LSIDE_INITIAL] = "LSIDE_INITIAL",
[LSIDE_TLS_HANDSHAKE] = "LSIDE_TLS_HANDSHAKE",
[LSIDE_LINK_SETUP] = "LSIDE_LINK_SETUP",
[LSIDE_STREAM_START] = "LSIDE_STREAM_START",
[LSIDE_FLOW] = "LSIDE_FLOW",
[LSIDE_TLS_FLOW] = "LSIDE_TLS_FLOW",
[CSIDE_INITIAL] = "CSIDE_INITIAL",
[CSIDE_LINK_SETUP] = "CSIDE_LINK_SETUP",
[CSIDE_FLOW] = "CSIDE_FLOW",
[CSIDE_TLS_FLOW] = "CSIDE_TLS_FLOW",
[XSIDE_CLOSING] = "XSIDE_CLOSING"
};
ENUM_DEFINE(qd_tcp_connection_state, state_names);
#define CONNECTION_CLOSE_TIME 10000
#define RAW_BUFFER_BATCH_SIZE 16
//
// Global Adaptor State
//
typedef struct {
qdr_core_t *core;
qd_dispatch_t *qd;
qd_server_t *server;
qdr_protocol_adaptor_t *pa;
qd_tcp_listener_list_t listeners;
qd_tcp_connector_list_t connectors;
qd_tcp_connection_list_t connections;
sys_mutex_t lock;
pn_proactor_t *proactor;
bool adaptor_finalizing;
} qd_tcp_context_t;
static qd_tcp_context_t *tcp_context;
static uint64_t buffer_ceiling = 0;
static uint64_t buffer_threshold_50;
static uint64_t buffer_threshold_75;
static uint64_t buffer_threshold_85;
// Window Flow Control
//
// This adaptor uses a simple window with acknowledge algorithm to enforce backpressure on the TCP sender. The ingress
// adaptor will only permit up to TCP_MAX_CAPACITY_BYTES bytes received before backpressuring the sender by no longer
// granting empty receive buffers to the raw connection. The egress adapter will send its count of total bytes written
// out the raw connection every TCP_ACK_THRESHOLD_BYTES written bytes. The egress sends this update via a PN_RECEIVED
// frame, setting the section_offset field to the total number of bytes written out the raw connection. When the
// PN_RECEIVED update is received at the ingress adaptor it will update the window size by subtracting the
// section_offset from its total received bytes counter. If the result of the subtraction is less than TCP_MAX_CAPACITY
// then backpressure is relieved and empty receive buffers are given to the raw connection.
//
// TCP_MAX_CAPACITY_BYTES: this is set to 2x the maximum number of bytes a cut through message can buffer. This makes
// the window large enough to max out a message at each end of the TCP flow.
//
#define TCP_FULL_MSG_BYTES (QD_BUFFER_DEFAULT_SIZE * UCT_SLOT_COUNT * UCT_SLOT_BUF_LIMIT)
#define TCP_MAX_CAPACITY_BYTES (TCP_FULL_MSG_BYTES * UINT64_C(2))
#define TCP_ACK_THRESHOLD_BYTES TCP_FULL_MSG_BYTES
// is the incoming byte window full?
//
// Is the incoming byte window full?
//
// Returns true only when window flow control is enabled and the number of
// bytes received since the last window update has reached the maximum
// capacity (i.e. the TCP sender must be backpressured).
inline static bool window_full(const qd_tcp_connection_t *conn)
{
    if (conn->window.disabled) {
        return false;
    }
    const uint64_t unacked = conn->inbound_octets - conn->window.last_update;
    return unacked >= TCP_MAX_CAPACITY_BYTES;
}
//
// Forward References
//
static void on_connection_event_CSIDE_IO(pn_event_t *e, qd_server_t *qd_server, void *context);
static void connection_run_LSIDE_IO(qd_tcp_connection_t *conn);
static void connection_run_CSIDE_IO(qd_tcp_connection_t *conn);
static void connection_run_XSIDE_IO(qd_tcp_connection_t *conn);
static uint64_t validate_outbound_message(const qdr_delivery_t *out_dlv);
static void on_accept(qd_adaptor_listener_t *listener, pn_listener_t *pn_listener, void *context);
static void on_tls_connection_secured(qd_tls_t *tls, void *user_context);
static char *get_tls_negotiated_alpn(qd_message_t *msg); // caller must free() returned string!
static int setup_tls_session(qd_tcp_connection_t *conn, const qd_tls_domain_t *parent_domain, const char *alpn_protocol);
//=================================================================================
// Thread assertions
//=================================================================================
typedef enum {
THREAD_UNKNOWN,
THREAD_ROUTER_CORE,
THREAD_TIMER_IO,
THREAD_RAW_IO
} qd_tcp_thread_state_t;
__thread qd_tcp_thread_state_t tcp_thread_state;
#define SET_THREAD_UNKNOWN tcp_thread_state = THREAD_UNKNOWN
#define SET_THREAD_ROUTER_CORE tcp_thread_state = THREAD_ROUTER_CORE
#define SET_THREAD_TIMER_IO tcp_thread_state = THREAD_TIMER_IO
#define SET_THREAD_RAW_IO tcp_thread_state = THREAD_RAW_IO
#define ASSERT_ROUTER_CORE assert(tcp_thread_state == THREAD_ROUTER_CORE || tcp_context->adaptor_finalizing)
#define ASSERT_TIMER_IO assert(tcp_thread_state == THREAD_TIMER_IO || tcp_context->adaptor_finalizing)
#define ASSERT_RAW_IO assert(tcp_thread_state == THREAD_RAW_IO || tcp_context->adaptor_finalizing)
//=================================================================================
// Core Activation Handler
//=================================================================================
/**
 * This function is invoked in a timer thread, not associated with any IO context, in order to process core connections
 * terminated in the adaptor. The core connections processed here are for connectors only. Connection activation
 * happens elsewhere, in the context of a Proton raw IO connection.
 */
static void on_core_activate_TIMER_IO(void *context)
{
    SET_THREAD_TIMER_IO;
    // This activation path is only ever used for connectors; listener-side
    // activation happens on a raw-connection IO thread.
    assert(((qd_tcp_common_t*) context)->context_type == TL_CONNECTOR);
    qdr_connection_t *core_conn = ((qd_tcp_connector_t*) context)->core_conn;
    qdr_connection_process(core_conn);
}
//=================================================================================
// Helper Functions
//=================================================================================
// Build a new TCP-adaptor connection-properties map containing the single
// adaptor-identification key/value entry. The caller owns the returned
// pn_data_t and must release it with pn_data_free().
static pn_data_t *TL_conn_properties(void)
{
    pn_data_t *map = pn_data(0);
    pn_data_put_map(map);
    pn_data_enter(map);
    pn_data_put_symbol(map, pn_bytes(strlen(QD_CONNECTION_PROPERTY_ADAPTOR_KEY),
                                     QD_CONNECTION_PROPERTY_ADAPTOR_KEY));
    pn_data_put_string(map, pn_bytes(strlen(QD_CONNECTION_PROPERTY_TCP_ADAPTOR_VALUE),
                                     QD_CONNECTION_PROPERTY_TCP_ADAPTOR_VALUE));
    pn_data_exit(map);
    return map;
}
// Open a core (router) connection on behalf of the adaptor and increment the
// TCP connection counter.
//
// @param conn_id  Management identifier assigned to the new core connection
// @param incoming True for listener-side (ingress), false for connector-side (egress)
// @param host     Host string recorded in the connection info
// @return the newly opened qdr_connection_t
//
static qdr_connection_t *TL_open_core_connection(uint64_t conn_id, bool incoming, const char *host)
{
    qdr_connection_t *conn;

    //
    // The qdr_connection_info() function makes its own copy of the passed in tcp_conn_properties.
    // So, we need to call pn_data_free(properties)
    //
    pn_data_t *properties = TL_conn_properties();
    qdr_connection_info_t *info = qdr_connection_info(false, // is_encrypted,
                                                      false, // is_authenticated,
                                                      true,  // opened,
                                                      "",    // sasl_mechanisms,
                                                      incoming ? QD_INCOMING : QD_OUTGOING, // dir,
                                                      host,
                                                      "",    // ssl_proto,
                                                      "",    // ssl_cipher,
                                                      "",    // user,
                                                      "TcpAdaptor", // container,
                                                      properties,   // connection_properties,
                                                      0,     // ssl_ssf,
                                                      false, // ssl,
                                                      "",    // peer router version,
                                                      true,  // streaming links
                                                      false); // connection trunking
    pn_data_free(properties);
    conn = qdr_connection_opened(tcp_context->core,
                                 tcp_context->pa,
                                 incoming,        // incoming
                                 QDR_ROLE_NORMAL, // role
                                 1,               // cost
                                 conn_id,         // management_id
                                 0,               // label
                                 0,               // remote_container_id
                                 false,           // strip_annotations_in
                                 false,           // strip_annotations_out
                                 5,               // link_capacity
                                 0,               // policy_spec
                                 info,            // connection_info
                                 0,               // context_binder
                                 0);              // bind_token
    qd_connection_counter_inc(QD_PROTOCOL_TCP);
    return conn;
}
// One-time setup for a new TCP listener: creates its vflow record, its
// protocol observer, and the adaptor listener that manages the listening
// socket.
static void TL_setup_listener(qd_tcp_listener_t *li)
{
    //
    // Create a vflow record for this listener
    //
    li->common.vflow = vflow_start_record(VFLOW_RECORD_LISTENER, 0);
    vflow_set_string(li->common.vflow, VFLOW_ATTRIBUTE_PROTOCOL, "tcp");
    vflow_set_string(li->common.vflow, VFLOW_ATTRIBUTE_NAME, li->adaptor_config->name);
    vflow_set_string(li->common.vflow, VFLOW_ATTRIBUTE_DESTINATION_HOST, li->adaptor_config->host);
    vflow_set_string(li->common.vflow, VFLOW_ATTRIBUTE_DESTINATION_PORT, li->adaptor_config->port);
    vflow_set_string(li->common.vflow, VFLOW_ATTRIBUTE_VAN_ADDRESS, li->adaptor_config->address);
    vflow_set_uint64(li->common.vflow, VFLOW_ATTRIBUTE_FLOW_COUNT_L4, 0);
    vflow_add_rate(li->common.vflow, VFLOW_ATTRIBUTE_FLOW_COUNT_L4, VFLOW_ATTRIBUTE_FLOW_RATE_L4);

    //
    // Set up the protocol observer
    //
    // TODO - add configuration to the listener to influence whether and how the observer is set up.
    //
    li->protocol_observer_config = qdpo_config(0, true);
    li->protocol_observer = protocol_observer("tcp", li->protocol_observer_config);

    //
    // Create an adaptor listener. This listener will automatically create a listening socket when there is at least one
    // consumer for the service address. Once the last consumer for the service address goes away the adaptor listener
    // will automatically close the listening socket. When a client connects to the listening socket the "on_accept"
    // callback will be invoked on the proactor listener thread.
    //
    li->adaptor_listener = qd_adaptor_listener(tcp_context->qd, li->adaptor_config, LOG_TCP_ADAPTOR);
    qd_adaptor_listener_listen(li->adaptor_listener, on_accept, li);
}
// One-time setup for a new TCP connector: opens the egress-dispatch core
// connection, creates the connector's vflow record, and attaches the out-link
// used to receive connection streams for the configured VAN address.
static void TL_setup_connector(qd_tcp_connector_t *connector)
{
    //
    // Set up a core connection to handle all of the links and deliveries for this connector
    //
    connector->conn_id = qd_server_allocate_connection_id(tcp_context->server);
    connector->core_conn = TL_open_core_connection(connector->conn_id, false, "egress-dispatch");
    qdr_connection_set_context(connector->core_conn, connector);
    connector->connections_opened = 1; // for legacy compatibility: it counted the egress-dispatch conn

    //
    // Attach an out-link to represent our desire to receive connection streams for the address
    //
    qdr_terminus_t *source = qdr_terminus(0);
    qdr_terminus_set_address(source, connector->adaptor_config->address);

    //
    // Create a vflow record for this connector
    //
    connector->common.vflow = vflow_start_record(VFLOW_RECORD_CONNECTOR, 0);
    vflow_set_string(connector->common.vflow, VFLOW_ATTRIBUTE_PROTOCOL, "tcp");
    vflow_set_string(connector->common.vflow, VFLOW_ATTRIBUTE_NAME, connector->adaptor_config->name);
    vflow_set_string(connector->common.vflow, VFLOW_ATTRIBUTE_DESTINATION_HOST, connector->adaptor_config->host);
    vflow_set_string(connector->common.vflow, VFLOW_ATTRIBUTE_DESTINATION_PORT, connector->adaptor_config->port);
    vflow_set_string(connector->common.vflow, VFLOW_ATTRIBUTE_VAN_ADDRESS, connector->adaptor_config->address);
    vflow_set_uint64(connector->common.vflow, VFLOW_ATTRIBUTE_FLOW_COUNT_L4, 0);
    vflow_add_rate(connector->common.vflow, VFLOW_ATTRIBUTE_FLOW_COUNT_L4, VFLOW_ATTRIBUTE_FLOW_RATE_L4);

    // Attach the out-link and grant it an initial credit window of 5 deliveries.
    connector->out_link = qdr_link_first_attach(connector->core_conn, QD_OUTGOING, source, 0, "tcp.connector.out", 0, false, 0, &connector->link_id);
    qdr_link_set_user_streaming(connector->out_link);
    qdr_link_set_context(connector->out_link, connector);
    qdr_link_flow(tcp_context->core, connector->out_link, 5, false);
}
// Reclaim all read buffers still held by the raw connection and return them
// to the buffer pool.
static void drain_read_buffers_XSIDE_IO(pn_raw_connection_t *raw_conn)
{
    ASSERT_RAW_IO;
    pn_raw_buffer_t batch[RAW_BUFFER_BATCH_SIZE];
    size_t taken = pn_raw_connection_take_read_buffers(raw_conn, batch, RAW_BUFFER_BATCH_SIZE);
    while (taken > 0) {
        for (size_t idx = 0; idx < taken; idx++) {
            qd_buffer_free((qd_buffer_t*) batch[idx].context);
        }
        taken = pn_raw_connection_take_read_buffers(raw_conn, batch, RAW_BUFFER_BATCH_SIZE);
    }
}
// Reclaim all completed write buffers held by the raw connection. Write
// buffers submitted with a zero context (e.g. message-body buffers queued by
// consume_message_body_XSIDE_IO) are not owned here and must not be freed.
static void drain_write_buffers_XSIDE_IO(pn_raw_connection_t *raw_conn)
{
    ASSERT_RAW_IO;
    pn_raw_buffer_t batch[RAW_BUFFER_BATCH_SIZE];
    size_t taken;
    while ((taken = pn_raw_connection_take_written_buffers(raw_conn, batch, RAW_BUFFER_BATCH_SIZE)) > 0) {
        for (size_t idx = 0; idx < taken; idx++) {
            qd_buffer_t *owned = (qd_buffer_t*) batch[idx].context;
            if (owned) {
                qd_buffer_free(owned);
            }
        }
    }
}
// Transition the connection state machine, logging the old -> new state at
// debug level.
static void set_state_XSIDE_IO(qd_tcp_connection_t *conn, qd_tcp_connection_state_t new_state)
{
    ASSERT_RAW_IO;
    const char *old_name = qd_tcp_connection_state_name(conn->state);
    const char *new_name = qd_tcp_connection_state_name(new_state);
    qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] State change %s -> %s",
           conn->conn_id, old_name, new_name);
    conn->state = new_state;
}
//
// Connector/Connection cleanup
//
// Both tcp_connector_t and tcp_connection_t allocate a core qdr_connection_t instance which is used by the Core
// thread for activation (see CORE_activate()). During cleanup of these objects we need to ensure that both the I/O and
// Core threads do not reference them after they have been deallocated. To do this we use a two-phase approach to
// freeing these objects. In the first phase all non-activation-related resources are released by the I/O thread (see
// free_connector(), free_connection_IO). Then the object is passed to the Core thread for cleanup of the activation
// resources and freeing the base object (see free_tcp_resource(), qdr_core_free_tcp_resource_CT()).
//
// tcp_listener_t does not use a qdr_connection_t so this process does not apply to it.
//
// Core-thread action: phase two of the two-phase free protocol (see the
// comment block above). Destroys the activation-related locks/flags and
// deallocates the base object. Runs after the I/O thread has released all
// other resources.
static void qdr_core_free_tcp_resource_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
    // note: need to release the resource regardless of discard flag
    qd_tcp_common_t *common = (qd_tcp_common_t *)action->args.general.context_1;
    if (common->context_type == TL_CONNECTION) {
        qd_tcp_connection_t *conn = (qd_tcp_connection_t*) common;
        sys_atomic_destroy(&conn->core_activation);
        sys_atomic_destroy(&conn->raw_opened);
        sys_mutex_free(&conn->activation_lock);
        free_qd_tcp_connection_t(conn);
    } else {
        // Core does not hold a reference to a listener so they are not freed here
        assert(common->context_type == TL_CONNECTOR);
        qd_tcp_connector_t *cr = (qd_tcp_connector_t *)common;
        sys_mutex_free(&cr->lock);
        free_qd_tcp_connector_t(cr);
    }
}
// Hand a connector/connection off to the Core thread for final deallocation
// (phase two of the two-phase free; see qdr_core_free_tcp_resource_CT()).
static void free_tcp_resource(qd_tcp_common_t *resource)
{
    assert(tcp_context);
    qdr_action_t *free_action = qdr_action(qdr_core_free_tcp_resource_CT, "core free tcp resource");
    free_action->args.general.context_1 = resource;
    qdr_action_enqueue(tcp_context->core, free_action);
}
// Free a TCP listener: unlink it from the global listener list, end its vflow
// record, release its observer/TLS/config resources and deallocate it.
// Unlike connections/connectors, listeners are freed entirely on this thread
// (the Core holds no reference to them).
static void free_listener(qd_tcp_listener_t *li)
{
    sys_mutex_lock(&tcp_context->lock);
    DEQ_REMOVE(tcp_context->listeners, li);
    sys_mutex_unlock(&tcp_context->lock);

    //
    // This call to vflow_end_record is only here to doubly make sure that any future calls to free_listener
    // will end the vflow record if it has not already ended and zeroed out.
    //
    if (li->common.vflow) {
        vflow_end_record(li->common.vflow);
    }

    qd_log(LOG_TCP_ADAPTOR, QD_LOG_INFO,
           "Deleted TcpListener for %s, %s:%s",
           li->adaptor_config->address, li->adaptor_config->host, li->adaptor_config->port);

    qdpo_free(li->protocol_observer);
    qdpo_config_free(li->protocol_observer_config);
    qd_tls_domain_decref(li->tls_domain);
    qd_free_adaptor_config(li->adaptor_config);
    sys_mutex_free(&li->lock);
    free_qd_tcp_listener_t(li);
}
// Phase-one free of a TCP connector, run on an I/O thread. Disables Core
// activation, unlinks the connector from the global list, releases its
// non-activation resources, then passes it to the Core thread for final
// deallocation (which frees cr->lock).
static void free_connector(qd_tcp_connector_t *cr)
{
    // Disable activation by the Core thread. Freeing the timer under the lock
    // ensures the Core cannot schedule a new activation afterwards.
    sys_mutex_lock(&cr->lock);
    qd_timer_free(cr->activate_timer);
    cr->activate_timer = 0;
    sys_mutex_unlock(&cr->lock);
    // Do NOT free the cr->lock mutex since the core may be holding it.

    sys_mutex_lock(&tcp_context->lock);
    DEQ_REMOVE(tcp_context->connectors, cr);
    sys_mutex_unlock(&tcp_context->lock);

    vflow_end_record(cr->common.vflow);
    qd_log(LOG_TCP_ADAPTOR, QD_LOG_INFO,
           "Deleted TcpConnector for %s, %s:%s",
           cr->adaptor_config->address, cr->adaptor_config->host, cr->adaptor_config->port);
    qd_tls_domain_decref(cr->tls_domain);
    qd_free_adaptor_config(cr->adaptor_config);

    // Pass connector to Core for final deallocation. The Core will free the cr->lock.
    // see qdr_core_free_tcp_resource_CT()
    free_tcp_resource(&cr->common);
}
// Phase-one free of a TCP connection, run on an I/O thread (raw or timer).
// Disables Core activation, removes the connection from its parent's
// (listener or connector) connection list — freeing the parent if it is
// closing and this was its last connection — then hands the connection to the
// Core thread for final deallocation.
static void free_connection_IO(void *context)
{
    // No thread assertion here - can be RAW_IO or TIMER_IO
    bool free_parent = false;
    qd_tcp_connection_t *conn = (qd_tcp_connection_t*) context;
    qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] Cleaning up resources", conn->conn_id);

    // Disable activation via Core thread. The lock needs to be taken to ensure the core thread is not currently
    // attempting to activate the connection: after the mutex is unlocked we're guaranteed no further activations can
    // take place.
    sys_mutex_lock(&conn->activation_lock);
    CLEAR_ATOMIC_FLAG(&conn->raw_opened);
    sys_mutex_unlock(&conn->activation_lock);
    // Do NOT free the core_activation lock since the core may be holding it

    if (!!conn->common.parent && conn->common.parent->context_type == TL_LISTENER) {
        qd_tcp_listener_t *li = (qd_tcp_listener_t*) conn->common.parent;
        sys_mutex_lock(&li->lock);
        DEQ_REMOVE(li->connections, conn);
        // Defer freeing the parent until after its lock is dropped.
        if (li->closing && DEQ_SIZE(li->connections) == 0) {
            free_parent = true;
        }
        sys_mutex_unlock(&li->lock);
        if (free_parent) {
            free_listener(li);
        }
    } else {
        qd_tcp_connector_t *cr = (qd_tcp_connector_t*) conn->common.parent;
        sys_mutex_lock(&cr->lock);
        DEQ_REMOVE(cr->connections, conn);
        if (cr->closing && DEQ_SIZE(cr->connections) == 0) {
            free_parent = true;
        }
        sys_mutex_unlock(&cr->lock);
        if (free_parent) {
            free_connector(cr);
        }
    }

    // Pass connector to Core for final deallocation. The Core will free the activation_lock and the related flags. See
    // qdr_core_free_tcp_resource_CT()
    free_tcp_resource(&conn->common);
}
// Initiate close of the raw connection.
//
// The close will be complete when the PN_RAW_CONNECTION_DISCONNECTED event is handled. At that point any associated
// connection condition information will be read from the raw conn and written to the flow log.
//
// @param conn Holds the raw connection to close
// @param condition Optional condition identifying the reason the connection was closed
// @param description Optional description associated with condition
//
static void close_raw_connection(qd_tcp_connection_t *conn, const char *condition, const char *description)
{
    ASSERT_RAW_IO;
    assert(conn->raw_conn);

    // Record the optional close reason on the raw connection's condition so it
    // is available when the DISCONNECTED event is processed.
    if (!!condition) {
        pn_condition_t *cond = pn_raw_connection_condition(conn->raw_conn);
        if (cond != 0) {
            (void) pn_condition_set_name(cond, condition);
            if (!!description) {
                (void) pn_condition_set_description(cond, description);
            }
        }
    }

    CLEAR_ATOMIC_FLAG(&conn->raw_opened);
    pn_raw_connection_close(conn->raw_conn);
    // Connection cleanup occurs on the PN_RAW_CONNECTION_DISCONNECTED event
}
// Tear down a TCP connection and all of its associated router resources:
// raw connection, in/outbound streams and deliveries, links, core connection,
// TLS state and vflow record. Updates the parent's closed-connection counter
// and finally schedules the connection itself for deallocation via
// free_connection_IO(). The statement order here is significant.
static void close_connection_XSIDE_IO(qd_tcp_connection_t *conn)
{
    ASSERT_RAW_IO;
    if (conn->state != XSIDE_CLOSING)
        set_state_XSIDE_IO(conn, XSIDE_CLOSING);

    if (!!conn->raw_conn) {
        CLEAR_ATOMIC_FLAG(&conn->raw_opened);
        pn_raw_connection_close(conn->raw_conn);
        drain_read_buffers_XSIDE_IO(conn->raw_conn);
        drain_write_buffers_XSIDE_IO(conn->raw_conn);

        // note: this disables the raw connection event handler. No further PN_RAW_CONNECTION_* events will occur,
        // including DISCONNECTED!
        sys_mutex_lock(&conn->activation_lock);
        pn_raw_connection_set_context(conn->raw_conn, 0);
        conn->raw_conn = 0;
        sys_mutex_unlock(&conn->activation_lock);
    }

    free(conn->reply_to);

    // Cancel any pending producer/consumer activations before releasing the
    // streams' deliveries.
    if (!!conn->inbound_stream) {
        qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " TCP cancel producer activation", DLV_ARGS(conn->inbound_delivery));
        qd_message_cancel_producer_activation(conn->inbound_stream);
    }
    if (!!conn->outbound_stream) {
        qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " TCP cancel consumer activation", DLV_ARGS(conn->outbound_delivery));
        qd_message_cancel_consumer_activation(conn->outbound_stream);
    }

    if (!!conn->inbound_delivery) {
        if (!!conn->inbound_stream) {
            qd_message_set_receive_complete(conn->inbound_stream);
            qdr_delivery_continue(tcp_context->core, conn->inbound_delivery, false);
        }
        qdr_delivery_remote_state_updated(tcp_context->core, conn->inbound_delivery, 0, true, 0, false);
        qdr_delivery_set_context(conn->inbound_delivery, 0);
        qdr_delivery_decref(tcp_context->core, conn->inbound_delivery, "close_connection_XSIDE_IO - inbound_delivery released");
    }

    if (!!conn->inbound_link) {
        qdr_link_detach(conn->inbound_link, QD_LOST, 0);
    }

    // The outbound delivery is settled as MODIFIED so the sender may redeliver.
    if (!!conn->outbound_delivery) {
        qdr_delivery_remote_state_updated(tcp_context->core, conn->outbound_delivery, PN_MODIFIED, true, 0, false);
        qdr_delivery_set_context(conn->outbound_delivery, 0);
        qdr_delivery_decref(tcp_context->core, conn->outbound_delivery, "close_connection_XSIDE_IO - outbound_delivery released");
    }

    if (!!conn->outbound_link) {
        qdr_link_detach(conn->outbound_link, QD_LOST, 0);
    }

    if (!!conn->core_conn) {
        qdr_connection_closed(conn->core_conn);
        qd_connection_counter_dec(QD_PROTOCOL_TCP);
    }

    if (!!conn->common.vflow) {
        vflow_set_uint64(conn->common.vflow, VFLOW_ATTRIBUTE_OCTETS, conn->inbound_octets);
        vflow_end_record(conn->common.vflow);
    }

    qd_tls_free2(conn->tls);
    qd_tls_domain_decref(conn->tls_domain);
    free(conn->alpn_protocol);

    // Null out the released references to guard against accidental reuse.
    conn->reply_to          = 0;
    conn->inbound_link      = 0;
    conn->inbound_stream    = 0;
    conn->inbound_delivery  = 0;
    conn->outbound_link     = 0;
    conn->outbound_stream   = 0;
    conn->outbound_delivery = 0;
    conn->core_conn         = 0;
    conn->common.vflow      = 0;
    conn->tls               = 0;
    conn->tls_domain        = 0;

    // Bump the parent's closed-connection counter under its lock.
    if (!!conn->common.parent && conn->common.parent->context_type == TL_LISTENER) {
        qd_tcp_listener_t *li = (qd_tcp_listener_t*) conn->common.parent;
        sys_mutex_lock(&li->lock);
        li->connections_closed++;
        sys_mutex_unlock(&li->lock);
    } else {
        qd_tcp_connector_t *cr = (qd_tcp_connector_t*) conn->common.parent;
        sys_mutex_lock(&cr->lock);
        cr->connections_closed++;
        sys_mutex_unlock(&cr->lock);
    }

    free_connection_IO(conn);
}
// Grant empty read buffers to the raw connection, with the grant size scaled
// down as router-wide buffer usage rises (allocation tiers below). No buffers
// are granted while the flow-control window is full.
//
// @param conn     Connection whose raw connection receives the buffers
// @param capacity Current value of pn_raw_connection_read_buffers_capacity()
//
static void grant_read_buffers_XSIDE_IO(qd_tcp_connection_t *conn, const size_t capacity)
{
    ASSERT_RAW_IO;

    //
    // Cannot grant read buffers if the connection is currently blocked due to window flow control
    //
    if (window_full(conn)) {
        return;
    }

    //
    // Define the allocation tiers. The tier values are the number of read buffers to be granted
    // to raw connections based on the percentage of usage of the router-wide buffer ceiling.
    //
#define TIER_1 8 // [0% .. 50%)
#define TIER_2 4 // [50% .. 75%)
#define TIER_3 2 // [75% .. 85%)
#define TIER_4 1 // [85% .. 100%]

    //
    // Since we can't query Proton for the maximum read-buffer capacity, we will infer it from
    // calls to pn_raw_connection_read_buffers_capacity.
    //
    // The max is tracked as a process-wide monotonically increasing atomic:
    // the CAS loop only ever raises it, never lowers it.
    //
    static atomic_size_t max_capacity;
    size_t current_mc = atomic_load(&max_capacity);
    while (capacity > current_mc) {
        if (atomic_compare_exchange_weak(&max_capacity, &current_mc, capacity))
            break;
    }

    //
    // Get the "held_by_threads" stats for router buffers as an approximation of how many
    // buffers are in-use. This is an approximation since it also counts free buffers held
    // in the per-thread free-pools. Since we will be dealing with large numbers here, the
    // number of buffers in free-pools will not be significant.
    //
    qd_alloc_stats_t stats = alloc_stats_qd_buffer_t();
    uint64_t buffers_in_use = stats.held_by_threads;

    //
    // Choose the grant-allocation tier based on the number of buffers in use.
    //
    size_t desired = TIER_4;
    if (buffers_in_use < buffer_threshold_50) {
        desired = TIER_1;
    } else if (buffers_in_use < buffer_threshold_75) {
        desired = TIER_2;
    } else if (buffers_in_use < buffer_threshold_85) {
        desired = TIER_3;
    }

    //
    // Determine how many buffers are already granted. This will always be a non-negative value.
    //
    // NOTE(review): relies on max_capacity only ever growing (see CAS loop
    // above), so the re-load here cannot be below this connection's capacity.
    //
    current_mc = atomic_load(&max_capacity);
    assert(current_mc >= capacity);
    size_t already_granted = current_mc - capacity;

    //
    // If we desire to grant additional buffers, calculate the number to grant now.
    //
    const size_t granted = desired > already_granted ? desired - already_granted : 0;

    if (granted > 0) {
        //
        // Grant the buffers.
        //
        pn_raw_buffer_t raw_buffers[granted];

        for (size_t i = 0; i < granted; i++) {
            qd_buffer_t *buf = qd_buffer();
            raw_buffers[i].context  = (uintptr_t) buf;
            raw_buffers[i].bytes    = (char*) qd_buffer_base(buf);
            raw_buffers[i].capacity = qd_buffer_capacity(buf);
            raw_buffers[i].offset   = 0;
            raw_buffers[i].size     = 0;
        }

        size_t actual = pn_raw_connection_give_read_buffers(conn->raw_conn, raw_buffers, granted);
        (void) actual;
        assert(actual == granted);
        qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] grant_read_buffers_XSIDE_IO - %ld", conn->conn_id, granted);
    }
}
// Move data read from the raw connection into the inbound message stream.
//
// Drains all full read buffers from the raw connection, feeds non-empty ones
// (optionally through the protocol observer on the listener side) into the
// stream, and reports whether the read side has closed.
//
// @param conn        Connection being serviced
// @param stream      Inbound message stream to produce buffers into
// @param read_closed [out] Set true if the raw connection's read side is closed
// @return total number of octets taken from the raw connection
//
static uint64_t produce_read_buffers_XSIDE_IO(qd_tcp_connection_t *conn, qd_message_t *stream, bool *read_closed)
{
    ASSERT_RAW_IO;
    uint64_t octet_count = 0;
    *read_closed = false;

    if (qd_message_can_produce_buffers(stream)) {
        qd_buffer_list_t qd_buffers = DEQ_EMPTY;
        pn_raw_buffer_t raw_buffers[RAW_BUFFER_BATCH_SIZE];
        size_t count;

        count = pn_raw_connection_take_read_buffers(conn->raw_conn, raw_buffers, RAW_BUFFER_BATCH_SIZE);
        while (count > 0) {
            for (size_t i = 0; i < count; i++) {
                qd_buffer_t *buf = (qd_buffer_t*) raw_buffers[i].context;
                qd_buffer_insert(buf, raw_buffers[i].size);
                octet_count += raw_buffers[i].size;
                if (qd_buffer_size(buf) > 0) {
                    DEQ_INSERT_TAIL(qd_buffers, buf);
                    // Listener-side only: let the protocol observer inspect the data.
                    if (conn->listener_side && !!conn->observer_handle) {
                        qdpo_data(conn->observer_handle, true, buf, 0);
                    }
                } else {
                    // Empty buffer - nothing was read into it; recycle it.
                    qd_buffer_free(buf);
                }
            }
            count = pn_raw_connection_take_read_buffers(conn->raw_conn, raw_buffers, RAW_BUFFER_BATCH_SIZE);
        }

        // ISSUE-1446: it is only safe to check pn_raw_connection_is_read_closed() after all read buffers are drained since
        // the connection can be marked closed while read buffers are still pending in the raw connection.
        //
        *read_closed = pn_raw_connection_is_read_closed(conn->raw_conn);

        if (!DEQ_IS_EMPTY(qd_buffers)) {
            //qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] produce_read_buffers_XSIDE_IO - Producing %ld buffers", conn->conn_id, DEQ_SIZE(qd_buffers));
            qd_message_produce_buffers(stream, &qd_buffers);
        }
    }
    return octet_count;
}
// Move data from the outbound message stream onto the raw connection.
//
// Consumes up to the raw connection's current write capacity in buffers from
// the stream and submits them for writing. The submitted buffers keep their
// qd_buffer_t as the raw-buffer context so they can be freed when the write
// completes (see drain_write_buffers_XSIDE_IO()).
//
// @param conn   Connection being serviced
// @param stream Outbound message stream to consume buffers from
// @return total number of octets submitted for writing
//
static uint64_t consume_write_buffers_XSIDE_IO(qd_tcp_connection_t *conn, qd_message_t *stream)
{
    ASSERT_RAW_IO;
    size_t limit = pn_raw_connection_write_buffers_capacity(conn->raw_conn);
    uint64_t octet_count = 0;

    if (limit > 0) {
        qd_buffer_list_t buffers = DEQ_EMPTY;
        size_t actual = qd_message_consume_buffers(stream, &buffers, limit);
        assert(actual == DEQ_SIZE(buffers));
        if (actual > 0) {
            pn_raw_buffer_t raw_buffers[actual];
            qd_buffer_t *buf = DEQ_HEAD(buffers);
            for (size_t i = 0; i < actual; i++) {
                // Listener-side only: let the protocol observer inspect the data.
                if (conn->listener_side && !!conn->observer_handle) {
                    qdpo_data(conn->observer_handle, false, buf, 0);
                }
                raw_buffers[i].context  = (uintptr_t) buf;
                raw_buffers[i].bytes    = (char*) qd_buffer_base(buf);
                raw_buffers[i].capacity = qd_buffer_capacity(buf);
                raw_buffers[i].size     = qd_buffer_size(buf);
                raw_buffers[i].offset   = 0;
                octet_count += raw_buffers[i].size;
                buf = DEQ_NEXT(buf);
            }
            qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%"PRIu64"] consume_write_buffers_XSIDE_IO - Consuming %ld buffers", conn->conn_id, actual);
            pn_raw_connection_write_buffers(conn->raw_conn, raw_buffers, actual);
        }
    }

    return octet_count;
}
// Alternative to consume_message_body_XSIDE_IO() for use with TLS connections. The TLS layer takes ownership of all
// output message buffers and will free them as they are processed. Due to that we need to make a copy of these buffers
// in order to avoid freeing buffers that are part of the message (double-free).
//
static uint64_t copy_message_body_TLS_XSIDE_IO(qd_tcp_connection_t *conn, qd_message_t *stream, qd_buffer_list_t *buffers, size_t limit)
{
    size_t offset = 0;
    uint64_t octets = 0;
    const bool observe = (conn->listener_side && !!conn->observer_handle);

    assert(conn->tls);

    // On first call, locate the buffer holding the first body octet and the
    // offset of that octet within it.
    if (!conn->outbound_body) {
        assert(!conn->outbound_body_complete);
        qd_message_get_raw_body_data(stream, &conn->outbound_body, &offset);
    }

    // Clone up to 'limit' body buffers into 'buffers'. Copies are needed
    // because the TLS layer frees the buffers it is given (see the comment
    // above this function).
    while (limit-- && conn->outbound_body) {
        size_t size = qd_buffer_size(conn->outbound_body) - offset;
        if (size > 0) {
            qd_buffer_t *clone = qd_buffer();
            // NOTE(review): sets the buffer's size field directly rather than
            // via qd_buffer_insert() - presumably equivalent here; confirm.
            clone->size = size;
            memcpy(qd_buffer_base(clone), qd_buffer_base(conn->outbound_body) + offset, size);
            if (observe) {
                qdpo_data(conn->observer_handle, false, clone, 0);
            }
            DEQ_INSERT_TAIL(*buffers, clone);
        }
        octets += size;
        // A non-zero offset can only apply to the first buffer.
        offset = 0;
        conn->outbound_body = DEQ_NEXT(conn->outbound_body);
    }

    // All body buffers copied - release the raw body and mark completion.
    if (!conn->outbound_body) {
        conn->outbound_body_complete = true;
        qd_message_release_raw_body(stream);
    }

    return octets;
}
// Write the stream's classic (non cut-through) body buffers directly onto the raw
// connection.  May need several invocations to drain the body if the raw connection
// applies outbound back-pressure.  Returns the number of octets posted this call.
//
static uint64_t consume_message_body_XSIDE_IO(qd_tcp_connection_t *conn, qd_message_t *stream)
{
    ASSERT_RAW_IO;
    assert(!conn->outbound_body_complete);

    uint64_t total  = 0;
    size_t   offset = 0;

    if (!conn->outbound_body) {
        // Locate the buffer holding the first body octet and the offset of that octet.
        // A NULL pointer is returned when the body is empty.
        //
        qd_message_get_raw_body_data(stream, &conn->outbound_body, &offset);
    }

    //
    // Post body buffers until they are exhausted or the raw connection runs out of
    // write capacity.  Only the first body buffer may carry a non-zero offset; all
    // subsequent buffers start at offset 0.
    //
    while (!!conn->outbound_body && pn_raw_connection_write_buffers_capacity(conn->raw_conn) > 0) {
        qd_buffer_t *body_buf = conn->outbound_body;

        if (conn->listener_side && !!conn->observer_handle) {
            qdpo_data(conn->observer_handle, false, body_buf, offset);
        }

        pn_raw_buffer_t raw = {
            .context  = 0,
            .bytes    = (char*) qd_buffer_base(body_buf),
            .capacity = qd_buffer_capacity(body_buf),
            .size     = qd_buffer_size(body_buf) - offset,
            .offset   = offset,
        };
        total += raw.size;
        pn_raw_connection_write_buffers(conn->raw_conn, &raw, 1);

        conn->outbound_body = DEQ_NEXT(body_buf);
        offset = 0;
    }

    if (!conn->outbound_body) {
        conn->outbound_body_complete = true;
        qd_message_release_raw_body(stream);
    }

    return total;
}
// Open the listener-side core connection and attach its two links: an inbound link
// targeting the listener's configured address and an outbound link with a dynamic
// source for the reply stream.  One credit is issued on the outbound link.
//
static void link_setup_LSIDE_IO(qd_tcp_connection_t *conn)
{
    ASSERT_RAW_IO;
    qd_tcp_listener_t *listener = (qd_tcp_listener_t*) conn->common.parent;
    char               remote_host[64];  // numeric remote client IP:port address

    qd_raw_conn_get_address_buf(conn->raw_conn, remote_host, sizeof(remote_host));

    qdr_terminus_t *target = qdr_terminus(0);
    qdr_terminus_t *source = qdr_terminus(0);
    qdr_terminus_set_address(target, listener->adaptor_config->address);
    qdr_terminus_set_dynamic(source);

    conn->core_conn = TL_open_core_connection(conn->conn_id, true, remote_host);
    qdr_connection_set_context(conn->core_conn, conn);

    conn->inbound_link = qdr_link_first_attach(conn->core_conn, QD_INCOMING, qdr_terminus(0), target, "tcp.lside.in", 0, false, 0, &conn->inbound_link_id);
    qdr_link_set_context(conn->inbound_link, conn);

    conn->outbound_link = qdr_link_first_attach(conn->core_conn, QD_OUTGOING, source, qdr_terminus(0), "tcp.lside.out", 0, false, 0, &conn->outbound_link_id);
    qdr_link_set_context(conn->outbound_link, conn);
    qdr_link_set_user_streaming(conn->outbound_link);
    qdr_link_flow(tcp_context->core, conn->outbound_link, 1, false);
}
// Open the connector-side core connection and attach its two links, then enable
// cut-through consumer activation on the outbound stream now that the raw connection
// is up and can be activated.
//
static void link_setup_CSIDE_IO(qd_tcp_connection_t *conn, qdr_delivery_t *delivery)
{
    ASSERT_RAW_IO;
    assert(conn->common.parent->context_type == TL_CONNECTOR);

    qd_tcp_connector_t *connector = (qd_tcp_connector_t *) conn->common.parent;
    const char         *host_port = connector->adaptor_config->host_port;

    conn->core_conn = TL_open_core_connection(conn->conn_id, false, host_port);
    qdr_connection_set_context(conn->core_conn, conn);

    // Use an anonymous inbound link to guarantee credit arrives; if the client has
    // dropped, a targeted link would leave the state machine stalled waiting for
    // credit.
    conn->inbound_link = qdr_link_first_attach(conn->core_conn, QD_INCOMING, qdr_terminus(0), qdr_terminus(0), "tcp.cside.in", 0, false, 0, &conn->inbound_link_id);
    qdr_link_set_context(conn->inbound_link, conn);

    conn->outbound_link = qdr_link_first_attach(conn->core_conn, QD_OUTGOING, qdr_terminus(0), qdr_terminus(0), "tcp.cside.out", 0, false, delivery, &conn->outbound_link_id);
    qdr_link_set_context(conn->outbound_link, conn);

    // Raw connection is up: enable cut-through activation for the outbound stream.
    assert(conn->outbound_stream);
    qd_message_activation_t activation;
    activation.type     = QD_ACTIVATION_TCP;
    activation.delivery = 0;
    qd_alloc_set_safe_ptr(&activation.safeptr, conn);
    qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, DLV_FMT " TCP enabling consumer activation", DLV_ARGS(delivery));
    qd_message_set_consumer_activation(conn->outbound_stream, &activation);
    qd_message_start_unicast_cutthrough(conn->outbound_stream);
}
static bool try_compose_and_send_client_stream_LSIDE_IO(qd_tcp_connection_t *conn)
{
ASSERT_RAW_IO;
qd_tcp_listener_t *li = (qd_tcp_listener_t*) conn->common.parent;
qd_composed_field_t *message = 0;
//
// The content-type value of "application/octet-stream" is used to signal to the network that
// the body of this stream will be a completely unstructured octet stream, without even an
// application-data performative. The octets directly following the application-properties
// (or properties if there are no application-properties) section will constitute the stream
// and will consist solely of AMQP transport frames.
//
if (!!conn->reply_to) {
message = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0);
qd_compose_start_list(message);
qd_compose_insert_null(message); // message-id
qd_compose_insert_null(message); // user-id
qd_compose_insert_string(message, li->adaptor_config->address); // to
qd_compose_insert_null(message); // subject
qd_compose_insert_string(message, conn->reply_to); // reply-to
vflow_serialize_identity(conn->common.vflow, message); // correlation-id
qd_compose_insert_string(message, QD_CONTENT_TYPE_APP_OCTETS); // content-type
//qd_compose_insert_null(message); // content-encoding
//qd_compose_insert_timestamp(message, 0); // absolute-expiry-time
//qd_compose_insert_timestamp(message, 0); // creation-time
//qd_compose_insert_null(message); // group-id
//qd_compose_insert_uint(message, 0); // group-sequence
//qd_compose_insert_null(message); // reply-to-group-id
qd_compose_end_list(message);
if (conn->alpn_protocol) {
// add the ALPN protocol as negotiated with the remote via TLS.
message = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, message);
qd_compose_start_map(message);
qd_compose_insert_string_n(message, (const char *) QD_TLS_ALPN_KEY, QD_TLS_ALPN_KEY_LEN);
qd_compose_insert_string_n(message, (const char *) conn->alpn_protocol,
strlen(conn->alpn_protocol));
qd_compose_end_map(message);
}
message = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, message);
qd_compose_insert_null(message);
}
if (message == 0) {
return false;
}
conn->inbound_stream = qd_message();
qd_message_set_streaming_annotation(conn->inbound_stream);
qd_message_set_Q2_disabled_annotation(conn->inbound_stream);
qd_message_compose_2(conn->inbound_stream, message, false);
qd_compose_free(message);
//
// Start cut-through mode for this stream.
//
qd_message_activation_t activation;
activation.type = QD_ACTIVATION_TCP;
activation.delivery = 0;
qd_alloc_set_safe_ptr(&activation.safeptr, conn);
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%" PRIu64 "][L%" PRIu64 "] TCP enabling producer activation", conn->conn_id, conn->inbound_link_id);
qd_message_set_producer_activation(conn->inbound_stream, &activation);
qd_message_start_unicast_cutthrough(conn->inbound_stream);
//
// The delivery comes with a ref-count to protect the returned value. Inherit that ref-count as the
// protection of our held pointer.
//
conn->inbound_delivery = qdr_link_deliver(conn->inbound_link, conn->inbound_stream, 0, false, 0, 0, 0, 0);
qdr_delivery_set_context(conn->inbound_delivery, conn);
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG,
DLV_FMT " Initiating listener side empty client inbound stream message", DLV_ARGS(conn->inbound_delivery));