diff --git a/collector/Makefile b/collector/Makefile index 84803fa..32de5c8 100644 --- a/collector/Makefile +++ b/collector/Makefile @@ -18,7 +18,7 @@ # Please read Dockerfile for details on building this service. PYTHON="python" -test: test_utilities test_cache test_collector test_docker_proxy +test: test_utilities test_cache test_collector test_docker_proxy test_global_state test_cache: simple_cache_test.py $(PYTHON) $^ @@ -31,3 +31,6 @@ test_collector: collector_test.py test_docker_proxy: docker_proxy_test.py $(PYTHON) $^ + +test_global_state: global_state_test.py + $(PYTHON) $^ diff --git a/collector/collector.py b/collector/collector.py index d2672fc..9592d63 100644 --- a/collector/collector.py +++ b/collector/collector.py @@ -57,6 +57,45 @@ def valid_id(x): return utilities.valid_optional_string(x) +def return_elapsed(gs): + """Returns a description of the elapsed time of recent operations. + + Args: + gs: global state. + + Returns: + A dictionary containing the count, minimum elapsed time, + maximum elapsed time, average elapsed time, and list of elapsed time + records. + """ + assert isinstance(gs, global_state.GlobalState) + elapsed_list = [] + elapsed_sum = 0.0 + elapsed_min = None + elapsed_max = None + for elapsed_record in gs.get_elapsed(): + duration = elapsed_record.elapsed_seconds + elapsed_list.append( + {'start_time': utilities.seconds_to_timestamp( + elapsed_record.start_time), + 'what': elapsed_record.what, + 'threadIdentifier': elapsed_record.thread_identifier, + 'elapsed_seconds': duration}) + elapsed_sum += duration + if (elapsed_min is None) or (elapsed_max is None): + elapsed_min = duration + elapsed_max = duration + else: + elapsed_min = min(elapsed_min, duration) + elapsed_max = max(elapsed_max, duration) + + return {'count': len(elapsed_list), + 'min': elapsed_min, + 'max': elapsed_max, + 'average': elapsed_sum / len(elapsed_list) if elapsed_list else None, + 'items': elapsed_list} + + @app.route('/', methods=['GET']) def home(): """Returns the response of the '/' endpoint. @@ -337,6 +376,26 @@ def get_minions(): return flask.jsonify(utilities.make_response(minions_status, 'minionsStatus')) +@app.route('/elapsed', methods=['GET']) +def get_elapsed(): + """Computes the response of the '/elapsed' endpoint. + + Returns: + A successful response containing the list of elapsed time records of the + most recent Kubernetes and Docker access operations since the previous + call to the '/elapsed' endpoint. Never returns more than + constants.MAX_ELAPSED_QUEUE_SIZE elapsed time records. + """ + gs = app.context_graph_global_state + try: + result = return_elapsed(gs) + return flask.jsonify(utilities.make_response(result, 'elapsed')) + except: + msg = 'get_elapsed() failed with exception %s' % sys.exc_info()[0] + app.logger.exception(msg) + return flask.jsonify(utilities.make_error(msg)) + + @app.route('/healthz', methods=['GET']) def get_health(): """Computes the response of the '/healthz' endpoint. diff --git a/collector/collector_test.py b/collector/collector_test.py index ea3edb5..a234a4d 100644 --- a/collector/collector_test.py +++ b/collector/collector_test.py @@ -317,7 +317,7 @@ def test_version(self): version = result.get('version') self.assertTrue(isinstance(version, types.StringTypes)) self.assertEqual( - 'kubernetes/cluster-insight ac933439ec5a 2015-03-28T17:23:41', version) + 'kubernetes/cluster-insight ac933439ec5a 2015-03-28T17:23:41', version) def test_minions_status(self): """Test the '/minions_status' endpoint.""" @@ -331,6 +331,46 @@ def test_minions_status(self): '"k8s-guestbook-node-3": "OK", "k8s-guestbook-node-4": "ERROR"}', json.dumps(status, sort_keys=True)) + def verify_empty_elapsed(self): + """Verify that '/elapsed' endoint returns an empty list of elapsed times. + """ + ret_value = self.app.get('/elapsed') + result = json.loads(ret_value.data) + self.assertTrue(result.get('success')) + elapsed = result.get('elapsed') + self.assertTrue(isinstance(elapsed, types.DictType)) + self.assertEqual(0, elapsed.get('count')) + self.assertTrue(elapsed.get('min') is None) + self.assertTrue(elapsed.get('max') is None) + self.assertTrue(elapsed.get('average') is None) + self.assertTrue(isinstance(elapsed.get('items'), types.ListType)) + self.assertEqual([], elapsed.get('items')) + + def test_elapsed(self): + """Test the '/elapsed' endpoint with and without calls to Kubernetes/Docker. + """ + self.verify_empty_elapsed() + + # Issue a few requests to Kubernetes and Docker. + self.app.get('/version') + + # Now we should have a few elpased time records. + ret_value = self.app.get('/elapsed') + result = json.loads(ret_value.data) + self.assertTrue(result.get('success')) + elapsed = result.get('elapsed') + self.assertTrue(isinstance(elapsed, types.DictType)) + self.assertEqual(3, elapsed.get('count')) + self.assertTrue(elapsed.get('min') > 0) + self.assertTrue(elapsed.get('max') > 0) + self.assertTrue(elapsed.get('min') <= elapsed.get('average') <= + elapsed.get('max')) + self.assertTrue(isinstance(elapsed.get('items'), types.ListType)) + self.assertEqual(3, len(elapsed.get('items'))) + + # The next call to '/elapsed' should return an empty list + self.verify_empty_elapsed() + def test_healthz(self): """Test the '/healthz' endpoint.""" ret_value = self.app.get('/healthz') diff --git a/collector/constants.py b/collector/constants.py index 90f29b0..d3e02af 100644 --- a/collector/constants.py +++ b/collector/constants.py @@ -44,3 +44,6 @@ MODE_MINION = 'minion' MODE_MASTER = 'master' + +# Maximum number of elapsed time records in the elapsed time queue. +MAX_ELAPSED_QUEUE_SIZE = 1000 diff --git a/collector/docker.py b/collector/docker.py index 8d5d088..ec13253 100644 --- a/collector/docker.py +++ b/collector/docker.py @@ -69,6 +69,7 @@ def fetch_data(gs, url, base_name, expect_missing=False): assert isinstance(gs, global_state.GlobalState) assert isinstance(url, types.StringTypes) assert isinstance(base_name, types.StringTypes) + start_time = time.time() if gs.get_testing(): # Read the data from a file. fname = 'testdata/' + base_name + '.input.json' @@ -76,6 +77,7 @@ def fetch_data(gs, url, base_name, expect_missing=False): f = open(fname, 'r') v = json.loads(f.read()) f.close() + gs.add_elapsed(start_time, fname, time.time() - start_time) return v except IOError: # File not found @@ -91,7 +93,9 @@ def fetch_data(gs, url, base_name, expect_missing=False): raise collector_error.CollectorError(msg) else: # Send the request to Kubernetes - return requests.get(url).json() + v = requests.get(url).json() + gs.add_elapsed(start_time, url, time.time() - start_time) + return v @utilities.global_state_two_string_args diff --git a/collector/global_state.py b/collector/global_state.py index a8af9c6..7d3f25f 100644 --- a/collector/global_state.py +++ b/collector/global_state.py @@ -16,14 +16,23 @@ """Keeps global system state to be used by concurrent threads. """ +import collections +import Queue # "Queue" was renamed "queue" in Python 3. import random import sys +import thread import threading import types # local imports import constants import simple_cache +import utilities + + +ElapsedRecord = collections.namedtuple( + 'ElapsedRecord', + ['start_time', 'what', 'thread_identifier', 'elapsed_seconds']) class GlobalState(object): @@ -75,6 +84,9 @@ def __init__(self): # pointers to synchronization constructs. self._bounded_semaphore = None + # Elapsed time queue containing ElapsedRecord items. + self._elapsed_queue = Queue.Queue() # a FIFO queue + # pointers to shared dictionaries. self._relations_lock = threading.Lock() self._relations_to_timestamps = {} @@ -207,3 +219,52 @@ def set_relations_to_timestamps(self, v): assert isinstance(v, types.DictType) with self._relations_lock: self._relations_to_timestamps = v + + def add_elapsed(self, start_time, url_or_fname, elapsed_seconds): + """Append an ElapsedRecord of an access operation to the elapsed time queue. + + Keep at most constants.MAX_ELAPSED_QUEUE_SIZE elements in the elapsed + time queue. + + Args: + start_time: the timestamp at the start of the operation. + url_or_fname: the URL or file name of the operation. + elapsed_seconds: the elapsed time of the operation. + """ + assert isinstance(start_time, types.FloatType) + assert utilities.valid_string(url_or_fname) + assert isinstance(elapsed_seconds, types.FloatType) + + # If the queue is too large, remove some items until it contains less + # than constants.MAX_ELAPSED_QUEUE_SIZE elements. + while self._elapsed_queue.qsize() >= constants.MAX_ELAPSED_QUEUE_SIZE: + try: + self._elapsed_queue.get(block=False) + except Queue.Empty: + # self._elapsed_queue.get() may raise the EMPTY exception if the + # queue becomes empty (for example, due to concurrent access). + break + + self._elapsed_queue.put( + ElapsedRecord(start_time=start_time, what=url_or_fname, + thread_identifier=thread.get_ident(), + elapsed_seconds=elapsed_seconds)) + + def get_elapsed(self): + """Returns a list of all queued elapsed time records and clears the queue. + + Returns: + An empty list if the elapsed time queue is empty. + Otherwise, a list of ElapsedRecord in the order that they appear + in the queue. + """ + result = [] + while not self._elapsed_queue.empty(): + try: + result.append(self._elapsed_queue.get(block=False)) + except Queue.Empty: + # self._elapsed_queue.get() may raise the Empty exception if the + # queue becomes empty (for example, due to concurrent access). + break + + return result diff --git a/collector/global_state_test.py b/collector/global_state_test.py new file mode 100644 index 0000000..87b4c55 --- /dev/null +++ b/collector/global_state_test.py @@ -0,0 +1,59 @@ +#!/usr/bin/python +# +# Copyright 2015 The Cluster-Insight Authors. All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for collector/global_state.py.""" + +# global imports +import thread +import time +import types +import unittest + +# local imports +import global_state + + +class TestGlobalState(unittest.TestCase): + + def setUp(self): + self._state = global_state.GlobalState() + self._state.init_caches_and_synchronization() + + def test_elapsed(self): + result = self._state.get_elapsed() + self.assertTrue(isinstance(result, types.ListType)) + self.assertEqual([], result) + + now = time.time() + self._state.add_elapsed(now, 'abc', 13.4) + + # expect to get a list of one elapsed time records. + result = self._state.get_elapsed() + self.assertTrue(isinstance(result, types.ListType)) + self.assertEqual(1, len(result)) + self.assertEqual(now, result[0].start_time) + self.assertEqual('abc', result[0].what) + self.assertEqual(13.4, result[0].elapsed_seconds) + self.assertEqual(thread.get_ident(), result[0].thread_identifier) + + # Calling get_elapsed() should clear the list of elapsed times. + result = self._state.get_elapsed() + self.assertTrue(isinstance(result, types.ListType)) + self.assertEqual([], result) + + +if __name__ == '__main__': + unittest.main() diff --git a/collector/kubernetes.py b/collector/kubernetes.py index 802c292..0458ad2 100644 --- a/collector/kubernetes.py +++ b/collector/kubernetes.py @@ -22,10 +22,10 @@ """ import json +import os import sys import time import types -import os import requests @@ -39,16 +39,17 @@ KUBERNETES_API = 'https://%s:%s/api/v1' + def get_kubernetes_base_url(): - """ - Computes the base URL for the Kubernetes master from the environment - variables for the kubernetes service. + """Returns the base URL for the Kubernetes master. + + Uses the environment variables for the kubernetes service. Returns: - The base URL for the Kubernetes master, including the api prefix. + The base URL for the Kubernetes master, including the API prefix. Raises: - CollectorError: if the environment variable KUBERNETES_SERVICE_HOST + CollectorError: if the environment variable KUBERNETES_SERVICE_HOST or KUBERNETES_SERVICE_PORT is not defined or empty. """ service_host = os.environ.get('KUBERNETES_SERVICE_HOST') @@ -65,18 +66,20 @@ def get_kubernetes_base_url(): KUBERNETES_BEARER_TOKEN = '' -KUBERNETES_BEARER_TOKEN_FILE = '/var/run/secrets/kubernetes.io/serviceaccount/token' +KUBERNETES_BEARER_TOKEN_FILE = ( + '/var/run/secrets/kubernetes.io/serviceaccount/token') + def get_kubernetes_bearer_token(): - """ - Reads the bearer token required to call the Kubernetes master from a file - The file is installed in every container within a Kubernetes pod by the Kubelet. - The path to the file is documented at + """Reads the bearer token required to call the Kubernetes master from a file. + + The file is installed in every container within a Kubernetes pod by the + Kubelet. The path to the file is documented at https://github.com/GoogleCloudPlatform/kubernetes/blob/master/docs/accessing-the-cluster.md. Returns: - The contents of the token file as a string for use in the Authorization header - as a bearer token: 'Authorization: Bearer ' + The contents of the token file as a string for use in the Authorization + header as a bearer token: 'Authorization: Bearer ' Raises: IOError: if cannot open the token file. @@ -126,15 +129,21 @@ def fetch_data(gs, url): Other exceptions may be raised as the result of attempting to fetch the URL. """ + start_time = time.time() if gs.get_testing(): # Read the data from a file. url_elements = url.split('/') fname = 'testdata/' + url_elements[-1] + '.input.json' - return json.loads(open(fname, 'r').read()) + v = json.loads(open(fname, 'r').read()) + gs.add_elapsed(start_time, fname, time.time() - start_time) + return v else: # Send the request to Kubernetes headers = get_kubernetes_headers() - return requests.get(url, headers=headers, verify=False).json() + v = requests.get(url, headers=headers, verify=False).json() + gs.add_elapsed(start_time, url, time.time() - start_time) + return v + @utilities.global_state_arg def get_nodes(gs): diff --git a/collector/static/home.html b/collector/static/home.html index 615467e..616de58 100644 --- a/collector/static/home.html +++ b/collector/static/home.html @@ -55,6 +55,9 @@ /minions_status Status of the Cluster-Insight minions in this cluster (JSON) + /elapsed + List of recent Kubernets/Docker access times (JSON) + /healthz A health check response (JSON)