Skip to content

Commit

Permalink
Add sum plugin like feature
Browse files Browse the repository at this point in the history
formatting
  • Loading branch information
LuisFSegalla committed Sep 17, 2024
1 parent d0c2052 commit b482ae9
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 13 deletions.
3 changes: 3 additions & 0 deletions cpp/data/frameProcessor/include/XspressProcessPlugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ class XspressMemoryBlock
void *scalar_memblock_;
void *dtc_memblock_;
void *inp_est_memblock_;
void *sum_memblock_;
/** Memory block for sum value */
void calculate_sum(boost::shared_ptr<Frame> frame);
/** Number of scalars recorded */
uint32_t num_scalars_recorded_;

Expand Down
55 changes: 42 additions & 13 deletions cpp/data/frameProcessor/src/XspressProcessPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const std::string META_XSPRESS_CHUNK = "xspress_meta_chunk";
const std::string META_XSPRESS_SCALARS = "xspress_scalars";
const std::string META_XSPRESS_DTC = "xspress_dtc";
const std::string META_XSPRESS_INP_EST = "xspress_inp_est";
const std::string META_XSPRESS_SUM = "xspress_sum";

XspressMemoryBlock::XspressMemoryBlock() :
ptr_(0),
Expand Down Expand Up @@ -140,6 +141,7 @@ XspressProcessPlugin::XspressProcessPlugin() :
scalar_memblock_(0),
dtc_memblock_(0),
inp_est_memblock_(0),
sum_memblock_(0),
num_scalars_recorded_(0),
offset(0)
{
Expand Down Expand Up @@ -344,6 +346,26 @@ void XspressProcessPlugin::setup_memory_allocation()
free(inp_est_memblock_);
}
inp_est_memblock_ = malloc(sizeof(double) * this->frames_per_block_ * num_channels_);

if (sum_memblock_)
{
free(sum_memblock_);
}
sum_memblock_ = malloc(sizeof(double) * this->frames_per_block_);
}

void XspressProcessPlugin::calculate_sum(boost::shared_ptr<Frame> frame)
{
double sum_value = 0;
const uint32_t *data = static_cast<const uint32_t *>(frame->get_image_ptr());
size_t elements_count = frame->get_image_size() / sizeof(uint32_t);
for (size_t pixel_index = 0; pixel_index < elements_count; pixel_index++)
{
sum_value += data[pixel_index];
}
double *sum_ptr = (double *)sum_memblock_;
sum_ptr += num_scalars_recorded_;
memcpy(sum_ptr, &sum_value, sizeof(double));
}

void XspressProcessPlugin::process_frame(boost::shared_ptr <Frame> frame)
Expand Down Expand Up @@ -414,19 +436,6 @@ void XspressProcessPlugin::process_frame(boost::shared_ptr <Frame> frame)
double *dest_inp_est_ptr = (double *)inp_est_memblock_;
dest_inp_est_ptr += (num_inp_est * num_scalars_recorded_);
memcpy(dest_inp_est_ptr, inp_est_ptr, num_inp_est * sizeof(double));
num_scalars_recorded_ += 1;

// Calculate the elapsed time since we last posted meta data
boost::posix_time::ptime now = boost::posix_time::microsec_clock::local_time();
int32_t elapsed_time = (now - last_scalar_send_time_).total_milliseconds();

// Send scalars to be writen once we reach the desired number of frames.
// Number has to be the same as the number of MCA frames for live processing reasons
if ((num_scalars_recorded_ == this->frames_per_block_))
{
send_scalars(frame_id, header->num_scalars, header->first_channel, header->num_channels);
last_scalar_send_time_ = now;
}

char *mca_ptr = frame_bytes;
mca_ptr += (sizeof(FrameHeader) +
Expand All @@ -436,6 +445,9 @@ void XspressProcessPlugin::process_frame(boost::shared_ptr <Frame> frame)
);

// Create the live view frame and push it
// I'm using this live frame as a way to calculate the sum metadata
// That's why it needs to be moved up here at the moment, as I don't want to have to redo the work of creating
// a frame without the scalars metadata
dimensions_t live_dims;
live_dims.push_back(num_channels_);
live_dims.push_back(header->num_aux);
Expand All @@ -447,6 +459,17 @@ void XspressProcessPlugin::process_frame(boost::shared_ptr <Frame> frame)
live_frame->set_outer_chunk_size(1);
// Push out the live MCA data to the live view plugin only
this->push(live_view_name_, live_frame);
calculate_sum(live_frame);

num_scalars_recorded_ += 1;

// Send scalars to be writen once we reach the desired number of frames.
// Number has to be the same as the number of MCA frames for live processing reasons
if ((num_scalars_recorded_ == this->frames_per_block_))
{
send_scalars(frame_id, header->num_scalars, header->first_channel, header->num_channels);
}

LOG4CXX_DEBUG_LEVEL(1, logger_, "FrameId = " << frame_id);
for (int index = 0; index < num_channels_; index++){
memory_ptrs_[index]->add_frame(frame_id, mca_ptr);
Expand Down Expand Up @@ -578,6 +601,12 @@ void XspressProcessPlugin::process_frame(boost::shared_ptr <Frame> frame)
num_inp_est * num_scalars_recorded_ * sizeof(double),
buffer.GetString());

this->publish_meta(META_NAME,
META_XSPRESS_SUM,
sum_memblock_,
num_scalars_recorded_ * sizeof(double),
buffer.GetString());

num_scalars_recorded_ = 0;
}
}
51 changes: 51 additions & 0 deletions python/src/xspress_detector/data/xspress_meta_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
XSPRESS_DTC = "xspress_dtc"
XSPRESS_INP_EST = "xspress_inp_est"
XSPRESS_CHUNK = "xspress_meta_chunk"
XSPRESS_SUM = "xspress_sum"

# Number of scalars per channel
XSPRESS_SCALARS_PER_CHANNEL = 9
Expand All @@ -34,6 +35,7 @@
DATASET_INP_EST = "inp_est"
DATASET_DAQ_VERSION = "data_version"
DATASET_META_VERSION = "meta_version"
DATASET_SUM = "sum"


class XspressMetaWriter(MetaWriter):
Expand All @@ -60,6 +62,7 @@ def __init__(self, name, directory, endpoints, config):
}
self._dtc = {}
self._inp = {}
self._sum = {}
super(XspressMetaWriter, self).__init__(name, directory, endpoints, config)

self._series = None
Expand Down Expand Up @@ -106,6 +109,18 @@ def _define_detector_datasets(self):
block_size=self._chunk_size,
)
)
self._logger.info("Adding dataset: {}".format(DATASET_SUM))
dsets.append(
Int32HDF5Dataset(
DATASET_SUM,
shape=(self._num_frames,),
maxshape=(None,),
chunks=(self._chunk_size,),
rank=1,
cache=True,
block_size=self._chunk_size,
)
)
dsets.append(Int64HDF5Dataset(DATASET_DAQ_VERSION))
dsets.append(Int64HDF5Dataset(DATASET_META_VERSION))
return dsets
Expand All @@ -117,6 +132,7 @@ def detector_message_handlers(self):
XSPRESS_DTC: self.handle_xspress_dtc,
XSPRESS_INP_EST: self.handle_xspress_inp_est,
XSPRESS_CHUNK: self.handle_xspress_meta_chunk,
XSPRESS_SUM: self.handle_xspress_sum,
}

def handle_xspress_scalars(self, header, _data):
Expand Down Expand Up @@ -244,6 +260,7 @@ def add_inp_est_value(self, frame, channel, value):
if self._inp[frame]['qty'] == self._num_channels:
self._add_value(DATASET_INP_EST, self._inp[frame]['values'], offset=frame)
del self._inp[frame]

def handle_xspress_meta_chunk(self, header, _data):
self._chunk_size = int(_data)
if header["frame_id"] == -1 and self._configured == 0:
Expand All @@ -255,6 +272,40 @@ def handle_xspress_meta_chunk(self, header, _data):
for dset in meta_dsets:
self._datasets[dset.name] = dset

def handle_xspress_sum(self, header, _data):
self._logger.debug("%s | Handling xspress sum message", self._name)
# Extract the channel number from the header
channel = header['channel_index']
# Extract Number of channels
number_of_channels = header['number_of_channels']
# Number of frames
number_of_frames = header['number_of_frames']
# Frame ID
frame_id = header['frame_id']


format_str = '{}d'.format(number_of_frames)
array = struct.unpack(format_str, _data)
pairs = self._num_channels / number_of_channels
for frame in range(number_of_frames):
current_frame_id = frame_id + frame
self.add_sum_value(current_frame_id, pairs, array[frame])

def add_sum_value(self, frame, pairs, value):
# Check if we need to create an entry for this frame
if frame not in self._sum:
self._sum[frame] = {
'qty': 0,
'values': 0
}

self._sum[frame]['values'] += value
self._sum[frame]['qty'] += 1

if self._sum[frame]['qty'] == int(pairs):
self._add_value(DATASET_SUM, self._sum[frame]['values'], offset=frame)
del self._sum[frame]

@staticmethod
def get_version():
return (
Expand Down

0 comments on commit b482ae9

Please sign in to comment.