# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.
"""Test that the packages are built correctly."""
import unittest
import uuid
import yaml
from detection_rules import rule_loader
from detection_rules.packaging import PACKAGE_FILE, Package
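class TestPackages(unittest.TestCase):
    """Test package building and saving."""

    @staticmethod
    def get_test_rule(version=1, count=1):
        """Build in-memory test rules and a matching version-lock mapping.

        Args:
            version: version number recorded for every generated rule.
            count: number of rules to generate; each gets a fresh random rule_id.

        Returns:
            A ``(rules, version_info)`` tuple, where ``version_info`` maps each rule id
            to its name, its current sha256 hash and ``version``.
        """
        def get_rule_contents():
            return {
                "author": ["Elastic"],
                "description": "test description",
                "language": "kuery",
                "license": "Elastic License",
                "name": "test rule",
                "query": "process.name:test.query",
                "risk_score": 21,
                "rule_id": str(uuid.uuid4()),
                "severity": "low",
                "type": "query"
            }

        rules = [rule_loader.Rule('test.toml', get_rule_contents()) for _ in range(count)]
        version_info = {
            rule.id: {
                'rule_name': rule.name,
                'sha256': rule.get_hash(),
                'version': version
            } for rule in rules
        }
        return rules, version_info

    def test_package_loader_production_config(self):
        """Test that packages are loading correctly."""
        # NOTE(review): there is no body beyond the docstring, so this test always passes
        # without exercising the production config.

    def test_package_loader_default_configs(self):
        """Test configs in etc/packages.yml."""
        # safe_load only builds plain YAML types, so the config file cannot instantiate
        # arbitrary Python objects while being parsed.
        with open(PACKAGE_FILE) as f:
            configs = yaml.safe_load(f)['package']

        package = Package.from_config(configs)
        for rule in package.rules:
            # The version field is dropped before the rule is validated with as_rule=True.
            rule.contents.pop('version')
            rule.validate(as_rule=True)

    @rule_loader.mock_loader
    def test_package_summary(self):
        """Test the generation of the package summary.

        There are no assertions; the test only checks that the summary and changelog
        are generated without raising.
        """
        rules = rule_loader.get_production_rules()
        package = Package(rules, 'test-package')
        # save_changes=False is passed so the bump is not saved by this test.
        changed_rule_ids, new_rule_ids, deprecated_rule_ids = package.bump_versions(save_changes=False)
        package.generate_summary_and_changelog(changed_rule_ids, new_rule_ids, deprecated_rule_ids)

    def test_versioning_diffs(self):
        """Test that versioning is detecting diffs as expected."""
        rules, version_info = self.get_test_rule()
        package = Package(rules, 'test', current_versions=version_info)

        # test versioning doesn't falsely detect changes
        changed_rules, new_rules = package.changed_rule_ids, package.new_rules_ids
        self.assertEqual(0, len(changed_rules), 'Package version bumping is improperly detecting changed rules')
        self.assertEqual(0, len(new_rules), 'Package version bumping is improperly detecting new rules')
        self.assertEqual(1, package.rules[0].contents['version'], 'Package version bumping unexpectedly')

        # test versioning detects a new rule: with an empty version lock the rule is unknown
        package.rules[0].contents.pop('version')
        changed_rules, new_rules, _ = package.bump_versions(current_versions={})
        self.assertEqual(0, len(changed_rules), 'Package version bumping is improperly detecting changed rules')
        self.assertEqual(1, len(new_rules), 'Package version bumping is not detecting new rules')
        self.assertEqual(1, package.rules[0].contents['version'],
                         'Package version bumping not setting version to 1 for new rules')

        # test versioning detects a hash change: the query no longer matches the locked sha256
        package.rules[0].contents.pop('version')
        package.rules[0].contents['query'] = 'process.name:changed.test.query'
        changed_rules, new_rules, _ = package.bump_versions(current_versions=version_info)
        self.assertEqual(1, len(changed_rules), 'Package version bumping is not detecting changed rules')
        self.assertEqual(0, len(new_rules), 'Package version bumping is improperly detecting new rules')
        self.assertEqual(2, package.rules[0].contents['version'], 'Package version not bumping on changes')

    @rule_loader.mock_loader
    def test_rule_versioning(self):
        """Test that all rules are properly versioned and tracked"""
        self.maxDiff = None
        rules = rule_loader.load_rules().values()
        original_hashes = []
        post_bump_hashes = []

        # test that no rules have versions defined
        for rule in rules:
            # The placeholders are filled in so a failure names the offending rule.
            self.assertIsNone(rule.contents.get('version'),
                              '{} - {}: explicitly sets a version in the rule file'.format(rule.id, rule.name))
            original_hashes.append(rule.get_hash())

        package = Package(rules, 'test-package')

        # test that all rules have versions defined
        # package.bump_versions(save_changes=False)
        for rule in package.rules:
            unset_msg = '{} - {}: version is not being set in package'.format(rule.id, rule.name)
            rule_version = rule.contents.get('version')
            # A missing version must fail as an assertion, not as a TypeError from comparing None with 1.
            self.assertIsNotNone(rule_version, unset_msg)
            self.assertGreaterEqual(rule_version, 1, unset_msg)

        # test that rules validate with version
        for rule in package.rules:
            rule.validate(versioned=True)
            # The version is removed before hashing, so the comparison below checks that
            # packaging changed nothing else in the rule contents.
            rule.contents.pop('version')
            post_bump_hashes.append(rule.get_hash())

        # test that no hashes changed as a result of the version bumps
        # NOTE(review): compares position by position, so it assumes package.rules keeps
        # the order of the loaded rules - confirm against Package.
        self.assertListEqual(original_hashes, post_bump_hashes, 'Version bumping modified the hash of a rule')

    def test_version_filter(self):
        """Test that version filtering is working as expected."""
        msg = 'Package version filter failing'

        # every rule is below min_version
        rules, version_info = self.get_test_rule(version=1, count=3)
        package = Package(rules, 'test', current_versions=version_info, min_version=2)
        self.assertEqual(0, len(package.rules), msg)

        # every rule is above max_version
        rules, version_info = self.get_test_rule(version=5, count=3)
        package = Package(rules, 'test', current_versions=version_info, max_version=2)
        self.assertEqual(0, len(package.rules), msg)

        # every rule is inside the [min_version, max_version] range
        rules, version_info = self.get_test_rule(version=2, count=3)
        package = Package(rules, 'test', current_versions=version_info, min_version=1, max_version=3)
        self.assertEqual(3, len(package.rules), msg)

        # versions 1, 2 and 3 are assigned in turn, so only one rule matches version 2 exactly
        rules, version_info = self.get_test_rule(version=1, count=3)
        for version, vinfo in enumerate(version_info.values(), start=1):
            vinfo['version'] = version

        package = Package(rules, 'test', current_versions=version_info, min_version=2, max_version=2)
        self.assertEqual(1, len(package.rules), msg)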