This repository has been archived by the owner on Sep 16, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathClient.py
65 lines (54 loc) · 1.79 KB
/
Client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import os
import requests
import json
class Client:
protocol = os.environ.get("PROTOCOL")
endpoint_cb = os.environ.get("ENDPOINT_CB")
def __init__(self) -> None:
pass
def get_all_entities_by_type(
self,
type,
context,
limit=100,
offset=0,
service=None,
subservice=None,
token=None,
):
url = "{PROTOCOL}://{ENDPOINT_CB}/ngsi-ld/v1/entities".format(
PROTOCOL=self.protocol, ENDPOINT_CB=self.endpoint_cb
)
payload = {
"type": type,
"scopeQ": subservice,
"limit": limit,
"offset": offset,
"options": "count",
}
headers = {
"NGSILD-Tenant": service, # equals to "Fiware-Service": service,
"X-Auth-Token": token,
"Link": '<{CONTEXT}>; rel="http://www.w3.org/ns/json-ld#context"; type="application/ld+json"'.format(
CONTEXT=context
),
"Accept": "application/ld+json",
}
response = requests.request(
"GET", url, headers=headers, params=payload, verify=False
)
return response
def upsert_entities(self, data, context):
url = "{PROTOCOL}://{ENDPOINT_CB}/ngsi-ld/v1/entityOperations/upsert".format(
PROTOCOL=self.protocol, ENDPOINT_CB=self.endpoint_cb
)
payload = json.dumps(data)
headers = {
"Content-Type": "application/json",
"Link": '<{CONTEXT}>; rel="http://www.w3.org/ns/json-ld#context"; type="application/ld+json"'.format(
CONTEXT=context
),
"Accept": "application/ld+json",
}
response = requests.request("POST", url, headers=headers, data=payload)
return response