
Canonical File Transformations #585

Merged
merged 99 commits (james/rewrite-shard into delta)
Feb 14, 2024
Conversation

@knighton knighton commented Jan 29, 2024

Canonical file transformations

  1. StreamingDataset
    1. Organization of StreamingDataset init args
    2. StreamingDataset interface
  2. Stream
    1. Stream divided into three shockingly modularizable parts
    2. Stream > Shard > StreamDirConf > Stream > ...
    3. New Stream args replicated into StreamingDataset
    4. Stream init sequence is redesigned to parallelize index downloading
    5. StreamDirConf interface (inherited by Stream)
    6. StreamWeightConf interface (inherited by Stream)
    7. Stream interface (inherits from StreamDirConf and StreamWeightConf)
  3. Shard
    1. The new 3-phase shard lifecycle
    2. Terminology for prepping shards
    3. Mapping use cases to phases to cache
    4. Phaser (i.e., shard file phase cacher-deleter) API
    5. Relating shard formats, shard file phases, and use cases in practice
    6. Writers and Shards correspond, as before
    7. Shard composition
    8. Shard interface (internal)
    9. ShardFile interface (internal)
    10. ShardFilePhase interface (internal)
  4. Appendix

1. StreamingDataset

1.1. Organization of StreamingDataset init args

  • What to iterate:
    • The streams (either provide streams, or remote and/or local for an implicit stream):
      • epoch_size
      • streams
      • remote
      • local
    • Stream arguments (used as defaults if explicit streams, or as args if implicit stream):
      • Filesystem:
        • split
        • index_size <- new
        • index_hashes <- new
      • Schema:
        • allow_schema_mismatch <- new
        • allow_unsafe_types
        • allow_unchecked_resumption <- new
      • Downloads:
        • download_retry
        • download_timeout
        • download_max_size <- new
        • validate_hash
        • keep_phases <- new
  • How to iterate:
    • Shard lifecycle:
      • predownload
      • cache_limit
    • Reproducibility:
      • shuffle_seed
    • Sampling:
      • sampling_method
      • sampling_granularity
    • Partitioning:
      • partition_algo
      • num_canonical_nodes
      • batch_size
    • Shuffling:
      • shuffle
      • shuffle_algo
      • shuffle_seed
      • shuffle_block_size
    • Batching:
      • batching_method

1.2. StreamingDataset interface

   def __init__(
        self,
        *,
        epoch_size: Optional[Union[str, int]] = None,
        streams: Optional[Sequence[Stream]] = None,
        remote: Optional[str] = None,
        local: Optional[str] = None,
        split: Optional[str] = None,
        index_size: Optional[Union[str, int]] = None,
        index_hashes: Optional[Dict[str, str]] = None,
        allow_schema_mismatch: bool = False,
        allow_unsafe_types: bool = False,
        allow_unchecked_resumption: bool = True,
        download_retry: Union[str, int] = 2,
        download_timeout: Optional[Union[str, float]] = '2m',
        download_max_size: Optional[Union[str, int]] = '200mb',
        validate_hash: Union[None, str, Sequence[str]] = None,
        keep_phases: Union[None, str, Sequence[str], Dict[str, Optional[bool]], Phaser] = None,
        predownload: Optional[int] = None,
        cache_limit: Optional[Union[str, int]] = None,
        shuffle_seed: int = 9176,
        sampling_method: str = 'balanced',
        sampling_granularity: int = 1,
        partition_algo: str = 'relaxed',
        num_canonical_nodes: Optional[int] = None,
        batch_size: Optional[int] = None,
        shuffle: bool = False,
        shuffle_algo: str = 'py1e',
        shuffle_block_size: Optional[int] = None,
        batching_method: str = 'random',
        **kwargs: Any,
    ) -> None:

    def __del__(self) -> None:
    def size(self) -> int:
    def next_epoch(self) -> int:  # property getter
    def next_epoch(self, next_epoch: int) -> None:  # property setter
    def cache_usage(self) -> int:  # property getter
    def cache_usage(self, cache_usage: int) -> None:  # property setter
    def __len__(self) -> int:
    def _set_shuffle_block_size(self, world: World):
    def _resume(self, world: World, epoch: int) -> Tuple[int, int]:
    def _resume_incr_epoch(self, world: World) -> Tuple[int, int]:
    def state_dict(self, num_samples: int, from_beginning: bool) -> Dict[str, Any]:
    def load_state_dict(self, obj: Dict[str, Any]) -> None:
    def resample_streams(
        self,
        epoch: int,
        stream_id: Optional[int] = None, 
    ) -> Tuple[NDArray[np.int64], NDArray[np.int64]]:
    def _share_work(self, sample_ids: NDArray[np.int64]) -> Tuple[SharedMemory, SharedMemory]:
    def _attach_work(self) -> Tuple[NDArray[np.int64], SharedMemory, SharedMemory]:
    def _get_work(self, world: World, epoch: int, sample_in_epoch: int) -> NDArray[np.int64]:
    def _evict_shard(self, shard_id: int) -> None:
    def _evict_coldest_shard(self) -> None:
    def evict_shard(self, shard_id: int) -> None:
    def evict_coldest_shard(self) -> None:
    def fetch_shard(self, shard_id: int, blocking: bool = True) -> None:
    def get_item(self, sample_id: int, retry: int = 7) -> Any:
    def on_exception(self, future: Future) -> None:
    def _fetch_thread(self, it: _Iterator) -> None:
    def _ready_thread(self, it: _Iterator) -> None:
    def _each_sample_id(self, it: _Iterator) -> Iterator[int]:
    def __iter__(self) -> Iterator[Dict[str, Any]]:

2. Stream

2.1. Stream divided into three shockingly modularizable parts

Stream still has pretty much the same API, imported from the same place. However, under the hood, it inherits most of its functionality from two base classes or mixins: StreamWeightConf (everything to do with weighting streams) and StreamDirConf (handling for all other stream args). What remains for Stream itself is loading the index and owning the shards.

2.2. Stream > Shard > StreamDirConf > Stream > ...

The shard API is rewritten to be a lot nicer, with methods like download() and evict() that take no args. It does this by keeping a reference back to its owning StreamDirConf, which holds the Stream arguments shared by all of its Shards. And recalling that StreamDirConf is an ancestor of Stream, which technically owns its Shards, we now cue triumphant "it's the circle of life" Lion King music.
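The back-reference design above can be sketched in a few lines. This is a toy illustration with simplified names and hypothetical fields (the real classes carry many more arguments): the point is only that a Shard reaches its stream-wide settings through its owning StreamDirConf, so shard methods need no arguments.

```python
# Toy sketch of the back-reference design: each Shard keeps a pointer to its
# owning StreamDirConf, so shard methods like download() need no arguments.

class StreamDirConf:
    """Holds the Stream arguments shared by all of its Shards."""

    def __init__(self, remote: str, local: str) -> None:
        self.remote = remote
        self.local = local


class Shard:
    """A shard reaches its download settings through its owning stream."""

    def __init__(self, stream: StreamDirConf, filename: str) -> None:
        self.stream = stream
        self.filename = filename

    def download(self) -> str:
        # No args needed: remote/local paths come from the back-reference.
        return f'{self.stream.remote}/{self.filename} -> {self.stream.local}/{self.filename}'


stream = StreamDirConf(remote='s3://bucket/ds', local='/tmp/ds')
shard = Shard(stream, 'shard.00000.mds')
print(shard.download())
```

Compared to the alternatives below, this keeps Shard subclassing clean: subclasses add behavior without every call site having to thread stream arguments through.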

Alternative 1: look up the owning Stream and pass large numbers of its arguments to every call to a Shard method, which would be unpleasant and not help anything.

Alternative 2: have the functionality live in Stream, and look up the shard on every call to a shard method, which would be unworkable because there is a class hierarchy of Shards.

Alternative 3: do it functionally, which would present the same annoyances around Shard subclassing, with the added annoyance that literally everything needed from both the stream and the shard would have to be passed in as args.

2.3. New Stream args replicated into StreamingDataset

Title.

2.4. Stream init sequence is redesigned to parallelize index downloading

The Stream init sequence is redesigned so that stream indexes are downloaded in parallel. There is an even faster way within our reach here: using pool.imap_unordered, noting the IDs of Streams whose indexes have been downloaded in a shared memory array, and initting Streams in that order. Future PR.
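A minimal sketch of the idea, using a stand-in `download_index` function and a ThreadPoolExecutor in place of the real download machinery (which may differ): all indexes are fetched concurrently, and Streams are only initted once their indexes have landed.

```python
# Hedged sketch of parallelized index downloading. download_index() is a
# stand-in for fetching one stream's index.json over the network.
from concurrent.futures import ThreadPoolExecutor

def download_index(stream_id: int) -> int:
    # Stand-in for the real network fetch; returns the stream's ID.
    return stream_id

stream_ids = list(range(8))
with ThreadPoolExecutor(max_workers=4) as pool:
    done = list(pool.map(download_index, stream_ids))

# Here every index is downloaded before any Stream is initted. The faster
# follow-up described above would instead consume completions as they arrive
# (imap_unordered-style) and init each Stream as soon as its index lands.
print(done)
```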

2.5. StreamDirConf interface (inherited by Stream)

    def __init__(
        self,
        *,
        remote: Optional[str] = None,
        local: Optional[str] = None,
        split: Union[None, str, Auto] = Auto(),
        index_size: Union[None, str, int] = None,
        index_hashes: Optional[Dict[str, str]] = None,
        allow_schema_mismatch: Union[bool, Auto] = Auto(),
        allow_unsafe_types: Union[bool, Auto] = Auto(),
        allow_unchecked_resumption: Union[bool, Auto] = Auto(),
        download_retry: Union[str, int, Auto] = Auto(),
        download_timeout: Union[None, str, float, Auto] = Auto(),
        download_max_size: Union[None, str, int, Auto] = Auto(),
        validate_hash: Union[None, str, Sequence[str], Auto] = Auto(),
        keep_phases: Union[None, str, Sequence[str], Dict[str, Optional[bool]], Phaser, Auto] = \
            Auto(),
        **kwargs: Any,
    ) -> None:

    def apply_defaults(
        self,
        *,
        split: Union[None, str, Auto] = Auto(),
        allow_schema_mismatch: Union[bool, Auto] = Auto(),
        allow_unsafe_types: Union[bool, Auto] = Auto(),
        allow_unchecked_resumption: Union[bool, Auto] = Auto(),
        download_retry: Union[str, int, Auto] = Auto(),
        download_timeout: Union[str, float, Auto] = Auto(),
        download_max_size: Union[str, int, Auto] = Auto(),
        validate_hash: Union[None, str, Sequence[str], Auto] = Auto(),
        keep_phases: Union[None, str, Sequence[str], Dict[str, Optional[bool]], Phaser, Auto] = \
            Auto(),
        **kwargs: Any,
    ) -> None:
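The Auto() defaults in the signatures above enable two-level configuration: a Stream arg left as Auto() is filled in from the StreamingDataset-level default via apply_defaults(), while an explicitly set value wins. A minimal sketch of that pattern, with a single illustrative field (the real apply_defaults covers all the args listed above):

```python
# Minimal sketch of the Auto sentinel pattern behind apply_defaults().

class Auto:
    """Sentinel meaning 'not set; inherit the dataset-level default'."""


class StreamConf:
    def __init__(self, download_retry=Auto()):
        self.download_retry = download_retry

    def apply_defaults(self, download_retry=Auto()):
        # Only overwrite fields the user left as Auto().
        if isinstance(self.download_retry, Auto):
            self.download_retry = download_retry


implicit = StreamConf()                   # inherits the dataset default
explicit = StreamConf(download_retry=5)   # keeps its own value
for conf in (implicit, explicit):
    conf.apply_defaults(download_retry=2)
print(implicit.download_retry, explicit.download_retry)  # 2 5
```

Using a sentinel class rather than None keeps None available as a meaningful user-supplied value (e.g. download_timeout=None for no timeout).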

2.6. StreamWeightConf interface (inherited by Stream)

    def __init__(
        self,
        *,
        proportion: Optional[float] = None,
        repeat: Optional[float] = None,
        choose: Optional[Union[str, int]] = None,
        **kwargs: Dict[str, Any],
    ) -> None:

    def validate_weights(
        cls,
        streams: Sequence[Self],
    ) -> Tuple[bool, bool]:

    def apply_weights(
        cls,
        streams: Sequence[Self],
        samples_per_stream: NDArray[np.int64],
        choose_per_epoch: Optional[int],
        seed: int,
    ) -> int:
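As a rough illustration of the weighting semantics (simplified from the real apply_weights, which also reconciles weights across all streams of an epoch): each stream is weighted by at most one of `proportion` (fraction of the epoch), `repeat` (multiple of its own size), or `choose` (absolute sample count).

```python
# Hedged sketch of per-stream weighting. The helper name is hypothetical;
# the real logic lives in StreamWeightConf.apply_weights.

def choose_for_stream(samples: int, epoch_size: int, proportion=None,
                      repeat=None, choose=None) -> int:
    """How many samples to draw from one stream per epoch."""
    if proportion is not None:
        return int(epoch_size * proportion)  # fraction of the whole epoch
    if repeat is not None:
        return int(samples * repeat)         # multiple of the stream's size
    if choose is not None:
        return choose                        # absolute sample count
    return samples                           # unweighted: each sample once

print(choose_for_stream(1000, 4000, proportion=0.25))  # 1000
print(choose_for_stream(1000, 4000, repeat=2.0))       # 2000
print(choose_for_stream(1000, 4000, choose=300))       # 300
```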

2.7. Stream interface (inherits from StreamDirConf and StreamWeightConf)

    def __init__(
        self,
        **kwargs: Any,
    ) -> None:

    def download_index(self) -> None:
    def await_index(self) -> None:
    def load_index(self) -> List[Shard]:
    def inventory_local(self, du_per_shard: NDArray[np.int64]) -> None:

3. Shard

3.1. The new 3-phase shard lifecycle

     /═════╗
     ║ ·1· ║
     ║ Zip ║
     ╚═════/
          \
      - Download -
            \
        - Decompress -
              \
             /═════╗
             ║ ·2· ║
             ║ Raw ║
             ╚═════/   
                  \
             - Canonicalize -
                    \
                   /═════╗
                   ║ ·3· ║
                   ║ Can ║
                   ╚═════/
                        \
                     - Evict -

Zip, Raw, and Can are three distinct on-disk forms ("phases") of the same shard file.
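The lifecycle above is a linear pipeline, which can be sketched as a tiny transition table (an illustration, not the real implementation): a shard file arrives as Zip, decompresses to Raw, canonicalizes to Can, and is eventually evicted. Compression and canonicalization are both optional, so a file may start life at Raw and/or stop at Raw.

```python
# Sketch of the 3-phase shard lifecycle as a linear pipeline.
STEPS = {'zip': ('decompress', 'raw'), 'raw': ('canonicalize', 'can')}

def advance(phase: str):
    """Return (action, next_phase), or None once no later phase remains."""
    return STEPS.get(phase)

history = []
phase = 'zip'
while advance(phase):
    action, phase = advance(phase)
    history.append((action, phase))
print(history)  # [('decompress', 'raw'), ('canonicalize', 'can')]
```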

3.2. Terminology for prepping shards

Fetch
 ├─ Download
 └─ Unpack
     ├─ Decompress
     └─ Canonicalize

You fetch, you access, then finally you evict.

Fetching consists of downloading and unpacking.

Unpacking consists of decompressing and canonicalizing.

3.3. Mapping use cases to phases to cache

  1. For storage/download/remoting: Zip or Raw
  2. Intermediates (for caching all phases): everything in-between 1 and 3
  3. For (slow) checked resumption: Raw (or Can, in an edge case, someday?)
  4. For access and fast (unchecked) resumption: Raw or Can

3.4. Phaser (i.e., shard file phase cacher-deleter) API

            storage: Union[bool, Auto] = Auto(),
            intermediates: Union[bool, Auto] = Auto(),
            checked_resumption: Union[bool, Auto] = Auto(),
            access: Union[bool, Auto] = Auto(),
            zip: Union[None, bool, Auto] = Auto(),
            raw: Union[None, bool, Auto] = Auto(),
            can: Union[None, bool, Auto] = Auto(),
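One plausible way to read these args: the four use-case flags (storage, intermediates, checked_resumption, access) imply which phases to keep, and the three per-phase flags (zip, raw, can) override them. A hedged sketch of such a resolver; the real precedence rules live in streaming/format/base/phaser.py and may differ.

```python
# Illustrative Phaser-like resolution: use-case flags imply phases to keep,
# then explicit per-phase flags (zip/raw/can) override.

def resolve_keep(use_cases, phase_overrides, phase_of_use_case):
    keep = {'zip': False, 'raw': False, 'can': False}
    for use_case, enabled in use_cases.items():
        if enabled:
            keep[phase_of_use_case[use_case]] = True
    for phase, flag in phase_overrides.items():
        if flag is not None:  # None means "no override"
            keep[phase] = flag
    return keep

# E.g. a compressed MDS shard: storage lives in Zip, access in Raw.
keep = resolve_keep(
    use_cases={'storage': False, 'access': True},
    phase_overrides={'zip': None, 'raw': None, 'can': None},
    phase_of_use_case={'storage': 'zip', 'access': 'raw'},
)
print(keep)  # {'zip': False, 'raw': True, 'can': False}
```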

3.5. Relating shard formats, shard file phases, and use cases in practice

    Format     Zip          Raw         Can     Storage  Medial  CheckedRes  Access
    MDS (z)    *.mds.zst    *.mds       -       Zip      -       Raw         Raw
    MDS        -            *.mds       -       Raw      -       Raw         Raw
    JSONL (z)  *.jsonl.zst  *.jsonl     -       Zip      -       Raw         Raw
    JSONL      -            *.jsonl     -       Raw      -       Raw         Raw
    XSV (z)    *.xsv.zst    *.xsv       -       Zip      -       Raw         Raw
    XSV        -            *.xsv       -       Raw      -       Raw         Raw
    CSV (z)    *.csv.zst    *.csv       -       Zip      -       Raw         Raw
    CSV        -            *.csv       -       Raw      -       Raw         Raw
    TSV (z)    *.tsv.zst    *.tsv       -       Zip      -       Raw         Raw
    TSV        -            *.tsv       -       Raw      -       Raw         Raw
    Parquet    -            *.parquet   *.mds   Raw      -       Raw         Can
    Delta      -            *.parquet   *.mds   Raw      -       Raw         Can

3.6. Writers and Shards correspond, as before

Writer
 └─ RowWriter
     ├─ DualRowWriter
     │    ├─ JSONLWriter
     │    └─ XSVWriter
     │        ├─ CSVWriter
     │        └─ TSVWriter
     └─ MonoRowWriter
         └─ MDSWriter

Writers write Streaming dataset directories, one shard at a time.

Shard
 └─ RowShard
     ├─ DualRowShard
     │   ├─ JSONLShard
     │   └─ XSVShard
     │       ├─ CSVShard
     │       └─ TSVShard
     └─ MonoRowShard
         └─ MDSShard

Shards read individual Streaming shards of dataset directories.

3.7. Shard composition

Shard
 └─ ShardFile
     └─ ShardFilePhase

A shard is realized as one or more files.

A file has from one to three forms, which are called phases.

At a high level, the phases all contain the same information, but stored differently for different use cases, e.g. minimizing download size vs minimizing random access time.
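The composition above is plain object containment, which a toy sketch makes concrete (simplified names and fields; the real classes also track hashes, stream back-references, etc.): a Shard owns ShardFiles, each of which owns up to three ShardFilePhases, and walking the tree yields, e.g., total disk usage.

```python
# Toy sketch of Shard -> ShardFile -> ShardFilePhase composition.

class ShardFilePhase:
    def __init__(self, relative_path: str, size: int) -> None:
        self.relative_path = relative_path
        self.size = size


class ShardFile:
    def __init__(self, *phases: ShardFilePhase) -> None:
        self.phases = list(phases)  # 1 to 3 of: zip, raw, can

    def size(self) -> int:
        return sum(phase.size for phase in self.phases)


class Shard:
    def __init__(self, *files: ShardFile) -> None:
        self.files = list(files)

    def size(self) -> int:
        return sum(file.size() for file in self.files)


shard = Shard(ShardFile(ShardFilePhase('shard.00000.mds.zst', 40),
                        ShardFilePhase('shard.00000.mds', 100)))
print(shard.size())  # 140
```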

3.8. Shard interface (internal)

    def __init__(
        self,
        *,
        writer_conf: Optional[WriterConf] = None,
        stream: StreamDirConf,
        num_samples: int,
    ) -> None:
    def from_json(cls, stream: StreamDirConf, obj: Dict[str, Any]) -> Self:
    def validate(self) -> None:
    def __len__(self) -> int:
    def set_stream(self, stream: StreamDirConf) -> None:
    def inventory_local(self, listing: Set[str]) -> Optional[int]:
    def fetch(self) -> int:
    def evict(self) -> int:
    def size(self) -> int:
    def get_item(self, index: int) -> Dict[str, Any]:

3.9. ShardFile interface (internal)

    def __init__(
        self,
        *,
        stream: StreamDirConf,
        zip_phase: Optional[ShardFilePhase] = None,
        zip_algo: Optional[str] = None,
        raw_phase: ShardFilePhase,
        can_algo: Optional[str] = None,
        can_phase: Optional[ShardFilePhase] = None,
    ) -> None:
    def set_stream(self, stream: StreamDirConf) -> None:
    def validate(self) -> None:
    def locate(self, listing: Set[str]) -> NDArray[np.int64]:
    def inventory_local(self, listing: Set[str]) -> int:
    def _unzip(self) -> int:
    def _canonicalize(self) -> int:
    def _load_raw(self) -> int:
    def fetch(self) -> int:
    def evict(self) -> int:
    def evict_phases(self, phase_dels: NDArray[np.int64]) -> int:

3.10. ShardFilePhase interface (internal)

    def __init__(
        self,
        *,
        stream: StreamDirConf,
        relative_path: str,
        size: Optional[int] = None,
        hashes: Optional[Dict[str, str]] = None,
    ) -> None:
    def from_json(cls, stream: StreamDirConf, obj: Dict[str, Any]) -> Self:
    def set_stream(self, stream: StreamDirConf) -> None:
    def validate_for_download(self) -> None:
    def get_local_filename(self) -> str:
    def get_remote_filename(self) -> str:
    def probe(self, listing: Set[str]) -> bool:
    def inventory_local(self, listing: Set[str]) -> Optional[int]:
    def is_local(self) -> bool:
    def download(self) -> int:
    def evict(self) -> int:

4. Appendix

Source tree changes

	new file:   streaming/format/base/__init__.py
	new file:   streaming/format/base/file.py
	new file:   streaming/format/base/phase.py
	new file:   streaming/format/base/phaser.py
	new file:   streaming/format/base/shard/__init__.py
	new file:   streaming/format/base/shard/base.py
	new file:   streaming/format/base/shard/dual_row.py
	new file:   streaming/format/base/shard/mono_row.py
	new file:   streaming/format/base/shard/row.py
	new file:   streaming/format/base/writer/__init__.py
	new file:   streaming/format/base/writer/base.py
	new file:   streaming/format/base/writer/dual_row.py
	new file:   streaming/format/base/writer/mono_row.py
	new file:   streaming/format/canonical.py
	new file:   streaming/stream/__init__.py
	new file:   streaming/stream/base.py
	new file:   streaming/stream/dir_conf.py
	new file:   streaming/stream/weight_conf.py
	new file:   streaming/util/json.py
	new file:   streaming/util/waiting.py
	modified:   benchmarks/__init__.py
	modified:   benchmarks/backends/datagen.py
	modified:   benchmarks/backends/plot.py
	modified:   benchmarks/backends/read.py
	modified:   benchmarks/backends/write.py
	modified:   examples/__init__.py
	modified:   examples/multimodal/__init__.py
	modified:   examples/multimodal/laion400m/__init__.py
	modified:   examples/multimodal/webvid/__init__.py
	modified:   examples/multimodal/webvid/read.py
	modified:   examples/text/__init__.py
	modified:   examples/vision/__init__.py
	modified:   examples/vision/ade20k/__init__.py
	modified:   examples/vision/cifar10/__init__.py
	modified:   examples/vision/cifar10/read.py
	modified:   examples/vision/cifar10/write.py
	modified:   examples/vision/cifar10/write_fake.py
	modified:   examples/vision/coco/__init__.py
	modified:   examples/vision/imagenet/__init__.py
	modified:   examples/vision/imagenet/read.py
	modified:   notebooks/spark_dataframe_to_MDS.ipynb
	modified:   scripts/long_lines.py
	modified:   simulation/core/sim_dataset.py
	modified:   streaming/dataset.py
	modified:   streaming/format/__init__.py
	modified:   streaming/format/jsonl/__init__.py
	modified:   streaming/format/jsonl/writer.py
	modified:   streaming/format/mds/shard.py
	modified:   streaming/format/mds/writer.py
	modified:   streaming/format/xsv/writer.py
	modified:   streaming/phasing.py
	modified:   streaming/shared/__init__.py
	modified:   streaming/storage/__init__.py
	modified:   streaming/storage/extra.py
	modified:   streaming/util/__init__.py
	modified:   streaming/util/auto.py
	modified:   streaming/util/importing.py
	modified:   streaming/util/merging.py
	modified:   streaming/util/retrying.py
	modified:   streaming/util/shared.py
	modified:   streaming/util/shorthand.py
	modified:   streaming/util/tabulation.py
	modified:   tests/test_eviction.py
	modified:   tests/test_importing.py
	modified:   tests/test_unsafe_types.py
	modified:   tests/util/__init__.py
	modified:   tests/util/test_retrying.py
	modified:   tests/util/test_shared.py
	modified:   tests/util/test_shorthand.py
	deleted:    streaming/base/converters/README.md
	deleted:    streaming/base/format/base/__init__.py
	deleted:    streaming/base/shared/__init__.py
	deleted:    streaming/base/storage/__init__.py
	deleted:    streaming/format/score.py
	deleted:    streaming/format/shard.py
	deleted:    streaming/multimodal/__init__.py
	deleted:    streaming/stream.py
	deleted:    streaming/text/__init__.py
	deleted:    streaming/vision/__init__.py
	deleted:    streaming/vision/base.py
	deleted:    streaming/vision/convert/base.py
  • Phase caching:
    • By intended use cases: Storage -> Intermediates -> Checked Resumption -> Access
    • By phase: Zip (MDS or JSONL compressed) -> Raw (MDS or Parquet or JSONL) -> Canonical
  • Checked vs fast resumption Corner Cases
    • For performance reasons, we don't necessarily know the size of a Parquet shard's canonicalized MDS phase in advance, much less its hashes
    • This leaves us unable to validate such shard files upon resumption while they sit in the cache
    • If we keep the files in the cache, we do no work in prepare(), but are at the mercy of whoever had access to that directory, which is potentially dangerous depending on threat model
    • If we drop the files from the cache, we have to regenerate them during prepare() from the last checked phase, which is potentially expensive depending on the size of the shard
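The trade-off in the last two bullets can be summarized as a tiny decision function (an illustration with hypothetical names, not the real control flow): a cached phase whose expected size and hashes are recorded can be checked; one whose size was never recorded must either be trusted as-is (fast, unchecked) or dropped and regenerated (slow, safe).

```python
# Hedged sketch of the checked-vs-fast resumption decision.

def resumption_plan(size_known: bool, allow_unchecked: bool) -> str:
    if size_known:
        return 'check'       # validate size/hashes against the index
    if allow_unchecked:
        return 'trust'       # keep the cached file without validation
    return 'regenerate'      # drop it; redo work from the last checked phase

print(resumption_plan(size_known=True, allow_unchecked=False))   # check
print(resumption_plan(size_known=False, allow_unchecked=True))   # trust
print(resumption_plan(size_known=False, allow_unchecked=False))  # regenerate
```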



@karan6181 karan6181 left a comment


Can you please also add the delta branch name under branches for files, so that CI runs for the delta branch when you create a PR.

@knighton knighton changed the title 2-Phase -> 3-Phase Shards Canonical File Transformations Feb 2, 2024

@XiaohanZhangCMU XiaohanZhangCMU left a comment


LGTM.


@snarayan21 snarayan21 left a comment


some testing issues in addition to these comments

@snarayan21 snarayan21 dismissed stale reviews from XiaohanZhangCMU and themself February 14, 2024 00:22 ("outdated")

@knighton knighton merged commit 526b722 into delta Feb 14, 2024
6 checks passed
@knighton knighton deleted the james/rewrite-shard branch February 14, 2024 07:30