diff --git a/examples/loom2parquetmerge.py b/examples/loom2parquetmerge.py
deleted file mode 100644
index 20f1c45..0000000
--- a/examples/loom2parquetmerge.py
+++ /dev/null
@@ -1,62 +0,0 @@
-import os
-import pyarrow.parquet as pq
-import pyarrow as pa
-
-
-# get all absolute paths of files in a directory
-def get_files_paths(directory, extension: str = "parquet"):
-    """
-    Get all file paths in a directory.
-    :param extension: str, file extension.
-    :param directory: str, directory path.
-    :return: list, list of file paths.
-    """
-    files_paths = []
-    for root, dirs, files in os.walk(directory):
-        for file in files:
-            if file.endswith(extension):
-                files_paths.append(os.path.join(root, file))
-    return files_paths
-
-
-def concatenate_parquet_files_incremental(files_paths, output_path, batch_size=10000):
-    """
-    Concatenate multiple parquet files in an incremental fashion to avoid memory overload.
-
-    :param files_paths: List of parquet file paths.
-    :param output_path: Path to the output parquet file.
-    :param batch_size: Number of rows to read from each file at a time.
-    """
-    writer = None
-
-    for file_path in files_paths:
-        print(f"Processing file: {file_path}")
-        parquet_file = pq.ParquetFile(file_path)
-
-        # Read the file in batches to avoid memory overload
-        for batch in parquet_file.iter_batches(batch_size=batch_size):
-            # Convert the batch to a PyArrow Table
-            table = pa.Table.from_batches([batch])
-
-            # If the writer is not initialized, create a new Parquet writer
-            if writer is None:
-                writer = pq.ParquetWriter(output_path, table.schema, compression='gzip')
-
-            # Write the batch to the output Parquet file
-            writer.write_table(table)
-
-    # Close the writer after all batches are written
-    if writer is not None:
-        writer.close()
-        print(f"Concatenated parquet file written to {output_path}")
-
-
-# Get all files paths
-files_paths = get_files_paths(directory="./",
-                              extension="parquet")
-
-# Output path for the final concatenated parquet file
-output_path = "GSE156793.parquet"
-
-# Concatenate the parquet files and write to a single file incrementally
-concatenate_parquet_files_incremental(files_paths, output_path, batch_size=10000)
diff --git a/fslite/fs/fdataframe.py b/fslite/fs/fdataframe.py
index 1ca7910..318bffd 100644
--- a/fslite/fs/fdataframe.py
+++ b/fslite/fs/fdataframe.py
@@ -38,14 +38,12 @@ class FSDataFrame:
     """
 
     def __init__(
-        self,
-        df: pd.DataFrame,
-        sample_col: Optional[str] = None,
-        label_col: Optional[str] = None,
-        sparse_threshold: float = 0.7,  # Threshold for sparsity
-        memory_threshold: Optional[
-            float
-        ] = 0.75,  # Proportion of system memory to use for dense arrays
+            self,
+            df: pd.DataFrame,
+            sample_col: Optional[str] = None,
+            label_col: Optional[str] = None,
+            sparse_threshold: float = 0.7,  # Threshold for sparsity
+            memory_threshold: Optional[float] = 0.75,  # Proportion of system memory to use for dense arrays
     ):
         """
         Create an instance of FSDataFrame.
@@ -61,21 +59,15 @@ def __init__(
         in the feature matrix exceeds this value, the matrix is stored in a sparse format unless memory allows.
        :param memory_threshold: Proportion of system memory available to use before deciding on sparse/dense.
        """
-        # TODO: We are loading full data into memory, look for other options. Maybe Dask?
-        self.__df = df.copy()
-
-        # Check for necessary columns
-        columns_to_drop = []
+        # Use the input DataFrame directly (no copy)
+        self.__df = df
 
         # Handle sample column
         if sample_col:
             if sample_col not in df.columns:
-                raise ValueError(
-                    f"Sample column '{sample_col}' not found in DataFrame."
-                )
+                raise ValueError(f"Sample column '{sample_col}' not found in DataFrame.")
             self.__sample_col = sample_col
             self.__samples = df[sample_col].tolist()
-            columns_to_drop.append(sample_col)
         else:
             self.__sample_col = None
             self.__samples = []
@@ -90,34 +82,32 @@ def __init__(
             self.__label_col = label_col
             self.__labels = df[label_col].tolist()
 
-            # Encode labels
-            # TODO: Check if labels are categorical or continuous? For now, assume categorical
+            # Encode labels (assume categorical for now)
             label_encoder = LabelEncoder()
             self.__labels_matrix = label_encoder.fit_transform(df[label_col]).tolist()
-            columns_to_drop.append(label_col)
 
-        # Drop both sample and label columns in one step
-        self.__df = self.__df.drop(columns=columns_to_drop)
+        # Select only numerical columns, excluding sample_col and label_col
+        feature_columns = df.select_dtypes(include=[np.number]).columns.tolist()
+        self.__original_features = [col for col in feature_columns if col not in [sample_col, label_col]]
 
-        # Extract features
-        self.__original_features = self.__df.columns.tolist()
+        # Select only the feature columns directly (no drop)
+        numerical_df = df[self.__original_features]
 
-        # Ensure only numerical features are retained
-        numerical_df = self.__df.select_dtypes(include=[np.number])
         if numerical_df.empty:
             raise ValueError("No numerical features found in the DataFrame.")
 
-        # Check sparsity
+        # Calculate sparsity
        num_elements = numerical_df.size
-        num_zeros = np.count_nonzero(numerical_df == 0)
+        num_zeros = (numerical_df == 0).sum().sum()
         sparsity = num_zeros / num_elements
 
+        # Estimate memory usage
         dense_matrix_size = numerical_df.memory_usage(deep=True).sum()  # In bytes
         available_memory = psutil.virtual_memory().available  # In bytes
 
+        # Handle sparse or dense matrix based on sparsity and available memory
         if sparsity > sparse_threshold:
             if dense_matrix_size < memory_threshold * available_memory:
-                # Use dense matrix if enough memory is available
                 logging.info(
                     f"Data is sparse (sparsity={sparsity:.2f}) but enough memory available. "
                     f"Using a dense matrix."
                 )
@@ -125,20 +115,14 @@ def __init__(
                 self.__matrix = numerical_df.to_numpy(dtype=np.float32)
                 self.__is_sparse = False
             else:
-                # Use sparse matrix due to memory constraints
                 logging.info(
                     f"Data is sparse (sparsity={sparsity:.2f}), memory insufficient for dense matrix. "
                     f"Using a sparse matrix representation."
                 )
-                self.__matrix = sparse.csr_matrix(
-                    numerical_df.to_numpy(dtype=np.float32)
-                )
+                self.__matrix = sparse.csr_matrix(numerical_df.to_numpy(dtype=np.float32))
                 self.__is_sparse = True
         else:
-            # Use dense matrix since it's not sparse
-            logging.info(
-                f"Data is not sparse (sparsity={sparsity:.2f}), using a dense matrix."
-            )
+            logging.info(f"Data is not sparse (sparsity={sparsity:.2f}), using a dense matrix.")
             self.__matrix = numerical_df.to_numpy(dtype=np.float32)
             self.__is_sparse = False
 
diff --git a/fslite/tests/test_univariate_methods.py b/fslite/tests/test_univariate_methods.py
index d096069..278393f 100644
--- a/fslite/tests/test_univariate_methods.py
+++ b/fslite/tests/test_univariate_methods.py
@@ -1,4 +1,5 @@
 import pandas as pd
+import psutil
 
 from fslite.fs.fdataframe import FSDataFrame
 from fslite.fs.univariate import FSUnivariate
@@ -29,6 +30,30 @@ def test_univariate_filter_corr():
     df_filtered = fsdf_filtered.to_pandas()
     df_filtered.to_csv("filtered_tnbc_data.csv", index=False)
 
+def test_univariate_filter_big_corr():
+    # load the GSE156793 example parquet file as a pandas DataFrame
+    df = pd.read_parquet(path="../../examples/GSE156793.parquet")
+    df.drop(columns=["development_day", "assay_id"], inplace=True)
+    print(df.shape[1])
+
+    dense_matrix_size = (df.memory_usage(deep=True).sum() / 1e+6)  # In megabytes
+    available_memory = (psutil.virtual_memory().available / 1e+6)  # In megabytes
+
+    # create FSDataFrame instance
+    fs_df = FSDataFrame(df=df, sample_col="sample_id", label_col="cell_cluster_id")
+
+    # create FSUnivariate instance
+    fs_univariate = FSUnivariate(fs_method="u_corr", selection_threshold=0.3)
+
+    fsdf_filtered = fs_univariate.select_features(fs_df)
+
+    assert fs_df.count_features() == 500
+    assert fsdf_filtered.count_features() == 211
+
+    # Export the filtered DataFrame as Pandas DataFrame
+    df_filtered = fsdf_filtered.to_pandas()
+    df_filtered.to_csv("single_cell_output.csv", index=False)
+
 
 # test the univariate_filter method with 'anova' method
 def test_univariate_filter_anova():
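A minimal usage sketch of the FSDataFrame / FSUnivariate path touched by this diff, assuming only the constructor and method signatures visible above (FSDataFrame(df, sample_col, label_col), FSUnivariate(fs_method, selection_threshold), select_features, count_features). The synthetic DataFrame, column names, and threshold below are illustrative, not taken from the repository:

import numpy as np
import pandas as pd

from fslite.fs.fdataframe import FSDataFrame
from fslite.fs.univariate import FSUnivariate

# Illustrative synthetic data (assumption, not repository data):
# 100 samples, 20 numeric features, a sample-id column and a binary label.
rng = np.random.default_rng(42)
df = pd.DataFrame(rng.random((100, 20)), columns=[f"feat_{i}" for i in range(20)])
df["sample_id"] = [f"s{i}" for i in range(100)]
df["label"] = rng.integers(0, 2, size=100)

# The refactored FSDataFrame selects numeric feature columns directly,
# excluding sample_col and label_col, instead of copying and dropping columns.
fs_df = FSDataFrame(df=df, sample_col="sample_id", label_col="label")

# Univariate correlation filter; the 0.3 threshold mirrors the new test above.
fs_univariate = FSUnivariate(fs_method="u_corr", selection_threshold=0.3)
fs_filtered = fs_univariate.select_features(fs_df)

print(fs_df.count_features(), "features before ->", fs_filtered.count_features(), "after")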