generated from Ostorlab/template_agent
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #2 from Ostorlab/feature/persist-json
Persist to json file
- Loading branch information
Showing
7 changed files
with
254 additions
and
22 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,63 @@ | ||
"""Pytest fixtures for Agent Nebula.""" | ||
|
||
import pathlib | ||
import random | ||
|
||
import pytest | ||
from ostorlab.agent import definitions as agent_definitions | ||
from ostorlab.agent.message import message as msg | ||
from ostorlab.runtimes import definitions as runtime_definitions | ||
|
||
|
||
@pytest.fixture(scope="function", name="agent_definition") | ||
def agent_definition() -> agent_definitions.AgentDefinition: | ||
"""NebulaAgent definition fixture for testing purposes.""" | ||
with (pathlib.Path(__file__).parent.parent / "ostorlab.yaml").open() as yaml_o: | ||
return agent_definitions.AgentDefinition.from_yaml(yaml_o) | ||
|
||
|
||
@pytest.fixture(scope="function", name="agent_settings") | ||
def agent_settings() -> runtime_definitions.AgentSettings: | ||
"""NebulaAgent settings fixture for testing purposes.""" | ||
return runtime_definitions.AgentSettings( | ||
key="agent/ostorlab/nebula", | ||
bus_url="NA", | ||
bus_exchange_topic="NA", | ||
healthcheck_port=random.randint(5000, 6000), | ||
redis_url="redis://guest:guest@localhost:6379", | ||
) | ||
|
||
|
||
@pytest.fixture | ||
def link_message() -> msg.Message: | ||
"""Creates a dummy message of type v3.asset.link to be used by the agent for testing purposes.""" | ||
selector = "v3.asset.link" | ||
msg_data = {"url": "https://ostorlab.co", "method": b"GET"} | ||
return msg.Message.from_data(selector, data=msg_data) | ||
|
||
|
||
@pytest.fixture | ||
def multiple_link_messages() -> list[msg.Message]: | ||
"""Creates dummy messages of type v3.asset.link to be used by the agent for testing purposes.""" | ||
selector = "v3.asset.link" | ||
return [ | ||
msg.Message.from_data( | ||
selector, data={"url": f"https://www.domain{i}.com", "method": b"GET"} | ||
) | ||
for i in range(0, 5) | ||
] | ||
|
||
|
||
@pytest.fixture | ||
def multiple_messages() -> list[msg.Message]: | ||
"""Creates dummy messages of type v3.asset.link, v3.asset.domain, v3.asset.ip to be used by the agent for testing | ||
purposes.""" | ||
return [ | ||
msg.Message.from_data( | ||
"v3.asset.link", data={"url": "https://www.domain.com", "method": b"GET"} | ||
), | ||
msg.Message.from_data("v3.asset.domain_name", data={"name": "www.domain.com"}), | ||
msg.Message.from_data( | ||
"v3.asset.ip", data={"host": "192.168.1.1", "mask": "24"} | ||
), | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,137 @@ | ||
"""Unit tests for Nebula agent.""" | ||
|
||
import json | ||
import os | ||
import pathlib | ||
|
||
# TODO (elyousfi5): add tests for the Nebula agent | ||
import pytest | ||
from freezegun import freeze_time | ||
from ostorlab.agent import definitions as agent_definitions | ||
from ostorlab.agent.message import message as msg | ||
from ostorlab.runtimes import definitions as runtime_definitions | ||
from ostorlab.utils import defintions as utils_definitions | ||
from pyfakefs import fake_filesystem_unittest | ||
|
||
from agent import nebula_agent | ||
|
||
def testNebulaAgent_always_persistMessages() -> None: | ||
"""Test Nebula agent.""" | ||
pass | ||
|
||
def testAgentNebula_whenUnsupportedFileType_raisesValueError() -> None: | ||
"""Test that NebulaAgent raises ValueError when file type is not supported.""" | ||
with pytest.raises(ValueError): | ||
with (pathlib.Path(__file__).parent.parent / "ostorlab.yaml").open() as yaml_o: | ||
definition = agent_definitions.AgentDefinition.from_yaml(yaml_o) | ||
settings = runtime_definitions.AgentSettings( | ||
key="agent/ostorlab/nebula", | ||
bus_url="NA", | ||
bus_exchange_topic="NA", | ||
args=[ | ||
utils_definitions.Arg( | ||
name="file_type", | ||
type="string", | ||
value=json.dumps("txt").encode(), | ||
), | ||
], | ||
healthcheck_port=5301, | ||
redis_url="redis://guest:guest@localhost:6379", | ||
) | ||
nebula_agent.NebulaAgent(definition, settings) | ||
|
||
|
||
@freeze_time("2024-03-05 12:00:00") | ||
def testAgentNebula_whenFileTypeIsJson_persistMessage( | ||
agent_definition: agent_definitions.AgentDefinition, | ||
agent_settings: runtime_definitions.AgentSettings, | ||
link_message: msg.Message, | ||
) -> None: | ||
"""Test that NebulaAgent persists message to json file.""" | ||
with fake_filesystem_unittest.Patcher(): | ||
expected_output = json.dumps( | ||
{"url": "https://ostorlab.co", "method": b"GET"}, | ||
cls=nebula_agent.CustomEncoder, | ||
) | ||
nebula_test_agent = nebula_agent.NebulaAgent(agent_definition, agent_settings) | ||
|
||
nebula_test_agent.process(link_message) | ||
|
||
assert os.path.exists("/output/messages_2024-03-05_12-00-00") | ||
assert len(os.listdir("/output/messages_2024-03-05_12-00-00")) == 1 | ||
with open( | ||
"/output/messages_2024-03-05_12-00-00/v3.asset.link_messages.json" | ||
) as file: | ||
assert sorted(json.load(file).items()) == sorted( | ||
json.loads(expected_output).items() | ||
) | ||
|
||
|
||
@freeze_time("2023-03-05 12:00:00") | ||
def testAgentNebula_whenFileTypeIsJson_persistMultipleLinkMessages( | ||
agent_definition: agent_definitions.AgentDefinition, | ||
agent_settings: runtime_definitions.AgentSettings, | ||
multiple_link_messages: list[msg.Message], | ||
) -> None: | ||
"""Test that NebulaAgent persists multiple link messages to json file.""" | ||
with fake_filesystem_unittest.Patcher(): | ||
expected_output = [ | ||
json.dumps( | ||
{"url": f"https://www.domain{i}.com", "method": b"GET"}, | ||
cls=nebula_agent.CustomEncoder, | ||
) | ||
for i in range(0, 5) | ||
] | ||
nebula_test_agent = nebula_agent.NebulaAgent(agent_definition, agent_settings) | ||
|
||
for message in multiple_link_messages: | ||
nebula_test_agent.process(message) | ||
|
||
file_path = "/output/messages_2023-03-05_12-00-00" | ||
assert os.path.exists(file_path) | ||
assert len(os.listdir(file_path)) == 1 | ||
with open(f"{file_path}/v3.asset.link_messages.json", "r") as file: | ||
lines = file.readlines() | ||
assert len(lines) == len(expected_output) | ||
for line, expected_line in zip(lines, expected_output): | ||
assert line.strip() == expected_line.strip() | ||
|
||
|
||
@freeze_time("2023-03-05 12:00:00") | ||
def testAgentNebula_whenFileTypeIsJson_persistMultipleMessages( | ||
agent_definition: agent_definitions.AgentDefinition, | ||
agent_settings: runtime_definitions.AgentSettings, | ||
multiple_messages: list[msg.Message], | ||
) -> None: | ||
"""Test that NebulaAgent persists multiple messages of different types to json files.""" | ||
with fake_filesystem_unittest.Patcher(): | ||
expected_output = [ | ||
json.dumps( | ||
{"url": "https://www.domain.com", "method": b"GET"}, | ||
cls=nebula_agent.CustomEncoder, | ||
), | ||
json.dumps({"name": "www.domain.com"}, cls=nebula_agent.CustomEncoder), | ||
json.dumps( | ||
{"host": "192.168.1.1", "mask": "24"}, | ||
cls=nebula_agent.CustomEncoder, | ||
), | ||
] | ||
nebula_test_agent = nebula_agent.NebulaAgent(agent_definition, agent_settings) | ||
|
||
for message in multiple_messages: | ||
nebula_test_agent.process(message) | ||
|
||
file_path = "/output/messages_2023-03-05_12-00-00" | ||
assert os.path.exists(file_path) | ||
assert len(os.listdir(file_path)) == 3 | ||
assert os.path.exists(f"{file_path}/v3.asset.link_messages.json") is True | ||
with open(f"{file_path}/v3.asset.link_messages.json", "r") as file: | ||
lines = file.readlines() | ||
assert len(lines) == 1 | ||
assert lines[0].strip() == expected_output[0].strip() | ||
assert os.path.exists(f"{file_path}/v3.asset.domain_name_messages.json") is True | ||
with open(f"{file_path}/v3.asset.domain_name_messages.json", "r") as file: | ||
lines = file.readlines() | ||
assert len(lines) == 1 | ||
assert lines[0].strip() == expected_output[1].strip() | ||
assert os.path.exists(f"{file_path}/v3.asset.ip_messages.json") is True | ||
with open(f"{file_path}/v3.asset.ip_messages.json", "r") as file: | ||
lines = file.readlines() | ||
assert len(lines) == 1 | ||
assert lines[0].strip() == expected_output[2].strip() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,3 +2,6 @@ pytest | |
ruff | ||
mypy | ||
typing-extensions | ||
pytest-mock | ||
freezegun | ||
pyfakefs |