diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..f95ed09 --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,28 @@ +name: Run Unit Tests + +on: + pull_request: + branches: + - main + +jobs: + test: + runs-on: ubuntu-latest + + steps: + - name: Check out repository + uses: actions/checkout@v2 + + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: '3.x' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + + - name: Run tests + run: | + pytest diff --git a/tests/test_init.py b/tests/test_init.py new file mode 100644 index 0000000..c89ddac --- /dev/null +++ b/tests/test_init.py @@ -0,0 +1,20 @@ +import unittest + +class TestInit(unittest.TestCase): + + def test_import(self): + try: + import core + import scanners + except ImportError: + self.fail("Failed to import core or scanners module") + + def test_initialization(self): + try: + from core import storage_service + from scanners import zap_scanner, nexpose_scanner, openvas_scanner + except ImportError: + self.fail("Failed to initialize core or scanners module") + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_main.py b/tests/test_main.py new file mode 100644 index 0000000..cf9ab30 --- /dev/null +++ b/tests/test_main.py @@ -0,0 +1,71 @@ +import unittest +from unittest.mock import patch, MagicMock +import main + +class TestMain(unittest.TestCase): + + @patch('main.ZapScanner') + @patch('main.NexposeScanner') + @patch('main.OpenVASScanner') + def setUp(self, MockZapScanner, MockNexposeScanner, MockOpenVASScanner): + self.mock_zap_scanner = MockZapScanner.return_value + self.mock_nexpose_scanner = MockNexposeScanner.return_value + self.mock_openvas_scanner = MockOpenVASScanner.return_value + + def test_main_with_target(self): + config = { + 'scan_name': 'test_scan', + 'target': 'http://example.com', + 'pause': False, + 'resume': False + } + result = main.main(config) + self.assertTrue(result) + self.mock_zap_scanner.start.assert_called_once_with(config['scan_name'], config['target']) + self.mock_nexpose_scanner.start.assert_called_once_with(config['scan_name'], config['target']) + self.mock_openvas_scanner.start.assert_called_once_with(config['scan_name'], config['target']) + + def test_main_with_pause(self): + config = { + 'scan_name': 'test_scan', + 'target': None, + 'pause': True, + 'resume': False + } + result = main.main(config) + self.assertTrue(result) + self.mock_zap_scanner.pause.assert_called_once_with(config['scan_name']) + self.mock_nexpose_scanner.pause.assert_called_once_with(config['scan_name']) + self.mock_openvas_scanner.pause.assert_called_once_with(config['scan_name']) + + def test_main_with_resume(self): + config = { + 'scan_name': 'test_scan', + 'target': None, + 'pause': False, + 'resume': True + } + result = main.main(config) + self.assertTrue(result) + self.mock_zap_scanner.resume.assert_called_once_with(config['scan_name']) + self.mock_nexpose_scanner.resume.assert_called_once_with(config['scan_name']) + self.mock_openvas_scanner.resume.assert_called_once_with(config['scan_name']) + + def test_main_with_no_target_pause_resume(self): + config = { + 'scan_name': 'test_scan', + 'target': None, + 'pause': False, + 'resume': False + } + result = main.main(config) + self.assertTrue(result) + self.mock_zap_scanner.get_scan_status.assert_called_once_with(config['scan_name'], []) + self.mock_zap_scanner.get_scan_results.assert_called_once_with(config['scan_name'], {}) + self.mock_nexpose_scanner.get_scan_status.assert_called_once_with(config['scan_name'], []) + self.mock_nexpose_scanner.get_scan_results.assert_called_once_with(config['scan_name'], {}) + self.mock_openvas_scanner.get_scan_status.assert_called_once_with(config['scan_name'], []) + self.mock_openvas_scanner.get_scan_results.assert_called_once_with(config['scan_name'], {}) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_nexpose_scanner.py b/tests/test_nexpose_scanner.py new file mode 100644 index 0000000..de8f86b --- /dev/null +++ b/tests/test_nexpose_scanner.py @@ -0,0 +1,115 @@ +import unittest +from unittest.mock import patch, MagicMock +from scanners.nexpose_scanner import NexposeScanner + +class TestNexposeScanner(unittest.TestCase): + + @patch('scanners.nexpose_scanner.rapid7vmconsole') + @patch('scanners.nexpose_scanner.StorageService') + def setUp(self, MockStorageService, MockRapid7vmconsole): + self.mock_rapid7 = MockRapid7vmconsole.return_value + self.mock_storage_service = MockStorageService.return_value + self.nexpose_scanner = NexposeScanner() + + def test_start(self): + scan_name = 'test_scan' + target = 'http://example.com' + self.nexpose_scanner.scan = MagicMock(return_value=True) + result = self.nexpose_scanner.start(scan_name, target) + self.assertTrue(result) + self.nexpose_scanner.scan.assert_called_once_with(scan_name, target) + + def test_scan(self): + scan_name = 'test_scan' + target = 'http://example.com' + scan_id = 1 + site_id = 2 + self.mock_rapid7vmconsole.SiteApi.return_value.create_site.return_value.id = site_id + self.mock_rapid7vmconsole.ScanApi.return_value.start_scan.return_value.id = scan_id + scan_data = { + 'scan_name': scan_name, + 'scan_id': '', + 'target': target, + 'status': '' + } + self.mock_storage_service.get_by_name.return_value = None + result = self.nexpose_scanner.scan(scan_name, target) + self.assertEqual(result['NEXPOSE']['nexpose_id'], scan_id) + self.assertEqual(result['NEXPOSE']['site_id'], site_id) + self.assertEqual(result['NEXPOSE']['scan_status']['status'], 'INPROGRESS') + self.mock_storage_service.add.assert_called_once_with(scan_data) + self.mock_storage_service.update_by_name.assert_called_once_with(scan_name, result) + + def test_get_scan_status(self): + scan_name = 'test_scan' + scan_data = { + 'NEXPOSE': { + 'nexpose_id': 1, + 'scan_status': {} + }, + 'target': 'http://example.com' + } + self.mock_storage_service.get_by_name.return_value = scan_data + self.mock_rapid7vmconsole.ScanApi.return_value.get_scan.return_value.status = 'finished' + scan_status_list = [] + result = self.nexpose_scanner.get_scan_status(scan_name, scan_status_list) + self.assertEqual(result[0]['scanner'], 'Nexpose') + self.assertEqual(result[0]['status'], 'COMPLETE') + self.mock_storage_service.update_by_name.assert_called_once_with(scan_name, scan_data) + + def test_get_scan_results(self): + scan_name = 'test_scan' + scan_data = { + 'NEXPOSE': { + 'nexpose_id': 1, + 'report_id': 2, + 'report_instance_id': 3, + 'scan_status': {'status': 'COMPLETE'} + }, + 'target': 'http://example.com' + } + self.mock_storage_service.get_by_name.return_value = scan_data + report = 'CVE-1234descsol' + self.mock_rapid7vmconsole.ReportApi.return_value.download_report.return_value = report + scan_results = {} + result = self.nexpose_scanner.get_scan_results(scan_name, scan_results) + self.assertEqual(result['vuln1']['name'], 'vuln1') + self.assertEqual(result['vuln1']['severity'], 5.0) + self.assertEqual(result['vuln1']['cve_id'], 'CVE-1234') + self.assertEqual(result['vuln1']['description'], 'desc') + self.assertEqual(result['vuln1']['solution'], 'sol') + + def test_pause(self): + scan_name = 'test_scan' + scan_data = {'nexpose_id': 1} + self.mock_storage_service.get_by_name.return_value = scan_data + result = self.nexpose_scanner.pause(scan_name) + self.assertEqual(result, self.mock_rapid7vmconsole.ScanApi.return_value.set_scan_status.return_value) + self.mock_rapid7vmconsole.ScanApi.return_value.set_scan_status.assert_called_once_with(scan_data['nexpose_id'], 'pause') + + def test_resume(self): + scan_name = 'test_scan' + scan_data = {'nexpose_id': 1} + self.mock_storage_service.get_by_name.return_value = scan_data + result = self.nexpose_scanner.resume(scan_name) + self.assertEqual(result, self.mock_rapid7vmconsole.ScanApi.return_value.set_scan_status.return_value) + self.mock_rapid7vmconsole.ScanApi.return_value.set_scan_status.assert_called_once_with(scan_data['nexpose_id'], 'resume') + + def test_stop(self): + scan_name = 'test_scan' + scan_data = {'nexpose_id': 1} + self.mock_storage_service.get_by_name.return_value = scan_data + result = self.nexpose_scanner.stop(scan_name) + self.assertEqual(result, self.mock_rapid7vmconsole.ScanApi.return_value.set_scan_status.return_value) + self.mock_rapid7vmconsole.ScanApi.return_value.set_scan_status.assert_called_once_with(scan_data['nexpose_id'], 'stop') + + def test_remove(self): + scan_name = 'test_scan' + scan_data = {'nexpose_id': 1} + self.mock_storage_service.get_by_name.return_value = scan_data + result = self.nexpose_scanner.remove(scan_name) + self.assertEqual(result, self.mock_rapid7vmconsole.ScanApi.return_value.set_scan_status.return_value) + self.mock_rapid7vmconsole.ScanApi.return_value.set_scan_status.assert_called_once_with(scan_data['nexpose_id'], 'remove') + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_openvas_scanner.py b/tests/test_openvas_scanner.py new file mode 100644 index 0000000..8534748 --- /dev/null +++ b/tests/test_openvas_scanner.py @@ -0,0 +1,107 @@ +import unittest +from unittest.mock import patch, MagicMock +from scanners.openvas_scanner import OpenVASScanner + +class TestOpenVASScanner(unittest.TestCase): + + @patch('scanners.openvas_scanner.Gmp') + @patch('scanners.openvas_scanner.StorageService') + def setUp(self, MockStorageService, MockGmp): + self.mock_gmp = MockGmp.return_value + self.mock_storage_service = MockStorageService.return_value + self.openvas_scanner = OpenVASScanner() + + def test_start(self): + scan_name = 'test_scan' + target = 'http://example.com' + self.openvas_scanner.scan = MagicMock(return_value=True) + result = self.openvas_scanner.start(scan_name, target) + self.assertTrue(result) + self.openvas_scanner.scan.assert_called_once_with(scan_name, target) + + def test_scan(self): + scan_name = 'test_scan' + target = 'http://example.com' + target_id = '1234' + self.mock_gmp.create_target.return_value.get.return_value = target_id + scan_data = { + 'scan_name': scan_name, + 'scan_id': '', + 'target': target, + 'status': '' + } + self.mock_storage_service.get_by_name.return_value = None + result = self.openvas_scanner.scan(scan_name, target) + self.assertEqual(result['OPENVAS']['openvas_id'], target_id) + self.assertEqual(result['OPENVAS']['scan_status']['status'], 'INPROGRESS') + self.mock_storage_service.add.assert_called_once_with(scan_data) + self.mock_storage_service.update_by_name.assert_called_once_with(scan_name, result) + + def test_get_scan_status(self): + scan_name = 'test_scan' + scan_data = { + 'OPENVAS': { + 'openvas_id': '1234', + 'scan_status': {} + }, + 'target': 'http://example.com' + } + self.mock_storage_service.get_by_name.return_value = scan_data + self.mock_gmp.get_report.return_value = True + scan_status_list = [] + result = self.openvas_scanner.get_scan_status(scan_name, scan_status_list) + self.assertEqual(result[0]['scanner'], 'OpenVAS') + self.assertEqual(result[0]['status'], 'COMPLETE') + self.mock_storage_service.update_by_name.assert_called_once_with(scan_name, scan_data) + + def test_get_scan_results(self): + scan_name = 'test_scan' + scan_data = { + 'OPENVAS': { + 'openvas_id': '1234', + 'report_id': '5678', + 'scan_status': {'status': 'COMPLETE'} + }, + 'target': 'http://example.com' + } + self.mock_storage_service.get_by_name.return_value = scan_data + report = 'vuln15.0CVE-1234Highdesc' + self.mock_gmp.get_report.return_value = report + scan_results = {} + result = self.openvas_scanner.get_scan_results(scan_name, scan_results) + self.assertEqual(result['vuln1']['name'], 'vuln1') + self.assertEqual(result['vuln1']['severity'], 5.0) + self.assertEqual(result['vuln1']['cve_id'], 'CVE-1234') + self.assertEqual(result['vuln1']['description'], 'desc') + self.assertEqual(result['vuln1']['risk'], 'High') + + def test_pause(self): + scan_name = 'test_scan' + scan_data = {'openvas_id': '1234'} + self.mock_storage_service.get_by_name.return_value = scan_data + result = self.openvas_scanner.pause(scan_name) + self.assertIsNone(result) + + def test_resume(self): + scan_name = 'test_scan' + scan_data = {'openvas_id': '1234'} + self.mock_storage_service.get_by_name.return_value = scan_data + result = self.openvas_scanner.resume(scan_name) + self.assertIsNone(result) + + def test_stop(self): + scan_name = 'test_scan' + scan_data = {'openvas_id': '1234'} + self.mock_storage_service.get_by_name.return_value = scan_data + result = self.openvas_scanner.stop(scan_name) + self.assertIsNone(result) + + def test_remove(self): + scan_name = 'test_scan' + scan_data = {'openvas_id': '1234'} + self.mock_storage_service.get_by_name.return_value = scan_data + result = self.openvas_scanner.remove(scan_name) + self.assertIsNone(result) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_scanner.py b/tests/test_scanner.py new file mode 100644 index 0000000..0fd281b --- /dev/null +++ b/tests/test_scanner.py @@ -0,0 +1,55 @@ +import unittest +from unittest.mock import patch, MagicMock +from scanners.scanner import Scanner + +class TestScanner(unittest.TestCase): + + @patch('scanners.scanner.StorageService') + def setUp(self, MockStorageService): + self.mock_storage_service = MockStorageService.return_value + self.scanner = Scanner() + + def test_scan(self): + self.assertIsNone(self.scanner.scan()) + + def test_get_scan_status(self): + self.assertIsNone(self.scanner.get_scan_status()) + + def test_get_scan_results(self): + self.assertIsNone(self.scanner.get_scan_results()) + + def test_is_valid_scan(self): + self.assertIsNone(self.scanner.is_valid_scan()) + + def test_list_scans(self): + self.assertIsNone(self.scanner.list_scans()) + + def test_pause(self): + self.assertIsNone(self.scanner.pause()) + + def test_resume(self): + self.assertIsNone(self.scanner.resume()) + + def test_stop(self): + self.assertIsNone(self.scanner.stop()) + + def test_get_address(self): + target = 'http://example.com' + result = self.scanner._get_address(target) + self.assertEqual(result, 'example.com') + + def test_process_for_duplicates(self): + scan_results = {'vuln1': {'name': 'vuln1'}} + result = self.scanner._process_for_duplicates(scan_results) + self.assertEqual(result, scan_results) + + def test_print_scan_status(self): + scan_status_list = [{'scanner': 'ZAP', 'status': 'COMPLETE'}] + self.scanner.print_scan_status(scan_status_list) + + def test_print_report(self): + scan_results = {'vuln1': {'name': 'vuln1', 'risk': 'High', 'severity': 8.5, 'cve_id': 'CVE-1234', 'urls': ['http://example.com'], 'description': 'desc', 'solution': 'sol', 'reported_by': 'ZAP'}} + self.scanner.print_report(scan_results) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_storage_service.py b/tests/test_storage_service.py new file mode 100644 index 0000000..57d0b68 --- /dev/null +++ b/tests/test_storage_service.py @@ -0,0 +1,40 @@ +import unittest +from unittest.mock import patch, MagicMock +from core.storage_service import StorageService + +class TestStorageService(unittest.TestCase): + + @patch('core.storage_service.TinyDB') + def setUp(self, MockTinyDB): + self.mock_db = MockTinyDB.return_value + self.storage_service = StorageService() + + def test_add(self): + data = {'scan_name': 'test_scan'} + self.storage_service.add(data) + self.mock_db.insert.assert_called_once_with(data) + + def test_get_by_name(self): + scan_name = 'test_scan' + self.storage_service.get_by_name(scan_name) + self.mock_db.get.assert_called_once_with({'scan_name': scan_name}) + + def test_get_by_id(self): + scan_id = 1 + self.storage_service.get_by_id(scan_id) + self.mock_db.get.assert_called_once_with({'scan_id': scan_id}) + + def test_update_by_name(self): + scan_name = 'test_scan' + data = {'status': 'completed'} + self.storage_service.update_by_name(scan_name, data) + self.mock_db.update.assert_called_once_with(data, {'scan_name': scan_name}) + + def test_update_by_id(self): + scan_id = 1 + data = {'status': 'completed'} + self.storage_service.update_by_id(scan_id, data) + self.mock_db.update.assert_called_once_with(data, {'scan_id': scan_id}) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_zap_scanner.py b/tests/test_zap_scanner.py new file mode 100644 index 0000000..fb0aec9 --- /dev/null +++ b/tests/test_zap_scanner.py @@ -0,0 +1,121 @@ +import unittest +from unittest.mock import patch, MagicMock +from scanners.zap_scanner import ZapScanner + +class TestZapScanner(unittest.TestCase): + + @patch('scanners.zap_scanner.ZAPv2') + @patch('scanners.zap_scanner.StorageService') + def setUp(self, MockStorageService, MockZAPv2): + self.mock_zap = MockZAPv2.return_value + self.mock_storage_service = MockStorageService.return_value + self.zap_scanner = ZapScanner() + + def test_start(self): + scan_name = 'test_scan' + target = 'http://example.com' + self.zap_scanner.scan = MagicMock(return_value=True) + result = self.zap_scanner.start(scan_name, target) + self.assertTrue(result) + self.zap_scanner.scan.assert_called_once_with(scan_name, target) + + def test_pause(self): + scan_name = 'test_scan' + scan_data = {'scan_id': 1} + self.mock_storage_service.get_by_name.return_value = scan_data + result = self.zap_scanner.pause(scan_name) + self.assertEqual(result, scan_data) + self.mock_zap.spider.pause.assert_called_once_with(scan_data['scan_id']) + self.mock_zap.ascan.pause.assert_called_once_with(scan_data['scan_id']) + + def test_resume(self): + scan_name = 'test_scan' + scan_data = {'scan_id': 1} + self.mock_storage_service.get_by_name.return_value = scan_data + result = self.zap_scanner.resume(scan_name) + self.assertEqual(result, scan_data) + self.mock_zap.spider.resume.assert_called_once_with(scan_data['scan_id']) + self.mock_zap.ascan.resume.assert_called_once_with(scan_data['scan_id']) + + def test_stop(self): + scan_name = 'test_scan' + scan_data = {'scan_id': 1} + self.mock_storage_service.get_by_name.return_value = scan_data + result = self.zap_scanner.stop(scan_name) + self.assertEqual(result, scan_data) + self.mock_zap.spider.stop.assert_called_once_with(scan_data['scan_id']) + self.mock_zap.ascan.stop.assert_called_once_with(scan_data['scan_id']) + + def test_remove(self): + scan_name = 'test_scan' + scan_data = {'scan_id': 1} + self.mock_storage_service.get_by_name.return_value = scan_data + result = self.zap_scanner.remove(scan_name) + self.assertEqual(result, scan_data['scan_id']) + self.mock_zap.spider.removeScan.assert_called_once_with(scan_data['scan_id']) + self.mock_zap.ascan.removeScan.assert_called_once_with(scan_data['scan_id']) + + def test_scan(self): + scan_name = 'test_scan' + target = 'http://example.com' + scan_id = 1 + active_scan_id = 2 + self.mock_zap.spider.scan.return_value = scan_id + self.mock_zap.ascan.scan.return_value = active_scan_id + scan_data = { + 'scan_name': scan_name, + 'scan_id': '', + 'target': target, + 'status': '' + } + self.mock_storage_service.get_by_name.return_value = None + result = self.zap_scanner.scan(scan_name, target) + self.assertEqual(result['ZAP']['zap_id'], scan_id) + self.assertEqual(result['ZAP']['active_scan_id'], active_scan_id) + self.assertEqual(result['ZAP']['scan_status']['status'], 'INPROGRESS') + self.mock_storage_service.add.assert_called_once_with(scan_data) + self.mock_storage_service.update_by_name.assert_called_once_with(scan_name, result) + + def test_get_scan_status(self): + scan_name = 'test_scan' + scan_data = { + 'ZAP': { + 'zap_id': 1, + 'scan_status': {} + }, + 'target': 'http://example.com' + } + self.mock_storage_service.get_by_name.return_value = scan_data + self.mock_zap.spider.status.return_value = '100' + self.mock_zap.pscan.records_to_scan = 0 + self.mock_zap.ascan.status.return_value = '100' + scan_status_list = [] + result = self.zap_scanner.get_scan_status(scan_name, scan_status_list) + self.assertEqual(result[0]['scanner'], 'ZAP (spider_scan)') + self.assertEqual(result[0]['status'], 'COMPLETE (100%)') + self.assertEqual(result[1]['scanner'], 'ZAP (passive_scan)') + self.assertEqual(result[1]['status'], 'COMPLETE (0)') + self.assertEqual(result[2]['scanner'], 'ZAP (active_scan)') + self.assertEqual(result[2]['status'], 'COMPLETE (100%)') + self.mock_storage_service.update_by_name.assert_called_once_with(scan_name, scan_data) + + def test_get_scan_results(self): + scan_name = 'test_scan' + scan_data = { + 'ZAP': { + 'zap_id': 1 + }, + 'target': 'http://example.com' + } + self.mock_storage_service.get_by_name.return_value = scan_data + alerts = [{'name': 'alert1', 'risk': 'High', 'url': 'http://example.com'}] + self.mock_zap.core.alerts.return_value = alerts + scan_results = {} + result = self.zap_scanner.get_scan_results(scan_name, scan_results) + self.assertEqual(result['alert1']['name'], 'alert1') + self.assertEqual(result['alert1']['risk'], 'High') + self.assertEqual(result['alert1']['urls'], {'http://example.com'}) + self.assertEqual(result['alert1']['severity'], 8.5) + +if __name__ == '__main__': + unittest.main()