Skip to content

Commit

Permalink
Merge pull request #193 from 0x41424142/certs
Browse files Browse the repository at this point in the history
Added certificate view module, sql uploads
  • Loading branch information
0x41424142 authored Nov 22, 2024
2 parents 6686cef + 0f96116 commit c920206
Show file tree
Hide file tree
Showing 18 changed files with 724 additions and 2 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ hosts = get_host_list(auth, details="All/AGs", show_tags=True, page_count=4)
|Connectors | Not Started |
|Cloud Agent | ✅ See Cloud Agent [documentation page](https://qualysdk.jakelindsay.uk/cloudagent/) for supported calls |
|CS (Container Security) | ✅ See CS [documentation page](https://qualysdk.jakelindsay.uk/containersecurity/) for supported calls |
|CERT (Certificate View) | ✅ See CERT [documentation page](https://qualysdk.jakelindsay.uk/cert/) for supported calls |
|ADMIN (Administration) | Not Started |
|Asset Management & Tagging| Not Started |
| SQL Data Uploads | ✅ See SQL [documentation page](https://qualysdk.jakelindsay.uk/sql/) for supported uploads/DBs |
Expand Down
60 changes: 60 additions & 0 deletions docs/cert.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Certificate View APIs

Certificate View APIs return data on certificates in the subscription.

After running:
```py
from qualysdk.cert import *
```
You can use any of the endpoints currently supported:

## Certificate View Endpoints

|API Call| Description |
|--|--|
| ```list_certs``` | Lists all certificates in the subscription that match given kwargs. |


## List Certificates API

```list_certs``` returns a list of certificates in the subscription that match the given kwargs.

|Parameter| Possible Values |Description| Required|
|--|--|--|--|
|```auth```|```qualysdk.auth.TokenAuth``` | Authentication object ||
| ```page_count``` | ```Union[int, 'all'] = 'all'``` | Number of pages to pull ||
| ```certId``` | ```str``` | The ID of a certificate to return ||
| ```certId_operator``` | ```Literal["IN", "LESSER", "IS_EMPTY", "GREATER", "GREATER_THAN_EQUAL", "IS_NOT_EMPTY", "EQUALS", "NOT_EQUALS", "LESS_THAN_EQUAL", "CONTAINS"] = "IS_NOT_EMPTY"``` | The operator to use for the certId ||
| ```hash``` | ```str``` | The hash of a certificate to return ||
| ```hash_operator``` | ```Literal["IN", "LESSER", "IS_EMPTY", "GREATER", "GREATER_THAN_EQUAL", "IS_NOT_EMPTY", "EQUALS", "NOT_EQUALS", "LESS_THAN_EQUAL", "CONTAINS"] = "IS_NOT_EMPTY"``` | The operator to use for the hash ||
| ```validFromDate``` | ```str``` | The date the certificate is valid from ||
| ```validFromDate_operator``` | ```Literal["IN", "LESSER", "IS_EMPTY", "GREATER", "GREATER_THAN_EQUAL", "IS_NOT_EMPTY", "EQUALS", "NOT_EQUALS", "LESS_THAN_EQUAL", "CONTAINS"] = "GREATER"``` | The operator to use for the validFromDate ||
| ```wasUrl``` | ```str``` | The URL of the site the certificate lives on, according to the WAS module ||
| ```wasUrl_operator``` | ```Literal["IN", "LESSER", "IS_EMPTY", "GREATER", "GREATER_THAN_EQUAL", "IS_NOT_EMPTY", "EQUALS", "NOT_EQUALS", "LESS_THAN_EQUAL", "CONTAINS"] = "IS_NOT_EMPTY"``` | The operator to use for the wasUrl ||
| ```certificateType``` | ```Literal["Leaf", "Intermediate", "Root"]``` | The type of certificate ||
| ```pageSize``` | ```int > 0 (default=10)``` | The number of certificates to return per page ||

```py
from qualysdk.auth import TokenAuth
from qualysdk.cert import list_certs

auth = TokenAuth(<username>, <password>, platform='qg1')

# Get all certificates:
certs = list_certs(auth)

# Get all certificates that match a given piece of a hash
# and are valid afer a certain date:
certs = list_certs(auth, hash='1234', hash_operator='CONTAINS', validFromDate='2024-01-01')
>>>[
Certificate(
id=12345678,
certhash='111222333444...',
keySize=2048,
serialNumber='12345...',
validToDate=datetime.datetime(2030, 1, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
...
),
...
]
```
1 change: 1 addition & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ hosts = get_host_list(auth, details="All/AGs", show_tags=True, page_count=4)
|Connectors | Not Started |
|Cloud Agent | ✅ See Cloud Agent [documentation page](https://qualysdk.jakelindsay.uk/cloudagent/) for supported calls |
|CS (Container Security) | ✅ See CS [documentation page](https://qualysdk.jakelindsay.uk/containersecurity/) for supported calls |
|CERT (Certificate View) | ✅ See CERT [documentation page](https://qualysdk.jakelindsay.uk/cert/) for supported calls |
|ADMIN (Administration) | Not Started |
|Asset Management & Tagging| Not Started |
| SQL Data Uploads | ✅ See SQL [documentation page](https://qualysdk.jakelindsay.uk/sql/) for supported uploads/DBs |
Expand Down
1 change: 1 addition & 0 deletions docs/sql.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ The final optional parameter is ```table_name```. If you want to specify a custo
| ```upload_pm_jobs``` | Patch Management | ```pm.list_jobs()``` | ```pm_jobs``` |
| ```upload_pm_job_results``` | Patch Management | ```pm.get_job_results()``` | ```pm_job_results_jobResults``` for job summaries and ```pm_job_results_assets``` for assets (key = jobResults.id -> assets.jobId)|
| ```upload_pm_job_runs``` | Patch Management | ```pm.get_job_runs()``` | ```pm_job_runs``` |
| ```upload_cert_certs``` | Certificate View | ```cert.list_certs()``` | ```cert_certs``` for certificates and ```cert_assets``` for assets (key = certs.id -> assets.certId) |


```py
Expand Down
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ nav:
- TotalCloud: totalcloud.md
- Cloud Agent: cloud_agent.md
- Container Security: containersecurity.md
- Certificate View: cert.md
- Web Application Scanning: was.md
- Patch Management: patch.md
- SQL Uploads: sql.md
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "qualysdk"
version = "0.1.77"
version = "0.1.78"
description = "SDK for interacting with Qualys APIs, across most modules the platform offers."
authors = ["0x41424142 <jake@jakelindsay.uk>", "0x4A616B65 <jake.lindsay@thermofisher.com>"]
maintainers = ["Jake Lindsay <jake@jakelindsay.uk>"]
Expand Down
1 change: 1 addition & 0 deletions qualysdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from . import cs
from . import was
from . import pm
from . import cert

from .sql import db_connect

Expand Down
10 changes: 9 additions & 1 deletion qualysdk/base/call_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,15 @@ def call_api(
# check for errors not related to rate limiting:
if (
module
not in ["gav", "cloud_agent", "cloudview", "containersecurity", "was", "pm"]
not in [
"gav",
"cloud_agent",
"cloudview",
"containersecurity",
"was",
"pm",
"cert",
]
and response.status_code in range(400, 599)
and response.status_code not in [429, 414]
and (
Expand Down
1 change: 1 addition & 0 deletions qualysdk/base/call_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
TOTALCLOUD_SCHEMA,
VMDR_SCHEMA,
WAS_SCHEMA,
CERTVIEW_SCHEMA,
]

CALL_SCHEMA = dict()
Expand Down
1 change: 1 addition & 0 deletions qualysdk/base/call_schemas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@
from .totalcloud_schema import TOTALCLOUD_SCHEMA
from .vmdr_schema import VMDR_SCHEMA
from .was_schema import WAS_SCHEMA
from .certview_schema import CERTVIEW_SCHEMA
31 changes: 31 additions & 0 deletions qualysdk/base/call_schemas/certview_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""
certificate view call schemas
"""

from frozendict import frozendict

CERTVIEW_SCHEMA = frozendict(
{
"cert": {
"url_type": "gateway",
"list_certs": {
"endpoint": "/certview/v2/certificates",
"method": ["POST"],
"valid_params": [],
"valid_POST_data": [
"certId",
"pageSize",
"pageNumber",
"hash",
"validFromDate",
"wasUrl",
"certificateType",
],
"use_requests_json_data": True,
"return_type": "json",
"pagination": True,
"auth_type": "token",
},
},
}
)
5 changes: 5 additions & 0 deletions qualysdk/cert/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""
Certificate View module
"""

from .list_certs import list_certs
112 changes: 112 additions & 0 deletions qualysdk/cert/data_classes/Asset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
"""
Asset data class
"""

from dataclasses import dataclass, asdict
from typing import Union

from ...base.base_list import BaseList

"""
TODO: Move BaseCls into the package's base module
so it can be used universally. Things you realize
months into a project...
"""


@dataclass
class BaseCls:
pass

def to_dict(self):
return asdict(self)

def __dict__(self):
return self.to_dict()

def keys(self):
return self.to_dict().keys()

def values(self):
return self.to_dict().values()

def items(self):
return self.to_dict().items()

@classmethod
def from_dict(cls, d: dict):
return cls(**d)


@dataclass
class hostInstance(BaseCls):
id: int = None
port: int = None
fqdn: str = None
protocol: str = None
service: str = None
grade: str = None

def __str__(self):
return str(getattr(self, "id", ""))

def __int__(self):
return int(getattr(self, "id", 0))


@dataclass
class assetInterface(BaseCls):
hostname: str = None
address: str = None

def __str__(self):
return self.hostname if self.hostname else self.address


@dataclass
class Tag(BaseCls):
name: str = None
uuid: str = None

def __str__(self):
return self.name if self.name else self.uuid


@dataclass
class Asset(BaseCls):
"""
Represents an asset in CERT,
underneath a certificate
data class.
"""

id: int = None
uuid: str = None
netbiosName: str = None
name: str = None
operatingSystem: str = None
hostInstances: Union[list[dict], BaseList[object]] = None
assetInterfaces: Union[list[dict], BaseList[object]] = None
tags: Union[list[str], BaseList[str]] = None
primaryIp: str = None

def __post_init__(self):
if self.hostInstances:
setattr(
self,
"hostInstances",
BaseList([hostInstance(**x) for x in self.hostInstances]),
)

if self.assetInterfaces:
setattr(
self,
"assetInterfaces",
BaseList([assetInterface(**x) for x in self.assetInterfaces]),
)

if self.tags:
setattr(self, "tags", BaseList([Tag(**x) for x in self.tags]))

def __str__(self):
return getattr(self, "name", "")
Loading

0 comments on commit c920206

Please sign in to comment.