From 5b214f7a88de8eafff27769f666ac06a0bae150a Mon Sep 17 00:00:00 2001 From: Devendar Bureddy Date: Wed, 4 Dec 2024 17:09:46 +0200 Subject: [PATCH] 6ffdb9d9 --- include/net_v9.h | 9 +++++++-- src/ib_plugin.c | 15 +++++---------- src/ibvwrap.c | 37 +++++++++++++++++++++++-------------- src/p2p_plugin.c | 5 +++++ src/sharp_plugin.c | 3 ++- src/socket.c | 2 +- 6 files changed, 43 insertions(+), 28 deletions(-) diff --git a/include/net_v9.h b/include/net_v9.h index 904f0d0..4c29cde 100644 --- a/include/net_v9.h +++ b/include/net_v9.h @@ -6,6 +6,9 @@ #define NCCL_NET_V9_H_ #include "net_device.h" +// Max number of ncclNet objects which can live in the same process +#define NCCL_NET_MAX_PLUGINS 3 + #define NCCL_NET_MAX_DEVS_PER_NIC_V9 4 #define NCCL_NET_MAX_DEVS_PER_NIC NCCL_NET_MAX_DEVS_PER_NIC_V9 @@ -91,8 +94,7 @@ typedef struct { // Notify the plugin that a recv has completed by the device ncclResult_t (*irecvConsumed)(void* recvComm, int n, void* request); - // Virtual NIC APIs. makeVDevice will create a virtual NIC given the specified properties, and tell the caller - // what index this new vNIC exists at + // Create a virtual NIC given the specified properties, which can be accessed at device index d ncclResult_t (*makeVDevice)(int* d, ncclNetVDeviceProps_t* props); } ncclNet_v9_t; @@ -147,6 +149,9 @@ typedef struct { // Close and free collective comm objects ncclResult_t (*closeColl)(void* collComm); ncclResult_t (*closeListen)(void* listenComm); + + // Create a virtual NIC given the specified properties, which can be accessed at device index d + ncclResult_t (*makeVDevice)(int* d, ncclNetVDeviceProps_t* props); } ncclCollNet_v9_t; #endif // end include guard diff --git a/src/ib_plugin.c b/src/ib_plugin.c index f097881..96571ea 100644 --- a/src/ib_plugin.c +++ b/src/ib_plugin.c @@ -44,7 +44,7 @@ NCCL_PARAM(IbRetryCnt, "IB_RETRY_CNT", 7); NCCL_PARAM(IbPkey, "IB_PKEY", 0); NCCL_PARAM(IbUseInline, "IB_USE_INLINE", 0); NCCL_PARAM(IbSl, "IB_SL", 0); -NCCL_PARAM(IbTc, "IB_TC", 0); +NCCL_PARAM(IbTc, "IB_TC", -1); NCCL_PARAM(IbArThreshold, "IB_AR_THRESHOLD", 8192); NCCL_PARAM(IbPciRelaxedOrdering, "IB_PCI_RELAXED_ORDERING", 2); NCCL_PARAM(IbFifoTc, "IB_FIFO_TC", 0); @@ -653,7 +653,7 @@ ncclResult_t ncclIbCreateQp(uint8_t ib_port, struct ncclIbNetCommDevBase* base, return ncclSuccess; } -ncclResult_t ncclIbRtrQp(struct ibv_qp* qp, struct ncclIbGidInfo* sGidInfo, uint32_t dest_qp_num, struct ncclIbDevInfo* info, bool override_tc) { +ncclResult_t ncclIbRtrQp(struct ibv_qp* qp, struct ncclIbGidInfo* sGidInfo, uint32_t dest_qp_num, struct ncclIbDevInfo* info, bool fifoTc) { struct ibv_qp_attr qpAttr; int same_subnet; memset(&qpAttr, 0, sizeof(struct ibv_qp_attr)); @@ -670,11 +670,7 @@ ncclResult_t ncclIbRtrQp(struct ibv_qp* qp, struct ncclIbGidInfo* sGidInfo, uint qpAttr.ah_attr.grh.flow_label = 0; qpAttr.ah_attr.grh.sgid_index = sGidInfo->localGidIndex; qpAttr.ah_attr.grh.hop_limit = 255; - if(ncclParamIbFifoTc() && override_tc) { - qpAttr.ah_attr.grh.traffic_class = ncclParamIbFifoTc(); - } else { - qpAttr.ah_attr.grh.traffic_class = ncclParamIbTc(); - } + qpAttr.ah_attr.grh.traffic_class = fifoTc && ncclParamIbFifoTc() != -1 ? ncclParamIbFifoTc() : ncclParamIbTc(); } else { same_subnet = (ncclIbExtractLocalSubnetPrefix(sGidInfo->localGid.global.subnet_prefix) == ncclIbExtractLocalSubnetPrefix(info->gid.global.subnet_prefix)); @@ -995,7 +991,7 @@ ncclResult_t ncclIbConnect_v6(int dev, void* opaqueHandle, void** sendComm) { return ncclIbConnect(dev, opaqueHandle, sendComm, &handle); } -NCCL_PARAM(IbWarnRailLocal, "NCCL_IB_WARN_RAIL_LOCAL", 0); +NCCL_PARAM(IbWarnRailLocal, "IB_WARN_RAIL_LOCAL", 0); ncclResult_t ncclIbCheckVProps(ncclNetVDeviceProps_t* vProps1, ncclNetVDeviceProps_t* vProps2) { ncclNetVDeviceProps_t outVProps = {0}; @@ -1187,8 +1183,7 @@ ncclResult_t ncclIbAccept(void* listenComm, void** recvComm, ncclNetDeviceHandl } else { meta.qpInfo[q].ece_supported = 0; } - bool override_tc = (q == 0) ? true : false; - NCCLCHECKGOTO(ncclIbRtrQp(qp->qp, &rCommDev->base.gidInfo, remMeta.qpInfo[q].qpn, remDevInfo, override_tc), ret, fail); + NCCLCHECKGOTO(ncclIbRtrQp(qp->qp, &rCommDev->base.gidInfo, remMeta.qpInfo[q].qpn, remDevInfo, true), ret, fail); NCCLCHECKGOTO(ncclIbRtsQp(qp->qp), ret, fail); } diff --git a/src/ibvwrap.c b/src/ibvwrap.c index 6b3a1b6..eac086d 100644 --- a/src/ibvwrap.c +++ b/src/ibvwrap.c @@ -196,11 +196,11 @@ static void ibvQpStateName(enum ibv_qp_state state, char* msg, const size_t len) } } -static void ibvModifyQpLog(struct ibv_qp* qp, enum ibv_qp_state qpState, char* msg, size_t msgLen) { +#define QP_ATTR(attr, userAttr, userFlag, mask) ((userFlag & mask) ? (userAttr) : (attr)) + +static void ibvModifyQpLog(struct ibv_qp* qp, enum ibv_qp_state qpState, struct ibv_qp_attr* userAttr, int userFlag, char* msg, size_t msgLen) { ncclResult_t res; - int portNum = -1; - struct ibv_qp_attr attr; - struct ibv_qp_init_attr init_attr; + int portNum = -1, gidIndex = -1; char localGidName[INET6_ADDRSTRLEN], remoteGidName[INET6_ADDRSTRLEN]; const char *localGidRes = NULL, *remoteGidRes = NULL; @@ -210,22 +210,31 @@ static void ibvModifyQpLog(struct ibv_qp* qp, enum ibv_qp_state qpState, char* m char devName[IBV_SYSFS_NAME_MAX] = ""; snprintf(devName, sizeof(devName), "%s", (qp->pd->context) ? wrap_ibv_get_device_name(qp->pd->context->device) : "N/A"); - // get the QP attr, if error, log what we have + struct ibv_qp_attr attr; + struct ibv_qp_init_attr init_attr; int attr_mask = IBV_QP_PORT | IBV_QP_AV; - NCCLCHECKGOTO(wrap_ibv_query_qp(qp, &attr, attr_mask, &init_attr), res, print); - portNum = attr.port_num; - if (attr.ah_attr.is_global) { - union ibv_gid* remoteGid = &attr.ah_attr.grh.dgid; + res = wrap_ibv_query_qp(qp, &attr, attr_mask, &init_attr); + struct ibv_qp_attr *qpAttr = (res == ncclSuccess) ? &attr : NULL; + + // port info, portAttr can be NULL if not given by the user and query_qp failed + struct ibv_qp_attr *portAttr = QP_ATTR(qpAttr, userAttr, userFlag, IBV_QP_PORT); + portNum = portAttr ? portAttr->port_num : -1; + + // address info, avAttr can be NULL if not given by the user and query_qp failed + struct ibv_qp_attr *avAttr = QP_ATTR(qpAttr, userAttr, userFlag, IBV_QP_AV); + if (avAttr && avAttr->ah_attr.is_global) { + union ibv_gid *remoteGid = &avAttr->ah_attr.grh.dgid; remoteGidRes = ibvGetGidStr(remoteGid, remoteGidName, sizeof(remoteGidName)); // we need pd->context to retrieve local GID, skip if not there if (!qp->pd->context) goto print; + gidIndex = avAttr->ah_attr.grh.sgid_index; union ibv_gid localGid; - NCCLCHECKGOTO(wrap_ibv_query_gid(qp->pd->context, attr.port_num, attr.ah_attr.grh.sgid_index, &localGid), res, print); + NCCLCHECKGOTO(wrap_ibv_query_gid(qp->pd->context, portNum, gidIndex, &localGid), res, print); localGidRes = ibvGetGidStr(&localGid, localGidName, sizeof(localGidName)); } print: - snprintf(msg, msgLen, "on dev %s:%d, curr state %s, next state %s, local GID %s, remote GID %s", - devName, portNum, currState, nextState, localGidRes ? localGidName : "N/A", remoteGidRes ? remoteGidName : "N/A"); + snprintf(msg, msgLen, "on dev %s:%d, curr state %s, next state %s, local GID index %d, local GID %s, remote GID %s", + devName, portNum, currState, nextState, gidIndex, localGidRes ? localGidName : "N/A", remoteGidRes ? remoteGidName : "N/A"); return; } @@ -237,7 +246,7 @@ ncclResult_t wrap_ibv_modify_qp(struct ibv_qp* qp, struct ibv_qp_attr* attr, int do { if (attempts > 0) { unsigned int sleepTime = timeOut * attempts; - ibvModifyQpLog(qp, attr->qp_state, qpMsg, sizeof(qpMsg)); + ibvModifyQpLog(qp, attr->qp_state, attr, attr_mask, qpMsg, sizeof(qpMsg)); INFO(NCCL_NET, "Call to ibv_modify_qp failed with %d %s, %s, retrying %d/%d after %u msec of sleep", ret, strerror(ret), qpMsg, attempts, maxCnt, sleepTime); // sleep before retrying struct timespec tv = {.tv_sec = sleepTime / 1000, .tv_nsec = (sleepTime % 1000) * ((long)1e6)}; @@ -247,7 +256,7 @@ ncclResult_t wrap_ibv_modify_qp(struct ibv_qp* qp, struct ibv_qp_attr* attr, int attempts++; } while (IBV_MQP_RETRY_ERRNO_ALL(ret) && attempts < maxCnt); if (ret != 0) { - ibvModifyQpLog(qp, attr->qp_state, qpMsg, sizeof(qpMsg)); + ibvModifyQpLog(qp, attr->qp_state, attr, attr_mask, qpMsg, sizeof(qpMsg)); WARN("Call to ibv_modify_qp failed with %d %s, %s", ret, strerror(ret), qpMsg); return ncclSystemError; } diff --git a/src/p2p_plugin.c b/src/p2p_plugin.c index 293d906..62bbbd8 100644 --- a/src/p2p_plugin.c +++ b/src/p2p_plugin.c @@ -409,6 +409,11 @@ ncclResult_t ncclIbMakeVDeviceInternal(int* d, ncclNetVDeviceProps_t* props, int return ncclInvalidUsage; } + if (props->ndevs == 0) { + WARN("NET/IB : Can't make virtual NIC with 0 devices"); + return ncclInvalidUsage; + } + if (*ncclNMergedIbDevs == MAX_IB_VDEVS) { WARN("NET/IB : Cannot allocate any more virtual devices (%d)", MAX_IB_VDEVS); return ncclInvalidUsage; diff --git a/src/sharp_plugin.c b/src/sharp_plugin.c index 1544cd9..6234dd8 100644 --- a/src/sharp_plugin.c +++ b/src/sharp_plugin.c @@ -775,7 +775,8 @@ ncclCollNet_v9_t ncclCollNetPlugin_v9 = { ncclSharpIflush, ncclSharpTest, ncclSharpCloseColl, - ncclSharpCloseListen + ncclSharpCloseListen, + NULL }; ncclCollNet_v8_t ncclCollNetPlugin_v8 = { diff --git a/src/socket.c b/src/socket.c index 52e0233..89365e3 100755 --- a/src/socket.c +++ b/src/socket.c @@ -913,7 +913,7 @@ ncclResult_t ncclSocketShutdown(struct ncclSocket* sock, int how) { ncclResult_t ncclSocketClose(struct ncclSocket* sock) { if (sock != NULL) { - if (sock->fd >= 0) { + if (sock->state > ncclSocketStateNone && sock->state < ncclSocketStateNum && sock->fd >= 0) { /* shutdown() is needed to send FIN packet to proxy thread; shutdown() is not affected * by refcount of fd, but close() is. close() won't close a fd and send FIN packet if * the fd is duplicated (e.g. fork()). So shutdown() guarantees the correct and graceful