Guard the SortedList-backed timing collectors with a lock.

TopTableTimings and TimingClass each gain a threading.Lock created in
__init__; add()/add_timing() and both __str__() methods now run their
SortedList access inside `with self._lock:`.

diff --git a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_common.py b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_common.py
index 831b3b1a7e9132..a1ac7f67ae6cfd 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_common.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_common.py
@@ -1,4 +1,5 @@
 import logging
+import threading
 from dataclasses import dataclass, field
 from typing import Any, Dict, Optional
 
@@ -156,18 +157,21 @@ class TopTableTimings:
     def __init__(self, size: int = 10):
         self._size = size
         self.top_entites = SortedList(key=lambda x: -x.get(self._VALUE_FIELD, 0))
+        self._lock = threading.Lock()
 
     def add(self, entity: Dict[str, Any]) -> None:
         if self._VALUE_FIELD not in entity:
             return
-        self.top_entites.add(entity)
-        if len(self.top_entites) > self._size:
-            self.top_entites.pop()
+        with self._lock:
+            self.top_entites.add(entity)
+            if len(self.top_entites) > self._size:
+                self.top_entites.pop()
 
     def __str__(self) -> str:
-        if len(self.top_entites) == 0:
-            return "no timings reported"
-        return str(list(self.top_entites))
+        with self._lock:
+            if len(self.top_entites) == 0:
+                return "no timings reported"
+            return str(list(self.top_entites))
 
 
 class TimingClass:
@@ -175,24 +179,31 @@ class TimingClass:
 
     def __init__(self):
         self.times = SortedList()
+        self._lock = threading.Lock()
 
     def add_timing(self, t: float) -> None:
-        self.times.add(t)
+        with self._lock:
+            self.times.add(t)
 
     def __str__(self) -> str:
-        if len(self.times) == 0:
-            return "no timings reported"
-        total = sum(self.times)
-        avg = total / len(self.times)
-        return str(
-            {
-                "average_time": format_timespan(avg, detailed=True, max_units=3),
-                "min_time": format_timespan(self.times[0], detailed=True, max_units=3),
-                "max_time": format_timespan(self.times[-1], detailed=True, max_units=3),
-                # total_time does not provide correct information in case we run in more than 1 thread
-                "total_time": format_timespan(total, detailed=True, max_units=3),
-            }
-        )
+        with self._lock:
+            if len(self.times) == 0:
+                return "no timings reported"
+            total = sum(self.times)
+            avg = total / len(self.times)
+            return str(
+                {
+                    "average_time": format_timespan(avg, detailed=True, max_units=3),
+                    "min_time": format_timespan(
+                        self.times[0], detailed=True, max_units=3
+                    ),
+                    "max_time": format_timespan(
+                        self.times[-1], detailed=True, max_units=3
+                    ),
+                    # total_time does not provide correct information in case we run in more than 1 thread
+                    "total_time": format_timespan(total, detailed=True, max_units=3),
+                }
+            )
 
 
 @dataclass