Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unit tests #99

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
name: Run Unit Tests

on:
pull_request:
branches:
- main

jobs:
test:
runs-on: ubuntu-latest

steps:
- name: Check out repository
uses: actions/checkout@v2

- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: '3.x'

- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt

- name: Run tests
run: |
pytest
20 changes: 20 additions & 0 deletions tests/test_init.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import unittest

class TestInit(unittest.TestCase):

def test_import(self):
try:
import core
import scanners
except ImportError:
self.fail("Failed to import core or scanners module")

def test_initialization(self):
try:
from core import storage_service
from scanners import zap_scanner, nexpose_scanner, openvas_scanner
except ImportError:
self.fail("Failed to initialize core or scanners module")

if __name__ == '__main__':
unittest.main()
71 changes: 71 additions & 0 deletions tests/test_main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import unittest
from unittest.mock import patch, MagicMock
import main

class TestMain(unittest.TestCase):

@patch('main.ZapScanner')
@patch('main.NexposeScanner')
@patch('main.OpenVASScanner')
def setUp(self, MockZapScanner, MockNexposeScanner, MockOpenVASScanner):
self.mock_zap_scanner = MockZapScanner.return_value
self.mock_nexpose_scanner = MockNexposeScanner.return_value
self.mock_openvas_scanner = MockOpenVASScanner.return_value

def test_main_with_target(self):
config = {
'scan_name': 'test_scan',
'target': 'http://example.com',
'pause': False,
'resume': False
}
result = main.main(config)
self.assertTrue(result)
self.mock_zap_scanner.start.assert_called_once_with(config['scan_name'], config['target'])
self.mock_nexpose_scanner.start.assert_called_once_with(config['scan_name'], config['target'])
self.mock_openvas_scanner.start.assert_called_once_with(config['scan_name'], config['target'])

def test_main_with_pause(self):
config = {
'scan_name': 'test_scan',
'target': None,
'pause': True,
'resume': False
}
result = main.main(config)
self.assertTrue(result)
self.mock_zap_scanner.pause.assert_called_once_with(config['scan_name'])
self.mock_nexpose_scanner.pause.assert_called_once_with(config['scan_name'])
self.mock_openvas_scanner.pause.assert_called_once_with(config['scan_name'])

def test_main_with_resume(self):
config = {
'scan_name': 'test_scan',
'target': None,
'pause': False,
'resume': True
}
result = main.main(config)
self.assertTrue(result)
self.mock_zap_scanner.resume.assert_called_once_with(config['scan_name'])
self.mock_nexpose_scanner.resume.assert_called_once_with(config['scan_name'])
self.mock_openvas_scanner.resume.assert_called_once_with(config['scan_name'])

def test_main_with_no_target_pause_resume(self):
config = {
'scan_name': 'test_scan',
'target': None,
'pause': False,
'resume': False
}
result = main.main(config)
self.assertTrue(result)
self.mock_zap_scanner.get_scan_status.assert_called_once_with(config['scan_name'], [])
self.mock_zap_scanner.get_scan_results.assert_called_once_with(config['scan_name'], {})
self.mock_nexpose_scanner.get_scan_status.assert_called_once_with(config['scan_name'], [])
self.mock_nexpose_scanner.get_scan_results.assert_called_once_with(config['scan_name'], {})
self.mock_openvas_scanner.get_scan_status.assert_called_once_with(config['scan_name'], [])
self.mock_openvas_scanner.get_scan_results.assert_called_once_with(config['scan_name'], {})

if __name__ == '__main__':
unittest.main()
115 changes: 115 additions & 0 deletions tests/test_nexpose_scanner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import unittest
from unittest.mock import patch, MagicMock
from scanners.nexpose_scanner import NexposeScanner

class TestNexposeScanner(unittest.TestCase):

@patch('scanners.nexpose_scanner.rapid7vmconsole')
@patch('scanners.nexpose_scanner.StorageService')
def setUp(self, MockStorageService, MockRapid7vmconsole):
self.mock_rapid7 = MockRapid7vmconsole.return_value
self.mock_storage_service = MockStorageService.return_value
self.nexpose_scanner = NexposeScanner()

def test_start(self):
scan_name = 'test_scan'
target = 'http://example.com'
self.nexpose_scanner.scan = MagicMock(return_value=True)
result = self.nexpose_scanner.start(scan_name, target)
self.assertTrue(result)
self.nexpose_scanner.scan.assert_called_once_with(scan_name, target)

def test_scan(self):
scan_name = 'test_scan'
target = 'http://example.com'
scan_id = 1
site_id = 2
self.mock_rapid7vmconsole.SiteApi.return_value.create_site.return_value.id = site_id
self.mock_rapid7vmconsole.ScanApi.return_value.start_scan.return_value.id = scan_id
scan_data = {
'scan_name': scan_name,
'scan_id': '',
'target': target,
'status': ''
}
self.mock_storage_service.get_by_name.return_value = None
result = self.nexpose_scanner.scan(scan_name, target)
self.assertEqual(result['NEXPOSE']['nexpose_id'], scan_id)
self.assertEqual(result['NEXPOSE']['site_id'], site_id)
self.assertEqual(result['NEXPOSE']['scan_status']['status'], 'INPROGRESS')
self.mock_storage_service.add.assert_called_once_with(scan_data)
self.mock_storage_service.update_by_name.assert_called_once_with(scan_name, result)

def test_get_scan_status(self):
scan_name = 'test_scan'
scan_data = {
'NEXPOSE': {
'nexpose_id': 1,
'scan_status': {}
},
'target': 'http://example.com'
}
self.mock_storage_service.get_by_name.return_value = scan_data
self.mock_rapid7vmconsole.ScanApi.return_value.get_scan.return_value.status = 'finished'
scan_status_list = []
result = self.nexpose_scanner.get_scan_status(scan_name, scan_status_list)
self.assertEqual(result[0]['scanner'], 'Nexpose')
self.assertEqual(result[0]['status'], 'COMPLETE')
self.mock_storage_service.update_by_name.assert_called_once_with(scan_name, scan_data)

def test_get_scan_results(self):
scan_name = 'test_scan'
scan_data = {
'NEXPOSE': {
'nexpose_id': 1,
'report_id': 2,
'report_instance_id': 3,
'scan_status': {'status': 'COMPLETE'}
},
'target': 'http://example.com'
}
self.mock_storage_service.get_by_name.return_value = scan_data
report = '<vulnerabilities><vulnerability title="vuln1" cvssScore="5.0"><references><reference source="CVE">CVE-1234</reference></references><description><ContainerBlockElement><Paragraph>desc</Paragraph></ContainerBlockElement></description><solution><ContainerBlockElement><Paragraph>sol</Paragraph></ContainerBlockElement></solution></vulnerability></vulnerabilities>'
self.mock_rapid7vmconsole.ReportApi.return_value.download_report.return_value = report
scan_results = {}
result = self.nexpose_scanner.get_scan_results(scan_name, scan_results)
self.assertEqual(result['vuln1']['name'], 'vuln1')
self.assertEqual(result['vuln1']['severity'], 5.0)
self.assertEqual(result['vuln1']['cve_id'], 'CVE-1234')
self.assertEqual(result['vuln1']['description'], 'desc')
self.assertEqual(result['vuln1']['solution'], 'sol')

def test_pause(self):
scan_name = 'test_scan'
scan_data = {'nexpose_id': 1}
self.mock_storage_service.get_by_name.return_value = scan_data
result = self.nexpose_scanner.pause(scan_name)
self.assertEqual(result, self.mock_rapid7vmconsole.ScanApi.return_value.set_scan_status.return_value)
self.mock_rapid7vmconsole.ScanApi.return_value.set_scan_status.assert_called_once_with(scan_data['nexpose_id'], 'pause')

def test_resume(self):
scan_name = 'test_scan'
scan_data = {'nexpose_id': 1}
self.mock_storage_service.get_by_name.return_value = scan_data
result = self.nexpose_scanner.resume(scan_name)
self.assertEqual(result, self.mock_rapid7vmconsole.ScanApi.return_value.set_scan_status.return_value)
self.mock_rapid7vmconsole.ScanApi.return_value.set_scan_status.assert_called_once_with(scan_data['nexpose_id'], 'resume')

def test_stop(self):
scan_name = 'test_scan'
scan_data = {'nexpose_id': 1}
self.mock_storage_service.get_by_name.return_value = scan_data
result = self.nexpose_scanner.stop(scan_name)
self.assertEqual(result, self.mock_rapid7vmconsole.ScanApi.return_value.set_scan_status.return_value)
self.mock_rapid7vmconsole.ScanApi.return_value.set_scan_status.assert_called_once_with(scan_data['nexpose_id'], 'stop')

def test_remove(self):
scan_name = 'test_scan'
scan_data = {'nexpose_id': 1}
self.mock_storage_service.get_by_name.return_value = scan_data
result = self.nexpose_scanner.remove(scan_name)
self.assertEqual(result, self.mock_rapid7vmconsole.ScanApi.return_value.set_scan_status.return_value)
self.mock_rapid7vmconsole.ScanApi.return_value.set_scan_status.assert_called_once_with(scan_data['nexpose_id'], 'remove')

if __name__ == '__main__':
unittest.main()
107 changes: 107 additions & 0 deletions tests/test_openvas_scanner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import unittest
from unittest.mock import patch, MagicMock
from scanners.openvas_scanner import OpenVASScanner

class TestOpenVASScanner(unittest.TestCase):

@patch('scanners.openvas_scanner.Gmp')
@patch('scanners.openvas_scanner.StorageService')
def setUp(self, MockStorageService, MockGmp):
self.mock_gmp = MockGmp.return_value
self.mock_storage_service = MockStorageService.return_value
self.openvas_scanner = OpenVASScanner()

def test_start(self):
scan_name = 'test_scan'
target = 'http://example.com'
self.openvas_scanner.scan = MagicMock(return_value=True)
result = self.openvas_scanner.start(scan_name, target)
self.assertTrue(result)
self.openvas_scanner.scan.assert_called_once_with(scan_name, target)

def test_scan(self):
scan_name = 'test_scan'
target = 'http://example.com'
target_id = '1234'
self.mock_gmp.create_target.return_value.get.return_value = target_id
scan_data = {
'scan_name': scan_name,
'scan_id': '',
'target': target,
'status': ''
}
self.mock_storage_service.get_by_name.return_value = None
result = self.openvas_scanner.scan(scan_name, target)
self.assertEqual(result['OPENVAS']['openvas_id'], target_id)
self.assertEqual(result['OPENVAS']['scan_status']['status'], 'INPROGRESS')
self.mock_storage_service.add.assert_called_once_with(scan_data)
self.mock_storage_service.update_by_name.assert_called_once_with(scan_name, result)

def test_get_scan_status(self):
scan_name = 'test_scan'
scan_data = {
'OPENVAS': {
'openvas_id': '1234',
'scan_status': {}
},
'target': 'http://example.com'
}
self.mock_storage_service.get_by_name.return_value = scan_data
self.mock_gmp.get_report.return_value = True
scan_status_list = []
result = self.openvas_scanner.get_scan_status(scan_name, scan_status_list)
self.assertEqual(result[0]['scanner'], 'OpenVAS')
self.assertEqual(result[0]['status'], 'COMPLETE')
self.mock_storage_service.update_by_name.assert_called_once_with(scan_name, scan_data)

def test_get_scan_results(self):
scan_name = 'test_scan'
scan_data = {
'OPENVAS': {
'openvas_id': '1234',
'report_id': '5678',
'scan_status': {'status': 'COMPLETE'}
},
'target': 'http://example.com'
}
self.mock_storage_service.get_by_name.return_value = scan_data
report = '<get_reports_response><report><report><results><result><name>vuln1</name><nvt><cvss_base>5.0</cvss_base><cve>CVE-1234</cve></nvt><threat>High</threat><description>desc</description></result></results></report></report></get_reports_response>'
self.mock_gmp.get_report.return_value = report
scan_results = {}
result = self.openvas_scanner.get_scan_results(scan_name, scan_results)
self.assertEqual(result['vuln1']['name'], 'vuln1')
self.assertEqual(result['vuln1']['severity'], 5.0)
self.assertEqual(result['vuln1']['cve_id'], 'CVE-1234')
self.assertEqual(result['vuln1']['description'], 'desc')
self.assertEqual(result['vuln1']['risk'], 'High')

def test_pause(self):
scan_name = 'test_scan'
scan_data = {'openvas_id': '1234'}
self.mock_storage_service.get_by_name.return_value = scan_data
result = self.openvas_scanner.pause(scan_name)
self.assertIsNone(result)

def test_resume(self):
scan_name = 'test_scan'
scan_data = {'openvas_id': '1234'}
self.mock_storage_service.get_by_name.return_value = scan_data
result = self.openvas_scanner.resume(scan_name)
self.assertIsNone(result)

def test_stop(self):
scan_name = 'test_scan'
scan_data = {'openvas_id': '1234'}
self.mock_storage_service.get_by_name.return_value = scan_data
result = self.openvas_scanner.stop(scan_name)
self.assertIsNone(result)

def test_remove(self):
scan_name = 'test_scan'
scan_data = {'openvas_id': '1234'}
self.mock_storage_service.get_by_name.return_value = scan_data
result = self.openvas_scanner.remove(scan_name)
self.assertIsNone(result)

if __name__ == '__main__':
unittest.main()
Loading