Skip to content

Commit

Permalink
MCOL-5279 This approach executes same node DEC::writeToClient the las…
Browse files Browse the repository at this point in the history
…t taking multiple ExeMgrs into account (#2623)

Co-authored-by: Roman Nozdrin <rnozdrin@mariadb.com>
  • Loading branch information
David.Hall and Roman Nozdrin authored Nov 11, 2022
1 parent e5f3260 commit 61d5f80
Showing 1 changed file with 55 additions and 6 deletions.
61 changes: 55 additions & 6 deletions dbcon/joblist/distributedenginecomm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,14 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
toAck = &ism->Size;

msg->advanceInputPtr(sizeof(ISMPacketHeader));

// There must be only one local connection here.
uint32_t localConnectionId = std::numeric_limits<uint32_t>::max();
for (uint32_t i = 0; i < pmCount; ++i)
{
if (fPmConnections[i]->atTheSameHost() && fIsExeMgr)
localConnectionId = i;
}
bool sendToLocal = false;
while (l_msgCount > 0)
{
/* could have to send up to pmCount ACKs */
Expand All @@ -738,9 +745,19 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
nextPMToACK(mqe, l_msgCount, &sockIndex, toAck);
idbassert(*toAck <= l_msgCount);
l_msgCount -= *toAck;
if (sockIndex == localConnectionId)
{
sendToLocal = true;
continue;
}
pmAcked[sockIndex] = true;
writeToClient(sockIndex, msg);
}
if (sendToLocal && localConnectionId < fPmConnections.size())
{
pmAcked[localConnectionId] = true;
writeToClient(localConnectionId, msg);
}

// @bug4436, when no more unacked work, send an ack to all PMs that haven't been acked.
// This is apply to the big message case only. For small messages, the flow control is
Expand All @@ -749,17 +766,27 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
{
uint64_t totalUnackedWork = 0;

for (int i = pmCount - 1; i >= 0; --i)
for (uint32_t i = 0; i < pmCount; ++i)
totalUnackedWork += mqe->unackedWork[i];

if (totalUnackedWork == 0)
{
*toAck = 1;

for (int i = pmCount - 1; i >= 0; --i)
for (uint32_t i = 0; i < pmCount; ++i)
{
if (!pmAcked[i])
{
if (i == localConnectionId)
{
continue;
}
writeToClient(i, msg);
}
}
if (!pmAcked[localConnectionId])
{
writeToClient(localConnectionId, msg);
}
}
}
Expand Down Expand Up @@ -847,9 +874,19 @@ void DistributedEngineComm::setFlowControl(bool enabled, uint32_t uniqueID, boos
#endif

msg->advanceInputPtr(sizeof(ISMPacketHeader));
uint32_t localConnectionId = std::numeric_limits<uint32_t>::max();

for (int i = mqe->pmCount - 1; i >= 0; --i)
for (uint32_t i = 0; i < mqe->pmCount; ++i)
{
if (fPmConnections[i]->atTheSameHost() && fIsExeMgr)
{
localConnectionId = i;
continue;
}
writeToClient(i, msg);
}
if (localConnectionId < fPmConnections.size())
writeToClient(localConnectionId, msg);
}

void DistributedEngineComm::write(uint32_t senderID, const SBS& msg)
Expand All @@ -875,9 +912,21 @@ void DistributedEngineComm::write(uint32_t senderID, const SBS& msg)
case DICT_DESTROY_EQUALITY_FILTER:
/* XXXPAT: This relies on the assumption that the first pmCount "PMS*"
entries in the config file point to unique PMs */
for (int i = pmCount - 1; i >= 0; --i)
writeToClient(i, msg, senderID);
{
uint32_t localConnectionId = std::numeric_limits<uint32_t>::max();

for (uint32_t i = 0; i < pmCount; ++i)
{
if (fPmConnections[i]->atTheSameHost() && fIsExeMgr)
{
localConnectionId = i;
continue;
}
writeToClient(i, msg, senderID);
}
if (localConnectionId < fPmConnections.size())
writeToClient(localConnectionId, msg);
}
return;

case BATCH_PRIMITIVE_RUN:
Expand Down

0 comments on commit 61d5f80

Please sign in to comment.