sdk_launch

Introduction

This code example shows how you can run Cloud Dataflow pipelines from App Engine apps, as a replacement for the older GAE Python MapReduce libraries, as well as do much more.

The example shows how to periodically launch a Python Dataflow pipeline from GAE, to analyze data stored in Cloud Datastore; in this case, tweets from Twitter.

This example uses the Dataflow SDK to launch the pipeline jobs. Because of its use of the SDK, it requires App Engine Flex. For an example that uses the same pipeline, but uses Dataflow Templates to launch the pipeline jobs, see the 'job_template_launch' directory, which uses App Engine Standard. Now that Dataflow Templates are available, they are likely the more straightforward option for this type of task in most cases, so you may want to start with the 'job_template_launch' directory.

The example is a GAE app with two services (previously, 'modules'):

  • a GAE Standard service that periodically pulls in timeline tweets from Twitter and stores them in Datastore; and

  • a GAE Flexible service that periodically launches a Python Dataflow pipeline to analyze the tweet data in the Datastore.

The Dataflow pipeline

The Python Dataflow pipeline reads recent tweets from the past N days from Cloud Datastore, then essentially splits into three processing branches. It finds the top N most popular words in terms of the percentage of tweets they were found in, calculates the top N most popular URLs in terms of their count, and then derives relevant word co-occurrences (bigrams) using an approximation to a tf*idf ranking metric. It writes the results to three BigQuery tables.
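To make the first two branches concrete, here is a minimal pure-Python sketch of the same two rankings: words by the percentage of tweets they appear in, and URLs by raw count. It is not the pipeline's actual code (which runs as Dataflow transforms over Datastore entities); the sample tweets, the regular expressions, and the function names are all assumptions made for illustration.

```python
import re
from collections import Counter

# Hypothetical sample data; the real pipeline reads Tweet entities from Datastore.
tweets = [
    "dataflow makes pipelines easy https://example.com/a",
    "app engine launches dataflow https://example.com/a",
    "cron jobs on app engine https://example.com/b",
]

# Simplified tokenization, assumed for this sketch only.
URL_RE = re.compile(r"https?://\S+")
WORD_RE = re.compile(r"[a-z']+")


def top_words_by_tweet_share(texts, n):
    """Return the n words found in the largest share of tweets, as (word, percent)."""
    doc_counts = Counter()
    for text in texts:
        without_urls = URL_RE.sub(" ", text.lower())
        # Using a set counts each word at most once per tweet.
        doc_counts.update(set(WORD_RE.findall(without_urls)))
    total = len(texts)
    # Sort by descending count, then alphabetically to break ties.
    ranked = sorted(doc_counts.items(), key=lambda kv: (-kv[1], kv[0]))
    return [(word, 100.0 * count / total) for word, count in ranked[:n]]


def top_urls_by_count(texts, n):
    """Return the n most frequently occurring URLs, as (url, count)."""
    url_counts = Counter(url for text in texts for url in URL_RE.findall(text))
    return sorted(url_counts.items(), key=lambda kv: (-kv[1], kv[0]))[:n]
```

Calling `top_words_by_tweet_share(tweets, 3)` and `top_urls_by_count(tweets, 2)` gives the two rankings for the sample data. In the real pipeline the same per-tweet deduplication and counting would be expressed as distributed transforms rather than in-memory counters.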

Prerequisites for running the example

1. Basic GCP setup

Follow the "Before you begin" steps on this page. Note your project and bucket name; you will need them in a moment.

For local testing (not required, but may be useful), follow the next section on the same page to [install pip and the Dataflow SDK](https://cloud.google.com/dataflow/docs/quickstarts/quickstart-python#Setup).

2. Create a BigQuery dataset in your project

The app will write its analytic results to BigQuery. In your project, create a new dataset to use for this purpose, or note the name of an existing dataset that you will use.

3. Create a Twitter App

Create a Twitter application. Note the credentials under the 'Keys and Access Tokens' tab: 'Consumer Key (API Key)', 'Consumer Secret (API Secret)', 'Access Token', and 'Access Token Secret'. You'll need these in a moment.

4. Library installation and config

  1. We need to 'vendor' the libraries used by the app's frontend. Install the dependencies into the app's lib subdirectory like this:
pip install --target=lib -r standard_requirements.txt

(Take a look at appengine_config.py to see where the app tells GAE to add those libraries.)

  2. Then, edit app.yaml to add the Twitter app credentials that you generated above.

  3. Edit the Dockerfile to specify the env vars for your PROJECT, GCS BUCKET, and DATASET names, created as described above. (For the bucket, do NOT add the 'gs://' prefix; just use the name).

Deploying the App

The example app is essentially 'headless', with no user-facing frontend. It would be straightforward to add user-facing content, but as is, it just runs two app cron jobs: one to periodically pull tweets from Twitter and add them to the Datastore, and one to periodically analyze the tweet data. The analysis results are written to BigQuery.

More specifically, the app consists of two services: a GAE Standard service, which fetches tweets and stores them in the Datastore; and a GAE Flex service, which launches a Dataflow pipeline to analyze the tweet data. Both are triggered by App Engine cron jobs.

So, the app has four .yaml files. There is one for each service: app.yaml (which uses main.py) and backend.yaml (which uses main_df.py). The GAE Flex service uses a custom runtime, and it is built and deployed from the Dockerfile and requirements.txt in this directory.

Then, the app has a dispatch.yaml file that specifies how to route requests to the two services; and a cron.yaml file that defines the cron jobs.

Deploy the parts of the app like this, in this order:

gcloud app deploy app.yaml
gcloud app deploy backend.yaml
gcloud app deploy dispatch.yaml
gcloud app deploy cron.yaml

If you change something in either service's script, you only need to redeploy that service's .yaml file. You don't need to redeploy the dispatch file unless you change the routing. To change the cron job timings, just edit and redeploy the cron.yaml file.

Testing your deployment

To test your deployment, manually trigger the cron jobs. To do this, go to the Cloud Console for your project, and visit the App Engine pane. Then, click on 'Task Queues' in the left navbar, then the 'Cron Jobs' tab in the center pane.

Then, click Run now for the /timeline cron job. This is the job that fetches tweets and stores them in the Datastore. After it runs, you should be able to see Tweet entities in the Datastore. Visit the Datastore pane in the Cloud Console, and select Tweet from the 'Entities' pull-down menu. You can also try a GQL query:

select * from Tweet order by created_at desc

Once you know that the 'fetch tweets' cron is running successfully, click Run now for the /launchpipeline cron. This should kick off a Dataflow job and return within a few seconds. You should be able to see it running in the Dataflow pane of the Cloud Console. It should finish in a few minutes. Check that it finishes without error.

Once it has finished, you ought to see three new tables in your BigQuery dataset: urls, word_counts, and word_cooccur.

If you see any problems, make sure that you've configured the app.yaml and Dockerfile as described above, and check the logs for clues.

Note: the /launchpipeline request handler is configured to return without launching the pipeline if the request has not originated as a cron request. You can comment out that logic in main_df.py, in the launch() function, if you'd like to override that behavior.
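As an illustration of the kind of check described in the note above, here is a minimal sketch of a cron-origin test. It assumes the handler inspects the 'X-Appengine-Cron' header, which App Engine sets to "true" on cron-originated requests; the function name `is_cron_request` and the plain-dict interface are hypothetical, and the actual logic in main_df.py may look different.

```python
def is_cron_request(headers):
    """Return True if the request headers carry App Engine's cron marker.

    `headers` is any mapping of header names to string values
    (a hypothetical interface chosen for this sketch).
    """
    # HTTP header names are case-insensitive, so normalize before looking up.
    normalized = {name.lower(): value for name, value in headers.items()}
    return normalized.get("x-appengine-cron", "").lower() == "true"
```

A handler along these lines would return early when `is_cron_request(...)` is false, which is why commenting out the check is needed for manual or local requests.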

Running the 'backend' script locally

If you'd like, you can directly run the main_df.py script, which launches the Dataflow pipeline, locally. You might find this easier if you're trying to debug some change you've made. To do this, first make sure you've followed these instructions. Make sure you have all the required dependencies by running the following in your virtual environment:

pip install -r requirements.txt

Then, set the necessary environment vars in your shell (changing the following for your correct values):

export PROJECT=your-project
export BUCKET=your-bucket-name
export DATASET=your-dataset-name

Then, edit main_df.py and, in launch(), comment out the code that checks that the request has the 'X-Appengine-Cron' header.

Then run:

python main_df.py

The script will start a web server on localhost port 8080. Visit http://localhost:8080/launchpipeline. That should kick off the Dataflow pipeline deployment.
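If the local launch fails, a common cause is a missing or malformed environment variable. The following is a minimal preflight sketch, assuming the launcher reads PROJECT, BUCKET, and DATASET from the environment as set above. The function name `load_launch_config` and the `staging`/`temp` paths are hypothetical; main_df.py may build its locations differently.

```python
import os


def load_launch_config(environ=os.environ):
    """Collect PROJECT, BUCKET and DATASET, failing early on obvious mistakes."""
    required = ("PROJECT", "BUCKET", "DATASET")
    missing = [name for name in required if not environ.get(name)]
    if missing:
        raise ValueError("Missing environment variables: " + ", ".join(missing))

    bucket = environ["BUCKET"]
    # The bucket is expected as a bare name; the prefix is added below.
    if bucket.startswith("gs://"):
        raise ValueError("BUCKET should be the bare bucket name, without 'gs://'")

    return {
        "project": environ["PROJECT"],
        "dataset": environ["DATASET"],
        # Hypothetical layout: the actual paths used by main_df.py may differ.
        "staging_location": "gs://%s/staging" % bucket,
        "temp_location": "gs://%s/temp" % bucket,
    }
```

Running `load_launch_config()` in the same shell before `python main_df.py` raises a `ValueError` naming whichever variable is unset, or flags a bucket that was given with the 'gs://' prefix.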

Exploring the analytics results in BigQuery

Once our example app is up and running, it periodically writes the results of its analysis to BigQuery. Then, we can run some fun queries on the data.

For example, we can find recent word co-occurrences that are 'interesting' by our metric:

Or look for emerging word pairs, that have become 'interesting' in the last day or so (as of early April 2017):

We can contrast the 'interesting' word pairs with the words that are simply the most popular within a given period (you can see that most of these words are common, but not particularly newsworthy):

Or, find the most often-tweeted URLs from the past few days (some URLs are truncated in the output):

What next?

This example walks through how you can programmatically launch Dataflow pipelines — that read from Datastore — directly from your App Engine app, in order to support a range of processing and analytics tasks.

There are lots of interesting ways that this example could be extended. For example, you could add a user-facing frontend to the web app, that fetches and displays results from BigQuery. You might also look at trends over time (e.g. for bigrams) -- either from BigQuery, or by extending the Dataflow pipeline.

Contributions

Contributions are not currently accepted. This is not an official Google product.