Skip to content

Commit

Permalink
🎨 Improve OOP Logic, DRYyed the codebase
Browse files Browse the repository at this point in the history
* Transformed self.sources in Estacoes to L object, then the logic to read
or update the code is unique given the class attribute flag
* Move staticmethods of casting types from Estacoes to Base
* Move staticmethod _format_types from Estacoes to Mosaico
* Added casting types when formatting the SRD dataframe in _format
  method
  • Loading branch information
Ronaldo S.A. Batista committed Nov 30, 2023
1 parent 4de44bd commit 00920a9
Show file tree
Hide file tree
Showing 17 changed files with 5,131 additions and 7,048 deletions.
4 changes: 2 additions & 2 deletions extracao/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,13 @@
'Comprimento_Linha(m)',
]

INT_COLUMNS = ['Fistel', 'Serviço', 'Código_Município', 'Multiplicidade']
INT_COLUMNS = ['Fistel', 'Serviço', 'Multiplicidade']


STR_COLUMNS = ['Entidade', 'Estação', 'Log', 'Padrão_Antena(dBd)', 'Relatório_Canal']

CAT_COLUMNS = [
# 'Código_Município',
'Código_Município',
'Município',
'UF',
'Classe',
Expand Down
2 changes: 1 addition & 1 deletion extracao/datasources/arquivos/saida/VersionFile.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"appAnalise": {"Version": "1.39", "ReleaseDate": "14/11/2023"}, "appColeta": {"Version": "1.11", "ReleaseDate": "01/12/2022"}, "appColetaV2": {"Version": "1.49", "ReleaseDate": "05/10/2023"}, "anateldb": {"ReleaseDate": "27/11/2023 08:13:01", "ANATEL": "27/11/2023 08:13:01", "AERONAUTICA": "27/11/2023 08:13:01"}, "fiscaliza": {"Version": "0.2.20", "ReleaseDate": "27/02/2023"}, "rfpye": {"Version": "0.2.3", "ReleaseDate": "03/11/2021"}}
{"appAnalise": {"Version": "1.39", "ReleaseDate": "14/11/2023"}, "appColeta": {"Version": "1.11", "ReleaseDate": "01/12/2022"}, "appColetaV2": {"Version": "1.49", "ReleaseDate": "05/10/2023"}, "anateldb": {"ReleaseDate": "02/10/2023 09:20:49", "ANATEL": "02/10/2023 09:19:19", "AERONAUTICA": "02/10/2023 09:19:46"}, "rfdatahub": {"ReleaseDate": "30/11/2023 05:44:17", "ANATEL": "30/11/2023 05:44:17", "AERONAUTICA": "30/11/2023 05:44:17"}, "fiscaliza": {"Version": "0.2.20", "ReleaseDate": "27/02/2023"}, "rfpye": {"Version": "0.2.3", "ReleaseDate": "03/11/2021"}}
32 changes: 29 additions & 3 deletions extracao/datasources/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ def _save(self, df: pd.DataFrame, folder: Union[str, Path], stem: str) -> pd.Dat
"""Format, Save and return a dataframe"""
try:
file = Path(f'{folder}/{stem}.parquet.gzip')
df.astype('string[pyarrow]').to_parquet(
file, compression='gzip', index=False, engine='pyarrow'
)
df.to_parquet(file, compression='gzip', index=False, engine='pyarrow')
except (ArrowInvalid, ArrowTypeError) as e:
raise e(f'Não foi possível salvar o arquivo parquet {file}') from e
return df
Expand Down Expand Up @@ -123,3 +121,31 @@ def save(self, folder: Union[str, Path] = None):
folder = self.folder
self._save(self.df, folder, self.stem)
self._save(self.discarded, folder, f'{self.stem}_discarded')

@staticmethod
def _cast2float(column: pd.Series) -> pd.Series:
return pd.to_numeric(
column,
downcast='float',
errors='coerce',
dtype_backend='numpy_nullable',
).fillna(-1.0)

@staticmethod
def _cast2int(column: pd.Series) -> pd.Series:
return pd.to_numeric(
column,
downcast='integer',
errors='coerce',
dtype_backend='numpy_nullable',
).fillna(-1)

@staticmethod
def _cast2str(column: pd.Series) -> pd.Series:
column.replace('', '-1', inplace=True)
return column.astype('string', copy=False).fillna('-1')

@staticmethod
def _cast2cat(column: pd.Series) -> pd.Series:
column.replace('', '-1', inplace=True)
return column.fillna('-1').astype('category', copy=False)
138 changes: 69 additions & 69 deletions extracao/datasources/icao.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
# AUTOGENERATED! DO NOT EDIT! File to edit: ../../nbs/02a_icao.ipynb.

# %% auto 0
__all__ = ['COLS_NAV', 'COLS_COM', 'UNIQUE_COLS', 'convert_latitude', 'convert_longitude', 'map_channels', 'get_icao']
__all__ = [
'COLS_NAV',
'COLS_COM',
'UNIQUE_COLS',
'convert_latitude',
'convert_longitude',
'map_channels',
'get_icao',
]

# %% ../../nbs/02a_icao.ipynb 3
import os
Expand All @@ -15,90 +23,82 @@
load_dotenv(find_dotenv(), override=True)

# %% ../../nbs/02a_icao.ipynb 6
COLS_NAV = ["Frequency", "Latitude", "Longitude", "Facility", "Location", "NS", "WE"]
COLS_COM = ["Frequency", "CoordLat", "CoordLong", "DOC", "Location", "NS", "WE"]
UNIQUE_COLS = ["Frequência", "Latitude", "Longitude"]
COLS_NAV = ['Frequency', 'Latitude', 'Longitude', 'Facility', 'Location', 'NS', 'WE']
COLS_COM = ['Frequency', 'CoordLat', 'CoordLong', 'DOC', 'Location', 'NS', 'WE']
UNIQUE_COLS = ['Frequência', 'Latitude', 'Longitude']


# %% ../../nbs/02a_icao.ipynb 7
def convert_latitude(
lat: str, # Latitude
hemisphere: str, # Hemisfério: N | S
lat: str, # Latitude
hemisphere: str, # Hemisfério: N | S
) -> float:
"""Converte a Latitude para formato decimal"""
multiplier = 1 if hemisphere == "N" else -1
return multiplier * (
float(lat[:2]) + float(lat[3:5]) / 60 + float(lat[6:8]) / 3600.0
)
"""Converte a Latitude para formato decimal"""
multiplier = 1 if hemisphere == 'N' else -1
return multiplier * (float(lat[:2]) + float(lat[3:5]) / 60 + float(lat[6:8]) / 3600.0)


def convert_longitude(
lon: str, # Longitude
hemisphere: str, # Hemisfério: W | E
lon: str, # Longitude
hemisphere: str, # Hemisfério: W | E
) -> float:
"""Converte a longitude para formato decimal"""
"""Converte a longitude para formato decimal"""

multiplier = 1 if hemisphere == 'E' else -1
return multiplier * (float(lon[1:3]) + float(lon[4:6]) / 60 + float(lon[7:9]) / 3600.0)

multiplier = 1 if hemisphere == "E" else -1
return multiplier * (
float(lon[1:3]) + float(lon[4:6]) / 60 + float(lon[7:9]) / 3600.0
)

# %% ../../nbs/02a_icao.ipynb 10
def _read_df(
path: str, # Caminho do arquivo
usecols: Iterable[str], # Subconjunto de colunas do arquivo
path: str, # Caminho do arquivo
usecols: Iterable[str], # Subconjunto de colunas do arquivo
) -> pd.DataFrame: # Dataframe formatado
# sourcery skip: use-fstring-for-concatenation
"""Lê o DataFrame no caminho `path`, filtra as colunas `usecols` e o retorna formatado"""
df = pd.read_csv(path, dtype="string")[usecols]
df.columns = COLS_NAV
df["Latitude"] = df.apply(
lambda x: convert_latitude(x["Latitude"], x["NS"]), axis=1
)
df["Longitude"] = df.apply(
lambda x: convert_longitude(x["Longitude"], x["WE"]), axis=1
)
df["Description"] = df.Facility + ", " + df.Location
df["Fonte"] = "ICAO"
df = df[["Frequency", "Latitude", "Longitude", "Description", "Fonte"]]
df.columns = ["Frequência", "Latitude", "Longitude", "Entidade", "Fonte"]
return df
# sourcery skip: use-fstring-for-concatenation
"""Lê o DataFrame no caminho `path`, filtra as colunas `usecols` e o retorna formatado"""
df = pd.read_csv(path, dtype='string')[usecols]
df.columns = COLS_NAV
df['Latitude'] = df.apply(lambda x: convert_latitude(x['Latitude'], x['NS']), axis=1)
df['Longitude'] = df.apply(lambda x: convert_longitude(x['Longitude'], x['WE']), axis=1)
df['Description'] = df.Facility + ', ' + df.Location
df['Fonte'] = 'ICAO'
df = df[['Frequency', 'Latitude', 'Longitude', 'Description', 'Fonte']]
df.columns = ['Frequência', 'Latitude', 'Longitude', 'Entidade', 'Fonte']
return df


# %% ../../nbs/02a_icao.ipynb 11
def map_channels(
df: pd.DataFrame, # DataFrame dos dados de origem
origem: str, # Descrição da emissão a ser substituída
df: pd.DataFrame, # DataFrame dos dados de origem
origem: str, # Descrição da emissão a ser substituída
) -> pd.DataFrame:
"""Mapeia os canais contidos em `df` e adiciona os registros ILS/DME caso houver"""
chs = pd.read_csv(VOR_ILS_DME, dtype="string[pyarrow]", dtype_backend="pyarrow")
for row in df[df.Entidade.str.contains("ILS|DME")].itertuples():
if not (ch := chs[(chs.VOR_ILSloc == row.Frequência)]).empty:
for i, c in enumerate(ch.values[0][2:]):
if pd.notna(c):
if i == 0:
freq_type = "ILS glide path"
elif i == 1:
freq_type = "Airbone DME"
elif i == 2:
freq_type = "Ground-based DME"
else:
raise ValueError("No additional frequency to map on channel")
entidade = row.Entidade + f"({freq_type})"
df.loc[len(df)] = [
c,
row.Latitude,
row.Longitude,
entidade,
f"{origem}-CANALIZACAO-VOR/ILS/DME",
]
return df
"""Mapeia os canais contidos em `df` e adiciona os registros ILS/DME caso houver"""
chs = pd.read_csv(VOR_ILS_DME, dtype='string', dtype_backend='pyarrow')
for row in df[df.Entidade.str.contains('ILS|DME')].itertuples():
if not (ch := chs[(chs.VOR_ILSloc == row.Frequência)]).empty:
for i, c in enumerate(ch.values[0][2:]):
if pd.notna(c):
if i == 0:
freq_type = 'ILS glide path'
elif i == 1:
freq_type = 'Airbone DME'
elif i == 2:
freq_type = 'Ground-based DME'
else:
raise ValueError('No additional frequency to map on channel')
entidade = row.Entidade + f'({freq_type})'
df.loc[len(df)] = [
c,
row.Latitude,
row.Longitude,
entidade,
f'{origem}-CANALIZACAO-VOR/ILS/DME',
]
return df


# %% ../../nbs/02a_icao.ipynb 12
def get_icao() -> (
pd.DataFrame
): # DataFrame com frequências, coordenadas e descrição das estações
"""Lê, concatena e pós-processa os arquivos do ICAO"""
df = pd.concat(
_read_df(p, c) for p, c in zip([PATH_NAV, PATH_COM], [COLS_NAV, COLS_COM])
)
df = df.astype("string")
return map_channels(df, "ICAO").drop_duplicates(UNIQUE_COLS, ignore_index=True)
def get_icao() -> pd.DataFrame: # DataFrame com frequências, coordenadas e descrição das estações
"""Lê, concatena e pós-processa os arquivos do ICAO"""
df = pd.concat(_read_df(p, c) for p, c in zip([PATH_NAV, PATH_COM], [COLS_NAV, COLS_COM]))
df = df.astype('string')
return map_channels(df, 'ICAO').drop_duplicates(UNIQUE_COLS, ignore_index=True)
109 changes: 59 additions & 50 deletions extracao/datasources/mosaico.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,70 +11,79 @@
from fastcore.foundation import GetAttr
from tqdm.auto import tqdm

from extracao.constants import (
FLOAT_COLUMNS,
INT_COLUMNS,
CAT_COLUMNS,
STR_COLUMNS,
)
from .connectors import MongoDB
from .base import Base

# %% ../../nbs/01d_mosaico.ipynb 4
load_dotenv(find_dotenv())

# %% ../../nbs/01d_mosaico.ipynb 6
MONGO_URI: str = os.environ.get("MONGO_URI")
MONGO_URI: str = os.environ.get('MONGO_URI')


# %% ../../nbs/01d_mosaico.ipynb 7
class Mosaico(Base, GetAttr):
def __init__(self, mongo_uri: str = MONGO_URI):
self.database = "sms"
self.default = MongoDB(mongo_uri)
def __init__(self, mongo_uri: str = MONGO_URI):
self.database = 'sms'
self.default = MongoDB(mongo_uri)

@property
def collection(self):
raise NotImplementedError(
"Subclasses devem implementar a propriedade 'collection'"
)
@property
def collection(self):
raise NotImplementedError("Subclasses devem implementar a propriedade 'collection'")

@property
def query(self):
raise NotImplementedError("Subclasses devem implementar a propriedade 'query'")
@property
def query(self):
raise NotImplementedError("Subclasses devem implementar a propriedade 'query'")

@property
def projection(self):
raise NotImplementedError(
"Subclasses devem implementar a propriedade 'projection'"
)
@property
def projection(self):
raise NotImplementedError("Subclasses devem implementar a propriedade 'projection'")

def _extract(self, collection: str, pipeline: list):
client = self.connect()
database = client[self.database]
collection = database[collection]
dtype = "string[pyarrow]" if self.stem == "srd" else "category"
df = pd.DataFrame(list(collection.aggregate(pipeline)), copy=False, dtype=dtype)
# Substitui strings vazias e somente com espaços por nulo
return df.replace(r"^\s*$", pd.NA, regex=True)
def _extract(self, collection: str, pipeline: list):
client = self.connect()
database = client[self.database]
collection = database[collection]
df = pd.DataFrame(list(collection.aggregate(pipeline)), copy=False, dtype='string')
# Substitui strings vazias e somente com espaços por nulo
return df.replace(r'^\s*$', pd.NA, regex=True)

def split_designacao(
self,
df: pd.DataFrame, # DataFrame com coluna original DesignacaoEmissao
) -> (
pd.DataFrame
): # DataFrame com novas colunas Largura_Emissão(kHz) e Classe_Emissão
"""Parse a bandwidth string
It returns the numerical component and a character class
"""
df["Designação_Emissão"] = (
df["Designação_Emissão"]
.str.replace(",", " ")
.str.strip()
.str.upper()
.str.split(" ")
)
exploded_rows = df["Designação_Emissão"].apply(lambda x: isinstance(x, list))
log = """[("Colunas", "Designação_Emissão"]),
def split_designacao(
self,
df: pd.DataFrame, # DataFrame com coluna original DesignacaoEmissao
) -> pd.DataFrame: # DataFrame com novas colunas Largura_Emissão(kHz) e Classe_Emissão
"""Parse a bandwidth string
It returns the numerical component and a character class
"""
df['Designação_Emissão'] = (
df['Designação_Emissão'].str.replace(',', ' ').str.strip().str.upper().str.split(' ')
)
exploded_rows = df['Designação_Emissão'].apply(lambda x: isinstance(x, list))
log = """[("Colunas", "Designação_Emissão"]),
("Processamento", "Registro expandido nos componentes individuais e extraídas Largura e Classe")]"""
df = self.register_log(df, log, exploded_rows)
df = df.explode("Designação_Emissão").reset_index(drop=True)
df = self.register_log(df, log, exploded_rows)
df = df.explode('Designação_Emissão').reset_index(drop=True)

df = df[df['Designação_Emissão'] != '/'] # Removes empty rows
# Apply the parse_bw function
parsed_data = zip(*df['Designação_Emissão'].apply(Base.parse_bw))
df['Largura_Emissão(kHz)'], df['Classe_Emissão'] = parsed_data
return df.drop('Designação_Emissão', axis=1)

df = df[df["Designação_Emissão"] != "/"] # Removes empty rows
# Apply the parse_bw function
parsed_data = zip(*df["Designação_Emissão"].apply(Base.parse_bw))
df["Largura_Emissão(kHz)"], df["Classe_Emissão"] = parsed_data
return df.drop("Designação_Emissão", axis=1)
@staticmethod
def _format_types(df):
df['Frequência'] = df['Frequência'].astype('float')
for col in FLOAT_COLUMNS:
df[col] = Base._cast2float(df[col])
for col in INT_COLUMNS:
df[col] = Base._cast2int(df[col])
for col in CAT_COLUMNS:
df[col] = Base._cast2cat(df[col])
for col in STR_COLUMNS:
df[col] = Base._cast2str(df[col])
return df
Loading

0 comments on commit 00920a9

Please sign in to comment.