Skip to content

Commit

Permalink
WIP: Management of data feeds
Browse files Browse the repository at this point in the history
  • Loading branch information
picaultj committed Feb 3, 2025
1 parent fb5d043 commit c52f7a5
Show file tree
Hide file tree
Showing 20 changed files with 745 additions and 21 deletions.
8 changes: 8 additions & 0 deletions bertrend/demos/demos_utils/icons.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@
# Streamlit Material Symbols shortcodes (rendered by st.* components as icons).
# Status / feedback icons
WARNING_ICON = ":material/warning:"
ERROR_ICON = ":material/error:"
INFO_ICON = ":material/info:"
# CRUD action icons
EDIT_ICON = ":material/edit:"
ADD_ICON = ":material/add_circle:"
DELETE_ICON = ":material/delete:"
SUCCESS_ICON = ":material/check:"
SETTINGS_ICON = ":material/settings:"
# Domain-specific icons (topic analysis / trend detection UIs)
TOPIC_ICON = ":material/speaker_notes:"
TREND_ICON = ":material/trending_up:"
MODELS_ICON = ":material/network_intel_node:"
EMBEDDING_ICON = ":material/memory:"
SAVE_ICON = ":material/save:"
TOPIC_EXPLORATION_ICON = ":material/explore:"
TOPIC_VISUALIZATION_ICON = ":material/monitoring:"
Expand All @@ -20,6 +25,9 @@
MODEL_TRAINING_ICON = ":material/cognition:"
# Storage-location icons (server-side vs. browser upload)
SERVER_STORAGE_ICON = ":material/database:"
CLIENT_STORAGE_ICON = ":material/upload:"
UNHAPPY_ICON = ":material/sentiment_extremely_dissatisfied:"
# Toggle-state icons (e.g. feed enabled / disabled)
TOGGLE_ON_ICON = ":material/toggle_on:"
TOGGLE_OFF_ICON = ":material/toggle_off:"

# Plain emoji icons for data-file formats
JSON_ICON = "🧾"
PARQUET_ICON = "📦️"
Expand Down
5 changes: 4 additions & 1 deletion bertrend/demos/demos_utils/parameters_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ def display_remote_embeddings():
)


def display_bertopic_hyperparameters():
def display_embedding_hyperparameters():
"""UI settings for embedding hyperparameters"""
# Embedding model parameters
with st.expander("Embedding Model Settings", expanded=False):
register_widget("embedding_service_type")
Expand All @@ -103,6 +104,8 @@ def display_bertopic_hyperparameters():
else:
display_remote_embeddings()


def display_bertopic_hyperparameters():
# BERTopic model parameters
with st.expander("BERTopic Model Settings", expanded=False):
# If BERTopic config is already in session state, use it
Expand Down
4 changes: 4 additions & 0 deletions bertrend/demos/topic_analysis/demo_pages/training_page.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
ERROR_ICON,
SETTINGS_ICON,
INFO_ICON,
EMBEDDING_ICON,
)
from bertrend.demos.demos_utils.messages import (
NO_EMBEDDINGS_WARNING_MESSAGE,
Expand All @@ -38,6 +39,7 @@
)
from bertrend.demos.demos_utils.parameters_component import (
display_bertopic_hyperparameters,
display_embedding_hyperparameters,
)
from bertrend.demos.weak_signals.visualizations_utils import PLOTLY_BUTTON_SAVE_CONFIG
from bertrend.metrics.topic_metrics import compute_cluster_metrics
Expand Down Expand Up @@ -175,6 +177,8 @@ def main():
# In the sidebar form
with st.sidebar:
st.header(SETTINGS_ICON + " Settings")
st.subheader(EMBEDDING_ICON + " Embedding Hyperparameters")
display_embedding_hyperparameters()
display_bertopic_hyperparameters()

# Load data
Expand Down
4 changes: 4 additions & 0 deletions bertrend/demos/weak_signals/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
ANALYSIS_ICON,
MODEL_TRAINING_ICON,
DATA_LOADING_ICON,
EMBEDDING_ICON,
)
from bertrend.demos.demos_utils.messages import (
NO_EMBEDDINGS_WARNING_MESSAGE,
Expand All @@ -40,6 +41,7 @@
from bertrend.demos.demos_utils.parameters_component import (
display_bertopic_hyperparameters,
display_bertrend_hyperparameters,
display_embedding_hyperparameters,
)
from bertrend.BERTopicModel import BERTopicModel
from bertrend.demos.weak_signals.messages import (
Expand Down Expand Up @@ -479,6 +481,8 @@ def main():
SessionStateManager.clear()

# BERTopic Hyperparameters
st.subheader(EMBEDDING_ICON + " Embedding Hyperparameters")
display_embedding_hyperparameters()
st.subheader(TOPIC_ICON + " BERTopic Hyperparameters")
display_bertopic_hyperparameters()
st.subheader(TREND_ICON + " BERTrend Hyperparameters")
Expand Down
79 changes: 73 additions & 6 deletions bertrend_apps/common/crontab_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,76 @@
# SPDX-License-Identifier: MPL-2.0
# This file is part of BERTrend.
import os
import re
import shlex
import subprocess
import sys
from pathlib import Path

from cron_descriptor import (
Options,
CasingTypeEnum,
ExpressionDescriptor,
DescriptionTypeEnum,
)
from loguru import logger

from bertrend import BEST_CUDA_DEVICE, BERTREND_LOG_PATH, load_toml_config


def add_job_to_crontab(schedule, command, env_vars=""):
logger.info(f"Adding to crontab: {schedule} {command}")
def get_understandable_cron_description(
    cron_expression: str, locale_code: str = "fr_FR"
) -> str:
    """Return a human-understandable description of a crontab expression.

    Args:
        cron_expression: standard 5-field cron schedule expression.
        locale_code: locale used for the generated description
            (defaults to "fr_FR" to preserve the previous hard-coded behavior).

    Returns:
        A full-sentence description of the schedule, in 24-hour time format.
    """
    options = Options()
    options.casing_type = CasingTypeEnum.Sentence
    options.use_24hour_time_format = True
    options.locale_code = locale_code
    descriptor = ExpressionDescriptor(cron_expression, options)
    return descriptor.get_description(DescriptionTypeEnum.FULL)


def add_job_to_crontab(schedule, command, env_vars="") -> bool:
    """Add the specified job to the crontab.

    Args:
        schedule: cron schedule expression (5 fields).
        command: shell command the cron job shall execute.
        env_vars: optional environment-variable assignments prepended to the command.

    Returns:
        True if the crontab was updated successfully, False otherwise.
    """
    logger.debug(f"Adding to crontab: {schedule} {command}")
    home = os.getenv("HOME")
    # Append the new entry to the existing crontab. The user's .bashrc is sourced
    # so the job runs with the environment variables an interactive shell would have.
    crontab_cmd = f'(crontab -l; echo "{schedule} umask 002; source {home}/.bashrc; {env_vars} {command}" ) | crontab -'
    # subprocess.call returns the unix exit code (0 on success)
    exit_code = subprocess.call(crontab_cmd, shell=True)
    logger.info(f"Crontab updated with status {exit_code}")
    return exit_code == 0


def schedule_scrapping(
feed_cfg: Path,
):
def check_cron_job(pattern: str) -> bool:
    """Check if a pattern (regular expression) matches any current crontab entry.

    Args:
        pattern: regular expression searched against the output of `crontab -l`.

    Returns:
        True if at least one crontab line matches, False otherwise (including
        when no crontab exists for the user or the crontab binary is absent).
    """
    try:
        # Run `crontab -l` and capture the output
        result = subprocess.run(
            ["crontab", "-l"], capture_output=True, text=True, check=True
        )
    except (subprocess.CalledProcessError, FileNotFoundError):
        # CalledProcessError: crontab failed (e.g. no crontab for the user).
        # FileNotFoundError: the crontab binary is not installed — previously
        # this escaped uncaught and crashed callers.
        return False
    # Search for the regex pattern in the crontab output
    return re.search(pattern, result.stdout) is not None


def remove_from_crontab(pattern: str) -> bool:
    """Remove from the crontab the job(s) matching the provided pattern.

    Args:
        pattern: regular expression identifying the crontab line(s) to remove.

    Returns:
        True if a matching job existed and the crontab was rewritten
        successfully, False otherwise.

    Note:
        The previous implementation compared the *stdout bytes* of
        subprocess.check_output() against 0, which is never true, so it
        always returned False even on success.
    """
    if not check_cron_job(pattern):
        logger.warning("No job matching the provided pattern")
        return False
    # Keep every crontab line NOT matching the pattern and reinstall the result.
    # shlex.quote protects the regex from shell interpretation (spaces, |, $...).
    # subprocess.call returns the pipeline's exit status: 0 means success.
    status = subprocess.call(
        f"crontab -l | grep -vE {shlex.quote(pattern)} | crontab -", shell=True
    )
    return status == 0


def schedule_scrapping(feed_cfg: Path):
"""Schedule data scrapping on the basis of a feed configuration file"""
data_feed_cfg = load_toml_config(feed_cfg)
schedule = data_feed_cfg["data-feed"]["update_frequency"]
Expand All @@ -44,3 +93,21 @@ def schedule_newsletter(
command = f"{sys.prefix}/bin/python -m bertrend_apps.newsletters newsletters {newsletter_cfg_path.resolve()} {data_feed_cfg_path.resolve()} > {BERTREND_LOG_PATH}/cron_newsletter_{id}.log 2>&1"
env_vars = f"CUDA_VISIBLE_DEVICES={cuda_devices}"
add_job_to_crontab(schedule, command, env_vars)


def check_if_scrapping_active_for_user(feed_id: str, user: str = None) -> bool:
    """Check whether the given scrapping feed is active (registered in the crontab).

    Args:
        feed_id: identifier of the data feed.
        user: optional user name; when provided, the per-user feeds directory
            is checked instead of the shared one.

    Returns:
        True if a matching scrape-feed crontab entry exists, False otherwise.
    """
    subdir = f"users/{user}/" if user else ""
    return check_cron_job(rf"scrape-feed.*/feeds/{subdir}{feed_id}_feed.toml")


def remove_scrapping_for_user(feed_id: str, user: str = None):
    """Remove from the crontab the scrapping job matching the provided feed_id.

    Args:
        feed_id: identifier of the data feed.
        user: optional user name; when provided, the per-user feeds directory
            is targeted instead of the shared one.

    Returns:
        The result of remove_from_crontab for the matching pattern.
    """
    subdir = f"users/{user}/" if user else ""
    return remove_from_crontab(rf"scrape-feed.*/feeds/{subdir}{feed_id}_feed.toml")
5 changes: 5 additions & 0 deletions bertrend_apps/data_provider/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,8 @@
# See AUTHORS.txt
# SPDX-License-Identifier: MPL-2.0
# This file is part of BERTrend.
# Define a pattern for a basic URL validation.
# First alternative: dotted hostname — lowercase labels of letters/digits/hyphens,
# final label (TLD) of 2 to 6 lowercase letters. Second alternative: "localhost"
# or a dotted-quad IPv4-looking address. Both allow an optional http/https
# scheme, an optional :port, and optional /path segments of [\w.-] characters.
# NOTE(review): the host part is lowercase-only and TLDs longer than 6 chars
# (e.g. ".software") won't match — confirm this is acceptable where the pattern
# is used (feed-URL detection in data providers).
URL_PATTERN = (
    r"^(https?://)?([a-z0-9-]+\.)+[a-z]{2,6}(:\d+)?(/[\w.-]*)*$|"
    r"^(https?://)?(localhost|(\d{1,3}\.){3}\d{1,3})(:\d+)?(/[\w.-]*)*$"
)
16 changes: 10 additions & 6 deletions bertrend_apps/data_provider/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from bertrend_apps.common.crontab_utils import schedule_scrapping
from bertrend_apps.data_provider.arxiv_provider import ArxivProvider
from bertrend_apps.data_provider.bing_news_provider import BingNewsProvider
from bertrend_apps.data_provider.curebot_provider import CurebotProvider
from bertrend_apps.data_provider.google_news_provider import GoogleNewsProvider
from bertrend_apps.data_provider.newscatcher_provider import NewsCatcherProvider

Expand All @@ -23,6 +24,7 @@

PROVIDERS = {
"arxiv": ArxivProvider,
"curebot": CurebotProvider,
"google": GoogleNewsProvider,
"bing": BingNewsProvider,
"newscatcher": NewsCatcherProvider,
Expand All @@ -46,7 +48,7 @@ def scrape(
max_results: int = typer.Option(
50, help="maximum number of results per request"
),
save_path: str = typer.Option(
save_path: Path = typer.Option(
None, help="Path for writing results. File is in jsonl format."
),
language: str = typer.Option(None, help="Language filter"),
Expand All @@ -65,7 +67,7 @@ def scrape(
"to" date, formatted as YYYY-MM-DD
max_results: int
Maximum number of results per request
save_path: str
save_path: Path
Path to the output file (jsonl format)
language: str
Language filter
Expand All @@ -90,7 +92,7 @@ def auto_scrape(
provider: str = typer.Option(
"google", help="source for news [google, bing, newscatcher]"
),
save_path: str = typer.Option(None, help="Path for writing results."),
save_path: Path = typer.Option(None, help="Path for writing results."),
language: str = typer.Option(None, help="Language filter"),
):
"""Scrape data from Arxiv, Google, Bing news or NewsCatcher (multiple requests from a configuration file: each line of the file shall be compliant with the following format:
Expand All @@ -100,9 +102,11 @@ def auto_scrape(
----------
requests_file: str
Text file containing the list of requests to be processed
max_results: int
Maximum number of results per request
provider: str
News data provider. Current authorized values [google, bing, newscatcher]
save_path: str
save_path: Path
Path to the output file (jsonl format)
language: str
Language filter
Expand Down Expand Up @@ -178,7 +182,7 @@ def _daterange(start_date, end_date, ndays):

@app.command("scrape-feed")
def scrape_from_feed(
feed_cfg: str = typer.Argument(help="Path of the data feed config file"),
feed_cfg: Path = typer.Argument(help="Path of the data feed config file"),
):
"""Scrape data from Arxiv, Google, Bing news or NewsCatcher on the basis of a feed configuration file"""
data_feed_cfg = load_toml_config(feed_cfg)
Expand All @@ -200,7 +204,7 @@ def scrape_from_feed(

# Generate a query file
with tempfile.NamedTemporaryFile() as query_file:
if provider == "arxiv": # already returns batches
if provider == "arxiv" or provider == "curebot": # already returns batches
scrape(
keywords=keywords,
provider=provider,
Expand Down
24 changes: 17 additions & 7 deletions bertrend_apps/data_provider/curebot_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
# See AUTHORS.txt
# SPDX-License-Identifier: MPL-2.0
# This file is part of BERTrend.
import re
from pathlib import Path

import pandas as pd
from loguru import logger

from bertrend_apps.data_provider import URL_PATTERN
from bertrend_apps.data_provider.data_provider import DataProvider
import feedparser

Expand All @@ -19,6 +21,8 @@ def __init__(self, curebot_export_file: Path = None, feed_url: str = None):
self.data_file = curebot_export_file
if self.data_file:
self.df_dict = pd.read_excel(self.data_file, sheet_name=None, dtype=str)
else:
self.df_dict = None
self.feed_url = feed_url

def get_articles(
Expand All @@ -30,16 +34,22 @@ def get_articles(
language: str = "fr",
) -> list[dict]:
"""Requests the news data provider, collects a set of URLs to be parsed, return results as json lines"""
if query and re.match(URL_PATTERN, query):
# if using a config file, the "query" field may contain the feed url
self.feed_url = query
if self.feed_url:
return self.parse_ATOM_feed()

entries = []
for k in self.df_dict.keys():
entries += self.df_dict[k].to_dict(orient="records")
results = [self._parse_entry(res) for res in entries]
return [
res for res in results if res is not None
] # sanity check to remove errors
if self.df_dict:
entries = []
for k in self.df_dict.keys():
entries += self.df_dict[k].to_dict(orient="records")
results = [self._parse_entry(res) for res in entries]
return [
res for res in results if res is not None
] # sanity check to remove errors

return []

def parse_ATOM_feed(self) -> list[dict]:
feed = feedparser.parse(self.feed_url)
Expand Down
3 changes: 2 additions & 1 deletion bertrend_apps/data_provider/data_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ def store_articles(self, data: list[dict], file_path: Path):
if not data:
logger.error("No data to be stored!")
return -1
with jsonlines.open(file_path, "w") as writer:
with jsonlines.open(file_path, "a") as writer:
# append to existing file
writer.write_all(data)

logger.info(f"Data stored to {file_path} [{len(data)} entries].")
Expand Down
4 changes: 4 additions & 0 deletions bertrend_apps/prospective_demo/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Copyright (c) 2024, RTE (https://www.rte-france.com)
# See AUTHORS.txt
# SPDX-License-Identifier: MPL-2.0
# This file is part of BERTrend.
Loading

0 comments on commit c52f7a5

Please sign in to comment.