Summary | Files | Setup | Usage | Issues | References
This Azure Function App enables the export of big data (10M+ records per hour) from Azure Log Analytics to Blob Storage via Python SDKs, FastAPI, and API Management. In testing, 50M records with 10 columns were successfully exported in approximately 1 hour.
Inputs and Outputs:
- Input: log analytics workspace table(s), columns, and date range
- Output: JSON (line delimited) (default), CSV, or PARQUET files
Azure FastAPI HTTP Functions:
- azure_ingest_test_data(): creates and ingests test data (optional)
- azure_submit_query(): submits a single query that is split into smaller queries/jobs and sent to a queue
- azure_submit_query_parallel(): breaks up the initial query and submits multiple queries in parallel (use for large query date ranges)
- azure_get_status(): gives high-level status of query (number of sub-queries, successes, failures, row counts, file sizes)
Azure Queue Functions:
- azure_queue_query(): processes split queries
- azure_queue_process(): processes subqueries and saves output to storage blobs
- azure_queue_query_poison(): processes invalid messages in the query queue and saves them to the table log
- azure_queue_process_poison(): processes invalid messages in the process queue and saves them to the table log
Files:
- azure-log-analytics-data-export.ipynb: python notebook for development, testing, or interactive use
- function_app.py: Azure Function App python source code
- host.json: Azure Function App settings
- requirements.txt: python package requirements file
- function.app: zip push deployment
- zip-push-deployment.txt: instructions on deploying streamlined function without APIM via CLI
You will need to have access to or provision the following Azure Resources:
- Log Analytics Workspace (data source)
- Storage Account
- 1 Container (data output destination)
- 4 Queues (temp storage for split query messages/jobs)
- 3 Tables (logging)
- Azure Function App (Python 3.11+, premium)
- Clone this repo, open it in VS Code, install the Azure Functions tools/extension, and deploy to your Azure subscription
- Reference: Create a function in Azure with Python using Visual Studio Code
- Azure API Management (consumption, DO NOT use developer) (optional)
Authentication (Managed Identity) Roles Setup:
- Azure Portal -> Function App -> Identity -> System Assigned -> On -> Add Azure Role Assignments
- Monitoring Metrics Publisher: Ingest to Log Analytics (optional)
- Log Analytics Contributor: Query Log Analytics
- Storage Queue Data Contributor: Storage Queue Send/Get/Delete
- Storage Queue Data Message Processor: Storage Queue Trigger for Azure Function
- Storage Blob Data Contributor: Upload to Blob Storage
- Storage Table Data Contributor: Logging
Environment Variables for Queue Triggers via Managed Identity:
- Setup via Azure Portal -> Function App -> Settings -> Configuration -> Environment Variables
- storageAccountConnectionString__queueServiceUri -> https://<STORAGE_ACCOUNT>.queue.core.windows.net/
- storageAccountConnectionString__credential -> managedidentity
- QueueQueryName -> <STORAGE_QUEUE_NAME_FOR_QUERIES>
- QueueProcessName -> <STORAGE_QUEUE_NAME_FOR_PROCESSING>
- LogAnalyticsEndpoint -> <LAW_ENDPOINT_URL>
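- For reference, a minimal queue trigger sketch (an illustration with assumed names, not the repo's actual function_app.py) showing how these settings are consumed: connection names the prefix shared by the __queueServiceUri/__credential entries, and %QueueQueryName% resolves to the QueueQueryName app setting
import logging
import azure.functions as func

app = func.FunctionApp()

# "connection" points at the storageAccountConnectionString__* settings above;
# "%QueueQueryName%" is resolved from the QueueQueryName environment variable
@app.queue_trigger(arg_name="msg",
                   queue_name="%QueueQueryName%",
                   connection="storageAccountConnectionString")
def queue_query_sketch(msg: func.QueueMessage) -> None:
    logging.info("received query message: %s", msg.get_body().decode())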
Optional Environment Variables (reduces the number of parameters required in requests):
- Setup via Azure Portal -> Function App -> Settings -> Configuration -> Environment Variables
- QueueURL -> <STORAGE_QUEUE_URL>
- TableURL -> <STORAGE_TABLE_URL>
- TableIngestName -> <STORAGE_TABLE_INGEST_LOG_NAME>
- TableQueryName -> <STORAGE_TABLE_QUERY_LOG_NAME>
- TableProcessName -> <STORAGE_TABLE_PROCESS_LOG_NAME>
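- The assumed behavior these defaults enable (a sketch, not the repo's exact logic): a value supplied in the request wins, otherwise the environment variable is used
import os

# Hypothetical helper: prefer the request parameter, fall back to the app setting
def resolve_setting(request_value, env_name):
    return request_value if request_value else os.getenv(env_name)

# e.g. resolve_setting(body.get("queue_url"), "QueueURL")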
Azure Storage Setup:
- Create 1 container for data output files
- <STORAGE_CONTAINER_NAME>
- Create 4 queues for messages/jobs
- <STORAGE_QUEUE_NAME_FOR_QUERIES>
- <STORAGE_QUEUE_NAME_FOR_PROCESSING>
- <STORAGE_QUEUE_NAME_FOR_QUERIES>-poison for failed messages
- <STORAGE_QUEUE_NAME_FOR_PROCESSING>-poison for failed messages
- Create 3 tables for logging (e.g. ingestlog, querylog, and processlog)
- <STORAGE_TABLE_INGEST_LOG_NAME>
- <STORAGE_TABLE_QUERY_LOG_NAME>
- <STORAGE_TABLE_PROCESS_LOG_NAME>
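- The container, queues, and tables can also be created programmatically; a rough one-time setup sketch with the Python SDKs (placeholders in <> match the names above; assumes the caller holds the storage data-plane roles listed earlier):
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient
from azure.storage.queue import QueueServiceClient
from azure.data.tables import TableServiceClient

credential = DefaultAzureCredential()
account = "<STORAGE_ACCOUNT>"

# 1 container for data output files
blobs = BlobServiceClient(f"https://{account}.blob.core.windows.net", credential=credential)
blobs.create_container("<STORAGE_CONTAINER_NAME>")

# 4 queues: query, process, and their -poison counterparts
queues = QueueServiceClient(f"https://{account}.queue.core.windows.net", credential=credential)
for name in ["<STORAGE_QUEUE_NAME_FOR_QUERIES>",
             "<STORAGE_QUEUE_NAME_FOR_PROCESSING>",
             "<STORAGE_QUEUE_NAME_FOR_QUERIES>-poison",
             "<STORAGE_QUEUE_NAME_FOR_PROCESSING>-poison"]:
    queues.create_queue(name)

# 3 tables for logging
tables = TableServiceClient(f"https://{account}.table.core.windows.net", credential=credential)
for name in ["<STORAGE_TABLE_INGEST_LOG_NAME>",
             "<STORAGE_TABLE_QUERY_LOG_NAME>",
             "<STORAGE_TABLE_PROCESS_LOG_NAME>"]:
    tables.create_table_if_not_exists(name)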
API Management (APIM) Setup (Optional):
- Note: APIM is used to access the FastAPI Swagger/OpenAPI docs
- Create APIM Service -> Consumption Pricing Tier (DO NOT use developer)
- Add new API -> Function App
- Function App: <YOUR_FUNCTION>
- Display Name: Protected API Calls
- Name: protected-api-calls
- Suffix: api
- Remove all operations besides POST
- Edit POST operation
- Display name: azure_ingest_test_data
- URL: POST /azure_ingest_test_data
- Clone and Edit new POST operation
- Display name: azure_submit_query
- URL: POST /azure_submit_query
- Clone and Edit new POST operation
- Display name: azure_submit_query_parallel
- URL: POST /azure_submit_query_parallel
- Clone and Edit new POST operation
- Display name: azure_get_status_post
- URL: POST /azure_get_status
- Clone azure_get_status operation
- Change from POST to GET
- Display name: azure_get_status
- URL: GET /azure_get_status
- Edit OpenAPI spec json operation ids to match above
- Add new API -> Function App
- Function App: <YOUR_FUNCTION>
- Display Name: Public Docs
- Name: public-docs
- Suffix: public
- Remove all operations besides GET
- Settings -> uncheck 'subscription required'
- Edit GET operation
- Display name: Documentation
- URL: GET /docs
- Clone and Edit new GET operation
- Display name: OpenAPI Schema
- URL: GET /openapi.json
- Edit OpenAPI spec json operation ids to match above
- Test at https://<APIM_ENDPOINT_NAME>.azure-api.net/public/docs
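- A quick, hypothetical check that the public docs are reachable (the Public Docs API above does not require a subscription key):
import requests

apim = "<APIM_ENDPOINT_NAME>"
response = requests.get(f"https://{apim}.azure-api.net/public/openapi.json")
print(response.status_code)     # expect 200 once the Public Docs API is wired up
print(response.json()["info"])  # FastAPI's OpenAPI metadata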
Queue Trigger Setup:
- To fix message encoding errors (default is base64), add "extensions": {"queues": {"messageEncoding": "none"}} to host.json (see the excerpt below)
- Note: Failed messages/jobs are sent to <QUEUE_NAME>-poison
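- For reference, the relevant host.json excerpt might look like the following (only the queues setting from above is shown; the repo's host.json may contain additional settings):
{
  "version": "2.0",
  "extensions": {
    "queues": {
      "messageEncoding": "none"
    }
  }
}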
Optional Data Collection Endpoint and Rule Setup for Log Analytics Ingest:
- Azure Portal -> Monitor -> Data Collection Endpoints -> Create
- Azure Portal -> Log Analytics -> Tables -> Create New Custom Table
- Create new Data Collection Rule (DCR)
- Create data_sample.json with sample schema of {"TimeGenerated" : "2024-01-01 00:00:00", "DataColumn1" : "string", "DataColumn2" : "string", ... "DataColumn10" : "string"}
- Azure Portal -> Monitor -> Data Collection Rules -> JSON View -> note the immutableId and outputStream values
- For the DCE+DCR, assign the Monitoring Metrics Publisher role
- If changing or updating number of columns later:
- Azure Portal -> Log Analytics -> Settings -> Tables -> ... -> Edit Schema
- Azure Portal -> Data Collection Rules -> Export Template -> Deploy -> Edit
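- A rough ingestion sketch using the azure-monitor-ingestion package (placeholders only; the endpoint, rule_id, and stream_name come from the DCE and the DCR JSON view above):
from azure.identity import DefaultAzureCredential
from azure.monitor.ingestion import LogsIngestionClient

client = LogsIngestionClient(endpoint="<DATA_COLLECTION_ENDPOINT_URL>",
                             credential=DefaultAzureCredential())

# rows follow the data_sample.json schema above
rows = [{"TimeGenerated": "2024-01-01 00:00:00",
         "DataColumn1": "string",
         "DataColumn2": "string"}]

client.upload(rule_id="<DCR_IMMUTABLE_ID>",        # immutableId from the DCR JSON view
              stream_name="<DCR_OUTPUT_STREAM>",   # outputStream from the DCR JSON view
              logs=rows)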
Optional Security Settings:
- Restrict Azure Function App and APIM to specific IP address range(s)
- Networking -> Public Access -> Select Virtual Networks or IPs
Azure Default Credential Microsoft Entra Endpoints: set the AZURE_AUTHORITY_HOST environment variable
- Public Cloud: https://login.microsoftonline.com (default)
- US Gov Cloud: https://login.microsoftonline.us
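- A small sketch of the same setting applied in code (assumption: azure-identity reads AZURE_AUTHORITY_HOST when DefaultAzureCredential is created; US Gov value shown):
import os
from azure.identity import DefaultAzureCredential

# same value as the Function App environment variable above (US Gov example)
os.environ["AZURE_AUTHORITY_HOST"] = "https://login.microsoftonline.us"
credential = DefaultAzureCredential()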
Swagger UI Docs at https://<APIM_ENDPOINT_NAME>.azure-api.net/public/docs
- API calls require an APIM subscription key
- APIM -> Subscription -> Create Subscription -> Copy Key
- Paste in "Ocp-Apim-Subscription-Key" header field
- azure_submit_query() or azure_submit_query_parallel():
- HTTP POST Example:
{
"subscription_id" : "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
"resource_group_name" : "XXXXXXXXXXXXXXXXXXXXXXX",
"log_analytics_worksapce_name" : "XXXXXXXXXXXXXXXX",
"log_analytics_workspace_id" : "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
"storage_blob_url" : "https://XXXXXXXXXXXXXXXXXXXXX.blob.core.windows.net/",
"storage_blob_container_name" : "XXXXXXXXXXXXX",
"table_names_and_columns" : { "XXXXXXXXXXXXXXX": ["TimeGenerated","DataColumn1","DataColumn2","DataColumn3","DataColumn4","DataColumn5","DataColumn6","DataColumn7","DataColumn8","DataColumn9", "DataColumn10"]},
"start_datetime" : "2024-03-19 00:00:00",
"end_datetime" : "2024-03-20 00:00:00"
}
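- A hedged sketch of submitting the request body above through APIM with Python requests (the APIM endpoint name, subscription key, and api suffix are placeholders from the APIM setup section):
import requests

APIM_URL = "https://<APIM_ENDPOINT_NAME>.azure-api.net/api"
HEADERS = {"Ocp-Apim-Subscription-Key": "<APIM_SUBSCRIPTION_KEY>"}

body = {}  # fill in with the JSON request body shown above

# use /azure_submit_query_parallel instead for large date ranges
response = requests.post(f"{APIM_URL}/azure_submit_query", headers=HEADERS, json=body)
print(response.status_code, response.json())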
- HTTP Response Examples:
- azure_submit_query():
{
"query_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
"submit_status": "Success",
"table_names": "XXXXXXXXXXX",
"start_datetime": "2024-03-19 00:00:00.000000",
"end_datetime": "2024-03-20 00:00:00.000000",
"total_row_count": 23000000,
"subqueries_generated": 95,
"subqueries_sent_to_queue": 95,
"runtime_seconds": 92.1,
"submit_datetime": "2024-03-26 16:24:38.771336"
}
- azure_submit_query_parallel():
{
"query_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
"split_status": "Success",
"table_names": "XXXXXXXXXXX",
"start_datetime": "2024-04-04 00:00:00.000000",
"end_datetime": "2024-04-10 00:00:00.000000",
"number_of_messages_generated": 6,
"number_of_messages_sent": 6,
"total_row_count": 2010000,
"runtime_seconds": 0.9,
"split_datetime": "2024-04-12 14:06:41.688752"
}
- azure_get_status():
- HTTP POST Request Example:
{
"query_uuid" : "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
}
- HTTP Response Example:
{
"query_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
"query_partitions" : 1,
"submit_status": "Success",
"processing_status": "Partial",
"percent_complete": 29.5,
"runtime_since_submit_seconds": 463.6,
"estimated_time_remaining_seconds": 1107.9,
"number_of_subqueries": 95,
"number_of_subqueries_success": 28,
"number_of_subqueries_failed": 0,
"query_row_count": 23000000,
"output_row_count": 6972002,
"output_file_size": 2.05,
"output_file_units" : "GB"
}
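- A hedged polling sketch built on the same placeholders (the terminal processing_status values are assumed; only "Partial" appears in the example above):
import time
import requests

APIM_URL = "https://<APIM_ENDPOINT_NAME>.azure-api.net/api"
HEADERS = {"Ocp-Apim-Subscription-Key": "<APIM_SUBSCRIPTION_KEY>"}
query_uuid = "<QUERY_UUID_FROM_SUBMIT_RESPONSE>"

while True:
    status = requests.post(f"{APIM_URL}/azure_get_status",
                           headers=HEADERS,
                           json={"query_uuid": query_uuid}).json()
    print(status.get("processing_status"), status.get("percent_complete"))
    if status.get("processing_status") != "Partial":  # assumed: anything other than Partial is terminal
        break
    time.sleep(60)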
Note: hidden columns (prefixed with _) must be extended or added to the column list manually, for example:
let cols = toscalar(<TABLE_NAME>
| extend _ItemId
| getschema
| summarize make_list(ColumnName)
);
print(cols)
Issues:
- Azure Function App stops processing sub-queries, queue trigger not processing messages in queue:
- Manually restart Azure Function App in Azure Portal
- Use Premium Plan
- Submit exceeds the 10 min Azure Function limit and fails:
- Use azure_submit_query_parallel() function
- Reduce the datetime range of the query (recommend less than 100M records per request)
- Decrease break_up_query_freq value in azure_submit_query()
- Decrease parallel_process_break_up_query_freq value in azure_submit_query_parallel()
- Use Premium Plan with no time limit, change in host.json
- Table row count values exceeding 2,147,483,647:
- Change type from int32 to int64
- Hidden columns are not auto-detected:
- Add hidden columns manually to the list of columns
2.2.0 (12/20/2024):
- Changed output file format to better mimic continuous export
2.1.3 (12/13/2024):
- Added changes for US Gov Cloud
- Added _ItemId hidden column by default
- Added error checking for table/column names before processing
- Fixed logging bugs
- Updated documentation for data ingestion
- Lowered defaults (number of rows) to avoid Azure errors
2.1.0:
- Updated azure queue triggers to use blueprints
- Added Zip Deployment
- Fixed uuid bug, fixed table query bug
2.0.0:
- Changed Azure Function code to use FastAPI in order to use Swagger UI
- Added pydantic input/output JSON schemas
- Updated documentation
1.5.0:
- Added azure_submit_queries() function for larger datetime ranges and parallel processing
1.4.0:
- Refactored code and made pylint edits
- Changed logging to % formatting from f-strings
1.3.1:
- Fixed UTC time zone bug
- Added estimated time remaining to get_status() response
- Added option to put storage queue and table params in env variables
1.3.0:
- Added pydantic input validation
- Added Open API yaml file for Azure API Management
1.2.0:
- Added get_status() azure function
1.1.0:
- Added logging to Azure Table Storage
- Added row count checks
1.0.0:
- Initial release