From e0c5566b680e867aee78b8c1bab16ddc8ef3dcc0 Mon Sep 17 00:00:00 2001
From: "Joao S. O. Bueno"
Date: Wed, 14 Oct 2020 00:33:13 -0300
Subject: [PATCH] Grouper (#4)

* Initial 'Grouper' implementation

* Finish up docs, improvements, tests

* Bump version and update description
---
 README.md              |  45 ++++++++++
 extradict/__init__.py  |   3 +-
 extradict/grouper.py   | 190 +++++++++++++++++++++++++++++++++++++++++
 setup.py               |   4 +-
 tests/test_grouper.py  |  31 +++++++
 tests/test_mappings.py |   2 +-
 6 files changed, 271 insertions(+), 4 deletions(-)
 create mode 100644 extradict/grouper.py
 create mode 100644 tests/test_grouper.py

diff --git a/README.md b/README.md
index bd8d1ba..82a24fb 100644
--- a/README.md
+++ b/README.md
@@ -311,3 +311,48 @@
 while those built with `AVLNode` will be self-balancing.
 Trying to manually mix node types in the same tree, or changing
 the key_func in different nodes, will obviously wreck everything.
+
+## Grouper
+
+
+Think of it as an itertools.groupby which returns a mapping,
+or as an itertools.tee that splits the stream into filtered
+substreams according to the passed key-callable.
+
+Given an iterable and a key callable,
+each element in the iterable is run through the key callable and
+made available in an iterator, under a bucket keyed by the resulting value.
+
+The source iterable need not be ordered (unlike itertools.groupby).
+If no key function is given, the identity function is used.
+
+The items are made available under the key-values as requested,
+in a lazy way whenever possible. Note that several method calls may
+precipitate eager processing of all items in the source iterator:
+.keys() or len(), for example.
+
+Whenever a new key is found during input consumption, a "Queue" iterator,
+which is a thin wrapper over collections.deque, is created under that key
+and can be further iterated to retrieve more elements that map to
+the same key.
+
+In short, this is very similar to `itertools.tee`, but with a filter
+so that each element goes to a mapped bucket.
+
+Once created, the resulting object may optionally be called. Doing this
+will consume all data in the source iterator at once, and return
+a plain dictionary with all data fetched into lists.
+
+For example, to divide the numbers 0-9 into
+5 buckets, all one needs to do is: `Grouper(myseq, lambda x: x // 2)`
+
+Or:
+```python
+>>> from extradict import Grouper
+>>> even_odd = Grouper(range(10), lambda x: "even" if not x % 2 else "odd")
+>>> print(list(even_odd["even"]))
+[0, 2, 4, 6, 8]
+>>> print(list(even_odd["odd"]))
+[1, 3, 5, 7, 9]
+
+```
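
The eager, call-based form also accepts an optional `keyhint` iterable of keys that
should appear in the result even when empty (this is exercised by the tests added
below). A minimal sketch, assuming the patched package is installed:

```python
>>> from extradict import Grouper
>>> grouped = Grouper(range(10), lambda x: x // 3)
>>> grouped(keyhint=(0, 1, 2, 3, 4))
{0: [0, 1, 2], 1: [3, 4, 5], 2: [6, 7, 8], 3: [9], 4: []}
```
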
diff --git a/extradict/__init__.py b/extradict/__init__.py
index ef61820..e91c1b2 100644
--- a/extradict/__init__.py
+++ b/extradict/__init__.py
@@ -9,7 +9,8 @@
 from .extratuple import defaultnamedtuple
 from .extratuple import fastnamedtuple
 from .binary_tree_dict import TreeDict
+from .grouper import Grouper
 
 __author__ = "João S. O. Bueno"
 
-__version__ = "0.4.0"
+__version__ = "0.5.0"
diff --git a/extradict/grouper.py b/extradict/grouper.py
new file mode 100644
index 0000000..06ec9b5
--- /dev/null
+++ b/extradict/grouper.py
@@ -0,0 +1,190 @@
+from __future__ import annotations
+
+from collections import deque
+from collections.abc import Mapping
+import typing as T
+
+
+SENTINEL = object()
+
+
+class Queue:
+    """Used as the bucket class by Grouper: an iterator over the values under one key of the Grouper mapping.
+
+    Normally each element is consumed as it is fetched - but the elements
+    are kept in a `collections.deque` instance in the .data attribute.
+    One can access .data directly if desired.
+    """
+    __slots__ = ("data", "key", "parent")
+
+    def __init__(self, parent: Grouper, key: T.Hashable):
+        self.key = key
+        self.parent = parent
+        self.data = deque()
+
+    def peek(self, default: T.Any = False) -> T.Any:
+        """Return the next available value under this bucket, if any.
+
+        If there is no waiting value, returns the passed-in default value,
+        or False, if none was given.
+
+        Calling this won't consume the next element, nor cause
+        the source iterator in the parent to advance.
+        """
+        return self.data[0] if self.data else default
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        if not self.data:
+            # fetch_next raises StopIteration when the source is exhausted,
+            # which is exactly what should propagate from here.
+            self.parent.fetch_next(self.key)
+        return self.data.popleft()
+
+    def append(self, value: T.Any) -> None:
+        """Used internally by the parent grouper to feed the filtered data"""
+        self.data.append(value)
+
+    def __repr__(self):
+        return f"Queue <{self.data}>"
+
+
+class Grouper(Mapping):
+    """Grouper mapping:
+
+    Think of it as an itertools.groupby which returns a mapping,
+    or as an itertools.tee that splits the stream into filtered
+    substreams according to the passed key-callable.
+
+    Given an iterable and a key callable,
+    each element in the iterable is run through the key callable and
+    made available in an iterator, under a bucket keyed by the resulting value.
+
+    The source iterable need not be ordered (unlike itertools.groupby).
+    If no key function is given, the identity function is used.
+
+    The items are made available under the key-values as requested,
+    in a lazy way whenever possible. Note that several method calls may
+    precipitate eager processing of all items in the source iterator:
+    .keys() or len(), for example.
+
+    Whenever a new key is found during input consumption, a "Queue" iterator,
+    which is a thin wrapper over collections.deque, is created under that key
+    and can be further iterated to retrieve more elements that map to
+    the same key.
+
+    In short, this is very similar to `itertools.tee`, but with a filter
+    so that each element goes to a mapped bucket.
+
+    Once created, the resulting object may optionally be called. Doing this
+    will consume all data in the source iterator at once, and return
+    a plain dictionary with all data fetched into lists.
+
+    For example, to divide the numbers 0-9 into
+    5 buckets, all one needs to do is: `Grouper(myseq, lambda x: x // 2)`
+
+    Or:
+    even_odd = `Grouper(seq, lambda x: "even" if not x % 2 else "odd")`
+
+    """
+
+    def __init__(self, source: T.Iterable, key: T.Optional[T.Callable[[T.Any], T.Hashable]] = None):
+        self.key = key if key is not None else lambda x: x
+        self.source = iter(source)
+        self.data = dict()
+
+    def __getitem__(self, key: T.Hashable):
+        if key not in self.data:
+            sentinel = object()
+            if self.fetch_next(key, sentinel) is sentinel:
+                raise KeyError(key)
+        return self.data[key]
+
+    def fetch_next(self, key: T.Hashable, default: T.Any = SENTINEL):
+        """Advances, consuming the source, until a new value for 'key' is fetched.
+
+        When the source is exhausted, either raises StopIteration or returns the
+        default value, if one is given.
+
+        Used internally by the Queue iterators to advance to their next values.
+        """
+        for new_key, value in self.advance():
+            if key == new_key:
+                return value
+        if default is SENTINEL:
+            raise StopIteration
+        return default
+
+    def advance(self):
+        """A generator that consumes one item from source, feeds the internal
+        buckets and yields the (key, value) pair each time it is 'nexted'.
+
+        This is used internally as the mechanism to fill the bucket queues.
+        If the intent is just to consume the source and get the (key, value)
+        pairs without storing the values in the buckets, use ".consume()".
+        """
+        for key, value in self.consume():
+            if key not in self.data:
+                self.data[key] = Queue(self, key)
+            self.data[key].append(value)
+            yield key, value
+
+    def consume(self):
+        """A generator that consumes one item from source and yields the
+        (key, value) pair each time it is 'nexted'.
+
+        This is used internally as the mechanism to advance the source
+        generator and get the corresponding key for each value. The values
+        are not stored for further use, just yielded as (key, value) pairs.
+
+        This can be used directly, but one might just use
+        `map(lambda v: (key(v), v), source)` instead of using a Grouper object.
+
+        If this is called directly while also trying to make use of the
+        buckets, undefined results will ensue.
+        """
+        for value in self.source:
+            key = self.key(value)
+            yield key, value
+
+    def __iter__(self):
+        self.consume_all()
+        return iter(self.data)
+
+    def __len__(self):
+        self.consume_all()
+        return len(self.data)
+
+    def consume_all(self):
+        # Consumes all of the remaining source.
+        # This "hidden" recipe is the most efficient way in the cPython
+        # interpreter to exhaust an iterator
+        # - check: https://docs.python.org/3/library/itertools.html#itertools-recipes
+        deque(self.advance(), maxlen=0)
+
+    def __call__(self, keyhint: T.Iterable[T.Hashable] = ()) -> dict[T.Hashable, list[T.Any]]:
+        """Consumes all of the source iterator and returns a plain dictionary with
+        all elements inside lists under the appropriate keys. If keyhint
+        is passed, keys for which there are no elements are created with
+        empty lists. (But extra keys yielded by the key function and
+        not present in keyhint will be present in the result, nonetheless.)
+
+        The iterators in the Grouper object itself are not consumed
+        (although the expected pattern, if one calls this, is that the
+        resulting dictionary is used and the Grouper object is discarded).
+        """
+        keyhint = set(keyhint)
+        result = {key: list(self[key].data) for key in self}
+        for remaining in keyhint - result.keys():
+            result[remaining] = []
+        return result
+
+    def __repr__(self):
+        return f"Grouper by {getattr(self.key, '__name__', repr(self.key))}"
+
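
A minimal sketch of the lazy, bucket-by-bucket behavior described in the Queue
and Grouper docstrings above, assuming the patched package is importable:

```python
from extradict import Grouper

grouped = Grouper(range(6), key=lambda x: x % 2)

evens = grouped[0]           # consumes the source only until key 0 first appears
print(next(evens))           # 0
print(evens.peek("empty"))   # 'empty' -- nothing buffered, source not advanced
print(next(evens))           # 2 -- advances the shared source as needed
print(list(grouped[1]))      # [1, 3, 5] -- odd values were buffered along the way
print(list(evens))           # [4] -- what was buffered for the even bucket meanwhile
```
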
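And a sketch of the consume()/advance() split documented above: consume() only
pairs keys with values (equivalent to the map() expression mentioned in its
docstring), while advance() also feeds the per-key buckets. The `g`/`g2` names
are just for illustration:

```python
from extradict import Grouper

g = Grouper("abcab")
assert list(g.consume()) == [(c, c) for c in "abcab"]
assert g.data == {}                 # consume() stored nothing in the buckets

g2 = Grouper("abcab")
assert next(g2.advance()) == ("a", "a")
assert list(g2["a"].data) == ["a"]  # advance() also buffered the value
```
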
diff --git a/setup.py b/setup.py
index fd9c42c..bf2d0fa 100644
--- a/setup.py
+++ b/setup.py
@@ -5,11 +5,11 @@
 setup(
     name = 'extradict',
     packages = ['extradict'],
-    version = "0.4.0",
+    version = "0.5.0",
     license = "LGPLv3+",
     author = "João S. O. Bueno",
     author_email = "gwidion@gmail.com",
-    description = "Enhanced, maybe useful, data containers and utilities: A versioned dictionary, a bidirectional dictionary, a binary tree backed dictionary and an easy extractor from dictionary key/values to variables",
+    description = "Enhanced, maybe useful, data containers and utilities: A versioned dictionary, a bidirectional dictionary, a binary tree backed dictionary, a Grouper iterator mapper similar to itertools.tee, and an easy extractor from dictionary key/values to variables",
     keywords = "versioned bijective assigner getter unpack transactional container collection dict dictionary normalized binarytree",
     py_modules = ['extradict'],
     url = 'https://github.com/jsbueno/extradict',
diff --git a/tests/test_grouper.py b/tests/test_grouper.py
new file mode 100644
index 0000000..c685780
--- /dev/null
+++ b/tests/test_grouper.py
@@ -0,0 +1,31 @@
+import pytest
+
+from extradict import Grouper
+
+
+def test_grouper_works():
+    x = Grouper(range(10))
+    assert all(len(list(x[k])) == 1 for k in range(10))
+
+
+def test_grouper_groups():
+    x = Grouper(range(100), key=lambda x: x // 10)
+    assert all(len(list(x[k])) == 10 for k in range(10))
+
+
+def test_grouper_call_returns_dict_with_lists():
+    x = Grouper(range(100), key=lambda x: x // 10)()
+    assert x == {k: list(range(k * 10, k * 10 + 10)) for k in range(10)}
+
+
+def test_grouper_call_keyhint():
+    x = Grouper(range(5))(keyhint=(5, 6, 7, 8))
+    assert x == {**{k: [k] for k in range(5)}, **{k: [] for k in range(5, 9)}}
+
+
+def test_grouper_raises_key_error_on_unmatched_key():
+    x = Grouper(range(10))
+    x[0]
+    with pytest.raises(KeyError):
+        x[10]
+
diff --git a/tests/test_mappings.py b/tests/test_mappings.py
index 2f3f496..bb62059 100644
--- a/tests/test_mappings.py
+++ b/tests/test_mappings.py
@@ -4,7 +4,7 @@
     FallbackNormalizedDict,
     NormalizedDict,
     BijectiveDict,
-    TreeDict
+    TreeDict,
 )
 
 import pytest
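
For reference, a sketch of the KeyError behavior the last test above exercises:
looking up a key that the key function never produces exhausts the source and
then fails:

```python
from extradict import Grouper

g = Grouper(range(3))
print(list(g[0]))    # [0] -- exhausts the source while filling buckets 1 and 2
try:
    g[10]
except KeyError as exc:
    print("no bucket:", exc)    # no bucket: 10
```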