diff --git a/build_commands_bot/src/main.cpp b/build_commands_bot/src/main.cpp index 347364f..5863b99 100644 --- a/build_commands_bot/src/main.cpp +++ b/build_commands_bot/src/main.cpp @@ -21,22 +21,22 @@ #include #include +#include #include #include +#include #include #include #include #include #include +#include #include #include #include #include #include -// TODO: group this up after merge: -#include - #include #include @@ -59,6 +59,7 @@ namespace redis = boost::redis; namespace cobalt = boost::cobalt; namespace process = boost::process::v2; +using boost::asio::experimental::channel; using boost::asio::experimental::concurrent_channel; enum class MessageType: std::uint8_t @@ -66,10 +67,12 @@ enum class MessageType: std::uint8_t WorkshopUploadBeta, }; +// TODO: passed into MessageChannel message to provide a one way +// single-producer, single-consumer return channel for the job result/error. +using MessageResponseChannel = channel; // TODO: include explicit executor type here? -// TODO: currently this channel only send/receives the message type as -// our messages are always the same and we don't need any argument handling. -using MessageChannel = concurrent_channel; +using MessageChannel = concurrent_channel< + void(boost::system::error_code, MessageType, MessageResponseChannel*)>; constexpr auto g_taskiq_redis_channel = "taskiq"; // Mirrored in Python bobuild/tasks.py. @@ -116,6 +119,13 @@ constexpr bool debugger_present() #endif // NDEBUG +#define THROW_IF_DEBUGGING() \ +if (debugger_present()) \ +{ \ + throw; \ +} \ + + std::string get_env_var(const std::string_view key) { #if BO_WINDOWS @@ -147,23 +157,40 @@ std::string get_env_var(const std::string_view key) // TODO: THIS IS THE RIGHT WAY TO CO-OP DPP AND ASIO/COBALT! auto send_workshop_upload_beta_msg_cobalt( std::shared_ptr msg_channel -) -> cobalt::task +) -> cobalt::task> { + auto ex = co_await cobalt::this_coro::executor; + + // TODO: workshop upload beta task runs in asio executor, we should + // somehow wait for responses/errors here, and propagate them to the DPP bot. + // 1. Create a channel when we get here. + // 2. Send that channel along with the message type. + // 3. Wait for response, error or OK status, co_return them. + MessageResponseChannel resp_channel{ex}; + g_logger->info("send_workshop_upload_beta_msg running in cobalt::task"); co_await msg_channel->async_send( boost::system::error_code{}, MessageType::WorkshopUploadBeta, + &resp_channel, cobalt::use_op ); g_logger->info("sent"); - asio::steady_timer test_timer{msg_channel->get_executor()}; + asio::steady_timer test_timer{ex}; test_timer.expires_after(chrono::seconds(10)); co_await test_timer.async_wait(); - co_return 1; + const auto [ec, resp] = co_await resp_channel.async_receive( + asio::as_tuple(cobalt::use_op)); + + if (ec) + { + co_return ec; + } + co_return resp; } void bot_main(std::shared_ptr msg_channel) @@ -235,13 +262,20 @@ void bot_main(std::shared_ptr msg_channel) // send_workshop_upload_beta_msg(msg_channel), // cobalt::use_op); - auto f = cobalt::spawn( + auto sws_future = cobalt::spawn( msg_channel->get_executor(), send_workshop_upload_beta_msg_cobalt(msg_channel), asio::use_future ); - const auto x = f.get(); - g_logger->info("x={}", x); + const boost::system::result result = sws_future.get(); + if (result) + { + g_logger->info("result={}", result.value()); + } + else + { + g_logger->error("result={}", result.error().message()); + } // TODO: we should probably wait to hear back from the // Python job here! @@ -315,7 +349,9 @@ void bot_main(std::shared_ptr msg_channel) // g_bot->shutdown(); } -auto co_handle_workshop_upload_beta(redis::config cfg) -> asio::awaitable +auto co_handle_workshop_upload_beta( + redis::config cfg +) -> asio::awaitable> { auto ex = co_await asio::this_coro::executor; auto conn = redis::connection{ex}; @@ -355,7 +391,8 @@ auto co_handle_workshop_upload_beta(redis::config cfg) -> asio::awaitable conn.cancel(); - co_return; + // TODO: actual error handling! + co_return "OK?"; } auto co_main( @@ -372,23 +409,40 @@ auto co_main( while (true) { - const auto [ec, msg_type] = co_await msg_channel->async_receive( + const auto [ec, msg_type, resp_channel_ptr] = co_await msg_channel->async_receive( asio::as_tuple(asio::use_awaitable)); const auto mt = static_cast::type>(msg_type); g_logger->info("got message: {}", mt); + boost::system::result job_result; + boost::system::error_code job_error{}; + std::string job_result_value{}; + boost::system::error_code send_resp_ec{}; if (!ec) { switch (msg_type) { case MessageType::WorkshopUploadBeta: - co_await co_handle_workshop_upload_beta(cfg); + job_result = co_await co_handle_workshop_upload_beta(cfg); + job_error = job_result.error(); + job_result_value = (job_result.has_value()) ? job_result.value() : ""; + co_await resp_channel_ptr->async_send( + job_error, + job_result_value, + asio::redirect_error(send_resp_ec) + ); break; default: g_logger->error("invalid MessageType: {}", mt); } + + if (send_resp_ec) + { + g_logger->error("async_send error: failed to send job result response: {}", + send_resp_ec.message()); + } } else if (ec.value() == asio::error::eof) { @@ -406,9 +460,15 @@ auto co_main( int main() { + int rc = EXIT_SUCCESS; + try { +#ifndef NDEBUG + constexpr auto default_log_level = spdlog::level::debug; +#else constexpr auto default_log_level = spdlog::level::info; +#endif spdlog::init_thread_pool(8192, 2); constexpr auto max_log_size = 1024 * 1024 * 10; auto stdout_sink = std::make_shared(); @@ -504,30 +564,25 @@ int main() } g_logger->info("exiting"); - - if (g_logger) - { - g_logger->flush(); - } - - return EXIT_SUCCESS; + rc = EXIT_SUCCESS; } catch (const std::exception& ex) { + rc = EXIT_FAILURE; std::cout << std::format("unhandled exception: {}", ex.what()) << std::endl; - if (debugger_present()) - { - throw; - } + THROW_IF_DEBUGGING(); } catch (...) { + rc = EXIT_FAILURE; std::cout << "unhandled error" << std::endl; - if (debugger_present()) - { - throw; - } + THROW_IF_DEBUGGING(); + } + + if (g_logger) + { + g_logger->flush(); } - return EXIT_FAILURE; + return rc; }