From 3bf51547b23275a72d02dd85d18ca65b696a81aa Mon Sep 17 00:00:00 2001 From: DeborahOoi96 Date: Fri, 19 Jan 2024 18:20:51 +0800 Subject: [PATCH] Address Event Callbacks Contain Unusable Parameters Issue --- generated/nidaqmx/task.py | 44 ++++++++++++++++++++ src/handwritten/task.py | 44 ++++++++++++++++++++ tests/_event_utils.py | 38 +++++++++++++++++ tests/component/test_task_events.py | 63 +++++++++++++++++++++++++++++ 4 files changed, 189 insertions(+) diff --git a/generated/nidaqmx/task.py b/generated/nidaqmx/task.py index 4534e387..4421921a 100644 --- a/generated/nidaqmx/task.py +++ b/generated/nidaqmx/task.py @@ -760,6 +760,46 @@ def read(self, number_of_samples_per_channel=NUM_SAMPLES_UNSET, return data.tolist() + def add_arguments_if_necessary(self, callback_function, expected_number_of_arguments): + if (callback_function.__code__.co_argcount < expected_number_of_arguments): + print("Not enough arguments! Adding arguments") + if ("every_n_samples_event_type" in parameters): + result = self.n_samples_event_wrapper(parameters, callback_function) + elif ("signal_type" in parameters): + result = self.register_signal_event_wrapper(parameters, callback_function) + elif ("status" in parameters): + result = self.register_done_event_wrapper(parameters, callback_function) + else: + result = callback_function + return result + + def n_samples_event_wrapper(self, parameters, callback_function): + if ("task_handle" and "callback_data" not in parameters): + result = lambda task_handle, every_n_samples_event_type, number_of_samples, callback_data: callback_function(every_n_samples_event_type, number_of_samples) + elif ("task_handle" not in parameters): + result = lambda task_handle, every_n_samples_event_type, number_of_samples, callback_data: callback_function(every_n_samples_event_type, number_of_samples, callback_data) + elif ("callback_data" not in parameters): + result = lambda task_handle, every_n_samples_event_type, number_of_samples, callback_data: callback_function(task_handle, every_n_samples_event_type, number_of_samples) + return result + + def register_signal_event_wrapper(self, parameters, callback_function): + if ("task_handle" and "callback_data" not in parameters): + result = lambda task_handle, signal_type, callback_data: callback_function(signal_type) + elif ("task_handle" not in parameters): + result = lambda task_handle, signal_type, callback_data: callback_function(signal_type, callback_data) + elif ("callback_data" not in parameters): + result = lambda task_handle, signal_type, callback_data: callback_function(task_handle, signal_type) + return result + + def register_done_event_wrapper(self, parameters, callback_function): + if ("task_handle" and "callback_data" not in parameters): + result = lambda task_handle, status, callback_data: callback_function(status) + elif ("task_handle" not in parameters): + result = lambda task_handle, status, callback_data: callback_function(status, callback_data) + elif ("callback_data" not in parameters): + result = lambda task_handle, status, callback_data: callback_function(task_handle, status) + return result + def register_done_event(self, callback_method): """ Registers a callback function to receive an event when a task stops due @@ -788,6 +828,7 @@ def register_done_event(self, callback_method): function. """ if callback_method is not None: + callback_method = self.add_arguments_if_necessary(callback_method, 3) # If the event is already registered, the interpreter should raise DaqError with code # DAQmxErrors.DONE_EVENT_ALREADY_REGISTERED. event_handler = self._interpreter.register_done_event(self._handle, 0, callback_method, None) @@ -836,6 +877,7 @@ def register_every_n_samples_acquired_into_buffer_event( function. """ if callback_method is not None: + callback_method = self.add_arguments_if_necessary(callback_method, 4) # If the event is already registered, the interpreter should raise DaqError with code # DAQmxErrors.EVERY_N_SAMPS_ACQ_INTO_BUFFER_EVENT_ALREADY_REGISTERED. event_handler = self._interpreter.register_every_n_samples_event( @@ -887,6 +929,7 @@ def register_every_n_samples_transferred_from_buffer_event( function. """ if callback_method is not None: + callback_method = self.add_arguments_if_necessary(callback_method, 4) # If the event is already registered, the interpreter should raise DaqError with code # DAQmxErrors.EVERY_N_SAMPS_TRANSFERRED_FROM_BUFFER_EVENT_ALREADY_REGISTERED. event_handler = self._interpreter.register_every_n_samples_event( @@ -933,6 +976,7 @@ def register_signal_event(self, signal_type, callback_method): function. """ if callback_method is not None: + callback_method = self.add_arguments_if_necessary(callback_method, 3) # If the event is already registered, the interpreter should raise DaqError with code # DAQmxErrors.SIGNAL_EVENT_ALREADY_REGISTERED. event_handler = self._interpreter.register_signal_event( diff --git a/src/handwritten/task.py b/src/handwritten/task.py index 4534e387..4421921a 100644 --- a/src/handwritten/task.py +++ b/src/handwritten/task.py @@ -760,6 +760,46 @@ def read(self, number_of_samples_per_channel=NUM_SAMPLES_UNSET, return data.tolist() + def add_arguments_if_necessary(self, callback_function, expected_number_of_arguments): + if (callback_function.__code__.co_argcount < expected_number_of_arguments): + print("Not enough arguments! Adding arguments") + if ("every_n_samples_event_type" in parameters): + result = self.n_samples_event_wrapper(parameters, callback_function) + elif ("signal_type" in parameters): + result = self.register_signal_event_wrapper(parameters, callback_function) + elif ("status" in parameters): + result = self.register_done_event_wrapper(parameters, callback_function) + else: + result = callback_function + return result + + def n_samples_event_wrapper(self, parameters, callback_function): + if ("task_handle" and "callback_data" not in parameters): + result = lambda task_handle, every_n_samples_event_type, number_of_samples, callback_data: callback_function(every_n_samples_event_type, number_of_samples) + elif ("task_handle" not in parameters): + result = lambda task_handle, every_n_samples_event_type, number_of_samples, callback_data: callback_function(every_n_samples_event_type, number_of_samples, callback_data) + elif ("callback_data" not in parameters): + result = lambda task_handle, every_n_samples_event_type, number_of_samples, callback_data: callback_function(task_handle, every_n_samples_event_type, number_of_samples) + return result + + def register_signal_event_wrapper(self, parameters, callback_function): + if ("task_handle" and "callback_data" not in parameters): + result = lambda task_handle, signal_type, callback_data: callback_function(signal_type) + elif ("task_handle" not in parameters): + result = lambda task_handle, signal_type, callback_data: callback_function(signal_type, callback_data) + elif ("callback_data" not in parameters): + result = lambda task_handle, signal_type, callback_data: callback_function(task_handle, signal_type) + return result + + def register_done_event_wrapper(self, parameters, callback_function): + if ("task_handle" and "callback_data" not in parameters): + result = lambda task_handle, status, callback_data: callback_function(status) + elif ("task_handle" not in parameters): + result = lambda task_handle, status, callback_data: callback_function(status, callback_data) + elif ("callback_data" not in parameters): + result = lambda task_handle, status, callback_data: callback_function(task_handle, status) + return result + def register_done_event(self, callback_method): """ Registers a callback function to receive an event when a task stops due @@ -788,6 +828,7 @@ def register_done_event(self, callback_method): function. """ if callback_method is not None: + callback_method = self.add_arguments_if_necessary(callback_method, 3) # If the event is already registered, the interpreter should raise DaqError with code # DAQmxErrors.DONE_EVENT_ALREADY_REGISTERED. event_handler = self._interpreter.register_done_event(self._handle, 0, callback_method, None) @@ -836,6 +877,7 @@ def register_every_n_samples_acquired_into_buffer_event( function. """ if callback_method is not None: + callback_method = self.add_arguments_if_necessary(callback_method, 4) # If the event is already registered, the interpreter should raise DaqError with code # DAQmxErrors.EVERY_N_SAMPS_ACQ_INTO_BUFFER_EVENT_ALREADY_REGISTERED. event_handler = self._interpreter.register_every_n_samples_event( @@ -887,6 +929,7 @@ def register_every_n_samples_transferred_from_buffer_event( function. """ if callback_method is not None: + callback_method = self.add_arguments_if_necessary(callback_method, 4) # If the event is already registered, the interpreter should raise DaqError with code # DAQmxErrors.EVERY_N_SAMPS_TRANSFERRED_FROM_BUFFER_EVENT_ALREADY_REGISTERED. event_handler = self._interpreter.register_every_n_samples_event( @@ -933,6 +976,7 @@ def register_signal_event(self, signal_type, callback_method): function. """ if callback_method is not None: + callback_method = self.add_arguments_if_necessary(callback_method, 3) # If the event is already registered, the interpreter should raise DaqError with code # DAQmxErrors.SIGNAL_EVENT_ALREADY_REGISTERED. event_handler = self._interpreter.register_signal_event( diff --git a/tests/_event_utils.py b/tests/_event_utils.py index f6674162..cb31f85c 100644 --- a/tests/_event_utils.py +++ b/tests/_event_utils.py @@ -69,6 +69,17 @@ def handle_done_event(self, task_handle: object, status: int, callback_data: obj self._invoke_side_effect() return 0 +class DoneEventBasicParametersObserver(BaseEventObserver[DoneEvent]): + """An observer for Done events with only basic parameters in callback function.""" + + def handle_done_event(self, status: int) -> int: + """Handles a Done event.""" + with self._lock: + self._events.append(DoneEvent(status)) + self._event_semaphore.release() + self._invoke_side_effect() + return 0 + class EveryNSamplesEventObserver(BaseEventObserver[EveryNSamplesEvent]): """An observer for Every N Samples events.""" @@ -87,6 +98,20 @@ def handle_every_n_samples_event( self._invoke_side_effect() return 0 +class EveryNSamplesEventBasicParametersObserver(BaseEventObserver[EveryNSamplesEvent]): + """An observer for Every N Samples events with only basic parameters in callback function.""" + + def handle_every_n_samples_event( + self, + every_n_samples_event_type: int, + number_of_samples: int, + ) -> int: + """Handles an Every N Samples event.""" + with self._lock: + self._events.append(EveryNSamplesEvent(every_n_samples_event_type, number_of_samples)) + self._event_semaphore.release() + self._invoke_side_effect() + return 0 class SignalEventObserver(BaseEventObserver[SignalEvent]): """An observer for Signal events.""" @@ -100,3 +125,16 @@ def handle_signal_event( self._event_semaphore.release() self._invoke_side_effect() return 0 + +class SignalEventBasicParametersObserver(BaseEventObserver[SignalEvent]): + """An observer for Signal events with only basic parameters in callback function.""" + + def handle_signal_event( + self, signal_type: int + ) -> int: + """Handles a Signal event.""" + with self._lock: + self._events.append(SignalEvent(signal_type)) + self._event_semaphore.release() + self._invoke_side_effect() + return 0 \ No newline at end of file diff --git a/tests/component/test_task_events.py b/tests/component/test_task_events.py index 98b107cf..0f844061 100644 --- a/tests/component/test_task_events.py +++ b/tests/component/test_task_events.py @@ -12,8 +12,11 @@ from nidaqmx.task import _TaskEventType from tests._event_utils import ( DoneEventObserver, + DoneEventBasicParametersObserver, EveryNSamplesEventObserver, + EveryNSamplesEventBasicParametersObserver, SignalEventObserver, + SignalEventBasicParametersObserver, ) @@ -55,6 +58,24 @@ def test___done_event_registered___run_finite_acquisition___callback_invoked_onc assert all(e.status == 0 for e in event_observer.events) +def test___done_event_registered___run_finite_acquisition___callback_invoked_with_basic_parameters_once_with_success_status( + ai_task: nidaqmx.Task, +) -> None: + event_observer = DoneEventBasicParametersObserver() + ai_task.register_done_event(event_observer.handle_done_event) + ai_task.timing.cfg_samp_clk_timing( + rate=10000.0, sample_mode=AcquisitionType.FINITE, samps_per_chan=1000 + ) + + ai_task.start() + + event_observer.wait_for_events() + with pytest.raises(TimeoutError): + event_observer.wait_for_events(timeout=100e-3) + assert len(event_observer.events) == 1 + assert all(e.status == 0 for e in event_observer.events) + + def test___every_n_samples_event_registered___run_finite_acquisition___callback_invoked_n_times_with_type_and_num_samples( ai_task: nidaqmx.Task, ) -> None: @@ -78,6 +99,29 @@ def test___every_n_samples_event_registered___run_finite_acquisition___callback_ ) assert all(e.number_of_samples == 100 for e in event_observer.events) +def test___every_n_samples_event_registered___run_finite_acquisition___callback_invoked_with_basic_parameters_n_times_with_type_and_num_samples( + ai_task: nidaqmx.Task, +) -> None: + event_observer = EveryNSamplesEventBasicParametersObserver() + ai_task.register_every_n_samples_acquired_into_buffer_event( + 100, event_observer.handle_every_n_samples_event + ) + ai_task.timing.cfg_samp_clk_timing( + rate=10000.0, sample_mode=AcquisitionType.FINITE, samps_per_chan=1000 + ) + + ai_task.start() + + event_observer.wait_for_events(10) + with pytest.raises(TimeoutError): + event_observer.wait_for_events(timeout=100e-3) + assert len(event_observer.events) == 10 + assert all( + e.event_type == EveryNSamplesEventType.ACQUIRED_INTO_BUFFER.value + for e in event_observer.events + ) + assert all(e.number_of_samples == 100 for e in event_observer.events) + def test___signal_event_registered___run_finite_acquisition___callback_invoked_n_times_with_type( ai_task_with_real_device: nidaqmx.Task, @@ -98,6 +142,25 @@ def test___signal_event_registered___run_finite_acquisition___callback_invoked_n assert all(e.signal_type == Signal.SAMPLE_COMPLETE.value for e in event_observer.events) +def test___signal_event_registered___run_finite_acquisition___callback_invoked_with_basic_parameters_n_times_with_type( + ai_task_with_real_device: nidaqmx.Task, +) -> None: + ai_task = ai_task_with_real_device + event_observer = SignalEventBasicParametersObserver() + ai_task.register_signal_event(Signal.SAMPLE_COMPLETE, event_observer.handle_signal_event) + ai_task.timing.cfg_samp_clk_timing( + rate=10.0, sample_mode=AcquisitionType.FINITE, samps_per_chan=10 + ) + + ai_task.start() + + event_observer.wait_for_events(10) + with pytest.raises(TimeoutError): + event_observer.wait_for_events(timeout=100e-3) + assert len(event_observer.events) == 10 + assert all(e.signal_type == Signal.SAMPLE_COMPLETE.value for e in event_observer.events) + + @pytest.mark.grpc_xfail( reason="Requires NI gRPC Device Server version 2.2 or later", raises=RpcError )