forked from elastic/detection-rules
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_schemas.py
142 lines (125 loc) · 5.28 KB
/
test_schemas.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.
"""Test stack versioned schemas."""
import unittest
import uuid
import eql
from detection_rules.rule import Rule
from detection_rules.schemas import downgrade, CurrentSchema
class TestSchemas(unittest.TestCase):
    """Checks for rule schema validation and for ``downgrade`` across stack versions."""

    @classmethod
    def setUpClass(cls):
        """Create the rule fixtures shared by the downgrade tests."""
        # A plain KQL query rule.
        query_fields = {
            "author": ["Elastic"],
            "description": "test description",
            "index": ["filebeat-*"],
            "language": "kuery",
            "license": "Elastic License",
            "name": "test rule",
            "query": "process.name:test.query",
            "risk_score": 21,
            "rule_id": str(uuid.uuid4()),
            "severity": "low",
            "type": "query"
        }
        cls.compatible_rule = Rule("test.toml", query_fields)

        # A copy of the query rule that additionally carries a version number.
        versioned = cls.compatible_rule.copy()
        versioned.contents["version"] = 10
        cls.versioned_rule = versioned

        # A threshold rule with its own rule_id.
        threshold_fields = {
            "author": ["Elastic"],
            "description": "test description",
            "language": "kuery",
            "license": "Elastic License",
            "name": "test rule",
            "query": "process.name:test.query",
            "risk_score": 21,
            "rule_id": str(uuid.uuid4()),
            "severity": "low",
            "threshold": {
                "field": "destination.bytes",
                "value": 75,
            },
            "type": "threshold",
        }
        cls.threshold_rule = Rule("test.toml", threshold_fields)

    def _assert_downgrade_is_noop(self, contents):
        """Assert ``downgrade`` returns ``contents`` unchanged for the current stack, 7.9 and 7.9.2."""
        for stack_version in (CurrentSchema.STACK_VERSION, "7.9", "7.9.2"):
            self.assertDictEqual(downgrade(contents, stack_version), contents)

    def test_query_downgrade(self):
        """Downgrade a standard KQL rule."""
        api_contents = self.compatible_rule.contents
        self._assert_downgrade_is_noop(api_contents)

        # Expected 7.8 form: same rule, minus "author" and "license".
        expected_7_8 = {
            "description": "test description",
            "index": ["filebeat-*"],
            "language": "kuery",
            "name": "test rule",
            "query": "process.name:test.query",
            "risk_score": 21,
            "rule_id": self.compatible_rule.id,
            "severity": "low",
            "type": "query"
        }
        self.assertDictEqual(downgrade(api_contents, "7.8"), expected_7_8)

        # 7.7 is expected to be rejected outright.
        with self.assertRaises(ValueError):
            downgrade(api_contents, "7.7")

    def test_versioned_downgrade(self):
        """Downgrade a KQL rule with version information."""
        api_contents = self.versioned_rule.contents
        self._assert_downgrade_is_noop(api_contents)

        # Expected 7.8 form: "author" and "license" are gone, "version" is kept.
        expected_7_8 = {
            "description": "test description",
            "index": ["filebeat-*"],
            "language": "kuery",
            "name": "test rule",
            "query": "process.name:test.query",
            "risk_score": 21,
            "rule_id": self.versioned_rule.id,
            "severity": "low",
            "type": "query",
            "version": 10,
        }
        self.assertDictEqual(downgrade(api_contents, "7.8"), expected_7_8)

        # 7.7 is expected to be rejected outright.
        with self.assertRaises(ValueError):
            downgrade(api_contents, "7.7")

    def test_threshold_downgrade(self):
        """Downgrade a threshold rule that was first introduced in 7.9."""
        api_contents = self.threshold_rule.contents
        self._assert_downgrade_is_noop(api_contents)

        with self.assertRaises(ValueError):
            downgrade(api_contents, "7.7")

        # For 7.8 the error message must name the rule type as the problem.
        with self.assertRaisesRegex(ValueError, "Unsupported rule type"):
            downgrade(api_contents, "7.8")

    def test_eql_validation(self):
        """Build EQL rules: a valid query is accepted, invalid ones raise the matching eql error."""
        base_fields = {
            "author": ["Elastic"],
            "description": "test description",
            "index": ["filebeat-*"],
            "language": "eql",
            "license": "Elastic License",
            "name": "test rule",
            "risk_score": 21,
            "rule_id": str(uuid.uuid4()),
            "severity": "low",
            "type": "eql"
        }

        def build_rule(query):
            """Construct an EQL rule from the shared fields plus ``query``."""
            return Rule("test.toml", dict(base_fields, query=query))

        # Valid query: construction must not raise.
        build_rule('\nprocess where process.name == "cmd.exe"\n')

        with self.assertRaises(eql.EqlSyntaxError):
            build_rule('\nprocess where process.name == this!is$not#v@lid\n')

        with self.assertRaises(eql.EqlSemanticError):
            build_rule('\nprocess where process.invalid_field == "hello world"\n')

        with self.assertRaises(eql.EqlTypeMismatchError):
            build_rule('\nprocess where process.pid == "some string field"\n')