Skip to content

Commit

Permalink
Small tweaks, message/job error and result handling
Browse files Browse the repository at this point in the history
  • Loading branch information
tuokri committed Feb 7, 2025
1 parent a92608b commit 1a9775e
Showing 1 changed file with 87 additions and 32 deletions.
119 changes: 87 additions & 32 deletions build_commands_bot/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,22 @@

#include <boost/asio/as_tuple.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <boost/asio/experimental/concurrent_channel.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/redirect_error.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/use_future.hpp>
#include <boost/cobalt.hpp>
#include <boost/process/v2/process.hpp>
#include <boost/redis/config.hpp>
#include <boost/redis/connection.hpp>
#include <boost/redis/request.hpp>
#include <boost/redis/response.hpp>
#include <boost/redis/src.hpp>

// TODO: group this up after merge:
#include <boost/process/v2/process.hpp>

#include <dpp/dpp.h>

#include <spdlog/spdlog.h>
Expand All @@ -59,17 +59,20 @@ namespace redis = boost::redis;
namespace cobalt = boost::cobalt;
namespace process = boost::process::v2;

using boost::asio::experimental::channel;
using boost::asio::experimental::concurrent_channel;

enum class MessageType: std::uint8_t
{
WorkshopUploadBeta,
};

// TODO: passed into MessageChannel message to provide a one way
// single-producer, single-consumer return channel for the job result/error.
using MessageResponseChannel = channel<void(boost::system::error_code, std::string)>;
// TODO: include explicit executor type here?
// TODO: currently this channel only send/receives the message type as
// our messages are always the same and we don't need any argument handling.
using MessageChannel = concurrent_channel<void(boost::system::error_code, MessageType)>;
using MessageChannel = concurrent_channel<
void(boost::system::error_code, MessageType, MessageResponseChannel*)>;

constexpr auto g_taskiq_redis_channel = "taskiq";
// Mirrored in Python bobuild/tasks.py.
Expand Down Expand Up @@ -116,6 +119,13 @@ constexpr bool debugger_present()

#endif // NDEBUG

#define THROW_IF_DEBUGGING() \
if (debugger_present()) \
{ \
throw; \
} \


std::string get_env_var(const std::string_view key)
{
#if BO_WINDOWS
Expand Down Expand Up @@ -147,23 +157,40 @@ std::string get_env_var(const std::string_view key)
// TODO: THIS IS THE RIGHT WAY TO CO-OP DPP AND ASIO/COBALT!
auto send_workshop_upload_beta_msg_cobalt(
std::shared_ptr<MessageChannel> msg_channel
) -> cobalt::task<int>
) -> cobalt::task<boost::system::result<std::string>>
{
auto ex = co_await cobalt::this_coro::executor;

// TODO: workshop upload beta task runs in asio executor, we should
// somehow wait for responses/errors here, and propagate them to the DPP bot.
// 1. Create a channel when we get here.
// 2. Send that channel along with the message type.
// 3. Wait for response, error or OK status, co_return them.
MessageResponseChannel resp_channel{ex};

g_logger->info("send_workshop_upload_beta_msg running in cobalt::task");

co_await msg_channel->async_send(
boost::system::error_code{},
MessageType::WorkshopUploadBeta,
&resp_channel,
cobalt::use_op
);

g_logger->info("sent");

asio::steady_timer test_timer{msg_channel->get_executor()};
asio::steady_timer test_timer{ex};
test_timer.expires_after(chrono::seconds(10));
co_await test_timer.async_wait();

co_return 1;
const auto [ec, resp] = co_await resp_channel.async_receive(
asio::as_tuple(cobalt::use_op));

if (ec)
{
co_return ec;
}
co_return resp;
}

void bot_main(std::shared_ptr<MessageChannel> msg_channel)
Expand Down Expand Up @@ -235,13 +262,20 @@ void bot_main(std::shared_ptr<MessageChannel> msg_channel)
// send_workshop_upload_beta_msg(msg_channel),
// cobalt::use_op);

auto f = cobalt::spawn(
auto sws_future = cobalt::spawn(
msg_channel->get_executor(),
send_workshop_upload_beta_msg_cobalt(msg_channel),
asio::use_future
);
const auto x = f.get();
g_logger->info("x={}", x);
const boost::system::result<std::string> result = sws_future.get();
if (result)
{
g_logger->info("result={}", result.value());
}
else
{
g_logger->error("result={}", result.error().message());
}

// TODO: we should probably wait to hear back from the
// Python job here!
Expand Down Expand Up @@ -315,7 +349,9 @@ void bot_main(std::shared_ptr<MessageChannel> msg_channel)
// g_bot->shutdown();
}

auto co_handle_workshop_upload_beta(redis::config cfg) -> asio::awaitable<void>
auto co_handle_workshop_upload_beta(
redis::config cfg
) -> asio::awaitable<boost::system::result<std::string>>
{
auto ex = co_await asio::this_coro::executor;
auto conn = redis::connection{ex};
Expand Down Expand Up @@ -355,7 +391,8 @@ auto co_handle_workshop_upload_beta(redis::config cfg) -> asio::awaitable<void>

conn.cancel();

co_return;
// TODO: actual error handling!
co_return "OK?";
}

auto co_main(
Expand All @@ -372,23 +409,40 @@ auto co_main(

while (true)
{
const auto [ec, msg_type] = co_await msg_channel->async_receive(
const auto [ec, msg_type, resp_channel_ptr] = co_await msg_channel->async_receive(
asio::as_tuple(asio::use_awaitable));

const auto mt = static_cast<std::underlying_type<MessageType>::type>(msg_type);

g_logger->info("got message: {}", mt);

boost::system::result<std::string> job_result;
boost::system::error_code job_error{};
std::string job_result_value{};
boost::system::error_code send_resp_ec{};
if (!ec)
{
switch (msg_type)
{
case MessageType::WorkshopUploadBeta:
co_await co_handle_workshop_upload_beta(cfg);
job_result = co_await co_handle_workshop_upload_beta(cfg);
job_error = job_result.error();
job_result_value = (job_result.has_value()) ? job_result.value() : "";
co_await resp_channel_ptr->async_send(
job_error,
job_result_value,
asio::redirect_error(send_resp_ec)
);
break;
default:
g_logger->error("invalid MessageType: {}", mt);
}

if (send_resp_ec)
{
g_logger->error("async_send error: failed to send job result response: {}",
send_resp_ec.message());
}
}
else if (ec.value() == asio::error::eof)
{
Expand All @@ -406,9 +460,15 @@ auto co_main(

int main()
{
int rc = EXIT_SUCCESS;

try
{
#ifndef NDEBUG
constexpr auto default_log_level = spdlog::level::debug;
#else
constexpr auto default_log_level = spdlog::level::info;
#endif
spdlog::init_thread_pool(8192, 2);
constexpr auto max_log_size = 1024 * 1024 * 10;
auto stdout_sink = std::make_shared<spdlog::sinks::stdout_color_sink_mt>();
Expand Down Expand Up @@ -504,30 +564,25 @@ int main()
}

g_logger->info("exiting");

if (g_logger)
{
g_logger->flush();
}

return EXIT_SUCCESS;
rc = EXIT_SUCCESS;
}
catch (const std::exception& ex)
{
rc = EXIT_FAILURE;
std::cout << std::format("unhandled exception: {}", ex.what()) << std::endl;
if (debugger_present())
{
throw;
}
THROW_IF_DEBUGGING();
}
catch (...)
{
rc = EXIT_FAILURE;
std::cout << "unhandled error" << std::endl;
if (debugger_present())
{
throw;
}
THROW_IF_DEBUGGING();
}

if (g_logger)
{
g_logger->flush();
}

return EXIT_FAILURE;
return rc;
}

0 comments on commit 1a9775e

Please sign in to comment.