-
Notifications
You must be signed in to change notification settings - Fork 39
/
Copy pathsslhandling.py
109 lines (91 loc) · 4.54 KB
/
sslhandling.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import ssl
import configparser
def ssl_settings(host, config_file, env=os.environ):
"""
Function which generates SSL setting for cassandra.Cluster
Params:
* host .........: hostname of Cassandra node.
* env ..........: environment variables. SSL factory will use, if passed,
SSL_CERTFILE and SSL_VALIDATE variables.
* config_file ..: path to cqlsh config file (usually ~/.cqlshrc).
SSL factory will use, if set, certfile and validate
options in [ssl] section, as well as host to certfile
mapping in [certfiles] section.
[certfiles] section is optional, 'validate' setting in [ssl] section is
optional too. If validation is enabled then SSL certfile must be provided
either in the config file or as an environment variable.
Environment variables override any options set in cqlsh config file.
"""
configs = configparser.ConfigParser()
configs.read(config_file)
def get_option(section, option):
try:
return configs.get(section, option)
except configparser.Error:
return None
def get_best_tls_protocol(ssl_ver_str):
if ssl_ver_str:
print("Warning: Explicit SSL and TLS versions in the cqlshrc file or in SSL_VERSION environment property are ignored as the protocol is auto-negotiated.\n")
return ssl.PROTOCOL_TLS_CLIENT
ssl_validate = env.get('SSL_VALIDATE')
if ssl_validate is None:
ssl_validate = get_option('ssl', 'validate')
ssl_validate = ssl_validate is None or ssl_validate.lower() != 'false'
ssl_check_hostname = env.get('SSL_CHECK_HOSTNAME')
if ssl_check_hostname is None:
ssl_check_hostname = get_option('ssl', 'check_hostname')
ssl_check_hostname = ssl_check_hostname is not None and ssl_check_hostname.lower() != 'false'
if ssl_check_hostname and not ssl_validate:
sys.exit("SSL certificate hostname checking "
"(`check_hostname` in the [ssl] section) must be turned off "
"if certificate `validate` is turned off.")
ssl_version_str = env.get('SSL_VERSION')
if ssl_version_str is None:
ssl_version_str = get_option('ssl', 'version')
ssl_version = get_best_tls_protocol(ssl_version_str)
ssl_certfile = env.get('SSL_CERTFILE')
if ssl_certfile is None:
ssl_certfile = get_option('certfiles', host)
if ssl_certfile is None:
ssl_certfile = get_option('ssl', 'certfile')
if ssl_validate and ssl_certfile is None:
sys.exit("Validation is enabled; SSL transport factory requires a valid certfile "
"to be specified. Please provide path to the certfile in [ssl] section "
"as 'certfile' option in %s (or use [certfiles] section) or set SSL_CERTFILE "
"environment variable." % (config_file,))
if ssl_certfile is not None:
ssl_certfile = os.path.expanduser(ssl_certfile)
userkey = get_option('ssl', 'userkey')
if userkey:
userkey = os.path.expanduser(userkey)
usercert = get_option('ssl', 'usercert')
if usercert:
usercert = os.path.expanduser(usercert)
ssl_context = ssl.SSLContext(ssl_version)
ssl_context.check_hostname = ssl_check_hostname
if usercert and userkey:
ssl_context.load_cert_chain(certfile=usercert,
keyfile=userkey)
if (usercert and not userkey) or (userkey and not usercert):
print("Warning: userkey and usercert from [ssl] section, should be both configured, otherwise won't be used")
ssl_context.verify_mode = ssl.CERT_REQUIRED if ssl_validate else ssl.CERT_NONE
if ssl_certfile:
ssl_context.load_verify_locations(cafile=ssl_certfile)
return ssl_context