diff --git a/icechunk-python/pyproject.toml b/icechunk-python/pyproject.toml
index c6514108..c4959a93 100644
--- a/icechunk-python/pyproject.toml
+++ b/icechunk-python/pyproject.toml
@@ -38,6 +38,7 @@ test = [
     "hypothesis",
     "pandas-stubs",
     "boto3-stubs[s3]",
+    "termcolor",
 ]
 benchmark = [
     "pytest-benchmark[histogram]",
diff --git a/icechunk-python/tests/test_concurrency.py b/icechunk-python/tests/test_concurrency.py
index ab644778..bf6eb0a1 100644
--- a/icechunk-python/tests/test_concurrency.py
+++ b/icechunk-python/tests/test_concurrency.py
@@ -1,8 +1,17 @@
 import asyncio
+import concurrent.futures
 import random
+import time
+import uuid
+from random import randrange
+from threading import Event
+
+import pytest
+from termcolor import colored
 
 import icechunk
 import zarr
+from tests.conftest import write_chunks_to_minio
 
 N = 15
 
@@ -81,3 +90,183 @@ async def test_concurrency() -> None:
     for x in range(N):
         for y in range(N - 1):
             assert array[x, y] == x * y
+
+
+@pytest.mark.filterwarnings("ignore:datetime.datetime.utcnow")
+async def test_thread_concurrency() -> None:
+    """Run multiple threads doing different types of operations for SECONDS_TO_RUN seconds.
+
+    The threads execute 5 types of operations: reads, native writes, virtual writes, deletes and lists.
+
+    We launch THREADS threads of each type, and at the end we assert all operation types executed
+    a few times.
+
+    The output prints a block character for each operation, colored according to the operation type.
+    The expectation is blocks of different colors should interleave.
+
+    All workers block on a shared ``start`` event so they begin together, and loop until a shared
+    ``stop`` event is set. Both events are always set before the thread pool shuts down, so an
+    error or interrupt in the main thread does not leave the workers waiting or looping forever.
+    """
+    THREADS = 20
+    SECONDS_TO_RUN = 1
+
+    prefix = str(uuid.uuid4())
+    write_chunks_to_minio(
+        [
+            (f"{prefix}/chunk-1", b"first"),
+            (f"{prefix}/chunk-2", b"second"),
+        ],
+    )
+
+    config = icechunk.RepositoryConfig.default()
+    store_config = icechunk.s3_store(
+        region="us-east-1",
+        endpoint_url="http://localhost:9000",
+        allow_http=True,
+        s3_compatible=True,
+    )
+    container = icechunk.VirtualChunkContainer("s3", "s3://", store_config)
+    config.set_virtual_chunk_container(container)
+    config.inline_chunk_threshold_bytes = 0
+    credentials = icechunk.containers_credentials(
+        s3=icechunk.s3_credentials(access_key_id="minio123", secret_access_key="minio123")
+    )
+
+    storage = icechunk.s3_storage(
+        region="us-east-1",
+        endpoint_url="http://localhost:9000",
+        allow_http=True,
+        bucket="testbucket",
+        prefix="multithreaded-test__" + str(time.time()),
+        access_key_id="minio123",
+        secret_access_key="minio123",
+    )
+
+    # Create the repository
+    repo = icechunk.Repository.create(
+        storage=storage,
+        config=config,
+        virtual_chunk_credentials=credentials,
+    )
+
+    session = repo.writable_session("main")
+    store = session.store
+
+    group = zarr.group(store=store, overwrite=True)
+    group.create_array("array", shape=(1_000,), chunks=(1,), dtype="i4", compressors=None)
+
+    # Each worker waits on ``start``, then repeats one kind of operation until ``stop`` is set
+    # and returns how many operations it completed.
+    def do_virtual_writes(start, stop):
+        n = 0
+        start.wait()
+        while not stop.is_set():
+            i = randrange(1_000)
+            store.set_virtual_ref(
+                f"array/c/{i}",
+                f"s3://testbucket/{prefix}/chunk-1",
+                offset=0,
+                length=4,
+            )
+            print(colored("■", "green"), end="")
+            n += 1
+        return n
+
+    def do_native_writes(start, stop):
+        async def do():
+            n = 0
+            while not stop.is_set():
+                i = randrange(1_000)
+                await store.set(
+                    f"array/c/{i}", zarr.core.buffer.cpu.Buffer.from_bytes(b"0123")
+                )
+                print(colored("■", "white"), end="")
+                n += 1
+            return n
+
+        start.wait()
+        return asyncio.run(do())
+
+    def do_reads(start, stop):
+        buffer_prototype = zarr.core.buffer.default_buffer_prototype()
+
+        async def do():
+            n = 0
+            while not stop.is_set():
+                i = randrange(1_000)
+                await store.get(f"array/c/{i}", prototype=buffer_prototype)
+                print(colored("■", "blue"), end="")
+                n += 1
+            return n
+
+        start.wait()
+        return asyncio.run(do())
+
+    def do_deletes(start, stop):
+        async def do():
+            n = 0
+            while not stop.is_set():
+                i = randrange(1_000)
+                await store.delete(f"array/c/{i}")
+                print(colored("■", "red"), end="")
+                n += 1
+            return n
+
+        start.wait()
+        return asyncio.run(do())
+
+    def do_lists(start, stop):
+        async def do():
+            n = 0
+            while not stop.is_set():
+                _ = [k async for k in store.list_prefix("")]
+                print(colored("■", "yellow"), end="")
+                n += 1
+            return n
+
+        start.wait()
+        return asyncio.run(do())
+
+    with concurrent.futures.ThreadPoolExecutor(max_workers=THREADS * 5) as pool:
+        virtual_writes = []
+        native_writes = []
+        deletes = []
+        reads = []
+        lists = []
+
+        start = Event()
+        stop = Event()
+
+        try:
+            for _ in range(THREADS):
+                virtual_writes.append(pool.submit(do_virtual_writes, start, stop))
+                native_writes.append(pool.submit(do_native_writes, start, stop))
+                deletes.append(pool.submit(do_deletes, start, stop))
+                reads.append(pool.submit(do_reads, start, stop))
+                lists.append(pool.submit(do_lists, start, stop))
+
+            start.set()
+            time.sleep(SECONDS_TO_RUN)
+        finally:
+            # Leaving the ``with`` block joins every worker, so make sure none of them can stay
+            # blocked on ``start`` or keep looping if anything above raised.
+            stop.set()
+            start.set()
+
+        virtual_writes = sum(future.result() for future in virtual_writes)
+        native_writes = sum(future.result() for future in native_writes)
+        deletes = sum(future.result() for future in deletes)
+        reads = sum(future.result() for future in reads)
+        lists = sum(future.result() for future in lists)
+
+    print()
+    print(
+        f"virtual writes: {virtual_writes}, native writes: {native_writes}, deletes: {deletes}, reads: {reads}, lists: {lists}"
+    )
+
+    assert virtual_writes > 2
+    assert native_writes > 2
+    assert deletes > 2
+    assert reads > 2
+    assert lists > 2