From ffbfdd355a7c5e6b0d57955735ad7ae2aea571a6 Mon Sep 17 00:00:00 2001 From: "John H. Hartman" Date: Tue, 17 Sep 2024 14:45:15 -0700 Subject: [PATCH 01/25] Non-blocking PUT implementation Previously, non-blocking PUTs were implemented via blocking PUTs, which could severely limit performance. Prior to 2.0, small PUTs invoked fi_inject_write, which essentially turned them into non-blocking PUTs, but chpl_comm_put returned as if the PUT was completed. This could cause MCM violations as well as hangs caused by not progressing the network stack properly. These deficiences were fixed in 2.0, but led to a performance regression. This commit implements non-blocking PUTs correctly, so that the chpl_comm_*nb* functions work correctly. This should restore 1.32.0 performance while avoiding MCM violations and hangs. Signed-off-by: John H. Hartman --- runtime/include/chpl-comm.h | 2 + runtime/include/chpl-mem-desc.h | 1 + runtime/src/comm/ofi/comm-ofi.c | 412 ++++++++++++++++++++++---------- 3 files changed, 287 insertions(+), 128 deletions(-) diff --git a/runtime/include/chpl-comm.h b/runtime/include/chpl-comm.h index 9fd6d2432bee..2bb1e54ddc78 100644 --- a/runtime/include/chpl-comm.h +++ b/runtime/include/chpl-comm.h @@ -155,6 +155,8 @@ void chpl_comm_wait_nb_some(chpl_comm_nb_handle_t* h, size_t nhandles); // detected. int chpl_comm_try_nb_some(chpl_comm_nb_handle_t* h, size_t nhandles); +void chpl_comm_free_nb(chpl_comm_nb_handle_t* h); + // Returns whether or not the passed wide address is known to be in // a communicable memory region and known to be readable. That is, // GET to that address should succeed without an access violation diff --git a/runtime/include/chpl-mem-desc.h b/runtime/include/chpl-mem-desc.h index 4855aef27d59..f9e65ea46892 100644 --- a/runtime/include/chpl-mem-desc.h +++ b/runtime/include/chpl-mem-desc.h @@ -67,6 +67,7 @@ extern "C" { m(COMM_PER_LOC_INFO, "comm layer per-locale information", false), \ m(COMM_PRV_OBJ_ARRAY, "comm layer private objects array", false), \ m(COMM_PRV_BCAST_DATA, "comm layer private broadcast data", false), \ + m(COMM_NB_HANDLE, "comm layer non-blocking handle", false), \ m(MEM_HEAP_SPACE, "mem layer heap expansion space", false), \ m(GLOM_STRINGS_DATA, "glom strings data", true ), \ m(STRING_LITERALS_BUF, "string literals buffer", true ), \ diff --git a/runtime/src/comm/ofi/comm-ofi.c b/runtime/src/comm/ofi/comm-ofi.c index 87e6c6ad6438..56fd205c8900 100644 --- a/runtime/src/comm/ofi/comm-ofi.c +++ b/runtime/src/comm/ofi/comm-ofi.c @@ -4314,10 +4314,17 @@ void amRequestExecOn(c_nodeid_t node, c_sublocid_t subloc, } } - +/* + * amRequestRmaPut + * + * Performs a PUT by sending an active message to the remote node that + * causes it to perform GET. This is currently a blocking operation + * so the "blocking" argument is unused. When this operation returns + * the data have been successfully transmitted to the remote node. + */ static inline void amRequestRmaPut(c_nodeid_t node, void* addr, void* raddr, size_t size, - chpl_bool blocking) { + chpl_bool blocking /* unused */) { assert(!isAmHandler); retireDelayedAmDone(false /*taskIsEnding*/); @@ -4335,7 +4342,7 @@ void amRequestRmaPut(c_nodeid_t node, void* addr, void* raddr, size_t size, .addr = raddr, .raddr = myAddr, .size = size, }, }; - amRequestCommon(node, &req, sizeof(req.rma), true, NULL); + amRequestCommon(node, &req, sizeof(req.rma), true /*blocking*/, NULL); mrUnLocalizeSource(myAddr, addr); } @@ -4360,7 +4367,7 @@ void amRequestRmaGet(c_nodeid_t node, void* addr, void* raddr, size_t size) { .addr = raddr, .raddr = myAddr, .size = size, }, }; - amRequestCommon(node, &req, sizeof(req.rma), true, NULL); + amRequestCommon(node, &req, sizeof(req.rma), true /*blocking*/, NULL); mrUnLocalizeTarget(myAddr, addr, size); } @@ -5446,11 +5453,54 @@ void amCheckLiveness(void) { // Interface: RMA // +// OFI-specific non-blocking handle implementation + +typedef struct chpl_comm_ofi_nb_handle_t { + chpl_bool completed; // operation has completed + size_t count; // number of sub-operations + chpl_atomic_bool complete[1]; // flag for sub-operation completion +} chpl_comm_ofi_nb_handle_t; + chpl_comm_nb_handle_t chpl_comm_put_nb(void* addr, c_nodeid_t node, void* raddr, size_t size, int32_t commID, int ln, int32_t fn) { - chpl_comm_put(addr, node, raddr, size, commID, ln, fn); - return NULL; + DBG_PRINTF(DBG_IFACE, + "%s(%p, %d, %p, %zd, %d)", __func__, + addr, (int) node, raddr, size, (int) commID); + + retireDelayedAmDone(false /*taskIsEnding*/); + + chpl_comm_ofi_nb_handle_t *handle = NULL; + + // + // Sanity checks, self-communication. + // + CHK_TRUE(addr != NULL); + CHK_TRUE(raddr != NULL); + + if (size == 0) { + goto done; + } + + if (node == chpl_nodeID) { + memmove(raddr, addr, size); + goto done; + } + + // Communications callback support + if (chpl_comm_have_callbacks(chpl_comm_cb_event_kind_put)) { + chpl_comm_cb_info_t cb_data = + {chpl_comm_cb_event_kind_put, chpl_nodeID, node, + .iu.comm={addr, raddr, size, commID, ln, fn}}; + chpl_comm_do_callbacks (&cb_data); + } + + chpl_comm_diags_verbose_rdma("put", node, size, ln, fn, commID); + chpl_comm_diags_incr(put); + + handle = ofi_put(addr, node, raddr, size, false /*blocking*/); +done: + return handle; } @@ -5463,35 +5513,104 @@ chpl_comm_nb_handle_t chpl_comm_get_nb(void* addr, c_nodeid_t node, int chpl_comm_test_nb_complete(chpl_comm_nb_handle_t h) { + chpl_comm_ofi_nb_handle_t *handle = (chpl_comm_ofi_nb_handle_t *) h; chpl_comm_diags_incr(test_nb); - - // fi_cq_readfrom? - return ((void*) h) == NULL; + DBG_PRINTF(DBG_RMA, "chpl_comm_test_nb_complete %p", handle); + int completed = 1; + if (handle != NULL) { + completed = handle->completed; + } + DBG_PRINTF(DBG_RMA, "chpl_comm_test_nb_complete %p %s", handle, + completed ? "true" : "false"); + return completed; } +/* + * check_complete + * + * Returns true if a new handle completion is detected, false otherwise + * Ignores handles that have previously completed. If blocking is true and + * there are uncompleted handles this will not return until a new handle + * completion is detected. + */ +static chpl_bool check_complete(chpl_comm_nb_handle_t* h, size_t nhandles, + chpl_bool blocking) { -void chpl_comm_wait_nb_some(chpl_comm_nb_handle_t* h, size_t nhandles) { - chpl_comm_diags_incr(wait_nb); + DBG_PRINTF(DBG_RMA, "check_complete"); - size_t i; - // fi_cq_readfrom? - for( i = 0; i < nhandles; i++ ) { - CHK_TRUE(h[i] == NULL); + chpl_bool completed = false; // at least one new completion detected + chpl_bool pending = false; // there is a handle with uncompleted operations + struct perTxCtxInfo_t* tcip = NULL; + while (true) { + pending = false; + for(size_t i = 0; i < nhandles; i++) { + chpl_comm_ofi_nb_handle_t *handle = (chpl_comm_ofi_nb_handle_t *) h[i]; + DBG_PRINTF(DBG_RMA, "handle[%d] %p", i, handle); + + // ignore handles that have already completed + // NULL handles have by definition already completed + if ((handle == NULL) || handle->completed) { + continue; + } + pending = true; + // determine if this handle is now complete by checking the status + // of its individual operations + chpl_bool handleComplete = true; + for (size_t j = 0; j < handle->count; j++) { + if(!atomic_load_explicit_bool(&handle->complete[j], + chpl_memory_order_acquire)) { + handleComplete = false; + break; + } + } + if (handleComplete) { + completed = true; + handle->completed = true; + // break here when one handle completes instead of checking them all? + } + } + DBG_PRINTF(DBG_RMA, "check_complete blocking %s", blocking ? "true" : "false"); + DBG_PRINTF(DBG_RMA, "check_complete completed %s", completed ? "true" : "false"); + DBG_PRINTF(DBG_RMA, "check_complete pending %s", pending ? "true" : "false"); + if (!blocking || completed || !pending) { + break; + } + // progress the endpoint so handles can complete and then try again + if (tcip == NULL) { + CHK_TRUE((tcip = tciAlloc()) != NULL); + } + DBG_PRINTF(DBG_RMA, "check_complete yielding tcip %p", tcip); + sched_yield(); + (*tcip->ensureProgressFn)(tcip); + } + if (tcip) { + tciFree(tcip); } + DBG_PRINTF(DBG_RMA, "check_complete returning %s", completed ? + "true" : "false"); + return completed; } +void chpl_comm_wait_nb_some(chpl_comm_nb_handle_t* h, size_t nhandles) { + chpl_comm_diags_incr(wait_nb); + + DBG_PRINTF(DBG_RMA, "chpl_comm_wait_nb_some"); + + (void) check_complete(h, nhandles, true /*blocking*/); +} int chpl_comm_try_nb_some(chpl_comm_nb_handle_t* h, size_t nhandles) { chpl_comm_diags_incr(try_nb); - size_t i; - // fi_cq_readfrom? - for( i = 0; i < nhandles; i++ ) { - CHK_TRUE(h[i] == NULL); - } - return 0; + DBG_PRINTF(DBG_RMA, "chpl_comm_try_nb_some"); + return check_complete(h, nhandles, false /*blocking*/); } +void chpl_comm_free_nb(chpl_comm_nb_handle_t* h) { + if (h != NULL) { + chpl_mem_free(h, 0, 0); + } +} void chpl_comm_put(void* addr, c_nodeid_t node, void* raddr, size_t size, int32_t commID, int ln, int32_t fn) { @@ -5901,70 +6020,103 @@ void waitForCQSpace(struct perTxCtxInfo_t* tcip, size_t len) { } } -typedef chpl_comm_nb_handle_t (rmaPutFn_t)(void* myAddr, void* mrDesc, - c_nodeid_t node, - uint64_t mrRaddr, uint64_t mrKey, - size_t size, - chpl_bool blocking, - struct perTxCtxInfo_t* tcip); +typedef void (rmaPutFn_t)(void* myAddr, void* mrDesc, + c_nodeid_t node, + uint64_t mrRaddr, uint64_t mrKey, + size_t size, + chpl_bool blocking, + chpl_atomic_bool *done, + struct perTxCtxInfo_t* tcip); static rmaPutFn_t rmaPutFn_selector; static inline chpl_comm_nb_handle_t ofi_put(const void* addr, c_nodeid_t node, void* raddr, size_t size, chpl_bool blocking) { - // - // Don't ask the provider to transfer more than it wants to. - // - if (size > ofi_info->ep_attr->max_msg_size) { + + char *src = (char *) addr; + char *dest = (char *) raddr; + chpl_comm_ofi_nb_handle_t *handle = NULL; + + // Determine how many operations the PUT requires based on the provider's + // maximum message size + + int ops = (size + ofi_info->ep_attr->max_msg_size - 1) / + ofi_info->ep_attr->max_msg_size; + if (ops > 1) { DBG_PRINTF(DBG_RMA | DBG_RMA_WRITE, "splitting large PUT %d:%p <= %p, size %zd", (int) node, raddr, addr, size); + } - size_t chunkSize = ofi_info->ep_attr->max_msg_size; - for (size_t i = 0; i < size; i += chunkSize) { - if (chunkSize > size - i) { - chunkSize = size - i; - } - (void) ofi_put(&((const char*) addr)[i], node, &((char*) raddr)[i], - chunkSize, blocking); - } - - return NULL; + struct perTxCtxInfo_t* tcip = NULL; + CHK_TRUE((tcip = tciAlloc()) != NULL); + if (!blocking && !tcip->bound) { + // Non-blocking operations require bound endpoints + blocking = true; } - DBG_PRINTF(DBG_RMA | DBG_RMA_WRITE, - "PUT %d:%p <= %p, size %zd", - (int) node, raddr, addr, size); + if (!blocking) { + // Allocate a handle large enough to hold one "done" flags per op + int handleSize = sizeof(chpl_comm_ofi_nb_handle_t) + + ((ops - 1) * sizeof(chpl_atomic_bool)); + handle = chpl_mem_alloc(handleSize, CHPL_RT_MD_COMM_NB_HANDLE, 0, 0); - // - // If the remote address is directly accessible do an RMA from this - // side; otherwise do the opposite RMA from the other side. - // - chpl_comm_nb_handle_t ret; - uint64_t mrKey; - uint64_t mrRaddr; - if (mrGetKey(&mrKey, &mrRaddr, node, raddr, size)) { - struct perTxCtxInfo_t* tcip; - CHK_TRUE((tcip = tciAlloc()) != NULL); - if (tcip->txCntr == NULL) { - waitForCQSpace(tcip, 1); + handle->count = ops; + handle->completed = false; + for (size_t i = 0; i < ops; i++) { + atomic_init_bool(&handle->complete[i], false); } + } - void* mrDesc; - void* myAddr = mrLocalizeSource(&mrDesc, addr, size, "PUT src"); + size_t chunkSize = ofi_info->ep_attr->max_msg_size; + size_t offset = 0; + for (int i = 0; i < ops; i++) { + if (chunkSize > size - offset) { + chunkSize = size - offset; + } + DBG_PRINTF(DBG_RMA | DBG_RMA_WRITE, + "PUT %d:%p <= %p, size %zd, %s", + (int) node, dest, src, chunkSize, + blocking ? "blocking" : "non-blocking"); - ret = rmaPutFn_selector(myAddr, mrDesc, node, mrRaddr, mrKey, size, - blocking, tcip); + // + // If the remote address is directly accessible do an RMA from this + // side; otherwise do the opposite RMA from the other side. + // + uint64_t mrKey; + uint64_t mrRaddr; + if (mrGetKey(&mrKey, &mrRaddr, node, (void *) dest, chunkSize)) { + if (tcip->txCntr == NULL) { + // TODO: why is this necessary? + waitForCQSpace(tcip, 1); + } - mrUnLocalizeSource(myAddr, addr); - tciFree(tcip); - } else { - amRequestRmaPut(node, (void*) addr, raddr, size, blocking); - ret = NULL; - } + void* mrDesc; + void* myAddr = mrLocalizeSource(&mrDesc, (const void *) src, + chunkSize, "PUT src"); - return ret; + chpl_atomic_bool *done = blocking ? NULL : &handle->complete[i]; + rmaPutFn_selector(myAddr, mrDesc, node, mrRaddr, mrKey, chunkSize, + blocking, done, tcip); + + mrUnLocalizeSource(myAddr, src); + } else { + amRequestRmaPut(node, (void *) src, (void *) dest, size, blocking); + // amRequestRmaPut is currently a blocking operation, so mark + // the operation as complete + if (!blocking) { + atomic_init_bool(&(handle->complete[i]), true); + } + } + offset += chunkSize; + src += chunkSize; + dest += chunkSize; + } + tciFree(tcip); + DBG_PRINTF(DBG_RMA | DBG_RMA_WRITE, + "PUT %d:%p <= %p, handle %p", handle); + return handle; } @@ -5973,33 +6125,31 @@ static rmaPutFn_t rmaPutFn_msgOrd; static rmaPutFn_t rmaPutFn_dlvrCmplt; static inline -chpl_comm_nb_handle_t rmaPutFn_selector(void* myAddr, void* mrDesc, - c_nodeid_t node, - uint64_t mrRaddr, uint64_t mrKey, - size_t size, - chpl_bool blocking, - struct perTxCtxInfo_t* tcip) { - chpl_comm_nb_handle_t ret = NULL; +void rmaPutFn_selector(void* myAddr, void* mrDesc, + c_nodeid_t node, + uint64_t mrRaddr, uint64_t mrKey, + size_t size, + chpl_bool blocking, + chpl_atomic_bool *done, + struct perTxCtxInfo_t* tcip) { switch (mcmMode) { - case mcmm_msgOrdFence: - ret = rmaPutFn_msgOrdFence(myAddr, mrDesc, node, mrRaddr, mrKey, size, - blocking, tcip); - break; - case mcmm_msgOrd: - ret = rmaPutFn_msgOrd(myAddr, mrDesc, node, mrRaddr, mrKey, size, - blocking, tcip); - break; - case mcmm_dlvrCmplt: - ret = rmaPutFn_dlvrCmplt(myAddr, mrDesc, node, mrRaddr, mrKey, size, - blocking, tcip); - break; - default: - INTERNAL_ERROR_V("unexpected mcmMode %d", mcmMode); - break; - } - - return ret; + case mcmm_msgOrdFence: + rmaPutFn_msgOrdFence(myAddr, mrDesc, node, mrRaddr, mrKey, size, + blocking, done, tcip); + break; + case mcmm_msgOrd: + rmaPutFn_msgOrd(myAddr, mrDesc, node, mrRaddr, mrKey, size, + blocking, done, tcip); + break; + case mcmm_dlvrCmplt: + rmaPutFn_dlvrCmplt(myAddr, mrDesc, node, mrRaddr, mrKey, size, + blocking, done, tcip); + break; + default: + INTERNAL_ERROR_V("unexpected mcmMode %d", mcmMode); + break; + } } @@ -6020,12 +6170,13 @@ static ssize_t wrap_fi_writemsg(const void* addr, void* mrDesc, // Implements ofi_put() when MCM mode is message ordering with fences. // static -chpl_comm_nb_handle_t rmaPutFn_msgOrdFence(void* myAddr, void* mrDesc, - c_nodeid_t node, - uint64_t mrRaddr, uint64_t mrKey, - size_t size, - chpl_bool blocking, - struct perTxCtxInfo_t* tcip) { +void rmaPutFn_msgOrdFence(void* myAddr, void* mrDesc, + c_nodeid_t node, + uint64_t mrRaddr, uint64_t mrKey, + size_t size, + chpl_bool blocking, + chpl_atomic_bool *done, + struct perTxCtxInfo_t* tcip) { uint64_t flags = 0; chpl_atomic_bool txnDone; void *ctx; @@ -6040,17 +6191,18 @@ chpl_comm_nb_handle_t rmaPutFn_msgOrdFence(void* myAddr, void* mrDesc, // memory visibility until later. // flags = FI_INJECT; - } else { - blocking = true; } if (bitmapTest(tcip->amoVisBitmap, node)) { // - // Special case: If our last operation was an AMO then we need to do a - // fenced PUT to force the AMO to complete before this PUT. + // Special case: If our last operation was an AMO then we need to do a + // fenced PUT to force the AMO to be visible before this PUT. // flags |= FI_FENCE; } - ctx = TX_CTX_INIT(tcip, blocking, &txnDone); + if (done == NULL) { + done = &txnDone; + } + ctx = txCtxInit(tcip, __LINE__, done); (void) wrap_fi_writemsg(myAddr, mrDesc, node, mrRaddr, mrKey, size, ctx, flags, tcip); if (blocking) { @@ -6070,8 +6222,6 @@ chpl_comm_nb_handle_t rmaPutFn_msgOrdFence(void* myAddr, void* mrDesc, } else { mcmReleaseOneNode(node, tcip, "PUT"); } - - return NULL; } @@ -6079,12 +6229,13 @@ chpl_comm_nb_handle_t rmaPutFn_msgOrdFence(void* myAddr, void* mrDesc, // Implements ofi_put() when MCM mode is message ordering. // TODO: see comment for rmaPutFn_msgOrdFence. static -chpl_comm_nb_handle_t rmaPutFn_msgOrd(void* myAddr, void* mrDesc, - c_nodeid_t node, - uint64_t mrRaddr, uint64_t mrKey, - size_t size, - chpl_bool blocking, - struct perTxCtxInfo_t* tcip) { +void rmaPutFn_msgOrd(void* myAddr, void* mrDesc, + c_nodeid_t node, + uint64_t mrRaddr, uint64_t mrKey, + size_t size, + chpl_bool blocking, + chpl_atomic_bool *done, + struct perTxCtxInfo_t* tcip) { uint64_t flags = 0; chpl_atomic_bool txnDone; @@ -6106,10 +6257,11 @@ chpl_comm_nb_handle_t rmaPutFn_msgOrd(void* myAddr, void* mrDesc, // and we have a bound tx context so we can delay forcing the // memory visibility until later. flags = FI_INJECT; - } else { - blocking = true; } - ctx = TX_CTX_INIT(tcip, blocking, &txnDone); + if (done == NULL) { + done = &txnDone; + } + ctx = TX_CTX_INIT(tcip, blocking, done); (void) wrap_fi_writemsg(myAddr, mrDesc, node, mrRaddr, mrKey, size, ctx, flags, tcip); @@ -6123,8 +6275,6 @@ chpl_comm_nb_handle_t rmaPutFn_msgOrd(void* myAddr, void* mrDesc, } else { mcmReleaseOneNode(node, tcip, "PUT"); } - - return NULL; } @@ -6132,19 +6282,24 @@ chpl_comm_nb_handle_t rmaPutFn_msgOrd(void* myAddr, void* mrDesc, // Implements ofi_put() when MCM mode is delivery complete. // static -chpl_comm_nb_handle_t rmaPutFn_dlvrCmplt(void* myAddr, void* mrDesc, - c_nodeid_t node, - uint64_t mrRaddr, uint64_t mrKey, - size_t size, - chpl_bool blocking, - struct perTxCtxInfo_t* tcip) { +void rmaPutFn_dlvrCmplt(void* myAddr, void* mrDesc, + c_nodeid_t node, + uint64_t mrRaddr, uint64_t mrKey, + size_t size, + chpl_bool blocking, + chpl_atomic_bool *done, + struct perTxCtxInfo_t* tcip) { chpl_atomic_bool txnDone; - void *ctx = TX_CTX_INIT(tcip, true /*blocking*/, &txnDone); + if (done == NULL) { + done = &txnDone; + } + void *ctx = TX_CTX_INIT(tcip, blocking, done); (void) wrap_fi_write(myAddr, mrDesc, node, mrRaddr, mrKey, size, ctx, tcip); - waitForTxnComplete(tcip, ctx); - txCtxCleanup(ctx); - return NULL; + if (blocking) { + waitForTxnComplete(tcip, ctx); + txCtxCleanup(ctx); + } } @@ -6193,8 +6348,8 @@ ssize_t wrap_fi_writemsg(const void* addr, void* mrDesc, } DBG_PRINTF(DBG_RMA | DBG_RMA_WRITE, "tx write msg: %d:%#" PRIx64 " <= %p, size %zd, ctx %p, " - "flags %#" PRIx64, - (int) node, mrRaddr, addr, size, ctx, flags); + "flags %#" PRIx64 " tcip %p", + (int) node, mrRaddr, addr, size, ctx, flags, tcip); OFI_RIDE_OUT_EAGAIN(tcip, fi_writemsg(tcip->txCtx, &msg, flags)); tcip->numTxnsOut++; tcip->numTxnsSent++; @@ -6375,6 +6530,7 @@ chpl_comm_nb_handle_t ofi_get(void* addr, c_nodeid_t node, if (mrGetKey(&mrKey, &mrRaddr, node, raddr, size)) { struct perTxCtxInfo_t* tcip; CHK_TRUE((tcip = tciAlloc()) != NULL); + // TODO: Why is this necessary? waitForCQSpace(tcip, 1); void* mrDesc; From 4ba2770446315b28d21692cc912c5aa86c861a94 Mon Sep 17 00:00:00 2001 From: "John H. Hartman" Date: Wed, 18 Sep 2024 06:39:27 -0700 Subject: [PATCH 02/25] Added comments Signed-off-by: John H. Hartman --- runtime/src/comm/ofi/comm-ofi.c | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/runtime/src/comm/ofi/comm-ofi.c b/runtime/src/comm/ofi/comm-ofi.c index 56fd205c8900..8eda55eb45ef 100644 --- a/runtime/src/comm/ofi/comm-ofi.c +++ b/runtime/src/comm/ofi/comm-ofi.c @@ -5453,7 +5453,15 @@ void amCheckLiveness(void) { // Interface: RMA // -// OFI-specific non-blocking handle implementation +// OFI-specific non-blocking handle implementation + +// Non-blocking operations require bound endpoints, to avoid having one thread +// with a pending operation while the endpoint is in use by a different +// thread. Since we assume bound endpoints are the norm, it's easiest to just +// disallow non-bound endpoints. This allows the "completed" flag to be a +// simple boolean. The "complete" flags for the sub-operations are booleans +// because the lower-level code that uses them does not assume bound +// endpoints. typedef struct chpl_comm_ofi_nb_handle_t { chpl_bool completed; // operation has completed @@ -5461,6 +5469,18 @@ typedef struct chpl_comm_ofi_nb_handle_t { chpl_atomic_bool complete[1]; // flag for sub-operation completion } chpl_comm_ofi_nb_handle_t; +/* + * chpl_comm_put_nb + * + * Non-blocking PUT. The PUT may complete after this function returns. Returns + * a handle that can be used to wait for and check the status of the PUT. The + * handle may be NULL, in which case the PUT has already completed. The + * memory buffer must not be modified before the PUT completes. Completion + * indicates that subsequent PUTs to the same memory will occur after the + * completed PUT; it does not mean that the results of the PUT are visible in + * memory (see the README.md for details). Concurrent non-blocking PUTs may + * occur in any order. + */ chpl_comm_nb_handle_t chpl_comm_put_nb(void* addr, c_nodeid_t node, void* raddr, size_t size, int32_t commID, int ln, int32_t fn) { @@ -5566,7 +5586,6 @@ static chpl_bool check_complete(chpl_comm_nb_handle_t* h, size_t nhandles, if (handleComplete) { completed = true; handle->completed = true; - // break here when one handle completes instead of checking them all? } } DBG_PRINTF(DBG_RMA, "check_complete blocking %s", blocking ? "true" : "false"); From 132677df14f6e8d51d03f8b9d078be04c89761bd Mon Sep 17 00:00:00 2001 From: "John H. Hartman" Date: Wed, 18 Sep 2024 06:48:11 -0700 Subject: [PATCH 03/25] Free non-blocking handle after operation completes Signed-off-by: John H. Hartman --- runtime/src/chpl-cache.c | 1 + 1 file changed, 1 insertion(+) diff --git a/runtime/src/chpl-cache.c b/runtime/src/chpl-cache.c index 11acf41d4717..2503eafb1eb6 100644 --- a/runtime/src/chpl-cache.c +++ b/runtime/src/chpl-cache.c @@ -1967,6 +1967,7 @@ chpl_bool do_wait_for(struct rdcache_s* cache, cache_seqn_t sn) // Whether we waited above or not, if the first entry's event // is already complete, then remove it from the queue. if (chpl_comm_test_nb_complete(cache->pending[index])) { + chpl_comm_free_nb(cache->pending[index]); fifo_circleb_pop(&cache->pending_first_entry, &cache->pending_last_entry, cache->pending_len); From 11eccc12ca2b48290b32c7a8d77d8e720c79014f Mon Sep 17 00:00:00 2001 From: "John H. Hartman" Date: Wed, 18 Sep 2024 10:26:15 -0700 Subject: [PATCH 04/25] Cleanup Signed-off-by: John H. Hartman --- runtime/include/chpl-comm.h | 1 + runtime/src/comm/ofi/comm-ofi.c | 42 +++++++++------------------------ 2 files changed, 12 insertions(+), 31 deletions(-) diff --git a/runtime/include/chpl-comm.h b/runtime/include/chpl-comm.h index 2bb1e54ddc78..e24ef5d8647a 100644 --- a/runtime/include/chpl-comm.h +++ b/runtime/include/chpl-comm.h @@ -155,6 +155,7 @@ void chpl_comm_wait_nb_some(chpl_comm_nb_handle_t* h, size_t nhandles); // detected. int chpl_comm_try_nb_some(chpl_comm_nb_handle_t* h, size_t nhandles); +// Free a handle returned by chpl_comm_*_nb. void chpl_comm_free_nb(chpl_comm_nb_handle_t* h); // Returns whether or not the passed wide address is known to be in diff --git a/runtime/src/comm/ofi/comm-ofi.c b/runtime/src/comm/ofi/comm-ofi.c index 8eda55eb45ef..22d947ea4790 100644 --- a/runtime/src/comm/ofi/comm-ofi.c +++ b/runtime/src/comm/ofi/comm-ofi.c @@ -5455,13 +5455,13 @@ void amCheckLiveness(void) { // OFI-specific non-blocking handle implementation -// Non-blocking operations require bound endpoints, to avoid having one thread -// with a pending operation while the endpoint is in use by a different -// thread. Since we assume bound endpoints are the norm, it's easiest to just -// disallow non-bound endpoints. This allows the "completed" flag to be a -// simple boolean. The "complete" flags for the sub-operations are booleans -// because the lower-level code that uses them does not assume bound -// endpoints. +// Non-blocking operations require bound endpoints, to avoid having a handle +// for a pending operation held by one thread, while the endpoint is in use +// by a different thread. Bound endpoints are the norm, so it's easiest to +// just disallow non-blocking operations on non-bound endpoints. This allows +// the "completed" flag to be a simple boolean. The "complete" flags for the +// sub-operations are booleans because the lower-level code that uses them +// does not assume bound endpoints. typedef struct chpl_comm_ofi_nb_handle_t { chpl_bool completed; // operation has completed @@ -5531,33 +5531,23 @@ chpl_comm_nb_handle_t chpl_comm_get_nb(void* addr, c_nodeid_t node, return NULL; } - int chpl_comm_test_nb_complete(chpl_comm_nb_handle_t h) { chpl_comm_ofi_nb_handle_t *handle = (chpl_comm_ofi_nb_handle_t *) h; chpl_comm_diags_incr(test_nb); - DBG_PRINTF(DBG_RMA, "chpl_comm_test_nb_complete %p", handle); - int completed = 1; - if (handle != NULL) { - completed = handle->completed; - } - DBG_PRINTF(DBG_RMA, "chpl_comm_test_nb_complete %p %s", handle, - completed ? "true" : "false"); - return completed; + return handle != NULL ? handle->completed : 1; } /* * check_complete * - * Returns true if a new handle completion is detected, false otherwise + * Returns true if a new handle completion is detected, false otherwise. * Ignores handles that have previously completed. If blocking is true and - * there are uncompleted handles this will not return until a new handle - * completion is detected. + * there are uncompleted handles this will not return until a new completion + * is detected. */ static chpl_bool check_complete(chpl_comm_nb_handle_t* h, size_t nhandles, chpl_bool blocking) { - DBG_PRINTF(DBG_RMA, "check_complete"); - chpl_bool completed = false; // at least one new completion detected chpl_bool pending = false; // there is a handle with uncompleted operations struct perTxCtxInfo_t* tcip = NULL; @@ -5565,7 +5555,6 @@ static chpl_bool check_complete(chpl_comm_nb_handle_t* h, size_t nhandles, pending = false; for(size_t i = 0; i < nhandles; i++) { chpl_comm_ofi_nb_handle_t *handle = (chpl_comm_ofi_nb_handle_t *) h[i]; - DBG_PRINTF(DBG_RMA, "handle[%d] %p", i, handle); // ignore handles that have already completed // NULL handles have by definition already completed @@ -5588,9 +5577,6 @@ static chpl_bool check_complete(chpl_comm_nb_handle_t* h, size_t nhandles, handle->completed = true; } } - DBG_PRINTF(DBG_RMA, "check_complete blocking %s", blocking ? "true" : "false"); - DBG_PRINTF(DBG_RMA, "check_complete completed %s", completed ? "true" : "false"); - DBG_PRINTF(DBG_RMA, "check_complete pending %s", pending ? "true" : "false"); if (!blocking || completed || !pending) { break; } @@ -5598,30 +5584,24 @@ static chpl_bool check_complete(chpl_comm_nb_handle_t* h, size_t nhandles, if (tcip == NULL) { CHK_TRUE((tcip = tciAlloc()) != NULL); } - DBG_PRINTF(DBG_RMA, "check_complete yielding tcip %p", tcip); sched_yield(); (*tcip->ensureProgressFn)(tcip); } if (tcip) { tciFree(tcip); } - DBG_PRINTF(DBG_RMA, "check_complete returning %s", completed ? - "true" : "false"); return completed; } void chpl_comm_wait_nb_some(chpl_comm_nb_handle_t* h, size_t nhandles) { chpl_comm_diags_incr(wait_nb); - DBG_PRINTF(DBG_RMA, "chpl_comm_wait_nb_some"); - (void) check_complete(h, nhandles, true /*blocking*/); } int chpl_comm_try_nb_some(chpl_comm_nb_handle_t* h, size_t nhandles) { chpl_comm_diags_incr(try_nb); - DBG_PRINTF(DBG_RMA, "chpl_comm_try_nb_some"); return check_complete(h, nhandles, false /*blocking*/); } From a77c004441a347093e98f1d427d04a73be5041dd Mon Sep 17 00:00:00 2001 From: "John H. Hartman" Date: Thu, 19 Sep 2024 14:18:34 -0700 Subject: [PATCH 05/25] Rewrote PUT logic Rewrote PUT logic so that low-level functions are non-blocking, and a blocking PUT is implemented by initiating a non-blocking PUT and waiting for it to complete. This simplifies the implementation and avoids code duplication. Signed-off-by: John H. Hartman --- runtime/include/chpl-comm.h | 2 +- runtime/src/comm/ofi/comm-ofi.c | 440 ++++++++++++++++---------------- 2 files changed, 225 insertions(+), 217 deletions(-) diff --git a/runtime/include/chpl-comm.h b/runtime/include/chpl-comm.h index e24ef5d8647a..07a4037e2c23 100644 --- a/runtime/include/chpl-comm.h +++ b/runtime/include/chpl-comm.h @@ -156,7 +156,7 @@ void chpl_comm_wait_nb_some(chpl_comm_nb_handle_t* h, size_t nhandles); int chpl_comm_try_nb_some(chpl_comm_nb_handle_t* h, size_t nhandles); // Free a handle returned by chpl_comm_*_nb. -void chpl_comm_free_nb(chpl_comm_nb_handle_t* h); +void chpl_comm_free_nb(chpl_comm_nb_handle_t h); // Returns whether or not the passed wide address is known to be in // a communicable memory region and known to be readable. That is, diff --git a/runtime/src/comm/ofi/comm-ofi.c b/runtime/src/comm/ofi/comm-ofi.c index 22d947ea4790..f9c03e98a26d 100644 --- a/runtime/src/comm/ofi/comm-ofi.c +++ b/runtime/src/comm/ofi/comm-ofi.c @@ -359,6 +359,19 @@ static const char* mcmModeNames[] = { "undefined", static bool cxiHybridMRMode = false; + +// +// Non-blocking handle +// +typedef struct nb_handle { + chpl_taskID_t id; // task that created the handle + chpl_bool reported; // operation has been reported as complete + chpl_atomic_bool complete; // operation has completed + struct nb_handle *next; +} nb_handle; + +typedef nb_handle* nb_handle_t; + //////////////////////////////////////// // // Forward decls @@ -369,8 +382,8 @@ static struct perTxCtxInfo_t* tciAllocForAmHandler(void); static chpl_bool tciAllocTabEntry(struct perTxCtxInfo_t*); static void tciFree(struct perTxCtxInfo_t*); static void waitForCQSpace(struct perTxCtxInfo_t*, size_t); -static chpl_comm_nb_handle_t ofi_put(const void*, c_nodeid_t, void*, size_t, - chpl_bool); +static void ofi_put(const void*, c_nodeid_t, void*, size_t); +static nb_handle_t ofi_put_nb(nb_handle_t, const void*, c_nodeid_t, void*, size_t); static void ofi_put_lowLevel(const void*, void*, c_nodeid_t, uint64_t, uint64_t, size_t, void*, uint64_t, struct perTxCtxInfo_t*); @@ -379,6 +392,8 @@ static chpl_comm_nb_handle_t ofi_get(void*, c_nodeid_t, void*, size_t); static void ofi_get_lowLevel(void*, void*, c_nodeid_t, uint64_t, uint64_t, size_t, void*, uint64_t, struct perTxCtxInfo_t*); +static chpl_bool check_complete(nb_handle_t*, size_t, chpl_bool); + static void do_remote_get_buff(void*, c_nodeid_t, void*, size_t); static void do_remote_amo_nf_buff(void*, c_nodeid_t, void*, size_t, enum fi_op, enum fi_datatype); @@ -3322,7 +3337,7 @@ void chpl_comm_broadcast_private(int id, size_t size) { for (int i = 0; i < chpl_numNodes; i++) { if (i != chpl_nodeID) { (void) ofi_put(chpl_rt_priv_bcast_tab[id], i, - chplPrivBcastTabMap[i][id], size, true /*blocking*/); + chplPrivBcastTabMap[i][id], size); } } } @@ -4170,7 +4185,7 @@ static void am_debugPrep(amRequest_t*); static void amRequestExecOn(c_nodeid_t, c_sublocid_t, chpl_fn_int_t, chpl_comm_on_bundle_t*, size_t, chpl_bool, chpl_bool); -static void amRequestRmaPut(c_nodeid_t, void*, void*, size_t, chpl_bool); +static void amRequestRmaPut(c_nodeid_t, void*, void*, size_t); static void amRequestRmaGet(c_nodeid_t, void*, void*, size_t); static void amRequestAMO(c_nodeid_t, void*, const void*, const void*, void*, int, enum fi_datatype, size_t); @@ -4317,14 +4332,11 @@ void amRequestExecOn(c_nodeid_t node, c_sublocid_t subloc, /* * amRequestRmaPut * - * Performs a PUT by sending an active message to the remote node that - * causes it to perform GET. This is currently a blocking operation - * so the "blocking" argument is unused. When this operation returns - * the data have been successfully transmitted to the remote node. + * Performs a PUT by sending an active message to the remote node that causes + * it to perform a GET. This operation returns when the GET has completed. */ static inline -void amRequestRmaPut(c_nodeid_t node, void* addr, void* raddr, size_t size, - chpl_bool blocking /* unused */) { +void amRequestRmaPut(c_nodeid_t node, void* addr, void* raddr, size_t size) { assert(!isAmHandler); retireDelayedAmDone(false /*taskIsEnding*/); @@ -5288,8 +5300,7 @@ void amWrapPut(struct taskArg_RMA_t* tsk_rma) { DBG_PRINTF(DBG_AM | DBG_AM_RECV, "%s", am_reqStartStr((amRequest_t*) rma)); CHK_TRUE(mrGetKey(NULL, NULL, rma->b.node, rma->raddr, rma->size)); - (void) ofi_put(rma->addr, rma->b.node, rma->raddr, rma->size, - true /*blocking*/); + (void) ofi_put(rma->addr, rma->b.node, rma->raddr, rma->size); // // Note: the RMA bytes must be visible in target memory before the @@ -5455,6 +5466,8 @@ void amCheckLiveness(void) { // OFI-specific non-blocking handle implementation +// XXX update + // Non-blocking operations require bound endpoints, to avoid having a handle // for a pending operation held by one thread, while the endpoint is in use // by a different thread. Bound endpoints are the norm, so it's easiest to @@ -5463,35 +5476,32 @@ void amCheckLiveness(void) { // sub-operations are booleans because the lower-level code that uses them // does not assume bound endpoints. -typedef struct chpl_comm_ofi_nb_handle_t { - chpl_bool completed; // operation has completed - size_t count; // number of sub-operations - chpl_atomic_bool complete[1]; // flag for sub-operation completion -} chpl_comm_ofi_nb_handle_t; +static inline +void nb_handle_init(nb_handle_t h) { + h->id = chpl_task_getId(); + h->reported = false; + atomic_init_bool(&h->complete, false); + h->next = NULL; +} + +static inline +void nb_handle_destroy(nb_handle_t h) { + atomic_destroy_bool(&h->complete); +} /* - * chpl_comm_put_nb + * put_prologue * - * Non-blocking PUT. The PUT may complete after this function returns. Returns - * a handle that can be used to wait for and check the status of the PUT. The - * handle may be NULL, in which case the PUT has already completed. The - * memory buffer must not be modified before the PUT completes. Completion - * indicates that subsequent PUTs to the same memory will occur after the - * completed PUT; it does not mean that the results of the PUT are visible in - * memory (see the README.md for details). Concurrent non-blocking PUTs may - * occur in any order. + * Common prologue operations for chpl_comm_put and chpl_comm_put_nb. Returns + * true if the PUT should proceed, false if it was handled in this function. */ -chpl_comm_nb_handle_t chpl_comm_put_nb(void* addr, c_nodeid_t node, - void* raddr, size_t size, - int32_t commID, int ln, int32_t fn) { - DBG_PRINTF(DBG_IFACE, - "%s(%p, %d, %p, %zd, %d)", __func__, - addr, (int) node, raddr, size, (int) commID); +static inline +chpl_bool put_prologue(void* addr, c_nodeid_t node, void* raddr, size_t size, + int32_t commID, int ln, int32_t fn) { + chpl_bool proceed = false; retireDelayedAmDone(false /*taskIsEnding*/); - chpl_comm_ofi_nb_handle_t *handle = NULL; - // // Sanity checks, self-communication. // @@ -5517,12 +5527,36 @@ chpl_comm_nb_handle_t chpl_comm_put_nb(void* addr, c_nodeid_t node, chpl_comm_diags_verbose_rdma("put", node, size, ln, fn, commID); chpl_comm_diags_incr(put); - - handle = ofi_put(addr, node, raddr, size, false /*blocking*/); + proceed = true; done: - return handle; + return proceed; } +/* + * chpl_comm_put_nb + * + * Non-blocking PUT. The PUT may complete after this function returns. Returns + * a handle that can be used to wait for and check the status of the PUT. The + * handle may be NULL, in which case the PUT has already completed. The + * memory buffer must not be modified before the PUT completes. Completion + * indicates that subsequent PUTs to the same memory will occur after the + * completed PUT; it does not mean that the results of the PUT are visible in + * memory (see the README.md for details). Concurrent non-blocking PUTs may + * occur in any order. + */ +chpl_comm_nb_handle_t chpl_comm_put_nb(void* addr, c_nodeid_t node, + void* raddr, size_t size, + int32_t commID, int ln, int32_t fn) { + DBG_PRINTF(DBG_IFACE, + "%s(%p, %d, %p, %zd, %d)", __func__, + addr, (int) node, raddr, size, (int) commID); + + nb_handle_t handle = NULL; + if (put_prologue(addr, node, raddr, size, commID, ln, fn)) { + handle = ofi_put_nb(handle, addr, node, raddr, size); + } + return (chpl_comm_nb_handle_t) handle; +} chpl_comm_nb_handle_t chpl_comm_get_nb(void* addr, c_nodeid_t node, void* raddr, size_t size, @@ -5531,50 +5565,77 @@ chpl_comm_nb_handle_t chpl_comm_get_nb(void* addr, c_nodeid_t node, return NULL; } + +static inline +int test_nb_complete(nb_handle_t handle) { + return handle != NULL ? handle->reported : 1; +} + +static inline +void wait_nb_some(nb_handle_t *handles, size_t nhandles) { + (void) check_complete(handles, nhandles, true /*blocking*/); +} + +static inline +int try_nb_some(nb_handle_t *handles, size_t nhandles) { + return check_complete(handles, nhandles, false /*blocking*/); +} + int chpl_comm_test_nb_complete(chpl_comm_nb_handle_t h) { - chpl_comm_ofi_nb_handle_t *handle = (chpl_comm_ofi_nb_handle_t *) h; chpl_comm_diags_incr(test_nb); - return handle != NULL ? handle->completed : 1; + return test_nb_complete((nb_handle_t) h); } /* * check_complete * * Returns true if a new handle completion is detected, false otherwise. - * Ignores handles that have previously completed. If blocking is true and - * there are uncompleted handles this will not return until a new completion - * is detected. + * Ignores handles that have previously completed (h->reported == true). If + * blocking is true and there are uncompleted handles this will not return + * until a new completion is detected. */ -static chpl_bool check_complete(chpl_comm_nb_handle_t* h, size_t nhandles, +static +chpl_bool check_complete(nb_handle_t *handles, size_t nhandles, chpl_bool blocking) { chpl_bool completed = false; // at least one new completion detected - chpl_bool pending = false; // there is a handle with uncompleted operations + chpl_bool pending = false; // there is an uncompleted handle + if ((handles == NULL) || (nhandles == 0)) { + goto done; + } struct perTxCtxInfo_t* tcip = NULL; while (true) { pending = false; for(size_t i = 0; i < nhandles; i++) { - chpl_comm_ofi_nb_handle_t *handle = (chpl_comm_ofi_nb_handle_t *) h[i]; - + nb_handle_t handle = handles[i]; // ignore handles that have already completed // NULL handles have by definition already completed - if ((handle == NULL) || handle->completed) { + if ((handle == NULL) || handle->reported) { continue; } + if (handle->id != chpl_task_getId()) { + char msg[128]; + char task1[32]; + char task2[32]; + snprintf(msg, sizeof(msg), + "Task %s did not create non-blocking handle (created by %s)", + chpl_task_idToString(task1, sizeof(task1), chpl_task_getId()), + chpl_task_idToString(task2, sizeof(task2), handle->id)); + } pending = true; - // determine if this handle is now complete by checking the status - // of its individual operations - chpl_bool handleComplete = true; - for (size_t j = 0; j < handle->count; j++) { - if(!atomic_load_explicit_bool(&handle->complete[j], + // determine if this handle is now complete by checking the completion + // status of its operations + chpl_bool allComplete = true; + for (nb_handle_t p = handle; p != NULL; p = p->next) { + if(!atomic_load_explicit_bool(&p->complete, chpl_memory_order_acquire)) { - handleComplete = false; + allComplete = false; break; } } - if (handleComplete) { + if (allComplete) { completed = true; - handle->completed = true; + handle->reported = true; } } if (!blocking || completed || !pending) { @@ -5590,24 +5651,27 @@ static chpl_bool check_complete(chpl_comm_nb_handle_t* h, size_t nhandles, if (tcip) { tciFree(tcip); } +done: return completed; } void chpl_comm_wait_nb_some(chpl_comm_nb_handle_t* h, size_t nhandles) { chpl_comm_diags_incr(wait_nb); - - (void) check_complete(h, nhandles, true /*blocking*/); + wait_nb_some((nb_handle_t *) h, nhandles); } int chpl_comm_try_nb_some(chpl_comm_nb_handle_t* h, size_t nhandles) { chpl_comm_diags_incr(try_nb); - - return check_complete(h, nhandles, false /*blocking*/); + return try_nb_some((nb_handle_t *) h, nhandles); } -void chpl_comm_free_nb(chpl_comm_nb_handle_t* h) { - if (h != NULL) { - chpl_mem_free(h, 0, 0); +void chpl_comm_free_nb(chpl_comm_nb_handle_t h) { + nb_handle_t handle = (nb_handle_t) h; + nb_handle_t next; + for (; handle != NULL; handle = next) { + next = handle->next; + nb_handle_destroy(handle); + chpl_mem_free(handle, 0, 0); } } @@ -5617,38 +5681,11 @@ void chpl_comm_put(void* addr, c_nodeid_t node, void* raddr, "%s(%p, %d, %p, %zd, %d)", __func__, addr, (int) node, raddr, size, (int) commID); - retireDelayedAmDone(false /*taskIsEnding*/); - - // - // Sanity checks, self-communication. - // - CHK_TRUE(addr != NULL); - CHK_TRUE(raddr != NULL); - - if (size == 0) { - return; - } - - if (node == chpl_nodeID) { - memmove(raddr, addr, size); - return; - } - - // Communications callback support - if (chpl_comm_have_callbacks(chpl_comm_cb_event_kind_put)) { - chpl_comm_cb_info_t cb_data = - {chpl_comm_cb_event_kind_put, chpl_nodeID, node, - .iu.comm={addr, raddr, size, commID, ln, fn}}; - chpl_comm_do_callbacks (&cb_data); + if (put_prologue(addr, node, raddr, size, commID, ln, fn)) { + ofi_put(addr, node, raddr, size); } - - chpl_comm_diags_verbose_rdma("put", node, size, ln, fn, commID); - chpl_comm_diags_incr(put); - - (void) ofi_put(addr, node, raddr, size, true /*blocking*/); } - void chpl_comm_get(void* addr, int32_t node, void* raddr, size_t size, int32_t commID, int ln, int32_t fn) { DBG_PRINTF(DBG_IFACE, @@ -6000,6 +6037,7 @@ void tciFree(struct perTxCtxInfo_t* tcip) { // if (!tcip->bound) { DBG_PRINTF(DBG_TCIPS, "free tciTab[%td]", tcip - tciTab); + forceMemFxVisAllNodes(true, true, -1, tcip); atomic_store_bool(&tcip->allocated, false); } } @@ -6019,30 +6057,54 @@ void waitForCQSpace(struct perTxCtxInfo_t* tcip, size_t len) { } } -typedef void (rmaPutFn_t)(void* myAddr, void* mrDesc, +typedef void (rmaPutFn_t)(nb_handle_t handle, void* myAddr, void* mrDesc, c_nodeid_t node, uint64_t mrRaddr, uint64_t mrKey, size_t size, - chpl_bool blocking, - chpl_atomic_bool *done, struct perTxCtxInfo_t* tcip); static rmaPutFn_t rmaPutFn_selector; +/* + * ofi_put + * + * Blocking PUT. Implemented by initiating a non-blocking PUT and waiting for + * it to complete. + */ + static inline -chpl_comm_nb_handle_t ofi_put(const void* addr, c_nodeid_t node, - void* raddr, size_t size, chpl_bool blocking) { +void ofi_put(const void* addr, c_nodeid_t node, void* raddr, size_t size) { + + // Allocate the handle on the stack to avoid malloc overhead + nb_handle handle_struct; + nb_handle_t handle = &handle_struct; + nb_handle_init(handle); + + handle = ofi_put_nb(handle, addr, node, raddr, size); + do { + wait_nb_some(&handle, 1); + } while(!test_nb_complete(handle)); + nb_handle_destroy(handle); +} + +/* + * ofi_put_nb + * + * Non-blocking PUT. Returns a handle that can be used to test the completion + * status of the PUT and wait for it to complete. If the PUT is too large + * for the fabric it is broken into multiple PUTs. + * + */ +static +nb_handle_t ofi_put_nb(nb_handle_t handle, const void* addr, c_nodeid_t node, + void* raddr, size_t size) { char *src = (char *) addr; char *dest = (char *) raddr; - chpl_comm_ofi_nb_handle_t *handle = NULL; + nb_handle_t prev = NULL; + nb_handle_t first = NULL; - // Determine how many operations the PUT requires based on the provider's - // maximum message size - - int ops = (size + ofi_info->ep_attr->max_msg_size - 1) / - ofi_info->ep_attr->max_msg_size; - if (ops > 1) { + if (size > ofi_info->ep_attr->max_msg_size) { DBG_PRINTF(DBG_RMA | DBG_RMA_WRITE, "splitting large PUT %d:%p <= %p, size %zd", (int) node, raddr, addr, size); @@ -6050,38 +6112,34 @@ chpl_comm_nb_handle_t ofi_put(const void* addr, c_nodeid_t node, struct perTxCtxInfo_t* tcip = NULL; CHK_TRUE((tcip = tciAlloc()) != NULL); - if (!blocking && !tcip->bound) { - // Non-blocking operations require bound endpoints - blocking = true; - } - - if (!blocking) { - // Allocate a handle large enough to hold one "done" flags per op - int handleSize = sizeof(chpl_comm_ofi_nb_handle_t) + - ((ops - 1) * sizeof(chpl_atomic_bool)); - handle = chpl_mem_alloc(handleSize, CHPL_RT_MD_COMM_NB_HANDLE, 0, 0); - - handle->count = ops; - handle->completed = false; - for (size_t i = 0; i < ops; i++) { - atomic_init_bool(&handle->complete[i], false); - } - } size_t chunkSize = ofi_info->ep_attr->max_msg_size; size_t offset = 0; - for (int i = 0; i < ops; i++) { + while (offset < size) { if (chunkSize > size - offset) { chunkSize = size - offset; } DBG_PRINTF(DBG_RMA | DBG_RMA_WRITE, - "PUT %d:%p <= %p, size %zd, %s", - (int) node, dest, src, chunkSize, - blocking ? "blocking" : "non-blocking"); + "PUT %d:%p <= %p, size %zd", + (int) node, dest, src, chunkSize); + + if (handle == NULL) { + handle = chpl_mem_alloc(sizeof(*handle), + CHPL_RT_MD_COMM_NB_HANDLE, 0, 0); + nb_handle_init(handle); + } + // Make a linked-list of handles + if (prev != NULL) { + prev->next = handle; + } + // Keep track of the first handle so we can return it. + if (first == NULL) { + first = handle; + } // - // If the remote address is directly accessible do an RMA from this - // side; otherwise do the opposite RMA from the other side. + // If the remote address is directly accessible do a PUT RMA from this + // side; otherwise do a GET from the other side. // uint64_t mrKey; uint64_t mrRaddr; @@ -6095,27 +6153,24 @@ chpl_comm_nb_handle_t ofi_put(const void* addr, c_nodeid_t node, void* myAddr = mrLocalizeSource(&mrDesc, (const void *) src, chunkSize, "PUT src"); - chpl_atomic_bool *done = blocking ? NULL : &handle->complete[i]; - rmaPutFn_selector(myAddr, mrDesc, node, mrRaddr, mrKey, chunkSize, - blocking, done, tcip); + rmaPutFn_selector(handle, myAddr, mrDesc, node, mrRaddr, + mrKey, chunkSize, tcip); mrUnLocalizeSource(myAddr, src); } else { - amRequestRmaPut(node, (void *) src, (void *) dest, size, blocking); - // amRequestRmaPut is currently a blocking operation, so mark - // the operation as complete - if (!blocking) { - atomic_init_bool(&(handle->complete[i]), true); - } + amRequestRmaPut(node, (void *) src, (void *) dest, size); + atomic_store_bool(&handle->complete, true); } offset += chunkSize; src += chunkSize; dest += chunkSize; + prev = handle; + handle = NULL; } tciFree(tcip); DBG_PRINTF(DBG_RMA | DBG_RMA_WRITE, - "PUT %d:%p <= %p, handle %p", handle); - return handle; + "PUT %d:%p <= %p, handle %p", first); + return first; } @@ -6124,26 +6179,24 @@ static rmaPutFn_t rmaPutFn_msgOrd; static rmaPutFn_t rmaPutFn_dlvrCmplt; static inline -void rmaPutFn_selector(void* myAddr, void* mrDesc, +void rmaPutFn_selector(nb_handle_t handle, void* myAddr, void* mrDesc, c_nodeid_t node, uint64_t mrRaddr, uint64_t mrKey, size_t size, - chpl_bool blocking, - chpl_atomic_bool *done, struct perTxCtxInfo_t* tcip) { switch (mcmMode) { case mcmm_msgOrdFence: - rmaPutFn_msgOrdFence(myAddr, mrDesc, node, mrRaddr, mrKey, size, - blocking, done, tcip); + rmaPutFn_msgOrdFence(handle, myAddr, mrDesc, node, mrRaddr, mrKey, size, + tcip); break; case mcmm_msgOrd: - rmaPutFn_msgOrd(myAddr, mrDesc, node, mrRaddr, mrKey, size, - blocking, done, tcip); + rmaPutFn_msgOrd(handle, myAddr, mrDesc, node, mrRaddr, mrKey, size, + tcip); break; case mcmm_dlvrCmplt: - rmaPutFn_dlvrCmplt(myAddr, mrDesc, node, mrRaddr, mrKey, size, - blocking, done, tcip); + rmaPutFn_dlvrCmplt(handle, myAddr, mrDesc, node, mrRaddr, mrKey, size, + tcip); break; default: INTERNAL_ERROR_V("unexpected mcmMode %d", mcmMode); @@ -6166,26 +6219,22 @@ static ssize_t wrap_fi_writemsg(const void* addr, void* mrDesc, // -// Implements ofi_put() when MCM mode is message ordering with fences. +// Implements ofi_put_nb() when MCM mode is message ordering with fences. // static -void rmaPutFn_msgOrdFence(void* myAddr, void* mrDesc, +void rmaPutFn_msgOrdFence(nb_handle_t handle, void* myAddr, void* mrDesc, c_nodeid_t node, uint64_t mrRaddr, uint64_t mrKey, size_t size, - chpl_bool blocking, - chpl_atomic_bool *done, struct perTxCtxInfo_t* tcip) { - uint64_t flags = 0; - chpl_atomic_bool txnDone; - void *ctx; + uint64_t flags = 0; if (tcip->bound && size <= ofi_info->tx_attr->inject_size - && !blocking && envInjectRMA) { + && envInjectRMA) { // // Special case: write injection has the least latency. We can use it if - // this PUT is non-blocking, its size doesn't exceed the injection size + // this PUT doesn't exceed the injection size // limit, and we have a bound tx context so we can delay forcing the // memory visibility until later. // @@ -6195,110 +6244,69 @@ void rmaPutFn_msgOrdFence(void* myAddr, void* mrDesc, // // Special case: If our last operation was an AMO then we need to do a // fenced PUT to force the AMO to be visible before this PUT. + // TODO: this logic is a bit screwed-up. FI_FENCE by itself doesn't + // force the AMO to be visible, it just ensures that the PUT cannot pass + // the AMO. We need to do something to make it visible, and we need + // to clear the bitmap so that we don't keep fencing PUTs until something + // else makes it visible. // flags |= FI_FENCE; } - if (done == NULL) { - done = &txnDone; - } - ctx = txCtxInit(tcip, __LINE__, done); + void *ctx = txCtxInit(tcip, __LINE__, &handle->complete); (void) wrap_fi_writemsg(myAddr, mrDesc, node, mrRaddr, mrKey, size, ctx, flags, tcip); - if (blocking) { - waitForTxnComplete(tcip, ctx); - txCtxCleanup(ctx); - } - // // When using message ordering we have to do something after the PUT // to force it into visibility, and on the same tx context as the PUT // itself because libfabric message ordering is specific to endpoint - // pairs. With a bound tx context we can do it later, when needed. - // Otherwise we have to do it here, before we release the tx context. + // pairs. Indicate that there is dangling PUT to the remote node. // - if (tcip->bound) { - bitmapSet(tcip->putVisBitmap, node); - } else { - mcmReleaseOneNode(node, tcip, "PUT"); - } + bitmapSet(tcip->putVisBitmap, node); } // -// Implements ofi_put() when MCM mode is message ordering. +// Implements ofi_put_nb() when MCM mode is message ordering. // TODO: see comment for rmaPutFn_msgOrdFence. static -void rmaPutFn_msgOrd(void* myAddr, void* mrDesc, +void rmaPutFn_msgOrd(nb_handle_t handle, void* myAddr, void* mrDesc, c_nodeid_t node, uint64_t mrRaddr, uint64_t mrKey, size_t size, - chpl_bool blocking, - chpl_atomic_bool *done, struct perTxCtxInfo_t* tcip) { uint64_t flags = 0; - chpl_atomic_bool txnDone; - void *ctx; - // - // When using message ordering we have to do something after the PUT - // to force it into visibility, and on the same tx context as the PUT - // itself because libfabric message ordering is specific to endpoint - // pairs. With a bound tx context we can do it later, when needed. - // Otherwise we have to do it here, before we release the tx context. - // if (tcip->bound && size <= ofi_info->tx_attr->inject_size - && !blocking && envInjectRMA) { + && envInjectRMA) { // + // XXX update this // Special case: write injection has the least latency. We can use // that if this PUT's size doesn't exceed the injection size limit // and we have a bound tx context so we can delay forcing the // memory visibility until later. flags = FI_INJECT; } - if (done == NULL) { - done = &txnDone; - } - ctx = TX_CTX_INIT(tcip, blocking, done); + void *ctx = txCtxInit(tcip, __LINE__, &handle->complete); (void) wrap_fi_writemsg(myAddr, mrDesc, node, mrRaddr, mrKey, size, ctx, flags, tcip); - - if (blocking) { - waitForTxnComplete(tcip, ctx); - txCtxCleanup(ctx); - } - - if (tcip->bound) { - bitmapSet(tcip->putVisBitmap, node); - } else { - mcmReleaseOneNode(node, tcip, "PUT"); - } + bitmapSet(tcip->putVisBitmap, node); } // -// Implements ofi_put() when MCM mode is delivery complete. +// Implements ofi_put_nb() when MCM mode is delivery complete. // static -void rmaPutFn_dlvrCmplt(void* myAddr, void* mrDesc, +void rmaPutFn_dlvrCmplt(nb_handle_t handle, void* myAddr, void* mrDesc, c_nodeid_t node, uint64_t mrRaddr, uint64_t mrKey, size_t size, - chpl_bool blocking, - chpl_atomic_bool *done, struct perTxCtxInfo_t* tcip) { - chpl_atomic_bool txnDone; - if (done == NULL) { - done = &txnDone; - } - void *ctx = TX_CTX_INIT(tcip, blocking, done); + void *ctx = txCtxInit(tcip, __LINE__, &handle->complete); (void) wrap_fi_write(myAddr, mrDesc, node, mrRaddr, mrKey, size, ctx, tcip); - if (blocking) { - waitForTxnComplete(tcip, ctx); - txCtxCleanup(ctx); - } } @@ -6454,7 +6462,7 @@ void do_remote_put_buff(void* addr, c_nodeid_t node, void* raddr, if (size > MAX_UNORDERED_TRANS_SZ || !mrGetKey(&mrKey, &mrRaddr, node, raddr, size) || (info = task_local_buff_acquire(put_buff)) == NULL) { - (void) ofi_put(addr, node, raddr, size, true /*blocking*/); + (void) ofi_put(addr, node, raddr, size); return; } @@ -8300,7 +8308,7 @@ void chpl_comm_impl_barrier(const char *msg) { DBG_PRINTF(DBG_BARRIER, "BAR notify parent %d", (int) bar_parent); ofi_put(&one, bar_parent, (void*) &bar_infoMap[bar_parent]->child_notify[parChild], - sizeof(one), true /*blocking*/); + sizeof(one)); // // Wait for our parent locale to release us from the barrier. @@ -8327,7 +8335,7 @@ void chpl_comm_impl_barrier(const char *msg) { DBG_PRINTF(DBG_BARRIER, "BAR release child %d", (int) child); ofi_put(&one, child, (void*) &bar_infoMap[child]->parent_release, - sizeof(one), true /*blocking*/); + sizeof(one)); } } From 08861b52c30418b731e5562d05e8cd73a6c945bb Mon Sep 17 00:00:00 2001 From: "John H. Hartman" Date: Fri, 20 Sep 2024 11:01:36 -0700 Subject: [PATCH 06/25] Add environment variables for testing Allow specifying the maximum message size and maximum number of endpoings. These are intended primarily for testing. Signed-off-by: John H. Hartman --- runtime/src/comm/ofi/comm-ofi.c | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/runtime/src/comm/ofi/comm-ofi.c b/runtime/src/comm/ofi/comm-ofi.c index f9c03e98a26d..b1fadcd88736 100644 --- a/runtime/src/comm/ofi/comm-ofi.c +++ b/runtime/src/comm/ofi/comm-ofi.c @@ -1588,6 +1588,13 @@ struct fi_info* findProvInList(struct fi_info* info, if (best && (isInProvider("efa", best))) { best->tx_attr->inject_size = 0; } + + // Set the maximum message size if specified + + best->ep_attr->max_msg_size = + chpl_env_rt_get_int("COMM_OFI_MAX_MSG_SIZE", + best->ep_attr->max_msg_size); + return (best == NULL) ? NULL : fi_dupinfo(best); } @@ -1707,6 +1714,11 @@ chpl_bool canBindTxCtxs(struct fi_info* info) { // endpoints. Until that is fixed, assume it can create as many endpoints // as we need. size_t epCount = isInProvider("cxi", info) ? SIZE_MAX : dom_attr->ep_cnt; + + // Set the maximum number of endpoints if specified + + epCount = chpl_env_rt_get_int("COMM_OFI_MAX_ENDPOINTS", epCount); + size_t numWorkerTxCtxs = ((envPreferScalableTxEp && dom_attr->max_ep_tx_ctx > 1) ? dom_attr->max_ep_tx_ctx From 6baa0e0a1a1ced0eef7f26b9ef213ede0110a493 Mon Sep 17 00:00:00 2001 From: "John H. Hartman" Date: Fri, 20 Sep 2024 11:09:52 -0700 Subject: [PATCH 07/25] Free dynamically-allocated handles in ofi_put Also some code cleanup. Signed-off-by: John H. Hartman --- runtime/include/chpl-comm.h | 2 +- runtime/src/chpl-cache.c | 2 +- runtime/src/comm/ofi/comm-ofi.c | 28 ++++++++++++++++------------ 3 files changed, 18 insertions(+), 14 deletions(-) diff --git a/runtime/include/chpl-comm.h b/runtime/include/chpl-comm.h index 07a4037e2c23..332f361b26e6 100644 --- a/runtime/include/chpl-comm.h +++ b/runtime/include/chpl-comm.h @@ -156,7 +156,7 @@ void chpl_comm_wait_nb_some(chpl_comm_nb_handle_t* h, size_t nhandles); int chpl_comm_try_nb_some(chpl_comm_nb_handle_t* h, size_t nhandles); // Free a handle returned by chpl_comm_*_nb. -void chpl_comm_free_nb(chpl_comm_nb_handle_t h); +void chpl_comm_free_nb_handle(chpl_comm_nb_handle_t h); // Returns whether or not the passed wide address is known to be in // a communicable memory region and known to be readable. That is, diff --git a/runtime/src/chpl-cache.c b/runtime/src/chpl-cache.c index 2503eafb1eb6..4ff6008409a3 100644 --- a/runtime/src/chpl-cache.c +++ b/runtime/src/chpl-cache.c @@ -1967,7 +1967,7 @@ chpl_bool do_wait_for(struct rdcache_s* cache, cache_seqn_t sn) // Whether we waited above or not, if the first entry's event // is already complete, then remove it from the queue. if (chpl_comm_test_nb_complete(cache->pending[index])) { - chpl_comm_free_nb(cache->pending[index]); + chpl_comm_free_nb_handle(cache->pending[index]); fifo_circleb_pop(&cache->pending_first_entry, &cache->pending_last_entry, cache->pending_len); diff --git a/runtime/src/comm/ofi/comm-ofi.c b/runtime/src/comm/ofi/comm-ofi.c index b1fadcd88736..1c0a2ec7a420 100644 --- a/runtime/src/comm/ofi/comm-ofi.c +++ b/runtime/src/comm/ofi/comm-ofi.c @@ -1717,7 +1717,7 @@ chpl_bool canBindTxCtxs(struct fi_info* info) { // Set the maximum number of endpoints if specified - epCount = chpl_env_rt_get_int("COMM_OFI_MAX_ENDPOINTS", epCount); + epCount = chpl_env_rt_get_int("COMM_OFI_EP_CNT", epCount); size_t numWorkerTxCtxs = ((envPreferScalableTxEp && dom_attr->max_ep_tx_ctx > 1) @@ -5583,16 +5583,6 @@ int test_nb_complete(nb_handle_t handle) { return handle != NULL ? handle->reported : 1; } -static inline -void wait_nb_some(nb_handle_t *handles, size_t nhandles) { - (void) check_complete(handles, nhandles, true /*blocking*/); -} - -static inline -int try_nb_some(nb_handle_t *handles, size_t nhandles) { - return check_complete(handles, nhandles, false /*blocking*/); -} - int chpl_comm_test_nb_complete(chpl_comm_nb_handle_t h) { chpl_comm_diags_incr(test_nb); return test_nb_complete((nb_handle_t) h); @@ -5667,17 +5657,27 @@ chpl_bool check_complete(nb_handle_t *handles, size_t nhandles, return completed; } +static inline +void wait_nb_some(nb_handle_t *handles, size_t nhandles) { + (void) check_complete(handles, nhandles, true /*blocking*/); +} + void chpl_comm_wait_nb_some(chpl_comm_nb_handle_t* h, size_t nhandles) { chpl_comm_diags_incr(wait_nb); wait_nb_some((nb_handle_t *) h, nhandles); } +static inline +int try_nb_some(nb_handle_t *handles, size_t nhandles) { + return check_complete(handles, nhandles, false /*blocking*/); +} + int chpl_comm_try_nb_some(chpl_comm_nb_handle_t* h, size_t nhandles) { chpl_comm_diags_incr(try_nb); return try_nb_some((nb_handle_t *) h, nhandles); } -void chpl_comm_free_nb(chpl_comm_nb_handle_t h) { +void chpl_comm_free_nb_handle(chpl_comm_nb_handle_t h) { nb_handle_t handle = (nb_handle_t) h; nb_handle_t next; for (; handle != NULL; handle = next) { @@ -6096,6 +6096,10 @@ void ofi_put(const void* addr, c_nodeid_t node, void* raddr, size_t size) { do { wait_nb_some(&handle, 1); } while(!test_nb_complete(handle)); + if (handle->next != NULL) { + // free any handles for sub-operations + chpl_comm_free_nb_handle(handle->next); + } nb_handle_destroy(handle); } From 2e0c3891b0e43b45a8b3774e0396303925f97585 Mon Sep 17 00:00:00 2001 From: "John H. Hartman" Date: Fri, 20 Sep 2024 11:19:29 -0700 Subject: [PATCH 08/25] Change forceMemFxVisAllNodes to work on unbound endpoints We are now using this function to force visibility when an unbound endpoint is released, so it needs to work on unbound endpoints. Signed-off-by: John H. Hartman --- runtime/src/comm/ofi/comm-ofi.c | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/runtime/src/comm/ofi/comm-ofi.c b/runtime/src/comm/ofi/comm-ofi.c index 1c0a2ec7a420..ba0d9d78e1d4 100644 --- a/runtime/src/comm/ofi/comm-ofi.c +++ b/runtime/src/comm/ofi/comm-ofi.c @@ -7511,15 +7511,11 @@ void forceMemFxVisAllNodes(chpl_bool checkPuts, chpl_bool checkAmos, struct perTxCtxInfo_t* tcip) { // // Enforce MCM: make sure the memory effects of all the operations - // we've done so far, to any node, are actually visible. This is only - // needed if we have a bound tx context. Otherwise, we would have - // forced visibility at the time of the operation. + // we've done so far, to any node, are actually visible. // - if (tcip->bound) { - mcmReleaseAllNodes(checkPuts ? tcip->putVisBitmap : NULL, - checkAmos ? tcip->amoVisBitmap : NULL, - skipNode, tcip, "PUT and/or AMO"); - } + mcmReleaseAllNodes(checkPuts ? tcip->putVisBitmap : NULL, + checkAmos ? tcip->amoVisBitmap : NULL, + skipNode, tcip, "PUT and/or AMO"); } From 47e75cbdc0f6dd77cf6c55bc1b8f13f7b7029935 Mon Sep 17 00:00:00 2001 From: "John H. Hartman" Date: Fri, 20 Sep 2024 11:33:18 -0700 Subject: [PATCH 09/25] Improved tci debugging output Signed-off-by: John H. Hartman --- runtime/src/comm/ofi/comm-ofi.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/runtime/src/comm/ofi/comm-ofi.c b/runtime/src/comm/ofi/comm-ofi.c index ba0d9d78e1d4..8d95f5c0c72f 100644 --- a/runtime/src/comm/ofi/comm-ofi.c +++ b/runtime/src/comm/ofi/comm-ofi.c @@ -5971,8 +5971,8 @@ struct perTxCtxInfo_t* tciAllocCommon(chpl_bool bindToAmHandler) { _ttcip->amoVisBitmap = bitmapAlloc(chpl_numNodes); } } - DBG_PRINTF(DBG_TCIPS, "alloc%s tciTab[%td]", - _ttcip->bound ? " bound" : "", _ttcip - tciTab); + DBG_PRINTF(DBG_TCIPS, "alloc%s tciTab[%td] %p", + _ttcip->bound ? " bound" : "", _ttcip - tciTab, _ttcip); return _ttcip; } @@ -6048,7 +6048,7 @@ void tciFree(struct perTxCtxInfo_t* tcip) { // Bound contexts stay bound. We only release non-bound ones. // if (!tcip->bound) { - DBG_PRINTF(DBG_TCIPS, "free tciTab[%td]", tcip - tciTab); + DBG_PRINTF(DBG_TCIPS, "free tciTab[%td] %p", tcip - tciTab, tcip); forceMemFxVisAllNodes(true, true, -1, tcip); atomic_store_bool(&tcip->allocated, false); } From 9f4079e3567a28e45d2c68a905b82328a28fb49c Mon Sep 17 00:00:00 2001 From: "John H. Hartman" Date: Fri, 20 Sep 2024 13:01:57 -0700 Subject: [PATCH 10/25] Allocate visibility bitmaps for unbound endpoints Operations to force visibility are deferred until the endpoint is released, which requires the visibility bitmaps. Signed-off-by: John H. Hartman --- runtime/src/comm/ofi/comm-ofi.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/runtime/src/comm/ofi/comm-ofi.c b/runtime/src/comm/ofi/comm-ofi.c index 8d95f5c0c72f..cbefe62c573f 100644 --- a/runtime/src/comm/ofi/comm-ofi.c +++ b/runtime/src/comm/ofi/comm-ofi.c @@ -5966,6 +5966,8 @@ struct perTxCtxInfo_t* tciAllocCommon(chpl_bool bindToAmHandler) { if (bindToAmHandler || (tciTabBindTxCtxs && chpl_task_isFixedThread())) { _ttcip->bound = true; + } + if (mcmMode != mcmm_dlvrCmplt) { _ttcip->putVisBitmap = bitmapAlloc(chpl_numNodes); if ((ofi_info->caps & FI_ATOMIC) != 0) { _ttcip->amoVisBitmap = bitmapAlloc(chpl_numNodes); From 06e11af40a855f181a44fd8bc8b4a4eb20257735 Mon Sep 17 00:00:00 2001 From: "John H. Hartman" Date: Fri, 20 Sep 2024 14:17:56 -0700 Subject: [PATCH 11/25] Fixed number of transmit contexts computation Fixed how the number of transmit contexts needed is computed, and added some comments. Signed-off-by: John H. Hartman --- runtime/src/comm/ofi/comm-ofi.c | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/runtime/src/comm/ofi/comm-ofi.c b/runtime/src/comm/ofi/comm-ofi.c index cbefe62c573f..b0839499d6cd 100644 --- a/runtime/src/comm/ofi/comm-ofi.c +++ b/runtime/src/comm/ofi/comm-ofi.c @@ -1722,13 +1722,14 @@ chpl_bool canBindTxCtxs(struct fi_info* info) { size_t numWorkerTxCtxs = ((envPreferScalableTxEp && dom_attr->max_ep_tx_ctx > 1) ? dom_attr->max_ep_tx_ctx - : epCount) - - 1 - - numAmHandlers; + : epCount) - 1 - numAmHandlers; + if (envCommConcurrency > 0 && envCommConcurrency < numWorkerTxCtxs) { numWorkerTxCtxs = envCommConcurrency; } + numTxCtxs = numWorkerTxCtxs + 1 + numAmHandlers; + return fixedNumThreads <= numWorkerTxCtxs; } @@ -2526,12 +2527,30 @@ void init_ofiEp(void) { // // Compute numbers of transmit and receive contexts, and then create // the transmit context table. - // + // + // The logic here is a bit convoluted and can probably be cleaned up. See + // the tciTab comment above for more details. For non-scalable endpoints, + // we would like to have one transmit context (and therefore one endpoint) + // per worker thread, one per AM handler, and one for the process in + // general. That will allow us to bind worker threads and AM handlers to + // transmit contexts. If we can't get that many endpoints then transmit + // contexts will not be bound, which signficantly reduces performance. + // + // For scalable endpoints we only need one transmit endpoint with enough + // transmit contexts to bind them as described above. If max_ep_tx_ctx for + // the provider is less than that, then we won't use a scalable endpoint. + // If we are using a scalable endpoint we have to set tx_ctx_cnt to tell + // the provider how many transmit contexts we want per endpoint. + // + int desiredTxCtxs; tciTabBindTxCtxs = canBindTxCtxs(ofi_info); if (tciTabBindTxCtxs) { - numTxCtxs = chpl_task_getFixedNumThreads() + numAmHandlers + 1; + desiredTxCtxs = chpl_task_getFixedNumThreads() + numAmHandlers + 1; } else { - numTxCtxs = chpl_task_getMaxPar() + numAmHandlers + 1; + desiredTxCtxs = chpl_task_getMaxPar() + numAmHandlers + 1; + } + if (desiredTxCtxs < numTxCtxs) { + numTxCtxs = desiredTxCtxs; } DBG_PRINTF(DBG_CFG,"tciTabBindTxCtxs %s numTxCtxs %d numAmHandlers %d", tciTabBindTxCtxs ? "true" : "false", numTxCtxs, numAmHandlers); From 1872b30a47b4274def696303ec1dfdb51498c908 Mon Sep 17 00:00:00 2001 From: "John H. Hartman" Date: Fri, 20 Sep 2024 14:40:20 -0700 Subject: [PATCH 12/25] Added tciAlloc call site debug info Signed-off-by: John H. Hartman --- runtime/src/comm/ofi/comm-ofi.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/runtime/src/comm/ofi/comm-ofi.c b/runtime/src/comm/ofi/comm-ofi.c index b0839499d6cd..81107a4ce937 100644 --- a/runtime/src/comm/ofi/comm-ofi.c +++ b/runtime/src/comm/ofi/comm-ofi.c @@ -74,6 +74,7 @@ #include #include + #include #ifndef MAP_HUGETLB // MAP_HUGETLB is not defined on all systems (e.g. MacOS) @@ -377,7 +378,8 @@ typedef nb_handle* nb_handle_t; // Forward decls // -static struct perTxCtxInfo_t* tciAlloc(void); +static struct perTxCtxInfo_t* tciAllocFunc(const char *, int); +#define tciAlloc() tciAllocFunc(__FILE__, __LINE__) static struct perTxCtxInfo_t* tciAllocForAmHandler(void); static chpl_bool tciAllocTabEntry(struct perTxCtxInfo_t*); static void tciFree(struct perTxCtxInfo_t*); @@ -5947,11 +5949,11 @@ static __thread struct perTxCtxInfo_t* _ttcip; static inline -struct perTxCtxInfo_t* tciAlloc(void) { +struct perTxCtxInfo_t* tciAllocFunc(const char *file, int line) { + DBG_PRINTF(DBG_TCIPS, "tciAlloc %s:%d]", file, line); return tciAllocCommon(false /*bindToAmHandler*/); } - static inline struct perTxCtxInfo_t* tciAllocForAmHandler(void) { return tciAllocCommon(true /*bindToAmHandler*/); From b07b36d74ef8f0130f66b4fbf4fe4d9d686ab602 Mon Sep 17 00:00:00 2001 From: "John H. Hartman" Date: Mon, 23 Sep 2024 15:20:38 -0700 Subject: [PATCH 13/25] Change type of numTxCtxs and numRxCtxs to size_t Change type of numTxCtxs and numRxCtxs to size_t to match type of info->domain_attr->ep_cnt. Signed-off-by: John H. Hartman --- runtime/src/comm/ofi/comm-ofi.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/runtime/src/comm/ofi/comm-ofi.c b/runtime/src/comm/ofi/comm-ofi.c index 81107a4ce937..207b9871b02c 100644 --- a/runtime/src/comm/ofi/comm-ofi.c +++ b/runtime/src/comm/ofi/comm-ofi.c @@ -187,8 +187,8 @@ static chpl_bool envInjectAM; // env: inject AM messages static chpl_bool envUseDedicatedAmhCores; // env: use dedicated AM cores static const char* envExpectedProvider; // env: provider we should select -static int numTxCtxs; -static int numRxCtxs; +static size_t numTxCtxs; +static size_t numRxCtxs; struct perTxCtxInfo_t { chpl_atomic_bool allocated; // true: in use; false: available @@ -1231,15 +1231,15 @@ void init_ofi(void) { (tciTab[tciTabLen - 1].txCntr == NULL) ? "CQ" : "counter"); if (ofi_txEpScal != NULL) { DBG_PRINTF(DBG_CFG, - "per node config: 1 scalable tx ep + %d tx ctx%s (%d bound), " - "%d rx ctx%s", + "per node config: 1 scalable tx ep + %zu tx ctx%s (%d bound), " + "%zu rx ctx%s", numTxCtxs, (numTxCtxs == 1) ? "" : "s", tciTabBindTxCtxs ? chpl_task_getFixedNumThreads() : 0, numRxCtxs, (numRxCtxs == 1) ? "" : "s"); } else { DBG_PRINTF(DBG_CFG, - "per node config: %d regular tx ep+ctx%s (%d bound), " - "%d rx ctx%s", + "per node config: %zu regular tx ep+ctx%s (%d bound), " + "%zu rx ctx%s", numTxCtxs, (numTxCtxs == 1) ? "" : "s", tciTabBindTxCtxs ? chpl_task_getFixedNumThreads() : 0, numRxCtxs, (numRxCtxs == 1) ? "" : "s"); From 27f9312f139955daf153d7c2df7ff3a4ee2b2b20 Mon Sep 17 00:00:00 2001 From: "John H. Hartman" Date: Mon, 23 Sep 2024 16:05:08 -0700 Subject: [PATCH 14/25] numTxCtxs is now of type size_t Signed-off-by: John H. Hartman --- runtime/src/comm/ofi/comm-ofi.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/src/comm/ofi/comm-ofi.c b/runtime/src/comm/ofi/comm-ofi.c index 207b9871b02c..94ca09dfa5e1 100644 --- a/runtime/src/comm/ofi/comm-ofi.c +++ b/runtime/src/comm/ofi/comm-ofi.c @@ -8430,7 +8430,7 @@ void ofiErrReport(const char* exprStr, int retVal, const char* errStr) { "OFI error: %s: %s:\n" " The program has reached the limit on the number of files it can\n" " have open at once. This may be because the product of the number\n" - " of locales (%d) and the communication concurrency (roughly %d) is\n" + " of locales (%d) and the communication concurrency (roughly %zu) is\n" " a significant fraction of the open-file limit (%ld). If so,\n" " either setting CHPL_RT_COMM_CONCURRENCY to decrease communication\n" " concurrency or running on fewer locales may allow the program to\n" From e3bc955f523eaafc1762eda9eec2dd06c00e97d8 Mon Sep 17 00:00:00 2001 From: "John H. Hartman" Date: Mon, 23 Sep 2024 16:31:03 -0700 Subject: [PATCH 15/25] Better comments Signed-off-by: John H. Hartman --- runtime/src/comm/ofi/comm-ofi.c | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/runtime/src/comm/ofi/comm-ofi.c b/runtime/src/comm/ofi/comm-ofi.c index 94ca09dfa5e1..f23f56290d5b 100644 --- a/runtime/src/comm/ofi/comm-ofi.c +++ b/runtime/src/comm/ofi/comm-ofi.c @@ -361,9 +361,22 @@ static const char* mcmModeNames[] = { "undefined", static bool cxiHybridMRMode = false; -// -// Non-blocking handle -// +// OFI-specific non-blocking handle implementation + +// This is defined here because it is used in the forward declarations below. +// The rountines to initialize and destroy handles, nb_handle_init and +// nb_handle_destroy appear in the RMA section later. The "id" is used to +// verify that the only the task that created the handle uses it -- this +// prevents multiple threads from simultaneously accessing the same transmit +// context if they are not bound to threads. The semantics of +// chpl_comm_test_nb_complete, chpl_comm_wait_nb_some, and chpl_comm_try_nb +// some require distinguishing newly-completed handles from those that that +// have previously commited. The "reported" field is used to distinguish +// between the two. The "complete" field is set when the operation completes. +// It is an atomic because the lower-level functions that set it require it. +// Operations that are too large for the underlying fabric are represented by +// a linked-list of handles. + typedef struct nb_handle { chpl_taskID_t id; // task that created the handle chpl_bool reported; // operation has been reported as complete @@ -5497,18 +5510,6 @@ void amCheckLiveness(void) { // Interface: RMA // -// OFI-specific non-blocking handle implementation - -// XXX update - -// Non-blocking operations require bound endpoints, to avoid having a handle -// for a pending operation held by one thread, while the endpoint is in use -// by a different thread. Bound endpoints are the norm, so it's easiest to -// just disallow non-blocking operations on non-bound endpoints. This allows -// the "completed" flag to be a simple boolean. The "complete" flags for the -// sub-operations are booleans because the lower-level code that uses them -// does not assume bound endpoints. - static inline void nb_handle_init(nb_handle_t h) { h->id = chpl_task_getId(); @@ -6320,7 +6321,6 @@ void rmaPutFn_msgOrd(nb_handle_t handle, void* myAddr, void* mrDesc, && size <= ofi_info->tx_attr->inject_size && envInjectRMA) { // - // XXX update this // Special case: write injection has the least latency. We can use // that if this PUT's size doesn't exceed the injection size limit // and we have a bound tx context so we can delay forcing the From 80d1bc1296760ce91c2b962258b855d6a09c10dc Mon Sep 17 00:00:00 2001 From: "John H. Hartman" Date: Tue, 24 Sep 2024 09:46:19 -0700 Subject: [PATCH 16/25] Remove trailing whitespace Signed-off-by: John H. Hartman --- runtime/src/comm/ofi/comm-ofi.c | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/runtime/src/comm/ofi/comm-ofi.c b/runtime/src/comm/ofi/comm-ofi.c index f23f56290d5b..594e3e7b36c2 100644 --- a/runtime/src/comm/ofi/comm-ofi.c +++ b/runtime/src/comm/ofi/comm-ofi.c @@ -361,7 +361,7 @@ static const char* mcmModeNames[] = { "undefined", static bool cxiHybridMRMode = false; -// OFI-specific non-blocking handle implementation +// OFI-specific non-blocking handle implementation // This is defined here because it is used in the forward declarations below. // The rountines to initialize and destroy handles, nb_handle_init and @@ -2542,14 +2542,14 @@ void init_ofiEp(void) { // // Compute numbers of transmit and receive contexts, and then create // the transmit context table. - // + // // The logic here is a bit convoluted and can probably be cleaned up. See // the tciTab comment above for more details. For non-scalable endpoints, // we would like to have one transmit context (and therefore one endpoint) // per worker thread, one per AM handler, and one for the process in // general. That will allow us to bind worker threads and AM handlers to // transmit contexts. If we can't get that many endpoints then transmit - // contexts will not be bound, which signficantly reduces performance. + // contexts will not be bound, which signficantly reduces performance. // // For scalable endpoints we only need one transmit endpoint with enough // transmit contexts to bind them as described above. If max_ep_tx_ctx for @@ -4377,7 +4377,7 @@ void amRequestExecOn(c_nodeid_t node, c_sublocid_t subloc, /* * amRequestRmaPut - * + * * Performs a PUT by sending an active message to the remote node that causes * it to perform a GET. This operation returns when the GET has completed. */ @@ -5510,7 +5510,7 @@ void amCheckLiveness(void) { // Interface: RMA // -static inline +static inline void nb_handle_init(nb_handle_t h) { h->id = chpl_task_getId(); h->reported = false; @@ -5518,7 +5518,7 @@ void nb_handle_init(nb_handle_t h) { h->next = NULL; } -static inline +static inline void nb_handle_destroy(nb_handle_t h) { atomic_destroy_bool(&h->complete); } @@ -5585,7 +5585,7 @@ chpl_comm_nb_handle_t chpl_comm_put_nb(void* addr, c_nodeid_t node, "%s(%p, %d, %p, %zd, %d)", __func__, addr, (int) node, raddr, size, (int) commID); - nb_handle_t handle = NULL; + nb_handle_t handle = NULL; if (put_prologue(addr, node, raddr, size, commID, ln, fn)) { handle = ofi_put_nb(handle, addr, node, raddr, size); } @@ -5612,7 +5612,7 @@ int chpl_comm_test_nb_complete(chpl_comm_nb_handle_t h) { /* * check_complete - * + * * Returns true if a new handle completion is detected, false otherwise. * Ignores handles that have previously completed (h->reported == true). If * blocking is true and there are uncompleted handles this will not return @@ -5662,7 +5662,7 @@ chpl_bool check_complete(nb_handle_t *handles, size_t nhandles, handle->reported = true; } } - if (!blocking || completed || !pending) { + if (!blocking || completed || !pending) { break; } // progress the endpoint so handles can complete and then try again @@ -6110,7 +6110,7 @@ static rmaPutFn_t rmaPutFn_selector; static inline void ofi_put(const void* addr, c_nodeid_t node, void* raddr, size_t size) { - + // Allocate the handle on the stack to avoid malloc overhead nb_handle handle_struct; nb_handle_t handle = &handle_struct; @@ -6138,12 +6138,12 @@ void ofi_put(const void* addr, c_nodeid_t node, void* raddr, size_t size) { static nb_handle_t ofi_put_nb(nb_handle_t handle, const void* addr, c_nodeid_t node, void* raddr, size_t size) { - + char *src = (char *) addr; char *dest = (char *) raddr; nb_handle_t prev = NULL; nb_handle_t first = NULL; - + if (size > ofi_info->ep_attr->max_msg_size) { DBG_PRINTF(DBG_RMA | DBG_RMA_WRITE, "splitting large PUT %d:%p <= %p, size %zd", @@ -6284,7 +6284,7 @@ void rmaPutFn_msgOrdFence(nb_handle_t handle, void* myAddr, void* mrDesc, // // Special case: If our last operation was an AMO then we need to do a // fenced PUT to force the AMO to be visible before this PUT. - // TODO: this logic is a bit screwed-up. FI_FENCE by itself doesn't + // TODO: this logic is a bit screwed-up. FI_FENCE by itself doesn't // force the AMO to be visible, it just ensures that the PUT cannot pass // the AMO. We need to do something to make it visible, and we need // to clear the bitmap so that we don't keep fencing PUTs until something @@ -7534,7 +7534,7 @@ void forceMemFxVisAllNodes(chpl_bool checkPuts, chpl_bool checkAmos, struct perTxCtxInfo_t* tcip) { // // Enforce MCM: make sure the memory effects of all the operations - // we've done so far, to any node, are actually visible. + // we've done so far, to any node, are actually visible. // mcmReleaseAllNodes(checkPuts ? tcip->putVisBitmap : NULL, checkAmos ? tcip->amoVisBitmap : NULL, From b58f4e03a8961902152ed00b8e5fa1741a878b86 Mon Sep 17 00:00:00 2001 From: "John H. Hartman" Date: Tue, 24 Sep 2024 10:09:07 -0700 Subject: [PATCH 17/25] Run bigTransfer test with unbound endpoints Signed-off-by: John H. Hartman --- test/runtime/configMatters/comm/unbound/EXECENV | 1 + test/runtime/configMatters/comm/unbound/README | 2 ++ test/runtime/configMatters/comm/unbound/SKIPIF | 1 + test/runtime/configMatters/comm/unbound/bigTransfer.chpl | 1 + test/runtime/configMatters/comm/unbound/bigTransfer.compopts | 1 + test/runtime/configMatters/comm/unbound/bigTransfer.execopts | 1 + test/runtime/configMatters/comm/unbound/bigTransfer.good | 1 + test/runtime/configMatters/comm/unbound/bigTransfer.numlocales | 1 + 8 files changed, 9 insertions(+) create mode 100644 test/runtime/configMatters/comm/unbound/EXECENV create mode 100644 test/runtime/configMatters/comm/unbound/README create mode 100644 test/runtime/configMatters/comm/unbound/SKIPIF create mode 120000 test/runtime/configMatters/comm/unbound/bigTransfer.chpl create mode 120000 test/runtime/configMatters/comm/unbound/bigTransfer.compopts create mode 120000 test/runtime/configMatters/comm/unbound/bigTransfer.execopts create mode 120000 test/runtime/configMatters/comm/unbound/bigTransfer.good create mode 120000 test/runtime/configMatters/comm/unbound/bigTransfer.numlocales diff --git a/test/runtime/configMatters/comm/unbound/EXECENV b/test/runtime/configMatters/comm/unbound/EXECENV new file mode 100644 index 000000000000..a2aab52168d8 --- /dev/null +++ b/test/runtime/configMatters/comm/unbound/EXECENV @@ -0,0 +1 @@ +CHPL_RT_COMM_OFI_EP_CNT=10 \ No newline at end of file diff --git a/test/runtime/configMatters/comm/unbound/README b/test/runtime/configMatters/comm/unbound/README new file mode 100644 index 000000000000..6c22d25f53b5 --- /dev/null +++ b/test/runtime/configMatters/comm/unbound/README @@ -0,0 +1,2 @@ +Tests for CHPL_COMM=ofi with unbound endpoints. This is accomplished by +setting CHPL_RT_COMM_OFI_EP_CNT to a small value. \ No newline at end of file diff --git a/test/runtime/configMatters/comm/unbound/SKIPIF b/test/runtime/configMatters/comm/unbound/SKIPIF new file mode 100644 index 000000000000..1a0e68776535 --- /dev/null +++ b/test/runtime/configMatters/comm/unbound/SKIPIF @@ -0,0 +1 @@ +CHPL_COMM != ofi \ No newline at end of file diff --git a/test/runtime/configMatters/comm/unbound/bigTransfer.chpl b/test/runtime/configMatters/comm/unbound/bigTransfer.chpl new file mode 120000 index 000000000000..3d38c2034ae9 --- /dev/null +++ b/test/runtime/configMatters/comm/unbound/bigTransfer.chpl @@ -0,0 +1 @@ +../bigTransfer.chpl \ No newline at end of file diff --git a/test/runtime/configMatters/comm/unbound/bigTransfer.compopts b/test/runtime/configMatters/comm/unbound/bigTransfer.compopts new file mode 120000 index 000000000000..3d12c48c7991 --- /dev/null +++ b/test/runtime/configMatters/comm/unbound/bigTransfer.compopts @@ -0,0 +1 @@ +../bigTransfer.compopts \ No newline at end of file diff --git a/test/runtime/configMatters/comm/unbound/bigTransfer.execopts b/test/runtime/configMatters/comm/unbound/bigTransfer.execopts new file mode 120000 index 000000000000..88345245bd8d --- /dev/null +++ b/test/runtime/configMatters/comm/unbound/bigTransfer.execopts @@ -0,0 +1 @@ +../bigTransfer.execopts \ No newline at end of file diff --git a/test/runtime/configMatters/comm/unbound/bigTransfer.good b/test/runtime/configMatters/comm/unbound/bigTransfer.good new file mode 120000 index 000000000000..523fb7880072 --- /dev/null +++ b/test/runtime/configMatters/comm/unbound/bigTransfer.good @@ -0,0 +1 @@ +../bigTransfer.good \ No newline at end of file diff --git a/test/runtime/configMatters/comm/unbound/bigTransfer.numlocales b/test/runtime/configMatters/comm/unbound/bigTransfer.numlocales new file mode 120000 index 000000000000..6d7c873c2c01 --- /dev/null +++ b/test/runtime/configMatters/comm/unbound/bigTransfer.numlocales @@ -0,0 +1 @@ +../bigTransfer.numlocales \ No newline at end of file From a174b158a17ccdbc3a08c261d4b23e2308db0d4a Mon Sep 17 00:00:00 2001 From: "John H. Hartman" Date: Tue, 24 Sep 2024 17:22:55 +0000 Subject: [PATCH 18/25] Only run PUT Signed-off-by: John H. Hartman --- test/runtime/configMatters/comm/unbound/bigTransfer.execopts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) mode change 120000 => 100644 test/runtime/configMatters/comm/unbound/bigTransfer.execopts diff --git a/test/runtime/configMatters/comm/unbound/bigTransfer.execopts b/test/runtime/configMatters/comm/unbound/bigTransfer.execopts deleted file mode 120000 index 88345245bd8d..000000000000 --- a/test/runtime/configMatters/comm/unbound/bigTransfer.execopts +++ /dev/null @@ -1 +0,0 @@ -../bigTransfer.execopts \ No newline at end of file diff --git a/test/runtime/configMatters/comm/unbound/bigTransfer.execopts b/test/runtime/configMatters/comm/unbound/bigTransfer.execopts new file mode 100644 index 000000000000..e7574f0058dd --- /dev/null +++ b/test/runtime/configMatters/comm/unbound/bigTransfer.execopts @@ -0,0 +1 @@ +--doGET=false --xferMB=2048 From c06b0b4cd8e7ac08b558cfda1369744304f70cca Mon Sep 17 00:00:00 2001 From: "John H. Hartman" Date: Tue, 24 Sep 2024 10:29:27 -0700 Subject: [PATCH 19/25] Run bigTransfer tests with small fabric message size Signed-off-by: John H. Hartman --- test/runtime/configMatters/comm/large-rma/EXECENV | 1 + test/runtime/configMatters/comm/large-rma/README | 3 +++ test/runtime/configMatters/comm/large-rma/bigTransfer.chpl | 1 + test/runtime/configMatters/comm/large-rma/bigTransfer.compopts | 1 + test/runtime/configMatters/comm/large-rma/bigTransfer.execopts | 1 + test/runtime/configMatters/comm/large-rma/bigTransfer.good | 1 + .../configMatters/comm/large-rma/bigTransfer.numlocales | 1 + 7 files changed, 9 insertions(+) create mode 100644 test/runtime/configMatters/comm/large-rma/EXECENV create mode 100644 test/runtime/configMatters/comm/large-rma/README create mode 120000 test/runtime/configMatters/comm/large-rma/bigTransfer.chpl create mode 120000 test/runtime/configMatters/comm/large-rma/bigTransfer.compopts create mode 100644 test/runtime/configMatters/comm/large-rma/bigTransfer.execopts create mode 120000 test/runtime/configMatters/comm/large-rma/bigTransfer.good create mode 120000 test/runtime/configMatters/comm/large-rma/bigTransfer.numlocales diff --git a/test/runtime/configMatters/comm/large-rma/EXECENV b/test/runtime/configMatters/comm/large-rma/EXECENV new file mode 100644 index 000000000000..2e7098a16072 --- /dev/null +++ b/test/runtime/configMatters/comm/large-rma/EXECENV @@ -0,0 +1 @@ +CHPL_RT_COMM_OFI_MAX_MSG_SIZE=100 \ No newline at end of file diff --git a/test/runtime/configMatters/comm/large-rma/README b/test/runtime/configMatters/comm/large-rma/README new file mode 100644 index 000000000000..2f57cdde6057 --- /dev/null +++ b/test/runtime/configMatters/comm/large-rma/README @@ -0,0 +1,3 @@ +Test RMA operations that are larger than the maximum message size of the fabric +and therefore require multiple transfers. This is accomplished by setting +the CHPL_RT_COMM_OFI_MAX_MSG_SIZE to a small value. \ No newline at end of file diff --git a/test/runtime/configMatters/comm/large-rma/bigTransfer.chpl b/test/runtime/configMatters/comm/large-rma/bigTransfer.chpl new file mode 120000 index 000000000000..3d38c2034ae9 --- /dev/null +++ b/test/runtime/configMatters/comm/large-rma/bigTransfer.chpl @@ -0,0 +1 @@ +../bigTransfer.chpl \ No newline at end of file diff --git a/test/runtime/configMatters/comm/large-rma/bigTransfer.compopts b/test/runtime/configMatters/comm/large-rma/bigTransfer.compopts new file mode 120000 index 000000000000..3d12c48c7991 --- /dev/null +++ b/test/runtime/configMatters/comm/large-rma/bigTransfer.compopts @@ -0,0 +1 @@ +../bigTransfer.compopts \ No newline at end of file diff --git a/test/runtime/configMatters/comm/large-rma/bigTransfer.execopts b/test/runtime/configMatters/comm/large-rma/bigTransfer.execopts new file mode 100644 index 000000000000..e7574f0058dd --- /dev/null +++ b/test/runtime/configMatters/comm/large-rma/bigTransfer.execopts @@ -0,0 +1 @@ +--doGET=false --xferMB=2048 diff --git a/test/runtime/configMatters/comm/large-rma/bigTransfer.good b/test/runtime/configMatters/comm/large-rma/bigTransfer.good new file mode 120000 index 000000000000..523fb7880072 --- /dev/null +++ b/test/runtime/configMatters/comm/large-rma/bigTransfer.good @@ -0,0 +1 @@ +../bigTransfer.good \ No newline at end of file diff --git a/test/runtime/configMatters/comm/large-rma/bigTransfer.numlocales b/test/runtime/configMatters/comm/large-rma/bigTransfer.numlocales new file mode 120000 index 000000000000..6d7c873c2c01 --- /dev/null +++ b/test/runtime/configMatters/comm/large-rma/bigTransfer.numlocales @@ -0,0 +1 @@ +../bigTransfer.numlocales \ No newline at end of file From c4ff0a13d9e37951c87aefae25c18cd6c8b0397e Mon Sep 17 00:00:00 2001 From: "John H. Hartman" Date: Tue, 1 Oct 2024 16:21:46 -0700 Subject: [PATCH 20/25] Add chpl_comm_free_nb_handle to CHPL_COMM=none Signed-off-by: John H. Hartman --- runtime/src/comm/none/comm-none.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/runtime/src/comm/none/comm-none.c b/runtime/src/comm/none/comm-none.c index 8e6df722ec78..fff2c16c7f03 100644 --- a/runtime/src/comm/none/comm-none.c +++ b/runtime/src/comm/none/comm-none.c @@ -98,6 +98,10 @@ int chpl_comm_try_nb_some(chpl_comm_nb_handle_t* h, size_t nhandles) return 0; } +void chpl_comm_free_nb_handle(chpl_comm_nb_handle_t h) { + assert(h == NULL); +} + int chpl_comm_addr_gettable(c_nodeid_t node, void* start, size_t len) { return 0; From 95a418e1435a89936963edbec3266e28553ca692 Mon Sep 17 00:00:00 2001 From: "John H. Hartman" Date: Wed, 2 Oct 2024 06:52:14 -0700 Subject: [PATCH 21/25] Add chpl_comm_free_nb_handle to CHPL_COMM=gasnet Signed-off-by: John H. Hartman --- runtime/src/comm/gasnet/comm-gasnet.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/runtime/src/comm/gasnet/comm-gasnet.c b/runtime/src/comm/gasnet/comm-gasnet.c index 961c04a2f665..b184abbc25f0 100644 --- a/runtime/src/comm/gasnet/comm-gasnet.c +++ b/runtime/src/comm/gasnet/comm-gasnet.c @@ -686,6 +686,8 @@ int chpl_comm_try_nb_some(chpl_comm_nb_handle_t* h, size_t nhandles) return gasnet_try_syncnb_some((gasnet_handle_t*) h, nhandles) == GASNET_OK; } +void chpl_comm_free_nb_handle(chpl_comm_nb_handle_t* h) { } + int chpl_comm_addr_gettable(c_nodeid_t node, void* start, size_t len) { #ifdef GASNET_SEGMENT_EVERYTHING From 1efd4f47d86d59478690ee1d11892851d1d9c9c6 Mon Sep 17 00:00:00 2001 From: "John H. Hartman" Date: Wed, 2 Oct 2024 06:53:45 -0700 Subject: [PATCH 22/25] Added chpl_comm_free_nb_handle to CHPL_COMM=ugni Signed-off-by: John H. Hartman --- runtime/src/comm/ugni/comm-ugni.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/runtime/src/comm/ugni/comm-ugni.c b/runtime/src/comm/ugni/comm-ugni.c index 8ebda8765909..dc9df8e74b8f 100644 --- a/runtime/src/comm/ugni/comm-ugni.c +++ b/runtime/src/comm/ugni/comm-ugni.c @@ -6189,6 +6189,9 @@ int chpl_comm_try_nb_some(chpl_comm_nb_handle_t* h, size_t nhandles) } +void chpl_comm_free_nb_handle(chpl_comm_nb_handle_t* h) { } + + int chpl_comm_addr_gettable(c_nodeid_t node, void* start, size_t len) { // This call asks if a future GET is safe, but we can't know that in the case From 9bbb972c852cca914656f50fd8a4030f0963991b Mon Sep 17 00:00:00 2001 From: "John H. Hartman" Date: Wed, 2 Oct 2024 07:00:43 -0700 Subject: [PATCH 23/25] Added chpl_comm_free_nb_handle to gasnet-ex Signed-off-by: John H. Hartman --- runtime/src/comm/gasnet/comm-gasnet-ex.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/runtime/src/comm/gasnet/comm-gasnet-ex.c b/runtime/src/comm/gasnet/comm-gasnet-ex.c index 1dc6ed1b0d60..283a64e71611 100644 --- a/runtime/src/comm/gasnet/comm-gasnet-ex.c +++ b/runtime/src/comm/gasnet/comm-gasnet-ex.c @@ -685,6 +685,8 @@ int chpl_comm_try_nb_some(chpl_comm_nb_handle_t* h, size_t nhandles) return gex_Event_TestSome((gex_Event_t*) h, nhandles, GEX_NO_FLAGS) == GASNET_OK; } +void chpl_comm_free_nb_handle(chpl_comm_nb_handle_t* h) { } + // TODO GEX could be scalable query to gasnet itself int chpl_comm_addr_gettable(c_nodeid_t node, void* start, size_t len) { From 2dfb923d5c29711e6e3f4b3899cbbf9ce2299d51 Mon Sep 17 00:00:00 2001 From: "John H. Hartman" Date: Wed, 2 Oct 2024 07:02:43 -0700 Subject: [PATCH 24/25] Fixed typos Signed-off-by: John H. Hartman --- runtime/src/comm/gasnet/comm-gasnet-ex.c | 2 +- runtime/src/comm/gasnet/comm-gasnet.c | 2 +- runtime/src/comm/ugni/comm-ugni.c | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/runtime/src/comm/gasnet/comm-gasnet-ex.c b/runtime/src/comm/gasnet/comm-gasnet-ex.c index 283a64e71611..88e9ff99b10b 100644 --- a/runtime/src/comm/gasnet/comm-gasnet-ex.c +++ b/runtime/src/comm/gasnet/comm-gasnet-ex.c @@ -685,7 +685,7 @@ int chpl_comm_try_nb_some(chpl_comm_nb_handle_t* h, size_t nhandles) return gex_Event_TestSome((gex_Event_t*) h, nhandles, GEX_NO_FLAGS) == GASNET_OK; } -void chpl_comm_free_nb_handle(chpl_comm_nb_handle_t* h) { } +void chpl_comm_free_nb_handle(chpl_comm_nb_handle_t h) { } // TODO GEX could be scalable query to gasnet itself int chpl_comm_addr_gettable(c_nodeid_t node, void* start, size_t len) diff --git a/runtime/src/comm/gasnet/comm-gasnet.c b/runtime/src/comm/gasnet/comm-gasnet.c index b184abbc25f0..53e168e6b4e0 100644 --- a/runtime/src/comm/gasnet/comm-gasnet.c +++ b/runtime/src/comm/gasnet/comm-gasnet.c @@ -686,7 +686,7 @@ int chpl_comm_try_nb_some(chpl_comm_nb_handle_t* h, size_t nhandles) return gasnet_try_syncnb_some((gasnet_handle_t*) h, nhandles) == GASNET_OK; } -void chpl_comm_free_nb_handle(chpl_comm_nb_handle_t* h) { } +void chpl_comm_free_nb_handle(chpl_comm_nb_handle_t h) { } int chpl_comm_addr_gettable(c_nodeid_t node, void* start, size_t len) { diff --git a/runtime/src/comm/ugni/comm-ugni.c b/runtime/src/comm/ugni/comm-ugni.c index dc9df8e74b8f..098a331c8089 100644 --- a/runtime/src/comm/ugni/comm-ugni.c +++ b/runtime/src/comm/ugni/comm-ugni.c @@ -6189,7 +6189,7 @@ int chpl_comm_try_nb_some(chpl_comm_nb_handle_t* h, size_t nhandles) } -void chpl_comm_free_nb_handle(chpl_comm_nb_handle_t* h) { } +void chpl_comm_free_nb_handle(chpl_comm_nb_handle_t h) { } int chpl_comm_addr_gettable(c_nodeid_t node, void* start, size_t len) From b260c4feca30d019f2f5e73a2e3e3dc25875d0f9 Mon Sep 17 00:00:00 2001 From: "John H. Hartman" Date: Wed, 9 Oct 2024 06:51:09 -0700 Subject: [PATCH 25/25] Addressed reviewer's comments Signed-off-by: John H. Hartman --- runtime/src/comm/ofi/comm-ofi.c | 12 ++++-------- test/runtime/configMatters/comm/large-rma/EXECENV | 2 +- test/runtime/configMatters/comm/large-rma/README | 6 +++--- test/runtime/configMatters/comm/unbound/EXECENV | 2 +- test/runtime/configMatters/comm/unbound/README | 2 +- test/runtime/configMatters/comm/unbound/SKIPIF | 2 +- 6 files changed, 11 insertions(+), 15 deletions(-) diff --git a/runtime/src/comm/ofi/comm-ofi.c b/runtime/src/comm/ofi/comm-ofi.c index 594e3e7b36c2..8f7d79a1a88b 100644 --- a/runtime/src/comm/ofi/comm-ofi.c +++ b/runtime/src/comm/ofi/comm-ofi.c @@ -5533,7 +5533,6 @@ static inline chpl_bool put_prologue(void* addr, c_nodeid_t node, void* raddr, size_t size, int32_t commID, int ln, int32_t fn) { - chpl_bool proceed = false; retireDelayedAmDone(false /*taskIsEnding*/); // @@ -5543,12 +5542,12 @@ chpl_bool put_prologue(void* addr, c_nodeid_t node, void* raddr, size_t size, CHK_TRUE(raddr != NULL); if (size == 0) { - goto done; + return false; } if (node == chpl_nodeID) { memmove(raddr, addr, size); - goto done; + return false; } // Communications callback support @@ -5561,9 +5560,7 @@ chpl_bool put_prologue(void* addr, c_nodeid_t node, void* raddr, size_t size, chpl_comm_diags_verbose_rdma("put", node, size, ln, fn, commID); chpl_comm_diags_incr(put); - proceed = true; -done: - return proceed; + return true; } /* @@ -5625,7 +5622,7 @@ chpl_bool check_complete(nb_handle_t *handles, size_t nhandles, chpl_bool completed = false; // at least one new completion detected chpl_bool pending = false; // there is an uncompleted handle if ((handles == NULL) || (nhandles == 0)) { - goto done; + return false; } struct perTxCtxInfo_t* tcip = NULL; while (true) { @@ -5675,7 +5672,6 @@ chpl_bool check_complete(nb_handle_t *handles, size_t nhandles, if (tcip) { tciFree(tcip); } -done: return completed; } diff --git a/test/runtime/configMatters/comm/large-rma/EXECENV b/test/runtime/configMatters/comm/large-rma/EXECENV index 2e7098a16072..0ad0a9d5532a 100644 --- a/test/runtime/configMatters/comm/large-rma/EXECENV +++ b/test/runtime/configMatters/comm/large-rma/EXECENV @@ -1 +1 @@ -CHPL_RT_COMM_OFI_MAX_MSG_SIZE=100 \ No newline at end of file +CHPL_RT_COMM_OFI_MAX_MSG_SIZE=100 diff --git a/test/runtime/configMatters/comm/large-rma/README b/test/runtime/configMatters/comm/large-rma/README index 2f57cdde6057..2c52e1865396 100644 --- a/test/runtime/configMatters/comm/large-rma/README +++ b/test/runtime/configMatters/comm/large-rma/README @@ -1,3 +1,3 @@ -Test RMA operations that are larger than the maximum message size of the fabric -and therefore require multiple transfers. This is accomplished by setting -the CHPL_RT_COMM_OFI_MAX_MSG_SIZE to a small value. \ No newline at end of file +Test RMA operations that are larger than the maximum message size of the +fabric and therefore require multiple transfers. This is accomplished by +setting the CHPL_RT_COMM_OFI_MAX_MSG_SIZE to a small value. diff --git a/test/runtime/configMatters/comm/unbound/EXECENV b/test/runtime/configMatters/comm/unbound/EXECENV index a2aab52168d8..f96d1e8c5898 100644 --- a/test/runtime/configMatters/comm/unbound/EXECENV +++ b/test/runtime/configMatters/comm/unbound/EXECENV @@ -1 +1 @@ -CHPL_RT_COMM_OFI_EP_CNT=10 \ No newline at end of file +CHPL_RT_COMM_OFI_EP_CNT=10 diff --git a/test/runtime/configMatters/comm/unbound/README b/test/runtime/configMatters/comm/unbound/README index 6c22d25f53b5..47b5253591dc 100644 --- a/test/runtime/configMatters/comm/unbound/README +++ b/test/runtime/configMatters/comm/unbound/README @@ -1,2 +1,2 @@ Tests for CHPL_COMM=ofi with unbound endpoints. This is accomplished by -setting CHPL_RT_COMM_OFI_EP_CNT to a small value. \ No newline at end of file +setting CHPL_RT_COMM_OFI_EP_CNT to a small value. diff --git a/test/runtime/configMatters/comm/unbound/SKIPIF b/test/runtime/configMatters/comm/unbound/SKIPIF index 1a0e68776535..3a1ff69f948a 100644 --- a/test/runtime/configMatters/comm/unbound/SKIPIF +++ b/test/runtime/configMatters/comm/unbound/SKIPIF @@ -1 +1 @@ -CHPL_COMM != ofi \ No newline at end of file +CHPL_COMM != ofi