Commit 32674f5

c0c0n3 and chicco785 authored Jan 27, 2021
Telemetry (#411)
* pin pg8000 to last version known to work w/ ql.
* rework profiling not to use db backend---avoids skewing measurements.
* implement efficient, low mem footprint, data analysis friendly telemetry.
* use constants for csv field names.
* fix tmp file mv across file systems.
* implement import of telemetry data into pandas.
* initial benchmarking scripts.
* get rid of first implementation of telemetry stuff.
* undo the rebase mess!
* replay python deps changes from before botched rebase.
* cosmetic changes to keep code climate happy.
* document telemetry features for admins and power users.
* use context decorator instead of function decorator for timing code blocks.
* use custom load test driver for better accuracy.
* add telemetry section to docs index.

Co-authored-by: Federico M. Facca <chicco785@users.noreply.github.com>
1 parent b231c04 commit 32674f5

24 files changed, +2713 -132 lines changed

‎Pipfile

+4

@@ -27,7 +27,11 @@ rethinkdb = "==2.3"
 pickle-mixin = "==1.0.2"
 pytest-lazy-fixture = "~=0.6.3"

+# run `pipenv install --dev` to get the packages below in your env
 [dev-packages]
+aiohttp = "~=3.7"
+matplotlib = "~=3.3"
+pandas = "~=1.1"

 [requires]
 python_version = "3.8.5"

‎Pipfile.lock

+496 -132 (generated lock file; diff not rendered)

‎docs/manuals/admin/telemetry.md

+199

@@ -0,0 +1,199 @@
# Telemetry

QuantumLeap ships with a telemetry component for concurrent, efficient,
low-memory-footprint collection of time-varying quantities. Presently,
it is possible to collect:

* Duration of selected code blocks;
* Python garbage collection metrics;
* Python profiler (cProfile) data;
* Operating system resource usage: maximum resident set size, user and
  kernel time.

Profiler data is collected in files that can be loaded into the Python
built-in analyser (`pstats`), whereas all other sampled quantities are
assembled into time series and output to CSV files which can easily be
imported into data analysis tools such as Pandas or a time series database
such as Crate or Timescale.

### Output files

As QuantumLeap collects telemetry data, files are written to a monitoring
directory of your choice. Duration time series are output to CSV files
with a "duration" prefix and a "csv" extension. Likewise, garbage
collection and operating system resource usage time series are collected
in CSV files with a "runtime" prefix and a "csv" extension. Finally,
profiler data go into files named "profiler.PID.data", where PID is the
operating system PID of the process being profiled---e.g.
"profiler.5662.data". CSV files can be read and deleted at will without
interfering with QuantumLeap's telemetry collection process, even if
QuantumLeap is restarted multiple times. On the other hand, profiler data
files should only be opened after stopping QuantumLeap. (These files are
produced by cProfile, not by QuantumLeap, so it is best not to touch them
until cProfile exits.)

### Output format

The profiler data files are binary files in the cProfile format as
documented in the Python standard library, hence they will not be
discussed here. The CSV files contain time series data; fields are
arranged as follows:

- **Timepoint**: time at which the measurement was taken, expressed
  as the number of nanoseconds since the epoch. (Integer value.)
- **Measurement**: sampled quantity. (Float value.)
- **Label**: name used to identify a particular kind of measurement
  when sampling. (String value.)
- **PID**: operating system ID of the process that sampled the quantity.

For convenience, each CSV file starts with a header of:

    Timepoint, Measurement, Label, PID

For duration files the sampled quantity is the amount of time, in
fractional seconds, that an HTTP request took to complete, and the
label identifies that request using a combination of path and verb,
as shown in the duration file excerpt below:

    Timepoint, Measurement, Label, PID
    ...
    1607092101580206000, 0.237, "/v2/notify [POST]", 5659
    ...
    1607092101580275000, 0.291, "/v2/notify [POST]", 5662
    ...
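
For a quick manual look at one of these files you can load it straight
into Pandas. The sketch below is only an illustration: the file name is
hypothetical (real files carry a unique suffix) and it assumes the dev
packages from the Pipfile are installed. It converts the nanosecond
timepoints into a time index:

    >>> import pandas as pd
    >>> df = pd.read_csv('_monitoring/duration.0a1b2c.csv')   # hypothetical file name
    >>> df['Timepoint'] = pd.to_datetime(df['Timepoint'], unit='ns')
    >>> df = df.set_index('Timepoint')
    >>> df['Measurement'].describe()    # request durations in fractional seconds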

Runtime files contain both Python garbage collection and operating
system resource usage time series. Labels and measurements are as
follows.

- **GC collections**. Each measurement in the series represents the total
  number of times the GC collector swept memory since the interpreter
  was started. (This is the total across all generations.) The series
  is labelled with "gc collections".
- **GC collected**. Each measurement in the series represents the total
  number of objects the GC collector freed since the interpreter was
  started. (This is the total across all generations.) The series is
  labelled with "gc collected".
- **GC uncollectable**. Each measurement in the series represents the
  total number of objects the GC collector couldn't free since the
  interpreter was started. (This is the total across all generations.)
  The series is labelled with "gc uncollectable".
- **User Time**. Each measurement in the series is the total amount of
  time, in seconds, the process spent executing in user mode. The
  series is labelled with "user time".
- **System Time**. Each measurement in the series is the total amount of
  time, in seconds, the process spent executing in kernel mode. The
  series is labelled with "system time".
- **Maximum RSS**. Each measurement in the series is the maximum resident
  set size used so far. The value will be in kilobytes on Linux and bytes
  on macOS. The series is labelled with "max rss".
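
Note that these measurements are running totals (or, for max RSS, a running
maximum) rather than per-interval values, so when analysing them you will
typically difference consecutive samples. A minimal Pandas sketch, again
with a hypothetical file name, that pulls out the "user time" series and
differences it:

    >>> import pandas as pd
    >>> rt = pd.read_csv('_monitoring/runtime.7e4d9b.csv')    # hypothetical file name
    >>> rt['Timepoint'] = pd.to_datetime(rt['Timepoint'], unit='ns')
    >>> user = rt[rt['Label'] == 'user time'].set_index('Timepoint')
    >>> user['Measurement'].diff()    # CPU seconds spent in user mode between samples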

### Basic usage

Telemetry is turned off by default but can easily be switched on using
the Gunicorn configuration file provided in the `server` package:
`gconfig_telemetry.py`. With this configuration, QuantumLeap will collect:

* The duration of each HTTP request;
* Python garbage collection metrics;
* Operating system resource usage: maximum resident set size, user and
  kernel time.

If profiling data are needed too, edit `gconfig_telemetry.py` to enable
Python's built-in profiler (cProfile):

    def post_worker_init(worker):
        ...
        monitor.start(monitoring_dir=monitoring_dir,
                      with_runtime=True,
                      with_profiler=False)
                      # ^ set this to True

By default, telemetry data are written to files in the `_monitoring`
directory under QuantumLeap's current working directory---if the directory
doesn't exist, it is automatically created. To choose a different location,
set the `monitoring_dir` variable in `gconfig_telemetry.py` to your liking.
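
For example, a one-line tweak to `gconfig_telemetry.py` along these lines
would do (the path below is purely illustrative; any writable directory
works, and it is created on worker start-up if missing):

    monitoring_dir = '/var/tmp/ql-monitoring'   # hypothetical location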

#### Turning telemetry on

As mentioned earlier, telemetry is turned off by default. To turn it on,
start QuantumLeap this way:

    $ python app.py --config server/gconfig_telemetry.py

or, to use your own Gunicorn instead of QuantumLeap's embedded one:

    $ gunicorn server.wsgi --config server/gconfig_telemetry.py

If you are using the Docker image, pass the telemetry configuration
as a command argument, as in the Docker Compose snippet below:

    quantumleap:
        image: smartsdk/quantumleap:latest
        command: --config server/gconfig_telemetry.py
        ...

At the moment the only way to turn telemetry off is to stop QuantumLeap
and then restart it with its default configuration---i.e. `gconfig.py`.

#### Analysing telemetry data

Profiler data can be analysed interactively using the Python `pstats`
module as explained in the Python standard library documentation, e.g.

    $ python -m pstats profiler.5662.data
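
The same data can also be inspected programmatically with the standard
library's `pstats` API; for instance, the short session below prints the
ten most expensive calls by cumulative time:

    >>> import pstats
    >>> p = pstats.Stats('profiler.5662.data')
    >>> p.sort_stats('cumulative').print_stats(10)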

CSV files can be easily imported into data analysis tools such as Pandas
or a time series database such as Crate or Timescale using the `COPY FROM`
statement. For added convenience, there is a `pandas_import` module in
the `telemetry` package that you can use to import all duration and
runtime CSV files found in the monitoring directory:

    $ cd ngsi-timeseries-api
    $ pipenv install --dev
    $ python
    >>> import pandas as pd
    >>> from server.telemetry.pandas_import import TelemetryDB
    >>>
    >>> db = TelemetryDB('/path/to/_monitoring')

Then you can use the `TelemetryDB` methods to populate Pandas frames
with duration and runtime data combed from the CSV files. For example,
here's how to calculate requests-per-second statistics for the version
endpoint and plot requests per second over time:

    >>> get_version = db.duration().time_series('/version [GET]')
    >>> rps = get_version.data().resample('1S').count()
    >>> rps.describe()
    ...
    >>> fig = rps.plot().get_figure()
    >>> fig.savefig("get-version-rps.pdf")
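
Along the same lines, and assuming the same `TelemetryDB` session and
label format as above, a quick sketch to get latency percentiles for the
notify endpoint out of its duration series:

    >>> notify = db.duration().time_series('/v2/notify [POST]')
    >>> notify.data().quantile([0.5, 0.95, 0.99])    # median, 95th and 99th percentile latency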

For further inspiration, you can have a look at the `analysis` module
in the `tests/benchmark` directory.


### Advanced usage

Power users who need to instrument the code to investigate performance
bottlenecks can do so by decorating functions with a duration sampler,
as in the example below where a `time_it` decorator is added to the
version endpoint's handler.

    from server.telemetry.monitor import time_it

    @time_it(label='version()')
    def version():
        ...

It is also possible to time specific blocks of code inside functions
or methods, or in the outer module's scope; please refer to the
documentation of the `monitor` module for the details.
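
For instance, the same start/stop sampling calls that `gconfig_telemetry.py`
wraps around HTTP requests can bracket any block of code. In the sketch
below the function and label are made up for illustration; see the
`monitor` module for the supported block-timing helpers:

    import server.telemetry.monitor as monitor

    def insert_entities(entities):    # hypothetical function to instrument
        sample_id = monitor.start_duration_sample()
        try:
            ...    # the code block being timed
        finally:
            monitor.stop_duration_sample('insert entities [batch]', sample_id)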

For more advanced scenarios, or for writing your own samplers, first
familiarise yourself with the `observation` module (the core functionality;
it comes with numerous examples), then have a look at the samplers in the
`sampler` module to see how to write one. Finally, you can use the
implementation of the `monitor` module as a starting point for wiring
together the building blocks to make them fit for your use case.

‎mkdocs.yml

+1

@@ -23,3 +23,4 @@ nav:
   - 'Grafana': 'admin/grafana.md'
   - 'Data-Migration': 'admin/dataMigration.md'
   - 'Bechmarks': 'admin/benchmarks.md'
+  - 'Telemetry': 'admin/telemetry.md'

‎src/server/gconfig_telemetry.py

+34

@@ -0,0 +1,34 @@
import multiprocessing
import os
import server
import server.telemetry.monitor as monitor


bind = f"{server.DEFAULT_HOST}:{server.DEFAULT_PORT}"
workers = multiprocessing.cpu_count() * 4 + 1
worker_class = 'gthread'
threads = 1
loglevel = 'error'


monitoring_dir = '_monitoring'


def post_worker_init(worker):
    os.makedirs(monitoring_dir, exist_ok=True)
    monitor.start(monitoring_dir=monitoring_dir,
                  with_runtime=True,
                  with_profiler=False)


def pre_request(worker, req):
    req.duration_sample_id = monitor.start_duration_sample()


def post_request(worker, req, environ, resp):
    key = f"{req.path} [{req.method}]"
    monitor.stop_duration_sample(key, req.duration_sample_id)


def worker_exit(servo, worker):
    monitor.stop()

‎src/server/telemetry/__init__.py

+14

@@ -0,0 +1,14 @@
"""
Thread-safe, low memory footprint, and efficient collection of time-varying
quantities.

For common telemetry scenarios (timing, profiling, GC) you should just be
able to use the ``monitor`` module as is. See there for details and usage.

For more advanced scenarios or writing your own samplers, familiarise
yourself with the ``observation`` module (core functionality, comes with
lots of examples) first, then have a look at the samplers in the ``sampler``
module to see how to write one, finally you can use the implementation of
the ``monitor`` module as a starting point for wiring together the building
blocks to make them fit for your use case.
"""

‎src/server/telemetry/flush.py

+110

@@ -0,0 +1,110 @@
"""
Flushing of time series memory buffers to permanent storage.
Each buffer is saved to its own file to avoid race conditions among processes
and threads. Saving of data is efficient since it's based on streams and
lock-free, i.e. there's no need to acquire global locks to coordinate
writers. Files are written to a configured target directory atomically and
with unique names. This avoids interference with other programs that may be
processing previously written files. For example, another program can safely
scan the directory, aggregate data in each file, process the aggregate and
then delete the processed files with no risk of race conditions w/r/t the
writers in this module.
"""

import csv
import os
from uuid import uuid4

from server.telemetry.observation import ObservationStore, \
    ObservationStoreAction, tabulate


TIMEPOINT_CSV_FIELD = 'Timepoint'
"""
Name of the observation's timepoint field in the CSV header.
"""
MEASUREMENT_CSV_FIELD = 'Measurement'
"""
Name of the observation's measurement field in the CSV header.
"""
LABEL_CSV_FIELD = 'Label'
"""
Name of the observation's label field in the CSV header.
"""
PID_CSV_FIELD = 'PID'
"""
Name of the PID field in the CSV header.
"""

OBSERVATION_STORE_HEADER = [TIMEPOINT_CSV_FIELD, MEASUREMENT_CSV_FIELD,
                            LABEL_CSV_FIELD, PID_CSV_FIELD]
"""
Header of the CSV file where observation store contents get written.
"""


def flush_to_csv(target_dir: str, filename_prefix: str) \
        -> ObservationStoreAction:
    """
    Build an action to stream the contents of an observation store to a CSV
    file. Write the file *atomically* to the specified target directory and
    with a unique file name. Write CSV fields in this order: time point,
    measurement, label, PID. Notice PID is the process ID of the current
    process which isn't part of the observation store but is added by this
    function to each row.

    :param target_dir: the directory where to write the file.
    :param filename_prefix: a string to prepend to the generated unique file
        name.
    :return: a function that takes an observation store and writes its
        contents to file.
    """
    return lambda store: _save_csv(target_dir, filename_prefix, store)


def _save_csv(target_dir: str, filename_prefix: str,
              store: ObservationStore):
    temp_name, filename = _file_names(filename_prefix)
    temp_path = os.path.join(target_dir, temp_name)    # (*)
    target_path = os.path.join(target_dir, filename)

    _write_csv(temp_path, store)
    os.rename(temp_path, target_path)    # (*)

# NOTE. Atomic move. Rename is atomic but won't work across file systems,
# see
# - https://alexwlchan.net/2019/03/atomic-cross-filesystem-moves-in-python/
# If you try moving a file across file systems you get an error similar to:
#
#     OSError: [Errno 18] Cross-device link:
#       '/tmp/file.csv' -> '/dir/on/other/fs/file.csv'
#
# This is why we write the file directly to the target dir with a temp name
# and then do the move. In fact, putting the file in a temp dir and then
# moving it to the target dir may fail if the two dirs are on different
# file systems.


def _file_names(filename_prefix: str) -> (str, str):
    fid = uuid4().hex
    temp_name = f"{filename_prefix}.{fid}.tmp"
    target_name = f"{filename_prefix}.{fid}.csv"
    return temp_name, target_name


def _write_csv(path: str, content: ObservationStore):
    pid = os.getpid()
    ts = ((t, m, k, pid) for t, m, k in tabulate(content))    # (1)
    with open(path, mode='w') as fd:
        w = csv.writer(fd, delimiter=',', quotechar='"',
                       quoting=csv.QUOTE_MINIMAL)    # (2)
        w.writerow(OBSERVATION_STORE_HEADER)
        w.writerows(ts)

# NOTES.
# 1. Lazy evaluation. Parens, contrary to square brackets, don't force
#    evaluation, so we won't wind up with double the memory of the store.
#    See:
#    - https://stackoverflow.com/questions/18883414
# 2. CSV quoting. Only quoting fields if they contain a delimiter or the
#    quote char.
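
A minimal usage sketch for `flush_to_csv` (the `observation_store` below
stands in for an `ObservationStore` assembled elsewhere, e.g. by one of the
samplers; its construction is outside this module's scope):

    from server.telemetry.flush import flush_to_csv

    save = flush_to_csv(target_dir='_monitoring', filename_prefix='duration')
    save(observation_store)    # atomically writes _monitoring/duration.<uuid>.csv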
