Use Spark 3 in Data Explorer Sample (#98)
Also:
- automatically generate a PAT token for Databricks
- add a README section about continuous export for Data Explorer
algattik authored Oct 21, 2020
1 parent c96e95c commit 7a68c7a
Showing 7 changed files with 69 additions and 52 deletions.
71 changes: 25 additions & 46 deletions components/azure-databricks/create-databricks.sh
@@ -28,61 +28,40 @@ fi

databricks_metainfo=$(az resource show -g $RESOURCE_GROUP --resource-type Microsoft.Databricks/workspaces -n $ADB_WORKSPACE -o json)

# Databricks CLI automatically picks up configuration from $DATABRICKS_HOST and $DATABRICKS_TOKEN.
export DATABRICKS_HOST=$(jq -r '"https://" + .location + ".azuredatabricks.net"' <<<"$databricks_metainfo")

echo 'creating Key Vault to store Databricks PAT token'
az keyvault create -g $RESOURCE_GROUP -n $ADB_TOKEN_KEYVAULT -o tsv >>log.txt

echo 'checking PAT token secret presence in Key Vault'
databricks_token_secret_name="DATABRICKS-TOKEN"
pat_token_secret=$(az keyvault secret list --vault-name $ADB_TOKEN_KEYVAULT --query "[?ends_with(id, '/$databricks_token_secret_name')].id" -o tsv)
if [[ -z "$pat_token_secret" ]]; then
echo 'PAT token secret not present. Creating dummy entry for user to fill in manually'
az keyvault secret set --vault-name $ADB_TOKEN_KEYVAULT -n "$databricks_token_secret_name" --file /dev/null -o tsv >>log.txt
echo 'generating PAT token'
wsId=$(jq -r .id <<<"$databricks_metainfo")

# Get a token for the global Databricks application.
# The resource name is fixed and never changes.
token_response=$(az account get-access-token --resource 2ff814a6-3304-4ab8-85cb-cd0e6f879c1d)
token=$(jq .accessToken -r <<< "$token_response")

# Get a token for the Azure management API
token_response=$(az account get-access-token --resource https://management.core.windows.net/)
azToken=$(jq .accessToken -r <<< "$token_response")

api_response=$(curl -sf "$DATABRICKS_HOST/api/2.0/token/create" \
-H "Authorization: Bearer $token" \
-H "X-Databricks-Azure-SP-Management-Token:$azToken" \
-H "X-Databricks-Azure-Workspace-Resource-Id:$wsId" \
-d '{ "lifetime_seconds": 864000, "comment": "streaming-at-scale generated token" }')
pat_token=$(jq .token_value -r <<< "$api_response")

az keyvault secret set --vault-name "$ADB_TOKEN_KEYVAULT" --name "$databricks_token_secret_name" --value "$pat_token"
fi

echo 'checking PAT token presence in Key Vault'
pat_token=$(az keyvault secret show --vault-name $ADB_TOKEN_KEYVAULT -n "$databricks_token_secret_name" --query value -o tsv)

if [[ -z "$pat_token" ]]; then
echo 'PAT token not present. Requesting user to fill in manually'
databricks_login_url=$(jq -r '"https://" + .location + ".azuredatabricks.net/aad/auth?has=&Workspace=" + .id + "&WorkspaceResourceGroupUri="+ .properties.managedResourceGroupId' <<<"$databricks_metainfo")

kv_info=$(az resource show -g $RESOURCE_GROUP --resource-type Microsoft.KeyVault/vaults -n $ADB_TOKEN_KEYVAULT -o json)
kv_secrets_url=$(jq -r '"https://portal.azure.com/#@" + .properties.tenantId + "/resource" + .id + "/secrets"' <<<$kv_info)

cat <<EOM
ERROR: Missing PAT token in Key Vault (this is normal the first time you run this script).
You need to manually create a Databricks PAT token and register it into the Key Vault as follows,
then rerun this script or pipeline.
- Navigate to:
$databricks_login_url
Create a PAT token and copy it to the clipboard:
https://docs.azuredatabricks.net/api/latest/authentication.html#generate-a-token
- Navigate to:
$kv_secrets_url
Click $databricks_token_secret_name
Click "+ New Version"
As value, enter the PAT token you copied
Click Create
- The script will wait for the PAT to be copied into the Key Vault
If you stop the script, you can resume it running the following command:
./create-solution.sh -d "$PREFIX" -t $TESTTYPE -s PT
EOM

echo 'waiting for PAT (polling every 5 secs)...'
while : ; do
pat_token=$(az keyvault secret show --vault-name "$ADB_TOKEN_KEYVAULT" --name "$databricks_token_secret_name" --query value -o tsv | grep dapi || true)
if [ ! -z "$pat_token" ]; then break; fi
sleep 5
done
echo 'PAT detected'
fi

# Databricks CLI automatically picks up configuration from these two environment variables.
export DATABRICKS_HOST=$(jq -r '"https://" + .location + ".azuredatabricks.net"' <<<"$databricks_metainfo")
export DATABRICKS_TOKEN="$pat_token"
echo 'getting PAT token from Key Vault'
export DATABRICKS_TOKEN=$(az keyvault secret show --vault-name $ADB_TOKEN_KEYVAULT -n "$databricks_token_secret_name" --query value -o tsv)

fi
echo 'checking Databricks secrets scope exists'
33 changes: 33 additions & 0 deletions eventhubs-dataexplorer/README.md
@@ -145,3 +145,36 @@ To remove all the created resource, you can just delete the related resource group
```bash
az group delete -n <resource-group-name>
```

## Next steps

Retaining long-term data in Azure Data Explorer can drive up costs. You can set up [continuous data export](https://docs.microsoft.com/en-us/azure/data-explorer/kusto/management/data-export/continuous-data-export) to continuously write data derived from ingested events to external storage. In conjunction with a [retention policy](https://docs.microsoft.com/en-us/azure/data-explorer/kusto/management/retentionpolicy), this enables data tiering: hot data is served from Data Explorer's own storage, colder data through the external table.

The sample statements below use CSV files in Azure Blob Storage for simplicity. Use Parquet instead to reduce file size and improve access performance, especially if you plan to query data from the external table. Use Azure Data Lake Storage Gen2 instead of Blob Storage for better performance and to avoid hard-coding credentials; a sketch of such a table follows the CSV example below.


```kql
.create external table SummarizedEvents (deviceId: string, type: string, count: long, from: datetime, to: datetime)
kind=blob
dataformat=csv
(
h@'https://<SOLUTION_NAME>storage.blob.core.windows.net/export;<STORAGE_KEY>'
)
.create function
EventSummary()
{
EventTable
| summarize count=count(), from=min(createdAt), to=max(createdAt) by deviceId, type
}
// Create the target table (if it doesn't already exist)
.set-or-append SummarizedEvents <| EventSummary() | limit 0
.create-or-alter continuous-export SummarizedEventsExport
to table SummarizedEvents
with
(intervalBetweenRuns=5m)
<| EventSummary()
```
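
As recommended above, the same external table can instead be declared over Parquet files in Azure Data Lake Storage Gen2. The following is only a sketch: the `kind=adl` form, the filesystem name `export`, the storage account name `<SOLUTION_NAME>datalake`, and the `impersonate` authentication method are illustrative assumptions to adapt to your environment.

```kql
// Same schema as above, but backed by Parquet files in ADLS Gen2.
// 'impersonate' authenticates with the caller's identity instead of an embedded storage key.
.create external table SummarizedEvents (deviceId: string, type: string, count: long, from: datetime, to: datetime)
kind=adl
dataformat=parquet
(
    h@'abfss://export@<SOLUTION_NAME>datalake.dfs.core.windows.net/;impersonate'
)
```

The retention policy mentioned above completes the tiering by trimming raw events from Data Explorer's own storage once they have been summarized and exported. A minimal sketch, assuming a 30-day soft-delete window is acceptable (the window length is an assumption, not part of this sample):

```kql
// Keep only the most recent 30 days of raw events in EventTable;
// older summaries remain queryable through the SummarizedEvents external table.
.alter-merge table EventTable policy retention softdelete = 30d
```

Exported summaries can then be queried with `external_table('SummarizedEvents')`.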
9 changes: 6 additions & 3 deletions eventhubs-dataexplorer/create-solution.sh
@@ -47,11 +47,13 @@ if [[ -z "$PREFIX" ]]; then
usage
fi

export DATABRICKS_SPARKVERSION=7.3.x-scala2.12

# 10000 messages/sec
if [ "$TESTTYPE" == "10" ]; then
export EVENTHUB_PARTITIONS=12
export EVENTHUB_CAPACITY=12
export DATAEXPLORER_SKU=D13_v2
export DATAEXPLORER_SKU=Standard_D13_v2
export DATAEXPLORER_CAPACITY=3
export SIMULATOR_INSTANCES=5
fi
@@ -60,7 +62,7 @@ fi
if [ "$TESTTYPE" == "5" ]; then
export EVENTHUB_PARTITIONS=8
export EVENTHUB_CAPACITY=6
export DATAEXPLORER_SKU=D12_v2
export DATAEXPLORER_SKU=Standard_D12_v2
export DATAEXPLORER_CAPACITY=2
export SIMULATOR_INSTANCES=3
fi
@@ -69,7 +71,7 @@ fi
if [ "$TESTTYPE" == "1" ]; then
export EVENTHUB_PARTITIONS=2
export EVENTHUB_CAPACITY=2
export DATAEXPLORER_SKU=D11_v2
export DATAEXPLORER_SKU=Standard_D11_v2
export DATAEXPLORER_CAPACITY=2
export SIMULATOR_INSTANCES=1
fi
@@ -169,6 +171,7 @@ echo "***** [V] Starting deployment VERIFICATION"

RUN=`echo $STEPS | grep V -o || true`
if [ ! -z "$RUN" ]; then
source ../assert/has-local-databrickscli.sh
source ../components/azure-databricks/create-databricks.sh
source ../streaming/databricks/runners/verify-dataexplorer.sh
fi
1 change: 1 addition & 0 deletions streaming/databricks/job/run-databricks-job.sh
@@ -39,6 +39,7 @@ wait_for_run () {
cluster_jq_command="$(cat <<JQ
.name = "Streaming at scale job $notebook_name"
| .notebook_task.notebook_path = "/Shared/streaming-at-scale/$notebook_name"
| .new_cluster.spark_version = "$DATABRICKS_SPARKVERSION"
| .new_cluster.node_type_id = "$DATABRICKS_NODETYPE"
| .new_cluster.num_workers = $DATABRICKS_WORKERS
| .timeout_seconds = $((${REPORT_THROUGHPUT_MINUTES:-30} * 60))
4 changes: 2 additions & 2 deletions streaming/databricks/notebooks/verify-dataexplorer.scala
@@ -22,8 +22,8 @@ val cluster = dbutils.widgets.get("dataexplorer-cluster")
val database = dbutils.widgets.get("dataexplorer-database")
val query = dbutils.widgets.get("dataexplorer-query")
val conf: Map[String, String] = Map(
KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> dbutils.widgets.get("dataexplorer-client-id"),
KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> dbutils.secrets.get(scope = "MAIN", key = "dataexplorer-client-password"),
KustoSourceOptions.KUSTO_AAD_APP_ID -> dbutils.widgets.get("dataexplorer-client-id"),
KustoSourceOptions.KUSTO_AAD_APP_SECRET -> dbutils.secrets.get(scope = "MAIN", key = "dataexplorer-client-password"),
KustoSourceOptions.KUSTO_BLOB_STORAGE_ACCOUNT_NAME -> dbutils.widgets.get("dataexplorer-storage-account"),
KustoSourceOptions.KUSTO_BLOB_STORAGE_ACCOUNT_KEY -> dbutils.secrets.get(scope = "MAIN", key = "dataexplorer-storage-key"),
KustoSourceOptions.KUSTO_BLOB_CONTAINER -> dbutils.widgets.get("dataexplorer-storage-container")
1 change: 1 addition & 0 deletions streaming/databricks/runners/verify-common.sh
@@ -1,5 +1,6 @@
export DATABRICKS_NODETYPE=Standard_F4s
export DATABRICKS_WORKERS=2
export DATABRICKS_SPARKVERSION=${DATABRICKS_SPARKVERSION:-5.5.x-scala2.11}
export DATABRICKS_MAXEVENTSPERTRIGGER=10000

export DATABRICKS_TESTOUTPUTPATH=dbfs:/test-output/$(uuidgen)
2 changes: 1 addition & 1 deletion streaming/databricks/runners/verify-dataexplorer.sh
@@ -20,7 +20,7 @@ databricks secrets put --scope "MAIN" --key "dataexplorer-client-password" --str
databricks secrets put --scope "MAIN" --key "dataexplorer-storage-key" --string-value "$AZURE_STORAGE_KEY"

source ../streaming/databricks/job/run-databricks-job.sh verify-dataexplorer true "$(cat <<JQ
.libraries += [ { "maven": { "coordinates": "com.microsoft.azure.kusto:spark-kusto-connector:1.0.0-BETA-04", "exclusions": ["javax.mail:mail"] } } ]
.libraries += [ { "maven": { "coordinates": "com.microsoft.azure.kusto:kusto-spark_3.0_2.12:2.3.0" } } ]
| .notebook_task.base_parameters."test-output-path" = "$DATABRICKS_TESTOUTPUTPATH"
| .notebook_task.base_parameters."dataexplorer-cluster" = "$kustoURL"
| .notebook_task.base_parameters."dataexplorer-database" = "$DATAEXPLORER_DATABASE"
