What is BigQuery?
BigQuery is the second generation of big data tooling at Google.
It started with GFS and MapReduce (i.e. sharding of the data): a Map operation runs on each node, then a Reduce step aggregates the results. MapReduce requires a prior step: you have to split and shard the data and store it on multiple machines. That does not scale very well because it mixes storage and compute; each time you want to compute, you first have to work out where the data is stored.
-> The second generation is a serverless, autoscaling way to do data analysis.
The reason we use BigQuery is that it lets you query big datasets in seconds. No need to provision clusters etc.; you just need to have the data on Google Cloud and you're good to go.
BigQuery bills you according to the amount of data processed by your queries.
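Since on-demand pricing is driven by bytes processed, it is worth estimating that before running a query. A minimal sketch, assuming the google-cloud-bigquery Python client library and default credentials (not part of the course labs):

# A dry run reports the bytes a query would process (i.e. what you would be billed for)
# without actually running it.
from google.cloud import bigquery

client = bigquery.Client()
config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
job = client.query(
    "SELECT airline, COUNT(departure_delay) "
    "FROM `bigquery-samples.airline_ontime_data.flights` "
    "GROUP BY airline",
    job_config=config,
)
print('This query would process {} bytes'.format(job.total_bytes_processed))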
BigQuery benefits:
It enables:
- Near-real time analysis of massive datasets
- No-ops; Pay for use
- Durable (replicated) inexpensive storage
- Immutable audit logs
- Mashing up different datasets to derive insights
You can share your data easily within your organisation, which enhances data collaboration (you can share datasets, analyses, and queries).
BigQuery in a Reference Architecture
Queries and Functions
You can also select data from multiple tables at once (in the FROM clause), and you can JOIN on fields across tables: FROM … JOIN … ON w.rainyday = f.date
Lab 1: Building a BigQuery Query
We basically just run a series of queries in BigQuery to check some things in a dataset.
-- Delayed flights aggregated by airline:
SELECT
airline,
COUNT(departure_delay)
FROM
`bigquery-samples.airline_ontime_data.flights`
WHERE
departure_delay > 0 AND
departure_airport = 'LGA'
AND date = '2008-05-13'
GROUP BY
airline
ORDER BY airline
-- Get both pieces of info: total flights and number of delayed flights
SELECT
f.airline,
COUNT(f.departure_delay) AS total_flights,
SUM(IF(f.departure_delay > 0, 1, 0)) AS num_delayed
FROM
`bigquery-samples.airline_ontime_data.flights` AS f
WHERE
f.departure_airport = 'LGA' AND f.date = '2008-05-13'
GROUP BY
f.airline
-- Now we join on date with the weather dataset to find the number of flights delayed on rainy days.
-- (We skipped a step here: a simple join on the date with the weather dataset, and went straight to the subquery.)
-- Note that this time we look at the arrival_delay, not the departure delay.
SELECT
airline,
num_delayed,
total_flights,
num_delayed / total_flights AS frac_delayed
FROM (
SELECT
f.airline AS airline,
SUM(IF(f.arrival_delay > 0, 1, 0)) AS num_delayed,
COUNT(f.arrival_delay) AS total_flights
FROM
`bigquery-samples.airline_ontime_data.flights` AS f
JOIN (
SELECT
CONCAT(CAST(year AS STRING), '-', LPAD(CAST(month AS STRING),2,'0'), '-', LPAD(CAST(day AS STRING),2,'0')) AS rainyday
FROM
`bigquery-samples.weather_geo.gsod`
WHERE
station_number = 725030
AND total_precipitation > 0) AS w
ON
w.rainyday = f.date
WHERE f.arrival_airport = 'LGA'
GROUP BY f.airline
)
ORDER BY
frac_delayed ASC
END OF LAB
Load and Export Data
Lab : Load and Export Data
Task 1. Upload the data using the web UI
Upload data via the web UI: go to BigQuery, create a new dataset, create a new table, specify the CSV file, manually add the columns and their types, and then create the table.
Task 2. Upload the data using the CLI
-- In Cloud Shell, download the schema file for the table:
curl https://storage.googleapis.com/cloud-training/CPB200/BQ/lab4/schema_flight_performance.json -o schema_flight_performance.json
--Create a table in the dataset using the schema file you downloaded to Cloud Shell and data from JSON files that are in Cloud Storage.
bq load --source_format=NEWLINE_DELIMITED_JSON $DEVSHELL_PROJECT_ID:cpb101_flight_data.flights_2014 gs://cloud-training/CPB200/BQ/lab4/domestic_2014_flights_*.json ./schema_flight_performance.json
--Verify that the tables have been created :
bq ls $DEVSHELL_PROJECT_ID:cpb101_flight_data
Task 3. Export table
Create a bucket via the GUI and then, in the shell:
BUCKET="<your unique bucket name (Project ID)>"
We then exported the AIRPORTS table via the BigQuery GUI, and also via the shell:
bq extract cpb101_flight_data.AIRPORTS gs://$BUCKET/bq/airports2.csv
Both files are now in the bucket we created.
END OF LAB
Advanced capabilities
BigQuery supports the standard SQL data types (STRING, INT64, FLOAT64, BOOL, ARRAY, STRUCT, TIMESTAMP).
Slightly more advanced query :
Basically, it gets the names of the stations in Washington that had rainy days and orders them by number of rainy days.
Denormalizing introduces repeated fields and takes more storage space, but it increases query performance.
Arrays and structs :
The first subquery returns, for each date, an array of all the titles and scores published that day. The second query then UNNESTs the "titles" array and selects the two titles with the highest scores for each date.
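The slides show the query itself; as a rough stand-in, here is a minimal sketch of the same ARRAY_AGG / STRUCT / UNNEST pattern run through the Python client. The hacker_news dataset and its column names are assumptions, not necessarily the exact tables used in the course:

# Build one array of (title, score) STRUCTs per day, then flatten it again
# with UNNEST to keep the two highest-scoring titles per day.
from google.cloud import bigquery

sql = """
WITH titles_by_day AS (
  SELECT
    DATE(time_ts) AS day,
    ARRAY_AGG(STRUCT(title, score)) AS titles  -- one array per day
  FROM `bigquery-public-data.hacker_news.stories`
  WHERE score IS NOT NULL
  GROUP BY day
)
SELECT
  day,
  ARRAY(
    SELECT AS STRUCT title, score
    FROM UNNEST(titles)        -- flatten the array
    ORDER BY score DESC
    LIMIT 2                    -- top two titles for that day
  ) AS top_titles
FROM titles_by_day
ORDER BY day DESC
LIMIT 10
"""

for row in bigquery.Client().query(sql).result():
    print(row.day, [(t['title'], t['score']) for t in row.top_titles])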
Another example gets names and their number of occurrences from the USA names dataset, and then finds which of those names appear most frequently in Shakespeare.
Some funky functions :
REGEXP_CONTAINS: does the string contain two letters followed by an apostrophe and then two more letters? (This is obviously just an example.)
Defining a function:
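The slide with the function definition is not reproduced in these notes; as a hedged sketch, a temporary SQL UDF combined with the REGEXP_CONTAINS example above looks roughly like this. The function name, the regex, and the samples.shakespeare table are just illustration, not the course's exact slide:

# A temporary SQL UDF, defined and used in the same query. The regex mirrors the
# example above: two letters, an apostrophe, then two more letters.
from google.cloud import bigquery

sql = r"""
CREATE TEMP FUNCTION has_apostrophe_pattern(s STRING) AS (
  REGEXP_CONTAINS(s, r"[A-Za-z]{2}'[A-Za-z]{2}")
);

SELECT word, has_apostrophe_pattern(word) AS flagged
FROM `bigquery-public-data.samples.shakespeare`
LIMIT 10
"""

for row in bigquery.Client().query(sql).result():
    print(row.word, row.flagged)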
Lab 3 : Advanced SQL Queries
SELECT
author.email,
diff.new_path AS path,
DATETIME(TIMESTAMP_SECONDS(author.date.seconds)) AS date
FROM
`bigquery-public-data.github_repos.commits`,
UNNEST(difference) diff
WHERE
EXTRACT(YEAR FROM TIMESTAMP_SECONDS(author.date.seconds))=2016
LIMIT 10
Author is nested and difference is an array (hence author.email and the UNNEST). We also keep only the rows for which the year of author.date.seconds is 2016.
We got the files changed and the dates of the commits.
We now want to extract the suffix from the file name (the end of the file name 🙂).
We then use REGEXP_EXTRACT to get what follows the very last dot: LOWER(REGEXP_EXTRACT(diff.new_path, r'\.([^\./\(~_ \- #]*)$')) lang,
The next step is to add a condition on the day of the week, since we are trying to get the favorite languages used by developers on weekends versus weekdays. To this end we use a weekday marker (is_weekday).
Final solution :
WITH commits AS (
SELECT
author.email,
EXTRACT(DAYOFWEEK
FROM
TIMESTAMP_SECONDS(author.date.seconds)) BETWEEN 2
AND 6 is_weekday,
LOWER(REGEXP_EXTRACT(diff.new_path, r'\.([^\./\(~_ \- #]*)$')) lang,
diff.new_path AS path,
TIMESTAMP_SECONDS(author.date.seconds) AS author_timestamp
FROM
`bigquery-public-data.github_repos.commits`,
UNNEST(difference) diff
WHERE
EXTRACT(YEAR
FROM
TIMESTAMP_SECONDS(author.date.seconds))=2016)
SELECT
lang,
is_weekday,
COUNT(path) AS numcommits
FROM
commits
WHERE
lang IS NOT NULL
GROUP BY
lang,
is_weekday
HAVING
numcommits > 100
ORDER BY
numcommits DESC
We select all the data we need in the with statement and then take only what we want to aggregate and display in the second query.
Performance and pricing
Less work -> faster query. Work: how many bytes did you read, how many bytes did you pass to the next stage (shuffle), and how many bytes did you write?
-> Don't project unnecessary columns: do not SELECT *, select only the fields you actually need.
-> Excess rows incur “waste” similar to excess columns.
-> Avoid self-joins if you can, since they square the number of rows processed; do big joins first and then small joins.
-> Get a count of your groups when trying to understand performance: the cardinality of the GROUP BY keys determines how many rows the grouping stages have to process.
-> Built-in functions are the fastest; JavaScript UDFs are the slowest. Check whether there are reasonable approximate functions for your query (see the sketch after this list).
-> When you order, do so on filtered data.
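For the approximate-functions tip, a quick illustration via the Python client, using the same flights table as the labs above (an example of mine, not from the course):

# APPROX_COUNT_DISTINCT trades a small, bounded error for much less work
# than an exact COUNT(DISTINCT ...).
from google.cloud import bigquery

sql = """
SELECT
  COUNT(DISTINCT airline)        AS exact_count,
  APPROX_COUNT_DISTINCT(airline) AS approx_count
FROM `bigquery-samples.airline_ontime_data.flights`
"""

print(list(bigquery.Client().query(sql).result()))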
Wildcard tables and partitioning
Use wildcards to query multiple tables with concise SQL statements: FROM blablabla.gsod* queries all the tables whose names start with "gsod".
You can partition your data by timestamp, with advantages similar to sharding the tables: WHERE _PARTITIONTIME BETWEEN '…' AND '…'
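A hedged sketch of both ideas via the Python client; the noaa_gsod dataset is an assumption for the wildcard example, and the partitioned table name is purely hypothetical:

# Wildcard tables: one query over every table whose name starts with "gsod",
# restricted via _TABLE_SUFFIX. Partitioned tables: prune with _PARTITIONTIME
# so only the matching partitions are scanned.
from google.cloud import bigquery

wildcard_sql = """
SELECT _TABLE_SUFFIX AS year, COUNT(*) AS num_rows
FROM `bigquery-public-data.noaa_gsod.gsod*`
WHERE _TABLE_SUFFIX BETWEEN '2010' AND '2012'
GROUP BY year
"""

partition_sql = """
SELECT COUNT(*) AS num_rows
FROM `my_project.my_dataset.my_partitioned_table`  -- hypothetical table
WHERE _PARTITIONTIME BETWEEN TIMESTAMP('2016-01-01') AND TIMESTAMP('2016-01-31')
"""

client = bigquery.Client()
print(list(client.query(wildcard_sql).result()))
# print(list(client.query(partition_sql).result()))  # needs a real partitioned table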
Understand query performance :
What does my query do? Look at the input: it is the number of rows processed, and it should be as low as possible.
Understand BigQuery Plans :
If there is data skew, there is a significant difference between the average and max worker time. -> Filter early to work around it, and use the APPROX_ functions.
You can also monitor BigQuery with Stackdriver. -> dashboard.
Pricing for BigQuery :
Use the query validator to get an estimate of the data that will be processed, and then copy-paste that into the pricing calculator to get a quote.
What is Dataflow?
Dataflow is a managed implementation of Apache Beam.
You create a pipeline and then you apply a series of transforms.
When you stream, you use a window (shown in blue in the course diagram).
Data Pipelines
A pipeline is a set of steps.
A typical pipeline reads data, applies transforms, and writes output. Pipelines are usually written in either Java or Python.
To ingest data into the pipeline you read it from different sources: the file system, Google Cloud Storage, BigQuery, Pub/Sub, etc. You can write to the same kinds of sinks. You can also choose whether or not to shard the output, e.g. with the .withoutSharding() method (Java).
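A minimal Python sketch of that read -> transform -> write shape (the file names are made up; in the Python SDK the rough equivalent of Java's .withoutSharding() is asking WriteToText for a single shard):

# Read from a text source, transform each line, write to a single output file.
import apache_beam as beam

with beam.Pipeline() as p:  # DirectRunner by default, runs locally
    (p
        | 'Read'    >> beam.io.ReadFromText('/tmp/input.txt')
        | 'ToUpper' >> beam.Map(lambda line: line.upper())
        | 'Write'   >> beam.io.WriteToText('/tmp/output', num_shards=1)  # one shard, like .withoutSharding() in Java
    )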
Executing the pipeline: running the main .py file runs it locally. You can also submit the job to Dataflow to run it on the cloud; in that case you have to specify a few options (project, job_name, ...).
Lab : Data Pipeline
Task 1. Preparation
You can list the active account name with this command: gcloud auth list
You can list the project ID with this command: gcloud config list project
Create a bucket, set a variable, open the shell and console, and clone a repository from git.
Task 2. Open Dataflow project
- Turn to the browser tab containing Cloud Shell. In Cloud Shell, navigate to the directory for this lab:
cd ~/training-data-analyst/courses/data_analysis/lab2/python
- Install the necessary dependencies for Python dataflow:
sudo ./install_packages.sh
- Verify that you have the right version of pip. (It should be > 8.0):
pip3 -V
Task 3. Pipeline filtering
grep.py file:
import apache_beam as beam
import sys

def my_grep(line, term):
    if line.startswith(term):
        yield line

if __name__ == '__main__':
    p = beam.Pipeline(argv=sys.argv)
    input = '../javahelp/src/main/java/com/google/cloud/training/dataanalyst/javahelp/*.java'
    output_prefix = '/tmp/output'
    searchTerm = 'import'

    # find all lines that contain the searchTerm
    (p
        | 'GetJava' >> beam.io.ReadFromText(input)
        | 'Grep' >> beam.FlatMap(lambda line: my_grep(line, searchTerm))
        | 'write' >> beam.io.WriteToText(output_prefix)
    )

    p.run().wait_until_finish()
Just analyze the file and understand what each step does.
Task 4. Execute the pipeline locally
- In the Cloud Shell command line, locally execute grep.py:
cd ~/training-data-analyst/courses/data_analysis/lab2/python
python3 grep.py
Task 5. Execute the pipeline on the cloud
On the cloud :
#!/usr/bin/env python
"""
Copyright Google Inc. 2016
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import apache_beam as beam

def my_grep(line, term):
    if line.startswith(term):
        yield line

PROJECT = 'qwiklabs-gcp-03-fa87312aa062'
BUCKET = 'qwiklabs-gcp-03-fa87312aa062'

def run():
    argv = [
        '--project={0}'.format(PROJECT),
        '--job_name=examplejob2',
        '--save_main_session',
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--runner=DataflowRunner'
    ]

    p = beam.Pipeline(argv=argv)
    input = 'gs://{0}/javahelp/*.java'.format(BUCKET)
    output_prefix = 'gs://{0}/javahelp/output'.format(BUCKET)
    searchTerm = 'import'

    # find all lines that contain the searchTerm
    (p
        | 'GetJava' >> beam.io.ReadFromText(input)
        | 'Grep' >> beam.FlatMap(lambda line: my_grep(line, searchTerm))
        | 'write' >> beam.io.WriteToText(output_prefix)
    )

    p.run()

if __name__ == '__main__':
    run()
In the Dataflow GUI you can see the job running.
And the shell script to install the packages:
#!/bin/bash
apt-get install python3-pip
pip3 install apache-beam[gcp]
pip3 install oauth2client==3.0.0
pip3 install -U pip
MapReduce in Dataflow
MapReduce: if you want to process a big dataset, you break it into small parts that different nodes can process. Reduce nodes then process the output of the map nodes and combine it into a single result.
You can do that in Dataflow. Parallel processing is done via ParDo; it acts on one item at a time (like a Map in MapReduce). You can use it to filter, to extract parts of an input, or to calculate values from different parts of the input.
In Python: beam.Map or beam.FlatMap.
GroupByKey and Combine: to group values by key or to aggregate them (e.g. sums).
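A toy sketch contrasting these transforms on in-memory data (mine, not part of the lab), run with the local runner:

# FlatMap can emit zero or many elements per input (good for filtering/extracting),
# Map emits exactly one, and CombinePerKey does the "reduce" side.
import apache_beam as beam

with beam.Pipeline() as p:
    lines = p | beam.Create(['import java.util.List;', '// comment', 'import java.io.File;'])
    (lines
        | 'KeepImports' >> beam.FlatMap(lambda l: [l] if l.startswith('import') else [])
        | 'ToKeyValue'  >> beam.Map(lambda l: (l.split()[1].rstrip(';'), 1))
        | 'CountPerKey' >> beam.CombinePerKey(sum)
        | 'Print'       >> beam.Map(print)
    )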
Lab : MapReduce in Dataflow
Open cloud shell and then :
gcloud is the command-line tool for Google Cloud Platform. It comes pre-installed on Cloud Shell and supports tab-completion.
You can list the active account name with this command:
gcloud auth list
Example output:
Credentialed accounts:
- google1623327_student@qwiklabs.net
You can list the project ID with this command:
gcloud config list project
Example output:
[core]
project = qwiklabs-gcp-44776a13dea667a6
Then open the Cloud Shell code editor (top right of the Cloud Shell window).
Task 1. Review Preparations
- Create Cloud Storage bucket
- Clone github repository to Cloud Shell
git clone https://github.com/GoogleCloudPlatform/training-data-analyst
- Upgrade packages and install Apache Beam
cd training-data-analyst/courses/data_analysis/lab2/python
sudo ./install_packages.sh
Task 2. Identify Map and Reduce operations
#!/usr/bin/env python3
"""
Copyright Google Inc. 2016
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import apache_beam as beam
import argparse

def startsWith(line, term):
    if line.startswith(term):
        yield line

def splitPackageName(packageName):
    """e.g. given com.example.appname.library.widgetname
       returns com
               com.example
               com.example.appname
       etc.
    """
    result = []
    end = packageName.find('.')
    while end > 0:
        result.append(packageName[0:end])
        end = packageName.find('.', end + 1)
    result.append(packageName)
    return result

def getPackages(line, keyword):
    start = line.find(keyword) + len(keyword)
    end = line.find(';', start)
    if start < end:
        packageName = line[start:end].strip()
        return splitPackageName(packageName)
    return []

def packageUse(line, keyword):
    packages = getPackages(line, keyword)
    for p in packages:
        yield (p, 1)

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Find the most used Java packages')
    parser.add_argument('--output_prefix', default='/tmp/output', help='Output prefix')
    parser.add_argument('--input', default='../javahelp/src/main/java/com/google/cloud/training/dataanalyst/javahelp/', help='Input directory')

    options, pipeline_args = parser.parse_known_args()
    p = beam.Pipeline(argv=pipeline_args)

    input = '{0}*.java'.format(options.input)
    output_prefix = options.output_prefix
    keyword = 'import'

    # find most used packages
    (p
        | 'GetJava' >> beam.io.ReadFromText(input)
        | 'GetImports' >> beam.FlatMap(lambda line: startsWith(line, keyword))
        | 'PackageUse' >> beam.FlatMap(lambda line: packageUse(line, keyword))
        | 'TotalUse' >> beam.CombinePerKey(sum)
        | 'Top_5' >> beam.transforms.combiners.Top.Of(5, key=lambda kv: kv[1])
        | 'write' >> beam.io.WriteToText(output_prefix)
    )

    p.run().wait_until_finish()
Explanation :
We get all the Java files and find all the import statements in them (lines that start with 'import').
Example of line being processed :
import java.util.ArrayList;
We take those lines and then figure out which package is being used/imported. A single import statement maps to multiple package prefixes (com, com.example, ...), so we need to keep this in mind. For every package we do a combine per key and then a combiner to get the top 5, and that is what we write out.
startsWith function -> if the line starts with the given string, yield the line.
packageUse: we take the package name in the line between the keyword and the ';', then split it on '.' to emit every package prefix (each paired with a count of 1).
Task 3. Execute the pipeline
- Run the pipeline locally:
cd ~/training-data-analyst/courses/data_analysis/lab2/python
python3 ./is_popular.py
- Identify the output file. It should be output<suffix> and could be a sharded file.
ls -al /tmp
- Examine the output file, replacing ‘-*’ with the appropriate suffix.
cat /tmp/output-*
Task 4. Use command line parameters
- Change the output prefix from the default value:
python3 ./is_popular.py --output_prefix=/tmp/myoutput
- What will be the name of the new file that is written out?
- Note that we now have a new file in the /tmp directory:
ls -lrt /tmp/myoutput*
Side Inputs
In reality you may need to read data from multiple sources, for instance if the information needed for a transform is not embedded in the file you are processing.
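A toy sketch of how a side input is passed in the Python SDK (the data here is made up; the lab's real pipeline is JavaProjectsThatNeedHelp.py):

# A side input makes a second, usually small, PCollection available to every element
# of the main one, here as a dict via beam.pvalue.AsDict.
import apache_beam as beam

with beam.Pipeline() as p:
    scores   = p | 'Scores'   >> beam.Create([('pkg_a', 3), ('pkg_b', 7)])
    packages = p | 'Packages' >> beam.Create(['pkg_a', 'pkg_b', 'pkg_c'])
    (packages
        | 'AttachScore' >> beam.Map(lambda pkg, lookup: (pkg, lookup.get(pkg, 0)),
                                    lookup=beam.pvalue.AsDict(scores))  # side input
        | 'Print'       >> beam.Map(print)
    )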
Lab : Side Inputs
Task 1. Preparation
Clone the repository from git like in the previous lab, install Apache Beam, etc.
Task 2. Try using BigQuery query
Go to BigQuery and execute a few queries to get the content (and some metadata) of all the Java files present on GitHub in 2016.
Doing a COUNT(*) does not process any data, because BigQuery stores common metadata about the table (like the row count). Querying metadata processes 0 bytes.
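A quick way to check this, assuming the google-cloud-bigquery client: a dry run of a bare COUNT(*) should report 0 bytes processed.

# Dry run: a plain COUNT(*) with no WHERE clause references no columns,
# so BigQuery answers it from table metadata.
from google.cloud import bigquery

client = bigquery.Client()
job = client.query(
    "SELECT COUNT(*) FROM `bigquery-public-data.github_repos.contents`",
    job_config=bigquery.QueryJobConfig(dry_run=True, use_query_cache=False),
)
print(job.total_bytes_processed)  # expected to be 0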
Task 3. Explore the pipeline code
This is a Dataflow pipeline that demonstrates Python use of side inputs. The pipeline finds Java packages on GitHub that are (a) popular and (b) need help. Popularity is use of the package in a lot of other projects, and is determined by counting the number of times the package appears in import statements. Needing help is determined by counting the number of times the package contains the words FIXME or TODO in its source.
Task 4. Execute the pipeline
python3 JavaProjectsThatNeedHelp.py --bucket $BUCKET --project $DEVSHELL_PROJECT_ID --DirectRunner
This executes locally.
I could not complete the lab, as the code for running on the cloud does not work.
Dataflow Templates and dataprep
In the GUI you start a Cloud Dataflow job and you specify:
- The location of the template in cloud storage
- An output location in cloud storage
- Name:value parameters (that map to the ValueProvider interface)
There are basic templates provided for wordcount etc.
Dataprep is a graphical user interface for creating a pipeline.