Skip to content

Commit

Permalink
Merge pull request #9 from woodtechie1428/perform_request
Browse files Browse the repository at this point in the history
changes in perform_request & exception handling
  • Loading branch information
woodtechie1428 authored Sep 28, 2021
2 parents 57b2359 + 17b58a3 commit 758dc5f
Show file tree
Hide file tree
Showing 13 changed files with 769 additions and 1,137 deletions.
162 changes: 92 additions & 70 deletions qualysclient/__init__.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,32 @@
import requests

from qualysclient.defaults import AUTH_URI, BASE_URI
from qualysclient.defaults import AUTH_URI


from qualysclient._util import _api_request


class QualysClient:
"""
a simple client for interacting with the Qualys API
"""

def __init__(self, username=None, password=None):
# TODO throw exception for missing creds
self.username = username
self.password = password
self.s = requests.Session()
self.s.headers.update({'X-Requested-With':'Qualys Client - Python'})
self.s.headers.update({"X-Requested-With": "Qualys Client - Python"})

if (username):
if username:
self.login(username=username, password=password)


def __enter__(self, username=None, password=None):
return self

def __exit__(self, exc_type, exc_value, tb):
import traceback

if exc_type is not None:
traceback.print_exception(exc_type, exc_value, tb)
self.logout()
Expand All @@ -39,155 +41,175 @@ def login(self, username, password):
:param password: Qualys Password
:type password: str
"""
if (isinstance(username, str) == False or isinstance(password, str) == False):
if isinstance(username, str) is False or isinstance(password, str) is False:
raise Exception("username and password is required and must be of type str")

payload = {
'action': 'login',
'username': username,
'password': password
}

payload = {"action": "login", "username": username, "password": password}
r = self.s.post(AUTH_URI, payload)
if (r.status_code != 200):
print('Status:', r.status_code, 'Headers:', r.headers, 'Error Response:', r.content)
if r.status_code != 200:
print(
"Status:",
r.status_code,
"Headers:",
r.headers,
"Error Response:",
r.content,
)

return self


def logout(self):
"""
Log out of authenticated sessionn
"""
payload = {
'action': 'logout'
}
payload = {"action": "logout"}
r = self.s.post(AUTH_URI, data=payload)
if (r.status_code != 200):
print('\nStatus:', r.status_code, '\nHeaders:', r.headers, '\nError Response:', r.content)

#REPORTS
if r.status_code != 200:
print(
"\nStatus:",
r.status_code,
"\nHeaders:",
r.headers,
"\nError Response:",
r.content,
)

# REPORTS
def list_reports(self, **kwargs):
"""
View a list of reports in the user’s account when Report Share feature is enabled. The report list output includes all report types, including scorecard reports.
View a list of reports in the user’s account when Report Share feature is enabled.
The report list output includes all report types, including scorecard reports.
:param id: Report ID
:type id: int
:return: (response.status_code, response.content)
:rtype: (int, Response)
"""

api_action = 'list_reports'
api_action = "list_reports"
return _api_request(self, api_action, **kwargs)

def launch_report(self, **kwargs):
api_action = 'launch_report'
def launch_report(self, **kwargs) -> requests.Response:
"""
Launch a report in the user's account. The Report Share feature must be enabled in the
user's subscription. When a report is launched with Report Share, the report is run in the
background, and the report generation processing does not timeout until the report has
completed.
Keyword Args:
template_id (int): (Required) The template ID of the report you want to launch.
Returns:
requests.Response: raw requests.Response object
"""
api_action = "launch_report"
return _api_request(self, api_action, **kwargs)


def launch_scorecard(self, **kwargs):
api_action = 'launch_scorecard'
def launch_scorecard(self, **kwargs) -> requests.Response:
"""
Launch a vulnerability scorecard report in the user’s Report Share. It is not possible to
launch any compliance scorecard reports or WAS scorecard reports using this API at this time.
When a scorecard report is launched, the report is run in the background, and the report
generation processing does not timeout until the report has completed.
Returns:
requests.Response: raw requests.Response object
"""
api_action = "launch_scorecard"
return _api_request(self, api_action, **kwargs)

def cancel_running_report(self, **kwargs):
api_action = 'cancel_running_report'
api_action = "cancel_running_report"
return _api_request(self, api_action, **kwargs)

def download_saved_report(self, **kwargs):
api_action = 'download_saved_report'
api_action = "download_saved_report"
return _api_request(self, api_action, **kwargs)

# ASSET

#ASSET

def list_ip(self, **kwargs):
api_action = 'list_ip'
api_action = "list_ip"
return _api_request(self, api_action, **kwargs)

def add_ips(self, **kwargs):
api_action = 'add_ips'
api_action = "add_ips"
return _api_request(self, api_action, **kwargs)

def update_ips(self, **kwargs):
api_action = 'update_ips'
api_action = "update_ips"
return _api_request(self, api_action, **kwargs)

def host_list(self, truncation_limit=10, **kwargs):
api_action = 'host_list'
if (kwargs.get('truncation_limit') is None):
kwargs['truncation_limit'] = truncation_limit
api_action = "host_list"
if kwargs.get("truncation_limit") is None:
kwargs["truncation_limit"] = truncation_limit
return _api_request(self, api_action, **kwargs)

def host_list_detection(self, truncation_limit=10, **kwargs):
api_action = 'host_list_detection'
if (kwargs.get('truncation_limit') is None):
kwargs['truncation_limit'] = truncation_limit
api_action = "host_list_detection"
if kwargs.get("truncation_limit") is None:
kwargs["truncation_limit"] = truncation_limit
return _api_request(self, api_action, **kwargs)

def excluded_host_list(self, **kwargs):
api_action = 'update_ips'
api_action = "update_ips"
return _api_request(self, api_action, **kwargs)

def excluded_host_change_history(self, **kwargs):
api_action = 'update_ips'
api_action = "update_ips"
return _api_request(self, api_action, **kwargs)


#COMPLIANCE
# COMPLIANCE

def compliance_control_list(self, **kwargs):
api_action = 'compliance_control_list'
api_action = "compliance_control_list"
return _api_request(self, api_action, **kwargs)

def compliance_policy_list(self, **kwargs):
api_action = 'compliance_policy_list'
api_action = "compliance_policy_list"
return _api_request(self, api_action, **kwargs)

def compliance_policy_export(self, **kwargs):
api_action = 'compliance_policy_export'
api_action = "compliance_policy_export"
return _api_request(self, api_action, **kwargs)

def compliance_policy_import(self, **kwargs):
api_action = 'compliance_policy_import'
api_action = "compliance_policy_import"
return _api_request(self, api_action, **kwargs)

def compliance_policy_merge(self, **kwargs):
api_action = 'compliance_policy_merge'
api_action = "compliance_policy_merge"
return _api_request(self, api_action, **kwargs)

def compliance_policy_add_asset_group_ids(self, **kwargs):
api_action = 'compliance_policy_add_asset_group_ids'
api_action = "compliance_policy_add_asset_group_ids"
return _api_request(self, api_action, **kwargs)

def compliance_policy_set_asset_group_ids(self, **kwargs):
api_action = 'compliance_policy_set_asset_group_ids'
api_action = "compliance_policy_set_asset_group_ids"
return _api_request(self, api_action, **kwargs)

def compliance_policy_add_asset_groups(self, **kwargs):
api_action = 'compliance_policy_add_asset_groups'
api_action = "compliance_policy_add_asset_groups"
return _api_request(self, api_action, **kwargs)

def compliance_posture_list(self, **kwargs):
api_action = 'compliance_posture_list'
api_action = "compliance_posture_list"
return _api_request(self, api_action, **kwargs)

def compliance_exception_list(self, **kwargs):
api_action = 'compliance_exception_list'
api_action = "compliance_exception_list"
return _api_request(self, api_action, **kwargs)

def compliance_exception_request(self, **kwargs):
api_action = 'compliance_exception_request'
api_action = "compliance_exception_request"
return _api_request(self, api_action, **kwargs)

def compliance_exception_update(self, **kwargs):
api_action = 'compliance_exception_update'
api_action = "compliance_exception_update"
return _api_request(self, api_action, **kwargs)

def compliance_exception_delete(self, **kwargs):
api_action = 'compliance_exception_delete'
api_action = "compliance_exception_delete"
return _api_request(self, api_action, **kwargs)






17 changes: 8 additions & 9 deletions qualysclient/_endpoints/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from ._ep_reports import _reports_endpoints
from ._ep_reports import _reports_endpoints
from ._ep_assets import _assets_endpoints
from ._ep_compliance import _compliance_endpoints
from ._ep_session import _session_endpoints
Expand All @@ -7,13 +7,12 @@
api_actions = {}


all_endpoints = [*_reports_endpoints,
*_assets_endpoints,
*_compliance_endpoints,
*_session_endpoints]
all_endpoints = [
*_reports_endpoints,
*_assets_endpoints,
*_compliance_endpoints,
*_session_endpoints,
]

for a in all_endpoints:
api_actions[a[0]] = APIAction(a[0],
api_endpoint=a[1],
input_params=a[2]
)
api_actions[a[0]] = APIAction(a[0], api_endpoint=a[1], input_params=a[2])
Loading

0 comments on commit 758dc5f

Please sign in to comment.