Skip to content

Commit

Permalink
The '/elapsed' endpoint shows the elapsed times of Docker/Kubernetes …
Browse files Browse the repository at this point in the history
…calls.

This endpoint should help to find performance problems which increase the
overall response time. See issue google#82.
  • Loading branch information
EranGabber committed Jul 7, 2015
1 parent 7047bcc commit ed5f300
Show file tree
Hide file tree
Showing 9 changed files with 253 additions and 18 deletions.
5 changes: 4 additions & 1 deletion collector/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
# Please read Dockerfile for details on building this service.
PYTHON="python"

test: test_utilities test_cache test_collector test_docker_proxy
test: test_utilities test_cache test_collector test_docker_proxy test_global_state

test_cache: simple_cache_test.py
$(PYTHON) $^
Expand All @@ -31,3 +31,6 @@ test_collector: collector_test.py

test_docker_proxy: docker_proxy_test.py
$(PYTHON) $^

test_global_state: global_state_test.py
$(PYTHON) $^
58 changes: 58 additions & 0 deletions collector/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,44 @@ def valid_id(x):
return utilities.valid_optional_string(x)


def return_elapsed(gs):
"""Returns a description of the elapsed time of recent operations.
Args:
gs: global state.
Returns:
A dictionary containing the count, minimum elapsed time,
maximum elapsed time, average elapsed time, and list of elapsed time
records.
"""
assert isinstance(gs, global_state.GlobalState)
elapsed_list = []
elapsed_sum = 0.0
elapsed_min = None
elapsed_max = None
for elapsed_record in gs.get_elapsed():
duration = elapsed_record.elapsed_seconds
elapsed_list.append(
{'start_time': utilities.seconds_to_timestamp(
elapsed_record.start_time),
'what': elapsed_record.what,
'elapsed_seconds': duration})
elapsed_sum += duration
if (elapsed_min is None) or (elapsed_max is None):
elapsed_min = duration
elapsed_max = duration
else:
elapsed_min = min(elapsed_min, duration)
elapsed_max = max(elapsed_max, duration)

return {'count': len(elapsed_list),
'min': elapsed_min,
'max': elapsed_max,
'average': elapsed_sum / len(elapsed_list) if elapsed_list else None,
'items': elapsed_list}


@app.route('/', methods=['GET'])
def home():
"""Returns the response of the '/' endpoint.
Expand Down Expand Up @@ -337,6 +375,26 @@ def get_minions():
return flask.jsonify(utilities.make_response(minions_status, 'minionsStatus'))


@app.route('/elapsed', methods=['GET'])
def get_elapsed():
"""Computes the response of the '/elapsed' endpoint.
Returns:
A successful response containing the list of elapsed time records of the
most recent Kubernetes and Docker access operations since the previous
call to the '/elapsed' endpoint. Never returns more than
constants.MAX_ELAPSED_QUEUE_SIZE elapsed time records.
"""
gs = app.context_graph_global_state
try:
result = return_elapsed(gs)
return flask.jsonify(utilities.make_response(result, 'elapsed'))
except:
msg = 'get_elapsed() failed with exception %s' % sys.exc_info()[0]
app.logger.exception(msg)
return flask.jsonify(utilities.make_error(msg))


@app.route('/healthz', methods=['GET'])
def get_health():
"""Computes the response of the '/healthz' endpoint.
Expand Down
42 changes: 41 additions & 1 deletion collector/collector_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ def test_version(self):
version = result.get('version')
self.assertTrue(isinstance(version, types.StringTypes))
self.assertEqual(
'kubernetes/cluster-insight ac933439ec5a 2015-03-28T17:23:41', version)
'kubernetes/cluster-insight ac933439ec5a 2015-03-28T17:23:41', version)

def test_minions_status(self):
"""Test the '/minions_status' endpoint."""
Expand All @@ -331,6 +331,46 @@ def test_minions_status(self):
'"k8s-guestbook-node-3": "OK", "k8s-guestbook-node-4": "ERROR"}',
json.dumps(status, sort_keys=True))

def verify_empty_elapsed(self):
"""Verify that '/elapsed' endoint returns an empty list of elapsed times.
"""
ret_value = self.app.get('/elapsed')
result = json.loads(ret_value.data)
self.assertTrue(result.get('success'))
elapsed = result.get('elapsed')
self.assertTrue(isinstance(elapsed, types.DictType))
self.assertEqual(0, elapsed.get('count'))
self.assertTrue(elapsed.get('min') is None)
self.assertTrue(elapsed.get('max') is None)
self.assertTrue(elapsed.get('average') is None)
self.assertTrue(isinstance(elapsed.get('items'), types.ListType))
self.assertEqual([], elapsed.get('items'))

def test_elapsed(self):
"""Test the '/elapsed' endpoint with and without calls to Kubernetes/Docker.
"""
self.verify_empty_elapsed()

# Issue a few requests to Kubernetes and Docker.
self.app.get('/version')

# Now we should have a few elpased time records.
ret_value = self.app.get('/elapsed')
result = json.loads(ret_value.data)
self.assertTrue(result.get('success'))
elapsed = result.get('elapsed')
self.assertTrue(isinstance(elapsed, types.DictType))
self.assertEqual(3, elapsed.get('count'))
self.assertTrue(elapsed.get('min') > 0)
self.assertTrue(elapsed.get('max') > 0)
self.assertTrue(elapsed.get('min') <= elapsed.get('average') <=
elapsed.get('max'))
self.assertTrue(isinstance(elapsed.get('items'), types.ListType))
self.assertEqual(3, len(elapsed.get('items')))

# The next call to '/elapsed' should return an empty list
self.verify_empty_elapsed()

def test_healthz(self):
"""Test the '/healthz' endpoint."""
ret_value = self.app.get('/healthz')
Expand Down
3 changes: 3 additions & 0 deletions collector/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,6 @@

MODE_MINION = 'minion'
MODE_MASTER = 'master'

# Maximum number of elapsed time records in the elapsed time queue.
MAX_ELAPSED_QUEUE_SIZE = 1000
6 changes: 5 additions & 1 deletion collector/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,15 @@ def fetch_data(gs, url, base_name, expect_missing=False):
assert isinstance(gs, global_state.GlobalState)
assert isinstance(url, types.StringTypes)
assert isinstance(base_name, types.StringTypes)
start_time = time.time()
if gs.get_testing():
# Read the data from a file.
fname = 'testdata/' + base_name + '.input.json'
try:
f = open(fname, 'r')
v = json.loads(f.read())
f.close()
gs.add_elapsed(start_time, fname, time.time() - start_time)
return v
except IOError:
# File not found
Expand All @@ -91,7 +93,9 @@ def fetch_data(gs, url, base_name, expect_missing=False):
raise collector_error.CollectorError(msg)
else:
# Send the request to Kubernetes
return requests.get(url).json()
v = requests.get(url).json()
gs.add_elapsed(start_time, url, time.time() - start_time)
return v


@utilities.global_state_two_string_args
Expand Down
58 changes: 58 additions & 0 deletions collector/global_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

"""Keeps global system state to be used by concurrent threads.
"""
import collections
import Queue # "Queue" was renamed "queue" in Python 3.
import random
import sys
import threading
Expand All @@ -24,6 +26,11 @@
# local imports
import constants
import simple_cache
import utilities


ElapsedRecord = collections.namedtuple(
'ElapsedRecord', ['start_time', 'what', 'elapsed_seconds'])


class GlobalState(object):
Expand Down Expand Up @@ -75,6 +82,9 @@ def __init__(self):
# pointers to synchronization constructs.
self._bounded_semaphore = None

# Elapsed time queue containing ElapsedRecord items.
self._elapsed_queue = Queue.Queue() # a FIFO queue

# pointers to shared dictionaries.
self._relations_lock = threading.Lock()
self._relations_to_timestamps = {}
Expand Down Expand Up @@ -207,3 +217,51 @@ def set_relations_to_timestamps(self, v):
assert isinstance(v, types.DictType)
with self._relations_lock:
self._relations_to_timestamps = v

def add_elapsed(self, start_time, url_or_fname, elapsed_seconds):
"""Append an ElapsedRecord of an access operation to the elapsed time queue.
Keep at most constants.MAX_ELAPSED_QUEUE_SIZE elements in the elapsed
time queue.
Args:
start_time: the timestamp at the start of the operation.
url_or_fname: the URL or file name of the operation.
elapsed_seconds: the elapsed time of the operation.
"""
assert isinstance(start_time, types.FloatType)
assert utilities.valid_string(url_or_fname)
assert isinstance(elapsed_seconds, types.FloatType)

# If the queue is too large, remove some items until its contains less
# than constants.MAX_ELAPSED_QUEUE_SIZE elements.
while self._elapsed_queue.qsize() >= constants.MAX_ELAPSED_QUEUE_SIZE:
try:
self._elapsed_queue.get(block=False)
except Queue.Empty:
# self._elapsed_queue.get() may raise the EMPTY exception if the
# queue becomes empty (for example, due to concurrent access).
break

self._elapsed_queue.put(
ElapsedRecord(start_time=start_time, what=url_or_fname,
elapsed_seconds=elapsed_seconds))

def get_elapsed(self):
"""Returns a list of all queued elapsed time records and clears the queue.
Returns:
An empty list if the elapsed time queue is empty.
Otherwise, a list of ElapsedRecord in the order that they appear
in the queue.
"""
result = []
while not self._elapsed_queue.empty():
try:
result.append(self._elapsed_queue.get(block=False))
except Queue.Empty:
# self._elapsed_queue.get() may raise the Empty exception if the
# queue becomes empty (for example, due to concurrent access).
break

return result
57 changes: 57 additions & 0 deletions collector/global_state_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#!/usr/bin/python
#
# Copyright 2015 The Cluster-Insight Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for collector/global_state.py."""

# global imports
import time
import types
import unittest

# local imports
import global_state


class TestGlobalState(unittest.TestCase):

def setUp(self):
self._state = global_state.GlobalState()
self._state.init_caches_and_synchronization()

def test_elapsed(self):
result = self._state.get_elapsed()
self.assertTrue(isinstance(result, types.ListType))
self.assertEqual([], result)

now = time.time()
self._state.add_elapsed(now, 'abc', 13.4)

# expect to get a list of one elapsed time records.
result = self._state.get_elapsed()
self.assertTrue(isinstance(result, types.ListType))
self.assertEqual(1, len(result))
self.assertEqual(now, result[0].start_time)
self.assertEqual('abc', result[0].what)
self.assertEqual(13.4, result[0].elapsed_seconds)

# Calling get_elapsed() should clear the list of elapsed times.
result = self._state.get_elapsed()
self.assertTrue(isinstance(result, types.ListType))
self.assertEqual([], result)


if __name__ == '__main__':
unittest.main()
Loading

0 comments on commit ed5f300

Please sign in to comment.