-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathsession.py
79 lines (67 loc) · 2.41 KB
/
session.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import os
import logging
import traceback
from swan.api.orchestrator import Orchestrator
from swan.api_client import OrchestratorAPIClient
from swan.common.constant import *
from swan.common.exception import SwanAPIException
# Configure the root logger as a side effect of importing this module:
# INFO level, with each record rendered as "<timestamp> - <level> - <message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class Session:
    """Store the configuration and login state used to reach the Orchestrator API.

    Attributes set by ``__init__``:
        token: access token obtained by ``api_key_login``; ``None`` until a
            login succeeds.
        api_key: the API key passed in, or the ``API_KEY`` environment
            variable when none is given.
        login_url: base URL the login request is sent to.
        api_client: the ``OrchestratorAPIClient`` used to issue the login request.
        login: the ``login`` flag the session was created with.
    """

    def __init__(
        self,
        api_key: str = None,
        network: str = "mainnet",
        login_url: str = None,
        login: bool = True,
    ):
        """Build the session and, unless ``login`` is false, log in immediately.

        Args:
            api_key: Orchestrator API key. When empty or ``None`` the value of
                the ``API_KEY`` environment variable is used instead.
            network: ``"testnet"`` selects ``ORCHESTRATOR_API_TESTNET``; any
                other value selects ``ORCHESTRATOR_API_MAINNET``. Ignored when
                ``login_url`` is given.
            login_url: explicit login URL; takes precedence over ``network``.
            login: when true, ``api_key_login`` is called during construction.
        """
        self.token = None
        # Fall back to the environment when no (or an empty) key is supplied.
        self.api_key = api_key or os.getenv("API_KEY")

        if login_url:
            self.login_url = login_url
        elif network == "testnet":
            self.login_url = ORCHESTRATOR_API_TESTNET
            logging.info("Logging in Testnet")
        else:
            self.login_url = ORCHESTRATOR_API_MAINNET
            logging.info("Logging in Mainnet")

        self.api_client = OrchestratorAPIClient()
        self.login = login
        if login:
            self.api_key_login()

    def api_key_login(self):
        """Log in with the Orchestrator API key and remember the access token.

        Sends ``self.api_key`` to the ``SWAN_APIKEY_LOGIN`` endpoint at
        ``self.login_url``. On success the ``"data"`` field of the response is
        stored in ``self.token``. Failures are logged, not raised, and leave
        ``self.token`` unchanged.

        Returns:
            The access token stored in ``self.token`` on success, or ``None``
            when the login failed.
        """
        params = {"api_key": self.api_key}
        try:
            result = self.api_client._request_with_params(
                POST, SWAN_APIKEY_LOGIN, self.login_url, params, None, None
            )
            if result["status"] == "failed":
                raise SwanAPIException("Login Failed")
            self.token = result["data"]
            logging.info("Login Successfully!")
            return self.token
        except SwanAPIException as e:
            # `message` is not a standard Exception attribute; fall back to
            # str(e) so a missing attribute cannot raise inside the handler.
            logging.error(getattr(e, "message", str(e)))
        except Exception as e:
            # Covers malformed responses (e.g. a missing key or a non-dict
            # result) as well as transport errors; log with the traceback.
            logging.error(str(e) + traceback.format_exc())
        return None

    def resource(self, service_name: str, network=None, login=False, url_endpoint=None, verification=True):
        """Create a service client that reuses this session's API key and token.

        Args:
            service_name: name of the service; only ``"orchestrator"``
                (case-insensitive) is supported.
            network: forwarded to ``Orchestrator`` unchanged.
            login: forwarded to ``Orchestrator``. Defaults to ``False`` because
                the session should already be logged in.
            url_endpoint: forwarded to ``Orchestrator`` unchanged.
            verification: forwarded to ``Orchestrator`` unchanged.

        Returns:
            An ``Orchestrator`` instance.

        Raises:
            ValueError: if ``service_name`` is not a supported service.
        """
        if service_name.lower() == 'orchestrator':
            return Orchestrator(
                api_key=self.api_key,
                network=network,
                url_endpoint=url_endpoint,
                token=self.token,
                login=login,
                verification=verification,
            )
        # Fail with a clear message instead of an unbound-local error.
        raise ValueError("Unsupported service name: {!r}".format(service_name))