Skip to content

Commit

Permalink
Add multithreaded test (#678)
Browse files Browse the repository at this point in the history
  • Loading branch information
paraseba authored Feb 5, 2025
1 parent 4c705da commit 1674d6d
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 0 deletions.
1 change: 1 addition & 0 deletions icechunk-python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ test = [
"hypothesis",
"pandas-stubs",
"boto3-stubs[s3]",
"termcolor",
]
benchmark = [
"pytest-benchmark[histogram]",
Expand Down
178 changes: 178 additions & 0 deletions icechunk-python/tests/test_concurrency.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
import asyncio
import concurrent.futures
import random
import time
import uuid
from random import randrange
from threading import Event

import pytest
from termcolor import colored

import icechunk
import zarr
from tests.conftest import write_chunks_to_minio

N = 15

Expand Down Expand Up @@ -81,3 +90,172 @@ async def test_concurrency() -> None:
for x in range(N):
for y in range(N - 1):
assert array[x, y] == x * y


@pytest.mark.filterwarnings("ignore:datetime.datetime.utcnow")
async def test_thread_concurrency() -> None:
"""Run multiple threads doing different type of operations for SECONDS_TO_RUN seconds.
The threads execute 5 types of operations: reads, native writes, virtual writes, deletes and lists.
We launch THREADS threads of each type, and at the end we assert all operation types executed
a few times.
The output prints a block character for each operation, colored according to the operation type.
The expectation is blocks of different colors should interleave.
"""
THREADS = 20
SECONDS_TO_RUN = 1

prefix = str(uuid.uuid4())
write_chunks_to_minio(
[
(f"{prefix}/chunk-1", b"first"),
(f"{prefix}/chunk-2", b"second"),
],
)

config = icechunk.RepositoryConfig.default()
store_config = icechunk.s3_store(
region="us-east-1",
endpoint_url="http://localhost:9000",
allow_http=True,
s3_compatible=True,
)
container = icechunk.VirtualChunkContainer("s3", "s3://", store_config)
config.set_virtual_chunk_container(container)
config.inline_chunk_threshold_bytes = 0
credentials = icechunk.containers_credentials(
s3=icechunk.s3_credentials(access_key_id="minio123", secret_access_key="minio123")
)

storage = icechunk.s3_storage(
region="us-east-1",
endpoint_url="http://localhost:9000",
allow_http=True,
bucket="testbucket",
prefix="multithreaded-test__" + str(time.time()),
access_key_id="minio123",
secret_access_key="minio123",
)

# Open the store
repo = icechunk.Repository.create(
storage=storage,
config=config,
virtual_chunk_credentials=credentials,
)

session = repo.writable_session("main")
store = session.store

group = zarr.group(store=store, overwrite=True)
group.create_array("array", shape=(1_000,), chunks=(1,), dtype="i4", compressors=None)

def do_virtual_writes(start, stop):
n = 0
start.wait()
while not stop.is_set():
i = randrange(1_000)
store.set_virtual_ref(
f"array/c/{i}",
f"s3://testbucket/{prefix}/chunk-1",
offset=0,
length=4,
)
print(colored("■", "green"), end="")
n += 1
return n

def do_native_writes(start, stop):
async def do():
n = 0
while not stop.is_set():
i = randrange(1_000)
await store.set(
f"array/c/{i}", zarr.core.buffer.cpu.Buffer.from_bytes(b"0123")
)
print(colored("■", "white"), end="")
n += 1
return n

start.wait()
return asyncio.run(do())

def do_reads(start, stop):
buffer_prototype = zarr.core.buffer.default_buffer_prototype()

async def do():
n = 0
while not stop.is_set():
i = randrange(1_000)
await store.get(f"array/c/{i}", prototype=buffer_prototype)
print(colored("■", "blue"), end="")
n += 1
return n

start.wait()
return asyncio.run(do())

def do_deletes(start, stop):
async def do():
n = 0
while not stop.is_set():
i = randrange(1_000)
await store.delete(f"array/c/{i}")
print(colored("■", "red"), end="")
n += 1
return n

start.wait()
return asyncio.run(do())

def do_lists(start, stop):
async def do():
n = 0
while not stop.is_set():
_ = [k async for k in store.list_prefix("")]
print(colored("■", "yellow"), end="")
n += 1
return n

start.wait()
return asyncio.run(do())

with concurrent.futures.ThreadPoolExecutor(max_workers=THREADS * 5) as pool:
virtual_writes = []
native_writes = []
deletes = []
reads = []
lists = []

start = Event()
stop = Event()

for _ in range(THREADS):
virtual_writes.append(pool.submit(do_virtual_writes, start, stop))
native_writes.append(pool.submit(do_native_writes, start, stop))
deletes.append(pool.submit(do_deletes, start, stop))
reads.append(pool.submit(do_reads, start, stop))
lists.append(pool.submit(do_lists, start, stop))

start.set()
time.sleep(SECONDS_TO_RUN)
stop.set()

virtual_writes = sum(future.result() for future in virtual_writes)
native_writes = sum(future.result() for future in native_writes)
deletes = sum(future.result() for future in deletes)
reads = sum(future.result() for future in reads)
lists = sum(future.result() for future in lists)

print()
print(
f"virtual writes: {virtual_writes}, native writes: {native_writes}, deletes: {deletes}, reads: {reads}, lists: {lists}"
)

assert virtual_writes > 2
assert native_writes > 2
assert deletes > 2
assert reads > 2
assert lists > 2

0 comments on commit 1674d6d

Please sign in to comment.