From 9efa8710b6d154315ddb789370a51134380a4b84 Mon Sep 17 00:00:00 2001 From: Luis Segalla Date: Thu, 5 Sep 2024 11:53:04 +0100 Subject: [PATCH 1/4] Add sum plugin like feature --- .../include/XspressProcessPlugin.h | 3 + .../src/XspressProcessPlugin.cpp | 55 ++++++++++++++----- .../data/xspress_meta_writer.py | 51 +++++++++++++++++ 3 files changed, 96 insertions(+), 13 deletions(-) diff --git a/cpp/data/frameProcessor/include/XspressProcessPlugin.h b/cpp/data/frameProcessor/include/XspressProcessPlugin.h index a90ebea..f5c3401 100644 --- a/cpp/data/frameProcessor/include/XspressProcessPlugin.h +++ b/cpp/data/frameProcessor/include/XspressProcessPlugin.h @@ -95,6 +95,9 @@ class XspressMemoryBlock void *scalar_memblock_; void *dtc_memblock_; void *inp_est_memblock_; + void *sum_memblock_; + /** Memory block for sum value */ + void calculate_sum(boost::shared_ptr frame); /** Number of scalars recorded */ uint32_t num_scalars_recorded_; diff --git a/cpp/data/frameProcessor/src/XspressProcessPlugin.cpp b/cpp/data/frameProcessor/src/XspressProcessPlugin.cpp index 8644a8f..8fc5abb 100644 --- a/cpp/data/frameProcessor/src/XspressProcessPlugin.cpp +++ b/cpp/data/frameProcessor/src/XspressProcessPlugin.cpp @@ -36,6 +36,7 @@ const std::string META_XSPRESS_CHUNK = "xspress_meta_chunk"; const std::string META_XSPRESS_SCALARS = "xspress_scalars"; const std::string META_XSPRESS_DTC = "xspress_dtc"; const std::string META_XSPRESS_INP_EST = "xspress_inp_est"; + const std::string META_XSPRESS_SUM = "xspress_sum"; XspressMemoryBlock::XspressMemoryBlock() : ptr_(0), @@ -140,6 +141,7 @@ XspressProcessPlugin::XspressProcessPlugin() : scalar_memblock_(0), dtc_memblock_(0), inp_est_memblock_(0), + sum_memblock_(0), num_scalars_recorded_(0), offset(0) { @@ -345,6 +347,26 @@ void XspressProcessPlugin::setup_memory_allocation() free(inp_est_memblock_); } inp_est_memblock_ = malloc(sizeof(double) * this->frames_per_block_ * num_channels_); + + if (sum_memblock_) + { + free(sum_memblock_); + } + sum_memblock_ = malloc(sizeof(double) * this->frames_per_block_); + } + + void XspressProcessPlugin::calculate_sum(boost::shared_ptr frame) + { + double sum_value = 0; + const uint32_t *data = static_cast(frame->get_image_ptr()); + size_t elements_count = frame->get_image_size() / sizeof(uint32_t); + for (size_t pixel_index = 0; pixel_index < elements_count; pixel_index++) + { + sum_value += data[pixel_index]; + } + double *sum_ptr = (double *)sum_memblock_; + sum_ptr += num_scalars_recorded_; + memcpy(sum_ptr, &sum_value, sizeof(double)); } void XspressProcessPlugin::process_frame(boost::shared_ptr frame) @@ -415,19 +437,6 @@ void XspressProcessPlugin::process_frame(boost::shared_ptr frame) double *dest_inp_est_ptr = (double *)inp_est_memblock_; dest_inp_est_ptr += (num_inp_est * num_scalars_recorded_); memcpy(dest_inp_est_ptr, inp_est_ptr, num_inp_est * sizeof(double)); - num_scalars_recorded_ += 1; - - // Calculate the elapsed time since we last posted meta data - boost::posix_time::ptime now = boost::posix_time::microsec_clock::local_time(); - int32_t elapsed_time = (now - last_scalar_send_time_).total_milliseconds(); - - // Send scalars to be writen once we reach the desired number of frames. - // Number has to be the same as the number of MCA frames for live processing reasons - if ((num_scalars_recorded_ == this->frames_per_block_)) - { - send_scalars(frame_id, header->num_scalars, header->first_channel, header->num_channels); - last_scalar_send_time_ = now; - } char *mca_ptr = frame_bytes; mca_ptr += (sizeof(FrameHeader) + @@ -437,6 +446,9 @@ void XspressProcessPlugin::process_frame(boost::shared_ptr frame) ); // Create the live view frame and push it + // I'm using this live frame as a way to calculate the sum metadata + // That's why it needs to be moved up here at the moment, as I don't want to have to redo the work of creating + // a frame without the scalars metadata dimensions_t live_dims; live_dims.push_back(num_channels_); live_dims.push_back(header->num_aux); @@ -448,6 +460,17 @@ void XspressProcessPlugin::process_frame(boost::shared_ptr frame) live_frame->set_outer_chunk_size(1); // Push out the live MCA data to the live view plugin only this->push(live_view_name_, live_frame); + calculate_sum(live_frame); + + num_scalars_recorded_ += 1; + + // Send scalars to be writen once we reach the desired number of frames. + // Number has to be the same as the number of MCA frames for live processing reasons + if ((num_scalars_recorded_ == this->frames_per_block_)) + { + send_scalars(frame_id, header->num_scalars, header->first_channel, header->num_channels); + } + LOG4CXX_DEBUG_LEVEL(1, logger_, "FrameId = " << frame_id); for (int index = 0; index < num_channels_; index++){ memory_ptrs_[index]->add_frame(frame_id, mca_ptr); @@ -579,6 +602,12 @@ void XspressProcessPlugin::send_scalars(uint32_t last_frame_id, uint32_t num_sca num_inp_est * num_scalars_recorded_ * sizeof(double), buffer.GetString()); + this->publish_meta(META_NAME, + META_XSPRESS_SUM, + sum_memblock_, + num_scalars_recorded_ * sizeof(double), + buffer.GetString()); + num_scalars_recorded_ = 0; } } diff --git a/python/src/xspress_detector/data/xspress_meta_writer.py b/python/src/xspress_detector/data/xspress_meta_writer.py index 77361ff..09796ac 100644 --- a/python/src/xspress_detector/data/xspress_meta_writer.py +++ b/python/src/xspress_detector/data/xspress_meta_writer.py @@ -24,6 +24,7 @@ XSPRESS_DTC = "xspress_dtc" XSPRESS_INP_EST = "xspress_inp_est" XSPRESS_CHUNK = "xspress_meta_chunk" +XSPRESS_SUM = "xspress_sum" # Number of scalars per channel XSPRESS_SCALARS_PER_CHANNEL = 9 @@ -34,6 +35,7 @@ DATASET_INP_EST = "inp_est" DATASET_DAQ_VERSION = "data_version" DATASET_META_VERSION = "meta_version" +DATASET_SUM = "sum" class XspressMetaWriter(MetaWriter): @@ -60,6 +62,7 @@ def __init__(self, name, directory, endpoints, config): } self._dtc = {} self._inp = {} + self._sum = {} super(XspressMetaWriter, self).__init__(name, directory, endpoints, config) self._series = None @@ -106,6 +109,18 @@ def _define_detector_datasets(self): block_size=self._chunk_size, ) ) + self._logger.info("Adding dataset: {}".format(DATASET_SUM)) + dsets.append( + Int32HDF5Dataset( + DATASET_SUM, + shape=(self._num_frames,), + maxshape=(None,), + chunks=(self._chunk_size,), + rank=1, + cache=True, + block_size=self._chunk_size, + ) + ) dsets.append(Int64HDF5Dataset(DATASET_DAQ_VERSION)) dsets.append(Int64HDF5Dataset(DATASET_META_VERSION)) return dsets @@ -117,6 +132,7 @@ def detector_message_handlers(self): XSPRESS_DTC: self.handle_xspress_dtc, XSPRESS_INP_EST: self.handle_xspress_inp_est, XSPRESS_CHUNK: self.handle_xspress_meta_chunk, + XSPRESS_SUM: self.handle_xspress_sum, } def handle_xspress_scalars(self, header, _data): @@ -244,6 +260,7 @@ def add_inp_est_value(self, frame, channel, value): if self._inp[frame]['qty'] == self._num_channels: self._add_value(DATASET_INP_EST, self._inp[frame]['values'], offset=frame) del self._inp[frame] + def handle_xspress_meta_chunk(self, header, _data): self._chunk_size = int(_data) if header["frame_id"] == -1 and self._configured == 0: @@ -255,6 +272,40 @@ def handle_xspress_meta_chunk(self, header, _data): for dset in meta_dsets: self._datasets[dset.name] = dset + def handle_xspress_sum(self, header, _data): + self._logger.debug("%s | Handling xspress sum message", self._name) + # Extract the channel number from the header + channel = header['channel_index'] + # Extract Number of channels + number_of_channels = header['number_of_channels'] + # Number of frames + number_of_frames = header['number_of_frames'] + # Frame ID + frame_id = header['frame_id'] + + + format_str = '{}d'.format(number_of_frames) + array = struct.unpack(format_str, _data) + pairs = self._num_channels / number_of_channels + for frame in range(number_of_frames): + current_frame_id = frame_id + frame + self.add_sum_value(current_frame_id, pairs, array[frame]) + + def add_sum_value(self, frame, pairs, value): + # Check if we need to create an entry for this frame + if frame not in self._sum: + self._sum[frame] = { + 'qty': 0, + 'values': 0 + } + + self._sum[frame]['values'] += value + self._sum[frame]['qty'] += 1 + + if self._sum[frame]['qty'] == int(pairs): + self._add_value(DATASET_SUM, self._sum[frame]['values'], offset=frame) + del self._sum[frame] + @staticmethod def get_version(): return ( From ef7b4ca450c9d2889c50f5db3950c70c4ba6733c Mon Sep 17 00:00:00 2001 From: Luis Segalla Date: Thu, 28 Nov 2024 11:36:36 +0000 Subject: [PATCH 2/4] Fixing indent --- .../include/XspressProcessPlugin.h | 6 +- .../src/XspressProcessPlugin.cpp | 71 +++++++++---------- 2 files changed, 37 insertions(+), 40 deletions(-) diff --git a/cpp/data/frameProcessor/include/XspressProcessPlugin.h b/cpp/data/frameProcessor/include/XspressProcessPlugin.h index f5c3401..a0d5267 100644 --- a/cpp/data/frameProcessor/include/XspressProcessPlugin.h +++ b/cpp/data/frameProcessor/include/XspressProcessPlugin.h @@ -95,9 +95,9 @@ class XspressMemoryBlock void *scalar_memblock_; void *dtc_memblock_; void *inp_est_memblock_; - void *sum_memblock_; - /** Memory block for sum value */ - void calculate_sum(boost::shared_ptr frame); + void *sum_memblock_; + /** Memory block for sum value */ + void calculate_sum(boost::shared_ptr frame); /** Number of scalars recorded */ uint32_t num_scalars_recorded_; diff --git a/cpp/data/frameProcessor/src/XspressProcessPlugin.cpp b/cpp/data/frameProcessor/src/XspressProcessPlugin.cpp index 8fc5abb..ebe4616 100644 --- a/cpp/data/frameProcessor/src/XspressProcessPlugin.cpp +++ b/cpp/data/frameProcessor/src/XspressProcessPlugin.cpp @@ -15,17 +15,17 @@ namespace FrameProcessor { -const std::string XspressProcessPlugin::CONFIG_ACQ_ID = "acq_id"; +const std::string XspressProcessPlugin::CONFIG_ACQ_ID = "acq_id"; -const std::string XspressProcessPlugin::CONFIG_PROCESS = "process"; -const std::string XspressProcessPlugin::CONFIG_PROCESS_NUMBER = "number"; -const std::string XspressProcessPlugin::CONFIG_PROCESS_RANK = "rank"; +const std::string XspressProcessPlugin::CONFIG_PROCESS = "process"; +const std::string XspressProcessPlugin::CONFIG_PROCESS_NUMBER = "number"; +const std::string XspressProcessPlugin::CONFIG_PROCESS_RANK = "rank"; -const std::string XspressProcessPlugin::CONFIG_LIVE_VIEW_NAME = "live_view"; +const std::string XspressProcessPlugin::CONFIG_LIVE_VIEW_NAME = "live_view"; -const std::string XspressProcessPlugin::CONFIG_FRAMES = "frames"; -const std::string XspressProcessPlugin::CONFIG_DTC_FLAGS = "dtc/flags"; -const std::string XspressProcessPlugin::CONFIG_DTC_PARAMS = "dtc/params"; +const std::string XspressProcessPlugin::CONFIG_FRAMES = "frames"; +const std::string XspressProcessPlugin::CONFIG_DTC_FLAGS = "dtc/flags"; +const std::string XspressProcessPlugin::CONFIG_DTC_PARAMS = "dtc/params"; const std::string XspressProcessPlugin::CONFIG_CHUNK = "chunks"; @@ -36,7 +36,7 @@ const std::string META_XSPRESS_CHUNK = "xspress_meta_chunk"; const std::string META_XSPRESS_SCALARS = "xspress_scalars"; const std::string META_XSPRESS_DTC = "xspress_dtc"; const std::string META_XSPRESS_INP_EST = "xspress_inp_est"; - const std::string META_XSPRESS_SUM = "xspress_sum"; +const std::string META_XSPRESS_SUM = "xspress_sum"; XspressMemoryBlock::XspressMemoryBlock() : ptr_(0), @@ -141,7 +141,7 @@ XspressProcessPlugin::XspressProcessPlugin() : scalar_memblock_(0), dtc_memblock_(0), inp_est_memblock_(0), - sum_memblock_(0), + sum_memblock_(0), num_scalars_recorded_(0), offset(0) { @@ -348,25 +348,25 @@ void XspressProcessPlugin::setup_memory_allocation() } inp_est_memblock_ = malloc(sizeof(double) * this->frames_per_block_ * num_channels_); - if (sum_memblock_) - { - free(sum_memblock_); - } - sum_memblock_ = malloc(sizeof(double) * this->frames_per_block_); + if (sum_memblock_) + { + free(sum_memblock_); } + sum_memblock_ = malloc(sizeof(double) * this->frames_per_block_); +} - void XspressProcessPlugin::calculate_sum(boost::shared_ptr frame) +void XspressProcessPlugin::calculate_sum(boost::shared_ptr frame) +{ + double sum_value = 0; + const uint32_t *data = static_cast(frame->get_image_ptr()); + size_t elements_count = frame->get_image_size() / sizeof(uint32_t); + for (size_t pixel_index = 0; pixel_index < elements_count; pixel_index++) { - double sum_value = 0; - const uint32_t *data = static_cast(frame->get_image_ptr()); - size_t elements_count = frame->get_image_size() / sizeof(uint32_t); - for (size_t pixel_index = 0; pixel_index < elements_count; pixel_index++) - { - sum_value += data[pixel_index]; - } - double *sum_ptr = (double *)sum_memblock_; - sum_ptr += num_scalars_recorded_; - memcpy(sum_ptr, &sum_value, sizeof(double)); + sum_value += data[pixel_index]; + } + double *sum_ptr = (double *)sum_memblock_; + sum_ptr += num_scalars_recorded_; + memcpy(sum_ptr, &sum_value, sizeof(double)); } void XspressProcessPlugin::process_frame(boost::shared_ptr frame) @@ -446,9 +446,6 @@ void XspressProcessPlugin::process_frame(boost::shared_ptr frame) ); // Create the live view frame and push it - // I'm using this live frame as a way to calculate the sum metadata - // That's why it needs to be moved up here at the moment, as I don't want to have to redo the work of creating - // a frame without the scalars metadata dimensions_t live_dims; live_dims.push_back(num_channels_); live_dims.push_back(header->num_aux); @@ -460,16 +457,16 @@ void XspressProcessPlugin::process_frame(boost::shared_ptr frame) live_frame->set_outer_chunk_size(1); // Push out the live MCA data to the live view plugin only this->push(live_view_name_, live_frame); - calculate_sum(live_frame); + calculate_sum(live_frame); - num_scalars_recorded_ += 1; + num_scalars_recorded_ += 1; - // Send scalars to be writen once we reach the desired number of frames. - // Number has to be the same as the number of MCA frames for live processing reasons - if ((num_scalars_recorded_ == this->frames_per_block_)) - { - send_scalars(frame_id, header->num_scalars, header->first_channel, header->num_channels); - } + // Send scalars to be writen once we reach the desired number of frames. + // Number has to be the same as the number of MCA frames for live processing reasons + if ((num_scalars_recorded_ == this->frames_per_block_)) + { + send_scalars(frame_id, header->num_scalars, header->first_channel, header->num_channels); + } LOG4CXX_DEBUG_LEVEL(1, logger_, "FrameId = " << frame_id); for (int index = 0; index < num_channels_; index++){ From af5d8b95e779219d26418524e021526676af451c Mon Sep 17 00:00:00 2001 From: Luis Segalla Date: Fri, 29 Nov 2024 14:51:44 +0000 Subject: [PATCH 3/4] Add comments to the calculate_sum method --- .../frameProcessor/src/XspressProcessPlugin.cpp | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/cpp/data/frameProcessor/src/XspressProcessPlugin.cpp b/cpp/data/frameProcessor/src/XspressProcessPlugin.cpp index ebe4616..583c57e 100644 --- a/cpp/data/frameProcessor/src/XspressProcessPlugin.cpp +++ b/cpp/data/frameProcessor/src/XspressProcessPlugin.cpp @@ -355,6 +355,20 @@ void XspressProcessPlugin::setup_memory_allocation() sum_memblock_ = malloc(sizeof(double) * this->frames_per_block_); } + +/** + * Calculate the sum for a frame + * + * This method will receive a frame, and sum each element of data on it. + * The data is stored in the `sum_memblock_` array (allocated in the `setup_memory_allocation method`) + * and we iterate to by casting a pointer (`sum_ptr`) to the beginning of the array and moving it + * by `num_scalars_recorded_` each time this method is called again. + * The 'num_scalars_recorded_' variable is set back to zero each time the frame arrays are flushed. + * i.e.: when num_scalars_recorded_ is 0 we move the sum_ptr pointer to index 0, + * when num_scalars_recorded_ is 1 we move the sum_ptr to index 1 based + * + * \param[in] frame - shared_ptr for a Frame class that contains the data for the acquisition on each configured channel. + */ void XspressProcessPlugin::calculate_sum(boost::shared_ptr frame) { double sum_value = 0; From 114013cd7dd65f182630f366c0889bb53c9cf664 Mon Sep 17 00:00:00 2001 From: Luis Fernando Segalla <36170084+LuisFSegalla@users.noreply.github.com> Date: Mon, 2 Dec 2024 16:43:40 +0000 Subject: [PATCH 4/4] tidy calculate_sum comments Co-authored-by: Gary Yendell --- cpp/data/frameProcessor/src/XspressProcessPlugin.cpp | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/cpp/data/frameProcessor/src/XspressProcessPlugin.cpp b/cpp/data/frameProcessor/src/XspressProcessPlugin.cpp index 583c57e..a0df502 100644 --- a/cpp/data/frameProcessor/src/XspressProcessPlugin.cpp +++ b/cpp/data/frameProcessor/src/XspressProcessPlugin.cpp @@ -361,11 +361,9 @@ void XspressProcessPlugin::setup_memory_allocation() * * This method will receive a frame, and sum each element of data on it. * The data is stored in the `sum_memblock_` array (allocated in the `setup_memory_allocation method`) - * and we iterate to by casting a pointer (`sum_ptr`) to the beginning of the array and moving it - * by `num_scalars_recorded_` each time this method is called again. + * and each time this method is called it stores the result to this array at the offset `num_scalars_recorded_`, i.e.: when + * `num_scalars_recorded_` is 0 we store the sum to index 0, when `num_scalars_recorded_` is 1 we store the sum to index 1. * The 'num_scalars_recorded_' variable is set back to zero each time the frame arrays are flushed. - * i.e.: when num_scalars_recorded_ is 0 we move the sum_ptr pointer to index 0, - * when num_scalars_recorded_ is 1 we move the sum_ptr to index 1 based * * \param[in] frame - shared_ptr for a Frame class that contains the data for the acquisition on each configured channel. */