Skip to content

Commit

Permalink
Merge pull request #447 from atlanhq/FT-865
Browse files Browse the repository at this point in the history
FT-865: Fixed timestamp paging returns incomplete results
  • Loading branch information
Aryamanz29 authored Dec 13, 2024
2 parents 97b664c + b62087c commit 9f5c8a7
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 9 deletions.
42 changes: 36 additions & 6 deletions pyatlan/client/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1711,12 +1711,6 @@ def next_page(self, start=None, size=None) -> bool:
self._start = start or self._start + self._size
if size:
self._size = size
# Used in the "timestamp-based" paging approach
# to check if `asset.guid` has already been processed
# in a previous page of results.
# If it has, then exclude it from the current results;
# otherwise, we may encounter duplicate asset records.
self._processed_guids.update(asset.guid for asset in self._assets)
return self._get_next_page() if self._assets else False

@abc.abstractmethod
Expand Down Expand Up @@ -1851,6 +1845,42 @@ def _prepare_query_for_timestamp_paging(self, query: Query):
rewritten_query = Bool(filter=rewritten_filters)
self._criteria.dsl.from_ = 0 # type: ignore[attr-defined]
self._criteria.dsl.query = rewritten_query # type: ignore[attr-defined]
else:
# Ensure that when switching to offset-based paging, if the first and last record timestamps are the same,
# we do not include a created timestamp filter (ie: Range(field='__timestamp', gte=VALUE)) in the query.
# Instead, ensure the search runs with only SortItem(field='__timestamp', order=<SortOrder.ASCENDING>).
# Failing to do so can lead to incomplete results (less than the approximate count) when running the search
# with a small page size.
if isinstance(query, Bool):
for filter_ in query.filter:
if self.is_paging_timestamp_query(filter_):
query.filter.remove(filter_)

# Always ensure that the offset is set to the length of the processed assets
# instead of the default (start + size), as the default may skip some assets
# and result in incomplete results (less than the approximate count)
self._criteria.dsl.from_ = len(self._processed_guids) # type: ignore[attr-defined]

def next_page(self, start=None, size=None) -> bool:
    """
    Indicates whether there is a next page of results.

    :param start: optional offset at which the next page should begin;
        when omitted, the offset advances by the current page size.
    :param size: optional page size to use for the next page;
        when omitted (or 0), the current page size is kept.
    :returns: True if there is a next page of results, otherwise False
    """
    # Use an explicit None check so that a caller-supplied start of 0
    # (a legitimate offset, i.e. "restart from the first record") is
    # honoured instead of being discarded by truthiness and replaced
    # with the advanced offset.
    self._start = start if start is not None else self._start + self._size
    is_bulk_search = (
        self._bulk or self._approximate_count > self._MASS_EXTRACT_THRESHOLD
    )
    if size:
        self._size = size
    if is_bulk_search:
        # Used in the "timestamp-based" paging approach
        # to check if `asset.guid` has already been processed
        # in a previous page of results.
        # If it has, then exclude it from the current results;
        # otherwise, we may encounter duplicate asset records.
        self._processed_guids.update(asset.guid for asset in self._assets)
    return self._get_next_page() if self._assets else False

def _get_next_page(self):
"""
Expand Down
23 changes: 21 additions & 2 deletions pyatlan/model/audit.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,9 @@ def _get_next_page(self):
query = self._criteria.dsl.query
self._criteria.dsl.size = self._size
self._criteria.dsl.from_ = self._start
is_bulk_search = self._bulk or self._count > self._MASS_EXTRACT_THRESHOLD
is_bulk_search = (
self._bulk or self._approximate_count > self._MASS_EXTRACT_THRESHOLD
)

if is_bulk_search:
self._prepare_query_for_timestamp_paging(query)
Expand Down Expand Up @@ -381,6 +383,21 @@ def _prepare_query_for_timestamp_paging(self, query: Query):
rewritten_query = Bool(filter=rewritten_filters)
self._criteria.dsl.from_ = 0
self._criteria.dsl.query = rewritten_query
else:
# Ensure that when switching to offset-based paging, if the first and last record timestamps are the same,
# we do not include a created timestamp filter (ie: Range(field='__timestamp', gte=VALUE)) in the query.
# Instead, ensure the search runs with only SortItem(field='__timestamp', order=<SortOrder.ASCENDING>).
# Failing to do so can lead to incomplete results (less than the approximate count) when running the search
# with a small page size.
if isinstance(query, Bool):
for filter_ in query.filter:
if self._is_paging_timestamp_query(filter_):
query.filter.remove(filter_)

# Always ensure that the offset is set to the length of the processed assets
# instead of the default (start + size), as the default may skip some assets
# and result in incomplete results (less than the approximate count)
self._criteria.dsl.from_ = len(self._processed_entity_keys)

@staticmethod
def _get_paging_timestamp_query(last_timestamp: int) -> Query:
Expand All @@ -403,7 +420,7 @@ def _update_first_last_record_creation_times(self):
def presorted_by_timestamp(sorts: Optional[List[SortItem]]) -> bool:
"""
Checks if the sorting options prioritize creation time in ascending order.
:param sorts: List of sorting options or None.
:param sorts: list of sorting options or None.
:returns: True if sorting is already prioritized by creation time, False otherwise.
"""
if sorts and isinstance(sorts[0], SortItem):
Expand Down Expand Up @@ -439,6 +456,8 @@ def __iter__(self) -> Generator[EntityAudit, None, None]:
"""
Iterates through the results, lazily-fetching each next page until there
are no more results.
:returns: an iterable form of each result, across all pages
"""
while True:
yield from self.current_page()
Expand Down
6 changes: 5 additions & 1 deletion tests/integration/test_index_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,11 @@ def test_search_next_page(client: AtlanClient):
def _assert_search_results(results, expected_sorts, size, TOTAL_ASSETS, bulk=False):
assert results.count > size
assert len(results.current_page()) == size
assert results.count == TOTAL_ASSETS
counter = 0
for term in results:
assert term
counter += 1
assert counter == TOTAL_ASSETS
assert results
assert results._bulk is bulk
assert results.aggregations is None
Expand Down

0 comments on commit 9f5c8a7

Please sign in to comment.