Skip to content

Commit

Permalink
Merge pull request #198 from elybrand/issue/filtr
Browse files Browse the repository at this point in the history
[BUG] - Fix numerical instabilities for IIR filters
  • Loading branch information
TomDonoghue authored Jun 27, 2020
2 parents 2b2cd84 + 922a4bf commit d66bbee
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 70 deletions.
23 changes: 8 additions & 15 deletions neurodsp/filt/checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,18 @@ def check_filter_definition(pass_type, f_range):
return f_lo, f_hi


def check_filter_properties(b_vals, a_vals, fs, pass_type, f_range,
def check_filter_properties(filter_coefs, a_vals, fs, pass_type, f_range,
transitions=(-20, -3), verbose=True):
"""Check a filters properties, including pass band and transition band.
Parameters
----------
b_vals : 1d array
B value filter coefficients for a filter.
a_vals : 1d array
A value filter coefficients for a filter.
filter_coefs : 1d or 2d array
If 1d, interpreted as the B-value filter coefficients.
If 2d, interpreted as the second-order (sos) filter coefficients.
a_vals : 1d array or None
The A-value filter coefficients for a filter.
If second-order filter coefficients are provided in `filter_coefs`, must be None.
fs : float
Sampling rate, in Hz.
pass_type : {'bandpass', 'bandstop', 'lowpass', 'highpass'}
Expand Down Expand Up @@ -133,15 +135,6 @@ def check_filter_properties(b_vals, a_vals, fs, pass_type, f_range,
>>> passes = check_filter_properties(filter_coefs, 1, fs, pass_type, f_range)
Transition bandwidth is 0.5 Hz.
Pass/stop bandwidth is 24.0 Hz.
Check the properties of an IIR filter:
>>> from neurodsp.filt.iir import design_iir_filter
>>> fs, pass_type, f_range, order = 500, 'bandstop', (55, 65), 7
>>> b_vals, a_vals = design_iir_filter(fs, pass_type, f_range, order)
>>> passes = check_filter_properties(b_vals, a_vals, fs, pass_type, f_range)
Transition bandwidth is 1.5 Hz.
Pass/stop bandwidth is 10.0 Hz.
"""

# Import utility functions inside function to avoid circular imports
Expand All @@ -152,7 +145,7 @@ def check_filter_properties(b_vals, a_vals, fs, pass_type, f_range,
passes = True

# Compute the frequency response
f_db, db = compute_frequency_response(b_vals, a_vals, fs)
f_db, db = compute_frequency_response(filter_coefs, a_vals, fs)

# Check that frequency response goes below transition level (has significant attenuation)
if np.min(db) >= transitions[0]:
Expand Down
11 changes: 5 additions & 6 deletions neurodsp/filt/fir.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from neurodsp.utils import remove_nans, restore_nans
from neurodsp.utils.decorators import multidim
from neurodsp.plts.filt import plot_filter_properties
from neurodsp.filt.utils import compute_nyquist, compute_frequency_response, remove_filter_edges
from neurodsp.filt.utils import compute_frequency_response, remove_filter_edges
from neurodsp.filt.checks import (check_filter_definition, check_filter_properties,
check_filter_length)

Expand Down Expand Up @@ -174,15 +174,14 @@ def design_fir_filter(fs, pass_type, f_range, n_cycles=3, n_seconds=None):
f_lo, f_hi = check_filter_definition(pass_type, f_range)
filt_len = compute_filter_length(fs, pass_type, f_lo, f_hi, n_cycles, n_seconds)

f_nyq = compute_nyquist(fs)
if pass_type == 'bandpass':
filter_coefs = firwin(filt_len, (f_lo, f_hi), pass_zero=False, nyq=f_nyq)
filter_coefs = firwin(filt_len, (f_lo, f_hi), pass_zero=False, fs=fs)
elif pass_type == 'bandstop':
filter_coefs = firwin(filt_len, (f_lo, f_hi), nyq=f_nyq)
filter_coefs = firwin(filt_len, (f_lo, f_hi), fs=fs)
elif pass_type == 'highpass':
filter_coefs = firwin(filt_len, f_lo, pass_zero=False, nyq=f_nyq)
filter_coefs = firwin(filt_len, f_lo, pass_zero=False, fs=fs)
elif pass_type == 'lowpass':
filter_coefs = firwin(filt_len, f_hi, nyq=f_nyq)
filter_coefs = firwin(filt_len, f_hi, fs=fs)

return filter_coefs

Expand Down
52 changes: 21 additions & 31 deletions neurodsp/filt/iir.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
"""Filter signals with IIR filters."""

from warnings import warn

from scipy.signal import butter, filtfilt
from scipy.signal import butter, sosfiltfilt

from neurodsp.utils import remove_nans, restore_nans
from neurodsp.filt.utils import compute_nyquist, compute_frequency_response
Expand Down Expand Up @@ -42,14 +40,14 @@ def filter_signal_iir(sig, fs, pass_type, f_range, butterworth_order,
plot_properties : bool, optional, default: False
If True, plot the properties of the filter, including frequency response and/or kernel.
return_filter : bool, optional, default: False
If True, return the filter coefficients of the IIR filter.
If True, return the second order series coefficients of the IIR filter.
Returns
-------
sig_filt : 1d array
Filtered time series.
filter_coefs : tuple of (1d array, 1d array)
Filter coefficients of the IIR filter, as (b_vals, a_vals).
sos : 2d array
Second order series coefficients of the IIR filter. Has shape of (n_sections, 6).
Only returned if `return_filter` is True.
Examples
Expand All @@ -64,42 +62,40 @@ def filter_signal_iir(sig, fs, pass_type, f_range, butterworth_order,
"""

# Design filter
b_vals, a_vals = design_iir_filter(fs, pass_type, f_range, butterworth_order)
sos = design_iir_filter(fs, pass_type, f_range, butterworth_order)

# Check filter properties: compute transition bandwidth & run checks
check_filter_properties(b_vals, a_vals, fs, pass_type, f_range, verbose=print_transitions)
check_filter_properties(sos, None, fs, pass_type, f_range, verbose=print_transitions)

# Remove any NaN on the edges of 'sig'
sig, sig_nans = remove_nans(sig)

# Apply filter
sig_filt = apply_iir_filter(sig, b_vals, a_vals)
sig_filt = apply_iir_filter(sig, sos)

# Add NaN back on the edges of 'sig', if there were any at the beginning
sig_filt = restore_nans(sig_filt, sig_nans)

# Plot frequency response, if desired
if plot_properties:
f_db, db = compute_frequency_response(b_vals, a_vals, fs)
f_db, db = compute_frequency_response(sos, None, fs)
plot_frequency_response(f_db, db)

if return_filter:
return sig_filt, (b_vals, a_vals)
return sig_filt, sos
else:
return sig_filt


def apply_iir_filter(sig, b_vals, a_vals):
def apply_iir_filter(sig, sos):
"""Apply an IIR filter to a signal.
Parameters
----------
sig : array
Time series to be filtered.
b_vals : 1d array
B value filter coefficients for an IIR filter.
a_vals : 1d array
A value filter coefficients for an IIR filter.
sos : 2d array
Second order series coefficients for an IIR filter. Has shape of (n_sections, 6).
Returns
-------
Expand All @@ -113,12 +109,12 @@ def apply_iir_filter(sig, b_vals, a_vals):
>>> from neurodsp.sim import sim_combined
>>> sig = sim_combined(n_seconds=10, fs=500,
... components={'sim_powerlaw': {}, 'sim_oscillation' : {'freq': 10}})
>>> b_vals, a_vals = design_iir_filter(fs=500, pass_type='bandstop',
>>> sos = design_iir_filter(fs=500, pass_type='bandstop',
... f_range=(55, 65), butterworth_order=7)
>>> filt_signal = apply_iir_filter(sig, b_vals, a_vals)
>>> filt_signal = apply_iir_filter(sig, sos)
"""

return filtfilt(b_vals, a_vals, sig)
return sosfiltfilt(sos, sig)


def design_iir_filter(fs, pass_type, f_range, butterworth_order):
Expand Down Expand Up @@ -146,23 +142,17 @@ def design_iir_filter(fs, pass_type, f_range, butterworth_order):
Returns
-------
b_vals : 1d array
B value filter coefficients for an IIR filter.
a_vals : 1d array
A value filter coefficients for an IIR filter.
sos : 2d array
Second order series coefficients for an IIR filter. Has shape of (n_sections, 6).
Examples
--------
Compute coefficients for a bandstop IIR filter:
>>> b_vals, a_vals = design_iir_filter(fs=500, pass_type='bandstop',
... f_range=(55, 65), butterworth_order=7)
>>> sos = design_iir_filter(fs=500, pass_type='bandstop',
... f_range=(55, 65), butterworth_order=7)
"""

# Warn about only recommending IIR for bandstop
if pass_type != 'bandstop':
warn('IIR filters are not recommended other than for notch filters.')

# Check filter definition
f_lo, f_hi = check_filter_definition(pass_type, f_range)

Expand All @@ -175,6 +165,6 @@ def design_iir_filter(fs, pass_type, f_range, butterworth_order):
win = f_hi / f_nyq

# Design filter
b_vals, a_vals = butter(butterworth_order, win, pass_type)
sos = butter(butterworth_order, win, pass_type, output='sos')

return b_vals, a_vals
return sos
40 changes: 25 additions & 15 deletions neurodsp/filt/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Utility functions for filtering."""

import numpy as np
from scipy.signal import freqz
from scipy.signal import freqz, sosfreqz

from neurodsp.utils.decorators import multidim
from neurodsp.filt.checks import check_filter_definition
Expand Down Expand Up @@ -40,15 +40,17 @@ def infer_passtype(f_range):
return pass_type


def compute_frequency_response(b_vals, a_vals, fs):
def compute_frequency_response(filter_coefs, a_vals, fs):
"""Compute the frequency response of a filter.
Parameters
----------
b_vals : 1d array
B value filter coefficients for a filter.
a_vals : 1d array
A value filter coefficients for a filter.
filter_coefs : 1d or 2d array
If 1d, interpreted as the B-value filter coefficients.
If 2d, interpreted as the second-order (sos) filter coefficients.
a_vals : 1d array or None
The A-value filter coefficients for a filter.
If second-order filter coefficients are provided in `filter_coefs`, must be None.
fs : float
Sampling rate, in Hz.
Expand All @@ -67,15 +69,23 @@ def compute_frequency_response(b_vals, a_vals, fs):
>>> filter_coefs = design_fir_filter(fs=500, pass_type='bandpass', f_range=(8, 12))
>>> f_db, db = compute_frequency_response(filter_coefs, 1, fs=500)
Compute the frequency response for an IIR filter:
Compute the frequency response for an IIR filter, which uses SOS coefficients:
>>> from neurodsp.filt.iir import design_iir_filter
>>> b_vals, a_vals = design_iir_filter(fs=500, pass_type='bandstop',
... f_range=(55, 65), butterworth_order=7)
>>> f_db, db = compute_frequency_response(b_vals, a_vals, fs=500)
>>> sos_coefs = design_iir_filter(fs=500, pass_type='bandpass',
... f_range=(8, 12), butterworth_order=3)
>>> f_db, db = compute_frequency_response(sos_coefs, None, fs=500)
"""

w_vals, h_vals = freqz(b_vals, a_vals, worN=int(fs * 2))
if filter_coefs.ndim == 1 and a_vals is not None:
# Compute response for B & A value filter coefficient inputs
w_vals, h_vals = freqz(filter_coefs, a_vals, worN=int(fs * 2))
elif filter_coefs.ndim == 2 and a_vals is None:
# Compute response for sos filter coefficient inputs
w_vals, h_vals = sosfreqz(filter_coefs, worN=int(fs * 2))
else:
raise ValueError("The organization of the filter coefficient inputs is not understood.")

f_db = w_vals * fs / (2. * np.pi)
db = 20 * np.log10(abs(h_vals))

Expand Down Expand Up @@ -109,7 +119,7 @@ def compute_pass_band(fs, pass_type, f_range):
Examples
--------
Compute the bandwith of a bandpass filter:
Compute the bandwidth of a bandpass filter:
>>> compute_pass_band(fs=500, pass_type='bandpass', f_range=(5, 25))
20.0
Expand Down Expand Up @@ -158,9 +168,9 @@ def compute_transition_band(f_db, db, low=-20, high=-3):
Compute the transition band of an IIR filter, using the computed frequency response:
>>> from neurodsp.filt.iir import design_iir_filter
>>> b_vals, a_vals = design_iir_filter(fs=500, pass_type='bandstop',
... f_range=(10, 20), butterworth_order=7)
>>> f_db, db = compute_frequency_response(b_vals, a_vals, fs=500)
>>> sos = design_iir_filter(fs=500, pass_type='bandstop',
... f_range=(10, 20), butterworth_order=7)
>>> f_db, db = compute_frequency_response(sos, None, fs=500)
>>> compute_transition_band(f_db, db, low=-20, high=-3)
2.0
"""
Expand Down
14 changes: 12 additions & 2 deletions neurodsp/tests/test_filt_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,18 @@ def test_check_filter_definition():
def test_check_filter_properties():

filter_coefs = design_fir_filter(FS, 'bandpass', (8, 12))
check_filter_properties(filter_coefs, 1, FS, 'bandpass', (8, 12))
assert True

passes = check_filter_properties(filter_coefs, 1, FS, 'bandpass', (8, 12))
assert passes is True

filter_coefs = design_fir_filter(FS, 'bandstop', (8, 12))
passes = check_filter_properties(filter_coefs, 1, FS, 'bandpass', (8, 12))
assert passes is False

filter_coefs = design_fir_filter(FS, 'bandpass', (20, 21))
passes = check_filter_properties(filter_coefs, 1, FS, 'bandpass', (8, 12))
assert passes is False


def test_check_filter_length():

Expand Down
5 changes: 5 additions & 0 deletions neurodsp/tests/test_filt_fir.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Tests for FIR filters."""

from pytest import raises
import numpy as np

from neurodsp.tests.settings import FS
Expand Down Expand Up @@ -56,3 +57,7 @@ def test_compute_filter_length():
expected_filt_len = int(np.ceil(fs * n_cycles / f_lo)) + 1
filt_len = compute_filter_length(fs, 'bandpass', f_lo, f_hi, n_cycles=n_cycles, n_seconds=None)
assert filt_len == expected_filt_len

with raises(ValueError):
filt_len = compute_filter_length(fs, 'bandpass', f_lo, f_hi)

5 changes: 4 additions & 1 deletion neurodsp/tests/test_filt_iir.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ def test_filter_signal_iir_2d(tsig2d):
assert out.shape == tsig2d.shape
assert sum(~np.isnan(out[0, :])) > 0

out, sos = filter_signal_iir(tsig2d, FS, 'bandpass', (8, 12), 3, return_filter=True)
assert np.shape(sos)[1] == 6

def test_apply_iir_filter(tsig):

out = apply_iir_filter(tsig, np.array([1, 1, 1, 1, 1]), np.array([1, 1, 1, 1, 1]))
out = apply_iir_filter(tsig, np.array([1, 1, 1, 1, 1, 1]))
assert out.shape == tsig.shape

def test_design_iir_filter():
Expand Down
4 changes: 4 additions & 0 deletions neurodsp/tests/test_filt_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Tests for filter utilities."""

from pytest import raises
from neurodsp.tests.settings import FS

from neurodsp.filt.utils import *
Expand All @@ -19,6 +20,9 @@ def test_compute_frequency_response():
filter_coefs = design_fir_filter(FS, 'bandpass', (8, 12))
f_db, db = compute_frequency_response(filter_coefs, 1, FS)

with raises(ValueError):
f_db, db = compute_frequency_response(filter_coefs, None, FS)

def test_compute_pass_band():

assert compute_pass_band(FS, 'bandpass', (4, 8)) == 4.
Expand Down

0 comments on commit d66bbee

Please sign in to comment.