diff --git a/docs/authentication.md b/docs/authentication.md index e616ec0..3acc514 100644 --- a/docs/authentication.md +++ b/docs/authentication.md @@ -20,6 +20,8 @@ with BasicAuth(,, platform='qg1') as auth: >>>qualysdk.exceptions.Exceptions.AuthTypeMismatchError: Auth type mismatch. Expected token but got basic. ``` +>**Head's Up!**: Automatic rate limit respecting, described below, is not available for Patch Management API calls. Qualys does not return the headers necessary to determine rate limit windows for these endpoints. + Both authentication objects also support automatic rate limit respecting. The SDK will warn you as you get close to an API endpoint's limit and automatically sleep until the limit is lifted, continuing the call afterwards: ```plaintext diff --git a/docs/patch.md b/docs/patch.md index fb5d6d4..d474ce2 100644 --- a/docs/patch.md +++ b/docs/patch.md @@ -29,6 +29,8 @@ You can use any of the endpoints currently supported: | ```get_asset_count``` | Returns the number of assets for a given platform that match ```query``` and ```havingQuery```. | | ```lookup_host_uuids``` | Returns a list of tuples, containing host UUIDs for a given list of asset IDs. | | ```get_patch_catalog``` | Returns the patch catalog for a given platform according to ```patchId```. | +| ```get_packages_in_linux_patch``` | Returns the packages associated with a Linux patch. | +| ```get_products_in_windows_patch``` | Returns the products associated with a Windows patch. | ## Get PM Version API @@ -686,6 +688,90 @@ catalog = get_patch_catalog( ] ``` +## Get Packages Associated with Linux Patches API + +```get_packages_in_linux_patch``` returns the packages associated with a Linux patch. + +If a ```BaseList``` or a ```list``` of patch IDs is passed, the function will use threading to speed up the process. + +> Warning: You should filter down the patches as much as possible before passing them into this function. If you bulk-pass in a lot of patches, you will almost certainly hit a rate limit. **PM APIs do not return the headers necessary for the SDK to auto-recover from rate limits.** + + +|Parameter| Possible Values |Description| Required| +|--|--|--|--| +|```auth```|```qualysdk.auth.TokenAuth``` | Authentication object | ✅ | +| ```patchId``` | ```Union[str, BaseList[str]]``` | The ID(s) of the patch to get packages for | ✅ | +| ```threads``` | ```int=5``` | The number of threads to use for the lookup. | ❌ | +| ```filter``` | ```str``` | The QQL filter to search for packages | ❌ | +| ```pageNumber``` | ```int``` | The page number to return. The SDK will handle pagination for you. Users can use this if ```page_count``` is 1 to pull a specific page. | ❌ | +| ```pageSize``` | ```int=10``` | The number of packages to return per page | ❌ | + +```py +from qualysdk.auth import TokenAuth +from qualysdk.pm import get_packages_in_linux_patch, get_patches + +auth = TokenAuth(, , platform='qg1') + +# Get some Linux patches: +patches = get_patches( + auth, + platform='linux', + attributes="id", + query="vendorSeverity:Critical" +) + +# Get the packages for the patches: +packages = get_packages_in_linux_patch( + auth, + [patch.id for patch in patches] +) +>>>PackageDetail( + packageName='minidlna_1.3.0+dfsg-2+deb11u2', + architecture='noarch', + patchId='48e7d965-5f86-3118-a35f-b8fd1463e6b0' +) +``` + +## Get Products Associated with Windows Patches API + +```get_products_in_windows_patch``` returns the products associated with a Windows patch. + +If a ```BaseList``` or a ```list``` of patch IDs is passed, the function will use threading to speed up the process. + +> Warning: You should filter down the patches as much as possible before passing them into this function. If you bulk-pass in a lot of patches, you will almost certainly hit a rate limit. **PM APIs do not return the headers necessary for the SDK to auto-recover from rate limits.** + + +|Parameter| Possible Values |Description| Required| +|--|--|--|--| +|```auth```|```qualysdk.auth.TokenAuth``` | Authentication object | ✅ | +| ```patchId``` | ```Union[str, BaseList[str]]``` | The ID(s) of the patch to get products for | ✅ | +| ```threads``` | ```int=5``` | The number of threads to use for the lookup. | ❌ | + +```py +from qualysdk.auth import TokenAuth +from qualysdk.pm import get_products_in_windows_patch, get_patches + +auth = TokenAuth(, , platform='qg1') + +# Get some Windows patches: +patches = get_patches( + auth, + platform='windows', + attributes="id", + query="vendorSeverity:Critical" +) + +# Get the products for the patches: +products = get_products_in_windows_patch( + auth, + [patch.id for patch in patches] +) +>>>AssociatedProduct( + product=['Adobe Audition 2024 24 x64'], + patchId='2c1649c0-a18a-3f77-8c52-a6ea297ab295' +) +``` + ## ```qualysdk-pm``` CLI tool diff --git a/docs/sql.md b/docs/sql.md index 4faca32..7060857 100644 --- a/docs/sql.md +++ b/docs/sql.md @@ -140,6 +140,8 @@ The final optional parameter is ```table_name```. If you want to specify a custo | ```upload_pm_assets``` | Patch Management | ```pm.get_assets()``` | ```pm_assets``` | | ```upload_pm_assetids_to_uuids``` | Patch Management | ```pm.lookup_host_uuids()``` | ```pm_assetids_to_uuids``` | | ```upload_pm_patch_catalog``` | Patch Management | ```pm.get_patch_catalog()``` | ```pm_patch_catalog``` | +| ```upload_pm_linux_packages``` | Patch Management | ```pm.get_packages_in_linux_patch()``` | ```pm_linux_packages``` | +| ```upload_pm_windows_products``` | Patch Management | ```pm.get_products_in_windows_patch()``` | ```pm_windows_products``` | | ```upload_cert_certs``` | Certificate View | ```cert.list_certs()``` | ```cert_certs``` for certificates and ```cert_assets``` for assets (key = certs.id -> assets.certId) | diff --git a/poetry.lock b/poetry.lock index 6dcf36f..483b60e 100644 --- a/poetry.lock +++ b/poetry.lock @@ -690,13 +690,13 @@ pyyaml = ">=5.1" [[package]] name = "mkdocs-material" -version = "9.5.48" +version = "9.5.49" description = "Documentation that simply works" optional = false python-versions = ">=3.8" files = [ - {file = "mkdocs_material-9.5.48-py3-none-any.whl", hash = "sha256:b695c998f4b939ce748adbc0d3bff73fa886a670ece948cf27818fa115dc16f8"}, - {file = "mkdocs_material-9.5.48.tar.gz", hash = "sha256:a582531e8b34f4c7ed38c29d5c44763053832cf2a32f7409567e0c74749a47db"}, + {file = "mkdocs_material-9.5.49-py3-none-any.whl", hash = "sha256:c3c2d8176b18198435d3a3e119011922f3e11424074645c24019c2dcf08a360e"}, + {file = "mkdocs_material-9.5.49.tar.gz", hash = "sha256:3671bb282b4f53a1c72e08adbe04d2481a98f85fed392530051f80ff94a9621d"}, ] [package.dependencies] diff --git a/pyproject.toml b/pyproject.toml index 4aa1dad..06b08f0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "qualysdk" -version = "0.2.4" +version = "0.2.5" description = "SDK for interacting with Qualys APIs, across most modules the platform offers." authors = ["0x41424142 ", "0x4A616B65 "] maintainers = ["Jake Lindsay "] diff --git a/qualysdk/auth/basic.py b/qualysdk/auth/basic.py index df2e3c2..a6c5e08 100644 --- a/qualysdk/auth/basic.py +++ b/qualysdk/auth/basic.py @@ -95,7 +95,7 @@ def test_login(self, return_ratelimit: bool = False) -> Union[dict, None]: if self.platform != "qg1": url = f"https://qualysapi.{self.platform}.apps.qualys.com/msp/about.php" else: - url = f"https://qualysapi.qualys.com/msp/about.php" + url = "https://qualysapi.qualys.com/msp/about.php" """Requires basic auth. JWT is not supported for this endpoint.""" r = get(url, auth=(self.username, self.password)) diff --git a/qualysdk/base/call_api.py b/qualysdk/base/call_api.py index c3e9155..5011932 100644 --- a/qualysdk/base/call_api.py +++ b/qualysdk/base/call_api.py @@ -272,7 +272,8 @@ def call_api( and int(response.headers["X-RateLimit-Remaining"]) < 10 ) or response.status_code == 429: # Rate limit reached: - if int(response.headers["X-RateLimit-Remaining"]) == 0: + # Qualys does not return X-RateLimit headers for PM as of 12-2024. Sigh... + if module != "pm" and int(response.headers["X-RateLimit-Remaining"]) == 0: # Call API again for the X-RateLimit-ToWait-Sec header. # Qualys sometimes only includes this header when the rate limit is reached and retried: response = request( @@ -299,18 +300,18 @@ def call_api( to_wait = 3601 # Default to 1h 1s if no header is present. print( - "WARNING: You have reached the rate limit for this endpoint. " - + f"qualysdk will automatically sleep for {to_wait} seconds and try again at approximately {datetime.now() + timedelta(seconds=to_wait)}." + f"WARNING: You have reached the rate limit for this endpoint. qualysdk will automatically sleep for {to_wait} seconds and try again at approximately {datetime.now() + timedelta(seconds=to_wait)}." ) sleep(to_wait) # Go to next iteration of the loop to try again: continue - + # Qualys does not return X-RateLimit headers for PM. Sigh... + elif module == "pm": + return response else: # Almost at rate limit: print( - f"Warning: This endpoint will accept {response.headers['X-RateLimit-Remaining']} more calls before rate limiting you." - + f" qualysdk will automatically sleep once remaining calls hits 0." + f"Warning: This endpoint will accept {response.headers['X-RateLimit-Remaining']} more calls before rate limiting you. qualysdk will automatically sleep once remaining calls hits 0." ) return response diff --git a/qualysdk/base/call_schemas/pm_schema.py b/qualysdk/base/call_schemas/pm_schema.py index edcca55..89b2d5c 100644 --- a/qualysdk/base/call_schemas/pm_schema.py +++ b/qualysdk/base/call_schemas/pm_schema.py @@ -180,6 +180,35 @@ "pagination": False, "auth_type": "token", }, + "get_packages_in_linux_patch": { + "endpoint": "/pm/v1/patchcatalog/patch/packages", + "method": ["GET"], + "valid_params": [ + "patchUuid", + "patchId", + "filter", + "pageNumber", + "pageSize", + ], + "valid_POST_data": [], + "use_requests_json_data": False, + "return_type": "json", + "pagination": True, + "auth_type": "token", + }, + "get_products_in_windows_patch": { + "endpoint": "/pm/v1/patchcatalog/patch/{placeholder}/products", + "method": ["GET"], + "valid_params": [ + "placeholder", + "patchId", + ], + "valid_POST_data": [], + "use_requests_json_data": False, + "return_type": "json", + "pagination": False, + "auth_type": "token", + }, }, } ) diff --git a/qualysdk/base/printutils.py b/qualysdk/base/printutils.py index 850c7db..32b6f9a 100644 --- a/qualysdk/base/printutils.py +++ b/qualysdk/base/printutils.py @@ -5,7 +5,13 @@ disable_stdout -> bool: Disable printing. """ -import sys, os +import sys +import os -disable_stdout = lambda: setattr(sys, "stdout", open(os.devnull, "w")) -enable_stdout = lambda: setattr(sys, "stdout", sys.__stdout__) + +def disable_stdout(): + setattr(sys, "stdout", open(os.devnull, "w")) + + +def enable_stdout(): + setattr(sys, "stdout", sys.__stdout__) diff --git a/qualysdk/cloud_agent/data_classes/Agent.py b/qualysdk/cloud_agent/data_classes/Agent.py index 8387a31..311f104 100644 --- a/qualysdk/cloud_agent/data_classes/Agent.py +++ b/qualysdk/cloud_agent/data_classes/Agent.py @@ -115,21 +115,23 @@ def __post_init__(self): ] IP_ADDRESS_FIELDS = ["agentInfo_connectedFrom"] - for field in DATE_FIELDS: - if getattr(self, field): - setattr(self, field, datetime.fromisoformat(getattr(self, field))) + for date_field in DATE_FIELDS: + if getattr(self, date_field): + setattr( + self, date_field, datetime.fromisoformat(getattr(self, date_field)) + ) - for field in INT_FIELDS: - if getattr(self, field): - setattr(self, field, int(getattr(self, field))) + for int_field in INT_FIELDS: + if getattr(self, int_field): + setattr(self, int_field, int(getattr(self, int_field))) - for field in FLOAT_FIELDS: - if getattr(self, field): - setattr(self, field, float(getattr(self, field))) + for float_field in FLOAT_FIELDS: + if getattr(self, float_field): + setattr(self, float_field, float(getattr(self, float_field))) - for field in IP_ADDRESS_FIELDS: - if getattr(self, field): - setattr(self, field, ip_address(getattr(self, field))) + for ip_field in IP_ADDRESS_FIELDS: + if getattr(self, ip_field): + setattr(self, field, ip_address(getattr(self, ip_field))) # Before setting none fields, let's build the BaseList objects if self.tags: @@ -265,9 +267,9 @@ def __post_init__(self): else: setattr(self, "networkInterface", None) - for field in TO_NONE_FIELDS: - if getattr(self, field) == {}: - setattr(self, field, None) + for none_field in TO_NONE_FIELDS: + if getattr(self, none_field) == {}: + setattr(self, none_field, None) if "isDockerHost" in asdict(self).keys(): setattr(self, "isDockerHost", self.isDockerHost == "true") diff --git a/qualysdk/gav/hosts.py b/qualysdk/gav/hosts.py index cc4dbcf..2c6ebae 100644 --- a/qualysdk/gav/hosts.py +++ b/qualysdk/gav/hosts.py @@ -158,6 +158,7 @@ class Host(BaseClass): cloudProvider_region: Optional[str] = None cloudProvider_spotInstance: Optional[bool] = None cloudProvider_subnetId: Optional[str] = None + cloudProvider_tags: Optional[Union[str, List[str], BaseList[str]]] = None cloudProvider_vpcId: Optional[str] = None # Azure: cloudProvider_imageOffer: Optional[str] = None @@ -523,6 +524,7 @@ def __post_init__(self): if tag.get("key") else f"{tag.get('name')}:{tag.get('value')}" ) + bl.append(s) setattr(self, "cloudProvider_tags", bl) else: diff --git a/qualysdk/pm/__init__.py b/qualysdk/pm/__init__.py index c72636c..866545d 100644 --- a/qualysdk/pm/__init__.py +++ b/qualysdk/pm/__init__.py @@ -16,4 +16,8 @@ from .version import get_version from .patches import get_patches, get_patch_count from .assets import get_assets, lookup_host_uuids -from .patchcatalog import get_patch_catalog +from .patchcatalog import ( + get_patch_catalog, + get_packages_in_linux_patch, + get_products_in_windows_patch, +) diff --git a/qualysdk/pm/assets.py b/qualysdk/pm/assets.py index fcc70fb..a6e60fa 100644 --- a/qualysdk/pm/assets.py +++ b/qualysdk/pm/assets.py @@ -5,7 +5,9 @@ from typing import Union, Literal, overload -from .base.threading_backend import _get_patches_or_assets as get_assets_backend +from .base.assets_patches_threading_backend import ( + _get_patches_or_assets as get_assets_backend, +) from .data_classes.PMAsset import Asset from ..auth.token import TokenAuth from ..base.base_list import BaseList diff --git a/qualysdk/pm/base/threading_backend.py b/qualysdk/pm/base/assets_patches_threading_backend.py similarity index 99% rename from qualysdk/pm/base/threading_backend.py rename to qualysdk/pm/base/assets_patches_threading_backend.py index 276dae9..62c3ed7 100644 --- a/qualysdk/pm/base/threading_backend.py +++ b/qualysdk/pm/base/assets_patches_threading_backend.py @@ -7,7 +7,7 @@ from ..data_classes.Patch import Patch from ..data_classes.PMAsset import Asset -from ..base.page_limit import check_page_size_limit +from .page_limit import check_page_size_limit from ...auth.token import TokenAuth from ...base.call_api import call_api from ...base.base_list import BaseList diff --git a/qualysdk/pm/data_classes/AssociatedProduct.py b/qualysdk/pm/data_classes/AssociatedProduct.py new file mode 100644 index 0000000..0ec028f --- /dev/null +++ b/qualysdk/pm/data_classes/AssociatedProduct.py @@ -0,0 +1,31 @@ +""" +Contains AssociatedProduct data class for products under a Qualys +Windows patch. +""" + +from dataclasses import dataclass + +from ...base.base_class import BaseClass +from ...base.base_list import BaseList + + +@dataclass +class AssociatedProduct(BaseClass): + """ + A data class representing a product associated with a Windows patch. + + product: BaseList[str] + The product name(s) associated with the patch. + patchId: str + The UUID of the associated patch. + """ + + product: BaseList[str] = None + patchId: str = None + + def __str__(self): + return f"{str(self.product)}" + + def __post_init__(self): + if self.product: + setattr(self, "product", BaseList(self.product)) diff --git a/qualysdk/pm/data_classes/CatalogPatch.py b/qualysdk/pm/data_classes/CatalogPatch.py index 39b9be6..cd56c3e 100644 --- a/qualysdk/pm/data_classes/CatalogPatch.py +++ b/qualysdk/pm/data_classes/CatalogPatch.py @@ -31,6 +31,7 @@ class PackageDetail(BaseClass): packageName: str = None architecture: str = None + patchId: str = None def __str__(self): return f"{self.packageName} ({self.architecture})" diff --git a/qualysdk/pm/data_classes/__init__.py b/qualysdk/pm/data_classes/__init__.py index 2c273bd..9ddab98 100644 --- a/qualysdk/pm/data_classes/__init__.py +++ b/qualysdk/pm/data_classes/__init__.py @@ -8,3 +8,4 @@ from .PMRun import PMRun from .Patch import Patch from .CatalogPatch import CatalogPatch +from .AssociatedProduct import AssociatedProduct diff --git a/qualysdk/pm/jobs.py b/qualysdk/pm/jobs.py index becb5dd..b4dad07 100644 --- a/qualysdk/pm/jobs.py +++ b/qualysdk/pm/jobs.py @@ -198,7 +198,7 @@ def list_jobs( ), ] - print(f"Spawned threads for both Windows and Linux jobs...") + print("Spawned threads for both Windows and Linux jobs...") for thread in threads: thread.start() diff --git a/qualysdk/pm/patchcatalog.py b/qualysdk/pm/patchcatalog.py index d201155..1f91768 100644 --- a/qualysdk/pm/patchcatalog.py +++ b/qualysdk/pm/patchcatalog.py @@ -3,8 +3,12 @@ """ from typing import Union, Literal, overload +from queue import Queue +from threading import Thread, Lock +from datetime import datetime -from .data_classes.CatalogPatch import CatalogPatch +from .data_classes.CatalogPatch import CatalogPatch, PackageDetail +from .data_classes.AssociatedProduct import AssociatedProduct from ..base.base_list import BaseList from ..auth.token import TokenAuth from ..base.call_api import call_api @@ -91,7 +95,7 @@ def get_patch_catalog( params=params, ) - if not result.status_code in range(200, 299): + if result.status_code not in range(200, 299): raise QualysAPIError(result.json()) # Remove processed patchIds from the list: @@ -107,3 +111,277 @@ def get_patch_catalog( print(f"Pulled {pulled} chunks of 1K patch catalog entries") return results + + +def _get_products_or_packages_in_patch( + auth: TokenAuth, endpoint: Literal["products", "packages"], **kwargs +): + """ + Backend function for user-facing get_packages_in_linux_patch + and get_products_in_windows_patch functions. + + Args: + auth (TokenAuth): The authentication object. + endpoint (Literal['products', 'packages']): The endpoint to call. + + ## Kwargs: + + - any other kwargs from the calling function + + Returns: + response (requests.Response): The response object. + """ + + if endpoint not in ["products", "packages"]: + raise ValueError(f"endpoint must be 'products' or 'packages', not {endpoint}") + + if endpoint == "products": + kwargs["placeholder"] = kwargs.pop("patchId") + return call_api( + auth, + "pm", + "get_products_in_windows_patch", + params=kwargs, + ) + elif endpoint == "packages": + # Rename patchId to patchUuid. Qualys quirk. + kwargs["patchUuid"] = kwargs.pop("patchId") + return call_api( + auth, + "pm", + "get_packages_in_linux_patch", + params=kwargs, + ) + + +def _thread_worker( + auth: TokenAuth, + endpoint: Literal["products", "packages"], + patchQueue: Queue, + responses: BaseList, + LOCK: Lock, + tracker: datetime, + error_flag: bool, + **kwargs, +): + """ + Worker function for threading in get_packages_in_linux_patch and get_products_in_windows_patch. + + Args: + auth (TokenAuth): The authentication object. + endpoint (Literal['products', 'packages']): The endpoint to call. + patchQueue (Queue): The queue of patch IDs. + responses (BaseList): The list to append responses to. + LOCK (Lock): The threading lock. + tracker (datetime): The datetime object to track time. + error_flag (bool): The flag to track errors across threads. + + ## Kwargs: + + - any other kwargs from the calling function + """ + + kwargs = {} if not kwargs else kwargs + + if not kwargs.get("pageNumber") and endpoint == "packages": + kwargs["pageNumber"] = 0 + try: # use error_flag to prevent multiple threads from printing the same error. We can also return what we have so far if an error occurs. + while not patchQueue.empty(): + # We dont need to differentiate the kwarg here, backend will handle it + if error_flag: + print("Error flag is set. Returning what was collected so far...") + return + kwargs["patchId"] = patchQueue.get_nowait() + while True: + response = _get_products_or_packages_in_patch(auth, endpoint, **kwargs) + if response.status_code not in range(200, 299): + if response.text: + raise QualysAPIError(response.json()) + elif response.status_code == 429: + raise QualysAPIError( + "Qualys has rate limited you. Please try again later." + ) + else: + raise QualysAPIError( + f"Qualys returned status code {response.status_code}. {response.text}" + ) + j = response.json() + for obj in j: + obj["patchId"] = kwargs["patchId"] + if endpoint == "products": + responses.append(AssociatedProduct(**obj)) + elif endpoint == "packages": + responses.append(PackageDetail(**obj)) + # Or handles initial page number + if kwargs.get("pageNumber") or kwargs.get("pageNumber") == 0: + kwargs["pageNumber"] += 1 + # Kind of a weird if statement admittedly, but we + # do this to compensate for Windows patches endpoint + # not having any kwargs, specifically pageNumber/pageSize: + if (len(j) < kwargs.get("pageSize", 10)) or not j: + break + # Depending on the patches and the len() of their associated + # patches/products list, this print may fire at inconsistent intervals + with LOCK: + # Use lastReportedTime for printing how far we are every ~15 seconds + if (datetime.now() - tracker).seconds >= 15: + print(f"{len(responses)} {endpoint} processed...") + tracker = datetime.now() + except Exception as e: + with LOCK: + error_flag = True + print(f"Error in thread: {e} Returning what was collected so far...") + + +def validate_threads_and_patches(patchId, threads): + if type(patchId) not in [str, list, BaseList]: + raise ValueError("patchId must be a string, list, or BaseList") + + if not isinstance(threads, int) or threads < 1: + raise ValueError("threads must be an integer greater than 0") + + if isinstance(patchId, str) and "," in patchId: + patchId = patchId.replace(" ", "").split(",") + + if not isinstance(patchId, (list, BaseList)): + patchId = [patchId] + return patchId + + +@overload +def get_packages_in_linux_patch( + auth: TokenAuth, patchId: str, threads: int = 5, **kwargs +) -> BaseList[PackageDetail]: + ... + + +@overload +def get_packages_in_linux_patch( + auth: TokenAuth, + patchId: Union[BaseList[str], list[str]], + threads: int = 5, + **kwargs, +) -> BaseList[PackageDetail]: + ... + + +def get_packages_in_linux_patch( + auth: TokenAuth, + patchId: Union[str, BaseList[str], list[str]], + threads: int = 5, + **kwargs, +) -> BaseList[PackageDetail]: + """ + Get a list of packages associated with a Linux patch. + + Args: + auth (TokenAuth): The authentication object. + patchId (Union[str, BaseList[str], list[str]]): The patch ID(s) to retrieve packages for. If a BaseList/list of patch IDs is provided, the SDK will use threading to speed up the process. + threads (int): The number of threads to use. Default is 5. + + ## Kwargs: + + - filter (str): A filter to apply to the results. Patch QQL is supported. + - PageNumber (int): The page number to retrieve. The SDK will automatically handle pagination unless page_count = 1, in which case it will return the page you specify. + - PageSize (int): The number of results to return per page. Used for pagination (the SDK will automatically handle pagination). Default is 10. + + Returns: + BaseList[PackageDetail]: A list of PackageDetail objects. + """ + + patchId = validate_threads_and_patches(patchId, threads) + LOCK = Lock() + patchQueue = Queue() + timeTracker = datetime.now() + error_in_thread = False + for patch in patchId: + patchQueue.put(patch) + responses = BaseList() + threadsList = [] + for i in range(threads): + t = Thread( + target=_thread_worker, + args=( + auth, + "packages", + patchQueue, + responses, + LOCK, + timeTracker, + error_in_thread, + ), + kwargs=kwargs, + name=f"Linux-Thread-{i}", + ) + t.start() + threadsList.append(t) + for t in threadsList: + t.join() + + return responses + + +@overload +def get_products_in_windows_patch( + auth: TokenAuth, + patchId: str, + threads: int = 5, +) -> BaseList[AssociatedProduct]: + ... + + +@overload +def get_products_in_windows_patch( + auth: TokenAuth, + patchId: Union[BaseList[str], list[str]], + threads: int = 5, +) -> BaseList[AssociatedProduct]: + ... + + +def get_products_in_windows_patch( + auth: TokenAuth, + patchId: Union[str, BaseList[str], list[str]], + threads: int = 5, +) -> BaseList[AssociatedProduct]: + """ + Get a list of products associated with a Windows patch. + + Args: + auth (TokenAuth): The authentication object. + patchId (Union[str, BaseList[str], list[str]]): The patch ID(s) to retrieve products for. If a BaseList/list of patch IDs is provided, the SDK will use threading to speed up the process. + threads (int): The number of threads to use. Default is 5. + + Returns: + BaseList[PackageDetail]: A list of PackageDetail objects. + """ + + patchId = validate_threads_and_patches(patchId, threads) + LOCK = Lock() + patchQueue = Queue() + timeTracker = datetime.now() + error_in_thread = False + for patch in patchId: + patchQueue.put(patch) + responses = BaseList() + threadsList = [] + for i in range(threads): + t = Thread( + target=_thread_worker, + args=( + auth, + "products", + patchQueue, + responses, + LOCK, + timeTracker, + error_in_thread, + ), + name=f"Windows-Thread-{i}", + ) + t.start() + threadsList.append(t) + for t in threadsList: + t.join() + + return responses diff --git a/qualysdk/pm/patches.py b/qualysdk/pm/patches.py index 86a5177..43941de 100644 --- a/qualysdk/pm/patches.py +++ b/qualysdk/pm/patches.py @@ -6,7 +6,9 @@ from typing import Union, Literal from .data_classes.Patch import Patch -from .base.threading_backend import _get_patches_or_assets as get_assets_backend +from .base.assets_patches_threading_backend import ( + _get_patches_or_assets as get_assets_backend, +) from ..auth.token import TokenAuth from ..base.call_api import call_api from ..base.base_list import BaseList diff --git a/qualysdk/sql/__init__.py b/qualysdk/sql/__init__.py index 2f0b9c1..bb8ebb8 100644 --- a/qualysdk/sql/__init__.py +++ b/qualysdk/sql/__init__.py @@ -74,5 +74,7 @@ upload_pm_assets, upload_pm_assetids_to_uuids, upload_pm_patch_catalog, + upload_pm_linux_packages, + upload_pm_windows_products, ) from .cert import upload_cert_certs diff --git a/qualysdk/sql/base.py b/qualysdk/sql/base.py index 2f6b85d..1962384 100644 --- a/qualysdk/sql/base.py +++ b/qualysdk/sql/base.py @@ -6,8 +6,6 @@ from dataclasses import dataclass from typing import Literal -from sqlalchemy import types - from pandas import DataFrame from sqlalchemy import create_engine, Connection, types diff --git a/qualysdk/sql/gav.py b/qualysdk/sql/gav.py index e5fd0a0..9ae8fbb 100644 --- a/qualysdk/sql/gav.py +++ b/qualysdk/sql/gav.py @@ -285,9 +285,6 @@ def upload_gav_hosts( "cloudProvider_imagePublisher": types.String().with_variant( TEXT(charset="utf8"), "mysql", "mariadb" ), - "cloudProvider_imagePublisher": types.String().with_variant( - TEXT(charset="utf8"), "mysql", "mariadb" - ), "cloudProvider_imageVersion": types.String().with_variant( TEXT(charset="utf8"), "mysql", "mariadb" ), diff --git a/qualysdk/sql/pm.py b/qualysdk/sql/pm.py index 85fd3e8..6ad1f12 100644 --- a/qualysdk/sql/pm.py +++ b/qualysdk/sql/pm.py @@ -684,3 +684,78 @@ def upload_pm_patch_catalog( # Upload the data: return upload_data(df, table_name, cnxn, COLS, override_import_dt) + + +def upload_pm_windows_products( + products: BaseList, + cnxn: Connection, + table_name: str = "pm_windows_products", + override_import_dt: datetime = None, +) -> int: + """ + Upload results from ```pm.get_products_in_windows_patch``` + to a SQL database. + + Args: + products (BaseList): A BaseList of AssociatedProduct objects. + cnxn (Connection): The Connection object to the SQL database. + table_name (str): The name of the table to upload to. Defaults to "pm_windows_products". + override_import_dt (datetime): If provided, will override the import_datetime column with this value. + + Returns: + int: The number of rows uploaded. + """ + + COLS = { + "patchId": types.String().with_variant( + TEXT(charset="utf8"), "mysql", "mariadb" + ), + "product": types.String().with_variant( + TEXT(charset="utf8"), "mysql", "mariadb" + ), + } + + # Prepare the dataclass for insertion: + df = DataFrame([prepare_dataclass(product) for product in products]) + + # Upload the data: + return upload_data(df, table_name, cnxn, COLS, override_import_dt) + + +def upload_pm_linux_packages( + packages: BaseList, + cnxn: Connection, + table_name: str = "pm_linux_packages", + override_import_dt: datetime = None, +) -> int: + """ + Upload results from ```pm.get_packages_in_linux_patch``` + to a SQL database. + + Args: + packages (BaseList): A BaseList of AssociatedProduct objects. + cnxn (Connection): The Connection object to the SQL database. + table_name (str): The name of the table to upload to. Defaults to "pm_linux_packages". + override_import_dt (datetime): If provided, will override the import_datetime column with this value. + + Returns: + int: The number of rows uploaded. + """ + + COLS = { + "patchId": types.String().with_variant( + TEXT(charset="utf8"), "mysql", "mariadb" + ), + "packageName": types.String().with_variant( + TEXT(charset="utf8"), "mysql", "mariadb" + ), + "architecture": types.String().with_variant( + TEXT(charset="utf8"), "mysql", "mariadb" + ), + } + + # Prepare the dataclass for insertion: + df = DataFrame([prepare_dataclass(package) for package in packages]) + + # Upload the data: + return upload_data(df, table_name, cnxn, COLS, override_import_dt) diff --git a/qualysdk/sql/vmdr.py b/qualysdk/sql/vmdr.py index af59bb6..1a8722e 100644 --- a/qualysdk/sql/vmdr.py +++ b/qualysdk/sql/vmdr.py @@ -848,7 +848,6 @@ def upload_vmdr_users( "LATEST_VULN": types.String().with_variant( TEXT(charset="utf8"), "mysql", "mariadb" ), - "MAP": types.String().with_variant(TEXT(charset="utf8"), "mysql", "mariadb"), "DAILY_TICKETS": types.Boolean(), "ASSET_GROUP_TITLE": types.String().with_variant( TEXT(charset="utf8"), "mysql", "mariadb" diff --git a/qualysdk/totalcloud/data_classes/Controls.py b/qualysdk/totalcloud/data_classes/Controls.py index 5dac4ac..8b5e9d3 100644 --- a/qualysdk/totalcloud/data_classes/Controls.py +++ b/qualysdk/totalcloud/data_classes/Controls.py @@ -6,16 +6,16 @@ from typing import Union from datetime import datetime -from bs4 import BeautifulSoup, MarkupResemblesLocatorWarning - # suppress warning from bs4 import warnings -warnings.filterwarnings("ignore", category=MarkupResemblesLocatorWarning) +from bs4 import BeautifulSoup, MarkupResemblesLocatorWarning from ...base.base_list import BaseList from ...base.base_class import BaseClass +warnings.filterwarnings("ignore", category=MarkupResemblesLocatorWarning) + @dataclass class Control(BaseClass): diff --git a/qualysdk/totalcloud/data_classes/Evaluation.py b/qualysdk/totalcloud/data_classes/Evaluation.py index 3c3f87f..0b00ab0 100644 --- a/qualysdk/totalcloud/data_classes/Evaluation.py +++ b/qualysdk/totalcloud/data_classes/Evaluation.py @@ -30,7 +30,7 @@ def __post_init__(self): if not isinstance(value, datetime): try: setattr(self, field, datetime.fromisoformat(value)) - except: + except (OSError, TypeError, ValueError, OverflowError): setattr(self, field, None) diff --git a/qualysdk/totalcloud/get_connectors.py b/qualysdk/totalcloud/get_connectors.py index a4c6c96..5829c8e 100644 --- a/qualysdk/totalcloud/get_connectors.py +++ b/qualysdk/totalcloud/get_connectors.py @@ -48,7 +48,7 @@ def get_connectors( if kwargs.get("filter"): # Make sure the query key is valid - if not kwargs["filter"].split(":")[0] in [ + if kwargs["filter"].split(":")[0] not in [ "name", "description", "state", @@ -71,7 +71,7 @@ def get_connectors( # Make the API request to retrieve the connectors response = call_api( - auth=auth, module="cloudview", endpoint=f"get_connectors", params=kwargs + auth=auth, module="cloudview", endpoint="get_connectors", params=kwargs ) if response.status_code != 200: @@ -134,7 +134,7 @@ def get_connector_details( response = call_api( auth=auth, module="cloudview", - endpoint=f"get_connector_details", + endpoint="get_connector_details", params={ "placeholder": connectorId, "cloudprovider": provider, diff --git a/qualysdk/totalcloud/get_metadata.py b/qualysdk/totalcloud/get_metadata.py index ff00b2b..dfef0e3 100644 --- a/qualysdk/totalcloud/get_metadata.py +++ b/qualysdk/totalcloud/get_metadata.py @@ -68,7 +68,7 @@ def get_control_metadata( "qflow.id", ] - if not kwargs["filter"].split(":")[0] in valid_controls: + if kwargs["filter"].split(":")[0] not in valid_controls: raise QualysAPIError( f"Invalid filter key. Valid keys: {', '.join(valid_controls)}" ) diff --git a/qualysdk/vmdr/base/helpers.py b/qualysdk/vmdr/base/helpers.py index bbcd79a..7452625 100644 --- a/qualysdk/vmdr/base/helpers.py +++ b/qualysdk/vmdr/base/helpers.py @@ -74,7 +74,7 @@ def prepare_args( threads = rl["X-Concurrency-Limit-Limit"] ( - print(f"Pulling/creating queue for full ID list...") + print("Pulling/creating queue for full ID list...") if not ids else print(f"Pulling/creating queue for user-specified IDs: {ids}...") ) diff --git a/qualysdk/vmdr/data_classes/activity_log.py b/qualysdk/vmdr/data_classes/activity_log.py index 42e9397..1069477 100644 --- a/qualysdk/vmdr/data_classes/activity_log.py +++ b/qualysdk/vmdr/data_classes/activity_log.py @@ -29,7 +29,7 @@ def __post_init__(self): if isinstance(self.Date, str): self.Date = datetime.fromisoformat(self.Date) - if self.User_IP and not type(self.User_IP) in [IPv4Address, IPv6Address]: + if self.User_IP and type(self.User_IP) not in [IPv4Address, IPv6Address]: try: self.User_IP = ip_address(self.User_IP) except ValueError: diff --git a/qualysdk/vmdr/data_classes/asset_group.py b/qualysdk/vmdr/data_classes/asset_group.py index 43cca1f..8cfddb1 100644 --- a/qualysdk/vmdr/data_classes/asset_group.py +++ b/qualysdk/vmdr/data_classes/asset_group.py @@ -139,30 +139,32 @@ def __post_init__(self): INT_LISTS = ["APPLIANCE_IDS", "ASSIGNED_USER_IDS", "ASSIGNED_UNIT_IDS"] STR_LISTS = ["DNS_LIST", "NETBIOS_LIST", "EC2_IDS", "COMMENTS", "DOMAIN_LIST"] - for field in STR_LISTS: - if getattr(self, field): - setattr(self, field, BaseList(getattr(self, field).split(","))) + for str_field in STR_LISTS: + if getattr(self, str_field): + setattr(self, str_field, BaseList(getattr(self, str_field).split(","))) - for field in INT_LISTS: - if getattr(self, field): - if isinstance(getattr(self, field), str): + for int_field in INT_LISTS: + if getattr(self, int_field): + if isinstance(getattr(self, int_field), str): setattr( self, - field, - BaseList([int(x) for x in getattr(self, field).split(",")]), + int_field, + BaseList([int(x) for x in getattr(self, int_field).split(",")]), ) else: setattr( - self, field, BaseList([int(x) for x in getattr(self, field)]) + self, + int_field, + BaseList([int(x) for x in getattr(self, int_field)]), ) - for field in DT_FIELDS: - if getattr(self, field): - setattr(self, field, datetime.fromisoformat(getattr(self, field))) + for dt_field in DT_FIELDS: + if getattr(self, dt_field): + setattr(self, dt_field, datetime.fromisoformat(getattr(self, dt_field))) - for field in INT_FIELDS: - if getattr(self, field): - setattr(self, field, int(getattr(self, field))) + for int_field in INT_FIELDS: + if getattr(self, int_field): + setattr(self, int_field, int(getattr(self, int_field))) # Convert IP_SET to BaseList of ipaddress.* objs.: if self.IP_SET: diff --git a/qualysdk/vmdr/data_classes/detection.py b/qualysdk/vmdr/data_classes/detection.py index 6f6f98a..3fb2e4b 100644 --- a/qualysdk/vmdr/data_classes/detection.py +++ b/qualysdk/vmdr/data_classes/detection.py @@ -141,12 +141,12 @@ def __post_init__(self): INT_FIELDS = ["UNIQUE_VULN_ID", "QID", "SEVERITY", "TIMES_FOUND", "PORT", "ID"] - for field in DATETIME_FIELDS: + for dt_field in DATETIME_FIELDS: if ( - isinstance(getattr(self, field), str) - and getattr(self, field) is not None + isinstance(getattr(self, dt_field), str) + and getattr(self, dt_field) is not None ): - setattr(self, field, datetime.fromisoformat(getattr(self, field))) + setattr(self, dt_field, datetime.fromisoformat(getattr(self, dt_field))) # clean up fields that have html tags with catch_warnings(): diff --git a/qualysdk/vmdr/data_classes/kb_entry.py b/qualysdk/vmdr/data_classes/kb_entry.py index e45fd5a..fd73e3a 100644 --- a/qualysdk/vmdr/data_classes/kb_entry.py +++ b/qualysdk/vmdr/data_classes/kb_entry.py @@ -176,37 +176,45 @@ def __post_init__(self): HTML_FIELDS = ["DIAGNOSIS", "CONSEQUENCE", "SOLUTION"] - for field in DATE_FIELDS: - if getattr(self, field) is not None and not isinstance( - getattr(self, field), datetime + for dt_field in DATE_FIELDS: + if getattr(self, dt_field) is not None and not isinstance( + getattr(self, dt_field), datetime ): # special case for LAST_CUSTOMIZATION: - if field == "LAST_CUSTOMIZATION": - if isinstance(getattr(self, field), dict): + if dt_field == "LAST_CUSTOMIZATION": + if isinstance(getattr(self, dt_field), dict): setattr( self, - field, - datetime.fromisoformat(getattr(self, field)["DATETIME"]), + dt_field, + datetime.fromisoformat(getattr(self, dt_field)["DATETIME"]), ) else: setattr( - self, field, datetime.fromisoformat(getattr(self, field)) + self, + dt_field, + datetime.fromisoformat(getattr(self, dt_field)), ) else: - setattr(self, field, datetime.fromisoformat(getattr(self, field))) + setattr( + self, dt_field, datetime.fromisoformat(getattr(self, dt_field)) + ) - for field in BOOL_FIELDS: - if getattr(self, field) and not isinstance(getattr(self, field), bool): - setattr(self, field, bool(getattr(self, field))) + for bool_field in BOOL_FIELDS: + if getattr(self, bool_field) and not isinstance( + getattr(self, bool_field), bool + ): + setattr(self, bool_field, bool(getattr(self, bool_field))) with catch_warnings(): simplefilter("ignore") # ignore the warning about the html.parser - for field in HTML_FIELDS: - if getattr(self, field): + for html_field in HTML_FIELDS: + if getattr(self, html_field): setattr( self, - field, - BeautifulSoup(getattr(self, field), "html.parser").get_text(), + html_field, + BeautifulSoup( + getattr(self, html_field), "html.parser" + ).get_text(), ) # convert the lists to BaseList objects: diff --git a/qualysdk/vmdr/data_classes/qvs.py b/qualysdk/vmdr/data_classes/qvs.py index 67b59fb..b78995c 100644 --- a/qualysdk/vmdr/data_classes/qvs.py +++ b/qualysdk/vmdr/data_classes/qvs.py @@ -54,9 +54,11 @@ def __post_init__(self): raise ValueError("The base attribute is required.") DT_FIELDS = ["qvsLastChangedDate", "nvdPublishedDate"] - for field in DT_FIELDS: - if field and not isinstance(getattr(self, field), datetime): - setattr(self, field, datetime.fromtimestamp(int(self.base.get(field)))) + for dt_field in DT_FIELDS: + if dt_field and not isinstance(getattr(self, dt_field), datetime): + setattr( + self, dt_field, datetime.fromtimestamp(int(self.base.get(dt_field))) + ) if self.base.get("id"): setattr(self, "id", self.base.get("id")) @@ -88,14 +90,14 @@ def __post_init__(self): "trending", "malwareName", ] - for field in ONE_LONG_STR_FIELDS: - if field in self.contributingFactors.keys(): + for str_field in ONE_LONG_STR_FIELDS: + if str_field in self.contributingFactors.keys(): bl = BaseList() - for item in self.contributingFactors.get(field)[0].split(","): - if field == "trending": + for item in self.contributingFactors.get(str_field)[0].split(","): + if str_field == "trending": item = int(item) bl.append(item) - setattr(self, field, bl) + setattr(self, str_field, bl) if "mitigationControls" in self.contributingFactors.keys(): bl = BaseList() diff --git a/qualysdk/vmdr/data_classes/report.py b/qualysdk/vmdr/data_classes/report.py index 1fb5e3e..e6315cd 100644 --- a/qualysdk/vmdr/data_classes/report.py +++ b/qualysdk/vmdr/data_classes/report.py @@ -58,9 +58,9 @@ class VMDRReport(BaseClass): def __post_init__(self): DT_FIELDS = ["LAUNCH_DATETIME", "EXPIRATION_DATETIME"] - for field in DT_FIELDS: - if getattr(self, field): - setattr(self, field, datetime.fromisoformat(getattr(self, field))) + for dt_field in DT_FIELDS: + if getattr(self, dt_field): + setattr(self, dt_field, datetime.fromisoformat(getattr(self, dt_field))) self.ID = int(self.ID) diff --git a/qualysdk/vmdr/data_classes/scanner_appliance.py b/qualysdk/vmdr/data_classes/scanner_appliance.py index 4449699..d047d24 100644 --- a/qualysdk/vmdr/data_classes/scanner_appliance.py +++ b/qualysdk/vmdr/data_classes/scanner_appliance.py @@ -196,13 +196,13 @@ def __post_init__(self): ) self.API_PROXY_SETTINGS = self.EC2_INFO.get("API_PROXY_SETTINGS")["SETTING"] - for field in INT_FIELDS: - if getattr(self, field): - setattr(self, field, int(getattr(self, field))) + for int_field in INT_FIELDS: + if getattr(self, int_field): + setattr(self, int_field, int(getattr(self, int_field))) - for field in IPV4_FIELDS: - if getattr(self, field): - setattr(self, field, IPv4Address(getattr(self, field))) + for ip_field in IPV4_FIELDS: + if getattr(self, ip_field): + setattr(self, ip_field, IPv4Address(getattr(self, ip_field))) del self.EC2_INFO @@ -321,13 +321,13 @@ def __post_init__(self): INT_FIELDS = ["SPEED"] IPV4_FIELDS = ["IP_ADDRESS", "GATEWAY"] - for field in INT_FIELDS: - if getattr(self, field): - setattr(self, field, int(getattr(self, field))) + for int_field in INT_FIELDS: + if getattr(self, int_field): + setattr(self, int_field, int(getattr(self, int_field))) - for field in IPV4_FIELDS: - if getattr(self, field): - setattr(self, field, IPv4Address(getattr(self, field))) + for ip_field in IPV4_FIELDS: + if getattr(self, ip_field): + setattr(self, ip_field, IPv4Address(getattr(self, ip_field))) if self.DNS: self.DNS_PRIMARY = self.DNS.get("PRIMARY") @@ -577,32 +577,34 @@ def __post_init__(self): FLOAT_FIELDS = ["SOFTWARE_VERSION"] CUSTOM_DATACLASSES = [("ASSET_GROUP_LIST", "ag"), ("ASSET_TAGS_LIST", "tag")] - for field in INT_FIELDS: - if getattr(self, field): + for int_field in INT_FIELDS: + if getattr(self, int_field): # Special case for polling interval. Split by space and take the first value. - if field == "POLLING_INTERVAL": - setattr(self, field, int(getattr(self, field).split()[0])) + if int_field == "POLLING_INTERVAL": + setattr(self, int_field, int(getattr(self, int_field).split()[0])) else: - setattr(self, field, int(getattr(self, field))) + setattr(self, int_field, int(getattr(self, int_field))) - for field in FLOAT_FIELDS: - if getattr(self, field): - setattr(self, field, float(getattr(self, field))) + for float_field in FLOAT_FIELDS: + if getattr(self, float_field): + setattr(self, float_field, float(getattr(self, float_field))) - for field in DT_FIELDS: - if getattr(self, field): + for dt_field in DT_FIELDS: + if getattr(self, dt_field): setattr( self, - field, - datetime.strptime(getattr(self, field), "%Y-%m-%dT%H:%M:%S%z"), + dt_field, + datetime.strptime(getattr(self, dt_field), "%Y-%m-%dT%H:%M:%S%z"), ) - for field in CUSTOM_DATACLASSES: - if getattr(self, field[0]): + for custom_field in CUSTOM_DATACLASSES: + if getattr(self, custom_field[0]): setattr( self, - field[0], - build_dataclass_baselist(getattr(self, field[0]), field[1]), + custom_field[0], + build_dataclass_baselist( + getattr(self, custom_field[0]), custom_field[1] + ), ) if self.CLOUD_INFO: diff --git a/qualysdk/vmdr/data_classes/searchlist.py b/qualysdk/vmdr/data_classes/searchlist.py index 4a4e858..5838594 100644 --- a/qualysdk/vmdr/data_classes/searchlist.py +++ b/qualysdk/vmdr/data_classes/searchlist.py @@ -141,9 +141,11 @@ def __post_init__(self): DT_FIELDS = ["CREATED", "MODIFIED"] - for field in DT_FIELDS: - if getattr(self, field) and not isinstance(getattr(self, field), datetime): - setattr(self, field, datetime.fromisoformat(getattr(self, field))) + for dt_field in DT_FIELDS: + if getattr(self, dt_field) and not isinstance( + getattr(self, dt_field), datetime + ): + setattr(self, dt_field, datetime.fromisoformat(getattr(self, dt_field))) self.ID = int(self.ID) @@ -250,9 +252,11 @@ class DynamicSearchList(BaseClass): def __post_init__(self): DT_FIELDS = ["CREATED", "MODIFIED"] - for field in DT_FIELDS: - if getattr(self, field) and not isinstance(getattr(self, field), datetime): - setattr(self, field, datetime.fromisoformat(getattr(self, field))) + for dt_field in DT_FIELDS: + if getattr(self, dt_field) and not isinstance( + getattr(self, dt_field), datetime + ): + setattr(self, dt_field, datetime.fromisoformat(getattr(self, dt_field))) self.ID = int(self.ID) diff --git a/qualysdk/vmdr/data_classes/user.py b/qualysdk/vmdr/data_classes/user.py index 8f83038..7378042 100644 --- a/qualysdk/vmdr/data_classes/user.py +++ b/qualysdk/vmdr/data_classes/user.py @@ -173,24 +173,26 @@ def __post_init__(self): # Convert the fields to the correct types - for field in DT_FIELDS: - if getattr(self, field): - if getattr(self, field) == "N/A": # Some fields have N/A as a value - setattr(self, field, None) + for dt_field in DT_FIELDS: + if getattr(self, dt_field): + if getattr(self, dt_field) == "N/A": # Some fields have N/A as a value + setattr(self, dt_field, None) else: setattr( self, - field, - datetime.strptime(getattr(self, field), "%Y-%m-%dT%H:%M:%S%z"), + dt_field, + datetime.strptime( + getattr(self, dt_field), "%Y-%m-%dT%H:%M:%S%z" + ), ) - for field in INT_FIELDS: - if getattr(self, field): - setattr(self, field, int(getattr(self, field))) + for int_field in INT_FIELDS: + if getattr(self, int_field): + setattr(self, int_field, int(getattr(self, int_field))) - for field in BOOL_FIELDS: - if getattr(self, field): - setattr(self, field, bool(getattr(self, field))) + for bool_field in BOOL_FIELDS: + if getattr(self, bool_field): + setattr(self, bool_field, bool(getattr(self, bool_field))) if self.ASSET_GROUP_TITLE: bl = BaseList() diff --git a/qualysdk/vmdr/users.py b/qualysdk/vmdr/users.py index 2d93f18..76f0d49 100644 --- a/qualysdk/vmdr/users.py +++ b/qualysdk/vmdr/users.py @@ -140,7 +140,7 @@ def add_user( if result["USER_OUTPUT"]["RETURN"]["@status"] == "FAILED": raise QualysAPIError(result["USER_OUTPUT"]["RETURN"]["MESSAGE"]) - if kwargs.get("send_email") == False: + if not kwargs.get("send_email"): return f"User created. User:Pass is: {result['USER_OUTPUT']['USER']['USER_LOGIN']}:{result['USER_OUTPUT']['USER']['PASSWORD']}" else: diff --git a/qualysdk/was/authentication_records.py b/qualysdk/was/authentication_records.py index 74ebb2e..1bd2d61 100644 --- a/qualysdk/was/authentication_records.py +++ b/qualysdk/was/authentication_records.py @@ -620,7 +620,7 @@ def delete_authentication_record(auth: BasicAuth, **kwargs) -> list[str]: ) if serviceResponse.get("count") == "0": - print(f"No auth records found. Exiting.") + print("No auth records found. Exiting.") return [] deleted = [] diff --git a/qualysdk/was/base/auth_record_service_requests.py b/qualysdk/was/base/auth_record_service_requests.py index 05f6957..69db97c 100644 --- a/qualysdk/was/base/auth_record_service_requests.py +++ b/qualysdk/was/base/auth_record_service_requests.py @@ -306,7 +306,7 @@ def format_web_app_auth_form_record_field(field: Dict[str, Any]) -> Dict[str, An try: field["data"] = unparse(field["data"]) except Exception as e: - raise ValueError(f"Field data must be a string or an XNL-like dictionary.") + raise ValueError("Field data must be a string or an XNL-like dictionary.") return field @@ -442,7 +442,7 @@ def format_web_app_auth_oauth2_record(record: Dict[str, Any]) -> Dict[str, Any]: record["seleniumScript"]["data"] = unparse(record["seleniumScript"]["data"]) except Exception as e: raise ValueError( - f"SeleniumScript data must be a string or an XML-like dictionary." + "SeleniumScript data must be a string or an XML-like dictionary." ) return record diff --git a/qualysdk/was/base/web_app_service_requests.py b/qualysdk/was/base/web_app_service_requests.py index 02882dd..069ab62 100644 --- a/qualysdk/was/base/web_app_service_requests.py +++ b/qualysdk/was/base/web_app_service_requests.py @@ -394,19 +394,19 @@ def build_update_request( f"urlExcludelist must be passed as a list of strings. Received: {urlExcludelist}" ) - l = [] + entry_list = [] for entry in urlExcludelist: # Check for any regex characters in the entry. # If found, set the regex attribute to true. if is_valid_regex(entry): - l.append( + entry_list.append( xmltodict.unparse( {"UrlEntry": {"@regex": "true", "#text": str(entry)}}, full_document=False, ) ) else: - l.append( + entry_list.append( xmltodict.unparse( {"UrlEntry": {"@regex": "false", "#text": str(entry)}}, full_document=False, @@ -414,7 +414,7 @@ def build_update_request( ) request_dict["ServiceRequest"]["data"]["WebApp"]["urlExcludelist"] = { - "set": "\n".join(l) + "set": "\n".join(entry_list) } if _urlAllowlist: @@ -422,19 +422,19 @@ def build_update_request( raise ValueError( f"urlAllowlist must be passed as a list of strings. Received: {_urlAllowlist}" ) - l = [] + url_list = [] for entry in _urlAllowlist: # Check for any regex characters in the entry. # If found, set the regex attribute to true. if is_valid_regex(entry): - l.append( + url_list.append( xmltodict.unparse( {"UrlEntry": {"@regex": "true", "#text": str(entry)}}, full_document=False, ) ) else: - l.append( + url_list.append( xmltodict.unparse( {"UrlEntry": {"@regex": "false", "#text": str(entry)}}, full_document=False, @@ -442,7 +442,7 @@ def build_update_request( ) request_dict["ServiceRequest"]["data"]["WebApp"]["urlAllowlist"] = { - "set": "\n".join(l) + "set": "\n".join(url_list) } if _postDataExcludelist: @@ -451,19 +451,19 @@ def build_update_request( f"postDataExcludelist must be passed as a list of strings. Received: {_postDataExcludelist}" ) - l = [] + post_list = [] for entry in _postDataExcludelist: # Check for any regex characters in the entry. # If found, set the regex attribute to true. if is_valid_regex(entry): - l.append( + post_list.append( xmltodict.unparse( {"UrlEntry": {"@regex": "true", "#text": str(entry)}}, full_document=False, ) ) else: - l.append( + post_list.append( xmltodict.unparse( {"UrlEntry": {"@regex": "false", "#text": str(entry)}}, full_document=False, @@ -471,7 +471,7 @@ def build_update_request( ) request_dict["ServiceRequest"]["data"]["WebApp"]["postDataExcludelist"] = { - "set": "\n".join(l) + "set": "\n".join(post_list) } if _useSitemap is not None: @@ -484,11 +484,13 @@ def build_update_request( raise ValueError( f"headers must be passed as a list of strings. Received: {_headers}" ) - l = [] + header_list = [] for header in _headers: - l.append(xmltodict.unparse({"WebAppHeader": header}, full_document=False)) + header_list.append( + xmltodict.unparse({"WebAppHeader": header}, full_document=False) + ) request_dict["ServiceRequest"]["data"]["WebApp"]["headers"] = { - "set": "\n".join(l) + "set": "\n".join(header_list) } # Unescape any HTML. Necessary due to xmltodict's behavior. diff --git a/qualysdk/was/data_classes/Finding.py b/qualysdk/was/data_classes/Finding.py index 8927c2f..3f5e222 100644 --- a/qualysdk/was/data_classes/Finding.py +++ b/qualysdk/was/data_classes/Finding.py @@ -104,9 +104,9 @@ class FindingItem(BaseClass): def __post_init__(self): BOOL_FIELDS = ["ajax", "authentication"] - for field in BOOL_FIELDS: - if getattr(self, field): - setattr(self, field, field == "true") + for bool_field in BOOL_FIELDS: + if getattr(self, bool_field): + setattr(self, bool_field, bool_field == "true") if self.accessPath: self.accessPath_count = int(self.accessPath.get("count")) @@ -319,20 +319,20 @@ def __post_init__(self): if self.param: setattr(self, "param", unquote_plus(self.param)) - for field in INT_FIELDS: - if getattr(self, field): - setattr(self, field, int(getattr(self, field))) + for int_field in INT_FIELDS: + if getattr(self, int_field): + setattr(self, int_field, int(getattr(self, int_field))) - for field in BOOL_FIELDS: - if getattr(self, field): - setattr(self, field, bool(getattr(self, field))) + for bool_field in BOOL_FIELDS: + if getattr(self, bool_field): + setattr(self, bool_field, bool(getattr(self, bool_field))) - for field in DT_FIELDS: - if getattr(self, field): + for dt_field in DT_FIELDS: + if getattr(self, dt_field): setattr( self, - field, - datetime.strptime(getattr(self, field), "%Y-%m-%dT%H:%M:%SZ"), + dt_field, + datetime.strptime(getattr(self, dt_field), "%Y-%m-%dT%H:%M:%SZ"), ) if self.cwe: diff --git a/qualysdk/was/data_classes/WebApp.py b/qualysdk/was/data_classes/WebApp.py index 7ab9484..c8b25f6 100644 --- a/qualysdk/was/data_classes/WebApp.py +++ b/qualysdk/was/data_classes/WebApp.py @@ -405,7 +405,7 @@ def risk_rating(self) -> str: str: The risk rating. """ - if not type(self.riskScore) in [int, float]: + if type(self.riskScore) not in [int, float]: raise ValueError( f"riskScore must be an integer or float, not {type(self.riskScore)}" ) diff --git a/qualysdk/was/data_classes/WebAppAuthRecord.py b/qualysdk/was/data_classes/WebAppAuthRecord.py index 9761afb..c476a74 100644 --- a/qualysdk/was/data_classes/WebAppAuthRecord.py +++ b/qualysdk/was/data_classes/WebAppAuthRecord.py @@ -153,8 +153,8 @@ def handle_record_attrs( setattr(dataclass, subkey, None) else: list_data = handle_qualys_list(getattr(dataclass, "comments"), subkey) - setattr(dataclass, f"comments_count", list_data[0]) - setattr(dataclass, f"comments_list", list_data[1]) + setattr(dataclass, "comments_count", list_data[0]) + setattr(dataclass, "comments_list", list_data[1]) setattr(dataclass, "comments", None) diff --git a/qualysdk/was/webapps.py b/qualysdk/was/webapps.py index d6860c8..f4fefad 100644 --- a/qualysdk/was/webapps.py +++ b/qualysdk/was/webapps.py @@ -595,7 +595,7 @@ def delete_webapp( ) if serviceResponse.get("count") == "0": - print(f"No applicable web apps found. Exiting.") + print("No applicable web apps found. Exiting.") return [] deleted = [] @@ -688,7 +688,7 @@ def purge_webapp(auth: BasicAuth, **kwargs) -> list[str]: ) if serviceResponse.get("count") == "0": - print(f"No applicable web apps found. Exiting.") + print("No applicable web apps found. Exiting.") return [] deleted = []