-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathtests.py
177 lines (142 loc) · 5.92 KB
/
tests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
#!/usr/bin/env python
import json
import unittest
import os
import random
import sys
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from threading import Thread
from lib.stats import Stats, ProxyStatsModel
import shared.crypto as crypto
import shared.proxy as proxy
from main import DEFAULT_MAX_LAMBDAS, DEFAULT_PORT, build_local_proxy, \
build_lambda_proxy, build_handler
from gen_rsa_kp import generate_key_pair
def silence_stdout(func):
def decorator(*args, **kwargs):
try:
with open(os.devnull, 'wb') as devnull:
sys.stdout = devnull
func(*args, **kwargs)
finally:
sys.stdout = sys.__stdout__
return decorator
class TestCrypto(unittest.TestCase):
def test_gcm_encypt_decrypt(self):
key = 'a' * 16
cleartext = 'Hello'
nonce = 'my-nonce'
ciphertext, tag = crypto.encrypt_with_gcm(key, cleartext, nonce)
decrypted = crypto.decrypt_with_gcm(key, ciphertext, tag, nonce)
self.assertEqual(cleartext, decrypted)
def _start_test_server(port, numRequests):
class Handler(BaseHTTPRequestHandler):
def log_message(self, format, *args): pass
def __respond(self, statusCode):
for header in self.headers:
if header == 'A': assert self.headers['A'] == '1'
self.send_response(statusCode)
self.send_header('B', '2')
self.end_headers()
self.wfile.write(TestProxy.EXPECTED_RESPONSE_BODY)
def do_GET(self): self.__respond(200)
def do_POST(self):
body = self.rfile.read(int(self.headers['Content-Length']))
assert body == TestProxy.EXPECTED_POST_BODY
self.__respond(201)
server = HTTPServer(('localhost', port), Handler)
def run_server():
for _ in xrange(numRequests):
server.handle_request()
t = Thread(target=run_server)
t.daemon = True
t.start()
class TestProxy(unittest.TestCase):
EXPECTED_REQUEST_HEADERS = {'A': '1'}
EXPECTED_RESPONSE_HEADERS = {'B': '2'}
EXPECTED_POST_BODY = json.dumps({'request': 'Ping'})
EXPECTED_RESPONSE_BODY = json.dumps({'response': 'pong'})
def test_proxy_real_request(self):
response = proxy.proxy_single_request('GET', 'http://google.com',
{'Connection': 'close'}, None)
self.assertEqual(response.statusCode, 301,
'Response from Google should be redirect')
def test_proxy_local_request(self):
port = random.randint(9000, 10000)
url = 'http://localhost:%d/' % port
_start_test_server(port, 3)
response = proxy.proxy_single_request(
'GET', url, TestProxy.EXPECTED_REQUEST_HEADERS, b'')
self.assertEqual(response.statusCode, 200)
self.assertDictContainsSubset(TestProxy.EXPECTED_RESPONSE_HEADERS,
response.headers)
self.assertEqual(response.content,
TestProxy.EXPECTED_RESPONSE_BODY)
response = proxy.proxy_single_request(
'GET', url, TestProxy.EXPECTED_REQUEST_HEADERS, None)
self.assertEqual(response.statusCode, 200)
self.assertDictContainsSubset(TestProxy.EXPECTED_RESPONSE_HEADERS,
response.headers)
self.assertEqual(response.content,
TestProxy.EXPECTED_RESPONSE_BODY)
response = proxy.proxy_single_request(
'POST', url, {
'Foo': 'Bar',
'Content-Length': str(len(TestProxy.EXPECTED_POST_BODY))
},
TestProxy.EXPECTED_POST_BODY)
self.assertEqual(response.statusCode, 201)
self.assertDictContainsSubset(TestProxy.EXPECTED_RESPONSE_HEADERS,
response.headers)
self.assertEqual(response.content,
TestProxy.EXPECTED_RESPONSE_BODY)
class TestRsaKeygen(unittest.TestCase):
@silence_stdout
def test_keygen(self):
generate_key_pair(os.devnull, os.devnull)
class TestBuildProxy(unittest.TestCase):
"""Tries to build the proxies, but not actually run the server."""
@staticmethod
def _get_default_setup():
stats = Stats()
stats.register_model('proxy', ProxyStatsModel())
class MockArgs(object):
pass
args = MockArgs()
args.port = DEFAULT_PORT
args.host = 'localhost'
args.functions = []
args.enableEncryption = False
args.lambdaType = 'short'
args.s3Bucket = None
args.publicServerHostAndPort = None
args.maxLambdas = DEFAULT_MAX_LAMBDAS
args.enableMitm = False
args.disableStats = False
args.verbose = False
return args, stats, None
@silence_stdout
def test_build_local_no_mitm(self):
args, stats, _ = TestBuildProxy._get_default_setup()
args.local = True
args.enableMitm = False
proxy = build_local_proxy(args, stats)
build_handler(proxy, stats, verbose=True)
@silence_stdout
def test_build_local_with_mitm(self):
args, stats, _ = TestBuildProxy._get_default_setup()
args.local = True
args.enableMitm = True
proxy = build_local_proxy(args, stats)
build_handler(proxy, stats, verbose=True)
@silence_stdout
def test_build_lambda_with_mitm(self):
args, stats, reverseServer = TestBuildProxy._get_default_setup()
args.enableMitm = True
args.functions = ['proxy']
args.s3Bucket = 'mock-bucket'
args.enableEncryption = True
proxy = build_lambda_proxy(args, stats, reverseServer)
build_handler(proxy, stats, verbose=True)
if __name__ == '__main__':
unittest.main()