Skip to content

Commit

Permalink
6ffdb9d9
Browse files Browse the repository at this point in the history
  • Loading branch information
bureddy committed Dec 4, 2024
1 parent fbdb6ee commit 5b214f7
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 28 deletions.
9 changes: 7 additions & 2 deletions include/net_v9.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
#define NCCL_NET_V9_H_
#include "net_device.h"

// Max number of ncclNet objects which can live in the same process
#define NCCL_NET_MAX_PLUGINS 3

#define NCCL_NET_MAX_DEVS_PER_NIC_V9 4
#define NCCL_NET_MAX_DEVS_PER_NIC NCCL_NET_MAX_DEVS_PER_NIC_V9

Expand Down Expand Up @@ -91,8 +94,7 @@ typedef struct {
// Notify the plugin that a recv has completed by the device
ncclResult_t (*irecvConsumed)(void* recvComm, int n, void* request);

// Virtual NIC APIs. makeVDevice will create a virtual NIC given the specified properties, and tell the caller
// what index this new vNIC exists at
// Create a virtual NIC given the specified properties, which can be accessed at device index d
ncclResult_t (*makeVDevice)(int* d, ncclNetVDeviceProps_t* props);
} ncclNet_v9_t;

Expand Down Expand Up @@ -147,6 +149,9 @@ typedef struct {
// Close and free collective comm objects
ncclResult_t (*closeColl)(void* collComm);
ncclResult_t (*closeListen)(void* listenComm);

// Create a virtual NIC given the specified properties, which can be accessed at device index d
ncclResult_t (*makeVDevice)(int* d, ncclNetVDeviceProps_t* props);
} ncclCollNet_v9_t;

#endif // end include guard
15 changes: 5 additions & 10 deletions src/ib_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ NCCL_PARAM(IbRetryCnt, "IB_RETRY_CNT", 7);
NCCL_PARAM(IbPkey, "IB_PKEY", 0);
NCCL_PARAM(IbUseInline, "IB_USE_INLINE", 0);
NCCL_PARAM(IbSl, "IB_SL", 0);
NCCL_PARAM(IbTc, "IB_TC", 0);
NCCL_PARAM(IbTc, "IB_TC", -1);
NCCL_PARAM(IbArThreshold, "IB_AR_THRESHOLD", 8192);
NCCL_PARAM(IbPciRelaxedOrdering, "IB_PCI_RELAXED_ORDERING", 2);
NCCL_PARAM(IbFifoTc, "IB_FIFO_TC", 0);
Expand Down Expand Up @@ -653,7 +653,7 @@ ncclResult_t ncclIbCreateQp(uint8_t ib_port, struct ncclIbNetCommDevBase* base,
return ncclSuccess;
}

ncclResult_t ncclIbRtrQp(struct ibv_qp* qp, struct ncclIbGidInfo* sGidInfo, uint32_t dest_qp_num, struct ncclIbDevInfo* info, bool override_tc) {
ncclResult_t ncclIbRtrQp(struct ibv_qp* qp, struct ncclIbGidInfo* sGidInfo, uint32_t dest_qp_num, struct ncclIbDevInfo* info, bool fifoTc) {
struct ibv_qp_attr qpAttr;
int same_subnet;
memset(&qpAttr, 0, sizeof(struct ibv_qp_attr));
Expand All @@ -670,11 +670,7 @@ ncclResult_t ncclIbRtrQp(struct ibv_qp* qp, struct ncclIbGidInfo* sGidInfo, uint
qpAttr.ah_attr.grh.flow_label = 0;
qpAttr.ah_attr.grh.sgid_index = sGidInfo->localGidIndex;
qpAttr.ah_attr.grh.hop_limit = 255;
if(ncclParamIbFifoTc() && override_tc) {
qpAttr.ah_attr.grh.traffic_class = ncclParamIbFifoTc();
} else {
qpAttr.ah_attr.grh.traffic_class = ncclParamIbTc();
}
qpAttr.ah_attr.grh.traffic_class = fifoTc && ncclParamIbFifoTc() != -1 ? ncclParamIbFifoTc() : ncclParamIbTc();
} else {
same_subnet = (ncclIbExtractLocalSubnetPrefix(sGidInfo->localGid.global.subnet_prefix) ==
ncclIbExtractLocalSubnetPrefix(info->gid.global.subnet_prefix));
Expand Down Expand Up @@ -995,7 +991,7 @@ ncclResult_t ncclIbConnect_v6(int dev, void* opaqueHandle, void** sendComm) {
return ncclIbConnect(dev, opaqueHandle, sendComm, &handle);
}

NCCL_PARAM(IbWarnRailLocal, "NCCL_IB_WARN_RAIL_LOCAL", 0);
NCCL_PARAM(IbWarnRailLocal, "IB_WARN_RAIL_LOCAL", 0);

ncclResult_t ncclIbCheckVProps(ncclNetVDeviceProps_t* vProps1, ncclNetVDeviceProps_t* vProps2) {
ncclNetVDeviceProps_t outVProps = {0};
Expand Down Expand Up @@ -1187,8 +1183,7 @@ ncclResult_t ncclIbAccept(void* listenComm, void** recvComm, ncclNetDeviceHandl
} else {
meta.qpInfo[q].ece_supported = 0;
}
bool override_tc = (q == 0) ? true : false;
NCCLCHECKGOTO(ncclIbRtrQp(qp->qp, &rCommDev->base.gidInfo, remMeta.qpInfo[q].qpn, remDevInfo, override_tc), ret, fail);
NCCLCHECKGOTO(ncclIbRtrQp(qp->qp, &rCommDev->base.gidInfo, remMeta.qpInfo[q].qpn, remDevInfo, true), ret, fail);
NCCLCHECKGOTO(ncclIbRtsQp(qp->qp), ret, fail);
}

Expand Down
37 changes: 23 additions & 14 deletions src/ibvwrap.c
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,11 @@ static void ibvQpStateName(enum ibv_qp_state state, char* msg, const size_t len)
}
}

static void ibvModifyQpLog(struct ibv_qp* qp, enum ibv_qp_state qpState, char* msg, size_t msgLen) {
// Pick the attribute source for a given field: evaluates to userAttr when any bit of
// mask is set in userFlag, and to attr otherwise.
// Every argument is parenthesized so that compound expressions (e.g. a mask built with
// `A | B`) keep their intended grouping after expansion.
#define QP_ATTR(attr, userAttr, userFlag, mask) (((userFlag) & (mask)) ? (userAttr) : (attr))

static void ibvModifyQpLog(struct ibv_qp* qp, enum ibv_qp_state qpState, struct ibv_qp_attr* userAttr, int userFlag, char* msg, size_t msgLen) {
ncclResult_t res;
int portNum = -1;
struct ibv_qp_attr attr;
struct ibv_qp_init_attr init_attr;
int portNum = -1, gidIndex = -1;
char localGidName[INET6_ADDRSTRLEN], remoteGidName[INET6_ADDRSTRLEN];
const char *localGidRes = NULL, *remoteGidRes = NULL;

Expand All @@ -210,22 +210,31 @@ static void ibvModifyQpLog(struct ibv_qp* qp, enum ibv_qp_state qpState, char* m
char devName[IBV_SYSFS_NAME_MAX] = "";
snprintf(devName, sizeof(devName), "%s", (qp->pd->context) ? wrap_ibv_get_device_name(qp->pd->context->device) : "N/A");

// get the QP attr, if error, log what we have
struct ibv_qp_attr attr;
struct ibv_qp_init_attr init_attr;
int attr_mask = IBV_QP_PORT | IBV_QP_AV;
NCCLCHECKGOTO(wrap_ibv_query_qp(qp, &attr, attr_mask, &init_attr), res, print);
portNum = attr.port_num;
if (attr.ah_attr.is_global) {
union ibv_gid* remoteGid = &attr.ah_attr.grh.dgid;
res = wrap_ibv_query_qp(qp, &attr, attr_mask, &init_attr);
struct ibv_qp_attr *qpAttr = (res == ncclSuccess) ? &attr : NULL;

// port info, portAttr can be NULL if not given by the user and query_qp failed
struct ibv_qp_attr *portAttr = QP_ATTR(qpAttr, userAttr, userFlag, IBV_QP_PORT);
portNum = portAttr ? portAttr->port_num : -1;

// address info, avAttr can be NULL if not given by the user and query_qp failed
struct ibv_qp_attr *avAttr = QP_ATTR(qpAttr, userAttr, userFlag, IBV_QP_AV);
if (avAttr && avAttr->ah_attr.is_global) {
union ibv_gid *remoteGid = &avAttr->ah_attr.grh.dgid;
remoteGidRes = ibvGetGidStr(remoteGid, remoteGidName, sizeof(remoteGidName));
// we need pd->context to retrieve local GID, skip if not there
if (!qp->pd->context) goto print;
gidIndex = avAttr->ah_attr.grh.sgid_index;
union ibv_gid localGid;
NCCLCHECKGOTO(wrap_ibv_query_gid(qp->pd->context, attr.port_num, attr.ah_attr.grh.sgid_index, &localGid), res, print);
NCCLCHECKGOTO(wrap_ibv_query_gid(qp->pd->context, portNum, gidIndex, &localGid), res, print);
localGidRes = ibvGetGidStr(&localGid, localGidName, sizeof(localGidName));
}
print:
snprintf(msg, msgLen, "on dev %s:%d, curr state %s, next state %s, local GID %s, remote GID %s",
devName, portNum, currState, nextState, localGidRes ? localGidName : "N/A", remoteGidRes ? remoteGidName : "N/A");
snprintf(msg, msgLen, "on dev %s:%d, curr state %s, next state %s, local GID index %d, local GID %s, remote GID %s",
devName, portNum, currState, nextState, gidIndex, localGidRes ? localGidName : "N/A", remoteGidRes ? remoteGidName : "N/A");
return;
}

Expand All @@ -237,7 +246,7 @@ ncclResult_t wrap_ibv_modify_qp(struct ibv_qp* qp, struct ibv_qp_attr* attr, int
do {
if (attempts > 0) {
unsigned int sleepTime = timeOut * attempts;
ibvModifyQpLog(qp, attr->qp_state, qpMsg, sizeof(qpMsg));
ibvModifyQpLog(qp, attr->qp_state, attr, attr_mask, qpMsg, sizeof(qpMsg));
INFO(NCCL_NET, "Call to ibv_modify_qp failed with %d %s, %s, retrying %d/%d after %u msec of sleep", ret, strerror(ret), qpMsg, attempts, maxCnt, sleepTime);
// sleep before retrying
struct timespec tv = {.tv_sec = sleepTime / 1000, .tv_nsec = (sleepTime % 1000) * ((long)1e6)};
Expand All @@ -247,7 +256,7 @@ ncclResult_t wrap_ibv_modify_qp(struct ibv_qp* qp, struct ibv_qp_attr* attr, int
attempts++;
} while (IBV_MQP_RETRY_ERRNO_ALL(ret) && attempts < maxCnt);
if (ret != 0) {
ibvModifyQpLog(qp, attr->qp_state, qpMsg, sizeof(qpMsg));
ibvModifyQpLog(qp, attr->qp_state, attr, attr_mask, qpMsg, sizeof(qpMsg));
WARN("Call to ibv_modify_qp failed with %d %s, %s", ret, strerror(ret), qpMsg);
return ncclSystemError;
}
Expand Down
5 changes: 5 additions & 0 deletions src/p2p_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,11 @@ ncclResult_t ncclIbMakeVDeviceInternal(int* d, ncclNetVDeviceProps_t* props, int
return ncclInvalidUsage;
}

if (props->ndevs == 0) {
WARN("NET/IB : Can't make virtual NIC with 0 devices");
return ncclInvalidUsage;
}

if (*ncclNMergedIbDevs == MAX_IB_VDEVS) {
WARN("NET/IB : Cannot allocate any more virtual devices (%d)", MAX_IB_VDEVS);
return ncclInvalidUsage;
Expand Down
3 changes: 2 additions & 1 deletion src/sharp_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,8 @@ ncclCollNet_v9_t ncclCollNetPlugin_v9 = {
ncclSharpIflush,
ncclSharpTest,
ncclSharpCloseColl,
ncclSharpCloseListen
ncclSharpCloseListen,
NULL
};

ncclCollNet_v8_t ncclCollNetPlugin_v8 = {
Expand Down
2 changes: 1 addition & 1 deletion src/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,7 @@ ncclResult_t ncclSocketShutdown(struct ncclSocket* sock, int how) {

ncclResult_t ncclSocketClose(struct ncclSocket* sock) {
if (sock != NULL) {
if (sock->fd >= 0) {
if (sock->state > ncclSocketStateNone && sock->state < ncclSocketStateNum && sock->fd >= 0) {
/* shutdown() is needed to send FIN packet to proxy thread; shutdown() is not affected
* by refcount of fd, but close() is. close() won't close a fd and send FIN packet if
* the fd is duplicated (e.g. fork()). So shutdown() guarantees the correct and graceful
Expand Down

0 comments on commit 5b214f7

Please sign in to comment.