diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d8fe4fa --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/.project diff --git a/blocks/BandPassComplexNumber.mon b/blocks/BandPassComplexNumber.mon new file mode 100644 index 0000000..b9e8e47 --- /dev/null +++ b/blocks/BandPassComplexNumber.mon @@ -0,0 +1,85 @@ +//***************************************************************************** +// Title: ComplexType +// +// Copyright (c) 2015-2017 Software AG, Darmstadt, Germany and/or its licensors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//***************************************************************************** + +package com.industry.analytics; + +/** + * This event provides functionality to support complex number + * (numbers with real and imaginary components) arithmetic. + * This is currently used by the Fast Fourier Transformation + * calculations. + */ +event ComplexType { + decimal real; + decimal imaginary; + + static action init( decimal r, decimal i ) returns ComplexType { + ComplexType ret := new ComplexType; + ret.real := r; + ret.imaginary := i; + return ret; + } + + static action initFromSequence( sequence input ) returns sequence { + sequence cOutput := []; + + decimal currIn; + for currIn in input { + cOutput.append( ComplexType.init( currIn, 0.0d ) ); + } + return cOutput; + } + + action cexp() returns ComplexType { + decimal ex := self.real.exp(); + return ComplexType( ex * self.imaginary.cos(), ex * self.imaginary.sin() ); + } + + action add( ComplexType b ) returns ComplexType { + return ComplexType( self.real + b.real, self.imaginary + b.imaginary ); + } + + action subtract( ComplexType b ) returns ComplexType { + return ComplexType( self.real - b.real, self.imaginary - b.imaginary ); + } + + action multiply( ComplexType b ) returns ComplexType { + return ComplexType( self.real * b.real - self.imaginary * b.imaginary, + self.real * b.imaginary + self.imaginary * b.real); + } + + action abs() returns decimal { + return ( ( self.real * self.real ) + ( self.imaginary * self.imaginary ) ).sqrt(); + } + + action power() returns decimal { + return ( ( self.real * self.real ) + ( self.imaginary * self.imaginary ) ); + } + + action _toSimpleString() returns string { + return "("+real.toString()+","+imaginary.toString()+")"; + } + action _toString() returns string { + if( imaginary >= 0.0d ) then { + return real.toString()+" +"+imaginary.toString()+"i"; + } else { + return real.toString()+" "+imaginary.toString()+"i"; + } + } +} diff --git a/blocks/BandPassEventsWindowContents.mon b/blocks/BandPassEventsWindowContents.mon new file mode 100644 index 0000000..d5216e9 --- /dev/null +++ b/blocks/BandPassEventsWindowContents.mon @@ -0,0 +1,36 @@ +/* + * $Copyright (c) 2019 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.$ + * This file is licensed under the Apache 2.0 license - see https://www.apache.org/licenses/LICENSE-2.0 + * + */ +/* ***DISCLAIMER*** + * + * This is only a sample block and there is no support for this block. This block only supports English. There may be incompatible changes in the future releases without prior notice. + * To use this block, we recommend that you copy it and change the package name. Software AG accepts no responsibility for bug fixes, maintenance or adding new features to this block. + */ + +package apamax.analyticsbuilder.samples; + +/** + * Window contents. + * + * This is a single value in the sequence<WindowContents> property timeWindow provided by the TimeWindow block. + */ +event WindowContents { + /** Value. + * + * While typically a float, this can be any type that is valid for an Analytics Builder wire. + */ + any value; + /** Time of this data point. + * + * The timestamp of this data point, in standard Apama form (i.e. seconds since the Unix epoch) + */ + float timestamp; + /** Property name for timeWindow. + * + * For convenience, the property on the Value on which that timeWindow is stored. The value of this property should be + * sequence<WindowContents> + */ + constant string WINDOW_PROPERTY_NAME := "timeWindow"; // value will be sequence +} diff --git a/blocks/BandPassFFT.mon b/blocks/BandPassFFT.mon new file mode 100644 index 0000000..aadaa94 --- /dev/null +++ b/blocks/BandPassFFT.mon @@ -0,0 +1,227 @@ + +package com.industry.analytics; + +event AmplitudeFrequency { + integer frequency; + decimal amplitude; +} + +/** + * This event provides functionality to calculate + * Fast-Fourier Transformations using the Cooley-Tukey algorithm. + * Fourier analysis converts data from a time-domain to a representation + * in the frequency domain (and vice versa). + */ +event FFT { + + /** This action calculates the inverse FFT */ + static action ifft( sequence amplitudes ) returns sequence { + integer N := amplitudes.size(); + decimal iN := 1.0d / N.toDecimal(); + + // Conjugate if imaginary part is not 0 + integer i := 0; + ComplexType currVal; + for currVal in amplitudes { + amplitudes[i].imaginary := -currVal.imaginary; + i := i + 1; + } + + // Apply fourier transform + amplitudes := cfft( amplitudes ); + + while i < N { + // Conjugate again + amplitudes[i].imaginary := -amplitudes[i].imaginary; + // Scale + amplitudes[i].real := amplitudes[i].real * iN; + amplitudes[i].imaginary := amplitudes[i].imaginary * iN; + + i := i + 1; + } + return amplitudes; + } + + /** Calculate the FFT based on simple numbers */ + static action fft( sequence buffer ) returns sequence { + + // Create the set of complex numbers from the buffer + return cfft( ComplexType.initFromSequence( buffer ) ); + } + + /** Calculate the FFT based on complex numbers */ + static action cfft( sequence buffer ) returns sequence { + + integer N := buffer.size(); + if( N <= 1 ) then { + return buffer; + } + + integer hN := N / 2; + + sequence even := []; + sequence odd := []; + + // Set the size of each sequence to be half of the total + even.setSize( hN ); + odd.setSize( hN ); + + // Divide data + integer i := 0; + while i < hN { + even[i] := buffer[i*2]; + odd[i] := buffer[i*2+1]; + + // Increment the index + i := i + 1; + } + + // Analyze + even := cfft( even ); + odd := cfft( odd ); + + // Calculate this upfront for performance + decimal a := -2.0d * decimal.PI; + + // Combine results + integer k := 0; + while k < hN { + decimal p := (k / N).toDecimal(); + decimal term := a * k.toDecimal() / N.toDecimal(); + ComplexType t := ComplexType.init(0.0d, term ).cexp().multiply( odd[k] ); + + buffer[k] := even[ k ].add( t ); + odd[k] := buffer[ k ]; + buffer[k + hN] := even[ k ].subtract( t ); + even[k] := buffer[ k + hN ]; + + // Increment the index + k := k + 1; + } + return buffer; + } + + /** Get the set of amplitude values from a previously calculated set of FFT results */ + static action getAmplitudes( sequence fftResult ) returns sequence { + sequence magnitude := []; + integer fftSize := fftResult.size(); + decimal fftSizeAsDecimal := fftSize.toDecimal(); + + // Iterate over the FFT values + integer i := 0; + while i < (fftSize / 2) { + // Add to the magnitude sequence for the power spectrum + magnitude.append( 2.0d * (fftResult[i].abs() / fftSizeAsDecimal ) ); + + // Increment the index + i := i + 1; + } + return magnitude; + } + + /** Get the set of frequency values from a previously + * calculated set of FFT results */ + static action getFrequencies( integer sampleSize, sequence fftResult ) returns sequence { + sequence frequencies := []; + integer fftSize := fftResult.size(); + + // Iterate over the FFT values + integer i := 0; + while i < (fftSize / 2) { + // Add to the frequencies sequence + frequencies.append( (i*sampleSize) / fftSize ); + + // Increment the index + i := i + 1; + } + return frequencies; + } + + /** Get the set of amplitude and frequency pairs from + * a previously calculated set of FFT results */ + static action getAmplitudesAndFrequencies( integer sampleSize, sequence fftResult ) returns sequence { + // Calculate the magnitude/amplitude values + sequence ret := []; + integer fftSize := fftResult.size(); + decimal fftSizeAsDecimal := fftSize.toDecimal(); + + // Iterate over the FFT values + integer i := 0; + while i < (fftSize / 2) { + AmplitudeFrequency af := new AmplitudeFrequency; + + // Calculate the frequency + af.frequency := (i*sampleSize) / fftSize; + + // Calculate the amplitude + af.amplitude := 2.0d * ( fftResult[i].abs() / fftSizeAsDecimal ); + + // Append the tuple to the sequence + ret.append( af ); + + // Increment the index + i := i + 1; + } + return ret; + } + + /** Get the defined number of highest amplitude and frequency pairs from + * a previously calculated set of FFT results */ + static action getTopNAmplitudesAndFrequencies( integer sampleSize, sequence fftResult, + integer numToRet ) returns sequence { + // Calculate the magnitude/amplitude values + sequence ret := []; + sequence amplitudes := getAmplitudes( fftResult ); + + // Sort in descending order of amplitudes + sequence sortedAmps := amplitudes.clone(); + sortedAmps.sort(); + sortedAmps.reverse(); + sortedAmps.setSize( numToRet ); // Truncate the sequence by the number we want to return + + integer fftSize := fftResult.size(); + + // Iterate over the values to get the associated frequencies + decimal currAmplitude; + for currAmplitude in sortedAmps { + + integer i := amplitudes.indexOf( currAmplitude ); + if( i != -1 ) then { + // Calculate the frequency + AmplitudeFrequency af := new AmplitudeFrequency; + af.frequency := (i*sampleSize) / fftSize; + af.amplitude := currAmplitude; + + // Append the tuple to the sequence + ret.append( af ); + } + } + + return ret; + } + + /** Get the frequency with the highest amplitude from + * a previously calculated set of FFT results */ + static action getLargestFrequency( integer sampleSize, sequence fftResult ) returns integer { + // Get the amplitudes + sequence magnitudes := getAmplitudes( fftResult ); + + // Find largest peak in power spectrum + decimal max_magnitude := -decimal.INFINITY; + integer max_index := -1; + integer i := 0; + + decimal currVal; + for currVal in magnitudes { + if( currVal > max_magnitude ) then { + max_magnitude := currVal; + max_index := i; + } + // Increment the index + i := i + 1; + } + + // Convert index of largest peak to frequency + return (max_index * sampleSize / fftResult.size()); + } +} diff --git a/blocks/BandStopComplexNumber.mon b/blocks/BandStopComplexNumber.mon new file mode 100644 index 0000000..b9e8e47 --- /dev/null +++ b/blocks/BandStopComplexNumber.mon @@ -0,0 +1,85 @@ +//***************************************************************************** +// Title: ComplexType +// +// Copyright (c) 2015-2017 Software AG, Darmstadt, Germany and/or its licensors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//***************************************************************************** + +package com.industry.analytics; + +/** + * This event provides functionality to support complex number + * (numbers with real and imaginary components) arithmetic. + * This is currently used by the Fast Fourier Transformation + * calculations. + */ +event ComplexType { + decimal real; + decimal imaginary; + + static action init( decimal r, decimal i ) returns ComplexType { + ComplexType ret := new ComplexType; + ret.real := r; + ret.imaginary := i; + return ret; + } + + static action initFromSequence( sequence input ) returns sequence { + sequence cOutput := []; + + decimal currIn; + for currIn in input { + cOutput.append( ComplexType.init( currIn, 0.0d ) ); + } + return cOutput; + } + + action cexp() returns ComplexType { + decimal ex := self.real.exp(); + return ComplexType( ex * self.imaginary.cos(), ex * self.imaginary.sin() ); + } + + action add( ComplexType b ) returns ComplexType { + return ComplexType( self.real + b.real, self.imaginary + b.imaginary ); + } + + action subtract( ComplexType b ) returns ComplexType { + return ComplexType( self.real - b.real, self.imaginary - b.imaginary ); + } + + action multiply( ComplexType b ) returns ComplexType { + return ComplexType( self.real * b.real - self.imaginary * b.imaginary, + self.real * b.imaginary + self.imaginary * b.real); + } + + action abs() returns decimal { + return ( ( self.real * self.real ) + ( self.imaginary * self.imaginary ) ).sqrt(); + } + + action power() returns decimal { + return ( ( self.real * self.real ) + ( self.imaginary * self.imaginary ) ); + } + + action _toSimpleString() returns string { + return "("+real.toString()+","+imaginary.toString()+")"; + } + action _toString() returns string { + if( imaginary >= 0.0d ) then { + return real.toString()+" +"+imaginary.toString()+"i"; + } else { + return real.toString()+" "+imaginary.toString()+"i"; + } + } +} diff --git a/blocks/BandStopEventsWindowContents.mon b/blocks/BandStopEventsWindowContents.mon new file mode 100644 index 0000000..d5216e9 --- /dev/null +++ b/blocks/BandStopEventsWindowContents.mon @@ -0,0 +1,36 @@ +/* + * $Copyright (c) 2019 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.$ + * This file is licensed under the Apache 2.0 license - see https://www.apache.org/licenses/LICENSE-2.0 + * + */ +/* ***DISCLAIMER*** + * + * This is only a sample block and there is no support for this block. This block only supports English. There may be incompatible changes in the future releases without prior notice. + * To use this block, we recommend that you copy it and change the package name. Software AG accepts no responsibility for bug fixes, maintenance or adding new features to this block. + */ + +package apamax.analyticsbuilder.samples; + +/** + * Window contents. + * + * This is a single value in the sequence<WindowContents> property timeWindow provided by the TimeWindow block. + */ +event WindowContents { + /** Value. + * + * While typically a float, this can be any type that is valid for an Analytics Builder wire. + */ + any value; + /** Time of this data point. + * + * The timestamp of this data point, in standard Apama form (i.e. seconds since the Unix epoch) + */ + float timestamp; + /** Property name for timeWindow. + * + * For convenience, the property on the Value on which that timeWindow is stored. The value of this property should be + * sequence<WindowContents> + */ + constant string WINDOW_PROPERTY_NAME := "timeWindow"; // value will be sequence +} diff --git a/blocks/BandStopFFT.mon b/blocks/BandStopFFT.mon new file mode 100644 index 0000000..aadaa94 --- /dev/null +++ b/blocks/BandStopFFT.mon @@ -0,0 +1,227 @@ + +package com.industry.analytics; + +event AmplitudeFrequency { + integer frequency; + decimal amplitude; +} + +/** + * This event provides functionality to calculate + * Fast-Fourier Transformations using the Cooley-Tukey algorithm. + * Fourier analysis converts data from a time-domain to a representation + * in the frequency domain (and vice versa). + */ +event FFT { + + /** This action calculates the inverse FFT */ + static action ifft( sequence amplitudes ) returns sequence { + integer N := amplitudes.size(); + decimal iN := 1.0d / N.toDecimal(); + + // Conjugate if imaginary part is not 0 + integer i := 0; + ComplexType currVal; + for currVal in amplitudes { + amplitudes[i].imaginary := -currVal.imaginary; + i := i + 1; + } + + // Apply fourier transform + amplitudes := cfft( amplitudes ); + + while i < N { + // Conjugate again + amplitudes[i].imaginary := -amplitudes[i].imaginary; + // Scale + amplitudes[i].real := amplitudes[i].real * iN; + amplitudes[i].imaginary := amplitudes[i].imaginary * iN; + + i := i + 1; + } + return amplitudes; + } + + /** Calculate the FFT based on simple numbers */ + static action fft( sequence buffer ) returns sequence { + + // Create the set of complex numbers from the buffer + return cfft( ComplexType.initFromSequence( buffer ) ); + } + + /** Calculate the FFT based on complex numbers */ + static action cfft( sequence buffer ) returns sequence { + + integer N := buffer.size(); + if( N <= 1 ) then { + return buffer; + } + + integer hN := N / 2; + + sequence even := []; + sequence odd := []; + + // Set the size of each sequence to be half of the total + even.setSize( hN ); + odd.setSize( hN ); + + // Divide data + integer i := 0; + while i < hN { + even[i] := buffer[i*2]; + odd[i] := buffer[i*2+1]; + + // Increment the index + i := i + 1; + } + + // Analyze + even := cfft( even ); + odd := cfft( odd ); + + // Calculate this upfront for performance + decimal a := -2.0d * decimal.PI; + + // Combine results + integer k := 0; + while k < hN { + decimal p := (k / N).toDecimal(); + decimal term := a * k.toDecimal() / N.toDecimal(); + ComplexType t := ComplexType.init(0.0d, term ).cexp().multiply( odd[k] ); + + buffer[k] := even[ k ].add( t ); + odd[k] := buffer[ k ]; + buffer[k + hN] := even[ k ].subtract( t ); + even[k] := buffer[ k + hN ]; + + // Increment the index + k := k + 1; + } + return buffer; + } + + /** Get the set of amplitude values from a previously calculated set of FFT results */ + static action getAmplitudes( sequence fftResult ) returns sequence { + sequence magnitude := []; + integer fftSize := fftResult.size(); + decimal fftSizeAsDecimal := fftSize.toDecimal(); + + // Iterate over the FFT values + integer i := 0; + while i < (fftSize / 2) { + // Add to the magnitude sequence for the power spectrum + magnitude.append( 2.0d * (fftResult[i].abs() / fftSizeAsDecimal ) ); + + // Increment the index + i := i + 1; + } + return magnitude; + } + + /** Get the set of frequency values from a previously + * calculated set of FFT results */ + static action getFrequencies( integer sampleSize, sequence fftResult ) returns sequence { + sequence frequencies := []; + integer fftSize := fftResult.size(); + + // Iterate over the FFT values + integer i := 0; + while i < (fftSize / 2) { + // Add to the frequencies sequence + frequencies.append( (i*sampleSize) / fftSize ); + + // Increment the index + i := i + 1; + } + return frequencies; + } + + /** Get the set of amplitude and frequency pairs from + * a previously calculated set of FFT results */ + static action getAmplitudesAndFrequencies( integer sampleSize, sequence fftResult ) returns sequence { + // Calculate the magnitude/amplitude values + sequence ret := []; + integer fftSize := fftResult.size(); + decimal fftSizeAsDecimal := fftSize.toDecimal(); + + // Iterate over the FFT values + integer i := 0; + while i < (fftSize / 2) { + AmplitudeFrequency af := new AmplitudeFrequency; + + // Calculate the frequency + af.frequency := (i*sampleSize) / fftSize; + + // Calculate the amplitude + af.amplitude := 2.0d * ( fftResult[i].abs() / fftSizeAsDecimal ); + + // Append the tuple to the sequence + ret.append( af ); + + // Increment the index + i := i + 1; + } + return ret; + } + + /** Get the defined number of highest amplitude and frequency pairs from + * a previously calculated set of FFT results */ + static action getTopNAmplitudesAndFrequencies( integer sampleSize, sequence fftResult, + integer numToRet ) returns sequence { + // Calculate the magnitude/amplitude values + sequence ret := []; + sequence amplitudes := getAmplitudes( fftResult ); + + // Sort in descending order of amplitudes + sequence sortedAmps := amplitudes.clone(); + sortedAmps.sort(); + sortedAmps.reverse(); + sortedAmps.setSize( numToRet ); // Truncate the sequence by the number we want to return + + integer fftSize := fftResult.size(); + + // Iterate over the values to get the associated frequencies + decimal currAmplitude; + for currAmplitude in sortedAmps { + + integer i := amplitudes.indexOf( currAmplitude ); + if( i != -1 ) then { + // Calculate the frequency + AmplitudeFrequency af := new AmplitudeFrequency; + af.frequency := (i*sampleSize) / fftSize; + af.amplitude := currAmplitude; + + // Append the tuple to the sequence + ret.append( af ); + } + } + + return ret; + } + + /** Get the frequency with the highest amplitude from + * a previously calculated set of FFT results */ + static action getLargestFrequency( integer sampleSize, sequence fftResult ) returns integer { + // Get the amplitudes + sequence magnitudes := getAmplitudes( fftResult ); + + // Find largest peak in power spectrum + decimal max_magnitude := -decimal.INFINITY; + integer max_index := -1; + integer i := 0; + + decimal currVal; + for currVal in magnitudes { + if( currVal > max_magnitude ) then { + max_magnitude := currVal; + max_index := i; + } + // Increment the index + i := i + 1; + } + + // Convert index of largest peak to frequency + return (max_index * sampleSize / fftResult.size()); + } +} diff --git a/blocks/BandpassFilter.mon b/blocks/BandpassFilter.mon new file mode 100644 index 0000000..9508f09 --- /dev/null +++ b/blocks/BandpassFilter.mon @@ -0,0 +1,167 @@ +/* + * $Copyright (c) 2020 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.$ + * Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG + */ +package apamax.analyticsbuilder.custom; + +using apama.analyticsbuilder.BlockBase; +using apama.analyticsbuilder.Activation; +using com.apama.json.JSONPlugin; +using com.apama.exceptions.Exception; +using com.apama.util.AnyExtractor; +using apama.analyticsbuilder.L10N; +using apamax.analyticsbuilder.samples.WindowContents; +using com.industry.analytics.FFT; +using com.industry.analytics.ComplexType; +using apama.analyticsbuilder.Value; + +/** The parameters for the BandpassFilter block. */ +event BandpassFilter_$Parameters{ + + /** + ** Upper Passband Frequency + * + * Only frequencies below the upper passband might be in the output. + * This is a float, must be a finite value. + */ + float upper; + + /** + ** Lower Passband Frequency + * + * Only frequencies above the lower passband might be in the output. + * This is a float, must be a finite value. + */ + float lower; + + /** + * Window Duration (secs) + * + * The amount of time in seconds for which signal values are to be kept in the window. + * This must be a finite and positive value. + */ + float windowDurationSec; + + action $validate() { + if (not upper.isFinite()) { + throw L10N.getLocalizedException("fwk_param_finite_upper_value", [BlockBase.getL10N_param("upper",self),upper]); + } + if (not lower.isFinite()) { + throw L10N.getLocalizedException("fwk_param_finite_lower_value", [BlockBase.getL10N_param("lower",self),lower]); + } + if (not windowDurationSec.isFinite()) { + throw L10N.getLocalizedException("fwk_param_finite_windowDurationSec_value", [BlockBase.getL10N_param("windowDurationSec",self),windowDurationSec]); + } + if windowDurationSec < 0.0 { + throw L10N.getLocalizedException("fwk_param_positive_windowDurationSec_value", [BlockBase.getL10N_param("windowDurationSec",self),windowDurationSec]); + } + } +} + +/** State of the block.*/ +event BandpassFilter_$State{ + sequence contents; //we keep in memory the time-series values received as input. We need to associate the time to each value to work with the rolling window + float startTime; +} + + +/** +* Bandpass Filter +* +* Passes signals with a frequency within the lower and upper range and excludes signals with frequencies outside the range. +* It takes a signal (time-series of float value) as an input, buffer it until it reaches the window duration and then analyze the frequency of that signal. +* +* @$blockCategory Flow Manipulations +*/ +event BandpassFilter { + + /**BlockBase object. + * + * This is initialized by the framework when the block is required for a model. + */ + BlockBase $base; + + /** Parameters, filled in by the framework. */ + BandpassFilter_$Parameters $parameters; + constant string WINDOW_PROPERTY_NAME := "bandpassFilter"; + + float upperLimit; + float lowerLimit; + float periodSecs; + + /** Called once at block start up. */ + action $init() { + upperLimit := $parameters.upper; + lowerLimit := $parameters.lower; + periodSecs := $parameters.windowDurationSec; + } + + /** + * + * @param $activation The current activation, contextual information required when generating a block output. Blocks should only use the + * Activation object passed to them from the framework, never creating their own or holding on to an Activation object. + * + * @param $input_value float value to add to the time series. + * + * @$inputName value Value + */ + action $process(Activation $activation, float $input_value, BandpassFilter_$State $blockState) { + $base.createTimer(periodSecs, 0.0); + $blockState.contents.append(WindowContents($input_value, $activation.timestamp)); + } + + /** + * Called by the framework. Provide the blockstate specific to that partition, + * + * This block takes in time series values from which frequencies are then calculated via FFT algo and it filter all frequencies below a specific threshold (cutoff) + * + */ + action $timerTriggered(Activation $activation, BandpassFilter_$State $blockState) { + //if not input then no output + if $blockState.contents.size() = 0 {return;} + + sequence timeSeries := $blockState.contents; + + //calculate the start time of the window + $blockState.startTime := $activation.timestamp - periodSecs; + + //expire old entries (index 0) that are older than the start of the window. + while (timeSeries.size() > 0 and timeSeries[0].timestamp < $blockState.startTime) { + timeSeries.remove(0); + } + + sequence timeSeriesValue := new sequence; + WindowContents wc; + for wc in timeSeries { + timeSeriesValue.append(( wc.value).toDecimal()); + } + integer timeRange := (timeSeries[timeSeries.size()-1].timestamp - timeSeries[0].timestamp).floor(); + if timeRange > 0 { + //The sample size is the number of samples per-second that is used in the FFT calculation. + integer sampleSize := timeSeries.size() / timeRange; + sequence transformFF := FFT.fft(timeSeriesValue); + sequence frequencies := FFT.getFrequencies(sampleSize, transformFF); + sequence bandPass := new sequence; + integer f; + + for f in frequencies { + if f.toFloat() >= lowerLimit and f.toFloat() <= upperLimit { + bandPass.append(f.toFloat()); + } + } + Value v := new Value; + v.properties[BandpassFilter.WINDOW_PROPERTY_NAME] := bandPass; + v.value := true; //has to be set + $setOutput_filtered($activation, v); + } + } + + + /** + * Filtered + * + * Populated with frequencies lower than the cut-off. + */ + action $setOutput_filtered; // This is initialized by the framework. It sets the output of the block and may trigger any blocks connected to this output. + +} \ No newline at end of file diff --git a/blocks/BandstopFilter.mon b/blocks/BandstopFilter.mon new file mode 100644 index 0000000..8a4860f --- /dev/null +++ b/blocks/BandstopFilter.mon @@ -0,0 +1,165 @@ + +package apamax.analyticsbuilder.custom; + +using apama.analyticsbuilder.BlockBase; +using apama.analyticsbuilder.Activation; +using com.apama.json.JSONPlugin; +using com.apama.exceptions.Exception; +using com.apama.util.AnyExtractor; +using apama.analyticsbuilder.L10N; +using apamax.analyticsbuilder.samples.WindowContents; +using com.industry.analytics.FFT; +using com.industry.analytics.ComplexType; +using apama.analyticsbuilder.Value; + +/** The parameters for the BandstopFilter block. */ +event BandstopFilter_$Parameters{ + + /** + ** Upper Passband Frequency + * + * Only frequencies below the upper passband might be in the output. + * This is a float, must be a finite value. + */ + float upper; + + /** + ** Lower Passband Frequency + * + * Only frequencies above the lower passband might be in the output. + * This is a float, must be a finite value. + */ + float lower; + + /** + * Window Duration (secs) + * + * The amount of time in seconds for which signal values are to be kept in the window. + * This must be a finite and positive value. + */ + float windowDurationSec; + + action $validate() { + if (not upper.isFinite()) { + throw L10N.getLocalizedException("fwk_param_finite_upper_value", [BlockBase.getL10N_param("upper",self),upper]); + } + if (not lower.isFinite()) { + throw L10N.getLocalizedException("fwk_param_finite_lower_value", [BlockBase.getL10N_param("lower",self),lower]); + } + if (not windowDurationSec.isFinite()) { + throw L10N.getLocalizedException("fwk_param_finite_windowDurationSec_value", [BlockBase.getL10N_param("windowDurationSec",self),windowDurationSec]); + } + if windowDurationSec < 0.0 { + throw L10N.getLocalizedException("fwk_param_positive_windowDurationSec_value", [BlockBase.getL10N_param("windowDurationSec",self),windowDurationSec]); + } + } +} + +/** State of the block.*/ +event BandstopFilter_$State{ + sequence contents; //we keep in memory the time-series values received as input. We need to associate the time to each value to work with the rolling window + float startTime; +} + + +/** +* Bandstop Filter +* +* Passes signals with a frequency outside the lower and upper range and excludes signals with frequencies inside the range. +* It takes a signal (time-series of float value) as an input, buffer it until it reaches the window duration and then analyze the frequency of that signal. +* +* @$blockCategory Flow Manipulations +*/ +event BandstopFilter { + + /**BlockBase object. + * + * This is initialized by the framework when the block is required for a model. + */ + BlockBase $base; + + /** Parameters, filled in by the framework. */ + BandstopFilter_$Parameters $parameters; + constant string WINDOW_PROPERTY_NAME := "bandstopFilter"; + + float upperLimit; + float lowerLimit; + float periodSecs; + + /** Called once at block start up. */ + action $init() { + upperLimit := $parameters.upper; + lowerLimit := $parameters.lower; + periodSecs := $parameters.windowDurationSec; + } + + /** + * + * @param $activation The current activation, contextual information required when generating a block output. Blocks should only use the + * Activation object passed to them from the framework, never creating their own or holding on to an Activation object. + * + * @param $input_value float value to add to the time series. + * + * @$inputName value Value + */ + action $process(Activation $activation, float $input_value, BandstopFilter_$State $blockState) { + $base.createTimer(periodSecs, 0.0); + $blockState.contents.append(WindowContents($input_value, $activation.timestamp)); + } + + + /** + * Called by the framework. Provide the blockstate specific to that partition, + * + * This block takes in time series values from which frequencies are then calculated via FFT algo and it filters all frequencies within a specific range. + * + */ + action $timerTriggered(Activation $activation, BandstopFilter_$State $blockState) { + //if not input then no output + if $blockState.contents.size() = 0 {return;} + + sequence timeSeries := $blockState.contents; + + //calculate the start time of the window + $blockState.startTime := $activation.timestamp - periodSecs; + + //expire old entries (index 0) that are older than the start of the window. + while (timeSeries.size() > 0 and timeSeries[0].timestamp < $blockState.startTime) { + timeSeries.remove(0); + } + + sequence timeSeriesValue := new sequence; + WindowContents wc; + for wc in timeSeries { + timeSeriesValue.append(( wc.value).toDecimal()); + } + integer timeRange := (timeSeries[timeSeries.size()-1].timestamp - timeSeries[0].timestamp).floor(); + if timeRange > 0 { + //The sample size is the number of samples per-second that is used in the FFT calculation. + integer sampleSize := timeSeries.size() / timeRange; + sequence transformFF := FFT.fft(timeSeriesValue); + sequence frequencies := FFT.getFrequencies(sampleSize, transformFF); + sequence bandStop := new sequence; + integer f; + + for f in frequencies { + if f.toFloat() < lowerLimit or f.toFloat() > upperLimit { + bandStop.append(f.toFloat()); + } + } + Value v := new Value; + v.properties[BandstopFilter.WINDOW_PROPERTY_NAME] := bandStop; + v.value := true; //has to be set + $setOutput_filtered($activation, v); + } + } + + + /** + * Filtered + * + * Populated with frequencies lower than the cut-off. + */ + action $setOutput_filtered; // This is initialized by the framework. It sets the output of the block and may trigger any blocks connected to this output. + +} \ No newline at end of file diff --git a/blocks/ComplexNumber.mon b/blocks/ComplexNumber.mon new file mode 100644 index 0000000..b9e8e47 --- /dev/null +++ b/blocks/ComplexNumber.mon @@ -0,0 +1,85 @@ +//***************************************************************************** +// Title: ComplexType +// +// Copyright (c) 2015-2017 Software AG, Darmstadt, Germany and/or its licensors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//***************************************************************************** + +package com.industry.analytics; + +/** + * This event provides functionality to support complex number + * (numbers with real and imaginary components) arithmetic. + * This is currently used by the Fast Fourier Transformation + * calculations. + */ +event ComplexType { + decimal real; + decimal imaginary; + + static action init( decimal r, decimal i ) returns ComplexType { + ComplexType ret := new ComplexType; + ret.real := r; + ret.imaginary := i; + return ret; + } + + static action initFromSequence( sequence input ) returns sequence { + sequence cOutput := []; + + decimal currIn; + for currIn in input { + cOutput.append( ComplexType.init( currIn, 0.0d ) ); + } + return cOutput; + } + + action cexp() returns ComplexType { + decimal ex := self.real.exp(); + return ComplexType( ex * self.imaginary.cos(), ex * self.imaginary.sin() ); + } + + action add( ComplexType b ) returns ComplexType { + return ComplexType( self.real + b.real, self.imaginary + b.imaginary ); + } + + action subtract( ComplexType b ) returns ComplexType { + return ComplexType( self.real - b.real, self.imaginary - b.imaginary ); + } + + action multiply( ComplexType b ) returns ComplexType { + return ComplexType( self.real * b.real - self.imaginary * b.imaginary, + self.real * b.imaginary + self.imaginary * b.real); + } + + action abs() returns decimal { + return ( ( self.real * self.real ) + ( self.imaginary * self.imaginary ) ).sqrt(); + } + + action power() returns decimal { + return ( ( self.real * self.real ) + ( self.imaginary * self.imaginary ) ); + } + + action _toSimpleString() returns string { + return "("+real.toString()+","+imaginary.toString()+")"; + } + action _toString() returns string { + if( imaginary >= 0.0d ) then { + return real.toString()+" +"+imaginary.toString()+"i"; + } else { + return real.toString()+" "+imaginary.toString()+"i"; + } + } +} diff --git a/blocks/EventsWindowContents.mon b/blocks/EventsWindowContents.mon new file mode 100644 index 0000000..d5216e9 --- /dev/null +++ b/blocks/EventsWindowContents.mon @@ -0,0 +1,36 @@ +/* + * $Copyright (c) 2019 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.$ + * This file is licensed under the Apache 2.0 license - see https://www.apache.org/licenses/LICENSE-2.0 + * + */ +/* ***DISCLAIMER*** + * + * This is only a sample block and there is no support for this block. This block only supports English. There may be incompatible changes in the future releases without prior notice. + * To use this block, we recommend that you copy it and change the package name. Software AG accepts no responsibility for bug fixes, maintenance or adding new features to this block. + */ + +package apamax.analyticsbuilder.samples; + +/** + * Window contents. + * + * This is a single value in the sequence<WindowContents> property timeWindow provided by the TimeWindow block. + */ +event WindowContents { + /** Value. + * + * While typically a float, this can be any type that is valid for an Analytics Builder wire. + */ + any value; + /** Time of this data point. + * + * The timestamp of this data point, in standard Apama form (i.e. seconds since the Unix epoch) + */ + float timestamp; + /** Property name for timeWindow. + * + * For convenience, the property on the Value on which that timeWindow is stored. The value of this property should be + * sequence<WindowContents> + */ + constant string WINDOW_PROPERTY_NAME := "timeWindow"; // value will be sequence +} diff --git a/blocks/FFT.mon b/blocks/FFT.mon new file mode 100644 index 0000000..aadaa94 --- /dev/null +++ b/blocks/FFT.mon @@ -0,0 +1,227 @@ + +package com.industry.analytics; + +event AmplitudeFrequency { + integer frequency; + decimal amplitude; +} + +/** + * This event provides functionality to calculate + * Fast-Fourier Transformations using the Cooley-Tukey algorithm. + * Fourier analysis converts data from a time-domain to a representation + * in the frequency domain (and vice versa). + */ +event FFT { + + /** This action calculates the inverse FFT */ + static action ifft( sequence amplitudes ) returns sequence { + integer N := amplitudes.size(); + decimal iN := 1.0d / N.toDecimal(); + + // Conjugate if imaginary part is not 0 + integer i := 0; + ComplexType currVal; + for currVal in amplitudes { + amplitudes[i].imaginary := -currVal.imaginary; + i := i + 1; + } + + // Apply fourier transform + amplitudes := cfft( amplitudes ); + + while i < N { + // Conjugate again + amplitudes[i].imaginary := -amplitudes[i].imaginary; + // Scale + amplitudes[i].real := amplitudes[i].real * iN; + amplitudes[i].imaginary := amplitudes[i].imaginary * iN; + + i := i + 1; + } + return amplitudes; + } + + /** Calculate the FFT based on simple numbers */ + static action fft( sequence buffer ) returns sequence { + + // Create the set of complex numbers from the buffer + return cfft( ComplexType.initFromSequence( buffer ) ); + } + + /** Calculate the FFT based on complex numbers */ + static action cfft( sequence buffer ) returns sequence { + + integer N := buffer.size(); + if( N <= 1 ) then { + return buffer; + } + + integer hN := N / 2; + + sequence even := []; + sequence odd := []; + + // Set the size of each sequence to be half of the total + even.setSize( hN ); + odd.setSize( hN ); + + // Divide data + integer i := 0; + while i < hN { + even[i] := buffer[i*2]; + odd[i] := buffer[i*2+1]; + + // Increment the index + i := i + 1; + } + + // Analyze + even := cfft( even ); + odd := cfft( odd ); + + // Calculate this upfront for performance + decimal a := -2.0d * decimal.PI; + + // Combine results + integer k := 0; + while k < hN { + decimal p := (k / N).toDecimal(); + decimal term := a * k.toDecimal() / N.toDecimal(); + ComplexType t := ComplexType.init(0.0d, term ).cexp().multiply( odd[k] ); + + buffer[k] := even[ k ].add( t ); + odd[k] := buffer[ k ]; + buffer[k + hN] := even[ k ].subtract( t ); + even[k] := buffer[ k + hN ]; + + // Increment the index + k := k + 1; + } + return buffer; + } + + /** Get the set of amplitude values from a previously calculated set of FFT results */ + static action getAmplitudes( sequence fftResult ) returns sequence { + sequence magnitude := []; + integer fftSize := fftResult.size(); + decimal fftSizeAsDecimal := fftSize.toDecimal(); + + // Iterate over the FFT values + integer i := 0; + while i < (fftSize / 2) { + // Add to the magnitude sequence for the power spectrum + magnitude.append( 2.0d * (fftResult[i].abs() / fftSizeAsDecimal ) ); + + // Increment the index + i := i + 1; + } + return magnitude; + } + + /** Get the set of frequency values from a previously + * calculated set of FFT results */ + static action getFrequencies( integer sampleSize, sequence fftResult ) returns sequence { + sequence frequencies := []; + integer fftSize := fftResult.size(); + + // Iterate over the FFT values + integer i := 0; + while i < (fftSize / 2) { + // Add to the frequencies sequence + frequencies.append( (i*sampleSize) / fftSize ); + + // Increment the index + i := i + 1; + } + return frequencies; + } + + /** Get the set of amplitude and frequency pairs from + * a previously calculated set of FFT results */ + static action getAmplitudesAndFrequencies( integer sampleSize, sequence fftResult ) returns sequence { + // Calculate the magnitude/amplitude values + sequence ret := []; + integer fftSize := fftResult.size(); + decimal fftSizeAsDecimal := fftSize.toDecimal(); + + // Iterate over the FFT values + integer i := 0; + while i < (fftSize / 2) { + AmplitudeFrequency af := new AmplitudeFrequency; + + // Calculate the frequency + af.frequency := (i*sampleSize) / fftSize; + + // Calculate the amplitude + af.amplitude := 2.0d * ( fftResult[i].abs() / fftSizeAsDecimal ); + + // Append the tuple to the sequence + ret.append( af ); + + // Increment the index + i := i + 1; + } + return ret; + } + + /** Get the defined number of highest amplitude and frequency pairs from + * a previously calculated set of FFT results */ + static action getTopNAmplitudesAndFrequencies( integer sampleSize, sequence fftResult, + integer numToRet ) returns sequence { + // Calculate the magnitude/amplitude values + sequence ret := []; + sequence amplitudes := getAmplitudes( fftResult ); + + // Sort in descending order of amplitudes + sequence sortedAmps := amplitudes.clone(); + sortedAmps.sort(); + sortedAmps.reverse(); + sortedAmps.setSize( numToRet ); // Truncate the sequence by the number we want to return + + integer fftSize := fftResult.size(); + + // Iterate over the values to get the associated frequencies + decimal currAmplitude; + for currAmplitude in sortedAmps { + + integer i := amplitudes.indexOf( currAmplitude ); + if( i != -1 ) then { + // Calculate the frequency + AmplitudeFrequency af := new AmplitudeFrequency; + af.frequency := (i*sampleSize) / fftSize; + af.amplitude := currAmplitude; + + // Append the tuple to the sequence + ret.append( af ); + } + } + + return ret; + } + + /** Get the frequency with the highest amplitude from + * a previously calculated set of FFT results */ + static action getLargestFrequency( integer sampleSize, sequence fftResult ) returns integer { + // Get the amplitudes + sequence magnitudes := getAmplitudes( fftResult ); + + // Find largest peak in power spectrum + decimal max_magnitude := -decimal.INFINITY; + integer max_index := -1; + integer i := 0; + + decimal currVal; + for currVal in magnitudes { + if( currVal > max_magnitude ) then { + max_magnitude := currVal; + max_index := i; + } + // Increment the index + i := i + 1; + } + + // Convert index of largest peak to frequency + return (max_index * sampleSize / fftResult.size()); + } +} diff --git a/blocks/HighpassFilter.mon b/blocks/HighpassFilter.mon new file mode 100644 index 0000000..b7804d9 --- /dev/null +++ b/blocks/HighpassFilter.mon @@ -0,0 +1,153 @@ +/* + * $Copyright (c) 2020 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.$ + * Use, reproduction, transfer, publication or disclosure is prohibited except as specifically provided for in your License Agreement with Software AG + */ +package apamax.analyticsbuilder.custom; + +using apama.analyticsbuilder.BlockBase; +using apama.analyticsbuilder.Activation; +using com.apama.json.JSONPlugin; +using com.apama.exceptions.Exception; +using com.apama.util.AnyExtractor; +using apama.analyticsbuilder.L10N; +using apamax.analyticsbuilder.samples.WindowContents; +using com.industry.analytics.FFT; +using com.industry.analytics.ComplexType; +using apama.analyticsbuilder.Value; + +/** The parameters for the HighpassFilter block. */ +event HighpassFilter_$Parameters{ + + /** + ** Cut-off Frequency + * + * Only frequencies above this cut-off will be in the output. + * This is a float, must be a finite value. + */ + float cutoff; + + /** + * Window Duration (secs) + * + * The amount of time in seconds for which signal values are to be kept in the window. + * This must be a finite and positive value. + */ + float windowDurationSec; + + action $validate() { + if (not cutoff.isFinite()) { + throw L10N.getLocalizedException("fwk_param_finite_cutoff_value", [BlockBase.getL10N_param("cutoff",self),cutoff]); + } + if (not windowDurationSec.isFinite()) { + throw L10N.getLocalizedException("fwk_param_finite_windowDurationSec_value", [BlockBase.getL10N_param("windowDurationSec",self),windowDurationSec]); + } + if windowDurationSec < 0.0 { + throw L10N.getLocalizedException("fwk_param_positive_windowDurationSec_value", [BlockBase.getL10N_param("windowDurationSec",self),windowDurationSec]); + } + } +} + +/** State of the block.*/ +event HighpassFilter_$State{ + sequence contents; //we keep in memory the time-series values received as input. We need to associate the time to each value to work with the rolling window + float startTime; +} + +/** +* Highpass Filter +* +* Passes signals with a frequency higher than a selected cutoff frequency and excludes signals with frequencies lower than the cutoff frequency +* It takes a signal (time-series of float value) as an input, buffer it until it reaches the window duration and then analyze the frequency of that signal. +* +* @$blockCategory Flow Manipulations +*/ +event HighpassFilter { + + /**BlockBase object. + * + * This is initialized by the framework when the block is required for a model. + */ + BlockBase $base; + + /** Parameters, filled in by the framework. */ + HighpassFilter_$Parameters $parameters; + constant string WINDOW_PROPERTY_NAME := "highpassFilter"; + + float cutOffLimit; + float periodSecs; + + /** Called once at block start up. */ + action $init() { + cutOffLimit := $parameters.cutoff; + periodSecs := $parameters.windowDurationSec; + } + + /** + * + * @param $activation The current activation, contextual information required when generating a block output. Blocks should only use the + * Activation object passed to them from the framework, never creating their own or holding on to an Activation object. + * + * @param $input_value float value to add to the time series. + * + * @$inputName value Value + */ + action $process(Activation $activation, float $input_value, HighpassFilter_$State $blockState) { + $base.createTimer(periodSecs, 0.0); + $blockState.contents.append(WindowContents($input_value, $activation.timestamp)); + } + + /** + * Called by the framework. Provide the blockstate specific to that partition, + * + * This block takes in time series values from which frequencies are then calculated via FFT algo and it filter all frequencies below a specific threshold (cutoff) + * + */ + action $timerTriggered(Activation $activation, HighpassFilter_$State $blockState) { + //if not input then no output + if $blockState.contents.size() = 0 {return;} + + sequence timeSeries := $blockState.contents; + + //calculate the start time of the window + $blockState.startTime := $activation.timestamp - periodSecs; + + //expire old entries (index 0) that are older than the start of the window. + while (timeSeries.size() > 0 and timeSeries[0].timestamp < $blockState.startTime) { + timeSeries.remove(0); + } + + sequence timeSeriesValue := new sequence; + WindowContents wc; + for wc in timeSeries { + timeSeriesValue.append(( wc.value).toDecimal()); + } + integer timeRange := (timeSeries[timeSeries.size()-1].timestamp - timeSeries[0].timestamp).floor(); + if timeRange > 0 { + //The sample size is the number of samples per-second that is used in the FFT calculation. + integer sampleSize := timeSeries.size() / timeRange; + sequence transformFF := FFT.fft(timeSeriesValue); + sequence frequencies := FFT.getFrequencies(sampleSize, transformFF); + sequence highPass := new sequence; + integer f; + + for f in frequencies { + if f.toFloat() > cutOffLimit { + highPass.append(f.toFloat()); + } + } + Value v := new Value; + v.properties[HighpassFilter.WINDOW_PROPERTY_NAME] := highPass; + v.value := true; //has to be set + $setOutput_filtered($activation, v); + } + } + + + /** + * Filtered + * + * Populated with frequencies lower than the cut-off. + */ + action $setOutput_filtered; // This is initialized by the framework. It sets the output of the block and may trigger any blocks connected to this output. + +} \ No newline at end of file diff --git a/tests/BandPassFilter_001/.gitignore b/tests/BandPassFilter_001/.gitignore new file mode 100644 index 0000000..aa144c7 --- /dev/null +++ b/tests/BandPassFilter_001/.gitignore @@ -0,0 +1 @@ +/Output/ diff --git a/tests/BandPassFilter_001/pysystest.xml b/tests/BandPassFilter_001/pysystest.xml new file mode 100644 index 0000000..41b5841 --- /dev/null +++ b/tests/BandPassFilter_001/pysystest.xml @@ -0,0 +1,27 @@ + + + + + Bandpass Filter block - touch test. + + + + + + + + + + + + + + + + + + + + diff --git a/tests/BandPassFilter_001/run.py b/tests/BandPassFilter_001/run.py new file mode 100644 index 0000000..d7a2656 --- /dev/null +++ b/tests/BandPassFilter_001/run.py @@ -0,0 +1,71 @@ +# +# $Copyright (c) 2019 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.$ +# This file is licensed under the Apache 2.0 license - see https://www.apache.org/licenses/LICENSE-2.0 +# + +from apama.correlator import CorrelatorHelper +from pysys.constants import * +from apamax.analyticsbuilder.basetest import AnalyticsBuilderBaseTest + +class PySysTest(AnalyticsBuilderBaseTest): + def execute(self): + + correlator = self.startAnalyticsBuilderCorrelator(blockSourceDir=f'{self.project.SOURCE}/../blocks/') + # engine_receive process listening on all the channels. + correlator.receive('all.evt') + + # Deploying a new model with correct parameter. + self.modelId = self.createTestModel('apamax.analyticsbuilder.custom.BandpassFilter',{'lower':1.0, 'upper':2.0, 'windowDurationSec':5.0}) + + self.sendEventStrings(correlator, + self.timestamp(1582902000), + self.inputEvent('value', 3.5, id = self.modelId), + self.timestamp(1582902005), + self.inputEvent('value', 5.5, id = self.modelId), + self.timestamp(1582902005.1), + self.inputEvent('value', 6.5, id = self.modelId), + self.timestamp(1582902005.2), + self.inputEvent('value', 5.7, id = self.modelId), + self.timestamp(1582902006), + self.inputEvent('value', 7.5, id = self.modelId), + self.timestamp(1582902006.1), + self.inputEvent('value', 8.5, id = self.modelId), + self.timestamp(1582902006.2), + self.inputEvent('value', 18.5, id = self.modelId), + self.timestamp(1582902006.3), + self.inputEvent('value', 12.5, id = self.modelId), + self.timestamp(1582902006.4), + self.inputEvent('value', 13.0, id = self.modelId), + self.timestamp(1582902007), + self.inputEvent('value', 2.7, id = self.modelId), + self.timestamp(1582902007.1), + self.inputEvent('value', 12.7, id = self.modelId), + self.timestamp(1582902007.2), + self.inputEvent('value', 1.7, id = self.modelId), + self.timestamp(1582902007.3), + self.inputEvent('value', 4.7, id = self.modelId), + self.timestamp(1582902007.4), + self.inputEvent('value', 5.7, id = self.modelId), + self.timestamp(1582902008), + self.inputEvent('value', 5.0, id = self.modelId), + self.timestamp(1582902008.1), + self.inputEvent('value', 15.0, id = self.modelId), + self.timestamp(1582902008.2), + self.inputEvent('value', 5.0, id = self.modelId), + self.timestamp(1582902009), + self.inputEvent('value', 8.0, id = self.modelId), + self.timestamp(1582902009.1), + self.inputEvent('value', 7.0, id = self.modelId), + self.timestamp(1582902025) + ) + + def validate(self): + # Verifying that the model is deployed successfully. + self.assertGrep(self.analyticsBuilderCorrelator.logfile, expr='Model \"' + self.modelId + '\" with PRODUCTION mode has started') + self.assertGrep('output.evt', expr=self.outputExpr('filtered', properties='.*"bandpassFilter":any.*sequence.*float.*,.*1,1,1,1.*'), contains=True) + self.assertGrep('output.evt', expr=self.outputExpr('filtered', properties='.*"bandpassFilter":any.*sequence.*float.*,.*0,0,0,0,1,1,1.*'), contains=False) + self.assertGrep('output.evt', expr=self.outputExpr('filtered', properties='.*"bandpassFilter":any.*sequence.*float.*,.*0,0,0,0,1,1,1,2.*'), contains=False) + self.assertGrep('output.evt', expr=self.outputExpr('filtered', properties='.*"bandpassFilter":any.*sequence.*float.*,.*1,1,1,2.*'), contains=True) + self.assertGrep('output.evt', expr=self.outputExpr('filtered', properties='.*"bandpassFilter":any.*sequence.*float.*,.*1,2.*'), contains=True) + self.assertGrep('output.evt', expr=self.outputExpr('filtered', properties='.*"bandpassFilter":any.*sequence.*float.*,.*0,1,2,3.*'), contains=False) + self.assertGrep('output.evt', expr=self.outputExpr('filtered', properties='.*"bandpassFilter":any.*sequence.*float.*,.*2,3.*'), contains=False) diff --git a/tests/BandstopFilter_001/.gitignore b/tests/BandstopFilter_001/.gitignore new file mode 100644 index 0000000..aa144c7 --- /dev/null +++ b/tests/BandstopFilter_001/.gitignore @@ -0,0 +1 @@ +/Output/ diff --git a/tests/BandstopFilter_001/pysystest.xml b/tests/BandstopFilter_001/pysystest.xml new file mode 100644 index 0000000..6b62eb8 --- /dev/null +++ b/tests/BandstopFilter_001/pysystest.xml @@ -0,0 +1,27 @@ + + + + + Bandstop Filter block - touch test. + + + + + + + + + + + + + + + + + + + + diff --git a/tests/BandstopFilter_001/run.py b/tests/BandstopFilter_001/run.py new file mode 100644 index 0000000..e5acf98 --- /dev/null +++ b/tests/BandstopFilter_001/run.py @@ -0,0 +1,75 @@ +# +# $Copyright (c) 2019 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.$ +# This file is licensed under the Apache 2.0 license - see https://www.apache.org/licenses/LICENSE-2.0 +# + +from apama.correlator import CorrelatorHelper +from pysys.constants import * +from apamax.analyticsbuilder.basetest import AnalyticsBuilderBaseTest + +class PySysTest(AnalyticsBuilderBaseTest): + def execute(self): + + correlator = self.startAnalyticsBuilderCorrelator(blockSourceDir=f'{self.project.SOURCE}/../blocks/') + # engine_receive process listening on all the channels. + correlator.receive('all.evt') + + # Deploying a new model with correct parameter. + self.modelId = self.createTestModel('apamax.analyticsbuilder.custom.BandstopFilter',{'lower':1.0, 'upper':2.0, 'windowDurationSec':5.0}) + + self.sendEventStrings(correlator, + self.timestamp(1582902000), + self.inputEvent('value', 3.5, id = self.modelId), + self.timestamp(1582902005), + self.inputEvent('value', 5.5, id = self.modelId), + self.timestamp(1582902005.1), + self.inputEvent('value', 6.5, id = self.modelId), + self.timestamp(1582902005.2), + self.inputEvent('value', 5.7, id = self.modelId), + self.timestamp(1582902006), + self.inputEvent('value', 7.5, id = self.modelId), + self.timestamp(1582902006.1), + self.inputEvent('value', 8.5, id = self.modelId), + self.timestamp(1582902006.2), + self.inputEvent('value', 18.5, id = self.modelId), + self.timestamp(1582902006.3), + self.inputEvent('value', 12.5, id = self.modelId), + self.timestamp(1582902006.4), + self.inputEvent('value', 13.0, id = self.modelId), + self.timestamp(1582902007), + self.inputEvent('value', 2.7, id = self.modelId), + self.timestamp(1582902007.1), + self.inputEvent('value', 12.7, id = self.modelId), + self.timestamp(1582902007.2), + self.inputEvent('value', 1.7, id = self.modelId), + self.timestamp(1582902007.3), + self.inputEvent('value', 4.7, id = self.modelId), + self.timestamp(1582902007.4), + self.inputEvent('value', 5.7, id = self.modelId), + self.timestamp(1582902008), + self.inputEvent('value', 5.0, id = self.modelId), + self.timestamp(1582902008.1), + self.inputEvent('value', 15.0, id = self.modelId), + self.timestamp(1582902008.2), + self.inputEvent('value', 5.0, id = self.modelId), + self.timestamp(1582902009), + self.inputEvent('value', 8.0, id = self.modelId), + self.timestamp(1582902009.1), + self.inputEvent('value', 7.0, id = self.modelId), + self.timestamp(1582902025) + ) + + def validate(self): + # Verifying that the model is deployed successfully. + self.assertGrep(self.analyticsBuilderCorrelator.logfile, expr='Model \"' + self.modelId + '\" with PRODUCTION mode has started') + self.assertGrep('output.evt', expr=self.outputExpr('filtered', properties='.*"bandstopFilter":any.*sequence.*float.*,.*1,1,1,1.*'), contains=False) + self.assertGrep('output.evt', expr=self.outputExpr('filtered', properties='.*"bandstopFilter":any.*sequence.*float.*,.*0,0,0,0,1,1,1.*'), contains=False) + self.assertGrep('output.evt', expr=self.outputExpr('filtered', properties='.*"bandstopFilter":any.*sequence.*float.*,.*0,0,0,0,1,1,1,2.*'), contains=False) + self.assertGrep('output.evt', expr=self.outputExpr('filtered', properties='.*"bandstopFilter":any.*sequence.*float.*,.*1,1,1,2.*'), contains=False) + self.assertGrep('output.evt', expr=self.outputExpr('filtered', properties='.*"bandstopFilter":any.*sequence.*float.*,.*1,2.*'), contains=False) + self.assertGrep('output.evt', expr=self.outputExpr('filtered', properties='.*"bandstopFilter":any.*sequence.*float.*,.*0,1,2,3.*'), contains=False) + self.assertGrep('output.evt', expr=self.outputExpr('filtered', properties='.*"bandstopFilter":any.*sequence.*float.*,.*2,3.*'), contains=False) + self.assertGrep('output.evt', expr=self.outputExpr('filtered', properties='.*"bandstopFilter":any.*sequence.*float.*,.*0,0,0,0,0.*'), contains=True) + self.assertGrep('output.evt', expr=self.outputExpr('filtered', properties='.*"bandstopFilter":any.*sequence.*float.*,.*0,0,0.*'), contains=True) + self.assertGrep('output.evt', expr=self.outputExpr('filtered', properties='.*"bandstopFilter":any.*sequence.*float.*,.*0,0.*'), contains=True) + self.assertGrep('output.evt', expr=self.outputExpr('filtered', properties='.*"bandstopFilter":any.*sequence.*float.*,.*0,3.*'), contains=True) diff --git a/tests/HighpassFilter_001/pysystest.xml b/tests/HighpassFilter_001/pysystest.xml new file mode 100644 index 0000000..003bdf6 --- /dev/null +++ b/tests/HighpassFilter_001/pysystest.xml @@ -0,0 +1,27 @@ + + + + + Highpass Filter block - touch test. + + + + + + + + + + + + + + + + + + + + diff --git a/tests/HighpassFilter_001/run.py b/tests/HighpassFilter_001/run.py new file mode 100644 index 0000000..6fd2369 --- /dev/null +++ b/tests/HighpassFilter_001/run.py @@ -0,0 +1,69 @@ +# +# $Copyright (c) 2019 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its subsidiaries and/or its affiliates and/or their licensors.$ +# This file is licensed under the Apache 2.0 license - see https://www.apache.org/licenses/LICENSE-2.0 +# + +from pysys.constants import * +from apamax.analyticsbuilder.basetest import AnalyticsBuilderBaseTest + +class PySysTest(AnalyticsBuilderBaseTest): + def execute(self): + correlator = self.startAnalyticsBuilderCorrelator(blockSourceDir=f'{self.project.SOURCE}/../blocks/') + + # engine_receive process listening on all the channels. + correlator.receive('all.evt') + + # Deploying a new model with correct parameter. + self.modelId = self.createTestModel('apamax.analyticsbuilder.custom.HighpassFilter',{'cutoff':1.0, 'windowDurationSec':5.0}) + + self.sendEventStrings(correlator, + self.timestamp(1582902000), + self.inputEvent('value', 3.5, id = self.modelId), + self.timestamp(1582902005), + self.inputEvent('value', 5.5, id = self.modelId), + self.timestamp(1582902005.1), + self.inputEvent('value', 6.5, id = self.modelId), + self.timestamp(1582902005.2), + self.inputEvent('value', 5.7, id = self.modelId), + self.timestamp(1582902006), + self.inputEvent('value', 7.5, id = self.modelId), + self.timestamp(1582902006.1), + self.inputEvent('value', 8.5, id = self.modelId), + self.timestamp(1582902006.2), + self.inputEvent('value', 18.5, id = self.modelId), + self.timestamp(1582902006.3), + self.inputEvent('value', 12.5, id = self.modelId), + self.timestamp(1582902006.4), + self.inputEvent('value', 13.0, id = self.modelId), + self.timestamp(1582902007), + self.inputEvent('value', 2.7, id = self.modelId), + self.timestamp(1582902007.1), + self.inputEvent('value', 12.7, id = self.modelId), + self.timestamp(1582902007.2), + self.inputEvent('value', 1.7, id = self.modelId), + self.timestamp(1582902007.3), + self.inputEvent('value', 4.7, id = self.modelId), + self.timestamp(1582902007.4), + self.inputEvent('value', 5.7, id = self.modelId), + self.timestamp(1582902008), + self.inputEvent('value', 5.0, id = self.modelId), + self.timestamp(1582902008.1), + self.inputEvent('value', 15.0, id = self.modelId), + self.timestamp(1582902008.2), + self.inputEvent('value', 5.0, id = self.modelId), + self.timestamp(1582902009), + self.inputEvent('value', 8.0, id = self.modelId), + self.timestamp(1582902009.1), + self.inputEvent('value', 7.0, id = self.modelId), + self.timestamp(1582902025) + ) + + def validate(self): + # Verifying that the model is deployed successfully. + self.assertGrep(self.analyticsBuilderCorrelator.logfile, expr='Model \"' + self.modelId + '\" with PRODUCTION mode has started') + self.assertGrep('output.evt', expr=self.outputExpr('filtered', properties='.*"highpassFilter":any.*sequence.*float.*,.*0,0,0,0,1,1,1,1.*'), contains=False) + self.assertGrep('output.evt', expr=self.outputExpr('filtered', properties='.*"highpassFilter":any.*sequence.*float.*,.*0,0,0,0,1,1,1.*'), contains=False) + self.assertGrep('output.evt', expr=self.outputExpr('filtered', properties='.*"highpassFilter":any.*sequence.*float.*,.*0,0,0,0,1,1,1,2.*'), contains=False) + self.assertGrep('output.evt', expr=self.outputExpr('filtered', properties='.*"highpassFilter":any.*sequence.*float.*,.*2.*'), contains=True) + self.assertGrep('output.evt', expr=self.outputExpr('filtered', properties='.*"highpassFilter":any.*sequence.*float.*,.*0,1,2,3.*'), contains=False) + self.assertGrep('output.evt', expr=self.outputExpr('filtered', properties='.*"highpassFilter":any.*sequence.*float.*,.*2,3.*'), contains=True)