diff --git a/OpenAgentsNode.html b/OpenAgentsNode.html index 15f95e8..d6e8ca2 100644 --- a/OpenAgentsNode.html +++ b/OpenAgentsNode.html @@ -39,23 +39,44 @@

Module openagents.OpenAgentsNode

from typing import Union from .JobContext import JobContext import json -class HeaderAdderInterceptor(grpc.aio.UnaryUnaryClientInterceptor): +class HeaderAdderInterceptor( + grpc.aio.ClientInterceptor +): """ An interceptor for GRPC that adds headers to outgoing requests. """ def __init__(self, headers): self._headers = headers - async def intercept_unary_unary(self, continuation, client_call_details, request): + + + async def _intercept(self, continuation, client_call_details, request_or_iterator): metadata = client_call_details.metadata if not metadata: - metadata=grpc.aio.Metadata() + metadata = grpc.aio.Metadata() for header in self._headers: metadata.add(header[0], header[1]) new_client_call_details = client_call_details._replace(metadata=metadata) - response = await continuation(new_client_call_details, request) + response = await continuation(new_client_call_details, request_or_iterator) return response +class HeaderAdderInterceptor0(grpc.aio.UnaryStreamClientInterceptor,HeaderAdderInterceptor): + async def intercept_unary_stream(self, continuation, client_call_details, request): + return await self._intercept(continuation, client_call_details, request) + +class HeaderAdderInterceptor1(grpc.aio.StreamUnaryClientInterceptor,HeaderAdderInterceptor): + async def intercept_stream_unary(self, continuation, client_call_details, request_iterator): + return await self._intercept(continuation, client_call_details, request_iterator) + +class HeaderAdderInterceptor2(grpc.aio.StreamStreamClientInterceptor,HeaderAdderInterceptor): + async def intercept_stream_stream(self, continuation, client_call_details, request_iterator): + return await self._intercept(continuation, client_call_details, request_iterator) + +class HeaderAdderInterceptor3(grpc.aio.UnaryUnaryClientInterceptor,HeaderAdderInterceptor): + async def intercept_unary_unary(self, continuation, client_call_details, request): + return await self._intercept(continuation, client_call_details, request) + + class OpenAgentsNode: """ An OpenAgents node that can run jobs. @@ -72,7 +93,6 @@

Module openagents.OpenAgentsNode

""" def __init__(self, config: NodeConfig): - self.meta = config.getMeta() self.nextNodeAnnounce = 0 @@ -143,8 +163,10 @@

Module openagents.OpenAgentsNode

("authorization", str(nodeToken)) ] if interceptors is None: interceptors=[] - interceptor=HeaderAdderInterceptor(metadata) - interceptors.append(interceptor) + interceptors.append(HeaderAdderInterceptor0(metadata)) + interceptors.append(HeaderAdderInterceptor1(metadata)) + interceptors.append(HeaderAdderInterceptor2(metadata)) + interceptors.append(HeaderAdderInterceptor3(metadata)) if self.poolSsl: @@ -380,31 +402,267 @@

Classes

Expand source code -
class HeaderAdderInterceptor(grpc.aio.UnaryUnaryClientInterceptor):
+
class HeaderAdderInterceptor(
+    grpc.aio.ClientInterceptor     
+):
     """
     An interceptor for GRPC that adds headers to outgoing requests.
     """
     def __init__(self, headers):
         self._headers = headers
 
-    async def intercept_unary_unary(self, continuation, client_call_details, request):
+  
+
+    async def _intercept(self, continuation, client_call_details, request_or_iterator):
         metadata = client_call_details.metadata
         if not metadata:
-            metadata=grpc.aio.Metadata()
+            metadata = grpc.aio.Metadata()
         for header in self._headers:
             metadata.add(header[0], header[1])
         new_client_call_details = client_call_details._replace(metadata=metadata)
-        response = await continuation(new_client_call_details, request)
+        response = await continuation(new_client_call_details, request_or_iterator)
         return response

Ancestors

+

Subclasses

+ + +
+class HeaderAdderInterceptor0 +(headers) +
+
+

Affords intercepting unary-stream invocations.

+
+ +Expand source code + +
class HeaderAdderInterceptor0(grpc.aio.UnaryStreamClientInterceptor,HeaderAdderInterceptor):
+    async def intercept_unary_stream(self, continuation, client_call_details, request):
+        return await self._intercept(continuation, client_call_details, request)
+
+

Ancestors

+
    +
  • grpc.aio._interceptor.UnaryStreamClientInterceptor
  • +
  • HeaderAdderInterceptor
  • +
  • grpc.aio._interceptor.ClientInterceptor
  • +
+

Methods

+
+
+async def intercept_unary_stream(self, continuation, client_call_details, request) +
+
+

Intercepts a unary-stream invocation asynchronously.

+

The function could return the call object or an asynchronous +iterator, in case of being an asyncrhonous iterator this will +become the source of the reads done by the caller.

+

Args

+
+
continuation
+
A coroutine that proceeds with the invocation by +executing the next interceptor in the chain or invoking the +actual RPC on the underlying Channel. It is the interceptor's +responsibility to call it if it decides to move the RPC forward. +The interceptor can use +call = await continuation(client_call_details, request) +to continue with the RPC. continuation returns the call to the +RPC.
+
client_call_details
+
A ClientCallDetails object describing the +outgoing RPC.
+
request
+
The request value for the RPC.
+
+

Returns

+

The RPC Call or an asynchronous iterator.

+

Raises

+
+
AioRpcError
+
Indicating that the RPC terminated with non-OK status.
+
asyncio.CancelledError
+
Indicating that the RPC was canceled.
+
+
+ +Expand source code + +
async def intercept_unary_stream(self, continuation, client_call_details, request):
+    return await self._intercept(continuation, client_call_details, request)
+
+
+
+
+
+class HeaderAdderInterceptor1 +(headers) +
+
+

Affords intercepting stream-unary invocations.

+
+ +Expand source code + +
class HeaderAdderInterceptor1(grpc.aio.StreamUnaryClientInterceptor,HeaderAdderInterceptor):
+    async def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
+        return await self._intercept(continuation, client_call_details, request_iterator)
+
+

Ancestors

+
    +
  • grpc.aio._interceptor.StreamUnaryClientInterceptor
  • +
  • HeaderAdderInterceptor
  • +
  • grpc.aio._interceptor.ClientInterceptor
  • +
+

Methods

+
+
+async def intercept_stream_unary(self, continuation, client_call_details, request_iterator) +
+
+

Intercepts a stream-unary invocation asynchronously.

+

Within the interceptor the usage of the call methods like write or +even awaiting the call should be done carefully, since the caller +could be expecting an untouched call, for example for start writing +messages to it.

+

Args

+
+
continuation
+
A coroutine that proceeds with the invocation by +executing the next interceptor in the chain or invoking the +actual RPC on the underlying Channel. It is the interceptor's +responsibility to call it if it decides to move the RPC forward. +The interceptor can use +call = await continuation(client_call_details, request_iterator) +to continue with the RPC. continuation returns the call to the +RPC.
+
client_call_details
+
A ClientCallDetails object describing the +outgoing RPC.
+
request_iterator
+
The request iterator that will produce requests +for the RPC.
+
+

Returns

+

The RPC Call.

+

Raises

+
+
AioRpcError
+
Indicating that the RPC terminated with non-OK status.
+
asyncio.CancelledError
+
Indicating that the RPC was canceled.
+
+
+ +Expand source code + +
async def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
+    return await self._intercept(continuation, client_call_details, request_iterator)
+
+
+
+
+
+class HeaderAdderInterceptor2 +(headers) +
+
+

Affords intercepting stream-stream invocations.

+
+ +Expand source code + +
class HeaderAdderInterceptor2(grpc.aio.StreamStreamClientInterceptor,HeaderAdderInterceptor):
+    async def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
+        return await self._intercept(continuation, client_call_details, request_iterator)
+
+

Ancestors

+
    +
  • grpc.aio._interceptor.StreamStreamClientInterceptor
  • +
  • HeaderAdderInterceptor
  • +
  • grpc.aio._interceptor.ClientInterceptor
  • +
+

Methods

+
+
+async def intercept_stream_stream(self, continuation, client_call_details, request_iterator) +
+
+

Intercepts a stream-stream invocation asynchronously.

+

Within the interceptor the usage of the call methods like write or +even awaiting the call should be done carefully, since the caller +could be expecting an untouched call, for example for start writing +messages to it.

+

The function could return the call object or an asynchronous +iterator, in case of being an asyncrhonous iterator this will +become the source of the reads done by the caller.

+

Args

+
+
continuation
+
A coroutine that proceeds with the invocation by +executing the next interceptor in the chain or invoking the +actual RPC on the underlying Channel. It is the interceptor's +responsibility to call it if it decides to move the RPC forward. +The interceptor can use +call = await continuation(client_call_details, request_iterator) +to continue with the RPC. continuation returns the call to the +RPC.
+
client_call_details
+
A ClientCallDetails object describing the +outgoing RPC.
+
request_iterator
+
The request iterator that will produce requests +for the RPC.
+
+

Returns

+

The RPC Call or an asynchronous iterator.

+

Raises

+
+
AioRpcError
+
Indicating that the RPC terminated with non-OK status.
+
asyncio.CancelledError
+
Indicating that the RPC was canceled.
+
+
+ +Expand source code + +
async def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
+    return await self._intercept(continuation, client_call_details, request_iterator)
+
+
+
+
+
+class HeaderAdderInterceptor3 +(headers) +
+
+

Affords intercepting unary-unary invocations.

+
+ +Expand source code + +
class HeaderAdderInterceptor3(grpc.aio.UnaryUnaryClientInterceptor,HeaderAdderInterceptor):
+    async def intercept_unary_unary(self, continuation, client_call_details, request):
+        return await self._intercept(continuation, client_call_details, request)
+
+

Ancestors

+
  • grpc.aio._interceptor.UnaryUnaryClientInterceptor
  • +
  • HeaderAdderInterceptor
  • grpc.aio._interceptor.ClientInterceptor

Methods

-
+
async def intercept_unary_unary(self, continuation, client_call_details, request)
@@ -440,14 +698,7 @@

Raises

Expand source code
async def intercept_unary_unary(self, continuation, client_call_details, request):
-    metadata = client_call_details.metadata
-    if not metadata:
-        metadata=grpc.aio.Metadata()
-    for header in self._headers:
-        metadata.add(header[0], header[1])
-    new_client_call_details = client_call_details._replace(metadata=metadata)
-    response = await continuation(new_client_call_details, request)
-    return response
+ return await self._intercept(continuation, client_call_details, request)
@@ -488,7 +739,6 @@

Raises

""" def __init__(self, config: NodeConfig): - self.meta = config.getMeta() self.nextNodeAnnounce = 0 @@ -559,8 +809,10 @@

Raises

("authorization", str(nodeToken)) ] if interceptors is None: interceptors=[] - interceptor=HeaderAdderInterceptor(metadata) - interceptors.append(interceptor) + interceptors.append(HeaderAdderInterceptor0(metadata)) + interceptors.append(HeaderAdderInterceptor1(metadata)) + interceptors.append(HeaderAdderInterceptor2(metadata)) + interceptors.append(HeaderAdderInterceptor3(metadata)) if self.poolSsl: @@ -940,8 +1192,29 @@

Index