Add support for GCP daily flow
esebesto committed Feb 14, 2025
1 parent 48cbacc commit 75300b4
Showing 4 changed files with 175 additions and 141 deletions.
2 changes: 1 addition & 1 deletion nise/__init__.py
@@ -1,4 +1,4 @@
__version__ = "4.7.0"
__version__ = "4.7.2"


VERSION = __version__.split(".")
8 changes: 8 additions & 0 deletions nise/__main__.py
@@ -242,6 +242,14 @@ def add_gcp_parser_args(parser):
dest="daily_report",
help="GCP daily report activation",
)

parser.add_argument(
"--gcp-daily-flow",
action="store_true",
required=False,
dest="gcp_daily_flow",
help="additional GCP day to day ingest",
)
parser.add_argument(
"-etag", "--gcp-etag", metavar="GCP_ETAG", dest="gcp_etag", required=False, help="The etag in the filename"
)
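To make the new option concrete, here is a minimal, self-contained sketch (not part of the commit) of how the flag behaves once parsed; it reproduces only the `--gcp-daily-flow` argument from the hunk above and omits the rest of nise's GCP parser, and the `prog` name is purely illustrative.

```python
import argparse

# Reduced parser wiring mirroring the hunk above; all other nise GCP
# options are omitted for brevity.
parser = argparse.ArgumentParser(prog="nise report gcp")  # prog name is illustrative
parser.add_argument(
    "--gcp-daily-flow",
    action="store_true",
    required=False,
    dest="gcp_daily_flow",
    help="additional GCP day to day ingest",
)

# store_true means the flag takes no value and defaults to False.
print(parser.parse_args([]).gcp_daily_flow)                    # False
print(parser.parse_args(["--gcp-daily-flow"]).gcp_daily_flow)  # True
```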
10 changes: 9 additions & 1 deletion nise/report.py
@@ -1206,6 +1206,7 @@ def _gcp_bigquery_process(
start_date, end_date, currency, projects, generators, options, gcp_bucket_name, gcp_dataset_name, gcp_table_name
):
resource_level = options.get("gcp_resource_level", False)
gcp_daily_flow = options.get("gcp_daily_flow", False)
data = []
for project in projects:
num_gens = len(generators)
@@ -1239,7 +1240,14 @@ def _gcp_bigquery_process(
gcp_table_name = f"gcp_billing_export_resource_{etag}"
else:
gcp_table_name = f"gcp_billing_export_{etag}"
gcp_bucket_to_dataset(gcp_bucket_name, output_file_name, gcp_dataset_name, gcp_table_name, resource_level)
gcp_bucket_to_dataset(
gcp_bucket_name,
output_file_name,
gcp_dataset_name,
gcp_table_name,
resource_level,
gcp_daily_flow=gcp_daily_flow,
)

return monthly_files

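The report.py hunk only threads the flag through: it is read from the shared `options` dict and forwarded to `gcp_bucket_to_dataset` as a keyword argument. A minimal sketch of that plumbing, with the real uploader replaced by a stub so the example stays self-contained (bucket, dataset, file, and table names are placeholders):

```python
# Stub with the same keyword interface as nise.upload.gcp_bucket_to_dataset;
# the body only reports which write mode the flag selects.
def gcp_bucket_to_dataset(bucket, file_name, dataset, table, resource_level=False, gcp_daily_flow=False):
    mode = "append (daily flow)" if gcp_daily_flow else "truncate (fresh dataset)"
    print(f"load {file_name} -> {dataset}.{table} [{mode}]")
    return True

# options as _gcp_bigquery_process receives them after argument parsing
options = {"gcp_resource_level": False, "gcp_daily_flow": True}

gcp_bucket_to_dataset(
    "example-bucket",              # placeholder bucket
    "2025-02-01_2025-02-14.json",  # placeholder export file
    "example_dataset",
    "gcp_billing_export_1234",     # placeholder etag-based table name
    resource_level=options.get("gcp_resource_level", False),
    gcp_daily_flow=options.get("gcp_daily_flow", False),
)
```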
296 changes: 157 additions & 139 deletions nise/upload.py
@@ -128,7 +128,9 @@ def upload_to_gcp_storage(bucket_name, source_file_name, destination_blob_name):
return uploaded


def gcp_bucket_to_dataset(gcp_bucket_name, file_name, dataset_name, table_name, resource_level=False):
def gcp_bucket_to_dataset(
gcp_bucket_name, file_name, dataset_name, table_name, resource_level=False, gcp_daily_flow=False
):
"""
Create a gcp dataset from a file stored in a bucket.
@@ -137,6 +139,8 @@ def gcp_bucket_to_dataset(gcp_bucket_name, file_name, dataset_name, table_name,
file_name (String): The name of the file stored in GCP
dataset_name (String): name for the created dataset in GCP
table_name (String): name for the created table in GCP
resource_level (Boolean): indicates whether to generate a resource level report
gcp_daily_flow (Boolean): indicates if the data are ingested as part of the day-to-day flow
Returns:
(Boolean): True if the dataset was created
@@ -156,156 +160,165 @@ def gcp_bucket_to_dataset(gcp_bucket_name, file_name, dataset_name, table_name,
project_name = bigquery_client.project
dataset_id = f"{project_name}.{dataset_name}"
dataset = bigquery.Dataset(dataset_id)

if gcp_daily_flow:
# create the job config for daily flow - i.e., appending data
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
write_disposition="WRITE_APPEND",
time_partitioning=bigquery.TimePartitioning(),  # may not be needed?
)
log_message = f"Dataset {dataset_name} updated in GCP bigquery under the table name {table_name}."

table_id = f"{project_name}.{dataset_name}.{table_name}"
else:
# delete dataset (does not error if it doesn't exist) and create fresh one
bigquery_client.delete_dataset(dataset_id, delete_contents=True, not_found_ok=True)
dataset = bigquery_client.create_dataset(dataset)

# Build schema
schema = [
{"name": "billing_account_id", "type": "STRING", "mode": "NULLABLE"},
{
"name": "service",
"type": "RECORD",
"fields": [
{"name": "id", "type": "STRING", "mode": "NULLABLE"},
{"name": "description", "type": "STRING", "mode": "NULLABLE"},
],
"mode": "NULLABLE",
},
{
"name": "sku",
"type": "RECORD",
"fields": [
{"name": "id", "type": "STRING", "mode": "NULLABLE"},
{"name": "description", "type": "STRING", "mode": "NULLABLE"},
],
"mode": "NULLABLE",
},
{"name": "usage_start_time", "type": "TIMESTAMP", "mode": "NULLABLE"},
{"name": "usage_end_time", "type": "TIMESTAMP", "mode": "NULLABLE"},
{
"name": "project",
"type": "RECORD",
"fields": [
{"name": "id", "type": "STRING", "mode": "NULLABLE"},
{"name": "number", "type": "STRING", "mode": "NULLABLE"},
{"name": "name", "type": "STRING", "mode": "NULLABLE"},
{
"name": "labels",
"type": "RECORD",
"fields": [
{"name": "key", "type": "STRING", "mode": "NULLABLE"},
{"name": "value", "type": "STRING", "mode": "NULLABLE"},
],
"mode": "REPEATED",
},
{"name": "ancestry_numbers", "type": "STRING", "mode": "NULLABLE"},
],
"mode": "NULLABLE",
},
{
"name": "labels",
"type": "RECORD",
"fields": [
{"name": "key", "type": "STRING", "mode": "NULLABLE"},
{"name": "value", "type": "STRING", "mode": "NULLABLE"},
],
"mode": "REPEATED",
},
{
"name": "system_labels",
"type": "RECORD",
"fields": [
{"name": "key", "type": "STRING", "mode": "NULLABLE"},
{"name": "value", "type": "STRING", "mode": "NULLABLE"},
],
"mode": "REPEATED",
},
{
"name": "location",
"type": "RECORD",
"fields": [
{"name": "location", "type": "STRING", "mode": "NULLABLE"},
{"name": "country", "type": "STRING", "mode": "NULLABLE"},
{"name": "region", "type": "STRING", "mode": "NULLABLE"},
{"name": "zone", "type": "STRING", "mode": "NULLABLE"},
],
"mode": "NULLABLE",
},
{"name": "export_time", "type": "TIMESTAMP", "mode": "NULLABLE"},
{"name": "cost", "type": "FLOAT", "mode": "NULLABLE"},
{"name": "currency", "type": "STRING", "mode": "NULLABLE"},
{"name": "currency_conversion_rate", "type": "FLOAT", "mode": "NULLABLE"},
{
"name": "resource",
"name": "usage",
"type": "RECORD",
"fields": [
{"name": "amount", "type": "FLOAT", "mode": "NULLABLE"},
{"name": "unit", "type": "STRING", "mode": "NULLABLE"},
{"name": "amount_in_pricing_units", "type": "FLOAT", "mode": "NULLABLE"},
{"name": "pricing_unit", "type": "STRING", "mode": "NULLABLE"},
],
"mode": "NULLABLE",
},
{
"name": "credits",
"type": "RECORD",
"fields": [
{"name": "name", "type": "STRING", "mode": "NULLABLE"},
{"name": "global_name", "type": "STRING", "mode": "NULLABLE"},
{"name": "amount", "type": "FLOAT", "mode": "NULLABLE"},
{"name": "full_name", "type": "STRING", "mode": "NULLABLE"},
{"name": "id", "type": "STRING", "mode": "NULLABLE"},
{"name": "type", "type": "STRING", "mode": "NULLABLE"},
],
"mode": "REPEATED",
},
{
"name": "invoice",
"type": "RECORD",
"fields": [{"name": "month", "type": "STRING", "mode": "NULLABLE"}],
"mode": "NULLABLE",
},
{"name": "cost_type", "type": "STRING", "mode": "NULLABLE"},
{
"name": "adjustment_info",
"type": "RECORD",
"fields": [
{"name": "id", "type": "STRING", "mode": "NULLABLE"},
{"name": "description", "type": "STRING", "mode": "NULLABLE"},
{"name": "mode", "type": "STRING", "mode": "NULLABLE"},
{"name": "type", "type": "STRING", "mode": "NULLABLE"},
],
"mode": "NULLABLE",
},
]

# Add resource to schema if required
if resource_level:
schema += [
{
"name": "resource",
"type": "RECORD",
"fields": [
{"name": "name", "type": "STRING", "mode": "NULLABLE"},
{"name": "global_name", "type": "STRING", "mode": "NULLABLE"},
],
"mode": "NULLABLE",
}
]

# creates the job config with specifics
job_config = bigquery.LoadJobConfig(
write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
time_partitioning=bigquery.TimePartitioning(),
schema=schema,
)
log_message = f"Dataset {dataset_name} created in GCP bigquery under the table name {table_name}."

uri = f"gs://{gcp_bucket_name}/{file_name}"

load_job = bigquery_client.load_table_from_uri(uri, table_id, job_config=job_config)

# waits for the job to finish, will raise an exception if it doesn't work
@@ -316,16 +329,21 @@ def gcp_bucket_to_dataset(gcp_bucket_name, file_name, dataset_name, table_name,
bucket = storage_client.bucket(gcp_bucket_name)
blob = bucket.blob(file_name)
blob.delete()

# Our downloader downloads by the partitiontime, however the default partitiontime is the date
# the data is uploaded to bigquery. Therefore, everything goes into one single day. The load
# job config does not let you upload to the _PARTITIONTIME because it is a prebuilt column in
# bigquery. However, we do have permission to update it.
# TODO there is likely a bug on the koku side for month boundaries - update this to set different
# partitions for previous and current months to investigate it,
# e.g., by using end_time instead of start time, or by using export_time (would have to be adjusted first)
partition_date_sql = f"""
UPDATE `{table_id}` SET _PARTITIONTIME=CAST(DATE_TRUNC(DATE(usage_start_time), DAY) AS timestamp) WHERE 1=1;
"""
bigquery_client.query(partition_date_sql)

LOG.info(f"Dataset {dataset_name} created in GCP bigquery under the table name {table_name}.")
LOG.info(log_message)

except GoogleCloudError as upload_err:
LOG.error(upload_err)
uploaded = False
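The substance of the upload.py change is the write disposition: with `--gcp-daily-flow` the export is appended to an existing BigQuery table, while the default path still drops and recreates the dataset and loads with truncation. A hedged sketch of that distinction using the `google-cloud-bigquery` client (project, bucket, and table names are placeholders; credentials setup and the full billing-export schema are omitted), followed by the kind of `_PARTITIONTIME` filter the repartitioning UPDATE above is meant to support:

```python
from google.cloud import bigquery

client = bigquery.Client()  # assumes default credentials and project
table_id = "example-project.example_dataset.gcp_billing_export_1234"  # placeholder
uri = "gs://example-bucket/2025-02-01_2025-02-14.json"                # placeholder

daily_flow = True  # toggles between the two branches introduced in this commit

if daily_flow:
    # Day-to-day ingest: append new rows to the already existing table.
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    )
else:
    # Full refresh: the committed code first recreates the dataset, then loads with truncation.
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        time_partitioning=bigquery.TimePartitioning(),
    )

client.load_table_from_uri(uri, table_id, job_config=job_config).result()

# After the load, the commit rewrites _PARTITIONTIME from usage_start_time so
# that a downstream consumer can read one day at a time, e.g.:
sql = f"""
    SELECT * FROM `{table_id}`
    WHERE DATE(_PARTITIONTIME) = "2025-02-14"
"""
for row in client.query(sql).result():
    print(row)
```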
