Skip to content

Commit

Permalink
Merge branch 'modelscope:main' into mllm_mapper
Browse files Browse the repository at this point in the history
  • Loading branch information
Qirui-jiao authored Aug 23, 2024
2 parents 4c5a9f3 + 69e199e commit 8e31c1f
Show file tree
Hide file tree
Showing 19 changed files with 183 additions and 141 deletions.
3 changes: 3 additions & 0 deletions configs/config_all.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ process:
hf_img2seq: 'Salesforce/blip2-opt-2.7b' # model name on huggingface to generate caption if caption_key is null
mem_required: '8GB' # This operation (Op) utilizes deep neural network models that consume a significant amount of memory for computation, hence the system's available memory might constrain the maximum number of processes that can be launched
- image_face_blur_mapper: # blur faces detected in images
cv_classifier: '' # OpenCV classifier path for face detection. By default, we will use 'haarcascade_frontalface_alt.xml'.
blur_type: 'gaussian' # type of blur kernel, including ['mean', 'box', 'gaussian']
radius: 2 # radius of blur kernel
- mllm_mapper: # use MLLMs for visual question answering tasks
Expand Down Expand Up @@ -197,6 +198,7 @@ process:
vertical_flip: false # flip frame image vertically (top to bottom).
mem_required: '20GB' # This operation (Op) utilizes deep neural network models that consume a significant amount of memory for computation, hence the system's available memory might constrain the maximum number of processes that can be launched
- video_face_blur_mapper: # blur faces detected in videos
cv_classifier: '' # OpenCV classifier path for face detection. By default, we will use 'haarcascade_frontalface_alt.xml'.
blur_type: 'gaussian' # type of blur kernel, including ['mean', 'box', 'gaussian']
radius: 2 # radius of blur kernel
- video_ffmpeg_wrapped_mapper: # simple wrapper for FFmpeg video filters
Expand Down Expand Up @@ -281,6 +283,7 @@ process:
max_ratio: 3.0 # the max aspect ratio of filter range
any_or_all: any # keep this sample when any/all images meet the filter condition
- image_face_ratio_filter: # filter samples according to the face area ratios in images (r=face_area/image_area). If multiple faces are available, we use the largest one.
cv_classifier: '' # OpenCV classifier path for face detection. By default, we will use 'haarcascade_frontalface_alt.xml'.
min_ratio: 0.0 # the min face area ratio of filter range
max_ratio: 0.4 # the max face area ratio of filter range
- image_nsfw_filter: # filter samples according to the nsfw scores of images in them
Expand Down
6 changes: 2 additions & 4 deletions data_juicer/core/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ def __init__(self, cfg=None):
self.cfg = init_configs() if cfg is None else cfg

self.work_dir = self.cfg.work_dir
self.ops = None

if self.cfg.use_cache:
logger.info(f'Using cache compression method: '
Expand Down Expand Up @@ -79,13 +78,12 @@ def run(self, load_data_np=None, skip_export=False):

# extract processes
logger.info('Preparing process operators...')
self.cfg.process, self.ops = load_ops(self.cfg.process,
self.cfg.op_fusion)
ops = load_ops(self.cfg.process, self.cfg.op_fusion)

# 2. stats precompute only for filter ops
logger.info('Computing the stats of dataset...')
stats_collected = False
for op in self.ops:
for op in ops:
if isinstance(op, Filter):
original_process = op.process
op.process = None
Expand Down
49 changes: 31 additions & 18 deletions data_juicer/core/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import copy
import inspect
import traceback
from abc import ABC, abstractmethod
from functools import wraps
from time import time
Expand Down Expand Up @@ -174,24 +175,32 @@ def process(self,
unforkable_operators = set(UNFORKABLE.modules.keys())

dataset = self
for op in operators:
mp_context = ['forkserver', 'spawn'] if (
op.use_cuda() or op._name in unforkable_operators) else None
setup_mp(mp_context)

start = time()
# run single op
dataset = op(dataset,
exporter=exporter,
checkpointer=checkpointer,
tracer=tracer)
# record processed ops
if checkpointer is not None:
checkpointer.record(op._name,
list(op._process_kwargs.values())[0])
end = time()
logger.info(f'OP [{op._name}] Done in {end - start:.3f}s. '
f'Left {len(dataset)} samples.')
try:
for op in operators:
mp_context = ['forkserver', 'spawn'] if (
op.use_cuda()
or op._name in unforkable_operators) else None
setup_mp(mp_context)

start = time()
# run single op
dataset = op.run(dataset, exporter=exporter, tracer=tracer)
# record processed ops
if checkpointer is not None:
checkpointer.record(op._op_cfg)
end = time()
logger.info(f'OP [{op._name}] Done in {end - start:.3f}s. '
f'Left {len(dataset)} samples.')
except: # noqa: E722
logger.error(f'An error occurred during Op [{op._name}].')
traceback.print_exc()
exit(1)
finally:
if checkpointer and dataset is not self:
logger.info('Writing checkpoint of dataset processed by '
'last op...')
dataset.cleanup_cache_files()
checkpointer.save_ckpt(dataset)
return dataset

def map(self, *args, **kargs):
Expand Down Expand Up @@ -325,6 +334,10 @@ def cleanup_cache_files(self):
cleanup_compressed_cache_files(self)
return super().cleanup_cache_files()

# Load a dataset via `Dataset.load_from_disk`, forwarding all positional and
# keyword arguments unchanged, and wrap the result in a `NestedDataset`.
@staticmethod
def load_from_disk(*args, **kargs):
return NestedDataset(Dataset.load_from_disk(*args, **kargs))


def nested_query(root_obj: Union[NestedDatasetDict, NestedDataset,
NestedQueryDict], key):
Expand Down
25 changes: 5 additions & 20 deletions data_juicer/core/executor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import os
import traceback
from time import time

from loguru import logger
Expand Down Expand Up @@ -38,7 +37,6 @@ def __init__(self, cfg=None):

self.work_dir = self.cfg.work_dir

self.ops = None
self.tracer = None
self.ckpt_manager = None

Expand All @@ -58,17 +56,15 @@ def __init__(self, cfg=None):
# check if there are existing checkpoints first and try to load the
# checkpoints. If the checkpoints are loaded successfully, ops that
# have been processed will be skipped.
self.process_list = self.cfg.process
if self.cfg.use_checkpoint:
logger.info('Preparing checkpoint manager...')
self.ckpt_dir = os.path.join(self.work_dir, 'ckpt')
self.ckpt_manager = CheckpointManager(self.ckpt_dir,
self.process_list,
self.cfg.process,
self.cfg.np)
if self.ckpt_manager.ckpt_available:
logger.info('Found existed dataset checkpoint.')
self.process_list = self.ckpt_manager.get_left_process_list()
self.cfg.process = self.process_list
self.cfg.process = self.ckpt_manager.get_left_process_list()

# prepare exporter and check export path suffix
logger.info('Preparing exporter...')
Expand Down Expand Up @@ -155,15 +151,14 @@ def run(self, load_data_np=None):

# 2. extract processes
logger.info('Preparing process operators...')
self.process_list, self.ops = load_ops(self.cfg.process,
self.cfg.op_fusion)
ops = load_ops(self.cfg.process, self.cfg.op_fusion)

# 3. data process
# - If tracer is open, trace each op after it's processed
# - If checkpoint is open, clean the cache files after each process
logger.info('Processing data...')
tstart = time()
dataset = dataset.process(self.ops,
dataset = dataset.process(ops,
exporter=self.exporter,
checkpointer=self.ckpt_manager,
tracer=self.tracer)
Expand All @@ -172,17 +167,7 @@ def run(self, load_data_np=None):

# 4. data export
logger.info('Exporting dataset to disk...')
try:
self.exporter.export(dataset)
except: # noqa: E722
logger.error('An error occurred during exporting the processed '
'dataset.')
traceback.print_exc()
if self.cfg.use_checkpoint:
logger.info('Writing checkpoint of dataset processed by '
'last op...')
dataset.cleanup_cache_files()
self.ckpt_manager.save_ckpt(dataset)
self.exporter.export(dataset)
# compress the last dataset after exporting
if self.cfg.use_cache and self.cfg.cache_compress:
from data_juicer.utils.compress import compress
Expand Down
7 changes: 2 additions & 5 deletions data_juicer/core/ray_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,9 @@ def __init__(self, cfg=None):

self.work_dir = self.cfg.work_dir

self.ops = None
# init ray
logger.info('Initing Ray ...')
ray.init(self.cfg.ray_address)
self.process_list = self.cfg.process

def run(self, load_data_np=None):
"""
Expand All @@ -55,13 +53,12 @@ def run(self, load_data_np=None):
dataset = RayDataset(dataset, self.cfg.dataset_path, self.cfg)
# 2. extract processes
logger.info('Preparing process operators...')
self.process_list, self.ops = load_ops(self.cfg.process,
self.cfg.op_fusion)
ops = load_ops(self.cfg.process, self.cfg.op_fusion)

# 3. data process
logger.info('Processing data...')
tstart = time.time()
dataset.process(self.ops)
dataset.process(ops)
tend = time.time()
logger.info(f'All Ops are done in {tend - tstart:.3f}s.')

Expand Down
21 changes: 0 additions & 21 deletions data_juicer/ops/base_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,27 +157,6 @@ def __init__(self, *args, **kwargs):
method = wrap_func_with_nested_access(method)
setattr(self, name, method)

# Run this op on `dataset` and handle checkpoint bookkeeping around it.
# `exporter` and `tracer` are keyword-only and forwarded to `self.run`;
# `checkpointer` (optional, keyword-only) is used to record the finished op
# and to save a checkpoint when an error occurs.
# Returns the dataset produced by `self.run`; on any exception the process
# exits with status 1 instead of returning.
def __call__(self,
dataset,
*,
exporter=None,
checkpointer=None,
tracer=None):
try:
dataset = self.run(dataset, exporter=exporter, tracer=tracer)
if checkpointer:
# mark this op (name + process kwargs) as processed
checkpointer.record(self._name, self._process_kwargs)
return dataset
except: # noqa: E722
logger.error(f'An error occurred during Op [{self._name}].')
traceback.print_exc()
if checkpointer:
# If `self.run` raised, `dataset` was never reassigned, so the
# checkpoint holds the data as it was before this op ran.
logger.info('Writing checkpoint of dataset processed by '
'last op...')
dataset.cleanup_cache_files()
checkpointer.save_ckpt(dataset)
exit(1)

# Return the class attribute `_batched_op`.
# NOTE(review): presumably a flag marking ops that process samples in
# batches -- confirm where `_batched_op` is set.
@classmethod
def is_batched_op(cls):
return cls._batched_op
Expand Down
47 changes: 29 additions & 18 deletions data_juicer/ops/filter/image_face_ratio_filter.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,40 @@
import os

import numpy as np
from jsonargparse.typing import ClosedUnitInterval
from loguru import logger

from data_juicer.utils.availability_utils import AvailabilityChecking
from data_juicer.utils.constant import Fields, StatsKeys
from data_juicer.utils.mm_utils import (load_data_with_context, load_image,
pil_to_opencv)
from data_juicer.utils.mm_utils import (detect_faces, load_data_with_context,
load_image)
from data_juicer.utils.model_utils import get_model, prepare_model

from ..base_op import OPERATORS, Filter
from ..base_op import OPERATORS, UNFORKABLE, Filter
from ..op_fusion import LOADED_IMAGES

OP_NAME = 'image_face_ratio_filter'

with AvailabilityChecking(['dlib'], OP_NAME):
import dlib
with AvailabilityChecking(['opencv-python'], OP_NAME):
import cv2


@UNFORKABLE.register_module(OP_NAME)
@OPERATORS.register_module(OP_NAME)
@LOADED_IMAGES.register_module(OP_NAME)
class ImageFaceRatioFilter(Filter):
"""Filter to keep samples with face area ratios within a specific range.
"""

_default_kwargs = {'upsample_num_times': 0}
_default_kwargs = {
'scaleFactor': 1.1,
'minNeighbors': 3,
'minSize': None,
'maxSize': None,
}

def __init__(self,
cv_classifier='',
min_ratio: ClosedUnitInterval = 0.0,
max_ratio: ClosedUnitInterval = 0.4,
any_or_all: str = 'any',
Expand All @@ -33,6 +43,8 @@ def __init__(self,
"""
Initialization method.
:param cv_classifier: OpenCV classifier path for face detection.
By default, we will use 'haarcascade_frontalface_alt.xml'.
:param min_ratio: Min ratio for the largest face area in an image.
:param max_ratio: Max ratio for the largest face area in an image.
:param any_or_all: Keep this sample with 'any' or 'all' strategy of
Expand All @@ -43,6 +55,10 @@ def __init__(self,
:param kwargs: Extra keyword arguments.
"""
super().__init__(*args, **kwargs)

if cv_classifier == '':
cv_classifier = os.path.join(cv2.data.haarcascades,
'haarcascade_frontalface_alt.xml')
self.min_ratio = min_ratio
self.max_ratio = max_ratio

Expand All @@ -56,8 +72,8 @@ def __init__(self,
f'Can only be one of ["any", "all"].')
self.any = (any_or_all == 'any')

# Initialize face detector
self.detector = dlib.get_frontal_face_detector()
self.model_key = prepare_model(model_type='opencv_classifier',
model_path=cv_classifier)

def compute_stats(self, sample, context=False):
# check if it's computed already
Expand All @@ -75,25 +91,20 @@ def compute_stats(self, sample, context=False):
sample, images = load_data_with_context(sample, context,
loaded_image_keys, load_image)

model = get_model(self.model_key)

# detect faces
face_detections = {}
for key, image in images.items():
img = pil_to_opencv(image)
dets = self.detector(img, **self.extra_kwargs)
face_detections[key] = [[
max(det.left(), 0),
max(det.top(), 0),
min(det.right(), image.width),
min(det.bottom(), image.height)
] for det in dets]
face_detections[key] = detect_faces(image, model,
**self.extra_kwargs)
logger.debug(f'detections: {face_detections}')

# compute face area ratios for each image considering the largest face
face_area_ratios = {}
for key, dets in face_detections.items():
image_area = images[key].width * images[key].height
face_area_ratios[key] = max([(x2 - x1) * (y2 - y1)
for x1, y1, x2, y2 in dets],
face_area_ratios[key] = max([w * h for _, _, w, h in dets],
default=0.0) / image_area
logger.debug(f'ratios: {face_area_ratios}')

Expand Down
6 changes: 3 additions & 3 deletions data_juicer/ops/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def load_ops(process_list, op_fusion=False):
if op_fusion:
new_process_list, ops = fuse_operators(new_process_list, ops)

for process, op in zip(new_process_list, ops):
op._process_kwargs = process
for op_cfg, op in zip(new_process_list, ops):
op._op_cfg = op_cfg

return new_process_list, ops
return ops
Loading

0 comments on commit 8e31c1f

Please sign in to comment.