Stream analytics has many applications

Data Integration

  • Access data warehouses in real time
  • Take the load off source databases with change data capture (CDC)
  • Utilize microservices more effectively

Online decisions

  • Real-time recommendations
  • Fraud detection
  • Gaming events
  • Finance back office apps

Volume (terabytes of data) + Velocity (real time) + Variety (unstructured data)

What we will see in this training:

  • Cloud Pub/Sub: deal with variable volumes of data
  • Cloud Dataflow: process data without undue delays
  • BigQuery: ad-hoc analysis and immediate insights

Cloud Pub/Sub

Data ingestion and distribution. Asynchronous messaging bus.

Connect applications on premises, in the cloud, etc., to create hybrid setups.

Receiving and holding data. Processing a backlog of work or scaling on demand.

Topic + Subscription

A publisher publishes messages to a topic. To receive messages, you create a subscription to that topic. One subscription belongs to a single topic, but a single topic can have multiple subscriptions.

Cloud Pub/Sub provides both push and pull delivery: either messages are automatically pushed to the subscriber's endpoint, or the subscriber sends a pull request and Pub/Sub returns the pending messages.
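As an illustration, a minimal Python sketch of the same publish and pull flow with the google-cloud-pubsub client (the lab below does this with gcloud; the project ID is a placeholder, and the topic and subscription names mirror the lab):

from google.cloud import pubsub_v1

PROJECT = "my-project"  # placeholder

# Publish a message to the topic.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT, "sandiego")
future = publisher.publish(topic_path, b"hello again")
print("Published message ID:", future.result())

# Pull messages from a subscription attached to that topic, then acknowledge them.
subscriber = pubsub_v1.SubscriberClient()
sub_path = subscriber.subscription_path(PROJECT, "mySub1")
response = subscriber.pull(request={"subscription": sub_path, "max_messages": 10})
for received in response.received_messages:
    print("Received:", received.message.data.decode("utf-8"))
if response.received_messages:
    subscriber.acknowledge(request={
        "subscription": sub_path,
        "ack_ids": [r.ack_id for r in response.received_messages],
    })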

You can also have multiple subscribers for one subscription, and the messages can be directed to some or all of those subscribers.

There can be problems with out-of-order and duplicated messages, so it is good to use Pub/Sub together with Dataflow (see the sketch after this list) so that:

  • Cloud Pub/Sub delivers at least once
  • Cloud Dataflow: deduplicate, order, and window
  • Separation of concerns -> scale
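A hedged Apache Beam (Python) sketch of that division of labour: Pub/Sub delivers at least once, and the pipeline deduplicates, windows, and aggregates. The topic path and CSV layout are placeholders, and the id_label deduplication assumes the publisher attaches a unique attribute to each message (it is honoured by the Dataflow runner).

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window

options = PipelineOptions(streaming=True)
with beam.Pipeline(options=options) as p:
    (p
     | "Read" >> beam.io.ReadFromPubSub(
           topic="projects/my-project/topics/sandiego",  # placeholder topic
           id_label="message_id")                        # deduplicate on a unique message attribute
     | "Decode" >> beam.Map(lambda data: data.decode("utf-8"))
     | "Window" >> beam.WindowInto(window.FixedWindows(60))                 # order into 1-minute event-time windows
     | "KeyByFirstField" >> beam.Map(lambda line: (line.split(",")[0], 1))  # assumed CSV layout
     | "CountPerKey" >> beam.CombinePerKey(sum)
     | "Print" >> beam.Map(print))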

Lab Intro: Publish streaming data into Pub/Sub

In this lab, you will simulate traffic sensor data and publish it into a Pub/Sub topic; it will later be processed by a Dataflow pipeline before finally ending up in a BigQuery table for further analysis.

Connect to the VM through SSH.

cd ~/training-data-analyst/courses/streaming/publish

#Create your topic and publish a simple message
gcloud pubsub topics create sandiego

#Create a subscription for the topic
gcloud pubsub subscriptions create --topic sandiego mySub1

#Publish a simple message.
gcloud pubsub topics publish sandiego --message "hello again"

#Pull the first message that was published to your topic.
gcloud pubsub subscriptions pull --auto-ack mySub1

Output: the pulled message ("hello again") is displayed.

#Delete the subscription
gcloud pubsub subscriptions delete mySub1

Simulate traffic sensor data into Pub/Sub

./download_data.sh
sudo apt-get install -y python-pip
sudo pip install -U google-cloud-pubsub
./send_sensor_data.py --speedFactor=60 --project $DEVSHELL_PROJECT_ID

We execute a Python script that simulates sensor data by sending recorded sensor readings via Pub/Sub messages. The script extracts the original timestamp of each reading and pauses between messages to simulate realistic timing. The speedFactor value scales the time between messages proportionally: a speedFactor of 60 means "60 times faster" than the recorded timing, so it sends about an hour of data every 60 seconds.
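Roughly, the idea behind the script is the following (a hedged sketch, not the actual send_sensor_data.py; the timestamp format and CSV layout are assumptions):

import time
from datetime import datetime

def replay(lines, publish, speed_factor=60):
    """Replay recorded sensor lines, compressing the original time gaps by speed_factor."""
    prev_ts = None
    for line in lines:
        # Assumed layout: the first CSV field is the recorded event timestamp.
        ts = datetime.strptime(line.split(",")[0], "%Y-%m-%d %H:%M:%S")
        if prev_ts is not None:
            gap = (ts - prev_ts).total_seconds() / speed_factor
            if gap > 0:
                time.sleep(gap)  # with speed_factor=60, one recorded hour plays back in about a minute
        publish(line.encode("utf-8"))  # e.g. publisher.publish(topic_path, data)
        prev_ts = ts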

Once the script is running it will send messages.

We open a second SSH session to the VM, create a subscription to the topic, send a pull request, and we get a message. It works!

We close everything and we're done.

Cloud Dataflow

Three kinds of windows fit most circumstances:

Fixed: windowing divides the data into time-based finite chunks (monthly, daily, ...).

Sliding: often required when doing aggregations over unbounded data, e.g. "give me 5 minutes of data every 30 minutes".

Sessions: web sessions, for instance: a user arrives, checks a few pages on your website and then leaves; that is one session.

You define this in your code when you create your window:

window.FixedWindows(60), window.SlidingWindows(30, 5), or window.Sessions(...); see the sketch below.
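In the Beam Python SDK this looks roughly like the following (a sketch; durations are in seconds, and the numbers simply mirror the examples above):

import apache_beam as beam
from apache_beam.transforms import window

fixed = beam.WindowInto(window.FixedWindows(60))         # fixed 60-second windows
sliding = beam.WindowInto(window.SlidingWindows(30, 5))  # 30-second windows, a new one starting every 5 seconds
sessions = beam.WindowInto(window.Sessions(10 * 60))     # a session closes after 10 minutes of inactivity
# Each is applied in a pipeline as:  pcollection | "Window" >> fixed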

The problem is that in real life there is latency: an element can arrive after its window has already closed.

This can be handled with watermarks, which give a window some flexibility to wait for a little lag time.

You also have tools to deal with late data and to control how results accumulate: you can use custom triggers, as in the sketch below.
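A hedged Beam (Python) sketch of those knobs: the watermark trigger fires the window, allowed lateness keeps the window open a while longer for stragglers, and the accumulation mode decides whether late firings add to or replace earlier results.

import apache_beam as beam
from apache_beam.transforms import trigger, window

windowed = beam.WindowInto(
    window.FixedWindows(60),
    trigger=trigger.AfterWatermark(
        early=trigger.AfterProcessingTime(30),  # speculative result every 30 s before the watermark
        late=trigger.AfterCount(1)),            # re-fire for each late element
    allowed_lateness=600,                       # accept data up to 10 minutes late
    accumulation_mode=trigger.AccumulationMode.ACCUMULATING)  # late firings include the earlier data
# Applied as:  pcollection | windowed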

Lab: Streaming Data Pipelines

  • Launch Dataflow and run a Dataflow job
  • Understand how data elements flow through the transformations of a Dataflow pipeline
  • Connect Dataflow to Pub/Sub and BigQuery
  • Observe and understand how Dataflow autoscaling adjusts compute resources to process input data optimally
  • Learn where to find logging information created by Dataflow
  • Explore metrics and create alerts and dashboards with Stackdriver Monitoring

Lab was super confusing

Streaming into BigQuery and Visualizing Results

Interactive SQL Queries over large datasets in seconds. Near real time insights.

The rate for streaming inserts is 100K rows per second per table. But unlike load jobs, streaming inserts are charged.

Batch loading is free, so use batch loading rather than streaming unless there is a real need for it.

You can use a Python script to stream the data into BigQuery; a sketch follows.
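A minimal sketch of such a streaming insert with the google-cloud-bigquery client (the table ID and schema are placeholders; in the lab the Dataflow pipeline writes the real table):

from google.cloud import bigquery

client = bigquery.Client()
table_id = "my-project.demos.current_conditions"  # placeholder

rows = [
    {"timestamp": "2024-01-01 08:00:00 UTC", "highway": "15", "direction": "S", "speed": 74.0},
]
errors = client.insert_rows_json(table_id, rows)  # streaming insert; billed, unlike load jobs
if errors:
    print("Streaming insert errors:", errors)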

Build, collaborate on, and share your dashboards with Data Studio.

Lab Streaming Data Processing: Streaming Analytics and Dashboards

Task 1: Connect to the VM over SSH, copy the repository to your home directory, and set the environment variables.

cp -r /training/training-data-analyst/ .
source /training/project_env.sh

Task 2: Creating a data source

We go to datastudio.google.com and open a new blank template.

We add a new data source from BigQuery.

Task 3: Creating a bar chart using a calculated field

Create a new bar chart, select the data source, add a new metric, change the type of the metric from date to numeric, show the labels.

Task 4: Creating a chart using a custom query

We play a little with the metrics of this new chart.

These are the speeds of the vehicles captured by the sensors.

Task 5: Viewing your query history

In BigQuery, open the query history.

Streaming into Cloud BigTable

If you want information to be available in milliseconds instead of seconds or minutes, you can't use BigQuery; you would need to use Bigtable.

Bigtable is for high-performance applications. How to choose?

Consider using Cloud Bigtable when you have:

  • High throughput
  • Asynchronous batch
  • Rapidly Changing
  • Real time processing
  • Time series data
  • Natural semantic order
  • >1TB

Bigtable is fault tolerant: data lives in Colossus and the nodes only hold pointers to it, so if a node fails only the pointers need to be reassigned. Colossus maintains 3 replicas to ensure durability.

Cloud Bigtable has only one index: the row key.

Speed through simplification: it is a NoSQL database.

You scan the row keys once, from top to bottom, to collect the result set.

To get the best performance you need to get your data in order first, and design the row key smartly so that it packs as much useful information as possible and simplifies your queries. Ask yourself: what is the best row key? (See the sketch below.)
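For example, a hedged sketch of a single write with a composite row key (the instance, table, column family and key layout are modelled on the lab below, but are placeholders):

from google.cloud import bigtable

client = bigtable.Client(project="my-project")  # placeholder project
table = client.instance("sandiego").table("current_conditions")

# highway # direction # event time: queries for one highway and direction become a single key-range scan
row_key = "15#S#2024-01-01T08:00:00".encode("utf-8")
row = table.direct_row(row_key)
row.set_cell("lane", b"speed", b"74.0")
row.commit()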

How does Bigtable remove data?

The row is marked for deletion and is skipped by subsequent reads. Periodically, Bigtable compacts the table, removing the marked rows and reorganizing the data.

A few tips:

  • Distribute data evenly for more efficient writes.
  • Group related data for more efficient reads.
  • Place identical values in the same row or adjoining rows for more efficient compression.

Optimizing BigTable performance

You can use replication to isolate serving applications from batch reads, to improve availability, to provide near-real-time backup, and to ensure your data has a global presence.

The smaller the rows, the faster the queries. Adding more nodes will also improve performance.

Key Visualizer exposes read/write access patterns over time and across the key space.

Lab Streaming Data Pipelines into Bigtable

Task 1: Preparation

Prepare the HBase quickstart files:

cd ~/training-data-analyst/courses/streaming/process/sandiego
./install_quickstart.sh

Task 2: Simulate traffic sensor data into Pub/Sub

Initiate the sensor simulator. The script reads sample data from a CSV file and publishes it to Pub/Sub.

/training/sensor_magic.sh

Open a new SSH session and set up the environment variables.

Task 3: Launch Dataflow Pipeline

cd ~/training-data-analyst/courses/streaming/process/sandiego

nano run_oncloud.sh

#Create bigtable instance
cd ~/training-data-analyst/courses/streaming/process/sandiego

./create_cbt.sh

#Run the Dataflow pipeline to read from PubSub and write into Cloud Bigtable

cd ~/training-data-analyst/courses/streaming/process/sandiego

./run_oncloud.sh $DEVSHELL_PROJECT_ID $BUCKET CurrentConditions --bigtable

Task 4: Explore the pipeline

Task 5: Query Bigtable data

cd ~/training-data-analyst/courses/streaming/process/sandiego/quickstart
#Launch the HBase shell
./quickstart.sh

# query to retrieve 2 rows from your Bigtable table that was populated by the pipeline
scan 'current_conditions', {'LIMIT' => 2}

# look only at the lane:speed column, limit to 10 rows, and specify
# rowid patterns for start and end rows to scan over.
scan 'current_conditions', {'LIMIT' => 10, STARTROW => '15#S#1', ENDROW => '15#S#999', COLUMN => 'lane:speed'}

Clean up and end lab

BigQuery advanced functionalities

Case study: London bike share dataset

Our three analysis goals:

  • Find the fastest bike share commuters in London (average kph between stations)
  • Find the stations with the fastest and slowest bike turnover times
  • Rank which bikes need maintenance the most, based on usage metrics

We can find the average speed from the distance and duration of each ride. We can use the latitude and longitude with ST_DISTANCE() to get the straight-line distance between two geographic points.

Use STRUCT() (a pre-join container, a bit like a join) to organize columns from different tables.

We use ST_GEOGPOINT() to convert the latitude and longitude floats into real geographic points and then pass those points into ST_DISTANCE().

You can use ST_MAKELINE() to draw a line between those two points in a GIS visualization tool; see the example below.
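A hedged example of those GIS functions, run through the BigQuery Python client (the column names follow the cycle_stations schema already used in the lab queries further down):

from google.cloud import bigquery

client = bigquery.Client()
sql = """
SELECT
  s.name AS start_station,
  e.name AS end_station,
  ST_DISTANCE(ST_GEOGPOINT(s.longitude, s.latitude),
              ST_GEOGPOINT(e.longitude, e.latitude)) AS distance_m,
  ST_MAKELINE(ST_GEOGPOINT(s.longitude, s.latitude),
              ST_GEOGPOINT(e.longitude, e.latitude)) AS route  -- a line you can draw in a GIS viz tool
FROM `bigquery-public-data`.london_bicycles.cycle_stations AS s
CROSS JOIN `bigquery-public-data`.london_bicycles.cycle_stations AS e
WHERE s.id < e.id
LIMIT 10
"""
for row in client.query(sql).result():
    print(row.start_station, "->", row.end_station, round(row.distance_m), "m")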

WITH clauses vs permanent tables

Permanent tables: much faster to query later, since the preprocessing and joins are already done, and easy to share; but not as easy to adapt afterwards to select sub-sections.

With WITH clauses, the named subquery is processed before the joins and conditions that use it.

A good approach is to use WITH clauses while refining the query, and then put the result into a permanent table.

Analytical window functions

Lab Optimizing your BigQuery Queries for Performance

Minimize I/O

Because BigQuery uses columnar file formats, the fewer columns a SELECT reads, the less data needs to be read.

Do not use SELECT *; select only the columns you need, to reduce the data processed.

We can reduce the I/O overhead of the query if we do the filtering and grouping using the station name rather than the station id since we will need to read fewer columns.

SELECT
  start_station_name,
  end_station_name,
  APPROX_QUANTILES(duration, 10)[OFFSET(5)] AS typical_duration,
  COUNT(duration) AS num_trips
FROM
  `bigquery-public-data`.london_bicycles.cycle_hire
WHERE
  start_station_name != end_station_name
GROUP BY
  start_station_name,
  end_station_name
ORDER BY
  num_trips DESC
LIMIT
  10

Reduce number of expensive computations

Computing the distance is a pretty expensive operation and we can avoid joining the cycle_stations table against the cycle_hire table if we precompute the distances between all pairs of stations:

WITH
  stations AS (
SELECT
  s.id AS start_id,
  e.id AS end_id,
  ST_Distance(ST_GeogPoint(s.longitude,
      s.latitude),
    ST_GeogPoint(e.longitude,
      e.latitude)) AS distance
FROM
  `bigquery-public-data`.london_bicycles.cycle_stations s,
  `bigquery-public-data`.london_bicycles.cycle_stations e ),
trip_distance AS (
SELECT
  bike_id,
  distance
FROM
  `bigquery-public-data`.london_bicycles.cycle_hire,
  stations
WHERE
  start_station_id = start_id
  AND end_station_id = end_id )
SELECT
  bike_id,
  SUM(distance)/1000 AS total_distance
FROM
  trip_distance
GROUP BY
  bike_id
ORDER BY
  total_distance DESC
LIMIT
  5

The BigQuery service automatically caches query results in a temporary table. If the identical query is submitted within approximately 24 hours, the results are served from this temporary table without any recomputation. Cached results are extremely fast and do not incur charges.

A good idea would be to create a permanent table:

CREATE OR REPLACE TABLE
  mydataset.typical_trip AS
SELECT
  start_station_name,
  end_station_name,
  APPROX_QUANTILES(duration, 10)[OFFSET (5)] AS typical_duration,
  COUNT(duration) AS num_trips
FROM
  `bigquery-public-data`.london_bicycles.cycle_hire
GROUP BY
  start_station_name,
  end_station_name

Then use the WITH clause to find days when bicycle trips are much longer than usual:

WITH
typical_trip AS (
SELECT
  start_station_name,
  end_station_name,
  APPROX_QUANTILES(duration, 10)[OFFSET (5)] AS typical_duration,
  COUNT(duration) AS num_trips
FROM
  `bigquery-public-data`.london_bicycles.cycle_hire
GROUP BY
  start_station_name,
  end_station_name )
SELECT
  EXTRACT (DATE
  FROM
    start_date) AS trip_date,
  APPROX_QUANTILES(duration / typical_duration, 10)[
OFFSET
  (5)] AS ratio,
  COUNT(*) AS num_trips_on_day
FROM
  `bigquery-public-data`.london_bicycles.cycle_hire AS hire
JOIN
  typical_trip AS trip
ON
  hire.start_station_name = trip.start_station_name
  AND hire.end_station_name = trip.end_station_name
  AND num_trips > 10
GROUP BY
  trip_date
HAVING
  num_trips_on_day > 10
ORDER BY
  ratio DESC
LIMIT
10

We gain about 50% in speed. The permanent table is not updated, though. One way to solve this problem of stale data is to use a materialized view or to schedule queries that update the table periodically. You should measure the cost of such updates to see whether the improvement in query performance makes up for the extra cost of keeping the table or materialized view up to date.
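As a hedged sketch, a materialized view keeps such an aggregate fresh automatically. This assumes the hire data has first been copied into your own dataset (mydataset is a placeholder), since materialized views restrict where their base tables can live and which aggregate functions they allow:

from google.cloud import bigquery

client = bigquery.Client()
client.query("""
CREATE MATERIALIZED VIEW IF NOT EXISTS mydataset.typical_trip_mv AS
SELECT
  start_station_name,
  end_station_name,
  COUNT(duration) AS num_trips,
  SUM(duration) AS total_duration  -- avg duration = total_duration / num_trips at query time
FROM mydataset.cycle_hire          -- assumed local copy of the public cycle_hire table
GROUP BY start_station_name, end_station_name
""").result()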

You can use BI Engine as well.

One way to improve read performance and avoid joins is to give up on storing data efficiently and instead add redundant copies of data. This is called denormalization: create a new permanent table which contains everything you need in it, as in the sketch below.
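A hedged sketch of that denormalization, embedding the station coordinates directly into each hire row so later queries never need the join (the target dataset name is a placeholder):

from google.cloud import bigquery

client = bigquery.Client()
client.query("""
CREATE OR REPLACE TABLE mydataset.cycle_hire_denorm AS
SELECT
  h.*,
  s.latitude  AS start_latitude,
  s.longitude AS start_longitude,
  e.latitude  AS end_latitude,
  e.longitude AS end_longitude
FROM `bigquery-public-data`.london_bicycles.cycle_hire AS h
JOIN `bigquery-public-data`.london_bicycles.cycle_stations AS s
  ON h.start_station_id = s.id
JOIN `bigquery-public-data`.london_bicycles.cycle_stations AS e
  ON h.end_station_id = e.id
""").result()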

Avoid self-joins of large tables. In many cases, you can avoid the self-join by taking advantage of SQL features such as aggregation and window functions.

A faster, more elegant (and correct!) solution is to recast the query to read the input only once and avoid the self-join completely.

WITH
all_babies AS (
SELECT
  name,
  SUM(
  IF
    (gender = 'M',
      number,
      0)) AS male_babies,
  SUM(
  IF
    (gender = 'F',
      number,
      0)) AS female_babies
FROM
  `bigquery-public-data.usa_names.usa_1910_current`
GROUP BY
  name ),
both_genders AS (
SELECT
  name,
  (male_babies + female_babies) AS num_babies,
  SAFE_DIVIDE(male_babies,
    male_babies + female_babies) AS frac_male
FROM
  all_babies
WHERE
  male_babies > 0
  AND female_babies > 0 )
SELECT
  *
FROM
  both_genders
WHERE
  frac_male BETWEEN 0.3
  AND 0.7
ORDER BY
  num_babies DESC
LIMIT
  5

Reduce data being joined

Use a window function instead of a self-join (example below).
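A hedged illustration of this tip: finding where each bike's previous trip ended needs only one pass over cycle_hire with LAG(), instead of joining the table to itself.

from google.cloud import bigquery

client = bigquery.Client()
sql = """
SELECT
  bike_id,
  start_date,
  start_station_name,
  LAG(end_station_name) OVER (PARTITION BY bike_id ORDER BY start_date) AS previous_end_station
FROM `bigquery-public-data`.london_bicycles.cycle_hire
LIMIT 10
"""
for row in client.query(sql).result():
    print(row.bike_id, row.start_station_name, "<-", row.previous_end_station)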

Join with precomputed values

Sometimes, it can be helpful to precompute functions on smaller tables, and then join with the precomputed values rather than repeat an expensive calculation each time.

Avoid overwhelming a worker

Data skew

Approximate aggregation functions

End lab

Optimize BigQuery queries

Lab: Creating Date-Partitioned Tables in BigQuery

A partitioned table is a table that is divided into segments, called partitions, that make it easier to manage and query your data. By dividing a large table into smaller partitions, you can improve query performance, and control costs by reducing the number of bytes read by a query.

Before the query runs, the query engine does not know whether 2018 data exists to satisfy the WHERE clause condition and it needs to scan through all records in a non-partitioned table.

Instead of scanning the entire dataset and filtering on a date field as we did in the earlier queries, we will now set up a date-partitioned table. This allows us to completely skip scanning records in partitions that are irrelevant to our query.

#standardSQL
 CREATE OR REPLACE TABLE ecommerce.partition_by_day
 PARTITION BY date_formatted
 OPTIONS(
   description="a table partitioned by date"
 ) AS
 SELECT DISTINCT
 PARSE_DATE("%Y%m%d", date) AS date_formatted,
 fullvisitorId
 FROM `data-to-insights.ecommerce.all_sessions_raw`

#standardSQL
SELECT *
FROM `data-to-insights.ecommerce.partition_by_day`
WHERE date_formatted = '2016-08-01'

This consumes far less data.

Auto-expiring partitioned tables are used to comply with data privacy statutes, and can be used to avoid unnecessary storage (which you’ll be charged for in a production environment). If you want to create a rolling window of data, add an expiration date so the partition disappears after you’re finished using it.
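A hedged sketch of such a rolling window, reusing the ecommerce example above with a partition expiration so that old partitions drop off automatically (the 60-day value is arbitrary):

from google.cloud import bigquery

client = bigquery.Client()
client.query("""
CREATE OR REPLACE TABLE ecommerce.partition_by_day_rolling
PARTITION BY date_formatted
OPTIONS(
  partition_expiration_days = 60,  -- partitions older than 60 days are deleted automatically
  description = "rolling 60-day window of sessions, partitioned by day"
) AS
SELECT DISTINCT
  PARSE_DATE("%Y%m%d", date) AS date_formatted,
  fullvisitorId
FROM `data-to-insights.ecommerce.all_sessions_raw`
""").result()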