Support controlled pulling from submission queue
Add "kcidb_load_queue" Google Cloud Function, which pulls submissions
from the queue itself, instead of being fed by the trigger. Rename the
"kcidb_load" Google Cloud Function to "kcidb_load_message", to
emphathise single-message loading it does, and distinguish it from the
new function.

The "kcidb_load_queue" is supposed to be triggered by cron job messages
sent to a message queue.
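
As a rough illustration, such a trigger can be an empty Pub/Sub message
published on a schedule. A minimal sketch using the standard Google Cloud
Pub/Sub client, with placeholder project and topic names (in a real
deployment, Cloud Scheduler would publish this message):

    from google.cloud import pubsub_v1

    # Placeholder names; the actual trigger topic is deployment-specific.
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path("my-project", "kernelci_load_trigger")

    # kcidb_load_queue ignores the message contents, so an empty payload
    # is enough to trigger a pull-and-load cycle.
    publisher.publish(topic_path, b"").result()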
spbnick committed Aug 10, 2020
1 parent ac9c9ba commit 58fcc8f
Showing 3 changed files with 148 additions and 10 deletions.
8 changes: 4 additions & 4 deletions README.md
@@ -74,7 +74,7 @@ Kcidb infrastructure is mostly based on Google Cloud services at the moment:
kcidb-query <----------------------------------------------- : tests :
''''''''''''''
~~ Pub/Sub ~~ ~~~~ Cloud Functions ~~~~ ^
kcidb-submit -> kcidb_new ----> kcidb_load --------------------------'
kcidb-submit -> kcidb_new ----> kcidb_load_message ------------------'
|
.-------------'
v ~~~~ Firestore ~~~~
@@ -94,8 +94,8 @@ using the kcidb library to query the database.

Whenever a client submits reports, either via `kcidb-submit` or the kcidb
library, they go to a Pub/Sub message queue topic named `kcidb_new`, then to
the `kcidb_load` "Cloud Function", which loads the data to the BigQuery
dataset, and then pushes it to `kcidb_loaded` topic.
the `kcidb_load_message` "Cloud Function", which loads the data to the
BigQuery dataset, and then pushes it to `kcidb_loaded` topic.
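
A minimal sketch of that submission path, using the `kcidb.mq.Publisher`
interface that `main.py` itself relies on (the project and topic names here
are placeholders):

    import kcidb

    # Placeholder project/topic names; real values come from the deployment.
    publisher = kcidb.mq.Publisher("my-project", "kernelci_new")

    # An empty report conforming to the I/O schema; a real client would
    # fill in report data before publishing.
    data = kcidb.io.new()
    publisher.publish(data)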

That topic is watched by `kcidb_spool_notifications` function, which picks up
the data, generates report notifications, and stores them in a Firestore
@@ -228,7 +228,7 @@ amend if not:
Deploy the functions (do **not** allow unauthenticated invocations when
prompted):

gcloud functions deploy kcidb_load \
gcloud functions deploy kcidb_load_message \
--runtime python37 \
--trigger-topic kernelci_new \
--env-vars-file main.env.yaml \
8 changes: 7 additions & 1 deletion main.env.yaml
@@ -1,7 +1,13 @@
# Environment variables for Google Cloud Functions
KCIDB_LOG_LEVEL: INFO
KCIDB_LOAD_QUEUE_TOPIC: kernelci_new
KCIDB_LOAD_QUEUE_SUBSCRIPTION: kernelci_new_load
KCIDB_LOAD_QUEUE_MSG_MAX: "256"
KCIDB_LOAD_QUEUE_OBJ_MAX: "8192"
KCIDB_LOAD_QUEUE_TIMEOUT_SEC: "30"
KCIDB_DATASET: kernelci04
KCIDB_MQ_LOADED_TOPIC: kernelci_loaded
KCIDB_DATASET_LOAD_PERIOD_SEC: "180"
KCIDB_LOADED_QUEUE_TOPIC: kernelci_loaded
KCIDB_SELECTED_SUBSCRIPTIONS: mainline
KCIDB_SPOOL_COLLECTION_PATH: notifications
KCIDB_SMTP_HOST: smtp.gmail.com
142 changes: 137 additions & 5 deletions main.py
@@ -3,6 +3,7 @@
import os
import json
import base64
import datetime
import logging
import smtplib
import kcidb
@@ -15,8 +16,19 @@
)
LOGGER = logging.getLogger()

LOAD_QUEUE_SUBSCRIBER = kcidb.mq.Subscriber(
PROJECT_ID,
os.environ["KCIDB_LOAD_QUEUE_TOPIC"],
os.environ["KCIDB_LOAD_QUEUE_SUBSCRIPTION"]
)
LOAD_QUEUE_MSG_MAX = int(os.environ["KCIDB_LOAD_QUEUE_MSG_MAX"])
LOAD_QUEUE_OBJ_MAX = int(os.environ["KCIDB_LOAD_QUEUE_OBJ_MAX"])
LOAD_QUEUE_TIMEOUT_SEC = float(os.environ["KCIDB_LOAD_QUEUE_TIMEOUT_SEC"])

DATASET = os.environ["KCIDB_DATASET"]
MQ_LOADED_TOPIC = os.environ["KCIDB_MQ_LOADED_TOPIC"]
DATASET_LOAD_PERIOD = datetime.timedelta(
seconds=int(os.environ["KCIDB_DATASET_LOAD_PERIOD_SEC"])
)

SELECTED_SUBSCRIPTIONS = \
os.environ.get("KCIDB_SELECTED_SUBSCRIPTIONS", "").split()
@@ -33,22 +45,142 @@

DB_CLIENT = kcidb.db.Client(DATASET)
SPOOL_CLIENT = kcidb.spool.Client(SPOOL_COLLECTION_PATH)
MQ_LOADED_PUBLISHER = kcidb.mq.Publisher(PROJECT_ID, MQ_LOADED_TOPIC)
LOADED_QUEUE_PUBLISHER = kcidb.mq.Publisher(
PROJECT_ID,
os.environ["KCIDB_LOADED_QUEUE_TOPIC"]
)


# pylint: disable=unused-argument

def kcidb_load(event, context):
def kcidb_load_message(event, context):
"""
Load KCIDB data from a Pub Sub subscription into the dataset
Load a single message's KCIDB data from the triggering Pub Sub
subscription into the database.
"""
# Get new data
data = kcidb.mq.Subscriber.decode_data(base64.b64decode(event["data"]))
LOGGER.debug("DATA: %s", json.dumps(data))
# Store it in the database
DB_CLIENT.load(data)
# Forward the data to the "loaded" MQ topic
MQ_LOADED_PUBLISHER.publish(data)
LOADED_QUEUE_PUBLISHER.publish(data)


def kcidb_load_queue_msgs(subscriber, msg_max, obj_max, timeout_sec):
"""
Pull I/O data messages from a subscriber with a limit on message number,
total object number and time spent.
Args:
subscriber: The subscriber (kcidb.mq.Subscriber) to pull from.
msg_max: Maximum number of messages to pull.
obj_max: Maximum number of objects to pull.
timeout_sec: Maximum number of seconds to spend.
Returns:
The list of pulled messages.
"""
# Yeah it's crowded, but bear with us, pylint: disable=too-many-locals
# Pull data from queue until we get enough, or time runs out
start = datetime.datetime.now(datetime.timezone.utc)
obj_num = 0
pulls = 0
msgs = []
while True:
# Calculate remaining messages
pull_msg_max = msg_max - len(msgs)
if pull_msg_max <= 0:
LOGGER.debug("Received enough messages")
break

# Calculate remaining time
pull_timeout_sec = \
timeout_sec - \
(datetime.datetime.now(datetime.timezone.utc) - start). \
total_seconds()
if pull_timeout_sec <= 0:
LOGGER.debug("Ran out of time")
break

# Pull
LOGGER.debug("Pulling <= %u messages from the queue, "
"with timeout %us...", pull_msg_max, pull_timeout_sec)
pull_msgs = subscriber.pull(pull_msg_max, timeout=pull_timeout_sec)
pulls += 1
LOGGER.debug("Pulled %u messages", len(pull_msgs))

# Add messages while staying within obj_max (the first message is
# always accepted, even if it alone exceeds the limit)
for index, msg in enumerate(pull_msgs):
msg_obj_num = kcidb.io.get_obj_num(msg[1])
obj_num += msg_obj_num
if msgs and obj_num > obj_max:
LOGGER.debug("Message #%u crossed %u-object boundary "
"at %u total objects",
len(msgs) + 1, obj_max, obj_num)
obj_num -= msg_obj_num
for nack_msg in pull_msgs[index:]:
subscriber.nack(nack_msg[0])
LOGGER.debug("NACK'ed %s messages", len(pull_msgs) - index)
break
msgs.append(msg)
else:
continue
break

duration_seconds = \
(datetime.datetime.now(datetime.timezone.utc) - start).total_seconds()
LOGGER.debug("Pulled %u messages, %u objects total "
"in %u pulls and %u seconds",
len(msgs), obj_num, pulls, duration_seconds)
return msgs


def kcidb_load_queue(event, context):
"""
Load multiple KCIDB data messages from the LOAD_QUEUE_SUBSCRIBER queue
into the database, if it stayed unmodified for at least
DATASET_LOAD_PERIOD.
"""
# Do nothing, if updated recently
now = datetime.datetime.now(datetime.timezone.utc)
last_modified = DB_CLIENT.get_last_modified()
LOGGER.debug("Now: %s, Last modified: %s", now, last_modified)
if last_modified and now - last_modified < DATASET_LOAD_PERIOD:
LOGGER.info("Database too fresh, exiting")
return

# Pull messages
msgs = kcidb_load_queue_msgs(LOAD_QUEUE_SUBSCRIBER,
LOAD_QUEUE_MSG_MAX,
LOAD_QUEUE_OBJ_MAX,
LOAD_QUEUE_TIMEOUT_SEC)
if msgs:
LOGGER.info("Pulled %u messages", len(msgs))
else:
LOGGER.info("Pulled nothing, exiting")
return

# Create merged data referencing the pulled pieces
LOGGER.debug("Merging %u messages...", len(msgs))
data = kcidb.io.merge(kcidb.io.new(), (msg[1] for msg in msgs),
copy_target=False, copy_sources=False)
LOGGER.info("Merged %u messages", len(msgs))
# Load the merged data into the database
obj_num = kcidb.io.get_obj_num(data)
LOGGER.debug("Loading %u objects...", obj_num)
DB_CLIENT.load(data)
LOGGER.info("Loaded %u objects", obj_num)

# Acknowledge all the loaded data
for msg in msgs:
LOAD_QUEUE_SUBSCRIBER.ack(msg[0])
LOGGER.debug("ACK'ed %u messages", len(msgs))

# Forward the loaded data to the "loaded" topic
for msg in msgs:
LOADED_QUEUE_PUBLISHER.publish(msg[1])
LOGGER.debug("Forwarded %u messages", len(msgs))


def kcidb_spool_notifications(event, context):
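
As a quick way to exercise the new entry point outside of Cloud Functions,
the handler can also be called directly, since it ignores its `event` and
`context` arguments; this sketch assumes the environment variables from
`main.env.yaml` are set and Google Cloud credentials are available:

    import main

    # The event payload is ignored; the function pulls from the configured
    # subscription, loads the merged data into BigQuery, and forwards the
    # pulled messages to the "loaded" topic.
    main.kcidb_load_queue(event={}, context=None)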
