Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Canonical File Transformations #585

Merged
merged 99 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
99 commits
Select commit Hold shift + click to select a range
94fa58c
Preview of "rewrite shard".
knighton Jan 29, 2024
dd32488
Fix.
knighton Jan 31, 2024
b6f11ab
long_lines.
knighton Jan 31, 2024
74c4c21
streaming/vision -> streaming/modality/vision
knighton Jan 31, 2024
38bd234
Remove merge cruft.
knighton Jan 31, 2024
06de6fb
Misc updates.
knighton Jan 31, 2024
2426f7a
Improve.
knighton Jan 31, 2024
3d5e907
wstream_conf/WeightedStreamConf.
knighton Feb 1, 2024
58235f1
streaming.util.waiting.
knighton Feb 1, 2024
6d1842e
Ravamp "smart" downloading in storage API.
knighton Feb 1, 2024
c7b37ff
Refactor: StreamDirConf, StreamWeightConf.
knighton Feb 1, 2024
8b08134
Propate changes thru Stream <-> MDSShard, etc.
knighton Feb 2, 2024
5ffec78
Propagate rename.
knighton Feb 2, 2024
d7c9df8
Fix (lint).
knighton Feb 2, 2024
279e22e
Hook up StreamFilePhase and smart_download_file().
knighton Feb 2, 2024
9678041
Rearrange logic in ShardFilePhase.inventory_local(), keep got_size ar…
knighton Feb 2, 2024
ced6403
Delete merge cruft.
knighton Feb 2, 2024
07d4937
Fix.
knighton Feb 2, 2024
163fd97
Prepare -> fetch.
knighton Feb 2, 2024
7fd8b36
Swap out Stream in StreamingDataset.
knighton Feb 2, 2024
b98073f
Fix.
knighton Feb 2, 2024
7b6fd5d
Remove cruft.
knighton Feb 2, 2024
a06a544
Update SD args accordingly.
knighton Feb 3, 2024
2658459
Remove old.
knighton Feb 3, 2024
6459ac3
Fix workflows, etc.
knighton Feb 3, 2024
12095fe
Fix imports.
knighton Feb 3, 2024
eee3dbd
Fix importing.
knighton Feb 3, 2024
8ddee2c
Update streaming/format/base/file.py
knighton Feb 3, 2024
5084995
Update streaming/format/base/phase.py
knighton Feb 3, 2024
c96b24e
Fix.
knighton Feb 3, 2024
39c9d64
Merge branch 'james/rewrite-shard' of github.com:mosaicml/streaming i…
knighton Feb 3, 2024
f3bf069
Fix.
knighton Feb 3, 2024
19c76d2
Fix.
knighton Feb 3, 2024
4e998b5
Update streaming/format/base/phaser.py
knighton Feb 3, 2024
10d9766
Fix.
knighton Feb 3, 2024
2e26454
Merge branch 'james/rewrite-shard' of github.com:mosaicml/streaming i…
knighton Feb 3, 2024
25c8f15
Knobs for controlling index downloading concurrency.
knighton Feb 4, 2024
21f06e0
ShardFilePhase: size -> expected_size, got_size -> size.
knighton Feb 6, 2024
eb64e40
Stream: index_size/hashes -> *poof*, got_index_size -> index_size
knighton Feb 6, 2024
5111109
Update sim_dataset.
knighton Feb 6, 2024
4c7934f
Note default.
knighton Feb 6, 2024
44fd90e
Rename: du -> cache_usage, ddu -> cache_usage_change.
knighton Feb 6, 2024
d1bfc7b
JSONL shards.
knighton Feb 6, 2024
71c7a65
XSV shards.
knighton Feb 6, 2024
c3f18b0
Typing: Sequence[str] -> Sequence.
knighton Feb 6, 2024
e28c10f
Sequence fracas.
knighton Feb 6, 2024
a359ccb
FILESYSTEM_POLL_INTERVAL.
knighton Feb 6, 2024
d17aaa7
Minimally invasive change to LocalDataset to work the same.
knighton Feb 6, 2024
d82218e
Fix.
knighton Feb 6, 2024
9a605ac
Fix.
knighton Feb 7, 2024
29f9cda
Fix.
knighton Feb 7, 2024
36ee5d3
Unpicklable.
knighton Feb 7, 2024
9ad48ac
Fix usage of retry().
knighton Feb 8, 2024
50da859
Streamline fetch_shard() concurrency/accounting logic for 3-phase sha…
knighton Feb 8, 2024
91e242f
Fix.
knighton Feb 8, 2024
c15d273
Fix.
knighton Feb 8, 2024
3026375
Fix.
knighton Feb 8, 2024
a7312da
Phaser impl.
knighton Feb 9, 2024
b376212
Simply Phaser, dropping autos.
knighton Feb 10, 2024
bc3dd9e
Sizedness enum.
knighton Feb 10, 2024
3301593
Disable temporarily.
knighton Feb 12, 2024
5bc99cf
Replicate needed Stream args to LocalDataset.
knighton Feb 12, 2024
fe6ba6e
Move fixture paths.
knighton Feb 12, 2024
ab04a53
Update usage in test_stream.
knighton Feb 12, 2024
24dfc38
Rename autogen local path auto_local -> local.
knighton Feb 12, 2024
cf2d7d9
download_index now writes empty file on error to save us a timeout.
knighton Feb 12, 2024
4065869
Update cache usage accounting.
knighton Feb 12, 2024
23b52ed
Fix (lint).
knighton Feb 12, 2024
080fbd5
Updates for test_reader.py.
knighton Feb 12, 2024
83a5928
Fix (unsafe types).
knighton Feb 12, 2024
a4f68f8
Debug the hell out of the phaser and shard evictions.
knighton Feb 12, 2024
afc6080
Fix (test_writer).
knighton Feb 12, 2024
de56089
Add remark.
knighton Feb 12, 2024
ae70012
Improve error handling.
knighton Feb 12, 2024
f1a4ab5
Error messages.
knighton Feb 12, 2024
6bfd6b6
Refactor stream init/default.
knighton Feb 12, 2024
70e509c
Better error handling.
knighton Feb 12, 2024
cde6797
Error types.
knighton Feb 12, 2024
0aa3426
Update streaming/stream/base.py
knighton Feb 12, 2024
1864f86
ValueError -> RuntimeError.
knighton Feb 12, 2024
484164e
Merge branch 'james/rewrite-shard' of github.com:mosaicml/streaming i…
knighton Feb 12, 2024
4534db1
Readd match regex.
knighton Feb 13, 2024
68574d0
Fix (exception match regex).
knighton Feb 13, 2024
18f170c
Delete three more StreamingDataset args
knighton Feb 13, 2024
c45ad48
Fix path analysis.
knighton Feb 13, 2024
f9aa948
Docstrings.
knighton Feb 13, 2024
93b52a6
simulation partly updated
snarayan21 Feb 13, 2024
f09b7c4
Error handling.
knighton Feb 13, 2024
b6ae467
Merge branch 'james/rewrite-shard' of github.com:mosaicml/streaming i…
knighton Feb 13, 2024
ecea6a3
Error handling.
knighton Feb 13, 2024
5c61001
Multi-line the comment.
knighton Feb 13, 2024
e308b07
Fix docstring.
knighton Feb 13, 2024
825afad
Fix for remote paths.
knighton Feb 13, 2024
b307d60
abspath normpath hilarity.
knighton Feb 13, 2024
6ef00d3
Fix duplication in docstring.
knighton Feb 14, 2024
6ef47dc
Fix (docstring).
knighton Feb 14, 2024
789b40a
Fix script.
knighton Feb 14, 2024
5fd7ee9
simulation debugged
snarayan21 Feb 14, 2024
82d926a
removed old comments
snarayan21 Feb 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions benchmarks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
# Copyright 2022-2024 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

# Copyright 2023 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

"""Streaming benchmarking."""

from benchmarks import compression as compression
Expand Down
3 changes: 0 additions & 3 deletions benchmarks/backends/datagen.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
# Copyright 2022-2024 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

# Copyright 2023 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

"""Generate a synthetic dataset."""

from typing import Dict, List, Tuple, TypeVar
Expand Down
3 changes: 0 additions & 3 deletions benchmarks/backends/plot.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
# Copyright 2022-2024 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

# Copyright 2023 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

"""Plot dataset iteration time."""

import json
Expand Down
3 changes: 0 additions & 3 deletions benchmarks/backends/read.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
# Copyright 2022-2024 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

# Copyright 2023 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

"""Benchmark dataset iteration time."""

import json
Expand Down
3 changes: 0 additions & 3 deletions benchmarks/backends/write.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
# Copyright 2022-2024 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

# Copyright 2023 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

"""Generate a synthetic dataset and serialize it using each Streaming format/backend."""

import os
Expand Down
3 changes: 0 additions & 3 deletions examples/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
# Copyright 2022-2024 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

# Copyright 2023 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

"""Example streaming datasets."""

from examples import multimodal as multimodal
Expand Down
3 changes: 0 additions & 3 deletions examples/multimodal/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
# Copyright 2022-2024 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

# Copyright 2023 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

"""Example multimodal streaming datasets."""

from examples.multimodal import laion400m as laion400m
Expand Down
3 changes: 0 additions & 3 deletions examples/multimodal/laion400m/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
# Copyright 2022-2024 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

# Copyright 2023 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

"""LAION-400M streaming dataset example."""
3 changes: 0 additions & 3 deletions examples/multimodal/webvid/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
# Copyright 2022-2024 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

# Copyright 2023 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

"""WebVid streaming dataset example."""
16 changes: 8 additions & 8 deletions examples/multimodal/webvid/read.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class StreamingOutsideDTWebVid(StreamingDataset):
"""Streaming WebVid dataset.

Videos are stored "outside" the shards, as a file per video. The extra download happens in
_download_thread ("DT"), when the download thread prefetches the sample.
_fetch_thread ("DT"), when the download thread prefetches the sample.

Args:
extra_local (str, optional): Base destination of extra local sample downloads.
Expand Down Expand Up @@ -133,7 +133,7 @@ def get_item(self, idx: int) -> Any:

return obj

def _download_thread(self, it: _Iterator) -> None:
def _fetch_thread(self, it: _Iterator) -> None:
snarayan21 marked this conversation as resolved.
Show resolved Hide resolved
"""Download the relevant shards in the background while we are being iterated.

This thread is started at the beginning of each epoch, and exits either when out of samples
Expand All @@ -154,26 +154,26 @@ def _download_thread(self, it: _Iterator) -> None:
break

# If we're out of samples this epoch, exit this thread because we are done downloading.
if it.prepare_index == it.total:
if it.fetch_index == it.total:
break

# If we are requested to only pre-download so many samples, if we have as many or more
# downloaded already, we wait and check again later.
if self.predownload is not None:
samples_ahead = it.prepare_index - it.yield_index
samples_ahead = it.fetch_index - it.yield_index
if self.predownload <= samples_ahead:
sleep(TICK)
continue

# If we hit -1, we skip.
sample_id = it.sample_ids[it.prepare_index]
sample_id = it.sample_ids[it.fetch_index]
if sample_id == -1:
it.prepare_index += 1
it.fetch_index += 1
continue

# Download and decompress the shard for this sample, if not already done.
shard_id, _ = self.spanner[sample_id]
self.prepare_shard(shard_id, False)
self.fetch_shard(shard_id, False)

# Predownload the sample's extra data.
obj = super().get_item(sample_id)
Expand All @@ -185,7 +185,7 @@ def _download_thread(self, it: _Iterator) -> None:
download_file(remote, local, self.download_timeout)

# Step forward one sample.
it.prepare_index += 1
it.fetch_index += 1

# Note that we exited.
it.on_exit()
3 changes: 0 additions & 3 deletions examples/text/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
# Copyright 2022-2024 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

# Copyright 2023 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

"""Example text streaming datasets."""

from examples.text import c4 as c4
Expand Down
3 changes: 0 additions & 3 deletions examples/vision/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
# Copyright 2022-2024 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

# Copyright 2023 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

"""Example computer vision streaming datasets."""

from examples.vision import ade20k as ade20k
Expand Down
3 changes: 0 additions & 3 deletions examples/vision/ade20k/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
# Copyright 2022-2024 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

# Copyright 2023 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

"""ADE20K streaming dataset example."""
3 changes: 0 additions & 3 deletions examples/vision/cifar10/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
# Copyright 2022-2024 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

# Copyright 2023 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

"""CIFAR10 streaming dataset example."""
2 changes: 1 addition & 1 deletion examples/vision/cifar10/read.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from typing import Any, Dict

from streaming.vision import StreamingVisionDataset
from streaming.modality.vision import StreamingVisionDataset

__all__ = ['StreamingCIFAR10']

Expand Down
2 changes: 1 addition & 1 deletion examples/vision/cifar10/write.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

from torchvision.datasets import CIFAR10

from streaming.modality.vision import convert_image_class_dataset
knighton marked this conversation as resolved.
Show resolved Hide resolved
from streaming.util import get_list_arg
from streaming.vision import convert_image_class_dataset


def parse_args() -> Namespace:
Expand Down
2 changes: 1 addition & 1 deletion examples/vision/cifar10/write_fake.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import numpy as np
from PIL import Image

from streaming.vision import convert_image_class_dataset
from streaming.modality.vision import convert_image_class_dataset


def parse_args() -> Namespace:
Expand Down
3 changes: 0 additions & 3 deletions examples/vision/coco/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
# Copyright 2022-2024 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

# Copyright 2023 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

"""COCO streaming dataset example."""
3 changes: 0 additions & 3 deletions examples/vision/imagenet/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
# Copyright 2022-2024 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

# Copyright 2023 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

"""ImageNet streaming dataset example."""
2 changes: 1 addition & 1 deletion examples/vision/imagenet/read.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from typing import Any, Dict

from streaming.vision import StreamingVisionDataset
from streaming.modality.vision import StreamingVisionDataset

__all__ = ['StreamingImageNet']

Expand Down
3 changes: 0 additions & 3 deletions notebooks/spark_dataframe_to_MDS.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,10 @@
},
{
"cell_type": "code",
<<<<<<< HEAD:examples/spark_dataframe_to_MDS.ipynb
"execution_count": null,
=======
"source": [
"from streaming.converters import dataframeToMDS"
],
>>>>>>> 7f5d160 (Move examples out, merge base/ upward (#494)):notebooks/spark_dataframe_to_MDS.ipynb
"metadata": {
"id": "uzYHe6yYRzyV"
},
Expand Down
49 changes: 36 additions & 13 deletions scripts/long_lines.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
# Copyright 2022-2024 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

# Copyright 2023 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

"""Note long lines."""

import os
Expand All @@ -13,6 +10,8 @@
from re import Pattern
from typing import IO, Iterator, Optional

import numpy as np


def parse_args() -> Namespace:
"""Parse command-line arguments.
Expand Down Expand Up @@ -59,6 +58,12 @@ def parse_args() -> Namespace:
default='light',
help='Whether to output in color. Supported options: none, light.',
)
args.add_argument(
'--fancy',
type=int,
default=0,
help='Whether to do fancy output, which is harder to parse programmatically.',
)
return args.parse_args()


Expand Down Expand Up @@ -166,25 +171,43 @@ def main(args: Namespace) -> int:
txt = ', '.join(sorted(non_text_behaviors))
raise ValueError(f'Unknown non-text behavior (must be one of: {txt}): {args.non_text}.')

count = 0
pairs = []
karan6181 marked this conversation as resolved.
Show resolved Hide resolved
for path in sorted(each_path(args.root, include, exclude)):
if not (file := open_text(path, args.non_text)):
continue

lines = map(drop_newline, file)
for line_no, line in enumerate(lines):
if args.max_len < len(line):
good_line = line[:args.max_len]
fg_len = len(f'{path}:{line_no}:')
good_line = line[fg_len:args.max_len]
bad_line = line[args.max_len:]

if args.color == 'light':
path = f'\033[0;97m{path}\033[0;0m'
line_no = f'\033[0;92m{line_no}\033[0;0m'
good_line = f'\033[0;94m{good_line}\033[0;0m'
bad_line = f'\033[0;91m{bad_line}\033[0;0m'
print(f'{path}:{line_no}:{good_line}{bad_line}')
count += 1

return 1 if count else 0
color_path = f'\033[0;97m{path}\033[0;0m'
line_no = f'\033[1;92m{line_no}\033[0;0m'
good_line = f'\033[1;34m{good_line}\033[0;0m'
bad_line = f'\033[1;91m{bad_line}\033[0;0m'
else:
color_path = str(path)

out_line = f'{color_path}:{line_no}:{good_line}{bad_line}\n'
pair = len(line), out_line
pairs.append(pair)

vis_lens, _ = zip(*pairs)
max_vis_len = max(vis_lens)
vocab = 0x2571, 0x2572
for vis_len, out_line in pairs:
if args.pad:
count = max_vis_len - vis_len + 1
ords = np.random.choice(vocab, count)
pad = ''.join(map(chr, ords))
print(f'{out_line[:-1]}{chr(0x2523)}{pad}')
else:
print(out_line[:-1])

return 1 if pairs else 0


if __name__ == '__main__':
Expand Down
4 changes: 2 additions & 2 deletions simulation/core/sim_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ def __init__(self,
local_foldernames = []
for stream_id, stream in enumerate(self.streams):
logger.info(f' Processing index file for stream {stream_id + 1}')
stream_shards = stream.get_shards(self.world, self.allow_unsafe_types)
stream_shards = stream.load_index()
num_stream_samples = sum(map(len, stream_shards))
index_filename = os.path.join(stream.local, stream.split or '', get_index_basename())
index_filenames.append(index_filename)
Expand All @@ -290,7 +290,7 @@ def __init__(self,
# Check that cache limit is possible.
if cache_limit:
self.cache_limit = normalize_bytes(cache_limit)
min_cache_usage = sum((stream.get_index_size() for stream in streams))
min_cache_usage = sum((stream.got_index_size for stream in streams))
knighton marked this conversation as resolved.
Show resolved Hide resolved
if self.cache_limit <= min_cache_usage:
raise ValueError(f'Minimum cache usage ({min_cache_usage} bytes) is larger than ' +
f'the cache limit ({self.cache_limit} bytes). Please raise ' +
Expand Down
7 changes: 0 additions & 7 deletions streaming/base/converters/README.md

This file was deleted.

8 changes: 0 additions & 8 deletions streaming/base/format/base/__init__.py

This file was deleted.

17 changes: 0 additions & 17 deletions streaming/base/shared/__init__.py

This file was deleted.

Loading