|
17 | 17 | # under the License.
|
18 | 18 | #
|
19 | 19 | import os
|
| 20 | +import shutil |
20 | 21 | from system_test import unittest, TestCase, Qdrouterd, NcatException, Logger, Process, run_curl, \
|
21 | 22 | CA_CERT, CLIENT_CERTIFICATE, CLIENT_PRIVATE_KEY, CLIENT_PRIVATE_KEY_PASSWORD, \
|
22 | 23 | SERVER_CERTIFICATE, SERVER_PRIVATE_KEY, SERVER_PRIVATE_KEY_PASSWORD, SERVER_PRIVATE_KEY_NO_PASS, BAD_CA_CERT, \
|
@@ -801,3 +802,127 @@ def setUpClass(cls):
|
801 | 802 | super(HttpOverTcpLiteTestTlsTwoRouterNginx, cls).setUpClass(encap='lite')
|
802 | 803 | if skip_nginx_test():
|
803 | 804 | return
|
| 805 | + |
| 806 | + |
| 807 | +class TcpAdaptorCertCorruptionTests(TestCase): |
| 808 | + """ |
| 809 | + The TCP adaptor re-loads the certificate files every time a new client or |
| 810 | + server TCP connection is created. This test verifies that the router can |
| 811 | + handle the case where the certificate files are removed or corrupted while |
| 812 | + new connections are created. |
| 813 | + """ |
| 814 | + @classmethod |
| 815 | + def setUpClass(cls): |
| 816 | + super(TcpAdaptorCertCorruptionTests, cls).setUpClass() |
| 817 | + cls.openssl_server_listening_port = cls.tester.get_port() |
| 818 | + cls.router_listener_port = cls.tester.get_port() |
| 819 | + |
| 820 | + # make a local copy of the client and server certificates. This test |
| 821 | + # destroys these files, so we do not want to destroy the originals! |
| 822 | + |
| 823 | + cls.server_cert_file = shutil.copy(SERVER_CERTIFICATE, |
| 824 | + os.path.dirname(os.getcwd())) |
| 825 | + cls.client_cert_file = shutil.copy(CLIENT_CERTIFICATE, |
| 826 | + os.path.dirname(os.getcwd())) |
| 827 | + |
| 828 | + # Set up two routers, TCP listener on one, connector on another |
| 829 | + |
| 830 | + inter_router_port = cls.tester.get_port() |
| 831 | + |
| 832 | + config_qdra = Qdrouterd.Config([ |
| 833 | + ('router', {'mode': 'interior', 'id': 'QDR.A'}), |
| 834 | + ('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}), |
| 835 | + ('listener', {'role': 'inter-router', 'port': inter_router_port}), |
| 836 | + ('sslProfile', {'name': 'listener-ssl-profile', |
| 837 | + 'caCertFile': CA_CERT, |
| 838 | + 'certFile': cls.server_cert_file, |
| 839 | + 'privateKeyFile': SERVER_PRIVATE_KEY, |
| 840 | + 'password': SERVER_PRIVATE_KEY_PASSWORD}), |
| 841 | + ('tcpListener', {'port': cls.router_listener_port, |
| 842 | + 'address': 'corrupt/me', |
| 843 | + 'host': 'localhost', |
| 844 | + 'authenticatePeer': 'yes', |
| 845 | + 'encapsulation': 'lite', |
| 846 | + 'sslProfile': 'listener-ssl-profile'}) |
| 847 | + ]) |
| 848 | + |
| 849 | + config_qdrb = Qdrouterd.Config([ |
| 850 | + ('router', {'mode': 'interior', 'id': 'QDR.B'}), |
| 851 | + ('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}), |
| 852 | + ('connector', {'name': 'connectorToA', 'role': 'inter-router', |
| 853 | + 'port': inter_router_port}), |
| 854 | + ('sslProfile', {'name': 'connector-ssl-profile', |
| 855 | + 'caCertFile': CA_CERT, |
| 856 | + 'certFile': cls.client_cert_file, |
| 857 | + 'privateKeyFile': CLIENT_PRIVATE_KEY, |
| 858 | + 'password': CLIENT_PRIVATE_KEY_PASSWORD}), |
| 859 | + ('tcpConnector', {'port': cls.openssl_server_listening_port, |
| 860 | + 'address': 'corrupt/me', |
| 861 | + 'host': 'localhost', |
| 862 | + 'verifyHostname': 'yes', |
| 863 | + 'encapsulation': 'lite', |
| 864 | + 'sslProfile': 'connector-ssl-profile'}), |
| 865 | + ]) |
| 866 | + |
| 867 | + cls.router_qdra = cls.tester.qdrouterd("QDR.A", config_qdra, wait=True) |
| 868 | + cls.router_qdrb = cls.tester.qdrouterd("QDR.B", config_qdrb) |
| 869 | + cls.router_qdra.wait_router_connected('QDR.B') |
| 870 | + cls.router_qdrb.wait_router_connected('QDR.A') |
| 871 | + wait_tcp_listeners_up(cls.router_qdra.addresses[0]) |
| 872 | + |
| 873 | + def test_cert_corruption(self): |
| 874 | + # Note we do not want to corrupt the certs held by the ssl server or |
| 875 | + # client. We need them to be valid so the test client/server do not |
| 876 | + # fail themselves. We only want the router to be affected! |
| 877 | + |
| 878 | + server_ssl_info = dict() |
| 879 | + server_ssl_info['CA_CERT'] = CA_CERT |
| 880 | + server_ssl_info['SERVER_CERTIFICATE'] = SERVER_CERTIFICATE |
| 881 | + server_ssl_info['SERVER_PRIVATE_KEY'] = SERVER_PRIVATE_KEY |
| 882 | + server_ssl_info['SERVER_PRIVATE_KEY_PASSWORD'] = SERVER_PRIVATE_KEY_PASSWORD |
| 883 | + |
| 884 | + openssl_server = self.tester.openssl_server |
| 885 | + self.openssl_server = openssl_server(listening_port=self.openssl_server_listening_port, |
| 886 | + ssl_info=server_ssl_info, |
| 887 | + name="OpenSSLServerAuthPeer", |
| 888 | + cl_args=['-Verify', '1']) |
| 889 | + |
| 890 | + client_ssl_info = dict() |
| 891 | + client_ssl_info['CA_CERT'] = CA_CERT |
| 892 | + client_ssl_info['CLIENT_CERTIFICATE'] = CLIENT_CERTIFICATE |
| 893 | + client_ssl_info['CLIENT_PRIVATE_KEY'] = CLIENT_PRIVATE_KEY |
| 894 | + client_ssl_info['CLIENT_PRIVATE_KEY_PASSWORD'] = CLIENT_PRIVATE_KEY_PASSWORD |
| 895 | + |
| 896 | + out, error = self.opensslclient(port=self.router_listener_port, |
| 897 | + ssl_info=client_ssl_info, |
| 898 | + data=b"Sanity Check the Configuration!") |
| 899 | + self.assertIn(b"Verification: OK", out) |
| 900 | + self.assertIn(b"Verify return code: 0 (ok)", out) |
| 901 | + |
| 902 | + self.openssl_server.wait_out_message("Sanity Check the Configuration!") |
| 903 | + |
| 904 | + # |
| 905 | + # Phase 1: corrupt the server-facing certificate |
| 906 | + # |
| 907 | + |
| 908 | + with open(self.client_cert_file, mode="w") as clientf: |
| 909 | + clientf.write("Oh mercy! I'm corrupt!") |
| 910 | + |
| 911 | + _, _ = self.opensslclient(port=self.router_listener_port, |
| 912 | + ssl_info=client_ssl_info, |
| 913 | + data=b"Phase 1") |
| 914 | + self.router_qdrb.wait_log_message(r'failed to set TLS certificate') |
| 915 | + |
| 916 | + # |
| 917 | + # Phase 1: corrupt the client-facing certificate |
| 918 | + # |
| 919 | + |
| 920 | + with open(self.server_cert_file, mode="w") as serverf: |
| 921 | + serverf.write("Alas! I too have been rendered corrupt!") |
| 922 | + |
| 923 | + _, _ = self.opensslclient(port=self.router_listener_port, |
| 924 | + ssl_info=client_ssl_info, |
| 925 | + data=b"Phase 2", |
| 926 | + expect=Process.EXIT_FAIL) |
| 927 | + self.router_qdra.wait_log_message(r'failed to set TLS certificate') |
| 928 | + |
0 commit comments