Skip to content

Commit

Permalink
Enhance/ckpt (#396)
Browse files Browse the repository at this point in the history
* enhance ckpt logic

* fix tests
  • Loading branch information
drcege authored Aug 22, 2024
1 parent f7103fe commit 213f7f8
Show file tree
Hide file tree
Showing 9 changed files with 45 additions and 76 deletions.
6 changes: 2 additions & 4 deletions data_juicer/core/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ def __init__(self, cfg=None):
self.cfg = init_configs() if cfg is None else cfg

self.work_dir = self.cfg.work_dir
self.ops = None

if self.cfg.use_cache:
logger.info(f'Using cache compression method: '
Expand Down Expand Up @@ -79,13 +78,12 @@ def run(self, load_data_np=None, skip_export=False):

# extract processes
logger.info('Preparing process operators...')
self.cfg.process, self.ops = load_ops(self.cfg.process,
self.cfg.op_fusion)
ops = load_ops(self.cfg.process, self.cfg.op_fusion)

# 2. stats precompute only for filter ops
logger.info('Computing the stats of dataset...')
stats_collected = False
for op in self.ops:
for op in ops:
if isinstance(op, Filter):
original_process = op.process
op.process = None
Expand Down
45 changes: 27 additions & 18 deletions data_juicer/core/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import copy
import inspect
import traceback
from abc import ABC, abstractmethod
from functools import wraps
from time import time
Expand Down Expand Up @@ -174,24 +175,32 @@ def process(self,
unforkable_operators = set(UNFORKABLE.modules.keys())

dataset = self
for op in operators:
mp_context = ['forkserver', 'spawn'] if (
op.use_cuda() or op._name in unforkable_operators) else None
setup_mp(mp_context)

start = time()
# run single op
dataset = op(dataset,
exporter=exporter,
checkpointer=checkpointer,
tracer=tracer)
# record processed ops
if checkpointer is not None:
checkpointer.record(op._name,
list(op._process_kwargs.values())[0])
end = time()
logger.info(f'OP [{op._name}] Done in {end - start:.3f}s. '
f'Left {len(dataset)} samples.')
try:
for op in operators:
mp_context = ['forkserver', 'spawn'] if (
op.use_cuda()
or op._name in unforkable_operators) else None
setup_mp(mp_context)

start = time()
# run single op
dataset = op.run(dataset, exporter=exporter, tracer=tracer)
# record processed ops
if checkpointer is not None:
checkpointer.record(op._of_cfg)
end = time()
logger.info(f'OP [{op._name}] Done in {end - start:.3f}s. '
f'Left {len(dataset)} samples.')
except: # noqa: E722
logger.error(f'An error occurred during Op [{op._name}].')
traceback.print_exc()
exit(1)
finally:
if checkpointer:
logger.info('Writing checkpoint of dataset processed by '
'last op...')
dataset.cleanup_cache_files()
checkpointer.save_ckpt(dataset)
return dataset

def map(self, *args, **kargs):
Expand Down
25 changes: 5 additions & 20 deletions data_juicer/core/executor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import os
import traceback
from time import time

from loguru import logger
Expand Down Expand Up @@ -38,7 +37,6 @@ def __init__(self, cfg=None):

self.work_dir = self.cfg.work_dir

self.ops = None
self.tracer = None
self.ckpt_manager = None

Expand All @@ -58,17 +56,15 @@ def __init__(self, cfg=None):
# check if there are existing checkpoints first and try to load the
# checkpoints. If the checkpoints are loaded successfully, ops that
# have been processed will be skipped.
self.process_list = self.cfg.process
if self.cfg.use_checkpoint:
logger.info('Preparing checkpoint manager...')
self.ckpt_dir = os.path.join(self.work_dir, 'ckpt')
self.ckpt_manager = CheckpointManager(self.ckpt_dir,
self.process_list,
self.cfg.process,
self.cfg.np)
if self.ckpt_manager.ckpt_available:
logger.info('Found existed dataset checkpoint.')
self.process_list = self.ckpt_manager.get_left_process_list()
self.cfg.process = self.process_list
self.cfg.process = self.ckpt_manager.get_left_process_list()

# prepare exporter and check export path suffix
logger.info('Preparing exporter...')
Expand Down Expand Up @@ -155,15 +151,14 @@ def run(self, load_data_np=None):

# 2. extract processes
logger.info('Preparing process operators...')
self.process_list, self.ops = load_ops(self.cfg.process,
self.cfg.op_fusion)
ops = load_ops(self.cfg.process, self.cfg.op_fusion)

# 3. data process
# - If tracer is open, trace each op after it's processed
# - If checkpoint is open, clean the cache files after each process
logger.info('Processing data...')
tstart = time()
dataset = dataset.process(self.ops,
dataset = dataset.process(ops,
exporter=self.exporter,
checkpointer=self.ckpt_manager,
tracer=self.tracer)
Expand All @@ -172,17 +167,7 @@ def run(self, load_data_np=None):

# 4. data export
logger.info('Exporting dataset to disk...')
try:
self.exporter.export(dataset)
except: # noqa: E722
logger.error('An error occurred during exporting the processed '
'dataset.')
traceback.print_exc()
if self.cfg.use_checkpoint:
logger.info('Writing checkpoint of dataset processed by '
'last op...')
dataset.cleanup_cache_files()
self.ckpt_manager.save_ckpt(dataset)
self.exporter.export(dataset)
# compress the last dataset after exporting
if self.cfg.use_cache and self.cfg.cache_compress:
from data_juicer.utils.compress import compress
Expand Down
7 changes: 2 additions & 5 deletions data_juicer/core/ray_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,9 @@ def __init__(self, cfg=None):

self.work_dir = self.cfg.work_dir

self.ops = None
# init ray
logger.info('Initing Ray ...')
ray.init(self.cfg.ray_address)
self.process_list = self.cfg.process

def run(self, load_data_np=None):
"""
Expand All @@ -55,13 +53,12 @@ def run(self, load_data_np=None):
dataset = RayDataset(dataset, self.cfg.dataset_path, self.cfg)
# 2. extract processes
logger.info('Preparing process operators...')
self.process_list, self.ops = load_ops(self.cfg.process,
self.cfg.op_fusion)
ops = load_ops(self.cfg.process, self.cfg.op_fusion)

# 3. data process
logger.info('Processing data...')
tstart = time.time()
dataset.process(self.ops)
dataset.process(ops)
tend = time.time()
logger.info(f'All Ops are done in {tend - tstart:.3f}s.')

Expand Down
21 changes: 0 additions & 21 deletions data_juicer/ops/base_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,27 +157,6 @@ def __init__(self, *args, **kwargs):
method = wrap_func_with_nested_access(method)
setattr(self, name, method)

def __call__(self,
dataset,
*,
exporter=None,
checkpointer=None,
tracer=None):
try:
dataset = self.run(dataset, exporter=exporter, tracer=tracer)
if checkpointer:
checkpointer.record(self._name, self._process_kwargs)
return dataset
except: # noqa: E722
logger.error(f'An error occurred during Op [{self._name}].')
traceback.print_exc()
if checkpointer:
logger.info('Writing checkpoint of dataset processed by '
'last op...')
dataset.cleanup_cache_files()
checkpointer.save_ckpt(dataset)
exit(1)

@classmethod
def is_batched_op(cls):
return cls._batched_op
Expand Down
6 changes: 3 additions & 3 deletions data_juicer/ops/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def load_ops(process_list, op_fusion=False):
if op_fusion:
new_process_list, ops = fuse_operators(new_process_list, ops)

for process, op in zip(new_process_list, ops):
op._process_kwargs = process
for op_cfg, op in zip(new_process_list, ops):
op._op_cfg = op_cfg

return new_process_list, ops
return ops
4 changes: 2 additions & 2 deletions data_juicer/utils/ckpt_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ def check_ckpt(self):
os.makedirs(self.ckpt_dir, exist_ok=True)
return False

def record(self, op_name, op_args):
def record(self, op_cfg: dict):
"""Save op name and args to op record, which is used to compare with
the process list from config to decide if a checkpoint is available."""
self.op_record.append({op_name: op_args})
self.op_record.append(op_cfg)

def check_ops_to_skip(self):
"""
Expand Down
4 changes: 2 additions & 2 deletions tests/config/test_config_funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ def test_yaml_cfg_file(self):
}
}, 'nested dict load fail, un-expected internal value')

_, op_from_cfg = load_ops(cfg.process)
self.assertTrue(len(op_from_cfg) == 3)
ops_from_cfg = load_ops(cfg.process)
self.assertTrue(len(ops_from_cfg) == 3)

def test_val_range_check_cmd(self):
out = StringIO()
Expand Down
3 changes: 2 additions & 1 deletion tests/ops/test_op_fusion.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
class OpFusionTest(DataJuicerTestCaseBase):

def _run_op_fusion(self, original_process_list, target_process_list):
new_process_list, _ = load_ops(original_process_list, op_fusion=True)
ops = load_ops(original_process_list, op_fusion=True)
new_process_list = [op._op_cfg for op in ops]
self.assertEqual(new_process_list, target_process_list)

def test_regular_config(self):
Expand Down

0 comments on commit 213f7f8

Please sign in to comment.