Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Small fixes, added pm.count_product_vulns and sql.upload_pm_product_vuln_counts #232

Merged
merged 4 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions docs/patch.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ You can use any of the endpoints currently supported:
| ```get_patch_catalog``` | Returns the patch catalog for a given platform according to ```patchId```. |
| ```get_packages_in_linux_patch``` | Returns the packages associated with a Linux patch. |
| ```get_products_in_windows_patch``` | Returns the products associated with a Windows patch. |
| ```count_product_vulns``` | Return the number of vulns (active and fixed) from products in your environment. |

## Get PM Version API

Expand Down Expand Up @@ -772,6 +773,51 @@ products = get_products_in_windows_patch(
)
```

## Count Product Vulnerabilities API

```count_product_vulns``` returns the number of active and fixed vulnerabilities stemming from products.

|Parameter| Possible Values |Description| Required|
|--|--|--|--|
|```auth```|```qualysdk.auth.TokenAuth``` | Authentication object | ✅ |
| ```severityList``` | ```Union[str, list[Literal["Critical", "Important", "Moderate", "Low", "None"]]]``` | The severity levels to count vulnerabilities for. Can be a list or strings or a comma-separated string | ❌ |
| ```tagUUIDs``` | ```Union[str, list[str]]``` | The UUIDs of the tags to filter with | ❌ |

```py
from qualysdk.auth import TokenAuth
from qualysdk.pm import count_product_vulns

auth = TokenAuth(<username>, <password>, platform='qg1')

# Get the number of Critical and Important
# vulnerabilities for all products:
count = count_product_vulns(
auth,
severityList=["Critical", "Important"]
)
>>>[
ProductVulnCount(
name='Windows',
totalQIDCount=123,
patchableQIDCount=None,
type='APP_FAMILY',
patchableQIDs=None,
totalQIDs=None,
severity='Critical'
),
ProductVulnCount(
name='Office',
totalQIDCount=123,
patchableQIDCount=None,
type='APP_FAMILY',
patchableQIDs=None,
totalQIDs=None,
severity='Critical'
),
...
]
```


## ```qualysdk-pm``` CLI tool

Expand Down
1 change: 1 addition & 0 deletions docs/sql.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ The final optional parameter is ```table_name```. If you want to specify a custo
| ```upload_pm_patch_catalog``` | Patch Management | ```pm.get_patch_catalog()``` | ```pm_patch_catalog``` |
| ```upload_pm_linux_packages``` | Patch Management | ```pm.get_packages_in_linux_patch()``` | ```pm_linux_packages``` |
| ```upload_pm_windows_products``` | Patch Management | ```pm.get_products_in_windows_patch()``` | ```pm_windows_products``` |
| ```upload_pm_product_vuln_counts``` | Patch Management | ```pm.count_product_vulns()``` | ```pm_product_vuln_counts``` |
| ```upload_cert_certs``` | Certificate View | ```cert.list_certs()``` | ```cert_certs``` for certificates and ```cert_assets``` for assets (key = certs.id -> assets.certId) |


Expand Down
134 changes: 67 additions & 67 deletions poetry.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "qualysdk"
version = "0.2.6"
version = "0.2.7"
description = "SDK for interacting with Qualys APIs, across most modules the platform offers."
authors = ["0x41424142 <jake@jakelindsay.uk>", "0x4A616B65 <jake.lindsay@thermofisher.com>"]
maintainers = ["Jake Lindsay <jake@jakelindsay.uk>"]
Expand All @@ -27,11 +27,11 @@ bs4 = "^0.0.2"
lxml = "^5.2.2"
pandas = "^2.2.2"
mkdocs = "^1.6.0"
sqlalchemy = "^2.0.32"
sqlalchemy = "^2.0.37"
numpy = "^2.2.1"
pymssql = "^2.3.2"
pymysql = "^1.1.1"
pymdown-extensions = "^10.13"
pymdown-extensions = "^10.14"
mkdocs-material = "^9.5.48"
psycopg2 = {version = "^2.9.9", platform = "win32"}
psycopg2-binary = [
Expand Down
1 change: 1 addition & 0 deletions qualysdk/base/call_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ def call_api(
auth_tuple = (auth.username, auth.password)

# Make certain payloads/params requests-friendly:
# TODO: need to evaluate other modules this may apply to.
if module != "pm":
if payload:
payload = convert_bools_and_nones(payload)
Expand Down
10 changes: 10 additions & 0 deletions qualysdk/base/call_schemas/pm_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,16 @@
"pagination": False,
"auth_type": "token",
},
"count_product_vulns": {
"endpoint": "/pm/v1/patchcatalog/patch/products/vulnerability/count",
"method": ["GET"],
"valid_params": ["severityList", "tagUUIDs"],
"valid_POST_data": [],
"use_requests_json_data": False,
"return_type": "json",
"pagination": False,
"auth_type": "token",
},
},
}
)
1 change: 1 addition & 0 deletions qualysdk/pm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@
get_patch_catalog,
get_packages_in_linux_patch,
get_products_in_windows_patch,
count_product_vulns,
)
19 changes: 19 additions & 0 deletions qualysdk/pm/data_classes/ProductVulnCount.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from dataclasses import dataclass
from typing import Literal

from ...base.base_class import BaseClass


@dataclass
class ProductVulnCount(BaseClass):
"""
Represents a product and its associated QID count/details.
"""

name: str
totalQIDCount: int = 0
patchableQIDCount: int = None
type: str = None
patchableQIDs: str = None
totalQIDs: int = None
severity: Literal["Critical", "Important", "Moderate", "Low", "None"] = "Undefined"
100 changes: 100 additions & 0 deletions qualysdk/pm/patchcatalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from .data_classes.CatalogPatch import CatalogPatch, PackageDetail
from .data_classes.AssociatedProduct import AssociatedProduct
from .data_classes.ProductVulnCount import ProductVulnCount
from ..base.base_list import BaseList
from ..auth.token import TokenAuth
from ..base.call_api import call_api
Expand Down Expand Up @@ -385,3 +386,102 @@
t.join()

return responses


from typing import Union, List, Literal, overload


@overload
def count_product_vulns(
auth: TokenAuth,
severityList: Union[
Literal["Critical", "Important", "Moderate", "Low", "None"],
List[Literal["Critical", "Important", "Moderate", "Low", "None"]],
] = None,
tagUUIDs: str = None,
) -> BaseList[ProductVulnCount]:
...


@overload
def count_product_vulns(
auth: TokenAuth,
severityList: Union[
Literal["Critical", "Important", "Moderate", "Low", "None"],
List[Literal["Critical", "Important", "Moderate", "Low", "None"]],
] = None,
tagUUIDs: List[str] = None,
) -> BaseList[ProductVulnCount]:
...


def count_product_vulns(
auth: TokenAuth,
severityList: Union[
Literal["Critical", "Important", "Moderate", "Low", "None"],
List[Literal["Critical", "Important", "Moderate", "Low", "None"]],
] = None,
tagUUIDs: Union[str, List[str]] = None,
) -> BaseList[ProductVulnCount]:
"""
Get a count of active and fixed product vulnerabilities.

If no severityList value is passed, all severities will be returned (Critical, Important, Moderate, Low, None).

Args:
auth (TokenAuth): The authentication object.
severityList (Union[str, List[str]]): The severity or severities to filter by.
Can be a single value or a list of values. For all severities, leave this blank.
tagUUIDs (Union[str, List[str]]): The tag UUIDs to filter by.

Returns:
BaseList[ProductVulnCount]: A list of ProductVulnCount objects.
"""
# Normalize severityList to a list
if severityList is None:
severity_values = ["Critical", "Important", "Moderate", "Low", "None"]
# check if the string or list contains all of the valid values
elif isinstance(severityList, str) and set(
[i.title() for i in severityList.split(",")]
) == set(["Critical", "Important", "Moderate", "Low", "None"]):
severity_values = ["Critical", "Important", "Moderate", "Low", "None"]

elif isinstance(severityList, (list, BaseList)) and set(severityList) == set(
["Critical", "Important", "Moderate", "Low", "None"]
):
severity_values = ["Critical", "Important", "Moderate", "Low", "None"]

elif isinstance(severityList, str):
severity_values = [s.title().strip() for s in severityList.split(",")]

elif isinstance(severityList, (list, BaseList)):
severity_values = severityList

else:
raise ValueError(
"Invalid severityList format. Must be a string or a list of strings."
)

results = BaseList()

if tagUUIDs and isinstance(tagUUIDs, (list, BaseList)):
tagUUIDs = ",".join(tagUUIDs)
elif tagUUIDs and not isinstance(tagUUIDs, str):
raise ValueError("tagUUIDs must be a string or a list of strings.")

for severity in severity_values:
response = call_api(
auth,
"pm",
"count_product_vulns",
params={"severityList": severity, "tagUUIDs": tagUUIDs},
)

if response.status_code not in range(200, 299):
raise QualysAPIError(response.json())

for entry in response.json():
entry["severity"] = severity if severity else "All"
results.append(ProductVulnCount(**entry))

return results

Check notice on line 487 in qualysdk/pm/patchcatalog.py

View check run for this annotation

codefactor.io / CodeFactor

qualysdk/pm/patchcatalog.py#L418-L487

Complex Method
1 change: 1 addition & 0 deletions qualysdk/sql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,6 @@
upload_pm_patch_catalog,
upload_pm_linux_packages,
upload_pm_windows_products,
upload_pm_product_vuln_counts,
)
from .cert import upload_cert_certs
53 changes: 53 additions & 0 deletions qualysdk/sql/pm.py
Original file line number Diff line number Diff line change
Expand Up @@ -759,3 +759,56 @@ def upload_pm_linux_packages(

# Upload the data:
return upload_data(df, table_name, cnxn, COLS, override_import_dt)


def upload_pm_product_vuln_counts(
counts: BaseList,
cnxn: Connection,
table_name: str = "pm_product_vuln_counts",
override_import_dt: datetime = None,
) -> int:
"""
Upload results from ```pm.count_product_vulns``` to a SQL database.

Args:
counts (BaseList): A BaseList of ProductVulnCount objects.
cnxn (Connection): The Connection object to the SQL database.
table_name (str): The name of the table to upload to. Defaults to "pm_product_vuln_counts".
override_import_dt (datetime): If provided, will override the import_datetime column with this value.

Returns:
int: The number of rows uploaded.
"""
'''@dataclass
class ProductVulnCount(BaseClass):
"""
Represents a product and its associated QID count/details.
"""

name: str
totalQIDCount: int = 0
patchableQIDCount: int = None
type: str = None
patchableQIDs: str = None
totalQIDs: int = None
severity: Literal["Critical", "Important", "Moderate", "Low", "None"] = "Undefined"'''

COLS = {
"name": types.String().with_variant(TEXT(charset="utf8"), "mysql", "mariadb"),
"totalQIDCount": types.Integer(),
"patchableQIDCount": types.Integer(),
"type": types.String().with_variant(TEXT(charset="utf8"), "mysql", "mariadb"),
"patchableQIDs": types.String().with_variant(
TEXT(charset="utf8"), "mysql", "mariadb"
),
"totalQIDs": types.Integer(),
"severity": types.String().with_variant(
TEXT(charset="utf8"), "mysql", "mariadb"
),
}

# Prepare the dataclass for insertion:
df = DataFrame([prepare_dataclass(count) for count in counts])

# Upload the data:
return upload_data(df, table_name, cnxn, COLS, override_import_dt)
3 changes: 2 additions & 1 deletion qualysdk/sql/vmdr.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from pandas import DataFrame
from sqlalchemy import Connection, types
from sqlalchemy.dialects.mysql import TEXT
from sqlalchemy.dialects.mssql import DATETIME2

from .base import upload_data, prepare_dataclass
from ..base.base_list import BaseList
Expand Down Expand Up @@ -218,7 +219,7 @@ def upload_vmdr_kb(
"SOLUTION_COMMENT": types.String().with_variant(
TEXT(charset="utf8"), "mysql", "mariadb"
),
"PATCH_PUBLISHED_DATE": types.DateTime(),
"PATCH_PUBLISHED_DATE": types.DateTime().with_variant(DATETIME2, "mssql"),
}

# Convert the BaseList to a DataFrame:
Expand Down
19 changes: 12 additions & 7 deletions qualysdk/vmdr/data_classes/kb_entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,13 +214,18 @@ def __post_init__(self):
simplefilter("ignore") # ignore the warning about the html.parser
for html_field in HTML_FIELDS:
if getattr(self, html_field):
setattr(
self,
html_field,
BeautifulSoup(
getattr(self, html_field), "html.parser"
).get_text(),
)
soup = BeautifulSoup(getattr(self, html_field), "html.parser")
for a_tag in soup.find_all("a"):
if a_tag.has_attr("href"):
a_tag.replace_with(a_tag["href"])
setattr(self, html_field, soup.get_text())
# setattr(
# self,
# html_field,
# BeautifulSoup(
# getattr(self, html_field), "html.parser"
# ).get_text(),
# )

# convert the lists to BaseList objects:
if self.BUGTRAQ_LIST:
Expand Down
2 changes: 1 addition & 1 deletion qualysdk/vmdr/data_classes/vendor_reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __post_init__(self):
)

def __str__(self) -> str:
return self.ID
return f"{self.ID} ({self.URL})"

def __contains__(self, item):
# see if it was found in the name or vendor:
Expand Down
Loading