Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sum dataset #2

Merged
merged 4 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cpp/data/frameProcessor/include/XspressProcessPlugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ class XspressMemoryBlock
void *scalar_memblock_;
void *dtc_memblock_;
void *inp_est_memblock_;
void *sum_memblock_;
/** Memory block for sum value */
void calculate_sum(boost::shared_ptr<Frame> frame);
/** Number of scalars recorded */
uint32_t num_scalars_recorded_;

Expand Down
80 changes: 59 additions & 21 deletions cpp/data/frameProcessor/src/XspressProcessPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@

namespace FrameProcessor {

const std::string XspressProcessPlugin::CONFIG_ACQ_ID = "acq_id";
const std::string XspressProcessPlugin::CONFIG_ACQ_ID = "acq_id";

const std::string XspressProcessPlugin::CONFIG_PROCESS = "process";
const std::string XspressProcessPlugin::CONFIG_PROCESS_NUMBER = "number";
const std::string XspressProcessPlugin::CONFIG_PROCESS_RANK = "rank";
const std::string XspressProcessPlugin::CONFIG_PROCESS = "process";
const std::string XspressProcessPlugin::CONFIG_PROCESS_NUMBER = "number";
const std::string XspressProcessPlugin::CONFIG_PROCESS_RANK = "rank";

const std::string XspressProcessPlugin::CONFIG_LIVE_VIEW_NAME = "live_view";
const std::string XspressProcessPlugin::CONFIG_LIVE_VIEW_NAME = "live_view";

const std::string XspressProcessPlugin::CONFIG_FRAMES = "frames";
const std::string XspressProcessPlugin::CONFIG_DTC_FLAGS = "dtc/flags";
const std::string XspressProcessPlugin::CONFIG_DTC_PARAMS = "dtc/params";
const std::string XspressProcessPlugin::CONFIG_FRAMES = "frames";
const std::string XspressProcessPlugin::CONFIG_DTC_FLAGS = "dtc/flags";
const std::string XspressProcessPlugin::CONFIG_DTC_PARAMS = "dtc/params";

const std::string XspressProcessPlugin::CONFIG_CHUNK = "chunks";

Expand All @@ -36,6 +36,7 @@ const std::string META_XSPRESS_CHUNK = "xspress_meta_chunk";
const std::string META_XSPRESS_SCALARS = "xspress_scalars";
const std::string META_XSPRESS_DTC = "xspress_dtc";
const std::string META_XSPRESS_INP_EST = "xspress_inp_est";
const std::string META_XSPRESS_SUM = "xspress_sum";

XspressMemoryBlock::XspressMemoryBlock() :
ptr_(0),
Expand Down Expand Up @@ -140,6 +141,7 @@ XspressProcessPlugin::XspressProcessPlugin() :
scalar_memblock_(0),
dtc_memblock_(0),
inp_est_memblock_(0),
sum_memblock_(0),
num_scalars_recorded_(0),
offset(0)
{
Expand Down Expand Up @@ -345,6 +347,38 @@ void XspressProcessPlugin::setup_memory_allocation()
free(inp_est_memblock_);
}
inp_est_memblock_ = malloc(sizeof(double) * this->frames_per_block_ * num_channels_);

if (sum_memblock_)
{
free(sum_memblock_);
}
sum_memblock_ = malloc(sizeof(double) * this->frames_per_block_);
}


/**
* Calculate the sum for a frame
*
* This method will receive a frame, and sum each element of data on it.
* The data is stored in the `sum_memblock_` array (allocated in the `setup_memory_allocation method`)
* and each time this method is called it stores the result to this array at the offset `num_scalars_recorded_`, i.e.: when
* `num_scalars_recorded_` is 0 we store the sum to index 0, when `num_scalars_recorded_` is 1 we store the sum to index 1.
* The 'num_scalars_recorded_' variable is set back to zero each time the frame arrays are flushed.
*
* \param[in] frame - shared_ptr for a Frame class that contains the data for the acquisition on each configured channel.
*/
void XspressProcessPlugin::calculate_sum(boost::shared_ptr<Frame> frame)
{
double sum_value = 0;
const uint32_t *data = static_cast<const uint32_t *>(frame->get_image_ptr());
size_t elements_count = frame->get_image_size() / sizeof(uint32_t);
for (size_t pixel_index = 0; pixel_index < elements_count; pixel_index++)
{
sum_value += data[pixel_index];
}
double *sum_ptr = (double *)sum_memblock_;
sum_ptr += num_scalars_recorded_;
memcpy(sum_ptr, &sum_value, sizeof(double));
}

void XspressProcessPlugin::process_frame(boost::shared_ptr <Frame> frame)
Expand Down Expand Up @@ -415,19 +449,6 @@ void XspressProcessPlugin::process_frame(boost::shared_ptr <Frame> frame)
double *dest_inp_est_ptr = (double *)inp_est_memblock_;
dest_inp_est_ptr += (num_inp_est * num_scalars_recorded_);
memcpy(dest_inp_est_ptr, inp_est_ptr, num_inp_est * sizeof(double));
num_scalars_recorded_ += 1;

// Calculate the elapsed time since we last posted meta data
boost::posix_time::ptime now = boost::posix_time::microsec_clock::local_time();
int32_t elapsed_time = (now - last_scalar_send_time_).total_milliseconds();

// Send scalars to be writen once we reach the desired number of frames.
// Number has to be the same as the number of MCA frames for live processing reasons
if ((num_scalars_recorded_ == this->frames_per_block_))
{
send_scalars(frame_id, header->num_scalars, header->first_channel, header->num_channels);
last_scalar_send_time_ = now;
}

char *mca_ptr = frame_bytes;
mca_ptr += (sizeof(FrameHeader) +
Expand All @@ -448,6 +469,17 @@ void XspressProcessPlugin::process_frame(boost::shared_ptr <Frame> frame)
live_frame->set_outer_chunk_size(1);
// Push out the live MCA data to the live view plugin only
this->push(live_view_name_, live_frame);
calculate_sum(live_frame);

num_scalars_recorded_ += 1;

// Send scalars to be writen once we reach the desired number of frames.
// Number has to be the same as the number of MCA frames for live processing reasons
if ((num_scalars_recorded_ == this->frames_per_block_))
{
send_scalars(frame_id, header->num_scalars, header->first_channel, header->num_channels);
}

LOG4CXX_DEBUG_LEVEL(1, logger_, "FrameId = " << frame_id);
for (int index = 0; index < num_channels_; index++){
memory_ptrs_[index]->add_frame(frame_id, mca_ptr);
Expand Down Expand Up @@ -579,6 +611,12 @@ void XspressProcessPlugin::send_scalars(uint32_t last_frame_id, uint32_t num_sca
num_inp_est * num_scalars_recorded_ * sizeof(double),
buffer.GetString());

this->publish_meta(META_NAME,
META_XSPRESS_SUM,
sum_memblock_,
num_scalars_recorded_ * sizeof(double),
buffer.GetString());

num_scalars_recorded_ = 0;
}
}
51 changes: 51 additions & 0 deletions python/src/xspress_detector/data/xspress_meta_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
XSPRESS_DTC = "xspress_dtc"
XSPRESS_INP_EST = "xspress_inp_est"
XSPRESS_CHUNK = "xspress_meta_chunk"
XSPRESS_SUM = "xspress_sum"

# Number of scalars per channel
XSPRESS_SCALARS_PER_CHANNEL = 9
Expand All @@ -34,6 +35,7 @@
DATASET_INP_EST = "inp_est"
DATASET_DAQ_VERSION = "data_version"
DATASET_META_VERSION = "meta_version"
DATASET_SUM = "sum"


class XspressMetaWriter(MetaWriter):
Expand All @@ -60,6 +62,7 @@ def __init__(self, name, directory, endpoints, config):
}
self._dtc = {}
self._inp = {}
self._sum = {}
super(XspressMetaWriter, self).__init__(name, directory, endpoints, config)

self._series = None
Expand Down Expand Up @@ -106,6 +109,18 @@ def _define_detector_datasets(self):
block_size=self._chunk_size,
)
)
self._logger.info("Adding dataset: {}".format(DATASET_SUM))
dsets.append(
Int32HDF5Dataset(
DATASET_SUM,
shape=(self._num_frames,),
maxshape=(None,),
chunks=(self._chunk_size,),
rank=1,
cache=True,
block_size=self._chunk_size,
)
)
dsets.append(Int64HDF5Dataset(DATASET_DAQ_VERSION))
dsets.append(Int64HDF5Dataset(DATASET_META_VERSION))
return dsets
Expand All @@ -117,6 +132,7 @@ def detector_message_handlers(self):
XSPRESS_DTC: self.handle_xspress_dtc,
XSPRESS_INP_EST: self.handle_xspress_inp_est,
XSPRESS_CHUNK: self.handle_xspress_meta_chunk,
XSPRESS_SUM: self.handle_xspress_sum,
}

def handle_xspress_scalars(self, header, _data):
Expand Down Expand Up @@ -244,6 +260,7 @@ def add_inp_est_value(self, frame, channel, value):
if self._inp[frame]['qty'] == self._num_channels:
self._add_value(DATASET_INP_EST, self._inp[frame]['values'], offset=frame)
del self._inp[frame]

def handle_xspress_meta_chunk(self, header, _data):
self._chunk_size = int(_data)
if header["frame_id"] == -1 and self._configured == 0:
Expand All @@ -255,6 +272,40 @@ def handle_xspress_meta_chunk(self, header, _data):
for dset in meta_dsets:
self._datasets[dset.name] = dset

def handle_xspress_sum(self, header, _data):
self._logger.debug("%s | Handling xspress sum message", self._name)
# Extract the channel number from the header
channel = header['channel_index']
# Extract Number of channels
number_of_channels = header['number_of_channels']
# Number of frames
number_of_frames = header['number_of_frames']
# Frame ID
frame_id = header['frame_id']


format_str = '{}d'.format(number_of_frames)
array = struct.unpack(format_str, _data)
pairs = self._num_channels / number_of_channels
for frame in range(number_of_frames):
current_frame_id = frame_id + frame
self.add_sum_value(current_frame_id, pairs, array[frame])

def add_sum_value(self, frame, pairs, value):
# Check if we need to create an entry for this frame
if frame not in self._sum:
self._sum[frame] = {
'qty': 0,
'values': 0
}

self._sum[frame]['values'] += value
self._sum[frame]['qty'] += 1

if self._sum[frame]['qty'] == int(pairs):
self._add_value(DATASET_SUM, self._sum[frame]['values'], offset=frame)
del self._sum[frame]

@staticmethod
def get_version():
return (
Expand Down
Loading