diff --git a/JobContext.html b/JobContext.html new file mode 100644 index 0000000..01e5f78 --- /dev/null +++ b/JobContext.html @@ -0,0 +1,903 @@ + + + + + + +openagents.JobContext API documentation + + + + + + + + + + + +

Module openagents.JobContext

+ +Expand source code + +
from openagents_grpc_proto import rpc_pb2_grpc
+from openagents_grpc_proto import rpc_pb2
+from .Logger import Logger
+from .Disk import Disk
+from .RunnerConfig import RunnerConfig
+import time
+import os
+import json
+import pickle
+from typing import Union
+class JobContext:
+    """
+    A class that represents the context of a job.
+    """
+    def __init__(self, node: 'OpenAgentsNode',runner:'JobRunner', job: rpc_pb2.Job__pb2):
+        self.job=job
+        self._node=node
+        self.runner=runner
+        self._cachePath = os.getenv('CACHE_PATH',  "cache")
+        if not os.path.exists(self._cachePath):
+            os.makedirs(self._cachePath)
+        self.logger=Logger(
+            self._node.getMeta()["name"]+"."+self.runner.getMeta()["name"],
+            self._node.getMeta()["version"],
+            self.job.id,
+            lambda x: self._node._log(x, self.job.id),
+        )
+        self._disksByUrl = {}
+        self._disksById = {}
+        self._diskByName = {}
+    def getLogger(self):
+        """
+        Get the logger of the job.
+        """
+        return self.logger
+    def getNode(self):
+        """
+        Get the node running this job
+        """
+        return self._node
+    def getJob(self):
+        """
+        Get the job object.
+        """
+        return self.job
+    async def cacheSet(self, key:str, value, version:int=0, expireAt:int=0, local=True, CHUNK_SIZE=1024*1024*15):
+        """
+        Set a value in the cache.
+        Args:
+            key (str): The key of the value to set.
+            value (object): The value to set.
+            version (int): The version of the cache (if the call to cacheGet requires a different version, the cache will be considered expired). Defaults to 0.            expireAt (int): The timestamp at which the value expires. Defaults to 0.
+            expireAt (int): The timestamp at which the value expires in milliseconds. 0 = never. Defaults to 0.
+            local (bool): Whether to store the value locally or remotely. Defaults to True.
+            CHUNK_SIZE (int): The size of each chunk to write in bytes, if needed. Defaults to 1024*1024*15.
+        """
+        try:
+            dataBytes = pickle.dumps(value)
+            if local:
+                fullPath = os.path.join(self._cachePath, key)
+                with open(fullPath, "wb") as f:
+                    f.write(dataBytes)
+                with open(fullPath+".meta.json", "w") as f:
+                    f.write(json.dumps({"version":version, "expireAt":expireAt}))
+            else:
+                client = self._node._getClient()
+                def write_data():
+                    for j in range(0, len(dataBytes), CHUNK_SIZE):
+                        chunk = bytes(dataBytes[j:min(j+CHUNK_SIZE, len(dataBytes))])                   
+                        request = rpc_pb2.RpcCacheSetRequest(
+                            key=key, 
+                            data=chunk,
+                            expireAt=expireAt,
+                            version=version
+                        )
+                        yield request                              
+                res=await client.cacheSet(write_data())
+                return res.success
+        except Exception as e:
+            self._node.getLogger().error("Error setting cache "+str(e))
+            return False
+    async def cacheGet(self, key:str, lastVersion = 0, local=True) -> any:
+        """
+        Get a value from the cache.
+        Args:
+            path (str): The key of the value to get.
+            lastVersion (int): The version of the cache to check. Defaults to 0.
+            local (bool): Whether to get the value locally or remotely. Defaults to True.
+        Returns:
+            any: The value of the cache.
+        """
+        try:
+            if local:
+                fullPath = os.path.join(self._cachePath, key)
+                if not os.path.exists(fullPath) or not os.path.exists(fullPath+".meta.json"):
+                    return None
+                with open(fullPath+".meta.json", "r") as f:
+                    meta = json.loads(f.read())
+                if lastVersion > 0 and meta["version"] != lastVersion:
+                    return None
+                if meta["expireAt"] > 0 and time.time()*1000 > meta["expireAt"]:
+                    return None
+                with open(fullPath, "rb") as f:
+                    return pickle.load(f)
+            else:
+                client = self._node._getClient()
+                bytesOut = bytearray()
+                stream = client.cacheGet(rpc_pb2.RpcCacheGetRequest(key=key, lastVersion = lastVersion))
+                async for chunk in stream:
+                    if not chunk.exists:
+                        return None
+                    bytesOut.extend(chunk.data)
+                return pickle.loads(bytesOut)
+        except Exception as e:
+            self._node.getLogger().error("Error getting cache "+str(e))
+            return None
+    async def openStorage(self, url:str)->Disk:
+        """
+        Open a storage disk.
+        Args:
+            url (str): The URL of the disk.
+        Returns:
+            Disk: The disk object.
+        """
+        if url in self._disksByUrl:
+            return self._disksByUrl[url]
+        client = self._node._getClient()
+        diskId =(await client.openDisk(rpc_pb2.RpcOpenDiskRequest(url=url))).diskId
+        disk =  Disk(id=diskId, url=url, node=self._node)
+        self._disksByUrl[url] = disk
+        self._disksById[diskId] = disk
+        return disk
+    async def createStorage(self,name:str=None,encryptionKey:str=None,includeEncryptionKeyInUrl:str=None) -> Disk:
+        """
+        Create a storage disk.
+        Args:
+            name (str): Optional: The name of the disk.
+            encryptionKey (str): Optional: The encryption key of the disk.
+            includeEncryptionKeyInUrl (str): Optional: Whether to include the encryption key in the URL.
+        Returns:
+            Disk: The disk object.
+        """
+        if name in self._diskByName:
+            return self._diskByName[name]
+        client = self._node._getClient()
+        url = (await client.createDisk(rpc_pb2.RpcCreateDiskRequest(
+            name=name,
+            encryptionKey=encryptionKey,
+            includeEncryptionKeyInUrl=includeEncryptionKeyInUrl
+        ))).url
+        diskId =( await client.openDisk(rpc_pb2.RpcOpenDiskRequest(url=url))).diskId
+        disk = Disk(id=diskId, url=url, node=self._node)
+        self._disksByUrl[url] = disk
+        self._disksById[diskId] = disk
+        if name:self._diskByName[name] = disk
+        return disk
+    async def close(self):
+        """
+        Close the job context.
+        Free up resources, submit pending logs.
+        """
+        for disk in self._disksById.values():
+            await disk.close()
+        for disk in self._disksByUrl.values():
+            await disk.close()
+        for disk in self._diskByName.values():
+            await disk.close()
+        self._disksById = {}
+        self._disksByUrl = {}
+        self._diskByName = {}
+        self.logger.close()
+    def getJobParamValues(self,key,default:list[str]=None)->list[str]:
+        job=self.getJob()
+        for p in job.param:
+            if p.key == key:
+                return p.value
+        return default
+    def getJobParamValue(self,key,default:str=None)->str:
+        job=self.getJob()
+        for p in job.param:
+            if p.key == key:
+                return p.value[0]
+    def getJobInputs(self,marker:str|None=None)->list[rpc_pb2.JobInput__pb2]:
+        job=self.getJob()
+        out=[]
+        for i in job.input:
+            if marker is None or i.marker == marker:
+                out.append(i)
+        return out
+    def getJobInput(self,marker:str|None=None)->rpc_pb2.JobInput__pb2:
+        job=self.getJob()
+        for i in job.input:
+            if marker is None or i.marker == marker:
+                return i
+        return None
+    def getOutputFormat(self):
+        job=self.getJob()
+        return job.outputFormat


+class JobContext +(node: OpenAgentsNode, runner: JobRunner, job: ) +

A class that represents the context of a job.

+ +Expand source code + +
class JobContext:
+    """
+    A class that represents the context of a job.
+    """
+    def __init__(self, node: 'OpenAgentsNode',runner:'JobRunner', job: rpc_pb2.Job__pb2):
+        self.job=job
+        self._node=node
+        self.runner=runner
+        self._cachePath = os.getenv('CACHE_PATH',  "cache")
+        if not os.path.exists(self._cachePath):
+            os.makedirs(self._cachePath)
+        self.logger=Logger(
+            self._node.getMeta()["name"]+"."+self.runner.getMeta()["name"],
+            self._node.getMeta()["version"],
+            self.job.id,
+            lambda x: self._node._log(x, self.job.id),
+        )
+        self._disksByUrl = {}
+        self._disksById = {}
+        self._diskByName = {}
+    def getLogger(self):
+        """
+        Get the logger of the job.
+        """
+        return self.logger
+    def getNode(self):
+        """
+        Get the node running this job
+        """
+        return self._node
+    def getJob(self):
+        """
+        Get the job object.
+        """
+        return self.job
+    async def cacheSet(self, key:str, value, version:int=0, expireAt:int=0, local=True, CHUNK_SIZE=1024*1024*15):
+        """
+        Set a value in the cache.
+        Args:
+            key (str): The key of the value to set.
+            value (object): The value to set.
+            version (int): The version of the cache (if the call to cacheGet requires a different version, the cache will be considered expired). Defaults to 0.            expireAt (int): The timestamp at which the value expires. Defaults to 0.
+            expireAt (int): The timestamp at which the value expires in milliseconds. 0 = never. Defaults to 0.
+            local (bool): Whether to store the value locally or remotely. Defaults to True.
+            CHUNK_SIZE (int): The size of each chunk to write in bytes, if needed. Defaults to 1024*1024*15.
+        """
+        try:
+            dataBytes = pickle.dumps(value)
+            if local:
+                fullPath = os.path.join(self._cachePath, key)
+                with open(fullPath, "wb") as f:
+                    f.write(dataBytes)
+                with open(fullPath+".meta.json", "w") as f:
+                    f.write(json.dumps({"version":version, "expireAt":expireAt}))
+            else:
+                client = self._node._getClient()
+                def write_data():
+                    for j in range(0, len(dataBytes), CHUNK_SIZE):
+                        chunk = bytes(dataBytes[j:min(j+CHUNK_SIZE, len(dataBytes))])                   
+                        request = rpc_pb2.RpcCacheSetRequest(
+                            key=key, 
+                            data=chunk,
+                            expireAt=expireAt,
+                            version=version
+                        )
+                        yield request                              
+                res=await client.cacheSet(write_data())
+                return res.success
+        except Exception as e:
+            self._node.getLogger().error("Error setting cache "+str(e))
+            return False
+    async def cacheGet(self, key:str, lastVersion = 0, local=True) -> any:
+        """
+        Get a value from the cache.
+        Args:
+            path (str): The key of the value to get.
+            lastVersion (int): The version of the cache to check. Defaults to 0.
+            local (bool): Whether to get the value locally or remotely. Defaults to True.
+        Returns:
+            any: The value of the cache.
+        """
+        try:
+            if local:
+                fullPath = os.path.join(self._cachePath, key)
+                if not os.path.exists(fullPath) or not os.path.exists(fullPath+".meta.json"):
+                    return None
+                with open(fullPath+".meta.json", "r") as f:
+                    meta = json.loads(f.read())
+                if lastVersion > 0 and meta["version"] != lastVersion:
+                    return None
+                if meta["expireAt"] > 0 and time.time()*1000 > meta["expireAt"]:
+                    return None
+                with open(fullPath, "rb") as f:
+                    return pickle.load(f)
+            else:
+                client = self._node._getClient()
+                bytesOut = bytearray()
+                stream = client.cacheGet(rpc_pb2.RpcCacheGetRequest(key=key, lastVersion = lastVersion))
+                async for chunk in stream:
+                    if not chunk.exists:
+                        return None
+                    bytesOut.extend(chunk.data)
+                return pickle.loads(bytesOut)
+        except Exception as e:
+            self._node.getLogger().error("Error getting cache "+str(e))
+            return None
+    async def openStorage(self, url:str)->Disk:
+        """
+        Open a storage disk.
+        Args:
+            url (str): The URL of the disk.
+        Returns:
+            Disk: The disk object.
+        """
+        if url in self._disksByUrl:
+            return self._disksByUrl[url]
+        client = self._node._getClient()
+        diskId =(await client.openDisk(rpc_pb2.RpcOpenDiskRequest(url=url))).diskId
+        disk =  Disk(id=diskId, url=url, node=self._node)
+        self._disksByUrl[url] = disk
+        self._disksById[diskId] = disk
+        return disk
+    async def createStorage(self,name:str=None,encryptionKey:str=None,includeEncryptionKeyInUrl:str=None) -> Disk:
+        """
+        Create a storage disk.
+        Args:
+            name (str): Optional: The name of the disk.
+            encryptionKey (str): Optional: The encryption key of the disk.
+            includeEncryptionKeyInUrl (str): Optional: Whether to include the encryption key in the URL.
+        Returns:
+            Disk: The disk object.
+        """
+        if name in self._diskByName:
+            return self._diskByName[name]
+        client = self._node._getClient()
+        url = (await client.createDisk(rpc_pb2.RpcCreateDiskRequest(
+            name=name,
+            encryptionKey=encryptionKey,
+            includeEncryptionKeyInUrl=includeEncryptionKeyInUrl
+        ))).url
+        diskId =( await client.openDisk(rpc_pb2.RpcOpenDiskRequest(url=url))).diskId
+        disk = Disk(id=diskId, url=url, node=self._node)
+        self._disksByUrl[url] = disk
+        self._disksById[diskId] = disk
+        if name:self._diskByName[name] = disk
+        return disk
+    async def close(self):
+        """
+        Close the job context.
+        Free up resources, submit pending logs.
+        """
+        for disk in self._disksById.values():
+            await disk.close()
+        for disk in self._disksByUrl.values():
+            await disk.close()
+        for disk in self._diskByName.values():
+            await disk.close()
+        self._disksById = {}
+        self._disksByUrl = {}
+        self._diskByName = {}
+        self.logger.close()
+    def getJobParamValues(self,key,default:list[str]=None)->list[str]:
+        job=self.getJob()
+        for p in job.param:
+            if p.key == key:
+                return p.value
+        return default
+    def getJobParamValue(self,key,default:str=None)->str:
+        job=self.getJob()
+        for p in job.param:
+            if p.key == key:
+                return p.value[0]
+    def getJobInputs(self,marker:str|None=None)->list[rpc_pb2.JobInput__pb2]:
+        job=self.getJob()
+        out=[]
+        for i in job.input:
+            if marker is None or i.marker == marker:
+                out.append(i)
+        return out
+    def getJobInput(self,marker:str|None=None)->rpc_pb2.JobInput__pb2:
+        job=self.getJob()
+        for i in job.input:
+            if marker is None or i.marker == marker:
+                return i
+        return None
+    def getOutputFormat(self):
+        job=self.getJob()
+        return job.outputFormat


+async def cacheGet(self, key: str, lastVersion=0, local=True) ‑>  +

Get a value from the cache.



path : str
The key of the value to get.
lastVersion : int
The version of the cache to check. Defaults to 0.
local : bool
Whether to get the value locally or remotely. Defaults to True.


The value of the cache.
+ +Expand source code + +
async def cacheGet(self, key:str, lastVersion = 0, local=True) -> any:
+    """
+    Get a value from the cache.
+    Args:
+        path (str): The key of the value to get.
+        lastVersion (int): The version of the cache to check. Defaults to 0.
+        local (bool): Whether to get the value locally or remotely. Defaults to True.
+    Returns:
+        any: The value of the cache.
+    """
+    try:
+        if local:
+            fullPath = os.path.join(self._cachePath, key)
+            if not os.path.exists(fullPath) or not os.path.exists(fullPath+".meta.json"):
+                return None
+            with open(fullPath+".meta.json", "r") as f:
+                meta = json.loads(f.read())
+            if lastVersion > 0 and meta["version"] != lastVersion:
+                return None
+            if meta["expireAt"] > 0 and time.time()*1000 > meta["expireAt"]:
+                return None
+            with open(fullPath, "rb") as f:
+                return pickle.load(f)
+        else:
+            client = self._node._getClient()
+            bytesOut = bytearray()
+            stream = client.cacheGet(rpc_pb2.RpcCacheGetRequest(key=key, lastVersion = lastVersion))
+            async for chunk in stream:
+                if not chunk.exists:
+                    return None
+                bytesOut.extend(chunk.data)
+            return pickle.loads(bytesOut)
+    except Exception as e:
+        self._node.getLogger().error("Error getting cache "+str(e))
+        return None
+async def cacheSet(self, key: str, value, version: int = 0, expireAt: int = 0, local=True, CHUNK_SIZE=15728640) +

Set a value in the cache.



key : str
The key of the value to set.
value : object
The value to set.
version : int
The version of the cache (if the call to cacheGet requires a different version, the cache will be considered expired). Defaults to 0. +expireAt (int): The timestamp at which the value expires. Defaults to 0.
expireAt : int
The timestamp at which the value expires in milliseconds. 0 = never. Defaults to 0.
local : bool
Whether to store the value locally or remotely. Defaults to True.
The size of each chunk to write in bytes, if needed. Defaults to 1024102415.
+ +Expand source code + +
async def cacheSet(self, key:str, value, version:int=0, expireAt:int=0, local=True, CHUNK_SIZE=1024*1024*15):
+    """
+    Set a value in the cache.
+    Args:
+        key (str): The key of the value to set.
+        value (object): The value to set.
+        version (int): The version of the cache (if the call to cacheGet requires a different version, the cache will be considered expired). Defaults to 0.            expireAt (int): The timestamp at which the value expires. Defaults to 0.
+        expireAt (int): The timestamp at which the value expires in milliseconds. 0 = never. Defaults to 0.
+        local (bool): Whether to store the value locally or remotely. Defaults to True.
+        CHUNK_SIZE (int): The size of each chunk to write in bytes, if needed. Defaults to 1024*1024*15.
+    """
+    try:
+        dataBytes = pickle.dumps(value)
+        if local:
+            fullPath = os.path.join(self._cachePath, key)
+            with open(fullPath, "wb") as f:
+                f.write(dataBytes)
+            with open(fullPath+".meta.json", "w") as f:
+                f.write(json.dumps({"version":version, "expireAt":expireAt}))
+        else:
+            client = self._node._getClient()
+            def write_data():
+                for j in range(0, len(dataBytes), CHUNK_SIZE):
+                    chunk = bytes(dataBytes[j:min(j+CHUNK_SIZE, len(dataBytes))])                   
+                    request = rpc_pb2.RpcCacheSetRequest(
+                        key=key, 
+                        data=chunk,
+                        expireAt=expireAt,
+                        version=version
+                    )
+                    yield request                              
+            res=await client.cacheSet(write_data())
+            return res.success
+    except Exception as e:
+        self._node.getLogger().error("Error setting cache "+str(e))
+        return False
+async def close(self) +

Close the job context. +Free up resources, submit pending logs.

+ +Expand source code + +
async def close(self):
+    """
+    Close the job context.
+    Free up resources, submit pending logs.
+    """
+    for disk in self._disksById.values():
+        await disk.close()
+    for disk in self._disksByUrl.values():
+        await disk.close()
+    for disk in self._diskByName.values():
+        await disk.close()
+    self._disksById = {}
+    self._disksByUrl = {}
+    self._diskByName = {}
+    self.logger.close()
+async def createStorage(self, name: str = None, encryptionKey: str = None, includeEncryptionKeyInUrl: str = None) ‑> Disk +

Create a storage disk.



name : str
Optional: The name of the disk.
encryptionKey : str
Optional: The encryption key of the disk.
includeEncryptionKeyInUrl : str
Optional: Whether to include the encryption key in the URL.


The disk object.
+ +Expand source code + +
async def createStorage(self,name:str=None,encryptionKey:str=None,includeEncryptionKeyInUrl:str=None) -> Disk:
+    """
+    Create a storage disk.
+    Args:
+        name (str): Optional: The name of the disk.
+        encryptionKey (str): Optional: The encryption key of the disk.
+        includeEncryptionKeyInUrl (str): Optional: Whether to include the encryption key in the URL.
+    Returns:
+        Disk: The disk object.
+    """
+    if name in self._diskByName:
+        return self._diskByName[name]
+    client = self._node._getClient()
+    url = (await client.createDisk(rpc_pb2.RpcCreateDiskRequest(
+        name=name,
+        encryptionKey=encryptionKey,
+        includeEncryptionKeyInUrl=includeEncryptionKeyInUrl
+    ))).url
+    diskId =( await client.openDisk(rpc_pb2.RpcOpenDiskRequest(url=url))).diskId
+    disk = Disk(id=diskId, url=url, node=self._node)
+    self._disksByUrl[url] = disk
+    self._disksById[diskId] = disk
+    if name:self._diskByName[name] = disk
+    return disk
+def getJob(self) +

Get the job object.

+ +Expand source code + +
def getJob(self):
+    """
+    Get the job object.
+    """
+    return self.job
+def getJobInput(self, marker: str | None = None) ‑>  +
+ +Expand source code + +
def getJobInput(self,marker:str|None=None)->rpc_pb2.JobInput__pb2:
+    job=self.getJob()
+    for i in job.input:
+        if marker is None or i.marker == marker:
+            return i
+    return None
+def getJobInputs(self, marker: str | None = None) ‑> list[] +
+ +Expand source code + +
def getJobInputs(self,marker:str|None=None)->list[rpc_pb2.JobInput__pb2]:
+    job=self.getJob()
+    out=[]
+    for i in job.input:
+        if marker is None or i.marker == marker:
+            out.append(i)
+    return out
+def getJobParamValue(self, key, default: str = None) ‑> str +
+ +Expand source code + +
def getJobParamValue(self,key,default:str=None)->str:
+    job=self.getJob()
+    for p in job.param:
+        if p.key == key:
+            return p.value[0]
+def getJobParamValues(self, key, default: list[str] = None) ‑> list[str] +
+ +Expand source code + +
def getJobParamValues(self,key,default:list[str]=None)->list[str]:
+    job=self.getJob()
+    for p in job.param:
+        if p.key == key:
+            return p.value
+    return default
+def getLogger(self) +

Get the logger of the job.

+ +Expand source code + +
def getLogger(self):
+    """
+    Get the logger of the job.
+    """
+    return self.logger
+def getNode(self) +

Get the node running this job

+ +Expand source code + +
def getNode(self):
+    """
+    Get the node running this job
+    """
+    return self._node
+def getOutputFormat(self) +
+ +Expand source code + +
def getOutputFormat(self):
+    job=self.getJob()
+    return job.outputFormat
+async def openStorage(self, url: str) ‑> Disk +

Open a storage disk.



url : str
The URL of the disk.


The disk object.
+ +Expand source code + +
async def openStorage(self, url:str)->Disk:
+    """
+    Open a storage disk.
+    Args:
+        url (str): The URL of the disk.
+    Returns:
+        Disk: The disk object.
+    """
+    if url in self._disksByUrl:
+        return self._disksByUrl[url]
+    client = self._node._getClient()
+    diskId =(await client.openDisk(rpc_pb2.RpcOpenDiskRequest(url=url))).diskId
+    disk =  Disk(id=diskId, url=url, node=self._node)
+    self._disksByUrl[url] = disk
+    self._disksById[diskId] = disk
+    return disk
+ +
+ + + \ No newline at end of file diff --git a/JobRunner.html b/JobRunner.html index 954cefc..331040e 100644 --- a/JobRunner.html +++ b/JobRunner.html @@ -36,6 +36,9 @@

Module openagents.JobRunner

import json import pickle from typing import Union +from .JobContext import JobContext +from .Logger import Logger +import asyncio class JobRunner: """ An abstract class that represents a job runner. @@ -47,222 +50,96 @@

Module openagents.JobRunner

""" - def __init__(self, metaOrConfig: Union[dict, RunnerConfig], filters:dict=None, template:dict=None, sockets:dict=None): - meta=None - if isinstance(metaOrConfig, RunnerConfig): - meta = metaOrConfig.getMeta() - filters = metaOrConfig.getFilters() - template = metaOrConfig.getTemplate() - sockets = metaOrConfig.getSockets() - else: - meta = metaOrConfig - self._filters = None - self._node = None - self._job = None - self._disksByUrl = {} - self._disksById = {} - self._diskByName = {} - self._template = None - self._meta = None - self._sockets = None - self._nextAnnouncementTimestamp = 0 - self._cachePath = None - - self.logger = Logger("JobRunner", "0", self, False) - self._filters = filters - self._meta = json.dumps(meta) - self._template = template - self._sockets = json.dumps(sockets) - - self._cachePath = os.getenv('CACHE_PATH', "cache") - if not os.path.exists(self._cachePath): - os.makedirs(self._cachePath) - + def __init__(self, config: RunnerConfig): + self._node = None + self._template = config.getTemplate() + self._meta = config.getMeta() + self._sockets = config.getSockets() + self._filter = config.getFilter() + self.runInParallel=False + self.initialized=False - def getLogger(self) -> Logger: - """ - Get the active logger for the runner. - Returns: - Logger: The logger for the runner. - """ - return self.logger - - async def cacheSet(self, key:str, value, version:int=0, expireAt:int=0, local=True, CHUNK_SIZE=1024*1024*15): - """ - Set a value in the cache. - Args: - key (str): The key of the value to set. - value (object): The value to set. - version (int): The version of the cache (if the call to cacheGet requires a different version, the cache will be considered expired). Defaults to 0. expireAt (int): The timestamp at which the value expires. Defaults to 0. - expireAt (int): The timestamp at which the value expires in milliseconds. 0 = never. Defaults to 0. - local (bool): Whether to store the value locally or remotely. Defaults to True. - CHUNK_SIZE (int): The size of each chunk to write in bytes, if needed. Defaults to 1024*1024*15. - """ - try: - dataBytes = pickle.dumps(value) - if local: - fullPath = os.path.join(self._cachePath, key) - with open(fullPath, "wb") as f: - f.write(dataBytes) - with open(fullPath+".meta.json", "w") as f: - f.write(json.dumps({"version":version, "expireAt":expireAt})) - else: - client = self._node._getClient() - def write_data(): - for j in range(0, len(dataBytes), CHUNK_SIZE): - chunk = bytes(dataBytes[j:min(j+CHUNK_SIZE, len(dataBytes))]) - request = rpc_pb2.RpcCacheSetRequest( - key=key, - data=chunk, - expireAt=expireAt, - version=version - ) - yield request - res=await client.cacheSet(write_data()) - return res.success - except Exception as e: - self.getLogger().error("Error setting cache "+str(e)) - return False + + def getMeta(self): + return self._meta + + def getFilter(self): + return self._filter - async def cacheGet(self, key:str, lastVersion = 0, local=True) -> any: - """ - Get a value from the cache. - Args: - path (str): The key of the value to get. - lastVersion (int): The version of the cache to check. Defaults to 0. - local (bool): Whether to get the value locally or remotely. Defaults to True. - Returns: - any: The value of the cache. - """ - try: - if local: - fullPath = os.path.join(self._cachePath, key) - if not os.path.exists(fullPath) or not os.path.exists(fullPath+".meta.json"): - return None - with open(fullPath+".meta.json", "r") as f: - meta = json.loads(f.read()) - if lastVersion > 0 and meta["version"] != lastVersion: - return None - if meta["expireAt"] > 0 and time.time()*1000 > meta["expireAt"]: - return None - with open(fullPath, "rb") as f: - return pickle.load(f) - else: - client = self._node._getClient() - bytesOut = bytearray() - stream = client.cacheGet(rpc_pb2.RpcCacheGetRequest(key=key, lastVersion = lastVersion)) - async for chunk in stream: - if not chunk.exists: - return None - bytesOut.extend(chunk.data) - return pickle.loads(bytesOut) - except Exception as e: - self.getLogger().error("Error getting cache "+str(e)) - return None - - def _setNode(self, node): - self._node = node - - def _setJob(self, job): - self._job = job + def getTemplate(self): + return self._template + + def getSockets(self): + return self._sockets - def _log(self, message:str): + def setRunInParallel(self, runInParallel:bool): """ - Log a message to the network. - Shouldn't be used directly. Use getLogger().info() instead. + Set whether the runner should run in parallel. Args: - message (str): The message to log. + runInParallel (bool): True if the runner should run in parallel, False otherwise. Defaults to False. """ - if self._job: message+=" for job "+self._job.id - if self._node: - self._node._log(message, self._job.id if self._job else None) - + self.runInParallel = runInParallel - async def openStorage(self, url:str)->Disk: + def isRunInParallel(self) -> bool: """ - Open a storage disk. - Args: - url (str): The URL of the disk. + Check if the runner should run in parallel. Returns: - Disk: The disk object. + bool: True if the runner should run in parallel, False otherwise. """ + return self.runInParallel - if url in self._disksByUrl: - return self._disksByUrl[url] - client = self._node._getClient() - diskId =(await client.openDisk(rpc_pb2.RpcOpenDiskRequest(url=url))).diskId - disk = Disk(id=diskId, url=url, node=self._node) - self._disksByUrl[url] = disk - self._disksById[diskId] = disk - return disk - - async def createStorage(self,name:str=None,encryptionKey:str=None,includeEncryptionKeyInUrl:str=None) -> Disk: - """ - Create a storage disk. - Args: - name (str): Optional: The name of the disk. - encryptionKey (str): Optional: The encryption key of the disk. - includeEncryptionKeyInUrl (str): Optional: Whether to include the encryption key in the URL. - Returns: - Disk: The disk object. - """ - if name in self._diskByName: - return self._diskByName[name] - - client = self._node._getClient() - url = (await client.createDisk(rpc_pb2.RpcCreateDiskRequest( - name=name, - encryptionKey=encryptionKey, - includeEncryptionKeyInUrl=includeEncryptionKeyInUrl - ))).url - diskId =( await client.openDisk(rpc_pb2.RpcOpenDiskRequest(url=url))).diskId - disk = Disk(id=diskId, url=url, node=self._node) - self._disksByUrl[url] = disk - self._disksById[diskId] = disk - self._diskByName[name] = disk - return disk - async def postRun(self, job:rpc_pb2.Job__pb2.Job) -> None: + + async def postRun(self, ctx:JobContext) -> None: """ Called after the runner has finished running. + Args: + ctx (JobContext): The context of the job. """ - for disk in self._disksById.values(): - await disk.close() - for disk in self._disksByUrl.values(): - await disk.close() - for disk in self._diskByName.values(): - await disk.close() - self._disksById = {} - self._disksByUrl = {} - self._diskByName = {} + pass - async def canRun(self,job:rpc_pb2.Job__pb2.Job) -> bool: + async def canRun(self,ctx:JobContext) -> bool: """ Check if the runner can run a job. Args: - job (Job): The job to check. + ctx (JobContext): The context of the job. Returns: bool: True if the runner can run the job, False otherwise. """ return True - async def preRun(self, job:rpc_pb2.Job__pb2.Job)-> None: + async def preRun(self, ctx:JobContext)-> None: """ Called before the runner starts running. + Args: + ctx (JobContext): The context of the job. """ pass - async def loop(self)-> None: + async def run(self, ctx:JobContext) -> None: + """ + Run a job. + Args: + ctx (JobContext): The context of the job. + """ + pass + + async def loop(self, node: 'OpenAgentsNode')-> None: """ The main loop of the runner. + Args: + node (OpenAgentsNode): The node """ pass - async def run(self, job:rpc_pb2.Job__pb2.Job) -> None: + + async def init(self,node: 'OpenAgentsNode')-> None: """ - Run a job. + Initialize the runner. + Args: + node (OpenAgentsNode): The node """ pass @@ -278,7 +155,7 @@


class JobRunner -(metaOrConfig: Union[dict, RunnerConfig], filters: dict = None, template: dict = None, sockets: dict = None) +(config: RunnerConfig)

An abstract class that represents a job runner. @@ -300,561 +177,342 @@


""" - def __init__(self, metaOrConfig: Union[dict, RunnerConfig], filters:dict=None, template:dict=None, sockets:dict=None): - meta=None - if isinstance(metaOrConfig, RunnerConfig): - meta = metaOrConfig.getMeta() - filters = metaOrConfig.getFilters() - template = metaOrConfig.getTemplate() - sockets = metaOrConfig.getSockets() - else: - meta = metaOrConfig - self._filters = None - self._node = None - self._job = None - self._disksByUrl = {} - self._disksById = {} - self._diskByName = {} - self._template = None - self._meta = None - self._sockets = None - self._nextAnnouncementTimestamp = 0 - self._cachePath = None - - self.logger = Logger("JobRunner", "0", self, False) - self._filters = filters - self._meta = json.dumps(meta) - self._template = template - self._sockets = json.dumps(sockets) - - self._cachePath = os.getenv('CACHE_PATH', "cache") - if not os.path.exists(self._cachePath): - os.makedirs(self._cachePath) - + def __init__(self, config: RunnerConfig): + self._node = None + self._template = config.getTemplate() + self._meta = config.getMeta() + self._sockets = config.getSockets() + self._filter = config.getFilter() + self.runInParallel=False + self.initialized=False - def getLogger(self) -> Logger: - """ - Get the active logger for the runner. - Returns: - Logger: The logger for the runner. - """ - return self.logger - - async def cacheSet(self, key:str, value, version:int=0, expireAt:int=0, local=True, CHUNK_SIZE=1024*1024*15): - """ - Set a value in the cache. - Args: - key (str): The key of the value to set. - value (object): The value to set. - version (int): The version of the cache (if the call to cacheGet requires a different version, the cache will be considered expired). Defaults to 0. expireAt (int): The timestamp at which the value expires. Defaults to 0. - expireAt (int): The timestamp at which the value expires in milliseconds. 0 = never. Defaults to 0. - local (bool): Whether to store the value locally or remotely. Defaults to True. - CHUNK_SIZE (int): The size of each chunk to write in bytes, if needed. Defaults to 1024*1024*15. - """ - try: - dataBytes = pickle.dumps(value) - if local: - fullPath = os.path.join(self._cachePath, key) - with open(fullPath, "wb") as f: - f.write(dataBytes) - with open(fullPath+".meta.json", "w") as f: - f.write(json.dumps({"version":version, "expireAt":expireAt})) - else: - client = self._node._getClient() - def write_data(): - for j in range(0, len(dataBytes), CHUNK_SIZE): - chunk = bytes(dataBytes[j:min(j+CHUNK_SIZE, len(dataBytes))]) - request = rpc_pb2.RpcCacheSetRequest( - key=key, - data=chunk, - expireAt=expireAt, - version=version - ) - yield request - res=await client.cacheSet(write_data()) - return res.success - except Exception as e: - self.getLogger().error("Error setting cache "+str(e)) - return False + + def getMeta(self): + return self._meta + + def getFilter(self): + return self._filter - async def cacheGet(self, key:str, lastVersion = 0, local=True) -> any: - """ - Get a value from the cache. - Args: - path (str): The key of the value to get. - lastVersion (int): The version of the cache to check. Defaults to 0. - local (bool): Whether to get the value locally or remotely. Defaults to True. - Returns: - any: The value of the cache. - """ - try: - if local: - fullPath = os.path.join(self._cachePath, key) - if not os.path.exists(fullPath) or not os.path.exists(fullPath+".meta.json"): - return None - with open(fullPath+".meta.json", "r") as f: - meta = json.loads(f.read()) - if lastVersion > 0 and meta["version"] != lastVersion: - return None - if meta["expireAt"] > 0 and time.time()*1000 > meta["expireAt"]: - return None - with open(fullPath, "rb") as f: - return pickle.load(f) - else: - client = self._node._getClient() - bytesOut = bytearray() - stream = client.cacheGet(rpc_pb2.RpcCacheGetRequest(key=key, lastVersion = lastVersion)) - async for chunk in stream: - if not chunk.exists: - return None - bytesOut.extend(chunk.data) - return pickle.loads(bytesOut) - except Exception as e: - self.getLogger().error("Error getting cache "+str(e)) - return None - - def _setNode(self, node): - self._node = node - - def _setJob(self, job): - self._job = job + def getTemplate(self): + return self._template + + def getSockets(self): + return self._sockets - def _log(self, message:str): + def setRunInParallel(self, runInParallel:bool): """ - Log a message to the network. - Shouldn't be used directly. Use getLogger().info() instead. + Set whether the runner should run in parallel. Args: - message (str): The message to log. + runInParallel (bool): True if the runner should run in parallel, False otherwise. Defaults to False. """ - if self._job: message+=" for job "+self._job.id - if self._node: - self._node._log(message, self._job.id if self._job else None) - + self.runInParallel = runInParallel - async def openStorage(self, url:str)->Disk: + def isRunInParallel(self) -> bool: """ - Open a storage disk. - Args: - url (str): The URL of the disk. + Check if the runner should run in parallel. Returns: - Disk: The disk object. + bool: True if the runner should run in parallel, False otherwise. """ + return self.runInParallel - if url in self._disksByUrl: - return self._disksByUrl[url] - client = self._node._getClient() - diskId =(await client.openDisk(rpc_pb2.RpcOpenDiskRequest(url=url))).diskId - disk = Disk(id=diskId, url=url, node=self._node) - self._disksByUrl[url] = disk - self._disksById[diskId] = disk - return disk - - async def createStorage(self,name:str=None,encryptionKey:str=None,includeEncryptionKeyInUrl:str=None) -> Disk: - """ - Create a storage disk. - Args: - name (str): Optional: The name of the disk. - encryptionKey (str): Optional: The encryption key of the disk. - includeEncryptionKeyInUrl (str): Optional: Whether to include the encryption key in the URL. - Returns: - Disk: The disk object. - """ - if name in self._diskByName: - return self._diskByName[name] - - client = self._node._getClient() - url = (await client.createDisk(rpc_pb2.RpcCreateDiskRequest( - name=name, - encryptionKey=encryptionKey, - includeEncryptionKeyInUrl=includeEncryptionKeyInUrl - ))).url - diskId =( await client.openDisk(rpc_pb2.RpcOpenDiskRequest(url=url))).diskId - disk = Disk(id=diskId, url=url, node=self._node) - self._disksByUrl[url] = disk - self._disksById[diskId] = disk - self._diskByName[name] = disk - return disk - async def postRun(self, job:rpc_pb2.Job__pb2.Job) -> None: + + async def postRun(self, ctx:JobContext) -> None: """ Called after the runner has finished running. + Args: + ctx (JobContext): The context of the job. """ - for disk in self._disksById.values(): - await disk.close() - for disk in self._disksByUrl.values(): - await disk.close() - for disk in self._diskByName.values(): - await disk.close() - self._disksById = {} - self._disksByUrl = {} - self._diskByName = {} + pass - async def canRun(self,job:rpc_pb2.Job__pb2.Job) -> bool: + async def canRun(self,ctx:JobContext) -> bool: """ Check if the runner can run a job. Args: - job (Job): The job to check. + ctx (JobContext): The context of the job. Returns: bool: True if the runner can run the job, False otherwise. """ return True - async def preRun(self, job:rpc_pb2.Job__pb2.Job)-> None: + async def preRun(self, ctx:JobContext)-> None: """ Called before the runner starts running. + Args: + ctx (JobContext): The context of the job. + """ + pass + + async def run(self, ctx:JobContext) -> None: + """ + Run a job. + Args: + ctx (JobContext): The context of the job. """ pass - async def loop(self)-> None: + async def loop(self, node: 'OpenAgentsNode')-> None: """ The main loop of the runner. + Args: + node (OpenAgentsNode): The node """ pass - async def run(self, job:rpc_pb2.Job__pb2.Job) -> None: + + async def init(self,node: 'OpenAgentsNode')-> None: """ - Run a job. + Initialize the runner. + Args: + node (OpenAgentsNode): The node """ pass


-async def cacheGet(self, key: str, lastVersion=0, local=True) ‑>  +
+async def canRun(self, ctx: JobContext) ‑> bool

Get a value from the cache.


Check if the runner can run a job.


path : str
The key of the value to get.
lastVersion : int
The version of the cache to check. Defaults to 0.
local : bool
Whether to get the value locally or remotely. Defaults to True.
ctx : JobContext
The context of the job.


The value of the cache.
True if the runner can run the job, False otherwise.
Expand source code -
async def cacheGet(self, key:str, lastVersion = 0, local=True) -> any:
async def canRun(self,ctx:JobContext) -> bool:
-    Get a value from the cache.
+    Check if the runner can run a job.
-        path (str): The key of the value to get.
-        lastVersion (int): The version of the cache to check. Defaults to 0.
-        local (bool): Whether to get the value locally or remotely. Defaults to True.
+        ctx (JobContext): The context of the job.
-        any: The value of the cache.
+        bool: True if the runner can run the job, False otherwise.
-    try:
-        if local:
-            fullPath = os.path.join(self._cachePath, key)
-            if not os.path.exists(fullPath) or not os.path.exists(fullPath+".meta.json"):
-                return None
-            with open(fullPath+".meta.json", "r") as f:
-                meta = json.loads(f.read())
-            if lastVersion > 0 and meta["version"] != lastVersion:
-                return None
-            if meta["expireAt"] > 0 and time.time()*1000 > meta["expireAt"]:
-                return None
-            with open(fullPath, "rb") as f:
-                return pickle.load(f)
-        else:
-            client = self._node._getClient()
-            bytesOut = bytearray()
-            stream = client.cacheGet(rpc_pb2.RpcCacheGetRequest(key=key, lastVersion = lastVersion))
-            async for chunk in stream:
-                if not chunk.exists:
-                    return None
-                bytesOut.extend(chunk.data)
-            return pickle.loads(bytesOut)
-    except Exception as e:
-        self.getLogger().error("Error getting cache "+str(e))
-        return None
+ return True
-async def cacheSet(self, key: str, value, version: int = 0, expireAt: int = 0, local=True, CHUNK_SIZE=15728640) +
+def getFilter(self)

Set a value in the cache.



key : str
The key of the value to set.
value : object
The value to set.
version : int
The version of the cache (if the call to cacheGet requires a different version, the cache will be considered expired). Defaults to 0. -expireAt (int): The timestamp at which the value expires. Defaults to 0.
expireAt : int
The timestamp at which the value expires in milliseconds. 0 = never. Defaults to 0.
local : bool
Whether to store the value locally or remotely. Defaults to True.
The size of each chunk to write in bytes, if needed. Defaults to 1024102415.
Expand source code -
async def cacheSet(self, key:str, value, version:int=0, expireAt:int=0, local=True, CHUNK_SIZE=1024*1024*15):
-    """
-    Set a value in the cache.
-    Args:
-        key (str): The key of the value to set.
-        value (object): The value to set.
-        version (int): The version of the cache (if the call to cacheGet requires a different version, the cache will be considered expired). Defaults to 0.            expireAt (int): The timestamp at which the value expires. Defaults to 0.
-        expireAt (int): The timestamp at which the value expires in milliseconds. 0 = never. Defaults to 0.
-        local (bool): Whether to store the value locally or remotely. Defaults to True.
-        CHUNK_SIZE (int): The size of each chunk to write in bytes, if needed. Defaults to 1024*1024*15.
-    """
-    try:
-        dataBytes = pickle.dumps(value)
-        if local:
-            fullPath = os.path.join(self._cachePath, key)
-            with open(fullPath, "wb") as f:
-                f.write(dataBytes)
-            with open(fullPath+".meta.json", "w") as f:
-                f.write(json.dumps({"version":version, "expireAt":expireAt}))
-        else:
-            client = self._node._getClient()
-            def write_data():
-                for j in range(0, len(dataBytes), CHUNK_SIZE):
-                    chunk = bytes(dataBytes[j:min(j+CHUNK_SIZE, len(dataBytes))])                   
-                    request = rpc_pb2.RpcCacheSetRequest(
-                        key=key, 
-                        data=chunk,
-                        expireAt=expireAt,
-                        version=version
-                    )
-                    yield request                              
-            res=await client.cacheSet(write_data())
-            return res.success
-    except Exception as e:
-        self.getLogger().error("Error setting cache "+str(e))
-        return False
def getFilter(self):
+    return self._filter
-async def canRun(self, job: Job_pb2.Job) ‑> bool +
+def getMeta(self)

Check if the runner can run a job.



job : Job
The job to check.


True if the runner can run the job, False otherwise.
Expand source code -
async def canRun(self,job:rpc_pb2.Job__pb2.Job) -> bool:
-    """
-    Check if the runner can run a job.
-    Args:
-        job (Job): The job to check.
-    Returns:
-        bool: True if the runner can run the job, False otherwise.
-    """
-    return True
def getMeta(self):
+    return self._meta
+def getSockets(self) +
+ +Expand source code + +
def getSockets(self):
+    return self._sockets
-async def createStorage(self, name: str = None, encryptionKey: str = None, includeEncryptionKeyInUrl: str = None) ‑> Disk +
+def getTemplate(self)

Create a storage disk.

+ +Expand source code + +
def getTemplate(self):
+    return self._template
+async def init(self, node: OpenAgentsNode) +

Initialize the runner.


name : str
Optional: The name of the disk.
encryptionKey : str
Optional: The encryption key of the disk.
includeEncryptionKeyInUrl : str
Optional: Whether to include the encryption key in the URL.


The disk object.
node : OpenAgentsNode
The node
Expand source code -
async def createStorage(self,name:str=None,encryptionKey:str=None,includeEncryptionKeyInUrl:str=None) -> Disk:
async def init(self,node: 'OpenAgentsNode')-> None:
-    Create a storage disk.
+    Initialize the runner.
-        name (str): Optional: The name of the disk.
-        encryptionKey (str): Optional: The encryption key of the disk.
-        includeEncryptionKeyInUrl (str): Optional: Whether to include the encryption key in the URL.
-    Returns:
-        Disk: The disk object.
+        node (OpenAgentsNode): The node
-    if name in self._diskByName:
-        return self._diskByName[name]
-    client = self._node._getClient()
-    url = (await client.createDisk(rpc_pb2.RpcCreateDiskRequest(
-        name=name,
-        encryptionKey=encryptionKey,
-        includeEncryptionKeyInUrl=includeEncryptionKeyInUrl
-    ))).url
-    diskId =( await client.openDisk(rpc_pb2.RpcOpenDiskRequest(url=url))).diskId
-    disk = Disk(id=diskId, url=url, node=self._node)
-    self._disksByUrl[url] = disk
-    self._disksById[diskId] = disk
-    self._diskByName[name] = disk
-    return disk
+ pass
-def getLogger(self) ‑> Logger +
+def isRunInParallel(self) ‑> bool

Get the active logger for the runner.


Check if the runner should run in parallel.


The logger for the runner.
True if the runner should run in parallel, False otherwise.
Expand source code -
def getLogger(self) -> Logger:
def isRunInParallel(self) -> bool:
-    Get the active logger for the runner.
+    Check if the runner should run in parallel.
-        Logger: The logger for the runner.
+        bool: True if the runner should run in parallel, False otherwise.
-    return self.logger
+ return self.runInParallel
-async def loop(self) ‑> None +async def loop(self, node: OpenAgentsNode)

The main loop of the runner.


The main loop of the runner.



node : OpenAgentsNode
The node
Expand source code -
async def loop(self)-> None:
async def loop(self, node: 'OpenAgentsNode')-> None:
     The main loop of the runner.
+    Args:
+        node (OpenAgentsNode): The node
-async def openStorage(self, url: str) ‑> Disk +
+async def postRun(self, ctx: JobContext) ‑> None

Open a storage disk.


Called after the runner has finished running.


url : str
The URL of the disk.


The disk object.
ctx : JobContext
The context of the job.
Expand source code -
async def openStorage(self, url:str)->Disk:
async def postRun(self, ctx:JobContext) -> None:
-    Open a storage disk.
+    Called after the runner has finished running.
-        url (str): The URL of the disk.
-    Returns:
-        Disk: The disk object.
+        ctx (JobContext): The context of the job.
-    if url in self._disksByUrl:
-        return self._disksByUrl[url]
-    client = self._node._getClient()
-    diskId =(await client.openDisk(rpc_pb2.RpcOpenDiskRequest(url=url))).diskId
-    disk =  Disk(id=diskId, url=url, node=self._node)
-    self._disksByUrl[url] = disk
-    self._disksById[diskId] = disk
-    return disk
+ pass
-async def postRun(self, job: Job_pb2.Job) ‑> None +
+async def preRun(self, ctx: JobContext) ‑> None

Called after the runner has finished running.


Called before the runner starts running.



ctx : JobContext
The context of the job.
Expand source code -
async def postRun(self, job:rpc_pb2.Job__pb2.Job) -> None:
async def preRun(self, ctx:JobContext)-> None:
-    Called after the runner has finished running.
+    Called before the runner starts running.
+    Args:
+        ctx (JobContext): The context of the job.
-    for disk in self._disksById.values():
-        await disk.close()
-    for disk in self._disksByUrl.values():
-        await disk.close()
-    for disk in self._diskByName.values():
-        await disk.close()
-    self._disksById = {}
-    self._disksByUrl = {}
-    self._diskByName = {}
+ pass
-async def preRun(self, job: Job_pb2.Job) ‑> None +
+async def run(self, ctx: JobContext) ‑> None

Called before the runner starts running.


Run a job.



ctx : JobContext
The context of the job.
Expand source code -
async def preRun(self, job:rpc_pb2.Job__pb2.Job)-> None:
async def run(self, ctx:JobContext) -> None:
-    Called before the runner starts running.
+    Run a job.
+    Args:
+        ctx (JobContext): The context of the job.
-async def run(self, job: Job_pb2.Job) ‑> None +
+def setRunInParallel(self, runInParallel: bool)

Run a job.


Set whether the runner should run in parallel.



runInParallel : bool
True if the runner should run in parallel, False otherwise. Defaults to False.
Expand source code -
async def run(self, job:rpc_pb2.Job__pb2.Job) -> None:
def setRunInParallel(self, runInParallel:bool):
-    Run a job.
+    Set whether the runner should run in parallel.
+    Args:
+        runInParallel (bool): True if the runner should run in parallel, False otherwise. Defaults to False.
-    pass
+ self.runInParallel = runInParallel
@@ -878,16 +536,18 @@


  • JobRunner

  • diff --git a/Logger.html b/Logger.html index 26994e6..f304da6 100644 --- a/Logger.html +++ b/Logger.html @@ -85,7 +85,37 @@

    Module openagents.Logger

    with self.wait: self.wait.notify_all() - + def close(self): + """ + Immediately flush all the logs to OpenObserve and shutdown the logger. + """ + self.flushThread.shutdown() + if not self.buffer.empty(): + batch = [] + while not self.buffer.empty(): + batch.append(self.buffer.get()) + self._flushToOpenObserve(batch) + + + def _flushToOpenObserve(self, batch): + try: + url = self.options["baseUrl"]+"/api/"+self.options["org"]+"/"+self.options["stream"]+"/_json" + basicAuth = self.options["auth"] + if not isinstance(basicAuth, str): + if "username" in basicAuth and "password" in basicAuth: + basicAuth = basicAuth["username"]+":"+basicAuth["password"] + basicAuth = base64.b64encode(basicAuth.encode()).decode() + headers = { + 'Content-Type': 'application/json', + "Authorization": "Basic "+basicAuth if basicAuth else None + } + res = requests.post(url, headers=headers, json=batch) + if res.status_code != 200: + print("Error flushing log "+str(res.status_code)) + except Exception as e: + print("Error flushing log "+str(e)) + + def flushLoop(self): while True: with self.wait: @@ -93,23 +123,7 @@

    Module openagents.Logger

    batch = [] while not self.buffer.empty(): batch.append(self.buffer.get()) - try: - url = self.options["baseUrl"]+"/api/"+self.options["org"]+"/"+self.options["stream"]+"/_json" - basicAuth = self.options["auth"] - if not isinstance(basicAuth, str): - if "username" in basicAuth and "password" in basicAuth: - basicAuth = basicAuth["username"]+":"+basicAuth["password"] - basicAuth = base64.b64encode(basicAuth.encode()).decode() - headers = { - 'Content-Type': 'application/json', - "Authorization": "Basic "+basicAuth if basicAuth else None - } - res = requests.post(url, headers=headers, json=batch) - if res.status_code != 200: - print("Error flushing log "+str(res.status_code)) - except Exception as e: - print("Error flushing log "+str(e)) - + self._flushToOpenObserve(batch) @@ -132,22 +146,22 @@

    Module openagents.Logger

    """ - def __init__(self, name:str, version:str, runner=None, level=None, enableOobs:bool=True): + def __init__(self, name:str, version:str, jobId:str=None, runnerLogger=None, level=None, enableOobs:bool=True): """ Create a new logger. Args: name (str): The name of the logger. version (str): The version of the logger. - runner (object): The runner to log to. Defaults to None. + runnerLogger : The function to log to the runner. level (LogLevel): Optional: The minimum level of logs to print. Defaults to environment variable or "debug". enableOobs (bool): Optional: Whether to enable logging to OpenObserve. Defaults to True. """ self.name=name or "main" - self.runner=runner - self.logger=None + self.runnerLogger=runnerLogger self.logLevel=None self.oobsLogger=None self.version=version + self.jobId=jobId logLevelName = os.getenv('LOG_LEVEL', "debug") oobsLogLevelName= os.getenv('OPENOBSERVE_LOGLEVEL', logLevelName) @@ -175,7 +189,8 @@

    Module openagents.Logger

    "flushInterval": int(os.getenv('OPENOBSERVE_FLUSHINTERVAL', 0)), "meta":{ "appName": self.name, - "appVersion": self.version + "appVersion": self.version, + "jobId": self.jobId } }) @@ -199,13 +214,13 @@

    Module openagents.Logger

    if levelV >= minLevel: date = time.strftime("%Y-%m-%d %H:%M:%S") - print(date+" ["+self.name+":"+self.version+"] : "+level+" : "+message) + print(date+" ["+self.name+":"+self.version+"] "+(("("+self.jobId+")") if self.jobId else "")+": "+level+" : "+message) if self.oobsLogger and levelV >= minObsLevel: self.oobsLogger.log(level, message) - if self.runner and levelV >= minNostrLevel: - self.runner._log(message) + if self.runnerLogger and levelV >= minNostrLevel: + self.runnerLogger(message) def log(self, *args): @@ -233,6 +248,11 @@

    Module openagents.Logger

    def finest(self, *args): self._log("finest", args) + def close(self): + if self.oobsLogger: + self.obsLogger.close() + +
    @@ -247,7 +267,7 @@


    class Logger -(name: str, version: str, runner=None, level=None, enableOobs: bool = True) +(name: str, version: str, jobId: str = None, runnerLogger=None, level=None, enableOobs: bool = True)

    A logger for OpenAgents Nodes. @@ -269,8 +289,7 @@


    The name of the logger.
    version : str
    The version of the logger.
    runner : object
    The runner to log to. Defaults to None.
    runnerLogger : The function to log to the runner.
    level : LogLevel
    Optional: The minimum level of logs to print. Defaults to environment variable or "debug".
    enableOobs : bool
    @@ -297,22 +316,22 @@


    """ - def __init__(self, name:str, version:str, runner=None, level=None, enableOobs:bool=True): + def __init__(self, name:str, version:str, jobId:str=None, runnerLogger=None, level=None, enableOobs:bool=True): """ Create a new logger. Args: name (str): The name of the logger. version (str): The version of the logger. - runner (object): The runner to log to. Defaults to None. + runnerLogger : The function to log to the runner. level (LogLevel): Optional: The minimum level of logs to print. Defaults to environment variable or "debug". enableOobs (bool): Optional: Whether to enable logging to OpenObserve. Defaults to True. """ self.name=name or "main" - self.runner=runner - self.logger=None + self.runnerLogger=runnerLogger self.logLevel=None self.oobsLogger=None self.version=version + self.jobId=jobId logLevelName = os.getenv('LOG_LEVEL', "debug") oobsLogLevelName= os.getenv('OPENOBSERVE_LOGLEVEL', logLevelName) @@ -340,7 +359,8 @@


    "flushInterval": int(os.getenv('OPENOBSERVE_FLUSHINTERVAL', 0)), "meta":{ "appName": self.name, - "appVersion": self.version + "appVersion": self.version, + "jobId": self.jobId } }) @@ -364,13 +384,13 @@


    if levelV >= minLevel: date = time.strftime("%Y-%m-%d %H:%M:%S") - print(date+" ["+self.name+":"+self.version+"] : "+level+" : "+message) + print(date+" ["+self.name+":"+self.version+"] "+(("("+self.jobId+")") if self.jobId else "")+": "+level+" : "+message) if self.oobsLogger and levelV >= minObsLevel: self.oobsLogger.log(level, message) - if self.runner and levelV >= minNostrLevel: - self.runner._log(message) + if self.runnerLogger and levelV >= minNostrLevel: + self.runnerLogger(message) def log(self, *args): @@ -396,10 +416,28 @@


    self._log("finer", args) def finest(self, *args): - self._log("finest", args)
    + self._log("finest", args) + + def close(self): + if self.oobsLogger: + self.obsLogger.close()


    +def close(self) +
    + +Expand source code + +
    def close(self):
    +    if self.oobsLogger:
    +        self.obsLogger.close()
    def debug(self, *args)
    @@ -557,7 +595,37 @@


    with self.wait: self.wait.notify_all() - + def close(self): + """ + Immediately flush all the logs to OpenObserve and shutdown the logger. + """ + self.flushThread.shutdown() + if not self.buffer.empty(): + batch = [] + while not self.buffer.empty(): + batch.append(self.buffer.get()) + self._flushToOpenObserve(batch) + + + def _flushToOpenObserve(self, batch): + try: + url = self.options["baseUrl"]+"/api/"+self.options["org"]+"/"+self.options["stream"]+"/_json" + basicAuth = self.options["auth"] + if not isinstance(basicAuth, str): + if "username" in basicAuth and "password" in basicAuth: + basicAuth = basicAuth["username"]+":"+basicAuth["password"] + basicAuth = base64.b64encode(basicAuth.encode()).decode() + headers = { + 'Content-Type': 'application/json', + "Authorization": "Basic "+basicAuth if basicAuth else None + } + res = requests.post(url, headers=headers, json=batch) + if res.status_code != 200: + print("Error flushing log "+str(res.status_code)) + except Exception as e: + print("Error flushing log "+str(e)) + + def flushLoop(self): while True: with self.wait: @@ -565,25 +633,31 @@


    batch = [] while not self.buffer.empty(): batch.append(self.buffer.get()) - try: - url = self.options["baseUrl"]+"/api/"+self.options["org"]+"/"+self.options["stream"]+"/_json" - basicAuth = self.options["auth"] - if not isinstance(basicAuth, str): - if "username" in basicAuth and "password" in basicAuth: - basicAuth = basicAuth["username"]+":"+basicAuth["password"] - basicAuth = base64.b64encode(basicAuth.encode()).decode() - headers = { - 'Content-Type': 'application/json', - "Authorization": "Basic "+basicAuth if basicAuth else None - } - res = requests.post(url, headers=headers, json=batch) - if res.status_code != 200: - print("Error flushing log "+str(res.status_code)) - except Exception as e: - print("Error flushing log "+str(e)) + self._flushToOpenObserve(batch)


    +def close(self) +

    Immediately flush all the logs to OpenObserve and shutdown the logger.

    + +Expand source code + +
    def close(self):
    +    """
    +    Immediately flush all the logs to OpenObserve and shutdown the logger.
    +    """
    +    self.flushThread.shutdown()
    +    if not self.buffer.empty():
    +        batch = []
    +        while not self.buffer.empty():
    +            batch.append(self.buffer.get())
    +        self._flushToOpenObserve(batch)
    def flushLoop(self)
    @@ -600,22 +674,7 @@


    batch = [] while not self.buffer.empty(): batch.append(self.buffer.get()) - try: - url = self.options["baseUrl"]+"/api/"+self.options["org"]+"/"+self.options["stream"]+"/_json" - basicAuth = self.options["auth"] - if not isinstance(basicAuth, str): - if "username" in basicAuth and "password" in basicAuth: - basicAuth = basicAuth["username"]+":"+basicAuth["password"] - basicAuth = base64.b64encode(basicAuth.encode()).decode() - headers = { - 'Content-Type': 'application/json', - "Authorization": "Basic "+basicAuth if basicAuth else None - } - res = requests.post(url, headers=headers, json=batch) - if res.status_code != 200: - print("Error flushing log "+str(res.status_code)) - except Exception as e: - print("Error flushing log "+str(e)) + self._flushToOpenObserve(batch)
    @@ -681,6 +740,7 @@


  • Logger

    • close
    • debug
    • error
    • fine
    • @@ -694,6 +754,7 @@

    • OpenObserveLogger

      diff --git a/NodeConfig.html b/NodeConfig.html index a4e4121..360bb8e 100644 --- a/NodeConfig.html +++ b/NodeConfig.html @@ -40,8 +40,8 @@

      Module openagents.NodeConfig

      self._meta={ "name": "OpenAgents Node", "description": "An new OpenAgents Node", - "about": "An new OpenAgents Node", "version": "0.0.1", + "picture":"" } if meta: for k,v in meta.items(): @@ -50,46 +50,8 @@

      Module openagents.NodeConfig

      def getMeta(self): self._meta["name"] = os.getenv('NODE_NAME', self._meta["name"]) self._meta["description"] = os.getenv('NODE_DESCRIPTION', self._meta["description"]) - self._meta["about"] = os.getenv('NODE_DESCRIPTION', self._meta["about"]) - self._meta["description"] = os.getenv('NODE_DESCRIPTION', self._meta["description"]) self._meta["version"] = os.getenv('NODE_VERSION', self._meta["version"]) - - return self._meta - - def name(self,name:str)->'NodeConfig': - """ - Set the name of the node. - Args: - name (str): The name of the node. - Returns: - NodeConfig: The node configuration. - """ - self._meta["name"]=name - return self - - def description(self,description:str)->'NodeConfig': - """ - Set the description of the node. - Args: - description (str): The description of the node. - Returns: - NodeConfig: The node configuration. - """ - self._meta["about"]=description - self._meta["description"]=description - return self - - def version(self,version:str)->'NodeConfig': - """ - Set the version of the node. - Args: - version (str): The version of the node. - Returns: - NodeConfig: The node configuration. - """ - self._meta["version"]=version - return self -
      + return self._meta
      @@ -127,8 +89,8 @@


      self._meta={ "name": "OpenAgents Node", "description": "An new OpenAgents Node", - "about": "An new OpenAgents Node", "version": "0.0.1", + "picture":"" } if meta: for k,v in meta.items(): @@ -137,80 +99,11 @@


      def getMeta(self): self._meta["name"] = os.getenv('NODE_NAME', self._meta["name"]) self._meta["description"] = os.getenv('NODE_DESCRIPTION', self._meta["description"]) - self._meta["about"] = os.getenv('NODE_DESCRIPTION', self._meta["about"]) - self._meta["description"] = os.getenv('NODE_DESCRIPTION', self._meta["description"]) self._meta["version"] = os.getenv('NODE_VERSION', self._meta["version"]) - - return self._meta - - def name(self,name:str)->'NodeConfig': - """ - Set the name of the node. - Args: - name (str): The name of the node. - Returns: - NodeConfig: The node configuration. - """ - self._meta["name"]=name - return self - - def description(self,description:str)->'NodeConfig': - """ - Set the description of the node. - Args: - description (str): The description of the node. - Returns: - NodeConfig: The node configuration. - """ - self._meta["about"]=description - self._meta["description"]=description - return self - - def version(self,version:str)->'NodeConfig': - """ - Set the version of the node. - Args: - version (str): The version of the node. - Returns: - NodeConfig: The node configuration. - """ - self._meta["version"]=version - return self + return self._meta


      -def description(self, description: str) ‑> NodeConfig -

      Set the description of the node.



      description : str
      The description of the node.


      The node configuration.
      - -Expand source code - -
      def description(self,description:str)->'NodeConfig':
      -    """
      -    Set the description of the node.
      -    Args:
      -        description (str): The description of the node.
      -    Returns:
      -        NodeConfig: The node configuration.
      -    """
      -    self._meta["about"]=description
      -    self._meta["description"]=description
      -    return self
      def getMeta(self)
      @@ -223,75 +116,10 @@


      def getMeta(self):
           self._meta["name"] = os.getenv('NODE_NAME', self._meta["name"])
           self._meta["description"] = os.getenv('NODE_DESCRIPTION', self._meta["description"])
      -    self._meta["about"] = os.getenv('NODE_DESCRIPTION', self._meta["about"])
      -    self._meta["description"] = os.getenv('NODE_DESCRIPTION', self._meta["description"])
           self._meta["version"] = os.getenv('NODE_VERSION', self._meta["version"])
           return self._meta
      -def name(self, name: str) ‑> NodeConfig -

      Set the name of the node.



      name : str
      The name of the node.


      The node configuration.
      - -Expand source code - -
      def name(self,name:str)->'NodeConfig':
      -    """
      -    Set the name of the node.
      -    Args:
      -        name (str): The name of the node.
      -    Returns:
      -        NodeConfig: The node configuration.
      -    """
      -    self._meta["name"]=name
      -    return self
      -def version(self, version: str) ‑> NodeConfig -

      Set the version of the node.



      version : str
      The version of the node.


      The node configuration.
      - -Expand source code - -
      def version(self,version:str)->'NodeConfig':
      -    """
      -    Set the version of the node.
      -    Args:
      -        version (str): The version of the node.
      -    Returns:
      -        NodeConfig: The node configuration.
      -    """
      -    self._meta["version"]=version
      -    return self
  • @@ -313,10 +141,7 @@


  • NodeConfig

  • diff --git a/OpenAgentsNode.html b/OpenAgentsNode.html index ed3c6a8..15f95e8 100644 --- a/OpenAgentsNode.html +++ b/OpenAgentsNode.html @@ -37,7 +37,8 @@

    Module openagents.OpenAgentsNode

    from .NodeConfig import NodeConfig from .Logger import Logger from typing import Union - +from .JobContext import JobContext +import json class HeaderAdderInterceptor(grpc.aio.UnaryUnaryClientInterceptor): """ An interceptor for GRPC that adds headers to outgoing requests. @@ -70,54 +71,45 @@

    Module openagents.OpenAgentsNode

    - NODE_TOKEN: The token of the node. Defaults to None. """ - def __init__(self, metaOrConfig: Union[dict, NodeConfig]): - meta=None - if isinstance(metaOrConfig, NodeConfig): - meta = metaOrConfig.getMeta() - else: - meta = metaOrConfig - self.nextNodeAnnounce = 0 - self.nodeName = "" - self.nodeIcon = "" - self.nodeDescription = "" + def __init__(self, config: NodeConfig): + + self.meta = config.getMeta() + + self.nextNodeAnnounce = 0 self.channel = None self.rpcClient = None - self.runners=[] + self.registeredRunners=[] self.poolAddress = None self.poolPort = None - self.failedJobsTracker = [] + self.lockedJobs = [] self.isLooping = False self.logger = None self.loopInterval = 100 - name = "" - icon = "" - description = "" - version = "0.0.1" - - name = meta["name"] if "name" in meta else None - icon = meta["picture"] if "picture" in meta else None - description = meta["about"] if "about" in meta else None - version = meta["version"] if "version" in meta else None - - self.nodeName = name or os.getenv('NODE_NAME', "OpenAgentsNode") - self.nodeIcon = icon or os.getenv('NODE_ICON', "") - self.nodeVersion = version or os.getenv('NODE_VERSION', "0.0.1") - self.nodeDescription = description or os.getenv('NODE_DESCRIPTION', "") + + self.nodeName = self.meta["name"] + self.nodeIcon = self.meta["picture"] + self.nodeVersion = self.meta["version"] + self.nodeDescription = self.meta["description"] self.channel = None self.rpcClient = None self.logger = Logger(self.nodeName,self.nodeVersion) self.logger.info("Starting "+self.nodeName+" v"+self.nodeVersion) + def getMeta(self): + return self.meta + def registerRunner(self, runner:JobRunner) -> None: """ Register a runner to the node. Args: runner (JobRunner): The runner to register. """ - runner.logger=Logger(self.nodeName+"."+runner.__class__.__name__,self.nodeVersion,runner) - self.runners.append(runner) + self.registeredRunners.append({ + "runner": runner, + "nextAnnouncementTimestamp": 0 + }) def getLogger(self): """ @@ -201,62 +193,76 @@

    Module openagents.OpenAgentsNode

    Args: runner (JobRunner): The runner to execute the job. """ - if runner not in self.runners: + if len([x for x in self.registeredRunners if x["runner"]==runner])==0: del self.runnerTasks[runner] return + try: + if not runner.initialized: + runner.initialized=True + await runner.init(self) client = self._getClient() - #for runner in self.runners: jobs=[] - for filter in runner._filters: - jobs.extend((await client.getPendingJobs(rpc_pb2.RpcGetPendingJobs( - filterByRunOn = filter["filterByRunOn"] if "filterByRunOn" in filter else None, - filterByCustomer = filter["filterByCustomer"] if "filterByCustomer" in filter else None, - filterByDescription = filter["filterByDescription"] if "filterByDescription" in filter else None, - filterById = filter["filterById"] if "filterById" in filter else None, - filterByKind = filter["filterByKind"] if "filterByKind" in filter else None, - wait=60000, - # exclude failed jobs - excludeId = [x[0] for x in self.failedJobsTracker if time.time()-x[1] < 60] - ))).jobs) + filter = runner.getFilter() + self.lockedJobs = [x for x in self.lockedJobs if time.time()-x[1] < 60] + jobs.extend((await client.getPendingJobs(rpc_pb2.RpcGetPendingJobs( + filterByRunOn = filter["filterByRunOn"] if "filterByRunOn" in filter else None, + filterByCustomer = filter["filterByCustomer"] if "filterByCustomer" in filter else None, + filterByDescription = filter["filterByDescription"] if "filterByDescription" in filter else None, + filterById = filter["filterById"] if "filterById" in filter else None, + filterByKind = filter["filterByKind"] if "filterByKind" in filter else None, + wait=60000, + # exclude failed jobs + excludeId = [x[0] for x in self.lockedJobs] + ))).jobs) + + if len(jobs)>0 : self.getLogger().log(str(len(jobs))+" pending jobs for "+runner.__class__.__name__) + else : self.getLogger().finer("No pending jobs for "+runner.__class__.__name__) - for job in jobs: - if len(jobs)>0 : runner.getLogger().log(str(len(jobs))+" pending jobs") - else : runner.getLogger().log("No pending jobs") + for job in jobs: wasAccepted=False t=time.time() + ctx = JobContext(self,runner,job) try: - client = self._getClient() # Reconnect client for each job - if not await runner.canRun(job): + client = self._getClient() # Refresh client connection if needed + if not await runner.canRun(ctx): + await ctx.close() continue + self.lockedJobs.append([job.id, time.time()]) await self._acceptJob(job.id) wasAccepted = True - runner.getLogger().info("Job started on node "+self.nodeName) - runner._setNode(self) - runner._setJob(job) - await runner.preRun(job) + + + + ctx.getLogger().info("Job started on node "+self.nodeName) + + await runner.preRun(ctx) async def task(): try: - output=await runner.run(job) - await runner.postRun(job) - runner.getLogger().info("Job completed in "+str(time.time()-t)+" seconds on node "+self.nodeName, job.id) + output=await runner.run(ctx) + await runner.postRun(ctx) + ctx.getLogger().info("Job completed in "+str(time.time()-t)+" seconds on node "+self.nodeName, job.id) await client.completeJob(rpc_pb2.RpcJobOutput(jobId=job.id, output=output)) except Exception as e: - self.failedJobsTracker.append([job.id, time.time()]) - runner.getLogger().error("Job failed in "+str(time.time()-t)+" seconds on node "+self.nodeName+" with error "+str(e), job.id) + ctx.getLogger().error("Job failed in "+str(time.time()-t)+" seconds on node "+self.nodeName+" with error "+str(e), job.id) if wasAccepted: await client.cancelJob(rpc_pb2.RpcCancelJob(jobId=job.id, reason=str(e))) traceback.print_exc() - asyncio.create_task(task()) + await ctx.close() + if not runner.isRunInParallel(): + await task() + else: + asyncio.create_task(task()) except Exception as e: - self.failedJobsTracker.append([job.id, time.time()]) - runner.getLogger().error("Job failed in "+str(time.time()-t)+" seconds on node "+self.nodeName+" with error "+str(e), job.id) + ctx.getLogger().error("Job failed in "+str(time.time()-t)+" seconds on node "+self.nodeName+" with error "+str(e), job.id) + await ctx.close() if wasAccepted: await client.cancelJob(rpc_pb2.RpcCancelJob(jobId=job.id, reason=str(e))) traceback.print_exc() + except Exception as e: traceback.print_exc() - runner.getLogger().error("Error executing runner "+str(e)) + self.getLogger().error("Error executing runner "+str(e)) await asyncio.sleep(5000.0/1000.0) self.runnerTasks[runner]=asyncio.create_task(self._executePendingJobForRunner(runner)) @@ -266,8 +272,9 @@

    Module openagents.OpenAgentsNode

    """ Execute all pending jobs for all runners. """ - for runner in self.runners: + for reg in self.registeredRunners: try: + runner = reg["runner"] if not runner in self.runnerTasks: self.runnerTasks[runner]=asyncio.create_task(self._executePendingJobForRunner(runner)) except Exception as e: @@ -295,20 +302,20 @@

    Module openagents.OpenAgentsNode

    self.getLogger().error("Error announcing node "+ str(e), None) self.nextNodeAnnounce = int(time.time()*1000) + 5000 - for runner in self.runners: + for reg in self.registeredRunners: try: - if time_ms >= runner._nextAnnouncementTimestamp: + if time_ms >= reg["nextAnnouncementTimestamp"]: client = self._getClient() res = await client.announceEventTemplate(rpc_pb2.RpcAnnounceTemplateRequest( - meta=runner._meta, - template=runner._template, - sockets=runner._sockets + meta=json.dumps(reg["runner"].getMeta()), + template=reg["runner"].getTemplate(), + sockets=json.dumps(reg["runner"].getSockets()) )) - runner._nextAnnouncementTimestamp = int(time.time()*1000) + res.refreshInterval + reg["nextAnnouncementTimestamp"] = int(time.time()*1000) + res.refreshInterval self.getLogger().log("Template announced, next announcement in "+str(res.refreshInterval)+" ms") except Exception as e: self.getLogger().error("Error announcing template "+ str(e), None) - runner._nextAnnouncementTimestamp = int(time.time()*1000) + 5000 + reg["nextAnnouncementTimestamp"] = int(time.time()*1000) + 5000 except Exception as e: self.getLogger().error("Error reannouncing "+str(e), None) await asyncio.sleep(5000.0/1000.0) @@ -318,7 +325,7 @@

    Module openagents.OpenAgentsNode

    """ The main loop of the node. """ - promises = [runner.loop() for runner in self.runners] + promises = [reg["runner"].loop(self) for reg in self.registeredRunners] await asyncio.gather(*promises) self.isLooping = False await asyncio.sleep(self.loopInterval/1000.0) @@ -447,7 +454,7 @@


    class OpenAgentsNode -(metaOrConfig: Union[dict, NodeConfig]) +(config: NodeConfig)

    An OpenAgents node that can run jobs. @@ -480,54 +487,45 @@


    - NODE_TOKEN: The token of the node. Defaults to None. """ - def __init__(self, metaOrConfig: Union[dict, NodeConfig]): - meta=None - if isinstance(metaOrConfig, NodeConfig): - meta = metaOrConfig.getMeta() - else: - meta = metaOrConfig - self.nextNodeAnnounce = 0 - self.nodeName = "" - self.nodeIcon = "" - self.nodeDescription = "" + def __init__(self, config: NodeConfig): + + self.meta = config.getMeta() + + self.nextNodeAnnounce = 0 self.channel = None self.rpcClient = None - self.runners=[] + self.registeredRunners=[] self.poolAddress = None self.poolPort = None - self.failedJobsTracker = [] + self.lockedJobs = [] self.isLooping = False self.logger = None self.loopInterval = 100 - name = "" - icon = "" - description = "" - version = "0.0.1" - - name = meta["name"] if "name" in meta else None - icon = meta["picture"] if "picture" in meta else None - description = meta["about"] if "about" in meta else None - version = meta["version"] if "version" in meta else None - - self.nodeName = name or os.getenv('NODE_NAME', "OpenAgentsNode") - self.nodeIcon = icon or os.getenv('NODE_ICON', "") - self.nodeVersion = version or os.getenv('NODE_VERSION', "0.0.1") - self.nodeDescription = description or os.getenv('NODE_DESCRIPTION', "") + + self.nodeName = self.meta["name"] + self.nodeIcon = self.meta["picture"] + self.nodeVersion = self.meta["version"] + self.nodeDescription = self.meta["description"] self.channel = None self.rpcClient = None self.logger = Logger(self.nodeName,self.nodeVersion) self.logger.info("Starting "+self.nodeName+" v"+self.nodeVersion) + def getMeta(self): + return self.meta + def registerRunner(self, runner:JobRunner) -> None: """ Register a runner to the node. Args: runner (JobRunner): The runner to register. """ - runner.logger=Logger(self.nodeName+"."+runner.__class__.__name__,self.nodeVersion,runner) - self.runners.append(runner) + self.registeredRunners.append({ + "runner": runner, + "nextAnnouncementTimestamp": 0 + }) def getLogger(self): """ @@ -611,62 +609,76 @@


    Args: runner (JobRunner): The runner to execute the job. """ - if runner not in self.runners: + if len([x for x in self.registeredRunners if x["runner"]==runner])==0: del self.runnerTasks[runner] return + try: + if not runner.initialized: + runner.initialized=True + await runner.init(self) client = self._getClient() - #for runner in self.runners: jobs=[] - for filter in runner._filters: - jobs.extend((await client.getPendingJobs(rpc_pb2.RpcGetPendingJobs( - filterByRunOn = filter["filterByRunOn"] if "filterByRunOn" in filter else None, - filterByCustomer = filter["filterByCustomer"] if "filterByCustomer" in filter else None, - filterByDescription = filter["filterByDescription"] if "filterByDescription" in filter else None, - filterById = filter["filterById"] if "filterById" in filter else None, - filterByKind = filter["filterByKind"] if "filterByKind" in filter else None, - wait=60000, - # exclude failed jobs - excludeId = [x[0] for x in self.failedJobsTracker if time.time()-x[1] < 60] - ))).jobs) + filter = runner.getFilter() + self.lockedJobs = [x for x in self.lockedJobs if time.time()-x[1] < 60] + jobs.extend((await client.getPendingJobs(rpc_pb2.RpcGetPendingJobs( + filterByRunOn = filter["filterByRunOn"] if "filterByRunOn" in filter else None, + filterByCustomer = filter["filterByCustomer"] if "filterByCustomer" in filter else None, + filterByDescription = filter["filterByDescription"] if "filterByDescription" in filter else None, + filterById = filter["filterById"] if "filterById" in filter else None, + filterByKind = filter["filterByKind"] if "filterByKind" in filter else None, + wait=60000, + # exclude failed jobs + excludeId = [x[0] for x in self.lockedJobs] + ))).jobs) + + if len(jobs)>0 : self.getLogger().log(str(len(jobs))+" pending jobs for "+runner.__class__.__name__) + else : self.getLogger().finer("No pending jobs for "+runner.__class__.__name__) - for job in jobs: - if len(jobs)>0 : runner.getLogger().log(str(len(jobs))+" pending jobs") - else : runner.getLogger().log("No pending jobs") + for job in jobs: wasAccepted=False t=time.time() + ctx = JobContext(self,runner,job) try: - client = self._getClient() # Reconnect client for each job - if not await runner.canRun(job): + client = self._getClient() # Refresh client connection if needed + if not await runner.canRun(ctx): + await ctx.close() continue + self.lockedJobs.append([job.id, time.time()]) await self._acceptJob(job.id) wasAccepted = True - runner.getLogger().info("Job started on node "+self.nodeName) - runner._setNode(self) - runner._setJob(job) - await runner.preRun(job) + + + + ctx.getLogger().info("Job started on node "+self.nodeName) + + await runner.preRun(ctx) async def task(): try: - output=await runner.run(job) - await runner.postRun(job) - runner.getLogger().info("Job completed in "+str(time.time()-t)+" seconds on node "+self.nodeName, job.id) + output=await runner.run(ctx) + await runner.postRun(ctx) + ctx.getLogger().info("Job completed in "+str(time.time()-t)+" seconds on node "+self.nodeName, job.id) await client.completeJob(rpc_pb2.RpcJobOutput(jobId=job.id, output=output)) except Exception as e: - self.failedJobsTracker.append([job.id, time.time()]) - runner.getLogger().error("Job failed in "+str(time.time()-t)+" seconds on node "+self.nodeName+" with error "+str(e), job.id) + ctx.getLogger().error("Job failed in "+str(time.time()-t)+" seconds on node "+self.nodeName+" with error "+str(e), job.id) if wasAccepted: await client.cancelJob(rpc_pb2.RpcCancelJob(jobId=job.id, reason=str(e))) traceback.print_exc() - asyncio.create_task(task()) + await ctx.close() + if not runner.isRunInParallel(): + await task() + else: + asyncio.create_task(task()) except Exception as e: - self.failedJobsTracker.append([job.id, time.time()]) - runner.getLogger().error("Job failed in "+str(time.time()-t)+" seconds on node "+self.nodeName+" with error "+str(e), job.id) + ctx.getLogger().error("Job failed in "+str(time.time()-t)+" seconds on node "+self.nodeName+" with error "+str(e), job.id) + await ctx.close() if wasAccepted: await client.cancelJob(rpc_pb2.RpcCancelJob(jobId=job.id, reason=str(e))) traceback.print_exc() + except Exception as e: traceback.print_exc() - runner.getLogger().error("Error executing runner "+str(e)) + self.getLogger().error("Error executing runner "+str(e)) await asyncio.sleep(5000.0/1000.0) self.runnerTasks[runner]=asyncio.create_task(self._executePendingJobForRunner(runner)) @@ -676,8 +688,9 @@


    """ Execute all pending jobs for all runners. """ - for runner in self.runners: + for reg in self.registeredRunners: try: + runner = reg["runner"] if not runner in self.runnerTasks: self.runnerTasks[runner]=asyncio.create_task(self._executePendingJobForRunner(runner)) except Exception as e: @@ -705,20 +718,20 @@


    self.getLogger().error("Error announcing node "+ str(e), None) self.nextNodeAnnounce = int(time.time()*1000) + 5000 - for runner in self.runners: + for reg in self.registeredRunners: try: - if time_ms >= runner._nextAnnouncementTimestamp: + if time_ms >= reg["nextAnnouncementTimestamp"]: client = self._getClient() res = await client.announceEventTemplate(rpc_pb2.RpcAnnounceTemplateRequest( - meta=runner._meta, - template=runner._template, - sockets=runner._sockets + meta=json.dumps(reg["runner"].getMeta()), + template=reg["runner"].getTemplate(), + sockets=json.dumps(reg["runner"].getSockets()) )) - runner._nextAnnouncementTimestamp = int(time.time()*1000) + res.refreshInterval + reg["nextAnnouncementTimestamp"] = int(time.time()*1000) + res.refreshInterval self.getLogger().log("Template announced, next announcement in "+str(res.refreshInterval)+" ms") except Exception as e: self.getLogger().error("Error announcing template "+ str(e), None) - runner._nextAnnouncementTimestamp = int(time.time()*1000) + 5000 + reg["nextAnnouncementTimestamp"] = int(time.time()*1000) + 5000 except Exception as e: self.getLogger().error("Error reannouncing "+str(e), None) await asyncio.sleep(5000.0/1000.0) @@ -728,7 +741,7 @@


    """ The main loop of the node. """ - promises = [runner.loop() for runner in self.runners] + promises = [reg["runner"].loop(self) for reg in self.registeredRunners] await asyncio.gather(*promises) self.isLooping = False await asyncio.sleep(self.loopInterval/1000.0) @@ -788,6 +801,19 @@


    return self.logger
    +def getMeta(self) +
    + +Expand source code + +
    def getMeta(self):
    +    return self.meta
    async def reannounce(self)
    @@ -818,20 +844,20 @@


    self.getLogger().error("Error announcing node "+ str(e), None) self.nextNodeAnnounce = int(time.time()*1000) + 5000 - for runner in self.runners: + for reg in self.registeredRunners: try: - if time_ms >= runner._nextAnnouncementTimestamp: + if time_ms >= reg["nextAnnouncementTimestamp"]: client = self._getClient() res = await client.announceEventTemplate(rpc_pb2.RpcAnnounceTemplateRequest( - meta=runner._meta, - template=runner._template, - sockets=runner._sockets + meta=json.dumps(reg["runner"].getMeta()), + template=reg["runner"].getTemplate(), + sockets=json.dumps(reg["runner"].getSockets()) )) - runner._nextAnnouncementTimestamp = int(time.time()*1000) + res.refreshInterval + reg["nextAnnouncementTimestamp"] = int(time.time()*1000) + res.refreshInterval self.getLogger().log("Template announced, next announcement in "+str(res.refreshInterval)+" ms") except Exception as e: self.getLogger().error("Error announcing template "+ str(e), None) - runner._nextAnnouncementTimestamp = int(time.time()*1000) + 5000 + reg["nextAnnouncementTimestamp"] = int(time.time()*1000) + 5000 except Exception as e: self.getLogger().error("Error reannouncing "+str(e), None) await asyncio.sleep(5000.0/1000.0) @@ -858,8 +884,10 @@


    Args: runner (JobRunner): The runner to register. """ - runner.logger=Logger(self.nodeName+"."+runner.__class__.__name__,self.nodeVersion,runner) - self.runners.append(runner) + self.registeredRunners.append({ + "runner": runner, + "nextAnnouncementTimestamp": 0 + })
    @@ -918,8 +946,9 @@

