Skip to content

Commit

Permalink
⚡️Retornada a leitura usando pyarrow para performance
Browse files: browse the repository at this point in the history
  • Loading branch information
ronaldokun committed Mar 6, 2024
1 parent e3480ef commit a7b3db2
Showing 1 changed file with 4 additions and 4 deletions.
8 changes: 4 additions & 4 deletions extracao/datasources/base.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -30,11 +30,11 @@ class Base:
folder: Union[str, Path] = Path(__file__).parent / 'arquivos' / 'saida'
read_cache: bool = False

def _read(self, stem: str) -> pd.DataFrame:
def _read(self, stem: str, backend: str = 'pyarrow') -> pd.DataFrame:
"""Lê o dataframe formado por self.folder / self.stem.parquet.gzip"""
file = Path(f'{self.folder}/{stem}.parquet.gzip')
try:
df = pd.read_parquet(file, dtype_backend='pyarrow')
df = pd.read_parquet(file, dtype_backend=backend)
except (ArrowInvalid, FileNotFoundError) as e:
raise ValueError(f'Error when reading {file}') from e
return df
Expand All @@ -43,7 +43,7 @@ def _save(self, df: pd.DataFrame, folder: Union[str, Path], stem: str) -> pd.Dat
"""Format, Save and return a dataframe"""
try:
file = Path(f'{folder}/{stem}.parquet.gzip')
df.to_parquet(file, compression='gzip', index=False, engine='pyarrow')
df.astype('string').to_parquet(file, compression='gzip', index=False, engine='pyarrow')
except (ArrowInvalid, ArrowTypeError) as e:
raise Exception(f'Não foi possível salvar o arquivo parquet') from e
return df
Expand Down Expand Up @@ -95,7 +95,7 @@ def register_log(
df['Log'] = df['Log'].astype('string', copy=False).fillna('[]')
df['Log'] = df['Log'].str.replace('^$', r'[]', regex=True)
log_function = partial(Base.format_log, processing=processing, column=column)
print(f'Logging {processing}...')
print(f'Logging: {processing}')
df.loc[row_filter, 'Log'] = df[row_filter].progress_apply(log_function, axis=1)

@staticmethod
Expand Down

0 comments on commit a7b3db2

Please sign in to comment.