Skip to content

Commit

Permalink
cvlib/grpc_client: Add retry policy to the grpc client.
Browse files Browse the repository at this point in the history
When script-executor is using cvlib to call external endpoints like
logging, tags, inputs the calls can results in `Unavaialble` errors due
to a provisioning happening at the same time. Add retry policy to the
grpc client so the retry will be done by each external call.

Change-Id: I647fea1b303263baaa12d304937d5287b3871023
  • Loading branch information
rcodescu committed Feb 28, 2025
1 parent c26d881 commit 83a14d9
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 14 deletions.
22 changes: 21 additions & 1 deletion cloudvision/Connector/grpc_client/grpcClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from datetime import datetime
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union

import json
import grpc
from google.protobuf import timestamp_pb2 as pbts
from google.protobuf.empty_pb2 import Empty
Expand Down Expand Up @@ -125,7 +126,26 @@ class GRPCClient(object):
"grpc.keepalive_time_ms": 60000,
# 0 means infinite, as keepalive_time_ms is 60 seconds client will send 1 ping every minute
"grpc.http2.max_pings_without_data": 0,
# enable retries for the grpc client
# https://grpc.github.io/grpc/core/group__grpc__arg__keys.html#ga212f667ecbcee3b100898ba7e88454df
"grpc.enable_retries": 1,
}
RETRY_POLICY_JSON = json.dumps(
{
"methodConfig": [
{
"name": [{"service": ""}],
"retryPolicy": {
"maxAttempts": 5,
"initialBackoff": "1s",
"maxBackoff": "16s",
"backoffMultiplier": 2.0,
"retryableStatusCodes": ["UNAVAILABLE"],
},
}
]
}
)

def __init__(
self,
Expand All @@ -149,7 +169,7 @@ def __init__(
self.channel_options = [
(k, v) for k, v in dict(GRPCClient.DEFAULT_CHANNEL_OPTIONS, **channel_options).items()
]

self.channel_options.append(("grpc.service_config", GRPCClient.RETRY_POLICY_JSON))
if (certs is None or key is None) and (token is None and tokenValue is None):
self.channel = grpc.insecure_channel(grpcAddr, options=self.channel_options)
else:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "cloudvision"
version = "1.23.1"
version = "1.23.2"
dynamic = ["dependencies", "optional-dependencies"]
description = "A Python library for Arista's CloudVision APIs and Provisioning Action integrations."
requires-python = ">=3.7.0"
Expand Down
53 changes: 41 additions & 12 deletions test/connector/grpc_client/test_grpc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

"""Test grpc_client module."""


import pytest

from cloudvision import __version__ as version
Expand All @@ -19,7 +18,9 @@ class TestGRPCClient:
[
("grpc.primary_user_agent", f"cloudvision.Connector/{version}"),
("grpc.keepalive_time_ms", 60000),
('grpc.http2.max_pings_without_data', 0),
("grpc.http2.max_pings_without_data", 0),
("grpc.enable_retries", 1),
("grpc.service_config", GRPCClient.RETRY_POLICY_JSON),
]
],
)
Expand All @@ -35,38 +36,61 @@ def test_channel_options_defaults(self, channel_options):
(
{
"grpc.keepalive_time_ms": 30000,
'grpc.http2.max_pings_without_data': 0,
"grpc.http2.max_pings_without_data": 0,
},
[
("grpc.primary_user_agent", f"cloudvision.Connector/{version}"),
("grpc.keepalive_time_ms", 30000),
('grpc.http2.max_pings_without_data', 0),
("grpc.http2.max_pings_without_data", 0),
("grpc.enable_retries", 1),
("grpc.service_config", GRPCClient.RETRY_POLICY_JSON),
],
),
(
{
"grpc.primary_user_agent": "torans_grpc_client",
"grpc.keepalive_time_ms": 1200000,
'grpc.http2.max_pings_without_data': 0,
"grpc.http2.max_pings_without_data": 0,
},
[
("grpc.primary_user_agent", "torans_grpc_client"),
("grpc.keepalive_time_ms", 1200000),
('grpc.http2.max_pings_without_data', 0),
("grpc.http2.max_pings_without_data", 0),
("grpc.enable_retries", 1),
("grpc.service_config", GRPCClient.RETRY_POLICY_JSON),
],
),
(
{
"grpc.primary_user_agent": "torans_grpc_client",
"grpc.keepalive_time_ms": 1200000,
"grpc.keepalive_timeout_ms": 10000,
"grpc.http2.max_pings_without_data": 1,
},
[
("grpc.primary_user_agent", "torans_grpc_client"),
("grpc.keepalive_time_ms", 1200000),
("grpc.keepalive_timeout_ms", 10000),
("grpc.http2.max_pings_without_data", 1),
("grpc.enable_retries", 1),
("grpc.service_config", GRPCClient.RETRY_POLICY_JSON),
],
),
(
{
"grpc.primary_user_agent": "torans_grpc_client",
"grpc.keepalive_time_ms": 1200000,
"grpc.keepalive_timeout_ms": 10000,
'grpc.http2.max_pings_without_data': 1,
"grpc.http2.max_pings_without_data": 1,
"grpc.enable_retries": 0,
},
[
("grpc.primary_user_agent", "torans_grpc_client"),
("grpc.keepalive_time_ms", 1200000),
("grpc.keepalive_timeout_ms", 10000),
('grpc.http2.max_pings_without_data', 1),
("grpc.http2.max_pings_without_data", 1),
("grpc.enable_retries", 0),
("grpc.service_config", GRPCClient.RETRY_POLICY_JSON),
],
),
],
Expand All @@ -81,12 +105,15 @@ def test_create_custom_schema_index_request(self):
client = GRPCClient("localhost:443")
d_name = "dataset_name"
path_elements = ["path", "element"]
schema = [rtr.IndexField(name="FieldName1", type=rtr.INTEGER),
rtr.IndexField(name="FieldName1", type=rtr.FLOAT)]
schema = [
rtr.IndexField(name="FieldName1", type=rtr.INTEGER),
rtr.IndexField(name="FieldName1", type=rtr.FLOAT),
]
d_type = "device"
delete_after_days = 50
request = client.create_custom_schema_index_request(
d_name, path_elements, schema, delete_after_days, d_type)
d_name, path_elements, schema, delete_after_days, d_type
)
assert len(request.schema) == len(schema)
for idx, fieldSchema in enumerate(request.schema):
assert fieldSchema == schema[idx]
Expand All @@ -95,5 +122,7 @@ def test_create_custom_schema_index_request(self):
assert request.query.dataset.type == d_type
assert len(request.query.paths) == 1
path = request.query.paths[0]
for idx, path_element in enumerate([client.encoder.encode(x) for x in path_elements]):
for idx, path_element in enumerate(
[client.encoder.encode(x) for x in path_elements]
):
assert path_element == path.path_elements[idx]

0 comments on commit 83a14d9

Please sign in to comment.