Skip to content

Commit

Permalink
Cleanup, add basic redis flow
Browse files Browse the repository at this point in the history
  • Loading branch information
tuokri committed Feb 4, 2025
1 parent e70384c commit 030804c
Showing 1 changed file with 54 additions and 57 deletions.
111 changes: 54 additions & 57 deletions build_commands_bot/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <format>
#include <iostream>
#include <memory>
#include <random>
#include <string_view>
#include <thread>

Expand All @@ -19,16 +20,18 @@

#include <boost/asio/as_tuple.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/cobalt.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/use_future.hpp>
#include <boost/asio/experimental/concurrent_channel.hpp>
// #include <boost/asio/experimental/use_promise.hpp> TODO: even including this errors?
#include <boost/asio/io_context.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/use_future.hpp>
#include <boost/cobalt.hpp>
#include <boost/redis/config.hpp>
#include <boost/redis/connection.hpp>
#include <boost/redis/request.hpp>
#include <boost/redis/response.hpp>
#include <boost/redis/src.hpp>

#include <dpp/dpp.h>

Expand All @@ -37,6 +40,10 @@
#include <spdlog/sinks/stdout_color_sinks.h>
#include <spdlog/sinks/rotating_file_sink.h>

#include <uuid_v4.h>

#include <glaze/glaze.hpp>

#include "taskiq.hpp"

namespace
Expand All @@ -59,6 +66,10 @@ enum class MessageType: std::uint8_t
// our messages are always the same and we don't need any argument handling.
using MessageChannel = concurrent_channel<void(boost::system::error_code, MessageType)>;

constexpr auto g_taskiq_redis_channel = "taskiq";
// Mirrored in Python bobuild/tasks.py.
constexpr auto g_taskiq_lock_timeout = 240 * 60;

constexpr auto g_cmd_name_sws_upload_beta = "workshop_upload_beta";
constexpr auto g_cmd_name_sws_get_info = "workshop_info";

Expand Down Expand Up @@ -128,29 +139,12 @@ std::string get_env_var(const std::string_view key)
#endif // BO_WINDOWS
}

auto send_workshop_upload_beta_msg(
std::shared_ptr<MessageChannel> msg_channel
) -> asio::awaitable<int>
{
g_logger->info("send_workshop_upload_beta_msg running in asio::awaitable");

co_await msg_channel->async_send(
boost::system::error_code{},
MessageType::WorkshopUploadBeta,
asio::use_awaitable);

co_return 1;
}

// TODO: THIS IS THE RIGHT WAY TO CO-OP DPP AND ASIO/COBALT!
auto send_workshop_upload_beta_msg_cobalt(
std::shared_ptr<MessageChannel> msg_channel
) -> cobalt::task<int>
{
// 1. Check if task lock exists.

g_logger->info("send_workshop_upload_beta_msg running in cobalt::task");
// auto exec = co_await cobalt::this_coro::executor;

co_await msg_channel->async_send(
boost::system::error_code{},
Expand All @@ -167,34 +161,6 @@ auto send_workshop_upload_beta_msg_cobalt(
co_return 1;
}

auto workshop_upload_beta_task(
std::shared_ptr<MessageChannel> msg_channel
) -> dpp::task<int>
{
g_logger->info("workshop_upload_beta_task running in dpp::task");

// Offload work to ASIO thread.
auto deferred_op = asio::co_spawn(
msg_channel->get_executor(),
send_workshop_upload_beta_msg(msg_channel),
asio::deferred);

auto future = std::move(deferred_op)(asio::use_future);

asio::steady_timer test_timer{msg_channel->get_executor()};
test_timer.expires_after(chrono::seconds(5));
test_timer.async_wait();

const int result = future.get();
co_return result;
}

auto cobalt_task_test() -> cobalt::task<void>
{
g_logger->info("cobalt_task_test running in cobalt::task");
co_return;
}

void bot_main(std::shared_ptr<MessageChannel> msg_channel)
{
const auto redis_url = get_env_var("BO_REDIS_URL");
Expand Down Expand Up @@ -272,11 +238,6 @@ void bot_main(std::shared_ptr<MessageChannel> msg_channel)
const auto x = f.get();
g_logger->info("x={}", x);

// co_await send_workshop_upload_beta_msg_cobalt(msg_channel);

// Cobalt task.
// co_await cobalt_task_test();

// TODO: we should probably wait to hear back from the
// Python job here!
}
Expand Down Expand Up @@ -349,9 +310,45 @@ void bot_main(std::shared_ptr<MessageChannel> msg_channel)
// g_bot->shutdown();
}

auto co_handle_workshop_upload_beta() -> asio::awaitable<void>
auto co_handle_workshop_upload_beta(redis::config cfg) -> asio::awaitable<void>
{
// TODO: post to Redis!
auto ex = co_await asio::this_coro::executor;
auto conn = redis::connection{ex};

// 1. Check if task lock exists.
redis::request lock_req;
redis::response<bool> lock_resp; // TODO: what's the resp here?
lock_req.push("SET", "key", taskiq::bo_build_lock_name, "EX", g_taskiq_lock_timeout, "NX", 1);
const auto [lock_req_ec, _0] = co_await conn.async_exec(
lock_req, lock_resp, asio::as_tuple(asio::use_awaitable));
if (lock_req_ec)
{
// TODO: propagate error codes!
throw std::runtime_error(std::format(
"redis error: {}", lock_req_ec.message()));
}

auto msg = taskiq::make_bo_sws_upload_msg();
UUIDv4::UUIDGenerator<std::mt19937_64> uuid_gen;
UUIDv4::UUID uuid = uuid_gen.getUUID();
msg.task_id = uuid.bytes();

std::string msg_json{};
const auto json_ec = glz::write_json(msg, msg_json);
if (json_ec)
{
// TODO: propagate error codes!
throw std::runtime_error(std::format(
"glz::write_json error: {}", json_ec.custom_error_message));
}

redis::request pub_req;
pub_req.push("PUBLISH", g_taskiq_redis_channel, msg_json);
const auto [pub_req_ec, _1] = co_await conn.async_exec(
pub_req, redis::ignore, asio::as_tuple(asio::use_awaitable));
// TODO: check ec.

conn.cancel();

co_return;
}
Expand Down Expand Up @@ -382,7 +379,7 @@ auto co_main(
switch (msg_type)
{
case MessageType::WorkshopUploadBeta:
co_await co_handle_workshop_upload_beta();
co_await co_handle_workshop_upload_beta(cfg);
break;
default:
g_logger->error("invalid MessageType: {}", mt);
Expand Down

0 comments on commit 030804c

Please sign in to comment.