Skip to content

Commit

Permalink
Refactor - same container globally each reports failing services for …
Browse files Browse the repository at this point in the history
…its own host
  • Loading branch information
brablc committed Jun 3, 2024
1 parent c26a240 commit b03da35
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 175 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
dockerize
data
__pycache__
.venv
3 changes: 3 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@ RUN mkdir -p "$DATA_DIR"
COPY *.sh ./
COPY *.py ./
COPY integrations/ integrations/
COPY requirements.txt ./

RUN apk update --no-cache \
&& apk add --no-cache bash curl jq openssl \
&& curl -s -L https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSION/dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz | tar xzf - -C .

RUN pip install --no-warn-script-location --no-cache-dir -r requirements.txt

CMD ["./docker-cmd.sh"]
23 changes: 22 additions & 1 deletion config.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,26 @@
export SCRIPT_NAME=${0##*/}
SCRIPT_NAME=${0##*/}

export LOOP_SLEEP=${LOOP_SLEEP:-10}
export ALERTER_URL=${ALERTER_URL:-http://alerter:80}
export SWARM_NAME=${SWARM_NAME:-Swarm}
export LOGGER_USE_TS=1
export LOGGER_USE_SYSLOG=0

export DATA_DIR=${DATA_DIR:-$script_dir/data}
mkdir -p $DATA_DIR

if [[ ! -S /var/run/docker.sock ]]; then
log_error "Mount to /var/run/docker.sock missing?"
exit 1
fi

if [[ -z $ALERT_SCRIPT ]]; then
log_error "Alert script not defined!"
export ALERT_SCRIPT="jq ."
fi

if [[ ! -f $ALERT_SCRIPT ]]; then
|| ! -f $ALERT_SCRIPT ]]; then
log_error "Alert script not accessible on $ALERT_SCRIPT path!"
export ALERT_SCRIPT="jq ."
fi
77 changes: 6 additions & 71 deletions docker-cmd.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,82 +20,17 @@ if [[ -z $ALERT_SCRIPT || ! -f $ALERT_SCRIPT ]]; then
export ALERT_SCRIPT="jq ."
fi

# On all nodes start scraper, on manager node start alerter
log_info "Starting event alerter ..."
./event-alerter.py &
trap "kill $!" EXIT

services=$(./services.sh 2>&1)
if [ $? != 0 ]; then
./event-scraper.sh
exit
fi

log_info "Initial list of services (run services.sh using docker exec to see actual):"
log_info "Initial list of services:"
echo "$services"
log_info "Starting event alerter ..."
./event-alerter.py &
trap "kill $!" EXIT
log_info "Starting event scraper ..."
./event-scraper.sh &
trap "kill $!" EXIT

### Manager code only

function check_services() {
    # Probe each service endpoint listed by ./services.sh over TCP and pipe
    # "create"/"resolve" alert JSON into $ALERT_SCRIPT when availability
    # changes. Per-service state lives in $DATA_DIR: a "<code>.pending" file
    # marks an open alert and stores the alert's unique id.
    local swarm_name=$SWARM_NAME
    # services.sh emits lines of: <service_name> <network_alias> <port>
    while read service_name network_alias port; do
        unique_name=$(echo "${swarm_name} ${service_name} ${network_alias} ${port}" )
        # Lower-cased, filesystem-safe token used to name the state files.
        unique_code=$(echo "${unique_name,,}" | sed -e 's/ /_/g' -e 's/[^a-zA-Z0-9_-]/_/g')
        # Random salt so every new alert cycle gets a fresh unique id.
        random_str=$(tr -dc 'a-zA-Z0-9' < /dev/urandom | head -c 10)
        read unique_id _ < <(echo -n "$unique_name $random_str" | md5sum)
        prefix="$DATA_DIR/${unique_code}"
        pending_file="${prefix}.pending"
        log_file="${prefix}.log"

        # used for testing
        real_port="$port"
        if [[ -f "$DATA_DIR/test-change-port-$port" ]]; then
            real_port=$(< "$DATA_DIR/test-change-port-$port")
        fi

        action=""
        appendix=""
        message="${swarm_name} service ${service_name} (${network_alias}:${port})"
        # dockerize -wait exits non-zero when the TCP port is unreachable
        # within the 5s timeout; its stderr is captured per-service.
        ./dockerize -timeout 5s -wait tcp://$network_alias:$real_port true 2>$log_file
        if [ $? -ne 0 ]; then
            if [[ -f $pending_file ]]; then
                # Alert already open: log only, avoid a duplicate "create".
                log_warn "Pending alert: $message"
            else
                # First failure: persist the id so the later resolve matches.
                echo "$unique_id" > $pending_file
                action="create"
                appendix="not available"
            fi
        else
            if [[ -f $pending_file ]]; then
                # Service recovered: resolve with the id saved at create time.
                action="resolve"
                appendix="is available"
                unique_id=$(cat $pending_file)
                rm -f $pending_file
            fi
        fi
        if [[ -n $action ]]; then
            # Build the alert payload and hand it to the configured script.
            # summary carries the JSON-escaped dockerize stderr output.
            jq -n \
                --arg action "$action" \
                --arg unique_id "$unique_id" \
                --arg message "$message $appendix" \
                --arg summary "$(jq -R -s @json $log_file)" \
                '{
                    "action": $action,
                    "unique_id": $unique_id,
                    "message": $message,
                    "summary": $summary
                }' | /bin/bash -c "$ALERT_SCRIPT"
        fi
        rm -f $log_file
    done < <(./services.sh)
}

log_info "Entering loop with ${LOOP_SLEEP} sleep on entry ..."

while true; do
sleep $LOOP_SLEEP
check_services
done
log_info "Starting port alerter ..."
./port-alerter.sh
130 changes: 62 additions & 68 deletions event-alerter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,23 @@
import hashlib
import json
import os
import requests
import requests_unixsocket
import secrets
import socket
import string
import subprocess
import time
import threading
import urllib.parse as urlparse

from http.server import BaseHTTPRequestHandler, HTTPServer
from collections import defaultdict, deque
from logger import log_info, log_error

ALERT_SCRIPT = os.getenv("ALERT_SCRIPT", "jq .")
EVENTS_WINDOW = int(os.getenv("EVENTS_WINDOW", "300"))
EVENTS_THRESHOLD = int(os.getenv("EVENTS_THRESHOLD", "3"))
EVENTS_WINDOW = int(os.getenv("EVENTS_WINDOW", "60"))
EVENTS_THRESHOLD = int(os.getenv("EVENTS_THRESHOLD", "2"))
HOSTNAME = os.getenv("HOSTNAME", socket.gethostname())
LOOP_SLEEP = int(os.getenv("LOOP_SLEEP", "10"))
SWARM_NAME = os.getenv("SWARM_NAME", "Swarm")

Expand All @@ -25,16 +28,51 @@
lock = threading.Lock()


def calculate_md5(input_str):
    """Return the hexadecimal MD5 digest of *input_str* (UTF-8 encoded)."""
    return hashlib.md5(input_str.encode("utf-8")).hexdigest()


def send_alert(data):
    """Serialize *data* to JSON and feed it to ALERT_SCRIPT via bash stdin.

    An empty/falsy ALERT_SCRIPT disables alerting entirely.
    """
    if not ALERT_SCRIPT:
        return

    payload = json.dumps(data)
    # run() writes the payload to the script's stdin and waits for it to exit,
    # matching the previous Popen + communicate() behavior.
    subprocess.run(
        ["/bin/bash", "-c", ALERT_SCRIPT],
        input=payload,
        text=True,
    )


def get_random_str(length):
    """Return a cryptographically random alphanumeric string of *length* chars."""
    pool = string.ascii_letters + string.digits
    picked = [secrets.choice(pool) for _ in range(length)]
    return "".join(picked)


def docker_events_stream():
    """Yield parsed container create/destroy events from the local Docker API.

    Connects to /var/run/docker.sock (via requests_unixsocket) and streams
    /v1.41/events filtered to container create/destroy, yielding one decoded
    JSON object per non-empty response line.
    """
    encoded_sock = urlparse.quote("/var/run/docker.sock", safe="")
    url = f"http+unix://{encoded_sock}/v1.41/events"
    filters = {"type": ["container"], "event": ["create", "destroy"]}

    session = requests_unixsocket.Session()
    response = session.get(
        url, params={"filters": json.dumps(filters)}, stream=True
    )
    response.raise_for_status()

    for raw_line in response.iter_lines():
        if not raw_line:
            continue
        yield json.loads(raw_line.decode("utf-8"))


def process_events():
current_time = time.time()

counts = defaultdict(lambda: {"create": 0, "destroy": 0})
hosts = defaultdict(set)
seen_services = set()

# Remove events older than EVENTS_WINDOW
Expand All @@ -43,7 +81,6 @@ def process_events():

for event in events:
counts[event["service_name"]][event["action"]] += 1
hosts[event["service_name"]].add(event["host"])
seen_services.add(event["service_name"])

for service_name, actions in counts.items():
Expand All @@ -61,8 +98,8 @@ def process_events():
"unique_id": calculate_md5(
f"{SWARM_NAME} {service_name} {get_random_str(10)}"
),
"message": f"{SWARM_NAME} service {service_name} not healthy",
"summary": f"There were {actions["create"]} containers created and {actions["destroy"]} destroyed within {EVENTS_WINDOW} seconds.\nReported by {list(hosts[service_name])} host(s).",
"message": f"{SWARM_NAME} service {service_name} failing on {HOSTNAME}",
"summary": f"There were {actions["create"]} containers created and {actions["destroy"]} destroyed within {EVENTS_WINDOW} seconds.",
}
pending_alerts[service_name] = data
send_alert(data)
Expand All @@ -74,82 +111,39 @@ def process_events():
data = {
"action": "resolve",
"unique_id": pending_alerts[service_name]["unique_id"],
"message": f"{SWARM_NAME} service {service_name} is healthy",
"message": f"{SWARM_NAME} service {service_name} stable on {HOSTNAME}",
"summary": f"No events in last {EVENTS_WINDOW} seconds, assuming service is healthy (or stopped)",
}
del pending_alerts[service_name]
send_alert(data)


def calculate_md5(input_str):
    """Hex MD5 digest of the UTF-8 encoding of *input_str*."""
    digest = hashlib.md5()
    digest.update(input_str.encode("utf-8"))
    return digest.hexdigest()


def send_alert(data):
    """Serialize *data* to JSON and pipe it into ALERT_SCRIPT's stdin.

    The script is executed through ``/bin/bash -c``; an empty/falsy
    ALERT_SCRIPT disables alerting entirely.
    """
    if not ALERT_SCRIPT:
        return

    json_data = json.dumps(data)
    process = subprocess.Popen(
        ["/bin/bash", "-c", ALERT_SCRIPT], stdin=subprocess.PIPE, text=True
    )
    # communicate() writes the JSON to stdin, closes it, and waits for exit.
    process.communicate(input=json_data)


def resolve_pending():
    """Background loop: every LOOP_SLEEP seconds re-run event processing.

    Ensures open alerts get resolved even when no new events arrive.
    """
    while True:
        time.sleep(LOOP_SLEEP)
        lock.acquire()
        try:
            process_events()
        finally:
            lock.release()


class EventHandler(BaseHTTPRequestHandler):
    """HTTP endpoint receiving scraped container events as a ?payload= JSON
    query parameter; feeds each event into the shared window and re-runs
    alert processing."""

    def do_GET(self):
        parsed = urlparse.urlparse(self.path)
        params = urlparse.parse_qs(parsed.query)
        raw_payload = params.get("payload", [None])[0]
        if not raw_payload:
            # Missing/empty payload: reject without touching shared state.
            self._respond(400, b"No payload received")
            return

        record = json.loads(raw_payload)
        entry = {
            "ts": record["ts"],
            "action": record["action"],
            "service_name": record["service_name"],
            "host": record["host"],
        }

        # Append and process atomically so the window stays consistent
        # with the background resolver thread.
        with lock:
            events.append(entry)
            process_events()

        self._respond(200, b"OK")

    def _respond(self, status, body):
        # Helper: status line, end of headers, then raw body bytes.
        self.send_response(status)
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        # Silence the default per-request stderr logging.
        return


def main():
try:
cleanup_thread = threading.Thread(target=resolve_pending, daemon=True)
cleanup_thread.start()

server = HTTPServer(("0.0.0.0", 80), EventHandler)
server.serve_forever()
except Exception as e:
log_error(f"{e}")
resolving_thread = threading.Thread(target=resolve_pending, daemon=True)
resolving_thread.start()

for event in docker_events_stream():
data = {
"ts": event["time"],
"action": event["Action"],
"service_name": event["Actor"]["Attributes"].get(
"com.docker.swarm.service.name", "unknown"
),
}

# log_info(json.dumps(data))
events.append(data)
process_events()
except requests.exceptions.RequestException as e:
log_error(f"Error connecting to Docker API: {e}")


if __name__ == "__main__":
Expand Down
35 changes: 0 additions & 35 deletions event-scraper.sh

This file was deleted.

Loading

0 comments on commit b03da35

Please sign in to comment.