From b5c1355987d725f687ddcea346f84d11ba6db599 Mon Sep 17 00:00:00 2001 From: Ronaldo da Silva Alves Batista Date: Thu, 16 Nov 2023 20:26:43 +0900 Subject: [PATCH] =?UTF-8?q?Adicionado=20log=20referente=20ao=20processamen?= =?UTF-8?q?to=20da=20coluna=20Designa=C3=A7=C3=A3o=5FEmiss=C3=A3o?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes Issue #13 --- extracao/_modidx.py | 6 + extracao/datasources/mosaico.py | 12 +- extracao/datasources/sitarweb.py | 95 ++++++- extracao/estacoes.py | 459 ++++++++++++++++--------------- nbs/01d_mosaico.ipynb | 90 +++--- nbs/_04_eda.ipynb | 50 ++-- 6 files changed, 399 insertions(+), 313 deletions(-) diff --git a/extracao/_modidx.py b/extracao/_modidx.py index c543970..db07211 100644 --- a/extracao/_modidx.py +++ b/extracao/_modidx.py @@ -211,7 +211,13 @@ 'extracao/datasources/telecom.py')}, 'extracao.estacoes': { 'extracao.estacoes.Estacoes': ('estacoes.html#estacoes', 'extracao/estacoes.py'), 'extracao.estacoes.Estacoes.__init__': ('estacoes.html#estacoes.__init__', 'extracao/estacoes.py'), + 'extracao.estacoes.Estacoes._cast2cat': ('estacoes.html#estacoes._cast2cat', 'extracao/estacoes.py'), + 'extracao.estacoes.Estacoes._cast2float': ('estacoes.html#estacoes._cast2float', 'extracao/estacoes.py'), + 'extracao.estacoes.Estacoes._cast2int': ('estacoes.html#estacoes._cast2int', 'extracao/estacoes.py'), + 'extracao.estacoes.Estacoes._cast2str': ('estacoes.html#estacoes._cast2str', 'extracao/estacoes.py'), 'extracao.estacoes.Estacoes._format': ('estacoes.html#estacoes._format', 'extracao/estacoes.py'), + 'extracao.estacoes.Estacoes._remove_invalid_frequencies': ( 'estacoes.html#estacoes._remove_invalid_frequencies', + 'extracao/estacoes.py'), 'extracao.estacoes.Estacoes._simplify_sources': ( 'estacoes.html#estacoes._simplify_sources', 'extracao/estacoes.py'), 'extracao.estacoes.Estacoes._update_source': ( 'estacoes.html#estacoes._update_source', diff --git a/extracao/datasources/mosaico.py b/extracao/datasources/mosaico.py index ba6e553..a5b770e 100644 --- a/extracao/datasources/mosaico.py +++ b/extracao/datasources/mosaico.py @@ -47,14 +47,12 @@ def _extract(self, collection: str, pipeline: list): database = client[self.database] collection = database[collection] dtype = "string[pyarrow]" if self.stem == "srd" else "category" - df = pd.DataFrame( - [c for c in collection.aggregate(pipeline)], copy=False, dtype=dtype - ) + df = pd.DataFrame(list(collection.aggregate(pipeline)), copy=False, dtype=dtype) # Substitui strings vazias e somente com espaços por nulo return df.replace(r"^\s*$", pd.NA, regex=True) - @staticmethod def split_designacao( + self, df: pd.DataFrame, # DataFrame com coluna original DesignacaoEmissao ) -> ( pd.DataFrame @@ -69,7 +67,11 @@ def split_designacao( .str.upper() .str.split(" ") ) - df = df.explode("Designação_Emissão") + df = df.explode("Designação_Emissão").reset_index(drop=True) + exploded_rows = df["Designação_Emissão"].apply(lambda x: isinstance(x, list)) + log = """[("Colunas", "Designação_Emissão"]), + ("Processamento", "Registro expandido nos componentes individuais e extraídas Largura e Classe")]""" + df = self.register_log(df, log, exploded_rows) df = df[df["Designação_Emissão"] != "/"] # Removes empty rows # Apply the parse_bw function parsed_data = zip(*df["Designação_Emissão"].apply(Base.parse_bw)) diff --git a/extracao/datasources/sitarweb.py b/extracao/datasources/sitarweb.py index 30f4d50..024c774 100644 --- a/extracao/datasources/sitarweb.py +++ b/extracao/datasources/sitarweb.py @@ -1,7 +1,7 @@ # AUTOGENERATED! DO NOT EDIT! File to edit: ../../nbs/01c_sitarweb.ipynb. # %% auto 0 -__all__ = ['SQLSERVER_PARAMS', 'Sitarweb', 'Radcom', 'Stel'] +__all__ = ['SQLSERVER_PARAMS', 'Radcom', 'Stel', 'Sitarweb'] # %% ../../nbs/01c_sitarweb.ipynb 3 import os @@ -25,7 +25,92 @@ getcontext().prec = 5 load_dotenv(find_dotenv(), override=True) -# %% ../../nbs/01c_sitarweb.ipynb 6 +# %% ../../nbs/01c_sitarweb.ipynb 7 +class Radcom(Sitarweb): + def __init__(self, sql_params: dict = SQLSERVER_PARAMS): + super().__init__(sql_params) + + @property + def query(self): + return SQL_RADCOM + + @property + def stem(self): + return "radcom" + + def _format( + self, + df: pd.DataFrame, # DataFrame com o resultantes do banco de dados + ) -> pd.DataFrame: # DataFrame formatado + """Formata, limpa e padroniza os dados provenientes da query no banco""" + df["Entidade"] = df["Entidade"].str.strip() + df["Serviço"] = "231" + df["Classe_Emissão"] = pd.NA + df["Largura_Emissão(kHz)"] = "256" + df["Validade_RF"] = pd.NA + df["Status"] = "RADCOM" + df["Fonte"] = "SRD" + df["Multiplicidade"] = "1" + a = df.Situação.isna() + df.loc[a, "Classe"] = df.loc[a, "Fase"].astype("string") + df.loc[~a, "Classe"] = ( + df.loc[~a, "Fase"].astype("string") + + "-" + + df.loc[~a, "Situação"].astype("string") + ) + df.drop(["Fase", "Situação"], axis=1, inplace=True) + df["Log"] = "" + df["Frequência"] = pd.to_numeric(df["Frequência"], errors="coerce").astype( + "float" + ) + discarded = df[df.Frequência.isna()].copy() + if not discarded.empty: + log = """[("Colunas", "Frequência"), + ("Processamento", "Valor Nulo")]""" + self.append2discarded(self.register_log(discarded, log)) + df.dropna(subset=["Frequência"], inplace=True) + return df.loc[:, self.columns] + +# %% ../../nbs/01c_sitarweb.ipynb 8 +class Stel(Sitarweb): + def __init__(self, sql_params: dict = SQLSERVER_PARAMS): + super().__init__(sql_params) + + @property + def query(self): + return SQL_STEL + + @property + def stem(self): + return "stel" + + def _format( + self, + df: pd.DataFrame, # DataFrame com o resultantes do banco de dados + ) -> pd.DataFrame: # DataFrame formatado + """Formata, limpa e padroniza os dados provenientes da query no banco""" + df["Status"] = "L" + df["Entidade"] = df.Entidade.str.strip() + df["Fonte"] = "STEL" + df["Largura_Emissão"] = df["Largura_Emissão"].astype("string") + df.loc[:, ["Largura_Emissão(kHz)", "_"]] = ( + df.Largura_Emissão.fillna("").apply(self.parse_bw).tolist() + ) + df.drop(["Largura_Emissão", "_"], axis=1, inplace=True) + df.loc[:, "Validade_RF"] = df.Validade_RF.astype("string").str.slice(0, 10) + df["Frequência"] = df["Frequência"].astype("float") + df.loc[df.Unidade == "kHz", "Frequência"] = df.loc[ + df.Unidade == "kHz", "Frequência" + ].apply(lambda x: float(Decimal(x) / Decimal(1000))) + df.loc[df.Unidade == "GHz", "Frequência"] = df.loc[ + df.Unidade == "GHz", "Frequência" + ].apply(lambda x: float(Decimal(x) * Decimal(1000))) + df.drop("Unidade", axis=1, inplace=True) + df["Multiplicidade"] = 1 + df["Log"] = "" + return df.loc[:, self.columns] + +# %% ../../nbs/01c_sitarweb.ipynb 10 SQLSERVER_PARAMS = dict( driver=os.environ.get("SQL_DRIVER"), server=os.environ.get("SQL_SERVER"), @@ -62,7 +147,7 @@ def query(self): def extraction(self): return pd.read_sql_query(self.query, self.connect(), dtype="category") -# %% ../../nbs/01c_sitarweb.ipynb 7 +# %% ../../nbs/01c_sitarweb.ipynb 11 class Radcom(Sitarweb): def __init__(self, sql_params: dict = SQLSERVER_PARAMS): super().__init__(sql_params) @@ -81,7 +166,7 @@ def _format( ) -> pd.DataFrame: # DataFrame formatado """Formata, limpa e padroniza os dados provenientes da query no banco""" df["Entidade"] = df["Entidade"].str.strip() - df["Serviço"] = "231" + df["Num_Serviço"] = "231" df["Classe_Emissão"] = pd.NA df["Largura_Emissão(kHz)"] = "256" df["Validade_RF"] = pd.NA @@ -108,7 +193,7 @@ def _format( df.dropna(subset=["Frequência"], inplace=True) return df.loc[:, self.columns] -# %% ../../nbs/01c_sitarweb.ipynb 8 +# %% ../../nbs/01c_sitarweb.ipynb 13 class Stel(Sitarweb): def __init__(self, sql_params: dict = SQLSERVER_PARAMS): super().__init__(sql_params) diff --git a/extracao/estacoes.py b/extracao/estacoes.py index 8b76add..68d003a 100644 --- a/extracao/estacoes.py +++ b/extracao/estacoes.py @@ -28,231 +28,244 @@ # %% ../nbs/04_estacoes.ipynb 4 load_dotenv(find_dotenv(), override=True) - # %% ../nbs/04_estacoes.ipynb 6 class Estacoes(Base): - """Classe auxiliar para agregar os dados originários da Anatel""" - - def __init__( - self, - sql_params: dict = SQLSERVER_PARAMS, - mongo_uri: str = MONGO_URI, - limit: int = 0, - parallel: bool = True, - ): - self.sql_params = sql_params - self.mongo_uri = mongo_uri - self.limit = limit - self.parallel = parallel - self.init_data_sources() - - @property - def columns(self): - return COLS_SRD - - def build_from_sources(self) -> pd.DataFrame: - return self._format([s.df() for s in self.sources.values()]) - - @property - def stem(self): - return 'estacoes' - - @staticmethod - def _update_source(class_instance): - class_instance.update() - class_instance.save() - return class_instance - - def init_data_sources(self): - self.sources = { - 'telecom': Telecom(self.mongo_uri, self.limit), - 'smp': SMP(self.mongo_uri, self.limit), - 'srd': SRD(self.mongo_uri, self.limit), - 'stel': Stel(self.sql_params), - 'radcom': Radcom(self.sql_params), - 'aero': Aero(), - } - - def extraction(self) -> L: - if self.parallel: - sources = parallel( - Estacoes._update_source, - self.sources.values(), - n_workers=len(self.sources), - progress=True, - ) - else: - sources = L(self._update_source(s) for s in self.sources.values()) - return sources.attrgot('df') - - @staticmethod - def verify_shapefile_folder(): - # Convert the file paths to Path objects - shapefile_path = Path(IBGE_POLIGONO) - parent_folder = shapefile_path.parent - parent_folder.mkdir(exist_ok=True, parents=True) - zip_file_path = parent_folder.with_suffix('.zip') - - # Check if all required files exist - required_files = L('.cpg', '.dbf', '.prj', '.shx').map(shapefile_path.with_suffix) - if not all(required_files.map(Path.is_file)): - # shutil.rmtree(str(shapefile_path.parent), ignore_errors=True) - parent_folder.ls().map(Path.unlink) - # Download and unzip the zipped folder - urllib.request.urlretrieve(MALHA_IBGE, zip_file_path) - with ZipFile(zip_file_path, 'r') as zip_ref: - zip_ref.extractall(parent_folder) - zip_file_path.unlink() - - def fill_nan_coordinates( - self, - df: pd.DataFrame, # DataFrame com os dados da Anatel - ) -> pd.DataFrame: # DataFrame com as coordenadas validadas na base do IBGE - """Valida as coordenadas consultado a Base Corporativa do IBGE, excluindo o que já está no cache na versão anterior""" - - municipios = pd.read_csv( - IBGE_MUNICIPIOS, - usecols=['Código_Município', 'Latitude', 'Longitude'], - dtype='string[pyarrow]', - dtype_backend='pyarrow', - ) - - df = pd.merge( - df.astype('string[pyarrow]'), - municipios, - on='Código_Município', - how='left', - copy=False, - ) - - null_coords = df.Latitude_x.isna() | df.Longitude_x.isna() - - df.loc[null_coords, ['Latitude_x', 'Longitude_x']] = df.loc[ - null_coords, ['Latitude_y', 'Longitude_y'] - ] - - log = """[("Colunas", ["Latitude", "Longitude"]), + """Classe auxiliar para agregar os dados originários da Anatel""" + + def __init__( + self, + sql_params: dict = SQLSERVER_PARAMS, + mongo_uri: str = MONGO_URI, + limit: int = 0, + parallel: bool = True, + ): + self.sql_params = sql_params + self.mongo_uri = mongo_uri + self.limit = limit + self.parallel = parallel + self.init_data_sources() + + @property + def columns(self): + return COLS_SRD + + def build_from_sources(self) -> pd.DataFrame: + return self._format([s.df() for s in self.sources.values()]) + + @property + def stem(self): + return "estacoes" + + @staticmethod + def _update_source(class_instance): + class_instance.update() + class_instance.save() + return class_instance + + def init_data_sources(self): + self.sources = { + "telecom": Telecom(self.mongo_uri, self.limit), + "smp": SMP(self.mongo_uri, self.limit), + "srd": SRD(self.mongo_uri, self.limit), + "stel": Stel(self.sql_params), + "radcom": Radcom(self.sql_params), + "aero": Aero(), + } + + def extraction(self) -> L: + if self.parallel: + sources = parallel( + Estacoes._update_source, + self.sources.values(), + n_workers=len(self.sources), + progress=True, + ) + else: + sources = L(self._update_source(s) for s in self.sources.values()) + return sources.attrgot("df") + + @staticmethod + def verify_shapefile_folder(): + # Convert the file paths to Path objects + shapefile_path = Path(IBGE_POLIGONO) + parent_folder = shapefile_path.parent + parent_folder.mkdir(exist_ok=True, parents=True) + zip_file_path = parent_folder.with_suffix(".zip") + + # Check if all required files exist + required_files = L(".cpg", ".dbf", ".prj", ".shx").map( + shapefile_path.with_suffix + ) + if not all(required_files.map(Path.is_file)): + # shutil.rmtree(str(shapefile_path.parent), ignore_errors=True) + parent_folder.ls().map(Path.unlink) + # Download and unzip the zipped folder + urllib.request.urlretrieve(MALHA_IBGE, zip_file_path) + with ZipFile(zip_file_path, "r") as zip_ref: + zip_ref.extractall(parent_folder) + zip_file_path.unlink() + + def fill_nan_coordinates( + self, + df: pd.DataFrame, # DataFrame com os dados da Anatel + ) -> pd.DataFrame: # DataFrame com as coordenadas validadas na base do IBGE + """Valida as coordenadas consultado a Base Corporativa do IBGE, excluindo o que já está no cache na versão anterior""" + + municipios = pd.read_csv( + IBGE_MUNICIPIOS, + usecols=["Código_Município", "Latitude", "Longitude"], + dtype="string[pyarrow]", + dtype_backend="pyarrow", + ) + + df = pd.merge( + df.astype("string[pyarrow]"), + municipios, + on="Código_Município", + how="left", + copy=False, + ) + + null_coords = df.Latitude_x.isna() | df.Longitude_x.isna() + + df.loc[null_coords, ["Latitude_x", "Longitude_x"]] = df.loc[ + null_coords, ["Latitude_y", "Longitude_y"] + ] + + log = """[("Colunas", ["Latitude", "Longitude"]), ("Processamento", "Coordenadas Ausentes. Inserido coordenadas do Município")]""" - df = self.register_log(df, log, null_coords) - - df.rename( - columns={ - 'Latitude_x': 'Latitude', - 'Longitude_x': 'Longitude', - 'Latitude_y': 'Latitude_ibge', - 'Longitude_y': 'Longitude_ibge', - }, - inplace=True, - ) - - return df - - def intersect_coordinates_on_poligon(self, df: pd.DataFrame, check_municipio: bool = True): - for column in ['Latitude', 'Longitude']: - df[column] = pd.to_numeric(df[column], errors='coerce').astype('float') - regions = gpd.read_file(IBGE_POLIGONO) - - # Convert pandas dataframe to geopandas df with geometry point given coordinates - gdf_points = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df.Longitude, df.Latitude)) - - # Set the same coordinate reference system (CRS) as the regions shapefile - gdf_points.crs = regions.crs - - # Spatial join points to the regions - gdf = gpd.sjoin(gdf_points, regions, how='inner', predicate='within') - - if check_municipio: - # Check correctness of Coordinates - check_coords = gdf.Código_Município != gdf.CD_MUN - - log = """[("Colunas", ["Código_Município", "Município", "UF"]), + df = self.register_log(df, log, null_coords) + + df.rename( + columns={ + "Latitude_x": "Latitude", + "Longitude_x": "Longitude", + "Latitude_y": "Latitude_ibge", + "Longitude_y": "Longitude_ibge", + }, + inplace=True, + ) + + return df + + def intersect_coordinates_on_poligon( + self, df: pd.DataFrame, check_municipio: bool = True + ): + for column in ["Latitude", "Longitude"]: + df[column] = pd.to_numeric(df[column], errors="coerce").astype("float") + regions = gpd.read_file(IBGE_POLIGONO) + + # Convert pandas dataframe to geopandas df with geometry point given coordinates + gdf_points = gpd.GeoDataFrame( + df, geometry=gpd.points_from_xy(df.Longitude, df.Latitude) + ) + + # Set the same coordinate reference system (CRS) as the regions shapefile + gdf_points.crs = regions.crs + + # Spatial join points to the regions + gdf = gpd.sjoin(gdf_points, regions, how="inner", predicate="within") + + if check_municipio: + # Check correctness of Coordinates + check_coords = gdf.Código_Município != gdf.CD_MUN + + log = """[("Colunas", ["Código_Município", "Município", "UF"]), ("Processamento", "Informações substituídas pela localização correta das coordenadas.") """ - self.register_log(gdf, log, check_coords) - - gdf.drop(['Código_Município', 'Município', 'UF'], axis=1, inplace=True) - - gdf.rename( - columns={ - 'CD_MUN': 'Código_Município', - 'NM_MUN': 'Município', - 'SIGLA_UF': 'UF', - }, - inplace=True, - ) - - return gdf - - def validate_coordinates(self, df: pd.DataFrame, check_municipio: bool = True) -> pd.DataFrame: - """ - Validates the coordinates in the given DataFrame. - - Args: - df: The DataFrame containing the coordinates to be validated. - check_municipio: A boolean indicating whether to check the municipality information (default: True). - - Returns: - pd.DataFrame: The DataFrame with validated coordinates. - - Raises: - None - """ - self.verify_shapefile_folder() - if check_municipio: - df = self.fill_nan_coordinates(df) - return self.intersect_coordinates_on_poligon(df, check_municipio) - - @staticmethod - def _simplify_sources(df): - df['Fonte'] = df['Fonte'].str.replace( - 'ICAO-CANALIZACAO-VOR/ILS/DME | AISWEB-CANALIZACAO-VOR/ILS/DME', - 'CANALIZACAO-VOR/ILS/DME', - ) - df['Fonte'] = df['Fonte'].str.replace( - r'(ICAO-)?(AISWEB-)?CANALIZACAO-VOR/ILS/DME', - 'CANALIZACAO-VOR/ILS/DME', - regex=True, - ) - - return df - - @staticmethod - def _cast2float(column: pd.Series) -> pd.Series: - return pd.to_numeric(column, downcast='float', errors='coerce', dtype_backend='pyarrow') - - @staticmethod - def _cast2int(column: pd.Series) -> pd.Series: - return pd.to_numeric(column, downcast='integer', errors='coerce', dtype_backend='pyarrow') - - @staticmethod - def _cast2str(column: pd.Series) -> pd.Series: - return column.astype('string', copy=False) - - @staticmethod - def _cast2cat(column: pd.Series) -> pd.Series: - return column.astype('category', copy=False) - - @staticmethod - def _remove_invalid_frequencies(df): - valid_range = df['Frequência'] <= LIMIT_FREQ - # TODO: save to discarded and log - # log = f"""[("Colunas", "Frequência"), - # ("Processamento", "Frequência Inválida: Maior que {LIMIT_FREQ}") - # """ - # self.register_log(df, log, check_coords) - return df.loc[valid_range] - - def _format( - self, - dfs: List, # List with the individual API sources - ) -> pd.DataFrame: # Processed DataFrame - aero = dfs.pop() - anatel = pd.concat(dfs, ignore_index=True) - df = merge_on_frequency(anatel, aero) - df = self.validate_coordinates(df) - df = self._simplify_sources(df) - for col in ['Frequência', 'Latitude', 'Longitude']: - df[col] = self._cast2float(df[col]) - df.sort_values(['Frequência', 'Latitude', 'Longitude'], ignore_index=True, inplace=True) - df = self._remove_invalid_frequencies(df) - return df.loc[:, self.columns] + self.register_log(gdf, log, check_coords) + + gdf.drop(["Código_Município", "Município", "UF"], axis=1, inplace=True) + + gdf.rename( + columns={ + "CD_MUN": "Código_Município", + "NM_MUN": "Município", + "SIGLA_UF": "UF", + }, + inplace=True, + ) + + return gdf + + def validate_coordinates( + self, df: pd.DataFrame, check_municipio: bool = True + ) -> pd.DataFrame: + """ + Validates the coordinates in the given DataFrame. + + Args: + df: The DataFrame containing the coordinates to be validated. + check_municipio: A boolean indicating whether to check the municipality information (default: True). + + Returns: + pd.DataFrame: The DataFrame with validated coordinates. + + Raises: + None + """ + self.verify_shapefile_folder() + if check_municipio: + df = self.fill_nan_coordinates(df) + return self.intersect_coordinates_on_poligon(df, check_municipio) + + @staticmethod + def _simplify_sources(df): + df["Fonte"] = df["Fonte"].str.replace( + "ICAO-CANALIZACAO-VOR/ILS/DME | AISWEB-CANALIZACAO-VOR/ILS/DME", + "CANALIZACAO-VOR/ILS/DME", + ) + df["Fonte"] = df["Fonte"].str.replace( + r"(ICAO-)?(AISWEB-)?CANALIZACAO-VOR/ILS/DME", + "CANALIZACAO-VOR/ILS/DME", + regex=True, + ) + + return df + + @staticmethod + def _cast2float(column: pd.Series) -> pd.Series: + return pd.to_numeric( + column, downcast="float", errors="coerce", dtype_backend="pyarrow" + ) + + @staticmethod + def _cast2int(column: pd.Series) -> pd.Series: + return pd.to_numeric( + column, downcast="integer", errors="coerce", dtype_backend="pyarrow" + ) + + @staticmethod + def _cast2str(column: pd.Series) -> pd.Series: + return column.astype("string", copy=False) + + @staticmethod + def _cast2cat(column: pd.Series) -> pd.Series: + return column.astype("category", copy=False) + + @staticmethod + def _remove_invalid_frequencies(df): + valid_range = df["Frequência"] <= LIMIT_FREQ + # TODO: save to discarded and log + # log = f"""[("Colunas", "Frequência"), + # ("Processamento", "Frequência Inválida: Maior que {LIMIT_FREQ}") + # """ + # self.register_log(df, log, check_coords) + return df.loc[valid_range] + + def _format( + self, + dfs: List, # List with the individual API sources + ) -> pd.DataFrame: # Processed DataFrame + aero = dfs.pop() + anatel = pd.concat(dfs, ignore_index=True) + df = merge_on_frequency(anatel, aero) + df = self.validate_coordinates(df) + df = self._simplify_sources(df) + for col in ["Frequência", "Latitude", "Longitude"]: + df[col] = self._cast2float(df[col]) + df.sort_values( + ["Frequência", "Latitude", "Longitude"], ignore_index=True, inplace=True + ) + df = self._remove_invalid_frequencies(df) + return df.loc[:, self.columns] diff --git a/nbs/01d_mosaico.ipynb b/nbs/01d_mosaico.ipynb index a991e84..f00d515 100644 --- a/nbs/01d_mosaico.ipynb +++ b/nbs/01d_mosaico.ipynb @@ -90,7 +90,7 @@ "outputs": [], "source": [ "#| export\n", - "MONGO_URI: str = os.environ.get(\"MONGO_URI\")" + "MONGO_URI: str = os.environ.get('MONGO_URI')\n" ] }, { @@ -101,59 +101,51 @@ "source": [ "# | export\n", "class Mosaico(Base, GetAttr):\n", - " def __init__(self, mongo_uri: str = MONGO_URI):\n", - " self.database = \"sms\"\n", - " self.default = MongoDB(mongo_uri)\n", + "\tdef __init__(self, mongo_uri: str = MONGO_URI):\n", + "\t\tself.database = 'sms'\n", + "\t\tself.default = MongoDB(mongo_uri)\n", "\n", - " @property\n", - " def collection(self):\n", - " raise NotImplementedError(\n", - " \"Subclasses devem implementar a propriedade 'collection'\"\n", - " )\n", + "\t@property\n", + "\tdef collection(self):\n", + "\t\traise NotImplementedError(\"Subclasses devem implementar a propriedade 'collection'\")\n", "\n", - " @property\n", - " def query(self):\n", - " raise NotImplementedError(\"Subclasses devem implementar a propriedade 'query'\")\n", + "\t@property\n", + "\tdef query(self):\n", + "\t\traise NotImplementedError(\"Subclasses devem implementar a propriedade 'query'\")\n", "\n", - " @property\n", - " def projection(self):\n", - " raise NotImplementedError(\n", - " \"Subclasses devem implementar a propriedade 'projection'\"\n", - " )\n", + "\t@property\n", + "\tdef projection(self):\n", + "\t\traise NotImplementedError(\"Subclasses devem implementar a propriedade 'projection'\")\n", "\n", - " def _extract(self, collection: str, pipeline: list):\n", - " client = self.connect()\n", - " database = client[self.database]\n", - " collection = database[collection]\n", - " dtype = \"string[pyarrow]\" if self.stem == \"srd\" else \"category\"\n", - " df = pd.DataFrame(\n", - " [c for c in collection.aggregate(pipeline)], copy=False, dtype=dtype\n", - " )\n", - " # Substitui strings vazias e somente com espaços por nulo\n", - " return df.replace(r\"^\\s*$\", pd.NA, regex=True)\n", + "\tdef _extract(self, collection: str, pipeline: list):\n", + "\t\tclient = self.connect()\n", + "\t\tdatabase = client[self.database]\n", + "\t\tcollection = database[collection]\n", + "\t\tdtype = 'string[pyarrow]' if self.stem == 'srd' else 'category'\n", + "\t\tdf = pd.DataFrame(list(collection.aggregate(pipeline)), copy=False, dtype=dtype)\n", + "\t\t# Substitui strings vazias e somente com espaços por nulo\n", + "\t\treturn df.replace(r'^\\s*$', pd.NA, regex=True)\n", "\n", - " @staticmethod\n", - " def split_designacao(\n", - " df: pd.DataFrame, # DataFrame com coluna original DesignacaoEmissao\n", - " ) -> (\n", - " pd.DataFrame\n", - " ): # DataFrame com novas colunas Largura_Emissão(kHz) e Classe_Emissão\n", - " \"\"\"Parse a bandwidth string\n", - " It returns the numerical component and a character class\n", - " \"\"\"\n", - " df[\"Designação_Emissão\"] = (\n", - " df[\"Designação_Emissão\"]\n", - " .str.replace(\",\", \" \")\n", - " .str.strip()\n", - " .str.upper()\n", - " .str.split(\" \")\n", - " )\n", - " df = df.explode(\"Designação_Emissão\")\n", - " df = df[df[\"Designação_Emissão\"] != \"/\"] # Removes empty rows\n", - " # Apply the parse_bw function\n", - " parsed_data = zip(*df[\"Designação_Emissão\"].apply(Base.parse_bw))\n", - " df[\"Largura_Emissão(kHz)\"], df[\"Classe_Emissão\"] = parsed_data\n", - " return df.drop(\"Designação_Emissão\", axis=1)" + "\tdef split_designacao(\n", + "\t\tself,\n", + "\t\tdf: pd.DataFrame, # DataFrame com coluna original DesignacaoEmissao\n", + "\t) -> pd.DataFrame: # DataFrame com novas colunas Largura_Emissão(kHz) e Classe_Emissão\n", + "\t\t\"\"\"Parse a bandwidth string\n", + "\t\tIt returns the numerical component and a character class\n", + "\t\t\"\"\"\n", + "\t\tdf['Designação_Emissão'] = (\n", + "\t\t\tdf['Designação_Emissão'].str.replace(',', ' ').str.strip().str.upper().str.split(' ')\n", + "\t\t)\n", + "\t\tdf = df.explode('Designação_Emissão').reset_index(drop=True)\n", + "\t\texploded_rows = df['Designação_Emissão'].apply(lambda x: isinstance(x, list))\n", + "\t\tlog = \"\"\"[(\"Colunas\", \"Designação_Emissão\"]),\n", + "\t\t (\"Processamento\", \"Registro expandido nos componentes individuais e extraídas Largura e Classe\")]\"\"\"\n", + "\t\tdf = self.register_log(df, log, exploded_rows)\n", + "\t\tdf = df[df['Designação_Emissão'] != '/'] # Removes empty rows\n", + "\t\t# Apply the parse_bw function\n", + "\t\tparsed_data = zip(*df['Designação_Emissão'].apply(Base.parse_bw))\n", + "\t\tdf['Largura_Emissão(kHz)'], df['Classe_Emissão'] = parsed_data\n", + "\t\treturn df.drop('Designação_Emissão', axis=1)" ] }, { diff --git a/nbs/_04_eda.ipynb b/nbs/_04_eda.ipynb index 3341723..0c0be33 100644 --- a/nbs/_04_eda.ipynb +++ b/nbs/_04_eda.ipynb @@ -2,7 +2,7 @@ "cells": [ { "cell_type": "code", - "execution_count": 1, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -16,7 +16,7 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -41,7 +41,7 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": null, "metadata": {}, "outputs": [ { @@ -50,7 +50,7 @@ "True" ] }, - "execution_count": 3, + "execution_count": null, "metadata": {}, "output_type": "execute_result" } @@ -82,7 +82,7 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -93,7 +93,7 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -106,7 +106,7 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": null, "metadata": {}, "outputs": [ { @@ -160,7 +160,7 @@ }, { "cell_type": "code", - "execution_count": 8, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -169,7 +169,7 @@ }, { "cell_type": "code", - "execution_count": 7, + "execution_count": null, "metadata": {}, "outputs": [ { @@ -225,7 +225,7 @@ }, { "cell_type": "code", - "execution_count": 9, + "execution_count": null, "metadata": {}, "outputs": [ { @@ -277,7 +277,7 @@ }, { "cell_type": "code", - "execution_count": 15, + "execution_count": null, "metadata": {}, "outputs": [ { @@ -291,7 +291,7 @@ " dtype='int32', length=974142)" ] }, - "execution_count": 15, + "execution_count": null, "metadata": {}, "output_type": "execute_result" } @@ -303,7 +303,7 @@ }, { "cell_type": "code", - "execution_count": 25, + "execution_count": null, "metadata": {}, "outputs": [ { @@ -355,7 +355,7 @@ }, { "cell_type": "code", - "execution_count": 26, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -364,7 +364,7 @@ }, { "cell_type": "code", - "execution_count": 27, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -373,7 +373,7 @@ }, { "cell_type": "code", - "execution_count": 24, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -382,7 +382,7 @@ }, { "cell_type": "code", - "execution_count": 18, + "execution_count": null, "metadata": {}, "outputs": [ { @@ -396,7 +396,7 @@ " dtype='int32', length=974142)" ] }, - "execution_count": 18, + "execution_count": null, "metadata": {}, "output_type": "execute_result" } @@ -407,7 +407,7 @@ }, { "cell_type": "code", - "execution_count": 23, + "execution_count": null, "metadata": {}, "outputs": [ { @@ -29234,18 +29234,6 @@ "display_name": "python3", "language": "python", "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.8.18" } }, "nbformat": 4,