Skip to content

Commit

Permalink
Update blazar filters with the new data structure (#224)
Browse files Browse the repository at this point in the history
* Update blazar filterers with the new data structure

* Stick to previous conditions

* PEP8
  • Loading branch information
JulienPeloton authored Feb 6, 2025
1 parent 8d95834 commit bc91f09
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 18 deletions.
Binary file modified datatest/CTAO_blazar/CTAO_blazar_datatest_v20-12-24.parquet
Binary file not shown.
21 changes: 11 additions & 10 deletions fink_filters/filter_blazar_low_state/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,22 @@
from pyspark.sql.types import BooleanType

import os
import numpy as np
import pandas as pd

from fink_filters.tester import spark_unit_tests


@pandas_udf(BooleanType(), PandasUDFType.SCALAR)
@profile
def low_state_filter(blazar_stats) -> pd.Series:
def low_state_filter(m1, m2) -> pd.Series:
"""Returns True if the alert is considered a quiescent state, False otherwise.
Parameters
----------
blazar_stats: Spark DataFrame Column
Column containing the 3 ratios computed in the blazar_low_state module
m1: Spark DataFrame Column
`m1` feature computed in the blazar_low_state module
m2: Spark DataFrame Column
`m2` feature computed in the blazar_low_state module
Returns
-------
Expand All @@ -42,21 +43,21 @@ def low_state_filter(blazar_stats) -> pd.Series:
Examples
--------
>>> from fink_science.blazar_low_state.processor import quiescent_state
>>> import pyspark.sql.functions as F
>>> from fink_utils.spark.utils import apply_user_defined_filter
# Test
>>> df = spark.read.parquet(ztf_alert_sample)
>>> df = df.withColumn("m1", F.col('blazar_stats').getItem('m1').alias("m1"))
>>> df = df.withColumn("m2", F.col('blazar_stats').getItem('m2').alias("m2"))
>>> f = 'fink_filters.filter_blazar_low_state.filter.low_state_filter'
>>> df = apply_user_defined_filter(df, f)
>>> print(df.count())
12
"""
tmp = np.array(blazar_stats.to_numpy().tolist())
tmp = tmp.reshape(tmp.shape[0], tmp.shape[-1]).transpose()
tmp[pd.isna(tmp)] = np.nan
tmp[tmp < 0] = np.nan
return pd.Series((tmp[1] < 1) & (tmp[2] < 1))
f1 = (m1 < 1) & (m1 >= 0)
f2 = (m2 < 1) & (m2 >= 0)
return pd.Series(f1 & f2)


if __name__ == "__main__":
Expand Down
17 changes: 9 additions & 8 deletions fink_filters/filter_blazar_new_low_state/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@
from pyspark.sql.types import BooleanType

import os
import numpy as np
import pandas as pd

from fink_filters.tester import spark_unit_tests


@pandas_udf(BooleanType(), PandasUDFType.SCALAR)
@profile
def new_low_state_filter(blazar_stats) -> pd.Series:
def new_low_state_filter(m0, m1, m2) -> pd.Series:
"""Returns True if the alert is considered a quiescent state, False else.
Parameters
Expand All @@ -42,22 +41,24 @@ def new_low_state_filter(blazar_stats) -> pd.Series:
Examples
--------
>>> from fink_science.blazar_low_state.processor import quiescent_state
>>> import pyspark.sql.functions as F
>>> from fink_utils.spark.utils import apply_user_defined_filter
# Test
>>> df = spark.read.parquet(ztf_alert_sample)
>>> df = df.withColumn("m0", F.col('blazar_stats').getItem('m0').alias("m0"))
>>> df = df.withColumn("m1", F.col('blazar_stats').getItem('m1').alias("m1"))
>>> df = df.withColumn("m2", F.col('blazar_stats').getItem('m2').alias("m2"))
>>> f = 'fink_filters.filter_blazar_new_low_state'
>>> f += '.filter.new_low_state_filter'
>>> df = apply_user_defined_filter(df, f)
>>> print(df.count())
1
"""
tmp = np.array(blazar_stats.to_numpy().tolist())
tmp = tmp.reshape(tmp.shape[0], tmp.shape[-1]).transpose()
tmp[pd.isna(tmp)] = np.nan
tmp[tmp < 0] = np.nan
return pd.Series((tmp[1] < 1) & (tmp[2] < 1) & (tmp[0] >= 1))
f0 = m0 >= 1
f1 = (m1 < 1) & (m1 >= 0)
f2 = (m2 < 1) & (m2 >= 0)
return pd.Series(f0 & f1 & f2)


if __name__ == "__main__":
Expand Down

0 comments on commit bc91f09

Please sign in to comment.