-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathserver.py
141 lines (117 loc) · 4.29 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import json
from json.decoder import JSONDecodeError
import os
import argparse
from app import create_app
from config import YamlConfig
from pathlib import Path
from flask import request, g
from models import RoleEnum
from werkzeug.exceptions import Forbidden
from utils import check_user_role, check_apptoken
# Command-line interface: the only option handled here is the path of the
# YAML configuration file.
parser = argparse.ArgumentParser()
parser.add_argument(
    '--config', '-c',
    dest='config_file',
    type=str,
    required=False,
    help="The YAML configuration file of the API server.",
)

# Unknown arguments are tolerated (parse_known_args) because additional
# arguments are passed on the command line during a DB migration.
args, unknown = parser.parse_known_args()

# Pick the configuration file, by decreasing priority:
#   1. the --config/-c option,
#   2. the CAPSULE_API_CONFIG environment variable,
#   3. ./config.yml if it exists,
#   4. /etc/capsule-api/config.yml.
config_file = args.config_file
if config_file is None:
    config_file = os.environ.get('CAPSULE_API_CONFIG')
if config_file is None:
    config_file = (
        'config.yml'
        if Path('config.yml').is_file()
        else '/etc/capsule-api/config.yml'
    )

# Build the application from the loaded configuration.
yamlconfig = YamlConfig(config_file)
connex_app = create_app(yamlconfig)
app = connex_app.app

# NOTE(review): imported here rather than with the other imports --
# presumably `oidc` is only usable once create_app() has run; confirm before
# moving this line.
from app import oidc
@app.before_request
def before_request_func():
    """Identify the caller and protect the resources listed in the config.

    The credentials part of the ``Authorization`` header is first checked
    with ``check_apptoken``; when it is reported valid, the returned name is
    stored in ``g.capsule_app_token``. Otherwise the token is passed to
    ``oidc._validate_token`` and the name is read from
    ``g.oidc_token_info['username']``. A caller that cannot be identified
    (missing or malformed header, invalid token) gets the name ``"-"``.

    For an identified caller issuing a non-GET request whose path contains
    ``<key>/<id>`` for an entry of ``yamlconfig.PRESERVE``, the
    ``superadmin`` role is required.

    Returns:
        A ``Forbidden`` object (used as the response) when a preserved
        resource is targeted without the ``superadmin`` role, ``None``
        otherwise so that request handling continues.
    """
    # Extract the token once. IndexError covers a header that has a scheme
    # but no credentials (e.g. "Bearer"), which previously escaped as an
    # unhandled exception.
    try:
        token = request.headers['Authorization'].split(None, 1)[1].strip()
    except (KeyError, IndexError):
        token = None

    validity = False
    name = "-"

    # Get username from an application token
    if token is not None:
        try:
            (validity, name) = check_apptoken(token)
        except KeyError:
            validity = False

    if validity:
        g.capsule_app_token = name
    elif token is not None:
        # look with keycloak by validating token
        try:
            if oidc._validate_token(token):
                name = g.oidc_token_info['username']
            else:
                name = "-"
        except (KeyError, AttributeError):
            name = "-"
    else:
        name = "-"

    # Do not allow admins to remove specific objects listed in config
    if name != '-' and request.method != 'GET':
        for key in yamlconfig.PRESERVE:
            for preserved_id in yamlconfig.PRESERVE[key]:
                if f'{key}/{preserved_id}' in request.path:
                    try:
                        check_user_role(RoleEnum.superadmin)
                    except Forbidden:
                        msg = (
                            'You cannot edit this resource '
                            f'{key}/{preserved_id}.'
                        )
                        # The exception object is returned, not raised: a
                        # non-None return value of a before_request function
                        # is used as the response.
                        return Forbidden(description=msg)
@app.after_request
def log_request_info(response):
    """Log one line per request and return the response unchanged.

    The log line has the form:
    ip - user "METHOD path" response_code content_length "server"
    "user_agent" [payload=]

    OPTIONS requests are not logged. The user name is taken from
    ``g.capsule_app_token`` when it is set, otherwise from the OIDC token
    information, and falls back to ``"-"``. When the request has a body, it
    is appended to the line; the ``crt`` and ``key`` entries of a JSON
    object payload are masked. A body that is not valid JSON is logged as a
    warning.

    Args:
        response: the response about to be sent.

    Returns:
        The same ``response`` object.
    """
    request_ip = request.remote_addr
    request_method = request.method
    request_server = request.url_root
    request_agent = request.user_agent
    request_path = request.path
    request_full_path = request.full_path

    # full_path is "<path>?" when there is no query string; only switch to
    # it when a query string is actually present.
    if f'{request_path}?' != request_full_path:
        request_path = request_full_path

    if request_method == "OPTIONS":  # ignore noise from front
        return response

    # Get username from token
    if hasattr(g, 'capsule_app_token'):
        # Get user name from application token
        name = g.capsule_app_token
    else:  # look with keycloak by validating token
        try:
            token = request.headers['Authorization'].split(None, 1)[1].strip()
            if oidc._validate_token(token):
                name = g.oidc_token_info['username']
            else:
                name = "-"
        except (KeyError, IndexError, AttributeError):
            # IndexError: header with a scheme but no credentials.
            name = "-"

    msg = f'{request_ip} - {name} "{request_method} {request_path}" '\
          f'{response.status_code} {response.content_length} '\
          f'"{request_server}" "{request_agent}"'

    data = request.get_data()
    if data:
        # errors='replace' so that a body which is not valid UTF-8 (e.g. a
        # binary upload) cannot make the logging hook itself fail.
        data_str = data.decode('utf-8', errors='replace')
        try:
            payload = json.loads(data_str)
        except JSONDecodeError:
            msg = f'{msg} payload=**not valid json**:\n{data_str}'
            app.logger.warning(msg)
            return response

        # Only a JSON object can carry the sensitive entries; lists and
        # scalars are logged as they are.
        if isinstance(payload, dict):
            for key in ('crt', 'key'):
                if key in payload:
                    payload[key] = "****** secure data ******"

        p_string = json.dumps(payload)
        msg = f'{msg} payload={p_string}'

    app.logger.info(msg)
    return response
if __name__ == "__main__":
    # Report which configuration file is in use, as an absolute path.
    resolved_config = Path(config_file).resolve()
    app.logger.info(f'Configured with {resolved_config}')
    # NOTE: the reloader is kept off to mitigate duplicated NATS client
    # connections.
    connex_app.run(use_reloader=False)