Skip to content

Commit

Permalink
ASYNC: Improved error status transfer from workers to main thread
Browse files Browse the repository at this point in the history
If a thread was aborted through a regular AbortOnValue this was caught
but only a possible RTE was evaluated. Thus, the readout function
could not determine if the thread was aborted or not.

Changes:
- The readout function now uses ASYNC_ReadOutStruct as single argument.
  This allows extensions without changing all function APIs.
  The structure includes the former values for dfr, RTE code, RTE message
  and now additionally the V_abortCode.
  The RTE message must be included here as it has to be retrieved as soon as
  it happened (in the thread) and needs to be trnasported then to
  the main thread as well.
- The user readout function can evaluate then both error conditions and react
  accordingly.
- Tests
  • Loading branch information
MichaelHuth committed Sep 9, 2024
1 parent 99f2c0e commit 9a0fbf4
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 78 deletions.
60 changes: 35 additions & 25 deletions Packages/MIES/MIES_Async.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ static StrConstant ASYNC_WORKLOADCLASS_STR = "workloadClass"
static StrConstant ASYNC_PARAMCOUNT_STR = "paramCount"
static StrConstant ASYNC_INORDER_STR = "inOrder"
static StrConstant ASYNC_ABORTFLAG_STR = "abortFlag"
static StrConstant ASYNC_ERROR_STR = "err"
static StrConstant ASYNC_ERRORMSG_STR = "errmsg"
static StrConstant ASYNC_RTERROR_STR = "rtErr"
static StrConstant ASYNC_RTERRORMSG_STR = "rtErrMessage"
static StrConstant ASYNC_ABORTCODE_STR = "abortCode"
static StrConstant ASYNC_WLCOUNTER_STR = "workloadClassCounter"
///@}

Expand Down Expand Up @@ -86,13 +87,8 @@ End

/// @brief Prototype function for an async readout function
///
/// @param dfr reference to returned data folder from thread
/// @param err error code, only set in the error case
/// @param errmsg error message, only set in the error case
Function ASYNC_ReadOut(dfr, err, errmsg)
DFREF dfr
variable err
string errmsg
/// @param ar ASYNC readout structure
Function ASYNC_ReadOut(STRUCT ASYNC_ReadOutStruct &ar)
End

/// @brief thread function that receives data folders from the thread input queue
Expand Down Expand Up @@ -134,19 +130,26 @@ threadsafe static Function/DF ASYNC_Run_Worker(DFREF dfr)
DFREF dfrInp = dfr:input
DFREF dfrAsync = dfr:async

variable/G dfrAsync:$ASYNC_ERROR_STR = 0
NVAR err = dfrAsync:$ASYNC_ERROR_STR
string/G dfrAsync:$ASYNC_ERRORMSG_STR = ""
SVAR errmsg = dfrAsync:$ASYNC_ERRORMSG_STR
variable/G dfrAsync:$ASYNC_RTERROR_STR = 0
variable/G dfrAsync:$ASYNC_ABORTCODE_STR = 0
string/G dfrAsync:$ASYNC_RTERRORMSG_STR = ""
NVAR rtErr = dfrAsync:$ASYNC_RTERROR_STR
NVAR abortCode = dfrAsync:$ASYNC_ABORTCODE_STR
SVAR rtErrMsg = dfrAsync:$ASYNC_RTERRORMSG_STR

err = 0
dfrOut = $""
AssertOnAndClearRTError()
try
dfrOut = f(dfrInp); AbortOnRTE
catch
errmsg = GetRTErrMessage()
err = ClearRTError()
rtErrMsg = GetRTErrMessage()
rtErr = ClearRTError()
if(!rtErr)
rtErrMsg = ""
endif
if(V_AbortCode != ABORTCODE_ABORTONRTE)
abortCode = V_AbortCode
endif
endtry

if(IsFreeDatafolder(dfrOut))
Expand All @@ -168,7 +171,8 @@ Function ASYNC_ThreadReadOut()
variable justBuffered

variable wlcIndex, statCnt, index
string rterrmsg
string msg
STRUCT ASYNC_ReadOutStruct ar
NVAR tgID = $GetThreadGroupID()
ASSERT(!isNaN(tgID), "Async frame work is not running")
WAVE/DF DFREFbuffer = GetDFREFBuffer(getAsyncHomeDF())
Expand Down Expand Up @@ -238,19 +242,25 @@ Function ASYNC_ThreadReadOut()

track[%$workloadClass][%OUTPUTCOUNT] += 1

SVAR RFunc = dfr:$ASYNC_READOUTFUNC_STR
FUNCREF ASYNC_ReadOut f = $RFunc
NVAR err = dfr:$ASYNC_ERROR_STR
SVAR errmsg = dfr:$ASYNC_ERRORMSG_STR
SVAR RFunc = dfr:$ASYNC_READOUTFUNC_STR
FUNCREF ASYNC_ReadOut f = $RFunc
NVAR rtErr = dfr:$ASYNC_RTERROR_STR
SVAR rtErrMsg = dfr:$ASYNC_RTERRORMSG_STR
NVAR abortCode = dfr:$ASYNC_ABORTCODE_STR

ar.dfr = dfrOut
ar.rtErr = rtErr
ar.rtErrMsg = rtErrMsg
ar.abortCode = abortCode

statCnt += 1
AssertOnAndClearRTError()
try
f(dfrOut, err, errmsg); AbortOnRTE
f(ar); AbortOnRTE
catch
rterrmsg = GetRTErrMessage()
ClearRTError()
ASSERT(0, "ReadOut function " + RFunc + " aborted with: " + rterrmsg)
msg = GetRTErrMessage()
ASSERT(!ClearRTError(), "ReadOut function " + RFunc + " encountered an RTE: " + msg)
ASSERT(!V_AbortCode, "ReadOut function " + RFunc + " aborted with code: " + GetErrMessage(V_AbortCode))
endtry

endfor
Expand Down
10 changes: 10 additions & 0 deletions Packages/MIES/MIES_Constants.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -2307,3 +2307,13 @@ Constant SUTTER_MAX_MAX_TP_PULSES = 10000
Constant INVALID_SWEEP_NUMBER = -1

StrConstant PERCENT_F_MAX_PREC = "%.15f"

/// @name Igor Internal Abort Codes
///
/// @anchor IgorAbortCodes
///@{
Constant ABORTCODE_ABORTONRTE = -4
Constant ABORTCODE_ABORT = -3
Constant ABORTCODE_STACKOVERFLOW = -2
Constant ABORTCODE_USERABORT = -1
///@}
11 changes: 6 additions & 5 deletions Packages/MIES/MIES_NeuroDataWithoutBorders.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -366,13 +366,14 @@ threadsafe Function/DF NWB_ASYNC_Worker(DFREF dfr)
return $""
End

Function NWB_ASYNC_Readout(DFREF dfr, variable err, string errmsg)
Function NWB_ASYNC_Readout(STRUCT ASYNC_ReadOutStruct &ar)

if(!err)
return NaN
if(ar.rtErr)
BUG("Async jobs finished with RTE: " + ar.rtErrMsg)
endif
if(ar.abortCode)
BUG("Async jobs finished with Abort: " + GetErrMessage(ar.abortCode))
endif

BUG("Async jobs finished with: " + errmsg)
End

threadsafe static Function NWB_WriteLabnoteBooksAndComments(STRUCT NWBAsyncParameters &s)
Expand Down
8 changes: 8 additions & 0 deletions Packages/MIES/MIES_Structures.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -637,3 +637,11 @@ Structure SF_PlotMetaData
string xAxisLabel // from SF_META_XAXISLABEL constant
string yAxisLabel // from SF_META_YAXISLABEL constant
EndStructure

/// @brief ReadOut Structure for ASYNC
Structure ASYNC_ReadOutStruct
DFREF dfr // dfr with output data
variable rtErr // runtime error code
string rtErrMsg // runtime error message
variable abortCode // abort code
EndStructure
19 changes: 7 additions & 12 deletions Packages/MIES/MIES_TestPulse.ipf
Original file line number Diff line number Diff line change
Expand Up @@ -179,26 +179,21 @@ End
/// results are base line level, steady state resistance, instantaneous resistance and their positions
/// collected results for all channels of a measurement are send to TP_RecordTP(), DQ_ApplyAutoBias() when complete
///
/// @param dfr output data folder from ASYNC frame work with results from workloads associated with this registered function
/// The output parameter in the data folder follow the definition as created in TP_TSAnalysis()
///
/// @param err error code of TP_TSAnalysis() function
///
/// @param errmsg error message of TP_TSAnalysis() function
Function TP_ROAnalysis(dfr, err, errmsg)
DFREF dfr
variable err
string errmsg
/// @param ar ASYNC_ReadOutStruct structure with dfr with input data
Function TP_ROAnalysis(STRUCT ASYNC_ReadOutStruct &ar)

variable i, j, bufSize
variable posMarker, posAsync, tpBufferSize
variable posBaseline, posSSRes, posInstRes
variable posElevSS, posElevInst

if(err)
ASSERT(0, "RTError " + num2str(err) + " in TP_Analysis thread: " + errmsg)
if(ar.rtErr || ar.abortCode)
ASSERT(!ar.rtErr, "TP analysis thread encountered RTE " + ar.rtErrMsg)
ASSERT(!ar.abortCode, "TP analysis thread aborted with code: " + GetErrMessage(ar.abortCode))
endif

DFREF dfr = ar.dfr

WAVE/SDFR=dfr inData = outData
NVAR/SDFR=dfr now = now
NVAR/SDFR=dfr hsIndex = hsIndex
Expand Down
Loading

0 comments on commit 9a0fbf4

Please sign in to comment.