From 030804c3ad4acfe32ff49227a97afb4fd4f388b3 Mon Sep 17 00:00:00 2001 From: Tuomo Kriikkula Date: Wed, 5 Feb 2025 01:46:45 +0200 Subject: [PATCH] Cleanup, add basic redis flow --- build_commands_bot/src/main.cpp | 111 ++++++++++++++++---------------- 1 file changed, 54 insertions(+), 57 deletions(-) diff --git a/build_commands_bot/src/main.cpp b/build_commands_bot/src/main.cpp index 359ebda..cb4956d 100644 --- a/build_commands_bot/src/main.cpp +++ b/build_commands_bot/src/main.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -19,16 +20,18 @@ #include #include -#include -#include -#include #include -// #include TODO: even including this errors? #include #include #include #include +#include +#include #include +#include +#include +#include +#include #include @@ -37,6 +40,10 @@ #include #include +#include + +#include + #include "taskiq.hpp" namespace @@ -59,6 +66,10 @@ enum class MessageType: std::uint8_t // our messages are always the same and we don't need any argument handling. using MessageChannel = concurrent_channel; +constexpr auto g_taskiq_redis_channel = "taskiq"; +// Mirrored in Python bobuild/tasks.py. +constexpr auto g_taskiq_lock_timeout = 240 * 60; + constexpr auto g_cmd_name_sws_upload_beta = "workshop_upload_beta"; constexpr auto g_cmd_name_sws_get_info = "workshop_info"; @@ -128,29 +139,12 @@ std::string get_env_var(const std::string_view key) #endif // BO_WINDOWS } -auto send_workshop_upload_beta_msg( - std::shared_ptr msg_channel -) -> asio::awaitable -{ - g_logger->info("send_workshop_upload_beta_msg running in asio::awaitable"); - - co_await msg_channel->async_send( - boost::system::error_code{}, - MessageType::WorkshopUploadBeta, - asio::use_awaitable); - - co_return 1; -} - // TODO: THIS IS THE RIGHT WAY TO CO-OP DPP AND ASIO/COBALT! auto send_workshop_upload_beta_msg_cobalt( std::shared_ptr msg_channel ) -> cobalt::task { - // 1. Check if task lock exists. - g_logger->info("send_workshop_upload_beta_msg running in cobalt::task"); - // auto exec = co_await cobalt::this_coro::executor; co_await msg_channel->async_send( boost::system::error_code{}, @@ -167,34 +161,6 @@ auto send_workshop_upload_beta_msg_cobalt( co_return 1; } -auto workshop_upload_beta_task( - std::shared_ptr msg_channel -) -> dpp::task -{ - g_logger->info("workshop_upload_beta_task running in dpp::task"); - - // Offload work to ASIO thread. - auto deferred_op = asio::co_spawn( - msg_channel->get_executor(), - send_workshop_upload_beta_msg(msg_channel), - asio::deferred); - - auto future = std::move(deferred_op)(asio::use_future); - - asio::steady_timer test_timer{msg_channel->get_executor()}; - test_timer.expires_after(chrono::seconds(5)); - test_timer.async_wait(); - - const int result = future.get(); - co_return result; -} - -auto cobalt_task_test() -> cobalt::task -{ - g_logger->info("cobalt_task_test running in cobalt::task"); - co_return; -} - void bot_main(std::shared_ptr msg_channel) { const auto redis_url = get_env_var("BO_REDIS_URL"); @@ -272,11 +238,6 @@ void bot_main(std::shared_ptr msg_channel) const auto x = f.get(); g_logger->info("x={}", x); - // co_await send_workshop_upload_beta_msg_cobalt(msg_channel); - - // Cobalt task. - // co_await cobalt_task_test(); - // TODO: we should probably wait to hear back from the // Python job here! } @@ -349,9 +310,45 @@ void bot_main(std::shared_ptr msg_channel) // g_bot->shutdown(); } -auto co_handle_workshop_upload_beta() -> asio::awaitable +auto co_handle_workshop_upload_beta(redis::config cfg) -> asio::awaitable { - // TODO: post to Redis! + auto ex = co_await asio::this_coro::executor; + auto conn = redis::connection{ex}; + + // 1. Check if task lock exists. + redis::request lock_req; + redis::response lock_resp; // TODO: what's the resp here? + lock_req.push("SET", "key", taskiq::bo_build_lock_name, "EX", g_taskiq_lock_timeout, "NX", 1); + const auto [lock_req_ec, _0] = co_await conn.async_exec( + lock_req, lock_resp, asio::as_tuple(asio::use_awaitable)); + if (lock_req_ec) + { + // TODO: propagate error codes! + throw std::runtime_error(std::format( + "redis error: {}", lock_req_ec.message())); + } + + auto msg = taskiq::make_bo_sws_upload_msg(); + UUIDv4::UUIDGenerator uuid_gen; + UUIDv4::UUID uuid = uuid_gen.getUUID(); + msg.task_id = uuid.bytes(); + + std::string msg_json{}; + const auto json_ec = glz::write_json(msg, msg_json); + if (json_ec) + { + // TODO: propagate error codes! + throw std::runtime_error(std::format( + "glz::write_json error: {}", json_ec.custom_error_message)); + } + + redis::request pub_req; + pub_req.push("PUBLISH", g_taskiq_redis_channel, msg_json); + const auto [pub_req_ec, _1] = co_await conn.async_exec( + pub_req, redis::ignore, asio::as_tuple(asio::use_awaitable)); + // TODO: check ec. + + conn.cancel(); co_return; } @@ -382,7 +379,7 @@ auto co_main( switch (msg_type) { case MessageType::WorkshopUploadBeta: - co_await co_handle_workshop_upload_beta(); + co_await co_handle_workshop_upload_beta(cfg); break; default: g_logger->error("invalid MessageType: {}", mt);