-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathvolga_ctf.py
55 lines (37 loc) · 1.36 KB
/
volga_ctf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# params: SYSTEM_HOST
from collections.abc import AsyncIterator, Iterator

import aiohttp
import jwt

from models import Flag
# Path prefix, appended to the configured host, under which the capsule API is requested.
API_PREFIX = 'api/capsule/v1'
# Path segment, appended after API_PREFIX, from which the public key is fetched.
PUBLIC_KEY_ENDPOINT = 'public_key'
async def get_public_key(host):
    """Fetch the public key used to check flags from the given host.

    Issues a GET request to ``{host}/{API_PREFIX}/{PUBLIC_KEY_ENDPOINT}`` and
    returns the response body as text when the status is 200.

    Raises:
        ConnectionError: If the response status is anything other than 200.
    """
    key_url = f'{host}/{API_PREFIX}/{PUBLIC_KEY_ENDPOINT}'
    async with aiohttp.ClientSession() as http, http.get(key_url) as response:
        fetched = response is not None and response.status == 200
        if fetched:
            return await response.text()
    raise ConnectionError("Could not get public key to check flags from %s." % host)
def decode(key, capsule):
    """Verify ``capsule`` against ``key`` and return whatever ``jwt.decode`` yields.

    Only the ES256 and RS256 signing algorithms are accepted. Any exception
    raised by ``jwt.decode`` propagates to the caller unchanged.
    """
    accepted_algorithms = ['ES256', 'RS256']
    return jwt.decode(capsule, key, algorithms=accepted_algorithms)
async def validate_flags(flags: Iterator[Flag], app, config: dict) -> AsyncIterator[Flag]:
    """Yield the flags whose capsule token verifies against the system public key.

    Each yielded object is the same instance that was passed in, with its
    ``flag`` attribute replaced by the ``flag`` claim of the decoded capsule.
    Empty flags and flags whose token fails validation are skipped.

    Args:
        flags: Flag objects to check; each is read and updated through ``.flag``.
        app: Mapping-like store; the public key is cached in it under
            ``'SYSTEM_SERVER_KEY'`` so it is fetched only once.
        config: Settings; must contain a non-empty ``'SYSTEM_HOST'``.

    Raises:
        AssertionError: If ``SYSTEM_HOST`` is missing or empty.
        ConnectionError: If the public key has to be fetched and the host
            does not answer with status 200.
    """
    host = config.get('SYSTEM_HOST')
    if not host:
        # Raised explicitly instead of via ``assert`` so the check is not
        # stripped when Python runs with -O; the exception type is unchanged.
        raise AssertionError('SYSTEM_HOST parameter is not set')

    key = app.get('SYSTEM_SERVER_KEY', None)
    if not key:
        key = await get_public_key(host)
        app['SYSTEM_SERVER_KEY'] = key

    prefix = 'VolgaCTF{'
    for flag_obj in flags:
        token = flag_obj.flag
        if not token:
            continue
        if token.startswith(prefix):
            # Unwrap "VolgaCTF{...}". The closing brace is dropped only when it
            # is really there, so a flag missing it does not lose the last
            # character of its token.
            token = token[len(prefix):].removesuffix('}')
        try:
            payload = decode(key, token)
        except jwt.exceptions.InvalidTokenError:
            # Base class of InvalidSignatureError, DecodeError,
            # ExpiredSignatureError, etc.: one malformed or otherwise invalid
            # token must not abort validation of the remaining flags.
            continue
        # Kept outside the try block so that only decoding errors are handled.
        flag_obj.flag = payload.get('flag')
        yield flag_obj