WIP: Management of data feeds
picaultj committed Feb 3, 2025
1 parent fb5d043 commit a4918e3
Showing 15 changed files with 720 additions and 20 deletions.
6 changes: 6 additions & 0 deletions bertrend/demos/demos_utils/icons.py
@@ -6,6 +6,9 @@
WARNING_ICON = ":material/warning:"
ERROR_ICON = ":material/error:"
INFO_ICON = ":material/info:"
EDIT_ICON = ":material/edit:"
ADD_ICON = ":material/add_circle:"
DELETE_ICON = ":material/delete:"
SUCCESS_ICON = ":material/check:"
SETTINGS_ICON = ":material/settings:"
TOPIC_ICON = ":material/speaker_notes:"
@@ -20,6 +23,9 @@
MODEL_TRAINING_ICON = ":material/cognition:"
SERVER_STORAGE_ICON = ":material/database:"
CLIENT_STORAGE_ICON = ":material/upload:"
UNHAPPY_ICON = ":material/sentiment_extremely_dissatisfied:"
TOGGLE_ON_ICON = ":material/toggle_on:"
TOGGLE_OFF_ICON = ":material/toggle_off:"

JSON_ICON = "🧾"
PARQUET_ICON = "📦️"
79 changes: 73 additions & 6 deletions bertrend_apps/common/crontab_utils.py
@@ -3,27 +3,76 @@
# SPDX-License-Identifier: MPL-2.0
# This file is part of BERTrend.
import os
import re
import subprocess
import sys
from pathlib import Path

from cron_descriptor import (
Options,
CasingTypeEnum,
ExpressionDescriptor,
DescriptionTypeEnum,
)
from loguru import logger

from bertrend import BEST_CUDA_DEVICE, BERTREND_LOG_PATH, load_toml_config


def add_job_to_crontab(schedule, command, env_vars=""):
logger.info(f"Adding to crontab: {schedule} {command}")
def get_understandable_cron_description(cron_expression: str) -> str:
"""Returns a human understandable crontab description."""
options = Options()
options.casing_type = CasingTypeEnum.Sentence
options.use_24hour_time_format = True
options.locale_code = "fr_FR"
descriptor = ExpressionDescriptor(cron_expression, options)
return descriptor.get_description(DescriptionTypeEnum.FULL)
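
For illustration, a minimal usage sketch of the helper above (the cron expression is an example; the exact French wording depends on cron_descriptor's fr_FR locale data):

# Hypothetical cron expression; prints a French, sentence-cased description
desc = get_understandable_cron_description("0 6 * * 1")
print(desc)  # e.g. something along the lines of "A 06:00, seulement le lundi"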


def add_job_to_crontab(schedule, command, env_vars="") -> bool:
"""Add the specified job to the crontab."""
logger.debug(f"Adding to crontab: {schedule} {command}")
home = os.getenv("HOME")
# Create crontab, add command - NB: we use the .bashrc to source all environment variables that may be required by the command
cmd = f'(crontab -l; echo "{schedule} umask 002; source {home}/.bashrc; {env_vars} {command}" ) | crontab -'
returned_value = subprocess.call(cmd, shell=True) # returns the exit code in unix
logger.info(f"Crontab updated with status {returned_value}")
return returned_value == 0
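
As an illustration, a call like the one below (schedule, command and CUDA device are made-up values) appends a single crontab line that sources ~/.bashrc before running the command, and returns True when `crontab -` exits with status 0:

# Hypothetical job: run a feed scraping command every day at 06:00
ok = add_job_to_crontab(
    "0 6 * * *",
    f"{sys.prefix}/bin/python -m bertrend_apps.data_provider scrape-feed /path/to/my_feed.toml",
    env_vars="CUDA_VISIBLE_DEVICES=0",
)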


def schedule_scrapping(
feed_cfg: Path,
):
def check_cron_job(pattern: str) -> bool:
"""Check if a specific pattern (expressed as a regular expression) matches crontab entries."""
try:
# Run `crontab -l` and capture the output
result = subprocess.run(
["crontab", "-l"], capture_output=True, text=True, check=True
)

# Search for the regex pattern in the crontab output
return bool(re.search(pattern, result.stdout))
except subprocess.CalledProcessError:
# If crontab fails (e.g., no crontab for the user), return False
return False


def remove_from_crontab(pattern: str) -> bool:
"""Removes from the crontab the job matching the provided pattern (expressed as a regular expression)"""
if not check_cron_job(pattern):
logger.warning("No job matching the provided pattern")
return False
try:
# Rewrite the crontab, keeping only the lines that do not match the pattern
subprocess.check_output(
f"crontab -l | grep -vE '{pattern}' | crontab -", shell=True
)
# check_output raises CalledProcessError on a non-zero exit code, so reaching this point means success
return True
except subprocess.CalledProcessError:
return False
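
A short sketch of how these two helpers combine (the feed id in the pattern is hypothetical):

# Hypothetical pattern targeting the scrape-feed job of a feed called "energy"
pattern = r"scrape-feed.*/feeds/energy_feed.toml"
if check_cron_job(pattern):
    removed = remove_from_crontab(pattern)
    logger.info(f"Scraping job removed: {removed}")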


def schedule_scrapping(feed_cfg: Path):
"""Schedule data scrapping on the basis of a feed configuration file"""
data_feed_cfg = load_toml_config(feed_cfg)
schedule = data_feed_cfg["data-feed"]["update_frequency"]
@@ -44,3 +93,21 @@ def schedule_newsletter(
command = f"{sys.prefix}/bin/python -m bertrend_apps.newsletters newsletters {newsletter_cfg_path.resolve()} {data_feed_cfg_path.resolve()} > {BERTREND_LOG_PATH}/cron_newsletter_{id}.log 2>&1"
env_vars = f"CUDA_VISIBLE_DEVICES={cuda_devices}"
add_job_to_crontab(schedule, command, env_vars)


def check_if_scrapping_active_for_user(feed_id: str, user: str = None) -> bool:
"""Checks if a given scrapping feed is active (registered in the crontab"""
if user:
return check_cron_job(rf"scrape-feed.*/feeds/users/{user}/{feed_id}_feed.toml")
else:
return check_cron_job(rf"scrape-feed.*/feeds/{feed_id}_feed.toml")


def remove_scrapping_for_user(feed_id: str, user: str = None):
"""Removes from the crontab the job matching the provided feed_id"""
if user:
return remove_from_crontab(
rf"scrape-feed.*/feeds/users/{user}/{feed_id}_feed.toml"
)
else:
return remove_from_crontab(rf"scrape-feed.*/feeds/{feed_id}_feed.toml")
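
For example, with a user-scoped feed (user name and feed id are hypothetical):

# Hypothetical ids: check the per-user scraping job, then remove it
if check_if_scrapping_active_for_user("ai_news", user="alice"):
    remove_scrapping_for_user("ai_news", user="alice")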
5 changes: 5 additions & 0 deletions bertrend_apps/data_provider/__init__.py
@@ -2,3 +2,8 @@
# See AUTHORS.txt
# SPDX-License-Identifier: MPL-2.0
# This file is part of BERTrend.
# Define a pattern for a basic URL validation
URL_PATTERN = (
r"^(https?://)?([a-z0-9-]+\.)+[a-z]{2,6}(:\d+)?(/[\w.-]*)*$|"
r"^(https?://)?(localhost|(\d{1,3}\.){3}\d{1,3})(:\d+)?(/[\w.-]*)*$"
)
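
A quick sketch of how this pattern is meant to be used (the URLs are examples):

import re
from bertrend_apps.data_provider import URL_PATTERN

# Both a domain-based URL and a localhost URL with a port should match
assert re.match(URL_PATTERN, "https://www.example.com/feed")
assert re.match(URL_PATTERN, "http://localhost:8080/rss")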
16 changes: 10 additions & 6 deletions bertrend_apps/data_provider/__main__.py
@@ -15,6 +15,7 @@
from bertrend_apps.common.crontab_utils import schedule_scrapping
from bertrend_apps.data_provider.arxiv_provider import ArxivProvider
from bertrend_apps.data_provider.bing_news_provider import BingNewsProvider
from bertrend_apps.data_provider.curebot_provider import CurebotProvider
from bertrend_apps.data_provider.google_news_provider import GoogleNewsProvider
from bertrend_apps.data_provider.newscatcher_provider import NewsCatcherProvider

@@ -23,6 +24,7 @@

PROVIDERS = {
"arxiv": ArxivProvider,
"curebot": CurebotProvider,
"google": GoogleNewsProvider,
"bing": BingNewsProvider,
"newscatcher": NewsCatcherProvider,
@@ -46,7 +48,7 @@ def scrape(
max_results: int = typer.Option(
50, help="maximum number of results per request"
),
save_path: str = typer.Option(
save_path: Path = typer.Option(
None, help="Path for writing results. File is in jsonl format."
),
language: str = typer.Option(None, help="Language filter"),
@@ -65,7 +67,7 @@
"to" date, formatted as YYYY-MM-DD
max_results: int
Maximum number of results per request
save_path: str
save_path: Path
Path to the output file (jsonl format)
language: str
Language filter
@@ -90,7 +92,7 @@ def auto_scrape(
provider: str = typer.Option(
"google", help="source for news [google, bing, newscatcher]"
),
save_path: str = typer.Option(None, help="Path for writing results."),
save_path: Path = typer.Option(None, help="Path for writing results."),
language: str = typer.Option(None, help="Language filter"),
):
"""Scrape data from Arxiv, Google, Bing news or NewsCatcher (multiple requests from a configuration file: each line of the file shall be compliant with the following format:
@@ -100,9 +102,11 @@
----------
requests_file: str
Text file containing the list of requests to be processed
max_results: int
Maximum number of results per request
provider: str
News data provider. Current authorized values [google, bing, newscatcher]
save_path: str
save_path: Path
Path to the output file (jsonl format)
language: str
Language filter
@@ -178,7 +182,7 @@ def _daterange(start_date, end_date, ndays):

@app.command("scrape-feed")
def scrape_from_feed(
feed_cfg: str = typer.Argument(help="Path of the data feed config file"),
feed_cfg: Path = typer.Argument(help="Path of the data feed config file"),
):
"""Scrape data from Arxiv, Google, Bing news or NewsCatcher on the basis of a feed configuration file"""
data_feed_cfg = load_toml_config(feed_cfg)
@@ -200,7 +204,7 @@

# Generate a query file
with tempfile.NamedTemporaryFile() as query_file:
if provider == "arxiv": # already returns batches
if provider == "arxiv" or provider == "curebot": # already returns batches
scrape(
keywords=keywords,
provider=provider,
24 changes: 17 additions & 7 deletions bertrend_apps/data_provider/curebot_provider.py
@@ -2,11 +2,13 @@
# See AUTHORS.txt
# SPDX-License-Identifier: MPL-2.0
# This file is part of BERTrend.
import re
from pathlib import Path

import pandas as pd
from loguru import logger

from bertrend_apps.data_provider import URL_PATTERN
from bertrend_apps.data_provider.data_provider import DataProvider
import feedparser

@@ -19,6 +21,8 @@ def __init__(self, curebot_export_file: Path = None, feed_url: str = None):
self.data_file = curebot_export_file
if self.data_file:
self.df_dict = pd.read_excel(self.data_file, sheet_name=None, dtype=str)
else:
self.df_dict = None
self.feed_url = feed_url

def get_articles(
@@ -30,16 +34,22 @@ def get_articles(
language: str = "fr",
) -> list[dict]:
"""Requests the news data provider, collects a set of URLs to be parsed, return results as json lines"""
if query and re.match(URL_PATTERN, query):
# if using a config file, the "query" field may contain the feed url
self.feed_url = query
if self.feed_url:
return self.parse_ATOM_feed()

entries = []
for k in self.df_dict.keys():
entries += self.df_dict[k].to_dict(orient="records")
results = [self._parse_entry(res) for res in entries]
return [
res for res in results if res is not None
] # sanity check to remove errors
if self.df_dict:
entries = []
for k in self.df_dict.keys():
entries += self.df_dict[k].to_dict(orient="records")
results = [self._parse_entry(res) for res in entries]
return [
res for res in results if res is not None
] # sanity check to remove errors

return []
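
A minimal sketch of the two ways this provider can now be fed (file name, feed URL and date bounds are hypothetical, and the query/after/before/max_results parameters are assumed to follow the usual DataProvider.get_articles signature):

# Hypothetical Curebot Excel export
provider = CurebotProvider(curebot_export_file=Path("export_curebot.xlsx"))
articles = provider.get_articles(query=None, after="2025-01-01", before="2025-02-03", max_results=50)

# Hypothetical ATOM feed URL passed through the "query" field of a feed config
provider = CurebotProvider()
articles = provider.get_articles(query="https://example.com/curebot/atom.xml", after="2025-01-01", before="2025-02-03", max_results=50)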

def parse_ATOM_feed(self) -> list[dict]:
feed = feedparser.parse(self.feed_url)
3 changes: 2 additions & 1 deletion bertrend_apps/data_provider/data_provider.py
@@ -107,7 +107,8 @@ def store_articles(self, data: list[dict], file_path: Path):
if not data:
logger.error("No data to be stored!")
return -1
with jsonlines.open(file_path, "w") as writer:
with jsonlines.open(file_path, "a") as writer:
# append to existing file
writer.write_all(data)

logger.info(f"Data stored to {file_path} [{len(data)} entries].")
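
Since the writer now opens the file in append mode, successive scraping runs accumulate into the same .jsonl file; a small sketch of the effect (path and records are made up):

import jsonlines

# Two successive append-mode writes leave both batches in the file
with jsonlines.open("/tmp/feed.jsonl", "a") as writer:
    writer.write_all([{"title": "first batch"}])
with jsonlines.open("/tmp/feed.jsonl", "a") as writer:
    writer.write_all([{"title": "second batch"}])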
4 changes: 4 additions & 0 deletions bertrend_apps/prospective_demo/__init__.py
@@ -0,0 +1,4 @@
# Copyright (c) 2024, RTE (https://www.rte-france.com)
# See AUTHORS.txt
# SPDX-License-Identifier: MPL-2.0
# This file is part of BERTrend.
81 changes: 81 additions & 0 deletions bertrend_apps/prospective_demo/app.py
@@ -0,0 +1,81 @@
# Copyright (c) 2024, RTE (https://www.rte-france.com)
# See AUTHORS.txt
# SPDX-License-Identifier: MPL-2.0
# This file is part of BERTrend.
from typing import Literal

import streamlit as st

from bertrend.demos.demos_utils import is_admin_mode
from bertrend.demos.demos_utils.icons import (
SETTINGS_ICON,
ANALYSIS_ICON,
NEWSLETTER_ICON,
SERVER_STORAGE_ICON,
)
from bertrend.demos.demos_utils.state_utils import SessionStateManager
from bertrend_apps.prospective_demo.authentication import check_password
from bertrend_apps.prospective_demo.dashboard_analysis import dashboard_analysis
from bertrend_apps.prospective_demo.feeds_config import configure_information_sources
from bertrend_apps.prospective_demo.feeds_data import display_data_status

# UI Settings
PAGE_TITLE = "BERTrend - Prospective Analysis demo"
LAYOUT: Literal["centered", "wide"] = "wide"

# TODO: reactivate password
# AUTHENTIFICATION = True
AUTHENTIFICATION = False


def main():
"""Main page"""
st.set_page_config(
page_title=PAGE_TITLE,
layout=LAYOUT,
initial_sidebar_state="expanded" if is_admin_mode() else "collapsed",
page_icon=":part_alternation_mark:",
)

st.title(":part_alternation_mark: " + PAGE_TITLE)

if AUTHENTIFICATION:
username = check_password()
if not username:
st.stop()
else:
SessionStateManager.set("username", username)
else:
SessionStateManager.get_or_set(
"username", "nemo"
) # if username is not set or authentication deactivated

# Sidebar
with st.sidebar:
st.header(SETTINGS_ICON + " Settings and Controls")

# Main content
tab1, tab2 = st.tabs(
[
NEWSLETTER_ICON + " Mes veilles",
ANALYSIS_ICON + " Mes analyses",
]
)

with tab1:
with st.expander(
"Configuration des flux de données", expanded=True, icon=SETTINGS_ICON
):
configure_information_sources()

with st.expander(
"Etat de collecte des données", expanded=False, icon=SERVER_STORAGE_ICON
):
display_data_status()

with tab2:
dashboard_analysis()


if __name__ == "__main__":
main()