Data Pipelines
Ingesting real-time data poses a number of challenges:
- Must scale in real time
- Must handle late-arriving data
- Must handle bad data arriving in real time (duplicates, missing data, etc.)
Cloud Pub/Sub
A distributed messaging system for ingesting and delivering messages in real time.
Serverless Big Data Pipeline:
(sensor data, etc.) > Cloud Pub/Sub > Cloud Dataflow > BigQuery/Cloud Storage > Exploration/Visualization apps
- Cloud Pub/Sub ingests the data and publishes it to subscribers
- Cloud Dataflow subscribes to Cloud Pub/Sub and processes the stream
Cloud Pub/Sub Architecture
Cloud Pub/Sub uses topics. These are like channels to which messages are published. Subscribers listen to a topic and pick up the messages published to it.
For example:
- Set up Pub/Sub with a topic called “HR”
- When a new worker joins, the HR system publishes a “NEW HIRE” event to the “HR” topic
- Downstream applications (facilities, badge activation system) that are subscribed to this topic then receive the message and take action as appropriate
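A minimal sketch of this flow with the gcloud CLI (the topic, subscription, and message names are just illustrative):
# Create the topic and a subscription for the facilities system
gcloud pubsub topics create HR
gcloud pubsub subscriptions create facilities-sub --topic=HR
# The HR system publishes a new-hire event
gcloud pubsub topics publish HR --message='NEW HIRE: jane.doe'
# The facilities application pulls (and acknowledges) pending messages
gcloud pubsub subscriptions pull facilities-sub --auto-ack --limit=10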
Cloud Dataflow
Apache Beam
- Used to implement batch or streaming data processing jobs.
- Pipelines written in Java, Python, or Go
- Creates a model representation of code which is portable across many runners
- Runners pass models to an execution environment, which can run on many different engines (e.g., Spark, Cloud Dataflow)
- Transformations can be done in parallel, making pipelines scalable
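As a rough illustration of runner portability, the Beam Python SDK's bundled wordcount example can be run locally and then on Cloud Dataflow just by switching the runner (the bucket, project, and region below are placeholders; assumes pip install apache-beam[gcp]):
# Run locally with the DirectRunner
python -m apache_beam.examples.wordcount \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output /tmp/wordcount/output \
  --runner DirectRunner
# Run the same pipeline on Cloud Dataflow by changing the runner and adding GCP options
python -m apache_beam.examples.wordcount \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://YOUR_BUCKET/wordcount/output \
  --runner DataflowRunner \
  --project YOUR_PROJECT \
  --region us-central1 \
  --temp_location gs://YOUR_BUCKET/tmp/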
Workflow with Cloud Dataflow
- Write code to create model in Beam
- Beam passes a job to Cloud Dataflow
- Once it receives the job, the Cloud Dataflow service:
- Optimizes the execution graph
- Schedules work out to workers in a distributed fashion
- Auto-heals if workers encounter errors
- Connects to data sinks to produce results (e.g., BigQuery)
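Jobs can also be inspected and stopped from Cloud Shell (a small sketch, assuming the gcloud CLI; the region and job id are placeholders):
# List Dataflow jobs in a region
gcloud dataflow jobs list --region=us-central1
# Cancel a running job
gcloud dataflow jobs cancel JOB_ID --region=us-central1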
A number of template pipelines are available:
https://github.com/googlecloudplatform/dataflowtemplates
Data Studio
- Provides data visualization
- Data is live, not just a static image
- Click “Explore in Data Studio” in BigQuery
Creating a report
- Create new report
- Select a data source (can have multiple data sources)
- Create charts: click and draw
Data Studio uses Dimensions and Metric chips.
- Dimensions: Categories or buckets of information (area code, etc.). Shown in green.
- Metrics: Measure dimension values (measurements, aggregations, counts, etc.). Shown in blue.
Use calculated fields to create your own metrics (e.g., a ratio such as total revenue divided by total rides).
Start with high-level metrics at the top of the report and give more detail below. Don’t overload the report; keep it crisp. You can copy a dashboard that you like and then adapt it.
Lab 1
Cloud Pub/Sub topics
Cloud Pub/Sub decouples senders and receivers. Senders publish messages to a Cloud Pub/Sub topic, and receivers subscribe to this topic.
Create a BigQuery dataset
Messages published into Pub/Sub will be stored in BigQuery.
- In Cloud Shell, run bq mk taxirides to create a dataset called taxirides in BigQuery
- Now create a table inside the dataset:
bq mk \
--time_partitioning_field timestamp \
--schema ride_id:string,point_idx:integer,latitude:float,longitude:float,\
timestamp:timestamp,meter_reading:float,meter_increment:float,ride_status:string,\
passenger_count:integer -t taxirides.realtime
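To confirm the dataset and table were created (optional; standard bq commands):
# List tables in the dataset
bq ls taxirides
# Show the table's schema
bq show --schema taxirides.realtime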
Create a Cloud Storage bucket
- Storage > Create Bucket
- Name must be globally unique (e.g., project id)
Setup Cloud Dataflow Pipeline
- Navigation > Dataflow
- Create job from template
- Type > Cloud Pub/Sub topic to BigQuery template
- Input topic > projects/pubsub-public-data/topics/taxirides-realtime
- Output table > qwiklabs-gcp-72fdadec78efe24c:taxirides.realtime
- Temporary location > gs://qwiklabs-gcp-72fdadec78efe24c/tmp/
- Click run job
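Roughly the same job can be launched from Cloud Shell instead of the console (a sketch, assuming the gcloud CLI and the public Google-provided template; the job name is arbitrary and the bucket/project values are the lab's):
gcloud dataflow jobs run taxirides-streaming \
  --gcs-location gs://dataflow-templates/latest/PubSub_to_BigQuery \
  --region us-central1 \
  --staging-location gs://qwiklabs-gcp-72fdadec78efe24c/tmp/ \
  --parameters inputTopic=projects/pubsub-public-data/topics/taxirides-realtime,outputTableSpec=qwiklabs-gcp-72fdadec78efe24c:taxirides.realtime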
Cloud Dataflow will now show a visualization of the Dataflow job running.
Analyze streaming data
You can check the data in BigQuery:
SELECT * FROM taxirides.realtime LIMIT 10;
See aggregated per-minute ride data:
WITH streaming_data AS (
  SELECT
    timestamp,
    TIMESTAMP_TRUNC(timestamp, HOUR, 'UTC') AS hour,
    TIMESTAMP_TRUNC(timestamp, MINUTE, 'UTC') AS minute,
    TIMESTAMP_TRUNC(timestamp, SECOND, 'UTC') AS second,
    ride_id,
    latitude,
    longitude,
    meter_reading,
    ride_status,
    passenger_count
  FROM taxirides.realtime
  WHERE ride_status = 'dropoff'
  ORDER BY timestamp DESC
  LIMIT 100000
)
# calculate aggregations on stream for reporting:
SELECT
  ROW_NUMBER() OVER() AS dashboard_sort,
  minute,
  COUNT(DISTINCT ride_id) AS total_rides,
  SUM(meter_reading) AS total_revenue,
  SUM(passenger_count) AS total_passengers
FROM streaming_data
GROUP BY minute, timestamp
Explore in Data Studio
- Click “Explore in Data Studio”
- Set up dimensions and metrics as desired
Once finished, stop the Cloud Dataflow pipeline job.
What we did:
- Created a dataset and table in BigQuery
- Created a bucket in Google Cloud Storage
- Created a Dataflow pipeline from a public Pub/Sub topic into BigQuery
- Visualized the data in Data Studio and arranged the charts
Approaches to ML
- Use pre-built AI
  - Useful when you lack enough data to build your own model
- Add custom models
  - Requires ~100,000 to millions of records of sample data
- Create new models
Use pre-built ML building blocks, e.g., https://console.cloud.google.com/vision/. For example, use the Vision API to extract text and the Translation API to translate it.
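As a rough sketch of chaining these building blocks, text pulled out by the Vision API can be passed to the Translation API v2 with curl (assumes an API key in ${API_KEY}, as in Lab 2; the input text and target language are just examples):
curl -s -X POST \
  -H "Content-Type: application/json" \
  --data '{"q": "Le chat est noir", "target": "en"}' \
  "https://translation.googleapis.com/language/translate/v2?key=${API_KEY}"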
AutoML
- Precision: of the photos classified as a given label, the proportion that actually have that label
- Recall: of the photos that actually have a given label, the proportion that were classified as that label
- True positive: cumulus correctly classified as cumulus
- False positive: not cumulus, wrongly classified as cumulus
- True negative: not cumulus, correctly classified as not cumulus
- False negative: cumulus, wrongly classified as not cumulus
AutoML is used to extend the capabilities of the AI building blocks without writing code. For example, extend the Vision API to recognize cloud types by uploading photos of clouds labelled with their type. A confusion matrix shows the percentage of labels that were correctly/incorrectly classified.
AutoML uses neural architecture search to build several models and pick the best one.
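In terms of the confusion-matrix counts above:
Precision = TP / (TP + FP)
Recall = TP / (TP + FN)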
Lab 2
Get an API key
- APIs and Services > Credentials > Create Credentials > API Key
In Cloud Shell, set the API Key as an environment variable:
export API_KEY=<YOUR_API_KEY>
Create storage bucket
- Storage > Create Bucket
Make the bucket publicly readable:
gsutil acl ch -u AllUsers:R gs://qwiklabs-gcp-005f9de6234f0e59/*
Then click the public link to check it worked.
Create JSON request
You can use Emacs in Cloud Shell:
- Run emacs
- C-x C-f to open a file (this finds a file, but type the name of the file you want to create and it will be opened as an empty buffer)
- C-x C-c to save the file and exit Emacs
(nano seems to work better in Cloud Shell)
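A minimal request.json sketch for label detection (the bucket and image name are placeholders; this assumes you have uploaded an image to your bucket and made it readable, as above):
cat > request.json <<EOF
{
  "requests": [
    {
      "image": {
        "source": {
          "gcsImageUri": "gs://YOUR_BUCKET/your-image.jpg"
        }
      },
      "features": [
        {
          "type": "LABEL_DETECTION",
          "maxResults": 10
        }
      ]
    }
  ]
}
EOF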
Send the request
Use curl to send the request:
curl -s -X POST -H "Content-Type: application/json" --data-binary @request.json https://vision.googleapis.com/v1/images:annotate?key=${API_KEY}
Now, the response will provide some labels based on the pretrained model, but what if we want the model to be able to detect our own labels? In that case, we can feed in some of our own training data. To do this custom training, we can use AutoML.
AutoML
Once set up, AutoML will create a new bucket with the suffix -vcm.
- Bind a new environment variable to this bucket:
export BUCKET=<YOUR_AUTOML_BUCKET>
- Copy over the training data:
gsutil -m cp -r gs://automl-codelab-clouds/* gs://${BUCKET}
- Now we need a CSV file that tells AutoML where to find each image and the label associated with it. The lab provides a data.csv; copy it into Cloud Shell and update it to point at your bucket.
- Now copy the file to your bucket:
gsutil cp ./data.csv gs://${BUCKET}
- Now, back in the AutoML Vision UI, click New Dataset > clouds > Select a CSV file on Cloud Storage > Create Dataset
The images will be imported from the CSV. Once imported, you can view the images, check their labels, and change labels if needed.
(You want at least 100 images for training.)
- Click train to start training.
After the model has finished training, you can check the accuracy of the model:
- Precision: What proportion of positive identifications was correct?
- Recall: What proportion of actual positives was identified correctly? (1.0 = perfect score)
Generating Predictions
Now that we’ve trained our model, we can use it to make predictions on unseen data.
- Go to the Predict tab
- Upload images to see predictions
What we did:
- Set up the AutoML API
- Created a storage bucket
- Opened AutoML Vision and specified the user and project
- Downloaded the images of clouds into the Cloud Storage bucket
- Created a CSV file containing the URL of each image and its label
- Uploaded the CSV to the storage bucket
- Added a new dataset in AutoML Vision, choosing the CSV file created before
- Trained the model
- Evaluated the model
- Uploaded new images and tested the classification
Building a Custom Model
There are three ways to build custom models on GCP:
- Using SQL models (BigQuery ML)
- AutoML
- ML Engine Notebook (Jupyter) (can write own model with Keras)
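A minimal BigQuery ML sketch (hypothetical model reusing the Lab 1 taxirides.realtime table; the model name and feature choice are just illustrative):
# Train a simple linear regression that predicts the meter reading
bq query --use_legacy_sql=false '
CREATE OR REPLACE MODEL taxirides.fare_model
OPTIONS(model_type="linear_reg", input_label_cols=["meter_reading"]) AS
SELECT
  passenger_count,
  EXTRACT(HOUR FROM timestamp) AS hour_of_day,
  meter_reading
FROM taxirides.realtime
WHERE ride_status = "dropoff"'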
Further Courses
- From Data to Insights
- Data Engineering
- ML on GCP
- Adv ML on GCP