
Azure Log Analytics Data Export

Summary | Files | Setup | Usage | Issues | References

Summary

This Azure Function App exports large volumes of data (10M+ records per hour) from Azure Log Analytics to Blob Storage using Python SDKs, FastAPI, and Azure API Management. In testing, 50M records with 10 columns were exported in approximately 1 hour.

Inputs and Outputs:

  • Input: log analytics workspace table(s), columns, and date range
  • Output: JSON (line delimited) (default), CSV, or PARQUET files
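The output formats map onto standard pandas writers; the following is a minimal sketch of producing the three file types, assuming the query results have already been loaded into a pandas DataFrame (this README does not show that step):

import pandas as pd

# hypothetical DataFrame holding one subquery's results
df = pd.DataFrame({
    "TimeGenerated": ["2024-01-01 00:00:00"],
    "DataColumn1": ["string"],
})

# JSON (line delimited, the default), CSV, and PARQUET outputs
df.to_json("output.json", orient="records", lines=True)
df.to_csv("output.csv", index=False)
df.to_parquet("output.parquet", index=False)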

Azure FastAPI HTTP Functions:

  1. azure_ingest_test_data(): creates and ingests test data (optional)
  2. azure_submit_query(): submits a single query that is split into smaller queries/jobs and sent to the queue
  3. azure_submit_query_parallel(): breaks up the initial query and submits multiple queries in parallel (use for large query date ranges)
  4. azure_get_status(): gives a high-level status of a query (number of sub-queries, successes, failures, row counts, file sizes)

Azure Queue Functions:

  1. azure_queue_query(): processes split queries
  2. azure_queue_process(): processes subqueries and saves output to storage blobs
  3. azure_queue_query_poison(): processes invalid messages in the query queue and saves them to the table log
  4. azure_queue_process_poison(): processes invalid messages in the process queue and saves them to the table log
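A minimal sketch of how these functions are typically wired together in function_app.py, assuming the Azure Functions v2 Python programming model with an ASGI wrapper for FastAPI and a blueprint for the queue triggers (names and bodies here are illustrative placeholders, not the repo's exact code):

import azure.functions as func
from fastapi import FastAPI

fastapi_app = FastAPI()

@fastapi_app.post("/azure_submit_query")
async def azure_submit_query(request: dict) -> dict:
    # split the request into subqueries and enqueue them (omitted)
    return {"submit_status": "Success"}

# expose the FastAPI routes through the Function App
app = func.AsgiFunctionApp(app=fastapi_app, http_auth_level=func.AuthLevel.FUNCTION)

# queue triggers are registered via a blueprint (see changelog 2.1.0)
bp = func.Blueprint()

@bp.queue_trigger(arg_name="msg",
                  queue_name="%QueueProcessName%",
                  connection="storageAccountConnectionString")
def azure_queue_process(msg: func.QueueMessage) -> None:
    # run the subquery and save the output to blob storage (omitted)
    pass

app.register_functions(bp)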


Files

  • azure-log-analytics-data-export.ipynb: python notebook for development, testing, or interactive use
  • function_app.py: Azure Function App python source code
  • host.json: Azure Function App settings
  • requirements.txt: python package requirements file
  • function.app: zip push deployment package
  • zip-push-deployment.txt: instructions for deploying a streamlined function (without APIM) via the CLI

Setup

You will need to have access to or provision the following Azure Resources:

  1. Log Analytics Workspace (data source)
  2. Storage Account
  • 1 Container (data output destination)
  • 4 Queues (temp storage for split query messages/jobs)
  • 3 Tables (logging)
  3. Azure Function App (Python 3.11+, premium)
  4. Azure API Management (consumption, DO NOT use developer) (optional)

Authentication (Managed Identity) Roles Setup:

  • Azure Portal -> Function App -> Identity -> System Assigned -> On -> Add Azure Role Assignments
  1. Monitoring Metrics Publisher: Ingest to Log Analytics (optional)
  2. Log Analytics Contributor: Query Log Analytics
  3. Storage Queue Data Contributor: Storage Queue Send/Get/Delete
  4. Storage Queue Data Message Processor: Storage Queue Trigger for Azure Function
  5. Storage Blob Data Contributor: Upload to Blob Storage
  6. Storage Table Data Contributor: Logging
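These roles map onto the Azure SDK clients the function code needs; a minimal sketch of constructing them with the system-assigned managed identity via DefaultAzureCredential (URLs are placeholders):

from azure.identity import DefaultAzureCredential
from azure.monitor.query import LogsQueryClient
from azure.storage.blob import BlobServiceClient
from azure.storage.queue import QueueClient
from azure.data.tables import TableServiceClient

# resolves to the Function App's system-assigned managed identity when deployed
credential = DefaultAzureCredential()

logs_client = LogsQueryClient(credential)  # Log Analytics Contributor
blob_client = BlobServiceClient(
    "https://<STORAGE_ACCOUNT>.blob.core.windows.net/", credential=credential
)  # Storage Blob Data Contributor
queue_client = QueueClient(
    "https://<STORAGE_ACCOUNT>.queue.core.windows.net/",
    "<STORAGE_QUEUE_NAME_FOR_QUERIES>",
    credential=credential,
)  # Storage Queue Data Contributor
table_client = TableServiceClient(
    "https://<STORAGE_ACCOUNT>.table.core.windows.net/", credential=credential
)  # Storage Table Data Contributor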


Environment Variables for Queue Triggers via Managed Identity:

  • Setup via Azure Portal -> Function App -> Settings -> Configuration -> Environment Variables
  1. storageAccountConnectionString__queueServiceUri -> https://<STORAGE_ACCOUNT>.queue.core.windows.net/
  2. storageAccountConnectionString__credential -> managedidentity
  3. QueueQueryName -> <STORAGE_QUEUE_NAME_FOR_QUERIES>
  4. QueueProcessName -> <STORAGE_QUEUE_NAME_FOR_PROCESSING>
  5. LogAnalyticsEndpoint -> <LAW_ENDPOINT_URL>
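The first two settings are resolved by the Functions host itself for the identity-based queue trigger connection; the remaining settings are plain app settings that the Python code can read at runtime, for example (a sketch, assuming the code reads them with os.environ):

import os

# app settings are surfaced as environment variables inside the Function App
log_analytics_endpoint = os.environ["LogAnalyticsEndpoint"]
queue_query_name = os.environ["QueueQueryName"]
queue_process_name = os.environ["QueueProcessName"]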


Optional Environment Variables (reduces number of params in requests):

  • Setup via Azure Portal -> Function App -> Settings -> Configuration -> Environment Variables
  1. QueueURL -> <STORAGE_QUEUE_URL>
  2. TableURL -> <STORAGE_TABLE_URL>
  3. TableIngestName -> <STORAGE_TABLE_INGEST_LOG_NAME>
  4. TableQueryName -> <STORAGE_TABLE_QUERY_LOG_NAME>
  5. TableProcessName -> <STORAGE_TABLE_PROCESS_LOG_NAME>


Azure Storage Setup:

  1. Create 1 container for data output files
    • <STORAGE_CONTAINER_NAME>
  2. Create 4 queues for messages/jobs
    • <STORAGE_QUEUE_NAME_FOR_QUERIES>
    • <STORAGE_QUEUE_NAME_FOR_PROCESSING>
    • <STORAGE_QUEUE_NAME_FOR_QUERIES>-poison for failed messages
    • <STORAGE_QUEUE_NAME_FOR_PROCESSING>-poison for failed messages
  3. Create 3 tables for logging (i.e. ingestlog, querylog, and processlog)
    • <STORAGE_TABLE_INGEST_LOG_NAME>
    • <STORAGE_TABLE_QUERY_LOG_NAME>
    • <STORAGE_TABLE_PROCESS_LOG_NAME>
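If you prefer to script this step instead of using the portal, the following is a hedged sketch using the storage SDKs and the same placeholders (the repo itself does not ship such a script):

from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient
from azure.storage.queue import QueueServiceClient
from azure.data.tables import TableServiceClient

credential = DefaultAzureCredential()
account = "<STORAGE_ACCOUNT>"

# 1 container for data output files
blob_service = BlobServiceClient(f"https://{account}.blob.core.windows.net/", credential=credential)
blob_service.create_container("<STORAGE_CONTAINER_NAME>")

# 4 queues: query, process, and their -poison counterparts
queue_service = QueueServiceClient(f"https://{account}.queue.core.windows.net/", credential=credential)
for name in ["<STORAGE_QUEUE_NAME_FOR_QUERIES>", "<STORAGE_QUEUE_NAME_FOR_PROCESSING>"]:
    queue_service.create_queue(name)
    queue_service.create_queue(f"{name}-poison")

# 3 tables for logging
table_service = TableServiceClient(f"https://{account}.table.core.windows.net/", credential=credential)
for name in ["ingestlog", "querylog", "processlog"]:
    table_service.create_table_if_not_exists(name)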

API Management (APIM) Setup (Optional):

  • Note: APIM is used to access the FastAPI Swagger/OpenAPI docs
  1. Create APIM Service -> Consumption Pricing Tier (DO NOT use developer)
  2. Add new API -> Function App
    • Function App: <YOUR_FUNCTION>
    • Display Name: Protected API Calls
    • Name: protected-api-calls
    • Suffix: api
  3. Remove all operations besides POST
    • Edit POST operation
      • Display name: azure_ingest_test_data
      • URL: POST /azure_ingest_test_data
    • Clone and Edit new POST operation
      • Display name: azure_submit_query
      • URL: POST /azure_submit_query
    • Clone and Edit new POST operation
      • Display name: azure_submit_query_parallel
      • URL: POST /azure_submit_query_parallel
    • Clone and Edit new POST operation
      • Display name: azure_get_status_post
      • URL: POST /azure_get_status
    • Clone azure_get_status operation
      • Change from POST to GET
      • Display name: azure_get_status
      • URL: GET /azure_get_status
    • Edit OpenAPI spec json operation ids to match above
  4. Add new API -> Function App
    • Function App: <YOUR_FUNCTION>
    • Display Name: Public Docs
    • Name: public-docs
    • Suffix: public
  5. Remove all operations besides GET
    • Settings -> uncheck 'subscription required'
    • Edit GET operation
      • Display name: Documentation
      • URL: GET /docs
    • Clone and Edit new GET operation
      • Display name: OpenAPI Schema
      • URL: GET /openapi.json
    • Edit OpenAPI spec json operation ids to match above
    • Test at https://<APIM_ENDPOINT_NAME>.azure-api.net/public/docs


Queue Trigger Setup:

  • To fix message encoding errors (default is base64), add "extensions": {"queues": {"messageEncoding": "none"}} to host.json
  • Note: Failed messages/jobs are sent to <QUEUE_NAME>-poison

Optional Data Collection Endpoint and Rule Setup for Log Analytics Ingest:

  1. Azure Portal -> Monitor -> Data Collection Endpoints -> Create
  2. Azure Portal -> Log Analytics -> Tables -> Create New Custom Table
  • Create new Data Collection Rule (DCR)
  • Create data_sample.json with sample schema of {"TimeGenerated" : "2024-01-01 00:00:00", "DataColumn1" : "string", "DataColumn2" : "string", ... "DataColumn10" : "string"}
  • Azure Portal -> Monitor -> Data Collection Rules -> JSON View -> note the immutableId and outputStream values
  • For the DCE+DCR, assign the Monitoring Metrics Publisher role
  3. If changing or updating the number of columns later:
  • Azure Portal -> Log Analytics -> Settings -> Tables -> ... -> Edit Schema
  • Azure Portal -> Data Collection Rules -> Export Template -> Deploy -> Edit
  4. Reference: Tutorial: Send data to Azure Monitor Logs with Logs ingestion API (Azure portal)
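For reference, ingesting sample rows through the DCE/DCR from Python looks roughly like the sketch below, following the linked Logs Ingestion API tutorial; the rule_id is the DCR immutableId and the stream_name is the DCR outputStream noted above (endpoint and IDs are placeholders):

from azure.identity import DefaultAzureCredential
from azure.monitor.ingestion import LogsIngestionClient

credential = DefaultAzureCredential()
client = LogsIngestionClient(
    endpoint="https://<DCE_NAME>.<REGION>.ingest.monitor.azure.com",
    credential=credential,
)

rows = [
    {"TimeGenerated": "2024-01-01 00:00:00", "DataColumn1": "string", "DataColumn2": "string"}
]

# rule_id = DCR immutableId; stream_name = DCR outputStream (typically Custom-<TABLE_NAME>_CL)
client.upload(rule_id="<DCR_IMMUTABLE_ID>", stream_name="<DCR_OUTPUT_STREAM>", logs=rows)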

Optional Security Settings:

  1. Restrict Azure Function App and APIM to specific IP address range(s)
  • Networking -> Public Access -> Select Virtual Networks or IPs


Azure Default Credential and Microsoft Entra endpoints: set the AZURE_AUTHORITY_HOST environment variable if a non-public cloud authority is required (e.g., US Gov Cloud).

Usage

Swagger UI Docs at https://<APIM_ENDPOINT_NAME>.azure-api.net/public/docs

  • API calls require an APIM subscription key
  • APIM -> Subscription -> Create Subscription -> Copy Key
  • Paste the key into the "Ocp-Apim-Subscription-Key" header field


  1. azure_submit_query() or azure_submit_query_parallel():
  • HTTP POST Example:
{
    "subscription_id" : "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
    "resource_group_name" : "XXXXXXXXXXXXXXXXXXXXXXX",
    "log_analytics_worksapce_name" : "XXXXXXXXXXXXXXXX",
    "log_analytics_workspace_id" : "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
    "storage_blob_url" : "https://XXXXXXXXXXXXXXXXXXXXX.blob.core.windows.net/",
    "storage_blob_container_name" : "XXXXXXXXXXXXX",
    "table_names_and_columns" : { "XXXXXXXXXXXXXXX": ["TimeGenerated","DataColumn1","DataColumn2","DataColumn3","DataColumn4","DataColumn5","DataColumn6","DataColumn7","DataColumn8","DataColumn9", "DataColumn10"]},
    "start_datetime" : "2024-03-19 00:00:00",
    "end_datetime" : "2024-03-20 00:00:00"
}
  • HTTP Response Example for azure_submit_query():
{
    "query_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
    "submit_status": "Success",
    "table_names": "XXXXXXXXXXX",
    "start_datetime": "2024-03-19 00:00:00.000000",
    "end_datetime": "2024-03-20 00:00:00.000000",
    "total_row_count": 23000000,
    "subqueries_generated": 95,
    "subqueries_sent_to_queue": 95,
    "runtime_seconds": 92.1,
    "submit_datetime": "2024-03-26 16:24:38.771336"
}
  • HTTP Response Example for azure_submit_query_parallel():
{
    "query_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
    "split_status": "Success",
    "table_names": "XXXXXXXXXXX",
    "start_datetime": "2024-04-04 00:00:00.000000",
    "end_datetime": "2024-04-10 00:00:00.000000",
    "number_of_messages_generated": 6,
    "number_of_messages_sent": 6,
    "total_row_count": 2010000,
    "runtime_seconds": 0.9,
    "split_datetime": "2024-04-12 14:06:41.688752"
}
  2. azure_get_status():
  • HTTP POST Request Example:
{
    "query_uuid" : "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
}
  • HTTP Response Example:
{
    "query_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
    "query_partitions" : 1,
    "submit_status": "Success",
    "processing_status": "Partial",
    "percent_complete": 29.5,
    "runtime_since_submit_seconds": 463.6,
    "estimated_time_remaining_seconds": 1107.9,
    "number_of_subqueries": 95,
    "number_of_subqueries_success": 28,
    "number_of_subqueries_failed": 0,
    "query_row_count": 23000000,
    "output_row_count": 6972002,
    "output_file_size": 2.05,
    "output_file_units" : "GB"
}
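A hedged end-to-end sketch of driving these endpoints with the requests library through APIM; the base URL uses the api suffix from the APIM setup, the payload fields follow the azure_submit_query() example above, and the completion check on percent_complete is an assumption based on the example response:

import time
import requests

base_url = "https://<APIM_ENDPOINT_NAME>.azure-api.net/api"
headers = {"Ocp-Apim-Subscription-Key": "<APIM_SUBSCRIPTION_KEY>"}

payload = {
    "subscription_id": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
    # remaining fields as in the azure_submit_query() example above
}

# submit the query and capture its UUID
submit = requests.post(f"{base_url}/azure_submit_query", json=payload, headers=headers, timeout=600)
query_uuid = submit.json()["query_uuid"]

# poll azure_get_status() until processing is complete
while True:
    status = requests.post(
        f"{base_url}/azure_get_status",
        json={"query_uuid": query_uuid},
        headers=headers,
        timeout=60,
    ).json()
    print(status["processing_status"], status["percent_complete"])
    if status["percent_complete"] >= 100:  # assumption: 100 indicates all subqueries finished
        break
    time.sleep(30)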

How To Get All Table Column Names as a List:

Note: hidden columns (those with a _ prefix) are not returned automatically and must be extended or added manually

let cols = toscalar(<TABLE_NAME>
| extend _ItemId
| getschema
| summarize make_list(ColumnName)
);
print(cols)
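The same query can also be run from Python with the Log Analytics query SDK, for example (a sketch; the workspace ID is the same one used in the submit payload):

from datetime import timedelta
from azure.identity import DefaultAzureCredential
from azure.monitor.query import LogsQueryClient

client = LogsQueryClient(DefaultAzureCredential())

kql = """
<TABLE_NAME>
| extend _ItemId
| getschema
| project ColumnName
"""

response = client.query_workspace(
    workspace_id="<LOG_ANALYTICS_WORKSPACE_ID>",
    query=kql,
    timespan=timedelta(days=1),
)
columns = [row[0] for row in response.tables[0].rows]
print(columns)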

Issues

  1. Azure Function App stops processing sub-queries (queue trigger is not picking up messages from the queue):

    • Manually restart Azure Function App in Azure Portal
    • Use Premium Plan
  2. Submission exceeds the 10-minute Azure Function limit and fails

    • Use azure_submit_query_parallel() function
    • Reduce the datetime range of the query (recommend less than 100M records per request)
    • Decrease break_up_query_freq value in azure_submit_query()
    • Decrease parallel_process_break_up_query_freq value in azure_submit_query_parallel()
    • Use a Premium Plan with no time limit (set functionTimeout in host.json)
  3. Table row count values exceeding 2,147,483,647

    • Change type from int32 to int64
  4. Hidden columns are not auto-detected

    • Add hidden columns manually in list of columns

Changelog

2.2.0 (12/20/2024):

  • Changed output file format to better mimic continuous export

2.1.3 (12/13/2024):

  • Added changes for US Gov Cloud
  • Added _ItemId hidden column by default
  • Added error checking for table/column names before processing
  • Fixed logging bugs
  • Updated documentation for data ingestion
  • Lowered defaults (number of rows) to avoid Azure errors

2.1.0:

  • Updated azure queue triggers to use blueprints
  • Added Zip Deployment
  • Fixed UUID bug and table query bug

2.0.0:

  • Changed Azure Function code to use FastAPI in order to use Swagger UI
  • Added pydantic input/output JSON schemas
  • Updated documentation

1.5.0:

  • Added azure_submit_queries() function for larger datetime ranges and parallel processing

1.4.0:

  • Refactored code and made pylint edits
  • Changed logging to % formatting from f-strings

1.3.1:

  • Fixed UTC time zone bug
  • Added estimated time remaining to get_status() response
  • Added option to put storage queue and table params in env variables

1.3.0:

  • Added pydantic input validation
  • Added OpenAPI YAML file for Azure API Management

1.2.0:

  • Added get_status() azure function

1.1.0:

  • Added logging to Azure Table Storage
  • Added row count checks

1.0.0:

  • Initial release

References

  1. How to use Logic Apps to handle large amounts of data from Log Analytics
  2. FastAPI on Azure Functions
