Skip to content

Commit

Permalink
Refactor query_conn
Browse files Browse the repository at this point in the history
  • Loading branch information
auxten committed Jan 3, 2025
1 parent 4b4f152 commit ad9b30f
Showing 1 changed file with 46 additions and 30 deletions.
76 changes: 46 additions & 30 deletions programs/local/LocalServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1493,58 +1493,74 @@ void close_conn(chdb_conn ** conn)

struct local_result_v2 * query_conn(chdb_conn * conn, const char * query, const char * format)
{
// Add connection validity check under global lock
std::lock_guard<std::mutex> global_lock(global_connection_mutex);

if (!conn || !conn->connected || !conn->queue)
return new local_result_v2{nullptr, 0, nullptr, 0, 0, 0, nullptr};
{
auto * result = new local_result_v2{};
const char * error = "Invalid or closed connection";
result->error_message = new char[strlen(error) + 1];
std::strcpy(result->error_message, error);
return result;
}

// Release global lock before processing query
auto * queue = static_cast<query_queue *>(conn->queue);
local_result_v2 * result = nullptr;

try
{
std::unique_lock<std::mutex> lock(queue->mutex);
// Wait until any ongoing query completes
queue->query_cv.wait(lock, [queue]() { return !queue->has_query || queue->shutdown; });

if (queue->shutdown)
{
auto * result = new local_result_v2{};
const char * error = "Connection is shutting down";
result->error_message = new char[strlen(error) + 1];
std::strcpy(result->error_message, error);
return result;
}
std::unique_lock<std::mutex> lock(queue->mutex);
// Wait until any ongoing query completes
queue->result_cv.wait(lock, [queue]() { return !queue->has_query || queue->shutdown; });

queue->current_query = {query, format};
queue->has_query = true;
queue->current_result = nullptr;
}
queue->query_cv.notify_one();
if (queue->shutdown)
{
result = new local_result_v2{};
const char * error = "Connection is shutting down";
result->error_message = new char[strlen(error) + 1];
std::strcpy(result->error_message, error);
return result;
}

local_result_v2 * result = nullptr;
{
std::unique_lock<std::mutex> lock(queue->mutex);
queue->result_cv.wait(lock, [queue]() { return queue->current_result != nullptr || queue->shutdown; });
// Set new query
queue->current_query = {query, format};
queue->has_query = true;
queue->current_result = nullptr;
}
queue->query_cv.notify_one();

if (!queue->shutdown && queue->current_result)
{
result = queue->current_result;
if (result->len == 0)
std::unique_lock<std::mutex> lock(queue->mutex);
queue->result_cv.wait(lock, [queue]() { return queue->current_result != nullptr || queue->shutdown; });

if (!queue->shutdown && queue->current_result)
{
LOG_DEBUG(getLogger("CHDB"), "Empty result returned for query: {}", query);
result = queue->current_result;
queue->current_result = nullptr;
queue->has_query = false;
}
queue->current_result = nullptr;
}
queue->query_cv.notify_one();
}

queue->query_cv.notify_one();
if (result == nullptr)
catch (...)
{
// Handle any exceptions during query processing
result = new local_result_v2{};
const char * error = "Error occurred while processing query";
result->error_message = new char[strlen(error) + 1];
std::strcpy(result->error_message, error);
}

if (!result)
{
result = new local_result_v2{};
const char * error = "Query processing failed";
result->error_message = new char[strlen(error) + 1];
std::strcpy(result->error_message, error);
}

return result;
}

Expand Down

0 comments on commit ad9b30f

Please sign in to comment.