What is BigQuery?

The second generation of big data at Google.

It started with GFS and MapReduce, i.e. sharding the data: the data is split and stored across multiple machines, Map operations run on the nodes, and a Reduce step aggregates the results. With MapReduce, splitting and sharding the data onto multiple machines is a prior step, and that does not scale very well because storage and compute are mixed: each time you want to compute something, you first have to work out where the data is stored.

-> The second generation is a serverless, autoscaling way to do data analysis.

The reason we use BigQuery is that it lets you query big datasets in seconds, with no need to provision clusters: you just need to have the data on Google Cloud and you're good to go.

BigQuery bills you according to the amount of data processed by your queries.
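
You can check how much data a query would process before running it. A minimal sketch using the standard bq dry-run flag, against the sample table used in the lab below:

# Estimates the bytes that would be processed, without actually running the query
bq query --use_legacy_sql=false --dry_run \
'SELECT airline, COUNT(departure_delay)
 FROM `bigquery-samples.airline_ontime_data.flights`
 WHERE departure_airport = "LGA"
 GROUP BY airline'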

BigQuery benefits:

  • Near-real time analysis of massive datasets
  • No-ops; Pay for use
  • Durable (replicated) inexpensive storage
  • Immutable audit logs
  • Mashing up different datasets to derive insights

You can easily share data within your organisation, which enhances collaboration: you can share datasets, analyses, and queries.

BigQuery in a Reference Architecture

Queries and Functions

You can also select data from multiple tables at once (in the FROM clause), and you can JOIN on fields across tables, e.g. FROM … JOIN … ON w.rainyday = f.date.

Lab 1: Building a BigQuery Query

We basically just create a series of queries in BigQuery to check a few things in a dataset.

-- Delayed flights aggregated by airline:

SELECT
  airline,
  COUNT(departure_delay)
FROM
   `bigquery-samples.airline_ontime_data.flights`
WHERE
  departure_delay > 0 AND
  departure_airport = 'LGA'
  AND date = '2008-05-13'
GROUP BY
  airline
ORDER BY airline

-- Get both pieces of information: total flights and number of delayed flights

SELECT
  f.airline,
  COUNT(f.departure_delay) AS total_flights,
  SUM(IF(f.departure_delay > 0, 1, 0)) AS num_delayed
FROM
   `bigquery-samples.airline_ontime_data.flights` AS f
WHERE
  f.departure_airport = 'LGA' AND f.date = '2008-05-13'
GROUP BY
  f.airline

-- Now we join with the weather dataset on the date to find the number of flights delayed on rainy days.

-- (We skipped a step here: a simple join on the date with the weather dataset, and went directly to the subquery version.)

-- Note that this time we look at arrival_delay rather than departure_delay.

SELECT
  airline,
  num_delayed,
  total_flights,
  num_delayed / total_flights AS frac_delayed
FROM (
SELECT
  f.airline AS airline,
  SUM(IF(f.arrival_delay > 0, 1, 0)) AS num_delayed,
  COUNT(f.arrival_delay) AS total_flights
FROM
  `bigquery-samples.airline_ontime_data.flights` AS f
JOIN (
  SELECT
    CONCAT(CAST(year AS STRING), '-', LPAD(CAST(month AS STRING),2,'0'), '-', LPAD(CAST(day AS STRING),2,'0')) AS rainyday
  FROM
    `bigquery-samples.weather_geo.gsod`
  WHERE
    station_number = 725030
    AND total_precipitation > 0) AS w
ON
  w.rainyday = f.date
WHERE f.arrival_airport = 'LGA'
GROUP BY f.airline
  )
ORDER BY
  frac_delayed ASC

END OF LAB

Load and Export Data

Lab : Load and Export Data

Task 1. Upload the data using the web UI

Upload data via the web UI: go to BigQuery, create a new dataset, create a new table, specify the CSV file, manually add the columns and their types, and then create the table.

Task 2. Upload the data using the CLI

-- In Cloud Shell, download the schema file for the table:

curl https://storage.googleapis.com/cloud-training/CPB200/BQ/lab4/schema_flight_performance.json -o schema_flight_performance.json

--Create a table in the dataset using the schema file you downloaded to Cloud Shell and data from JSON files that are in Cloud Storage.

bq load --source_format=NEWLINE_DELIMITED_JSON $DEVSHELL_PROJECT_ID:cpb101_flight_data.flights_2014 gs://cloud-training/CPB200/BQ/lab4/domestic_2014_flights_*.json ./schema_flight_performance.json

--Verify that the tables have been created : 

bq ls $DEVSHELL_PROJECT_ID:cpb101_flight_data

Task 3. Export table

Create a bucket via the GUI and then, in the shell, set a variable with its name:

BUCKET="<your unique bucket name (Project ID)>"

We then export the AIRPORTS table via the BigQuery GUI and also via the shell:

bq extract cpb101_flight_data.AIRPORTS gs://$BUCKET/bq/airports2.csv

Both files end up in the bucket we created.

END OF LAB

Advanced capabilities

BigQuery supports the standard SQL data types (STRING, INT64, FLOAT64, BOOL, ARRAY, STRUCT, TIMESTAMP).

Slightly more advanced query :

Basically, it gets the names of the weather stations in Washington state that had rainy days and orders them by the number of rainy days.
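
A hedged reconstruction of that query, assuming the public NOAA GSOD tables (station names come from the stations table; the exact table used in the course may differ):

SELECT
  stations.name AS station_name,
  COUNT(1) AS rainy_days
FROM
  `bigquery-public-data.noaa_gsod.gsod2016` AS wx
JOIN
  `bigquery-public-data.noaa_gsod.stations` AS stations
ON
  wx.stn = stations.usaf AND wx.wban = stations.wban
WHERE
  stations.state = 'WA'
  AND wx.prcp > 0 AND wx.prcp < 99   -- 99.99 means "no precipitation data"
GROUP BY
  station_name
ORDER BY
  rainy_days DESC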

Denormalizing introduces repeated fields: it takes more storage space but improves query performance.

Arrays and structs :

The first subquery collects, for each date, all the titles and scores published that day into an array. The second query then un-nests the “titles” array and selects the two titles with the highest scores for each date.
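
A hedged sketch of that pattern, assuming the public Hacker News stories table (the table used in the course may differ):

WITH titles_by_date AS (
  SELECT
    DATE(time_ts) AS date,
    ARRAY_AGG(STRUCT(title, score)) AS titles   -- pack title/score pairs into an array per date
  FROM
    `bigquery-public-data.hacker_news.stories`
  WHERE
    time_ts IS NOT NULL
  GROUP BY
    date )
SELECT
  date,
  ARRAY(
    SELECT AS STRUCT title, score
    FROM UNNEST(titles)                          -- un-nest the "titles" array...
    ORDER BY score DESC
    LIMIT 2) AS top_titles                       -- ...and keep the two best-scored titles per date
FROM
  titles_by_date
ORDER BY
  date DESC
LIMIT 10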

Gets the names and their number of occurrences from the USA names dataset, and then finds which of those names appear most frequently in Shakespeare.
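
A hedged sketch, assuming the usa_names and Shakespeare public tables (the exact queries shown in the course may differ):

-- Distinct first names from the USA names dataset...
WITH names AS (
  SELECT DISTINCT name
  FROM `bigquery-public-data.usa_names.usa_1910_current` )
-- ...and how often each of them appears as a word in Shakespeare
SELECT
  s.word AS name,
  SUM(s.word_count) AS occurrences
FROM
  `bigquery-public-data.samples.shakespeare` AS s
JOIN
  names
ON
  s.word = names.name
GROUP BY
  name
ORDER BY
  occurrences DESC
LIMIT 10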

Some funky functions :

REGEXP_CONTAINS: does the string contain two letters followed by an apostrophe and then two letters? (This is obviously just an example.)
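
For example, a sketch of that regex against the Shakespeare sample table:

SELECT DISTINCT
  word
FROM
  `bigquery-public-data.samples.shakespeare`
WHERE
  REGEXP_CONTAINS(word, r"\w\w'\w\w")   -- two word characters, an apostrophe, two word characters
LIMIT 10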

Define a function :
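
A minimal sketch of a temporary SQL UDF (the function name and body are made up for illustration; JavaScript UDFs are defined similarly with LANGUAGE js):

-- Define the function for the duration of the script...
CREATE TEMP FUNCTION addFourAndDivide(x INT64, y INT64) AS ((x + 4) / y);

-- ...and use it in a query
SELECT
  val,
  addFourAndDivide(val, 2) AS result
FROM
  UNNEST([2, 3, 5, 10]) AS val;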

Lab 3 : Advanced SQL Queries

SELECT
  author.email,
  diff.new_path AS path,
  DATETIME(TIMESTAMP_SECONDS(author.date.seconds)) AS date
FROM
  `bigquery-public-data.github_repos.commits`,
  UNNEST(difference) diff
WHERE
  EXTRACT(YEAR FROM TIMESTAMP_SECONDS(author.date.seconds))=2016
LIMIT 10

author is a nested field and difference is an array (hence author.email and the UNNEST). We also keep only the rows for which the year of author.date.seconds is 2016.

This gives us the committers, the files they changed, and the dates.

We now want to extract the suffix from the file name (the end of the file name 🙂).

We then use REGEXP_EXTRACT to capture what follows the very last dot: LOWER(REGEXP_EXTRACT(diff.new_path, r'\.([^\./\(~_ \- #]*)$')) AS lang

The next step is to add a condition on the days, since we are trying to find the code developers favour on weekends versus weekdays.

To this end we compute a flag marking the weekdays.

Final solution :

WITH commits AS (
  SELECT
    author.email,
    EXTRACT(DAYOFWEEK FROM TIMESTAMP_SECONDS(author.date.seconds)) BETWEEN 2 AND 6 AS is_weekday,
    LOWER(REGEXP_EXTRACT(diff.new_path, r'\.([^\./\(~_ \- #]*)$')) AS lang,
    diff.new_path AS path,
    TIMESTAMP_SECONDS(author.date.seconds) AS author_timestamp
  FROM
    `bigquery-public-data.github_repos.commits`,
    UNNEST(difference) diff
  WHERE
    EXTRACT(YEAR FROM TIMESTAMP_SECONDS(author.date.seconds)) = 2016)
SELECT
  lang,
  is_weekday,
  COUNT(path) AS numcommits
FROM
  commits
WHERE
  lang IS NOT NULL
GROUP BY
  lang,
  is_weekday
HAVING
  numcommits > 100
ORDER BY
  numcommits DESC

We select all the data we need in the WITH statement, and then aggregate and display only what we want in the outer query.

Performance and pricing

Less work -> faster query. Work means: how many bytes did you read, how many bytes did you pass to the next stage (shuffle), and how many bytes did you write?

-> Don’t project unnecessary columns, do not select *, select only the fields you actually want

-> Excess rows incur “waste” similar to excess columns.

-> Avoid self-joins if you can, since they square the number of rows processed; do big joins first and then small joins.

-> Get a count of your groups when trying to understand performance: key cardinality matters, i.e. how many rows the GROUP BYs are processing. (Need to check this again.)

-> Built-in functions are the fastest, JavaScript UDFs the slowest. Check whether there are reasonable approximate functions for your query (see the sketch after this list).

-> When you ORDER BY, do it on already-filtered data.
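
As an example of the approximate functions mentioned above, a hedged sketch against the GitHub commits table used later in this section; APPROX_COUNT_DISTINCT trades a small error for much less work than COUNT(DISTINCT ...):

SELECT
  COUNT(DISTINCT author.email) AS exact_authors,
  APPROX_COUNT_DISTINCT(author.email) AS approx_authors
FROM
  `bigquery-public-data.github_repos.commits`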

Wildcard tables and partitioning

Use wildcards to query multiple tables with a concise SQL statement: FROM <dataset>.gsod* queries all the tables whose names start with “gsod”.
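
A hedged example, assuming the public NOAA GSOD dataset; _TABLE_SUFFIX lets you restrict which of the matched tables are actually read:

SELECT
  MAX(temp) AS max_mean_temp
FROM
  `bigquery-public-data.noaa_gsod.gsod*`
WHERE
  _TABLE_SUFFIX BETWEEN '2010' AND '2015'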

You can partition your data by timestamp, which gives advantages similar to sharding the tables; you then filter on the partition, e.g. WHERE _PARTITIONTIME BETWEEN '…' AND '…'.
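
A minimal sketch, assuming a hypothetical ingestion-time-partitioned table mydataset.mytable (the table, column, and date literals are placeholders):

SELECT
  field1
FROM
  `mydataset.mytable`
WHERE
  _PARTITIONTIME BETWEEN TIMESTAMP('2017-01-01') AND TIMESTAMP('2017-03-31')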

Understand query performance :

What does my query do? Look at the input: it is the number of rows processed, and it should be as low as possible.

Understand BigQuery Plans :

If there is data skew, you will see a significant difference between the average and the max time of a stage. To work around it, filter early and use the approximate (APPROX_*) functions.

You can also monitor BigQuery with Stackdriver via a dashboard.

Pricing for BigQuery :

Use the query validator to get an estimate of the amount of data that will be processed, and then copy-paste that into the BigQuery pricing calculator to get a quote.

What is Dataflow?

A managed implementation of Apache Beam.

You create a pipeline and then apply a series of transforms to it.

When you stream, you process the data over a window (shown in blue on the course diagram).

Data Pipelines

A pipeline is a set of steps.

A typical pipeline reads data -> applies transforms -> writes the output. Pipelines are usually written in either Java or Python.

To ingest data into the pipeline you read it from one of several sources: file system, Google Cloud Storage, BigQuery, Pub/Sub... You can also write to the same kinds of sinks. You can choose whether or not to shard the output, e.g. with the .withoutSharding() method in the Java SDK (a minimal Python sketch follows).
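
A minimal Python sketch of that read -> transform -> write shape (the file paths are placeholders; in the Python SDK, num_shards=1 plays the role of .withoutSharding()):

import apache_beam as beam

with beam.Pipeline() as p:
   (p
      | 'Read'      >> beam.io.ReadFromText('/tmp/input.txt')             # source
      | 'Transform' >> beam.Map(lambda line: line.upper())                # a simple transform
      | 'Write'     >> beam.io.WriteToText('/tmp/output', num_shards=1)   # sink, single output file
   )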

Executing the pipeline: running the main .py file runs it locally. You can also submit the job to Dataflow to run it on the cloud; in that case you have to specify a few options (project, job_name, ...).

Lab : Data Pipeline

Task 1. Preparation

You can list the active account name with this command: gcloud auth list

You can list the project ID with this command: gcloud config list project

Create a bucket, set a variable, open the shell and the console, and clone a repository from git.

Task 2. Open Dataflow project

  1. Return to the browser tab containing Cloud Shell and navigate to the directory for this lab:
cd ~/training-data-analyst/courses/data_analysis/lab2/python
  2. Install the necessary dependencies for Python Dataflow:
sudo ./install_packages.sh
  3. Verify that you have the right version of pip (it should be > 8.0):
pip3 -V

Task 3. Pipeline filtering

grep.py file:


import apache_beam as beam
import sys

def my_grep(line, term):
   if line.startswith(term):
      yield line

if __name__ == '__main__':
   p = beam.Pipeline(argv=sys.argv)
   input = '../javahelp/src/main/java/com/google/cloud/training/dataanalyst/javahelp/*.java'
   output_prefix = '/tmp/output'
   searchTerm = 'import'

   # find all lines that contain the searchTerm
   (p
      | 'GetJava' >> beam.io.ReadFromText(input)
      | 'Grep' >> beam.FlatMap(lambda line: my_grep(line, searchTerm) )
      | 'write' >> beam.io.WriteToText(output_prefix)
   )

   p.run().wait_until_finish()

Read through the file: it finds all the lines in the Java sources that start with the search term and writes them to /tmp/output.

Task 4. Execute the pipeline locally

  1. In the Cloud Shell command line, locally execute grep.py.
cd ~/training-data-analyst/courses/data_analysis/lab2/python
python3 grep.py

Task 5. Execute the pipeline on the cloud

On the cloud, the pipeline code becomes:

#!/usr/bin/env python

"""
Copyright Google Inc. 2016
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import apache_beam as beam

def my_grep(line, term):
   if line.startswith(term):
      yield line

PROJECT='qwiklabs-gcp-03-fa87312aa062'
BUCKET='qwiklabs-gcp-03-fa87312aa062'

def run():
   argv = [
      '--project={0}'.format(PROJECT),
      '--job_name=examplejob2',
      '--save_main_session',
      '--staging_location=gs://{0}/staging/'.format(BUCKET),
      '--temp_location=gs://{0}/staging/'.format(BUCKET),
      '--runner=DataflowRunner'
   ]

   p = beam.Pipeline(argv=argv)
   input = 'gs://{0}/javahelp/*.java'.format(BUCKET)
   output_prefix = 'gs://{0}/javahelp/output'.format(BUCKET)
   searchTerm = 'import'

   # find all lines that contain the searchTerm
   (p
      | 'GetJava' >> beam.io.ReadFromText(input)
      | 'Grep' >> beam.FlatMap(lambda line: my_grep(line, searchTerm) )
      | 'write' >> beam.io.WriteToText(output_prefix)
   )

   p.run()

if __name__ == '__main__':
   run()

You can then follow the running job in the Dataflow page of the GUI.

And the shell script that installs the packages:

#!/bin/bash

apt-get install python3-pip
pip3 install apache-beam[gcp]
pip3 install oauth2client==3.0.0
pip3 install -U pip

MapReduce in Dataflow

MapReduce: if you want to process a big dataset, you break it into small parts that different nodes can process (Map). Reduce nodes then take the output of the Map nodes and combine it into a single result.

You can do the same in Dataflow. Parallel processing is done via ParDo, which acts on one item at a time (like Map in MapReduce). You can use it for filtering, extracting parts of an input, or calculating values from different parts of the input.

In Python: beam.Map or beam.FlatMap.

GroupByKey and Combine are then used to aggregate the filtered information or to compute sums (a minimal sketch follows).
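
A minimal Beam Python sketch of the Map/Reduce analogy (hypothetical in-memory data, run locally with the default DirectRunner):

import apache_beam as beam

with beam.Pipeline() as p:
   (p
      | 'Create' >> beam.Create(['apple pie', 'apple', 'banana split'])
      | 'Map'    >> beam.FlatMap(lambda line: line.split())   # "Map": one line -> many words
      | 'Pair'   >> beam.Map(lambda word: (word, 1))          # key-value pairs
      | 'Reduce' >> beam.CombinePerKey(sum)                    # "Reduce": sum the counts per key
      | 'Print'  >> beam.Map(print)
   )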

Lab : MapReduce in Dataflow

Open cloud shell and then :

gcloud is the command-line tool for Google Cloud Platform. It comes pre-installed on Cloud Shell and supports tab-completion.

You can list the active account name with this command:

gcloud auth list

Example output:

Credentialed accounts:
 - google1623327_student@qwiklabs.net

You can list the project ID with this command:

gcloud config list project

Example output:

[core]
project = qwiklabs-gcp-44776a13dea667a6

Open then Cloud shell code editor (top right cloud shell window)

Task 1. Review Preparations

  • Create Cloud Storage bucket
  • Clone github repository to Cloud Shell
git clone https://github.com/GoogleCloudPlatform/training-data-analyst
  • Upgrade packages and install Apache Beam
cd training-data-analyst/courses/data_analysis/lab2/python
sudo ./install_packages.sh

Task 2. Identify Map and Reduce operations

#!/usr/bin/env python3

"""
Copyright Google Inc. 2016
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import apache_beam as beam
import argparse

def startsWith(line, term):
   if line.startswith(term):
      yield line

def splitPackageName(packageName):
   """e.g. given com.example.appname.library.widgetname
           returns com
	           com.example
                   com.example.appname
      etc.
   """
   result = []
   end = packageName.find('.')
   while end > 0:
      result.append(packageName[0:end])
      end = packageName.find('.', end+1)
   result.append(packageName)
   return result

def getPackages(line, keyword):
   start = line.find(keyword) + len(keyword)
   end = line.find(';', start)
   if start < end:
      packageName = line[start:end].strip()
      return splitPackageName(packageName)
   return []

def packageUse(line, keyword):
   packages = getPackages(line, keyword)
   for p in packages:
      yield (p, 1)

if __name__ == '__main__':
   parser = argparse.ArgumentParser(description='Find the most used Java packages')
   parser.add_argument('--output_prefix', default='/tmp/output', help='Output prefix')
   parser.add_argument('--input', default='../javahelp/src/main/java/com/google/cloud/training/dataanalyst/javahelp/', help='Input directory')

   options, pipeline_args = parser.parse_known_args()
   p = beam.Pipeline(argv=pipeline_args)

   input = '{0}*.java'.format(options.input)
   output_prefix = options.output_prefix
   keyword = 'import'

   # find most used packages
   (p
      | 'GetJava' >> beam.io.ReadFromText(input)
      | 'GetImports' >> beam.FlatMap(lambda line: startsWith(line, keyword))
      | 'PackageUse' >> beam.FlatMap(lambda line: packageUse(line, keyword))
      | 'TotalUse' >> beam.CombinePerKey(sum)
      | 'Top_5' >> beam.transforms.combiners.Top.Of(5, key=lambda kv: kv[1])
      | 'write' >> beam.io.WriteToText(output_prefix)
   )

   p.run().wait_until_finish()

Explanation :

We get all the Java files and find all the import statements in them (lines starting with import).

Example of line being processed :

import java.util.ArrayList;

We get those lines and then we try to figure out which package is being used/imported.

A single import statement can map to several package prefixes, so we need to keep this in mind. For every package we do a combine per key and then a Top combiner to get the top 5, and that is what we write out.

startsWith function -> if the line starts with the given string, yield the line.

packageUse: we get the package name in the line between the keyword and “;”, and then emit every prefix obtained by splitting it on “.”.

Task 3. Execute the pipeline

  1. Run the pipeline locally:
cd ~/training-data-analyst/courses/data_analysis/lab2/python
python3 ./is_popular.py
  2. Identify the output file. It should be output<suffix> and could be a sharded file.
ls -al /tmp
  3. Examine the output file, replacing '-*' with the appropriate suffix.
cat /tmp/output-*

Task 4. Use command line parameters

  1. Change the output prefix from the default value:
python3 ./is_popular.py --output_prefix=/tmp/myoutput
  2. What will be the name of the new file that is written out?
  3. Note that we now have a new file in the /tmp directory:
ls -lrt /tmp/myoutput*

Side Inputs

In reality you may need to read data from multiple sources, for instance when the information a transform needs is not embedded in the main input you are processing; the extra data is passed to the transform as a side input (a minimal sketch follows).
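
A minimal sketch of the side-input pattern in Beam Python (hypothetical in-memory data): a second PCollection is materialised with beam.pvalue.AsDict and passed to the transform as an extra argument.

import apache_beam as beam

with beam.Pipeline() as p:
   words = p | 'Words' >> beam.Create(['fixme', 'todo', 'done'])
   weights = p | 'Weights' >> beam.Create([('fixme', 3), ('todo', 1)])

   (words
      | 'Score' >> beam.Map(
            lambda w, lookup: (w, lookup.get(w, 0)),      # look each word up in the side input
            lookup=beam.pvalue.AsDict(weights))
      | 'Print' >> beam.Map(print)
   )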

Lab : Side Inputs

Task 1. Preparation

Clone the repository from git as in the previous lab, install Apache Beam, etc.

Task 2. Try using BigQuery query

Go to BigQuery and execute a few queries to get the content (and some metadata) of all the Java files present on GitHub in 2016.

Doing a COUNT(*) does not process any data, since BigQuery stores common metadata about the table (such as the row count); querying metadata processes 0 bytes.
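
For example, this processes 0 bytes because the row count comes straight from the table metadata (a sketch against the public github_repos dataset):

SELECT
  COUNT(*)
FROM
  `bigquery-public-data.github_repos.contents`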

Task 3. Explore the pipeline code

This is a Dataflow pipeline that demonstrates the Python use of side inputs. The pipeline finds Java packages on GitHub that are (a) popular and (b) need help. Popularity is use of the package in a lot of other projects, and is determined by counting the number of times the package appears in import statements. Needing help is determined by counting the number of times the package contains the words FIXME or TODO in its source.

Task 4. Execute the pipeline

python3 JavaProjectsThatNeedHelp.py --bucket $BUCKET --project $DEVSHELL_PROJECT_ID --DirectRunner

This executes locally.

Could not complete the lab as the code for running on the cloud does not work.

Dataflow Templates and Dataprep

In the GUI you start a Cloud Dataflow job from a template and specify:

  • The location of the template in cloud storage
  • An output location in cloud storage
  • Name:value parameters (which map to the ValueProvider interface)

There are basic templates provided for wordcount etc.

Dataprep is a graphical user interface for creating pipelines.