-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #147 from 0x41424142/totalcloud
Added totalcloud.get_remediation_activities and sql.upload_totalcloud_remediation_activities
- Loading branch information
Showing
10 changed files
with
318 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
106 changes: 106 additions & 0 deletions
106
qualysdk/totalcloud/data_classes/RemediationActivity.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
""" | ||
Contains the RemediationActivity dataclass. | ||
""" | ||
|
||
from dataclasses import dataclass, asdict | ||
from typing import Union, Literal | ||
from datetime import datetime | ||
|
||
from ...base.base_list import BaseList | ||
from ...exceptions.Exceptions import * | ||
|
||
|
||
@dataclass | ||
class RemediationActivity: | ||
""" | ||
Represents a remediation activity in TotalCloud. | ||
""" | ||
|
||
resourceId: str = None | ||
controlId: int = None | ||
cloudType: Literal["AWS", "AZURE"] = None | ||
# accountId depends on the cloudType | ||
accountId: Union[str, int] = None | ||
region: str = None | ||
status: str = None | ||
resourceType: str = None | ||
remediationAction: str = None | ||
connectorName: str = None | ||
policyNames: BaseList[str] = None | ||
controlName: str = None | ||
triggeredOn: Union[str, datetime] = None | ||
Errors: str = None | ||
triggeredBy: str = None | ||
remediationReason: str = None | ||
|
||
def __post_init__(self): | ||
DT_FIELDS = ["triggeredOn"] | ||
|
||
for field in DT_FIELDS: | ||
if not isinstance(getattr(self, field), datetime): | ||
setattr(self, field, datetime.fromisoformat(getattr(self, field))) | ||
|
||
if self.cloudType: | ||
if self.cloudType.upper() == "AWS": | ||
self.accountId = int(self.accountId) | ||
|
||
if self.Errors: | ||
setattr(self, "errors", str(self.Errors)) | ||
|
||
if self.policyNames: | ||
bl = BaseList() | ||
for policy in self.policyNames: | ||
bl.append(policy) | ||
setattr(self, "policyNames", bl) | ||
|
||
def to_dict(self) -> dict: | ||
""" | ||
Converts the object to a dictionary. | ||
Returns: | ||
dict: The object as a dictionary. | ||
""" | ||
return asdict(self) | ||
|
||
def __dict__(self) -> dict: | ||
return self.to_dict() | ||
|
||
def keys(self) -> list: | ||
""" | ||
Returns the keys of the object. | ||
Returns: | ||
list: The keys of the object. | ||
""" | ||
return self.to_dict().keys() | ||
|
||
def values(self) -> list: | ||
""" | ||
Returns the values of the object. | ||
Returns: | ||
list: The values of the object. | ||
""" | ||
return self.to_dict().values() | ||
|
||
def items(self) -> list: | ||
""" | ||
Returns the items of the object. | ||
Returns: | ||
list: The items of the object. | ||
""" | ||
return self.to_dict().items() | ||
|
||
@classmethod | ||
def from_dict(cls, data: dict): | ||
""" | ||
Creates a RemediationActivity object from a dictionary. | ||
Args: | ||
data (dict): The dictionary to create the object from. | ||
Returns: | ||
RemediationActivity: The object created from the dictionary. | ||
""" | ||
return cls(**data) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
""" | ||
Contains the get_remediation_activities function for Totalcloud | ||
""" | ||
|
||
from typing import Union, Literal | ||
|
||
from ..base.call_api import call_api | ||
from ..base.base_list import BaseList | ||
from ..auth.token import BasicAuth | ||
from ..exceptions.Exceptions import * | ||
from .data_classes.RemediationActivity import RemediationActivity | ||
|
||
|
||
def get_remediation_activities( | ||
auth: BasicAuth, | ||
provider: Literal["aws", "azure"], | ||
page_count: Union[int, "all"] = "all", | ||
**kwargs, | ||
) -> BaseList[dict]: | ||
""" | ||
Pull remediation activity by cloud provider | ||
Args: | ||
auth (BasicAuth): The authentication object. | ||
provider (Literal['aws', 'azure']): The cloud provider of the resource. | ||
page_count (Union[int, 'all']): The number of pages to retrieve. If 'all', all pages are retrieved. | ||
## Kwargs: | ||
- filter (str): Filter resources by providing a QQL [query](https://docs.qualys.com/en/cloudview/latest/search_tips/search_remediation_activity.htm?rhhlterm=search%20remediation%20activity%20activities&rhsearch=Search%20for%20Remediation%20Activity) | ||
- pageNo (int): The ordered page to start retrieving resources from, or if page_count is 1, the page to retrieve. | ||
- pageSize (int): The number of resources to get per page. | ||
Returns: | ||
BaseList[dict]: A list of dictionaries containing the remediation activities. WARNING: I have not seen the response from this API, so I am not sure what the response will look like. | ||
""" | ||
|
||
if (page_count != "all" and (not isinstance(page_count, int))) or ( | ||
isinstance(page_count, int) and page_count < 1 | ||
): | ||
raise ValueError("page_count must be an integer >=1 or 'all'.") | ||
|
||
if provider.lower() not in ["aws", "azure"]: | ||
raise ValueError("Invalid provider. Must be 'aws' or 'azure'.") | ||
|
||
params = { | ||
"cloudType": provider.upper(), | ||
} | ||
|
||
# Add in kwargs to the params | ||
if kwargs: | ||
params.update(kwargs) | ||
|
||
pulled_pages = 0 | ||
bl = BaseList() | ||
|
||
while True: | ||
# Call the API | ||
response = call_api( | ||
auth=auth, | ||
module="cloudview", | ||
endpoint="get_remediation_activities", | ||
params=params, | ||
) | ||
|
||
# Check the response | ||
if response.status_code != 200: | ||
raise QualysAPIError(f"{response.status_code}: {response.text}") | ||
|
||
j = response.json() | ||
|
||
if "errorCode" in j: | ||
raise QualysAPIError(f"{j['errorCode']}, {j['message']}") | ||
|
||
if "content" not in j.keys() or not j["pageable"].get("empty"): | ||
print("No content in response") | ||
break | ||
|
||
data = j["content"] | ||
if isinstance(data, dict): | ||
data = [data] | ||
for item in data: | ||
bl.append(RemediationActivity.from_dict(item)) | ||
|
||
# Check if we need to pull more pages | ||
if page_count != "all": | ||
pulled_pages += 1 | ||
if pulled_pages >= page_count: | ||
print("Reached page limit of", page_count) | ||
break | ||
|
||
return bl |