Skip to content

Commit

Permalink
FEAT : Implementation of read archive's function with tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Sabrina-Hassaim committed Jan 21, 2025
1 parent 7e7dca0 commit 7a955fc
Show file tree
Hide file tree
Showing 11 changed files with 291 additions and 113 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
2 changes: 1 addition & 1 deletion README.md
293 changes: 188 additions & 105 deletions janitor/functions/read_archive.py
Original file line number Diff line number Diff line change
@@ -1,139 +1,222 @@
import zipfile
from __future__ import annotations

import tarfile
import zipfile

import pandas as pd
import pandas_flavor as pf

from janitor.utils import check


# NOTE(review): registering this as a DataFrame method presumably means that
# ``df.read_archive(...)`` passes the DataFrame itself as ``file_path``, which
# the ``check`` below would reject — confirm the decorator is intended.
@pf.register_dataframe_method
def read_archive(
    file_path: str,
    extract_to_df: bool = True,
    file_type: str | None = None,
) -> pd.DataFrame | list[str]:
    """
    Read an archive file (.zip, .tar, .tar.gz) and either list its
    compatible files or load user-selected files into DataFrames.

    Examples:
        >>> read_archive("data.zip", extract_to_df=False)  # doctest: +SKIP

    Args:
        file_path: The path to the archive file.
        extract_to_df: Whether to read the contents into a DataFrame
            (for CSV or similar formats). Default is True.
        file_type: Optional file type hint ('zip', 'tar', 'tar.gz').
            If None (or empty), it is inferred from the file extension.

    Returns:
        - If extract_to_df is True: the DataFrame for the file the user
          selects at the interactive prompt (the helpers return a list of
          DataFrames when several files are selected).
        - Otherwise: the list of compatible file names in the archive.

    Raises:
        ValueError: If the archive type is not 'zip', 'tar' or 'tar.gz'.
    """
    check("file_path", file_path, [str])
    check("extract_to_df", extract_to_df, [bool])

    # An explicit hint wins; otherwise fall back to the file extension.
    file_type = file_type or _infer_file_type(file_path)

    if file_type == "zip":
        return _process_zip_archive(file_path, extract_to_df)
    if file_type in {"tar", "tar.gz"}:
        return _process_tar_archive(file_path, extract_to_df)
    raise ValueError(
        "Unsupported archive format. "
        "Supported formats are .zip, .tar, or .tar.gz."
    )


def _infer_file_type(file_path: str) -> str:
    """
    Infer the type of the archive based on the file extension.

    The comparison is case-insensitive, and ``.tgz`` is treated as an
    alias of ``.tar.gz``.

    Args:
        file_path: Path to the file.

    Returns:
        A string representing the archive type ('zip', 'tar', 'tar.gz').

    Raises:
        ValueError: If the file extension is unsupported.
    """
    lowered = file_path.lower()
    if lowered.endswith(".zip"):
        return "zip"
    # Check the compressed suffixes before the plain ".tar" suffix.
    if lowered.endswith((".tar.gz", ".tgz")):
        return "tar.gz"
    if lowered.endswith(".tar"):
        return "tar"
    raise ValueError(
        "Cannot infer file type from the file extension. "
        "Please specify the 'file_type' parameter."
    )


def _process_zip_archive(
    file_path: str, extract_to_df: bool
) -> pd.DataFrame | list[str]:
    """
    Open a ZIP archive and either list or load its compatible files.

    Args:
        file_path: Path to the ZIP file.
        extract_to_df: Whether to extract the content into a DataFrame.

    Returns:
        The extracted DataFrame(s) when extract_to_df is True, otherwise
        the list of compatible file names found in the archive.
    """
    with zipfile.ZipFile(file_path) as zip_handle:
        candidates = _list_compatible_files(zip_handle.namelist())
        if not extract_to_df:
            return candidates
        # Extraction must happen while the archive is still open.
        return _select_and_extract_from_zip(zip_handle, candidates)


def _process_tar_archive(
    file_path: str, extract_to_df: bool
) -> pd.DataFrame | list[str]:
    """
    Process a TAR archive (plain or compressed).

    Args:
        file_path: Path to the TAR file.
        extract_to_df: Whether to extract the content into a DataFrame.

    Returns:
        A DataFrame (or list of DataFrames) when extract_to_df is True,
        otherwise the list of compatible file names in the archive.
    """
    # "r:*" lets tarfile detect the compression from the file content, so
    # the result no longer depends on whether the name ends in ".gz".
    with tarfile.open(file_path, "r:*") as archive:
        compatible_files = _list_compatible_files(archive.getnames())

        if extract_to_df:
            # Extraction must happen while the archive is still open.
            return _select_and_extract_from_tar(archive, compatible_files)
        return compatible_files


def _list_compatible_files(file_names: list[str]) -> list[str]:
    """
    Filter archive member names down to the readable ones (.csv, .xlsx).

    Args:
        file_names: List of file names in the archive.

    Returns:
        The names ending in '.csv' or '.xlsx', in their original order.

    Raises:
        ValueError: If none of the names has a compatible extension.
    """
    suffixes = (".csv", ".xlsx")
    matches = []
    for name in file_names:
        if name.endswith(suffixes):
            matches.append(name)
    print("Fichiers compatibles détectés :", matches)
    if matches:
        return matches
    raise ValueError("No compatible files found in the archive.")


def _select_and_extract_from_zip(
    archive: zipfile.ZipFile, compatible_files: list[str]
) -> pd.DataFrame | list[pd.DataFrame]:
    """
    Prompt the user to choose files from a ZIP archive and load them.

    Args:
        archive: The open ZIP archive object.
        compatible_files: List of compatible file names.

    Returns:
        A single DataFrame when one file was loaded, otherwise a list of
        DataFrames in selection order.
    """
    chosen_names = _select_files_interactively(compatible_files)
    frames = []
    for name in chosen_names:
        with archive.open(name) as handle:
            # Pick the pandas reader from the file extension.
            if name.endswith(".csv"):
                frames.append(pd.read_csv(handle))
            elif name.endswith(".xlsx"):
                frames.append(pd.read_excel(handle))
    if len(frames) > 1:
        return frames
    return frames[0]


def _select_and_extract_from_tar(
    archive: tarfile.TarFile, compatible_files: list[str]
) -> pd.DataFrame | list[pd.DataFrame]:
    """
    Helper function to allow the user to select
    and read specific files from a TAR archive.

    Args:
        archive: The open TAR archive object.
        compatible_files: List of compatible file names.

    Returns:
        A single DataFrame or a list of DataFrames.

    Raises:
        ValueError: If a selected member is not a regular file and so
            cannot be read.
    """
    selected_files = _select_files_interactively(compatible_files)
    dfs = []
    for selected_file in selected_files:
        member = archive.getmember(selected_file)
        extracted = archive.extractfile(member)
        # extractfile() returns None for members that are not regular files
        # (e.g. a directory whose name ends in ".csv"); fail with a clear
        # message instead of crashing on ``with None``.
        if extracted is None:
            raise ValueError(
                f"Cannot read '{selected_file}': "
                "it is not a regular file in the archive."
            )
        with extracted as file:
            if selected_file.endswith(".csv"):
                dfs.append(pd.read_csv(file))
            elif selected_file.endswith(".xlsx"):
                dfs.append(pd.read_excel(file))
    return dfs if len(dfs) > 1 else dfs[0]


def _select_files_interactively(compatible_files: list[str]) -> list[str]:
    """
    Allow the user to select files from a list interactively.

    Prints a numbered menu, reads a comma-separated list of numbers from
    standard input, and maps each valid number back to its file name.
    Tokens that are not numbers, or that fall outside the menu range, are
    ignored.

    Args:
        compatible_files: List of compatible file names.

    Returns:
        List of selected file names, in the order they were entered.

    Raises:
        ValueError: If no entered token refers to a listed file.
    """
    print("Compatible files found in the archive:")
    for idx, file_name in enumerate(compatible_files, 1):
        print(f"{idx}. {file_name}")

    raw_selection = input(
        "Enter the numbers of the files to read, "
        "separated by commas (e.g., 1,2,3): "
    )

    selected_files = []
    for token in raw_selection.split(","):
        token = token.strip()
        # isdecimal() rather than isdigit(): isdigit() also accepts
        # characters such as "²" that int() cannot parse, which would
        # abort the whole selection with an unrelated ValueError.
        if not token.isdecimal():
            continue
        position = int(token)
        if 0 < position <= len(compatible_files):
            selected_files.append(compatible_files[position - 1])

    if not selected_files:
        raise ValueError("No valid files selected.")
    return selected_files
2 changes: 1 addition & 1 deletion janitor/spark/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from functools import wraps

try:
from pyspark.pandas.extensions import register_dataframe_accessor
from pandas.api.extensions import register_dataframe_accessor

except ImportError:
from janitor.utils import import_message
Expand Down
2 changes: 1 addition & 1 deletion mkdocs/AUTHORS.md
2 changes: 1 addition & 1 deletion mkdocs/CHANGELOG.md
3 changes: 3 additions & 0 deletions test.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
col1,col2
1,2
3,4
5 changes: 3 additions & 2 deletions tests/functions/test_complete.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,8 @@ def test_complete_multiple_groupings():
fill_value={"tag_count": 0},
sort=True,
).astype({"tag_count": int})
assert_frame_equal(result, output3)
print(result)
assert_frame_equal(result, output3, check_dtype=False)


def test_fill_value_scalar(taxonomy_df):
Expand All @@ -451,7 +452,7 @@ def test_fill_value_scalar(taxonomy_df):
.sort_values("Taxon", ignore_index=True)
)

assert_frame_equal(result, expected)
assert_frame_equal(result, expected, check_dtype=False)


# http://imachordata.com/2016/02/05/you-complete-me/
Expand Down
Loading

0 comments on commit 7a955fc

Please sign in to comment.