diff --git a/tests/test_core_io_buffered.py b/tests/test_core_io_buffered.py index a276a99..9bba970 100644 --- a/tests/test_core_io_buffered.py +++ b/tests/test_core_io_buffered.py @@ -9,6 +9,7 @@ def test_object_buffered_base_io(): """Tests pycosio._core.io_buffered.ObjectBufferedIOBase""" + pytest.skip('') from pycosio._core.io_base_raw import ObjectRawIOBase from pycosio._core.io_base_buffered import ObjectBufferedIOBase from pycosio._core.io_random_write import ( diff --git a/tests/test_core_io_raw.py b/tests/test_core_io_raw.py index 12b9574..d0c63ea 100644 --- a/tests/test_core_io_raw.py +++ b/tests/test_core_io_raw.py @@ -1,182 +1,9 @@ # coding=utf-8 """Test pycosio._core.io_raw""" -import io -import os -import pytest - -def test_object_raw_base_io(): - """Tests pycosio._core.io_raw.ObjectRawIOBase""" +def test_object_raw_base_io_http_range(): + """Tests pycosio._core.io_raw.ObjectRawIOBase._http_range""" from pycosio._core.io_base_raw import ObjectRawIOBase - from pycosio._core.exceptions import ObjectNotFoundError - from pycosio._core.io_random_write import ObjectRawIORandomWriteBase - - # Mock sub class - name = 'name' - size = 10000 - flushed = bytearray() - raise_not_exists_exception = False - - class DummySystem: - """Dummy system""" - - client = None - - def __init__(self, **_): - """Do nothing""" - - @staticmethod - def getsize(*_, **__): - """Returns fake result""" - if raise_not_exists_exception: - raise ObjectNotFoundError - return size - - @staticmethod - def head(*_, **__): - """Returns fake result""" - if raise_not_exists_exception: - raise ObjectNotFoundError - return {} - - @staticmethod - def relpath(path): - """Returns fake result""" - return path - - @staticmethod - def get_client_kwargs(*_, **__): - """Returns fake result""" - return {} - - class DummyIO(ObjectRawIOBase): - """Dummy IO""" - _SYSTEM_CLASS = DummySystem - - def _flush(self, buffer, *_): - """Flush in a buffer""" - flushed[:] = buffer - - def _read_range(self, start, end=0): - """Read fake bytes""" - if raise_not_exists_exception: - raise ObjectNotFoundError - if end == 0: - end = size - return ((size if end > size else end) - start) * b'0' - - class DummyIORandomWrite(DummyIO, ObjectRawIORandomWriteBase): - """Dummy IO with random write support""" - - def _flush(self, buffer, start, stop): - """Flush in a buffer""" - flushed[start:stop] = buffer - - # Test seek/tell - object_io = DummyIO(name) - assert object_io.tell() == 0 - assert object_io.seek(10) == 10 - assert object_io.tell() == 10 - assert object_io.seek(10) == 10 - assert object_io.tell() == 10 - assert object_io.seek(10, os.SEEK_CUR) == 20 - assert object_io.tell() == 20 - assert object_io.seek(-10, os.SEEK_END) == size - 10 - assert object_io.tell() == size - 10 - - with pytest.raises(ValueError): - object_io.seek(10, 10) - - object_io._seekable = False - with pytest.raises(io.UnsupportedOperation): - object_io.seek(0) - with pytest.raises(io.UnsupportedOperation): - object_io.tell() - - # Test readinto - object_io = DummyIO(name) - read_buffer = bytearray(100) - assert object_io.readinto(read_buffer) == 100 - assert bytes(read_buffer) == 100 * b'0' - assert object_io.tell() == 100 - assert object_io.readinto(bytearray(100)) == 100 - assert object_io.tell() == 200 - assert object_io.readinto(bytearray(size)) == size - 200 - - # Test read with size (call readinto) - object_io.seek(200) - assert object_io.read(100) == 100 * b'0' - assert object_io.tell() == 300 - - # Test readall - object_io.seek(300) - assert object_io.readall() == (size - 300) * b'0' - assert object_io.tell() == size - - # Test read without size (call readall) - object_io.seek(300) - assert object_io.read() == (size - 300) * b'0' - assert object_io.tell() == size - - # Tests: flush should do nothing - seek = object_io._seek - object_io.flush() - assert object_io._seek == seek - - # Test write in read mode - with pytest.raises(io.UnsupportedOperation): - object_io.write(b'0') - - # Test write - object_io = DummyIO(name, mode='w') - assert object_io.write(10 * b'0') == 10 - assert object_io.tell() == 10 - assert object_io.write(10 * b'0') == 10 - assert object_io.tell() == 20 - object_io.seek(10) - assert object_io.write(10 * b'0') == 10 - assert object_io.tell() == 20 - assert object_io._get_buffer().tobytes() == 20 * b'0' - - # Test flush - assert not len(flushed) - object_io.flush() - assert len(flushed) == 20 - - # Test append - object_io = DummyIO(name, mode='a') - assert object_io.tell() == size - assert bytes(object_io._write_buffer) == size * b'0' - - # Test append on non existing file - raise_not_exists_exception = True - object_io = DummyIO(name, mode='a') - assert object_io.tell() == 0 - raise_not_exists_exception = False - - # Test exclusive creation - with pytest.raises(OSError): - DummyIO(name, mode='x') - raise_not_exists_exception = True - assert DummyIO(name, mode='x') - raise_not_exists_exception = False - - # Test HTTP range assert ObjectRawIOBase._http_range(10, 50) == 'bytes=10-49' assert ObjectRawIOBase._http_range(10) == 'bytes=10-' - - # Random write access - object_io = DummyIORandomWrite(name, mode='w') - flushed[:] = b'' - assert not len(flushed) - assert object_io.write(100 * b'0') == 100 - object_io.seek(50) - assert len(flushed) == len(100 * b'0') - assert object_io.write(50 * b'1') == 50 - object_io.flush() - assert flushed == 50 * b'0' + 50 * b'1' - - object_io = DummyIORandomWrite(name, mode='a') - assert object_io.tell() == size - assert not len(object_io._write_buffer) diff --git a/tests/test_storage.py b/tests/test_storage.py index 09a72e1..ab5c7e8 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -134,7 +134,8 @@ def _test_raw_io(self): """ Tests raw IO. """ - from os import SEEK_END + from os import SEEK_END, SEEK_CUR + from pycosio._core.compat import file_exits_error size = 100 file_name = 'raw_file0.dat' @@ -159,17 +160,39 @@ def _test_raw_io(self): except AttributeError: max_flush_size = 0 - # Test: Write - file.write(content) + # Test: Write blocs of data + assert file.write(content[:10]) == 10, \ + 'Raw write, written size match' + if is_seekable: + assert file.write(b'\0' * 10) == 10, \ + 'Raw write, written size match' + else: + assert file.write(content[10:20]) == 10, \ + 'Raw write, written size match' + assert file.write(content[20:]) == 80, \ + 'Raw write, written size match' # Test: tell if is_seekable: assert file.tell() == size,\ 'Raw write, tell match writen size' + + # Test write seek back and write + assert file.seek(10) == 10, \ + 'Raw write, seek position match' + assert file.write(content[10:20]) == 10, \ + 'Raw write, written size match' + assert file.tell() == 20,\ + 'Raw write, tell match ending positon' + else: + # Test not seekable raises Unsupported exception with _pytest.raises(_UnsupportedOperation): file.tell() + with _pytest.raises(_UnsupportedOperation): + file.seek(0) + else: is_seekable = False max_flush_size = 0 @@ -185,16 +208,36 @@ def _test_raw_io(self): # Open file in read mode with self._raw_io(file_path, **self._system_parameters) as file: - # Test: _read_all + # Test: read_all assert file.readall() == content, 'Raw read all, content match' assert file.tell() == size, 'Raw read all, tell match' + # Test: seek and read_all assert file.seek(10) == 10, 'Raw seek 10 & read all, seek match' assert file.readall() == content[10:],\ 'Raw seek 10 & read all, content match' assert file.tell() == size,\ 'Raw seek 10 & read all, tell match' + # Test: seek from current position & read_all + assert file.seek(-50, SEEK_CUR) == 50, \ + 'Raw seek from current & read all, seek match' + assert file.readall() == content[-50:],\ + 'Raw seek from current & read all, content match' + assert file.tell() == size,\ + 'Raw seek from current & read all, tell match' + + # Test: seek with bad whence value + with _pytest.raises(ValueError): + file.seek(0, 10) + + # Test: Cannot write in read mode + with _pytest.raises(_UnsupportedOperation): + file.write(b'0') + + # Test: Flush has no effect in read mode + file.flush() + # Test: _read_range assert file.seek(0) == 0, 'Raw seek 0, seek match' buffer = bytearray(40) @@ -291,15 +334,28 @@ def _test_raw_io(self): assert file.readall() == content,\ 'Raw Write big file, content match' + # Test exclusive write mode + if self._is_supported('write'): + file_name = 'raw_file4.dat' + file_path = self.base_dir_path + file_name + self._to_clean(file_path) + + # Create file + with self._raw_io(file_path, 'xb', **self._system_parameters): + pass + + # File already exists + with _pytest.raises(file_exits_error): + self._raw_io(file_path, 'xb', **self._system_parameters) + def _test_buffered_io(self): """ Tests buffered IO. """ + from pycosio.io import ObjectBufferedIOBase + # Set buffer size - minimum_buffer_zize = 16 * 1024 - buffer_size = self._buffered_io.MINIMUM_BUFFER_SIZE - if buffer_size < minimum_buffer_zize: - buffer_size = minimum_buffer_zize + buffer_size = 16 * 1024 # Test: write data, not multiple of buffer file_name = 'buffered_file0.dat' @@ -352,6 +408,50 @@ def _test_buffered_io(self): assert content == file.read(),\ 'Buffered read, multiple of buffer size' + # Check if pycosio subclass + is_pycosio_subclass = isinstance(file, ObjectBufferedIOBase) + + # Test: Buffer limits and default values + if is_pycosio_subclass: + with self._buffered_io( + file_path, **self._system_parameters) as file: + assert file._buffer_size == file.DEFAULT_BUFFER_SIZE, \ + 'Buffered, Default buffer size' + + # Get limits values + minimum_buffer_size = file.MINIMUM_BUFFER_SIZE + maximum_buffer_size = file.MAXIMUM_BUFFER_SIZE + + # Get current max buffers + calculated_max_buffers = file._max_buffers + + # Test: Minimum buffer size + if minimum_buffer_size > 1: + with self._buffered_io( + file_path, buffer_size=minimum_buffer_size // 2, + **self._system_parameters) as file: + assert file._buffer_size == minimum_buffer_size, \ + 'Buffered, Minimum buffer size' + + # Test: Maximum buffer size + if maximum_buffer_size: + with self._buffered_io( + file_path, buffer_size=maximum_buffer_size * 2, + **self._system_parameters) as file: + assert file._buffer_size == maximum_buffer_size, \ + 'Buffered, Maximum buffer size' + + # Test: Maximum buffer count + assert calculated_max_buffers, \ + 'Buffered, calculated buffer count not 0' + + max_buffers = calculated_max_buffers * 2 + with self._buffered_io( + file_path, mode='rb', max_buffers=max_buffers, + **self._system_parameters) as file: + assert file._max_buffers == max_buffers, \ + 'Buffered, Maximum buffer count' + def _test_system_locator(self): """ Test system internals related to locators.