Introduction to Data Engineering
Role of a data engineer
-> Building data pipelines
- Get the data to where it can be useful
- Get the data into a usable condition
- Add new value to the data
- Manage the data
- Productionize data processes
A data lake brings together data from across the enterprise into a single location.
Raw data -> Data lake
- Key considerations when building a data lake:
- Can your data lake handle all the types of data you have?
- Can it scale to meet the demand?
- Does it support high-throughput ingestion?
- Is there fine-grained access control to objects?
- Can other tools connect easily?
Cloud bucket = data lake
BigQuery = Data warehouse
What if your data is not usable in its original form? -> Use ETL (Extract, Transform, Load) -> Data processing: Cloud Dataproc and Cloud Dataflow
Common challenges encountered by data engineers:
- Access to data
- Data accuracy and quality
- Availability of computational resources
- Query performance
Cleaning, formatting, and getting the data ready for useful business insights in a data warehouse.
Data engineers need to manage server and cluster capacity if running on-premises.
This means dealing with versioning, the query engine, patching, etc.
BigQuery:
BigQuery allocates storage and compute dynamically: resources are allocated as you consume them and released when no longer needed. Each query is billed based on the amount of data it processes (on-demand pricing).
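Since cost tracks the bytes a query scans, it can be useful to check what recent queries actually processed. A minimal sketch using the INFORMATION_SCHEMA jobs view; it assumes your datasets live in the US multi-region (adjust the region qualifier otherwise):

-- List the most expensive recent queries in this project.
-- total_bytes_billed is what on-demand pricing charges for.
SELECT
  job_id,
  user_email,
  total_bytes_processed,
  total_bytes_billed,
  creation_time
FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE job_type = 'QUERY'
  AND creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
ORDER BY total_bytes_billed DESC
LIMIT 10;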
Considerations when choosing a data warehouse include:
- Can it serve as a sink for both batch and streaming data pipelines?
- Can the data warehouse scale to meet my needs?
- How is the data organized, cataloged, and access controlled?
- Is the warehouse designed for performance?
- What level of maintenance is required by our engineering team?
BigQuery > Traditional DW
You can simplify data warehouse ETL pipelines with external connections to Cloud Storage and Cloud SQL:
Cloud SQL is fully managed SQL Server, PostgreSQL, or MySQL for your relational database (transactional RDBMS).
Cloud SQL is optimized for writes; BigQuery is optimized for reads.
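As an illustration of those external connections, here is a hedged sketch: the bucket, dataset, and connection names below are made up, and the federated query assumes a Cloud SQL connection resource has already been created in BigQuery.

-- Query CSV files sitting in a Cloud Storage data lake without loading them first.
CREATE OR REPLACE EXTERNAL TABLE mydataset.raw_trips (
  vendor_id STRING,
  pickup_datetime TIMESTAMP,
  fare_amount NUMERIC
)
OPTIONS (
  format = 'CSV',
  uris = ['gs://my-data-lake/trips/*.csv'],
  skip_leading_rows = 1
);

-- Push a query down to Cloud SQL through a federated connection.
SELECT *
FROM EXTERNAL_QUERY(
  'my-project.us.my_cloudsql_connection',
  'SELECT vendor_id, fare_amount FROM trips;'
);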
Data lake: high durability and availability
Feature pipeline: are all of these features available at production time? Can you help us get more features (columns)?
In BigQuery you can create ML models using SQL.
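A minimal BigQuery ML sketch of that idea, training and scoring entirely in SQL. The dataset and model names (mydataset.trip_duration_model) and the feature choices are illustrative, not from the course:

-- Train a simple linear regression to predict trip duration.
CREATE OR REPLACE MODEL mydataset.trip_duration_model
OPTIONS (model_type = 'linear_reg', input_label_cols = ['tripduration']) AS
SELECT
  tripduration,
  CAST(start_station_id AS STRING) AS start_station,  -- categorical feature
  EXTRACT(HOUR FROM starttime) AS hour_of_day          -- numeric feature
FROM `bigquery-public-data.new_york_citibike.citibike_trips`
WHERE tripduration IS NOT NULL AND starttime IS NOT NULL;

-- Score new rows with the trained model.
SELECT *
FROM ML.PREDICT(
  MODEL mydataset.trip_duration_model,
  (SELECT
     CAST(start_station_id AS STRING) AS start_station,
     EXTRACT(HOUR FROM starttime) AS hour_of_day
   FROM `bigquery-public-data.new_york_citibike.citibike_trips`
   WHERE starttime IS NOT NULL
   LIMIT 10));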
Add value with BigQuery BI Engine:
- No need to manage OLAP cubes or separate BI servers for dashboard performance.
- Natively integrates with BigQuery streaming for real-time data refresh.
- Column-oriented, in-memory BI execution engine.
Cloud Data Catalog is a managed data discovery service, and the Data Loss Prevention (DLP) API guards PII.
Cloud Composer (managed Apache Airflow) is used to orchestrate production workflows.
Lab: Using BigQuery to do analysis
We go to BigQuery and check the public datasets for Citi Bike trips in New York and precipitation from a weather sensor in New York.
SELECT
  MIN(start_station_name) AS start_station_name,
  MIN(end_station_name) AS end_station_name,
  APPROX_QUANTILES(tripduration, 10)[OFFSET (5)] AS typical_duration,
  COUNT(tripduration) AS num_trips
FROM `bigquery-public-data.new_york_citibike.citibike_trips`
WHERE start_station_id != end_station_id
GROUP BY start_station_id, end_station_id
ORDER BY num_trips DESC
LIMIT 10
This returns the typical duration for the 10 most common one-way rentals.
SELECT
  wx.date,
  wx.value/10.0 AS prcp
FROM `bigquery-public-data.ghcn_d.ghcnd_2015` AS wx
WHERE id = 'USW00094728'
  AND qflag IS NULL
  AND element = 'PRCP'
ORDER BY wx.date
This query returns rainfall (in mm) for all days in 2015 from a weather station in New York whose id is provided in the query (the station corresponds to NEW YORK CNTRL PK TWR).
WITH bicycle_rentals AS (
  SELECT
    COUNT(starttime) AS num_trips,
    EXTRACT(DATE FROM starttime) AS trip_date
  FROM `bigquery-public-data.new_york_citibike.citibike_trips`
  GROUP BY trip_date
),
rainy_days AS (
  SELECT
    date,
    (MAX(prcp) > 5) AS rainy
  FROM (
    SELECT
      wx.date AS date,
      IF(wx.element = 'PRCP', wx.value/10, NULL) AS prcp
    FROM `bigquery-public-data.ghcn_d.ghcnd_2015` AS wx
    WHERE wx.id = 'USW00094728'
  )
  GROUP BY date
)
SELECT
  ROUND(AVG(bk.num_trips)) AS num_trips,
  wx.rainy
FROM bicycle_rentals AS bk
JOIN rainy_days AS wx
ON wx.date = bk.trip_date
GROUP BY wx.rainy
Running the query yields that, yes, New Yorkers ride the bicycle 47% fewer times when it rains.
Building a data lake
What is a data lake?
A scalable and secure data platform that allows enterprises to ingest, store, process, and analyze any type or volume of information.
Data sinks: central data lake repository, data warehouse
Let’s focus on data lakes.
Cloud Storage is one of the options for a good data lake, but it is not the only one: BigQuery can also be considered a data lake.
Data lake versus data warehouse
A data lake captures every aspect of your business operation. The data is stored in its natural/raw format, usually as object blobs or files.
- Retain all data in its native format
- Support all data types and all users
- Adapt to changes easily
- Tends to be application-specific
In contrast, a data warehouse typically has the following characteristics:
- Loaded only when its use is defined
- Processed/organized/transformed
- Provide faster insights
- Current/historical data for reporting
- Tends to have consistent schema shared across applications
The path your data takes to get to the cloud depends on:
- Where your data is now
- How big your data is
- Where it has to go
- How much transformation is needed
The method you use to load data depends on how much transformation is needed:
- EL: extract and load; no transformation is applied.
- ELT: extract, load, transform; used when only modest transformation is needed and it can be done inside the warehouse (see the sketch after this list).
- ETL: extract, transform, load; apply heavier processing first and then load the result.
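A minimal ELT sketch, assuming a raw table staging.raw_trips has already been loaded unchanged from the lake (the dataset, table, and column names are made up): the transform step runs as plain SQL inside BigQuery.

-- ELT: the "T" happens inside the warehouse, after the raw load.
CREATE OR REPLACE TABLE mydataset.trips_clean AS
SELECT
  vendor_id,
  SAFE_CAST(pickup_datetime AS TIMESTAMP) AS pickup_ts,
  SAFE_CAST(fare_amount AS NUMERIC) AS fare_amount
FROM staging.raw_trips
WHERE SAFE_CAST(fare_amount AS NUMERIC) >= 0;  -- drop rows with bad or negative fares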
Cloud Storage gives the advantages of persistence, durability, strong consistency, availability, and high throughput.
Once created, you cannot change the location of a bucket, so choose wisely! For disaster recovery, it can be beneficial to choose multiple regions, to make sure the data is stored in more than one physical place.
Cloud storage has many object management features.
Optimizing cost with Google Cloud Storage
With lifecycle rules, objects can move to Nearline storage when they are not accessed for some time, and then to Coldline, reducing overall costs.
Securing Cloud Storage
Cloud IAM (Identity and Access Management) + ACLs (access control lists).
IAM is set at the bucket level.
ACLs can be applied at the bucket level or on specific objects (more granularity).
Data encryption options exist for many requirements.
Cloud Storage can store all sorts of data types.
Cloud SQL as a relational data lake
Cloud SQL is a fully managed database service that makes it easy to set up and administer your relational MySQL and PostgreSQL databases.
Those databases are accessible from other GCP services (App Engine, Compute Engine, external services, etc.).
Backup, recovery, scaling, and security are managed for you.
Lab: Loading Taxi Data into Google Cloud SQL
export PROJECT_ID=$(gcloud info --format='value(config.project)')
export BUCKET=${PROJECT_ID}-ml

# Create SQL instance
gcloud sql instances create taxi \
  --tier=db-n1-standard-1 --activation-policy=ALWAYS

# Set a password for the instance
gcloud sql users set-password root --host % --instance taxi \
  --password Passw0rd

# Create an environment variable with the IP address of the Cloud Shell:
export ADDRESS=$(wget -qO - http://ipecho.net/plain)/32

# Whitelist the Cloud Shell instance for management access to your SQL instance
gcloud sql instances patch taxi --authorized-networks $ADDRESS

# Get the IP address of your Cloud SQL instance by running:
MYSQLIP=$(gcloud sql instances describe \
  taxi --format="value(ipAddresses.ipAddress)")

# Check the variable MYSQLIP:
echo $MYSQLIP

# Create the taxi trips table by logging into the mysql command-line interface.
mysql --host=$MYSQLIP --user=root \
  --password --verbose

# Create the schema for the trips table:
create database if not exists bts;
use bts;
drop table if exists trips;
create table trips (
  vendor_id VARCHAR(16),
  pickup_datetime DATETIME,
  dropoff_datetime DATETIME,
  passenger_count INT,
  trip_distance FLOAT,
  rate_code VARCHAR(16),
  store_and_fwd_flag VARCHAR(16),
  payment_type VARCHAR(16),
  fare_amount FLOAT,
  extra FLOAT,
  mta_tax FLOAT,
  tip_amount FLOAT,
  tolls_amount FLOAT,
  imp_surcharge FLOAT,
  total_amount FLOAT,
  pickup_location_id VARCHAR(16),
  dropoff_location_id VARCHAR(16)
);

# Check the table by entering the following command:
describe trips;

# Query the trips table:
select distinct(pickup_location_id) from trips;
exit

# Now copy the New York City taxi trips CSV files stored on Cloud Storage locally.
gsutil cp gs://cloud-training/OCBL013/nyc_tlc_yellow_trips_2018_subset_1.csv trips.csv-1
gsutil cp gs://cloud-training/OCBL013/nyc_tlc_yellow_trips_2018_subset_2.csv trips.csv-2

# Import the CSV file data into Cloud SQL using mysqlimport
mysqlimport --local --host=$MYSQLIP --user=root --password \
  --ignore-lines=1 --fields-terminated-by=',' bts trips.csv-*

# Connect to the mysql interactive console:
mysql --host=$MYSQLIP --user=root --password

# In the mysql interactive console, select the database:
use bts;

# Query the trips table for unique pickup location regions:
select distinct(pickup_location_id) from trips;

# Start by digging into the trip_distance column:
select max(trip_distance), min(trip_distance) from trips;

# How many trips in the dataset have a trip distance of 0?
select count(*) from trips where trip_distance = 0;

# We expect the fare_amount column to be positive. Check whether this is true in the database:
select count(*) from trips where fare_amount < 0;

# Finally, investigate the payment_type column.
select payment_type, count(*) from trips group by payment_type;
Building a data warehouse
A data warehouse should consolidate data from many sources. It imposes a schema, unlike a data lake.
A data warehouse should be optimized for simplicity of access and high-speed query performance.
A modern data warehouse:
- Gigabytes to petabytes
- Serverless and no-ops, including ad hoc queries
- Ecosystem of visualization and reporting tools
- Ecosystem of ETL and data processing tools
- Up-to-the-minute data
- Machine learning
BigQuery is a great data warehouse
- Interactive SQL queries over large datasets in seconds
- Serverless and no-ops, including ad hoc queries
- Ecosystem of visualization and reporting tools
- Ecosystem of ETL and data processing tools
- Up-to-the-minute data
- Machine learning
- Security and collaboration
Every table has a schema.
BigQuery provides predefined roles for controlling access to resources.
IAM applies at the dataset level; use authorized views for more granularity over what is accessible inside a dataset.
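A hedged sketch of the view side of an authorized view (the dataset and column names are made up; the step that actually authorizes the view on the source dataset is done in the dataset's sharing settings and is not shown here):

-- Expose only non-sensitive columns through a view in a separate dataset.
CREATE OR REPLACE VIEW shared_views.trips_public AS
SELECT
  pickup_datetime,
  dropoff_datetime,
  trip_distance,
  fare_amount  -- rider-identifying columns are deliberately excluded
FROM private_data.trips;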
BigQuery accepts files in the following formats:
- CSV
- JSON (newline-delimited)
- Avro
- Datastore backup
- Parquet
- ORC
Data can also be ingested through Cloud Dataflow pipelines or the Data Transfer Service (SaaS sources, via API).
Scheduled queries automate the execution of queries on a schedule.
The BigQuery Data Transfer Service helps you build and manage your data warehouse.
Lab: Loading data into BigQuery
Create a new dataset to store tables in BigQuery
Ingest a new dataset from a CSV -> select your dataset, create a table, specify CSV, auto-detect the schema, etc., and upload.
Ingest a new dataset from Google Cloud Storage -> run the following code in the shell:
bq load \
  --source_format=CSV \
  --autodetect \
  --noreplace \
  nyctaxi.2018trips \
  gs://cloud-training/OCBL013/nyc_tlc_yellow_trips_2018_subset_2.csv
Create tables from other tables with DDL -> run a query in BigQuery to create a new table from a subset of the data in the first one:
#standardSQL
CREATE TABLE nyctaxi.january_trips AS
SELECT *
FROM nyctaxi.2018trips
WHERE EXTRACT(Month FROM pickup_datetime) = 1;
END
Schema design
The schema takes advantage of nested and repeated columns in BigQuery.
Denormalizing gives faster reads, but be careful about storage: flattening repeats the same values many times.
With nested and repeated columns we solve that storage problem, because we do not repeat the same information over and over:
Nested array fields and struct fields allow for differing data granularity in the same table.
Check the Dremel white paper for further info.
These nested and repeated fields work a bit like merged cells in Excel: they let you merge information from multiple tables into one WITHOUT repeating the same information multiple times. Queries are faster, less data is processed, and fewer bytes are shuffled.
Use UNNEST() to query the data when you need to unpack arrays (see the sketch just below).
Normalized: multiple tables with relations
Denormalized: a single table with arrays, structs, etc.
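A small sketch of what nested and repeated fields look like in practice (the table and field names are made up): one row per order, with the line items held in a repeated STRUCT, then UNNEST() to flatten them at query time.

-- One row per order; line items live in a repeated STRUCT instead of a separate table.
CREATE OR REPLACE TABLE mydataset.orders (
  order_id STRING,
  order_date DATE,
  items ARRAY<STRUCT<sku STRING, qty INT64, price NUMERIC>>
);

-- Unpack the array with UNNEST() when you need item-level rows.
SELECT
  o.order_id,
  i.sku,
  i.qty * i.price AS line_total
FROM mydataset.orders AS o,
  UNNEST(o.items) AS i;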
Lab: Working with JSON and ARRAY data in BigQuery
Creating your own arrays with ARRAY_AGG()
SELECT fullVisitorId, date, v2ProductName, pageTitle
FROM `data-to-insights.ecommerce.all_sessions`
WHERE visitId = 1501570398
ORDER BY date

-- Aggregate our string values into an array
SELECT
  fullVisitorId,
  date,
  ARRAY_AGG(v2ProductName) AS products_viewed,
  ARRAY_AGG(pageTitle) AS pages_viewed
FROM `data-to-insights.ecommerce.all_sessions`
WHERE visitId = 1501570398
GROUP BY fullVisitorId, date
ORDER BY date

-- Use the ARRAY_LENGTH() function to count the number of pages and products that were viewed
SELECT
  fullVisitorId,
  date,
  ARRAY_AGG(v2ProductName) AS products_viewed,
  ARRAY_LENGTH(ARRAY_AGG(v2ProductName)) AS num_products_viewed,
  ARRAY_AGG(pageTitle) AS pages_viewed,
  ARRAY_LENGTH(ARRAY_AGG(pageTitle)) AS num_pages_viewed
FROM `data-to-insights.ecommerce.all_sessions`
WHERE visitId = 1501570398
GROUP BY fullVisitorId, date
ORDER BY date

-- Deduplicate the pages and products so we can see how many unique products were viewed
SELECT
  fullVisitorId,
  date,
  ARRAY_AGG(DISTINCT v2ProductName) AS products_viewed,
  ARRAY_LENGTH(ARRAY_AGG(DISTINCT v2ProductName)) AS distinct_products_viewed,
  ARRAY_AGG(DISTINCT pageTitle) AS pages_viewed,
  ARRAY_LENGTH(ARRAY_AGG(DISTINCT pageTitle)) AS distinct_pages_viewed
FROM `data-to-insights.ecommerce.all_sessions`
WHERE visitId = 1501570398
GROUP BY fullVisitorId, date
ORDER BY date
Querying datasets that already have ARRAYs
SELECT *
FROM `bigquery-public-data.google_analytics_sample.ga_sessions_20170801`
WHERE visitId = 1501570398

-- Use the UNNEST() function on your array field
SELECT DISTINCT
  visitId,
  h.page.pageTitle
FROM `bigquery-public-data.google_analytics_sample.ga_sessions_20170801`,
  UNNEST(hits) AS h
WHERE visitId = 1501570398
LIMIT 10
Introduction to STRUCTs
You may have wondered why the field alias hit.page.pageTitle
looks like three fields in one separated by periods. Just as ARRAY values give you the flexibility to go deep into the granularity of your fields, another data type allows you to go wide in your schema by grouping related fields together. That SQL data type is the STRUCT data type.
The easiest way to think about a STRUCT is to consider it conceptually like a separate table that is already pre-joined into your main table.
A STRUCT can have:
- one or many fields in it
- the same or different data types for each field
- its own alias
Sounds just like a table right?
SELECT
  visitId,
  totals.*,
  device.*
FROM `bigquery-public-data.google_analytics_sample.ga_sessions_20170801`
WHERE visitId = 1501570398
LIMIT 10
Practice with STRUCTs and ARRAYs
--standardSQL
SELECT STRUCT("Rudisha" as name, 23.4 as split) as runner

--with an array
SELECT STRUCT("Rudisha" as name, [23.4, 26.3, 26.4, 26.1] as splits) AS runner

--standardSQL
SELECT race, participants.name
FROM racing.race_results
CROSS JOIN race_results.participants  # full STRUCT name

--same:
SELECT race, participants.name
FROM racing.race_results AS r, r.participants
Recap of STRUCTs:
- A SQL STRUCT is simply a container of other data fields which can be of different data types. The word struct means data structure. Recall the example from earlier:
STRUCT(
  "Rudisha" as name,
  [23.4, 26.3, 26.4, 26.1] as splits
) AS runner
- STRUCTs are given an alias (like runner above) and can conceptually be thought of as a table inside of your main table.
- STRUCTs (and ARRAYs) must be unpacked before you can operate over their elements. Wrap an UNNEST() around the name of the struct itself or the struct field that is an array in order to unpack and flatten it.
Unpacking arrays with UNNEST():
--standardSQL
SELECT COUNT(p.name) AS racer_count
FROM racing.race_results AS r, UNNEST(r.participants) AS p

--standardSQL
SELECT
  p.name,
  SUM(split_times) AS total_race_time
FROM racing.race_results AS r,
  UNNEST(r.participants) AS p,
  UNNEST(p.splits) AS split_times
WHERE p.name LIKE 'R%'
GROUP BY p.name
ORDER BY total_race_time ASC;
The second query above answers the following task:
Write a query that will list the total race time for racers whose names begin with R. Order the results with the fastest total time first. Use the UNNEST() operator and start with the partially written query below.
- You will need to unpack both the struct and the array within the struct as data sources after your FROM clause
- Be sure to use aliases where appropriate
Filtering within ARRAY values
You happened to see that the fastest lap time recorded for the 800 M race was 23.2 seconds, but you did not see which runner ran that particular lap. Create a query that returns that result.
--standardSQL
SELECT
  p.name,
  split_time
FROM racing.race_results AS r,
  UNNEST(r.participants) AS p,
  UNNEST(p.splits) AS split_time
WHERE split_time = 23.2;