Skip to content

Commit

Permalink
add ShardCalculator
Browse files Browse the repository at this point in the history
  • Loading branch information
piotr-iohk committed Mar 1, 2024
1 parent 5025186 commit 8310e01
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 2 deletions.
21 changes: 21 additions & 0 deletions tests/test_aws_keyspaces_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import datetime
from uptime_service_validation.coordinator.aws_keyspaces_client import (
AWSKeyspacesClient,
ShardCalculator,
)


Expand All @@ -19,3 +20,23 @@ def test_get_submitted_at_date_list():
end = datetime(2023, 11, 8, 0, 0, 0)
result = AWSKeyspacesClient.get_submitted_at_date_list(start, end)
assert result == ["2023-11-06", "2023-11-07", "2023-11-08"]


def test_calculate_shards_in_range():
    """Check the CQL shard condition built for two representative time ranges."""
    scenarios = [
        # Range that crosses midnight (and the top of the hour): the shard
        # numbers wrap around from the high 500s back to 0.
        (
            datetime(2024, 2, 29, 23, 58, 29),
            datetime(2024, 3, 1, 0, 3, 29),
            "shard in (0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,584,585,586,587,588,589,590,591,592,593,594,595,596,597,598,599)",
        ),
        # Range that stays within a single day (and a single hour).
        (
            datetime(2024, 2, 29, 12, 58, 1),
            datetime(2024, 2, 29, 12, 59, 59),
            "shard in (580,581,582,583,584,585,586,587,588,589,590,591,592,593,594,595,596,597,598,599)",
        ),
    ]

    for range_start, range_end, expected_condition in scenarios:
        actual_condition = ShardCalculator.calculate_shards_in_range(
            range_start, range_end
        )
        assert actual_condition == expected_condition
31 changes: 29 additions & 2 deletions uptime_service_validation/coordinator/aws_keyspaces_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from cassandra.policies import DCAwareRoundRobinPolicy
from ssl import SSLContext, CERT_REQUIRED, PROTOCOL_TLS_CLIENT
from dataclasses import dataclass
from datetime import datetime
from datetime import datetime, timedelta
from typing import Optional, ByteString, List

import pandas as pd
Expand Down Expand Up @@ -194,7 +194,9 @@ def get_submissions(
submitted_at_start, submitted_at_end
)

shard_condition = "shard in (" + ",".join(map(str, range(1, 60))) + ")"
shard_condition = ShardCalculator.calculate_shards_in_range(
submitted_at_start, submitted_at_end
)

if len(submitted_at_date_list) == 1:
submitted_at_date = submitted_at_date_list[0]
Expand Down Expand Up @@ -265,6 +267,31 @@ def close(self):
self.cluster.shutdown()


class ShardCalculator:
    """Map timestamps to shard numbers and build CQL shard conditions.

    A shard number is derived only from the minute and second of a
    timestamp: each minute contributes ``SHARDS_PER_MINUTE`` shards and each
    shard spans ``SECONDS_PER_SHARD`` seconds. For minute 0-59 and second
    0-59 this yields values 0-599, and the sequence repeats every hour.
    """

    # Number of shards each minute is divided into.
    SHARDS_PER_MINUTE = 10
    # Width, in seconds, of a single shard within a minute.
    SECONDS_PER_SHARD = 6
    # The shard sequence is periodic: the hour/day never enters the formula,
    # so one hour of consecutive seconds already visits every shard.
    _SHARD_CYCLE = timedelta(hours=1)

    @classmethod
    def calculate_shard(cls, minute: int, second: int) -> int:
        """Return the shard number for the given minute and second.

        Args:
            minute: Minute component of a timestamp.
            second: Second component of a timestamp.

        Returns:
            ``minute * 10 + second // 6``.
        """
        base_shard = minute * cls.SHARDS_PER_MINUTE
        # Integer division selects the 6-second segment within the minute.
        segment = second // cls.SECONDS_PER_SHARD
        return base_shard + segment

    @classmethod
    def calculate_shards_in_range(
        cls, start_time: datetime, end_time: datetime
    ) -> str:
        """Build a CQL ``shard in (...)`` condition for a time range.

        The range is sampled once per second starting at ``start_time``
        (inclusive) and stopping before ``end_time`` (exclusive); the shard
        of every sampled instant is collected.

        Args:
            start_time: First instant of the range (inclusive).
            end_time: End of the range (exclusive).

        Returns:
            A string of the form ``"shard in (a,b,c)"`` with the shard
            numbers in ascending order. When ``end_time`` is not after
            ``start_time`` no instant is sampled and the result is
            ``"shard in ()"``.
        """
        # Scanning beyond one full cycle cannot add new shards, so cap the
        # walk at one hour. This keeps the work bounded (at most 3600 steps)
        # for arbitrarily long ranges without changing the result.
        scan_end = min(end_time, start_time + cls._SHARD_CYCLE)
        one_second = timedelta(seconds=1)

        shards = set()
        current_time = start_time
        while current_time < scan_end:
            shards.add(cls.calculate_shard(current_time.minute, current_time.second))
            current_time += one_second

        # Sorted output keeps the generated statement stable and readable.
        shards_str = ",".join(map(str, sorted(shards)))
        return f"shard in ({shards_str})"


# Usage Example
if __name__ == "__main__":
client = AWSKeyspacesClient()
Expand Down

0 comments on commit 8310e01

Please sign in to comment.