-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathaea_vision_capsules.py
74 lines (53 loc) · 2.64 KB
/
aea_vision_capsules.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import argparse
import json
import logging as log
import sys
from pathlib import Path
from typing import Tuple
from vcap import NodeDescription, DetectionNode
from vcap.loading.capsule_loading import load_capsule
from adl_edge_iot.datacls import PyDetectionBox
from aea_aicv_sdk import frame_data_2_np_array, ObjectDetector
# Configure logging once at import time: INFO and above go to stdout,
# each record rendered as "[ LEVEL ] message".
log.basicConfig(
    stream=sys.stdout,
    level=log.INFO,
    format='[ %(levelname)s ] %(message)s',
)
def argument_parser(argv=None):
    """Build the command-line parser and return the parsed arguments.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) argparse falls back to ``sys.argv[1:]``.

    Returns:
        argparse.Namespace: carries ``capsule`` (required, ``-c``) and
        ``properties`` (``-p``, defaults to ``'./config/Viewer.json'``).

    Note:
        The parser is created with ``add_help=False``, so no ``-h`` /
        ``--help`` option is registered.
    """
    log.info('Creating the argument parser...')
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument('-c', '--capsule', type=str, required=True,
                        help='The capsule file')
    parser.add_argument('-p', '--properties', type=str, required=False,
                        help='The URI (without file://) to the properties file.',
                        default='./config/Viewer.json')
    # Previously the function fell off the end and returned None, which made
    # the vars(argument_parser()) call in main() raise TypeError.
    return parser.parse_args(argv)
def add_detection_node_2_detection_box(node: DetectionNode, container: PyDetectionBox) -> None:
    """Append one detection node to a detection-box container.

    The node's ``name`` is passed as the category label and the corner
    coordinates of its ``bbox`` are forwarded to ``container.add_box``.

    Args:
        node: Detection whose ``name`` and ``bbox`` are read.
        container: Target object; it is mutated through ``add_box``.
    """
    bounds = node.bbox
    container.add_box(
        category_label=node.name,
        x1=bounds.x1,
        y1=bounds.y1,
        x2=bounds.x2,
        y2=bounds.y2,
    )
def build_engine(capsule_file: Path):
    """Load a capsule and wrap it in a per-frame inference callback.

    Args:
        capsule_file: Location of the capsule file, handed to ``load_capsule``.

    Returns:
        A ``score(flow_id, frame)`` callable that runs the capsule on a single
        frame and returns ``(flow_id, PyDetectionBox)``.
    """
    capsule = load_capsule(capsule_file)

    # One stream-state object per flow, created lazily on first use.
    # The original built a brand-new state for every frame, so anything a
    # capsule kept in its state could never survive to the next frame.
    # NOTE(review): assumes flow_id identifies one video stream and that the
    # set of flow ids stays small; confirm against the ObjectDetector contract.
    stream_states = {}

    def score(flow_id: str, frame: object) -> Tuple[str, PyDetectionBox]:
        """Run the capsule on one frame and collect its detections.

        Args:
            flow_id: Identifier passed through unchanged in the result and
                used as the key for the cached stream state.
            frame: Frame object; its ``width``, ``height`` and ``frame_id``
                attributes are read and it is converted with
                ``frame_data_2_np_array``.

        Returns:
            ``(flow_id, detection_box)`` where ``detection_box`` holds one box
            per DetectionNode returned by the capsule (none if the capsule
            returned neither a list nor a DetectionNode).
        """
        img = frame_data_2_np_array(frame)

        input_size = capsule.input_type.size
        if input_size is NodeDescription.Size.NONE:
            # Capsule takes no detection input.
            input_node = None
        else:
            # Synthesize a single node covering the whole frame.
            input_node = DetectionNode(
                name='',
                coords=[[0, 0],
                        [frame.width, 0],
                        [frame.width, frame.height],
                        [0, frame.height]])
            if input_size is NodeDescription.Size.ALL:
                # Size.ALL capsules are given a list of nodes.
                input_node = [input_node]

        if flow_id not in stream_states:
            stream_states[flow_id] = capsule.stream_state()

        result = capsule.process_frame(frame=img,
                                       detection_node=input_node,
                                       options=capsule.default_options,
                                       state=stream_states[flow_id])

        detection_box = PyDetectionBox(frame_id=frame.frame_id, engine_id='vision_capsules')
        if isinstance(result, list):
            for node in result:
                add_detection_node_2_detection_box(node, detection_box)
        elif isinstance(result, DetectionNode):
            add_detection_node_2_detection_box(result, detection_box)
        # Any other result (e.g. None) leaves the detection box empty.
        return flow_id, detection_box

    return score
def main():
    """Script entry point: parse arguments, load properties, run the detector.

    The properties file is read as JSON and re-serialized to a string before
    being handed to ``ObjectDetector`` together with the inference callback
    produced by ``build_engine``.

    Returns:
        int: ``0`` once ``engine.run()`` returns.
    """
    options = vars(argument_parser())

    with open(options['properties']) as handle:
        parsed = json.load(handle)

    # A JSON document containing just `null` is forwarded as None instead of
    # the string "null".
    if parsed is None:
        properties_str = None
    else:
        properties_str = json.dumps(parsed)

    inference = build_engine(options['capsule'])
    detector = ObjectDetector(properties_str=properties_str, inference=inference)
    detector.run()
    return 0
# Run main() only when executed as a script; its return value becomes the
# process exit status.
if __name__ == '__main__':
    exit_status = main()
    sys.exit(exit_status)