Data Pipelines

Ingesting real-time data poses a number of challenges:

  1. It has to scale in real time
  2. It has to handle data arriving late
  3. It has to handle bad data arriving in real time (duplicates, missing data, etc.)

Cloud Pub/Sub

Distributed messaging system to handle real-time messaging.

--------------------------------------------------------------------------------------------------------------------

   (sensor data, etc.) > Cloud Pub/Sub > Cloud Dataflow > BigQuery/Cloud Storage > Explo/Visu Apps
                         [Ingests data,    [Subscribes to
                          publishes to      Cloud Pub/Sub]
                          subscribers]
--------------------------------------------------------------------------------------------------------------------

Serverless Big Data Pipeline

Cloud Pub/Sub Architecture

Cloud Pub/Sub uses topics. These are like channels to which it publishes messages. Subscribers can listen to these topics and pick up the messages that are published.

For example:

  1. Set up Pub/Sub with a topic called “HR”
  2. When a new worker joins, the HR system publishes a “NEW HIRE” event to the “HR” topic
  3. Then, downstream applications (facilities, badge activation system) that are subscribed to this topic can get the message and take action as appropriate
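
As a rough sketch of how this might look in code (using the google-cloud-pubsub Python client; the project, topic, and subscription names are placeholders):

from google.cloud import pubsub_v1

project_id = "my-project"   # placeholder project ID

# The HR system publishes a "NEW HIRE" event to the "HR" topic.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, "HR")
future = publisher.publish(topic_path, b"NEW HIRE", employee_id="12345")
print("Published message ID:", future.result())

# A downstream system (e.g., badge activation) listens on its own subscription.
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, "badge-activation")

def callback(message):
    print("Received:", message.data, dict(message.attributes))
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
try:
    streaming_pull_future.result(timeout=30)   # listen for 30 seconds
except Exception:
    streaming_pull_future.cancel()

Each downstream application gets its own subscription, so facilities and badge activation can each receive and acknowledge the message independently.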

Cloud Dataflow

Apache Beam

  • Used to implement batch or streaming data processing jobs.
  • Pipelines written in Java, Python, or Go
  • Creates a model representation of code which is portable across many runners
  • Runners pass models to an execution environment, which can run on many different engines (e.g., Spark, Cloud Dataflow)
  • Transformations can be done in parallel, making pipelines scalable
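
A minimal Beam pipeline in Python might look like the sketch below (the bucket paths are placeholders); running it on Cloud Dataflow instead of locally is a matter of passing --runner=DataflowRunner plus project, region, and temp_location pipeline options:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions()   # defaults to the local DirectRunner

with beam.Pipeline(options=options) as p:
    (
        p
        | "Read" >> beam.io.ReadFromText("gs://my-bucket/input/*.txt")
        | "Split" >> beam.FlatMap(lambda line: line.split())
        | "PairWithOne" >> beam.Map(lambda word: (word, 1))
        | "CountPerWord" >> beam.CombinePerKey(sum)
        | "Format" >> beam.Map(lambda kv: f"{kv[0]},{kv[1]}")
        | "Write" >> beam.io.WriteToText("gs://my-bucket/output/counts")
    )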

Workflow with Cloud Dataflow

  1. Write code to create a model in Beam
  2. Beam passes a job to Cloud Dataflow
  3. Once it receives the job, the Cloud Dataflow service:
  • Optimizes the execution graph
  • Schedules the work out to workers in a distributed fashion
  • Auto-heals if workers encounter errors
  • Connects to data sinks to produce results (e.g., BigQuery)

A number of template pipelines are available:

https://github.com/googlecloudplatform/dataflowtemplates

Data Studio

  • Provides data visualization
  • Data is live, not just a static image
  • Click Explore in Data Studio in BigQuery

Creating a report

  1. Create new report
  2. Select a data source (can have multiple data sources)
  3. Create charts: click and draw

Data Studio uses Dimensions and Metric chips.

  • Dimensions: Categories or buckets of information (area code, etc.). Shown in green.
  • Metrics: Measure dimension values (measurements, aggregations, counts, etc.). Shown in blue.

Use calculated fields to create your own metrics.

Start with high-level metrics at the top of the report and then give more detail below. Don’t overload the report; keep it crisp. You can copy a dashboard that you like and then adapt it.

Lab 1

Cloud Pub/Sub topics

Cloud Pub/Sub decouples senders and receivers. Senders publish messages to a Cloud Pub/Sub topic, and receivers subscribe to that topic.

Create a BigQuery dataset

Messages published into Pub/Sub will be stored in BigQuery.

  1. In Cloud Shell, run bq mk taxirides to create a dataset called taxirides in BigQuery
  2. Now create a table inside the dataset:
bq mk \
--time_partitioning_field timestamp \
--schema ride_id:string,point_idx:integer,latitude:float,longitude:float,\
timestamp:timestamp,meter_reading:float,meter_increment:float,ride_status:string,\
passenger_count:integer -t taxirides.realtime
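
For reference, here is a rough Python equivalent of the two bq commands above, using the google-cloud-bigquery client (a sketch, not part of the lab steps):

from google.cloud import bigquery

client = bigquery.Client()   # uses the active project by default

# Equivalent of `bq mk taxirides`.
client.create_dataset("taxirides", exists_ok=True)

# Schema matching the bq mk --schema flags above.
schema = [
    bigquery.SchemaField("ride_id", "STRING"),
    bigquery.SchemaField("point_idx", "INTEGER"),
    bigquery.SchemaField("latitude", "FLOAT"),
    bigquery.SchemaField("longitude", "FLOAT"),
    bigquery.SchemaField("timestamp", "TIMESTAMP"),
    bigquery.SchemaField("meter_reading", "FLOAT"),
    bigquery.SchemaField("meter_increment", "FLOAT"),
    bigquery.SchemaField("ride_status", "STRING"),
    bigquery.SchemaField("passenger_count", "INTEGER"),
]

table = bigquery.Table(f"{client.project}.taxirides.realtime", schema=schema)
table.time_partitioning = bigquery.TimePartitioning(field="timestamp")
client.create_table(table, exists_ok=True)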

Create a Cloud Storage bucket

  1. Storage > Create Bucket
  2. Name must be globally unique (e.g., project id)

Setup Cloud Dataflow Pipeline

  1. Navigation > Dataflow
  2. Create job from template
  3. Type > Cloud Pub/Sub topic to BigQuery template
  4. Input topic > projects/pubsub-public-data/topics/taxirides-realtime
  5. Output table > qwiklabs-gcp-72fdadec78efe24c:taxirides.realtime
  6. Temporary location > gs://qwiklabs-gcp-72fdadec78efe24c/tmp/
  7. Click run job

Cloud Dataflow will now show a visualization of the Dataflow job running.

Analyze streaming data

You can check the data in BigQuery:

SELECT * FROM taxirides.realtime LIMIT 10;

See aggregated per-minute ride data:

WITH streaming_data AS (

SELECT
  timestamp,
  TIMESTAMP_TRUNC(timestamp, HOUR, 'UTC') AS hour,
  TIMESTAMP_TRUNC(timestamp, MINUTE, 'UTC') AS minute,
  TIMESTAMP_TRUNC(timestamp, SECOND, 'UTC') AS second,
  ride_id,
  latitude,
  longitude,
  meter_reading,
  ride_status,
  passenger_count
FROM
  taxirides.realtime
WHERE ride_status = 'dropoff'
ORDER BY timestamp DESC
LIMIT 100000

)

# calculate aggregations on the stream for reporting:
SELECT
  ROW_NUMBER() OVER() AS dashboard_sort,
  minute,
  COUNT(DISTINCT ride_id) AS total_rides,
  SUM(meter_reading) AS total_revenue,
  SUM(passenger_count) AS total_passengers
FROM streaming_data
GROUP BY minute

Explore in Data Studio

  1. Click Explore in Data Studio
  2. Set up dimensions and metrics as desired

Once finished, stop the Cloud Dataflow pipeline job.

What we did:

  • Created a dataset and table in BigQuery
  • Created a bucket in Google Cloud Storage
  • Created a Dataflow pipeline from a public Pub/Sub topic into BigQuery
  • Visualized the data in Data Studio and arranged the charts

Approaches to ML

  1. Use pre-built AI
  • Useful when you lack enough data to build your own model
  2. Add custom models
  • Requires roughly 100,000 to millions of records of sample data
  3. Create new models

Use pre-built ML building blocks, e.g., https://console.cloud.google.com/vision/. For example, use the Vision API to extract text and the Translation API to translate it.
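
A minimal sketch of that chain with the Python clients (google-cloud-vision and google-cloud-translate; the image path is a placeholder):

from google.cloud import vision
from google.cloud import translate_v2

# Extract text from an image with the Vision API.
vision_client = vision.ImageAnnotatorClient()
image = vision.Image()
image.source.image_uri = "gs://my-bucket/sign.jpg"   # placeholder image
response = vision_client.text_detection(image=image)
text = response.text_annotations[0].description if response.text_annotations else ""

# Translate the extracted text to English with the Translation API.
translate_client = translate_v2.Client()
result = translate_client.translate(text, target_language="en")
print(result["translatedText"])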

AutoML

  • Precision: of the photos the model labels as cumulus, the proportion that really are cumulus (TP / (TP + FP))
  • Recall: of the photos that really are cumulus, the proportion the model finds (TP / (TP + FN))
  • True positive: a cumulus photo correctly classified as cumulus
  • False positive: a non-cumulus photo wrongly classified as cumulus
  • True negative: a non-cumulus photo correctly classified as not cumulus
  • False negative: a cumulus photo wrongly classified as not cumulus

Use AutoML to extend the capabilities of the AI building blocks without writing code. For example, extend the Vision API to recognize cloud types by uploading photos of clouds with type labels. A confusion matrix shows the percentage of labels that were classified correctly or incorrectly.

AutoML uses neural architecture search to build several candidate models and pick the best one.

Lab 2

Get an API key

  • APIs and Services > Credentials > Create Credentials > API Key

In Cloud Shell, set the API Key as an environment variable:

export API_KEY=<YOUR_API_KEY>

Create storage bucket

  • Storage > Create bucket >

Make the bucket publicly available:

gsutil acl ch -u AllUsers:R gs://qwiklabs-gcp-005f9de6234f0e59/*

Then click the public link to check it worked.

Create JSON request

You can use Emacs in Cloud Shell:

  1. emacs
  2. C-x C-f to find a file (type in the name of the file you want to create and it opens as an empty buffer)
  3. C-x C-s to save the file, then C-x C-c to exit Emacs

(nano seems to work better in Cloud Shell)
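
A minimal request.json for label detection can be written from Cloud Shell with a short Python snippet (the gcsImageUri is a placeholder; point it at the image you made public in your bucket):

import json

request = {
    "requests": [{
        "image": {"source": {"gcsImageUri": "gs://my-bucket/my-image.jpg"}},
        "features": [{"type": "LABEL_DETECTION", "maxResults": 10}],
    }]
}

with open("request.json", "w") as f:
    json.dump(request, f, indent=2)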

Send the request

Use curl to send the request:

curl -s -X POST -H "Content-Type: application/json" --data-binary @request.json https://vision.googleapis.com/v1/images:annotate?key=${API_KEY}

Now, the response will provide some labels based on the pretrained model, but what if we want the model to be able to detect our own labels? In that case, we can feed in some of our own training data. To do this custom training, we can use AutoML.

AutoML

Once set up, AutoML creates a new bucket with the suffix -vcm.

  1. Bind a new environment variable to this bucket:

export BUCKET=<YOUR_AUTOML_BUCKET>

  2. Copy over the training data:

gsutil -m cp -r gs://automl-codelab-clouds/* gs://${BUCKET}

  3. Now we need a CSV file that tells AutoML where to find each image and the label associated with it. The lab provides this file (data.csv); copy it into Cloud Shell and update it to reference your bucket.

  4. Now copy the file to your bucket:

gsutil cp ./data.csv gs://${BUCKET}

  5. Now, back in the AutoML Vision UI, click New Dataset > clouds > Select a CSV file on Cloud Storage > Create Dataset

The images will be imported from the CSV. After the import finishes, you can view the images, check their labels, and change labels where needed.

(You want at least 100 images for training.)

  6. Click Train to start training.

After the model has finished training, you can check the accuracy of the model:

  • Precision: What proportion of positive identifications was correct?
  • Recall: What proportion of actual positives was identified correctly? (1.0 is the best score)

Generating Predictions

Now that we’ve trained our model, we can use it to make predictions on unseen data.

  1. Go to the Predict tab
  2. Upload images to see predictions
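
Predictions can also be requested programmatically. A sketch with the google-cloud-automl Python client (the project ID, model ID, and image path are placeholders):

from google.cloud import automl

project_id = "my-project"          # placeholder project ID
model_id = "ICN1234567890"         # placeholder AutoML model ID

prediction_client = automl.PredictionServiceClient()
model_full_id = automl.AutoMlClient.model_path(project_id, "us-central1", model_id)

# Read a local image of a cloud and send it to the trained model.
with open("cloud.jpg", "rb") as f:
    content = f.read()

payload = automl.ExamplePayload(image=automl.Image(image_bytes=content))
response = prediction_client.predict(name=model_full_id, payload=payload)

for result in response.payload:
    print(result.display_name, result.classification.score)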

What we did:

  • Set up the AutoML API
  • Created a storage bucket
  • Opened AutoML Vision and specified the user and project
  • Downloaded the images of clouds into the Cloud Storage bucket
  • Created a CSV file containing the URL of each image and its label
  • Uploaded the CSV to the storage bucket
  • Added a new dataset to AutoML Vision, choosing the CSV file created before
  • Trained the model
  • Evaluated the model
  • Uploaded new images and tested the classification

Building a Custom Model

There are three ways to build custom models on GCP:

  1. Using SQL models (BigQuery ML)
  2. AutoML
  3. ML Engine Notebooks (Jupyter; you can write your own model with Keras)

https://github.com/GoogleCloudPlatform/training-data-analyst/blob/master/courses/machine_learning/deepdive/09_sequence/text_classification.ipynb

Further Courses

  • From Data to Insights
  • Data Engineering
  • ML on GCP
  • Adv ML on GCP