Skip to content

Commit a8063c6

Browse files
committed
fixup: add test for error path
1 parent 406a98e commit a8063c6

File tree

1 file changed

+141
-0
lines changed

1 file changed

+141
-0
lines changed

tests/system_tests_tcp_adaptor_tls.py

+141
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
# under the License.
1818
#
1919
import os
20+
import shutil
2021
from system_test import unittest, TestCase, Qdrouterd, NcatException, Logger, Process, run_curl, \
2122
CA_CERT, CLIENT_CERTIFICATE, CLIENT_PRIVATE_KEY, CLIENT_PRIVATE_KEY_PASSWORD, \
2223
SERVER_CERTIFICATE, SERVER_PRIVATE_KEY, SERVER_PRIVATE_KEY_PASSWORD, SERVER_PRIVATE_KEY_NO_PASS, BAD_CA_CERT, \
@@ -801,3 +802,143 @@ def setUpClass(cls):
801802
super(HttpOverTcpLiteTestTlsTwoRouterNginx, cls).setUpClass(encap='lite')
802803
if skip_nginx_test():
803804
return
805+
806+
807+
class TcpAdaptorCertCorruptionTests(TestCase):
808+
"""
809+
The TCP adaptor re-loads the certificate files every time a new client or
810+
server TCP connection is created. This test verifies that the router can
811+
handle the case where the certificate files are removed or corrupted while
812+
new connections are created.
813+
"""
814+
@classmethod
815+
def setUpClass(cls):
816+
super(TcpAdaptorCertCorruptionTests, cls).setUpClass()
817+
cls.openssl_server_listening_port = cls.tester.get_port()
818+
cls.router_listener_port = cls.tester.get_port()
819+
820+
# make a local copy of the client and server certificates. This test
821+
# destroys these files, so we do not want to destroy the originals!
822+
823+
cls.server_cert_file = shutil.copy(SERVER_CERTIFICATE,
824+
os.path.dirname(os.getcwd()))
825+
cls.client_cert_file = shutil.copy(CLIENT_CERTIFICATE,
826+
os.path.dirname(os.getcwd()))
827+
828+
# Set up two routers, TCP listener on one, connector on another
829+
830+
inter_router_port = cls.tester.get_port()
831+
832+
config_qdra = Qdrouterd.Config([
833+
('router', {'mode': 'interior', 'id': 'QDR.A'}),
834+
('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}),
835+
('listener', {'role': 'inter-router', 'port': inter_router_port}),
836+
('sslProfile', {'name': 'listener-ssl-profile',
837+
'caCertFile': CA_CERT,
838+
'certFile': cls.server_cert_file,
839+
'privateKeyFile': SERVER_PRIVATE_KEY,
840+
'password': SERVER_PRIVATE_KEY_PASSWORD}),
841+
('tcpListener', {'port': cls.router_listener_port,
842+
'address': 'corrupt/me',
843+
'host': 'localhost',
844+
'authenticatePeer': 'yes',
845+
'encapsulation': 'lite',
846+
'sslProfile': 'listener-ssl-profile'})
847+
])
848+
849+
config_qdrb = Qdrouterd.Config([
850+
('router', {'mode': 'interior', 'id': 'QDR.B'}),
851+
('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}),
852+
('connector', {'name': 'connectorToA', 'role': 'inter-router',
853+
'port': inter_router_port}),
854+
('sslProfile', {'name': 'connector-ssl-profile',
855+
'caCertFile': CA_CERT,
856+
'certFile': cls.client_cert_file,
857+
'privateKeyFile': CLIENT_PRIVATE_KEY,
858+
'password': CLIENT_PRIVATE_KEY_PASSWORD}),
859+
('tcpConnector', {'port': cls.openssl_server_listening_port,
860+
'address': 'corrupt/me',
861+
'host': 'localhost',
862+
'verifyHostname': 'yes',
863+
'encapsulation': 'lite',
864+
'sslProfile': 'connector-ssl-profile'}),
865+
])
866+
867+
cls.router_qdra = cls.tester.qdrouterd("QDR.A", config_qdra, wait=True)
868+
cls.router_qdrb = cls.tester.qdrouterd("QDR.B", config_qdrb)
869+
cls.router_qdra.wait_router_connected('QDR.B')
870+
cls.router_qdrb.wait_router_connected('QDR.A')
871+
wait_tcp_listeners_up(cls.router_qdra.addresses[0])
872+
873+
def test_cert_corruption(self):
874+
# Note we do not want to corrupt the certs held by the ssl server or
875+
# client. We need them to be valid so the test client/server do not
876+
# fail themselves. We only want the router to be affected!
877+
878+
server_ssl_info = dict()
879+
server_ssl_info['CA_CERT'] = CA_CERT
880+
server_ssl_info['SERVER_CERTIFICATE'] = SERVER_CERTIFICATE
881+
server_ssl_info['SERVER_PRIVATE_KEY'] = SERVER_PRIVATE_KEY
882+
server_ssl_info['SERVER_PRIVATE_KEY_PASSWORD'] = SERVER_PRIVATE_KEY_PASSWORD
883+
884+
openssl_server = self.tester.openssl_server
885+
self.openssl_server = openssl_server(listening_port=self.openssl_server_listening_port,
886+
ssl_info=server_ssl_info,
887+
name="OpenSSLServerAuthPeer",
888+
cl_args=['-Verify', '1'])
889+
890+
client_ssl_info = dict()
891+
client_ssl_info['CA_CERT'] = CA_CERT
892+
client_ssl_info['CLIENT_CERTIFICATE'] = CLIENT_CERTIFICATE
893+
client_ssl_info['CLIENT_PRIVATE_KEY'] = CLIENT_PRIVATE_KEY
894+
client_ssl_info['CLIENT_PRIVATE_KEY_PASSWORD'] = CLIENT_PRIVATE_KEY_PASSWORD
895+
896+
out, error = self.opensslclient(port=self.router_listener_port,
897+
ssl_info=client_ssl_info,
898+
data=b"Sanity Check the Configuration!")
899+
self.assertIn(b"Verification: OK", out)
900+
self.assertIn(b"Verify return code: 0 (ok)", out)
901+
902+
self.openssl_server.wait_out_message("Sanity Check the Configuration!")
903+
904+
#
905+
# Phase 1: corrupt the server-facing certificate
906+
#
907+
908+
with open(self.client_cert_file, mode="w") as clientf:
909+
clientf.write("Oh mercy! I'm corrupt!")
910+
911+
_, _ = self.opensslclient(port=self.router_listener_port,
912+
ssl_info=client_ssl_info,
913+
data=b"Phase 1")
914+
self.router_qdrb.wait_log_message(r'failed to set TLS certificate')
915+
916+
#
917+
# Phase 2: corrupt the client-facing certificate
918+
#
919+
920+
with open(self.server_cert_file, mode="w") as serverf:
921+
serverf.write("Alas! I too have been rendered corrupt!")
922+
923+
_, _ = self.opensslclient(port=self.router_listener_port,
924+
ssl_info=client_ssl_info,
925+
data=b"Phase 2",
926+
expect=Process.EXIT_FAIL)
927+
self.router_qdra.wait_log_message(r'failed to set TLS certificate')
928+
929+
#
930+
# Phase 3: Restore the files and verify all is well
931+
#
932+
933+
self.server_cert_file = shutil.copy(SERVER_CERTIFICATE,
934+
os.path.dirname(os.getcwd()))
935+
self.client_cert_file = shutil.copy(CLIENT_CERTIFICATE,
936+
os.path.dirname(os.getcwd()))
937+
938+
out, error = self.opensslclient(port=self.router_listener_port,
939+
ssl_info=client_ssl_info,
940+
data=b"Hey we recovered!")
941+
self.assertIn(b"Verification: OK", out)
942+
self.assertIn(b"Verify return code: 0 (ok)", out)
943+
944+
self.openssl_server.wait_out_message("Hey we recovered!")

0 commit comments

Comments
 (0)