Introduction to Data Engineering

Role of a data engineer

-> Building data pipelines

  • Get the data to where it can be useful
  • Get the data into a usable condition
  • Add new value to the data
  • Manage the data
  • Productionize data processes

A data lake brings together data from across the enterprise into a single location.

Raw data -> Data lake

Key considerations when building a data lake:

  • Can your data lake handle all the types of data you have?
  • Can it scale to meet the demand?
  • Does it support high-throughput ingestion?
  • Is there fine-grained access control to objects?
  • Can other tools connect easily?

Cloud bucket = data lake

BigQuery = Data warehouse

What if your data is not usable in its original form? -> Use ETL (Extract, Transform, Load) -> Data processing: Cloud Dataproc and Cloud Dataflow

Common challenges encountered by data engineers:

  • Access to data
  • Data accuracy and quality
  • Availability of computational resources
  • Query performance

Cleaning, formatting, and getting the data ready for useful business insights in a data warehouse.

Data engineers need to manage server and cluster capacity if running on-premises.

They also deal with software versioning, the query engine, patching, etc.

BigQuery:

BigQuery allocates storage and compute dynamically: resources are allocated as you consume them and released when they are no longer used. Each query is billed based on the computation it uses.
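
Since billing is per query, it helps to estimate the bytes a query will scan before running it. A minimal sketch with the bq CLI's dry-run flag (using one of the public tables from the lab below):

bq query --use_legacy_sql=false --dry_run \
  'SELECT start_station_name FROM `bigquery-public-data.new_york_citibike.citibike_trips` LIMIT 10'

The dry run validates the query and reports the bytes it would process, without actually running (or billing) it.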

Considerations when choosing a data warehouse include:

  • Can it serve as a sink for both batch and streaming data pipelines?
  • Can the data warehouse scale to meet my needs?
  • How is the data organized, cataloged, and access controlled?
  • Is the warehouse designed for performance?
  • What level of maintenance is required by our engineering team?

BigQuery > Traditional DW

You can simplify data warehouse ETL pipelines with external connections to Cloud Storage and Cloud SQL:

Cloud SQL is fully managed SQL Server, PostgreSQL, or MySQL for your relational (transactional) database.

Cloud SQL is optimized for writes; BigQuery is optimized for reads.
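
For example, BigQuery can query a Cloud SQL database in place through a federated connection, so transactional data does not need to be exported first. A minimal sketch, where the connection ID and the orders table are hypothetical:

SELECT *
FROM EXTERNAL_QUERY(
  'my-project.us.my-cloudsql-connection',  -- hypothetical connection ID
  'SELECT customer_id, order_total FROM orders;');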

Data lake: high durability and availability.

Feature pipeline: are all of these features available at production time? Can you help us get more features (columns)?

In BigQuery you can create ML models using SQL (BigQuery ML).
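
For instance, a regression model can be trained with a single CREATE MODEL statement. A minimal sketch, assuming a hypothetical dataset mydataset with a training table that has a numeric label column:

CREATE OR REPLACE MODEL mydataset.sample_model
OPTIONS (model_type = 'linear_reg', input_label_cols = ['label']) AS
SELECT feature_1, feature_2, label
FROM mydataset.training_data;

-- Evaluate the trained model
SELECT * FROM ML.EVALUATE(MODEL mydataset.sample_model);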

Add value with BigQuery BI Engine:

  • No need to manage OLAP cubes or separate BI servers for dashboard performance.
  • Natively integrates with BigQuery streaming for real-time data refresh.
  • Column oriented in-memory BI execution engine.

Cloud Data Catalog is a managed data discovery service; the Data Loss Prevention (DLP) API helps guard PII.

Cloud Composer (managed Apache Airflow) is used to orchestrate production workflows.

Lab: Using BigQuery to do Analysis

We go to BigQuery and check the public datasets for Citi Bike trips in New York and precipitation from a weather sensor in New York.

SELECT
  MIN(start_station_name) AS start_station_name,
  MIN(end_station_name) AS end_station_name,
  APPROX_QUANTILES(tripduration, 10)[OFFSET (5)] AS typical_duration,
  COUNT(tripduration) AS num_trips
FROM
  `bigquery-public-data.new_york_citibike.citibike_trips`
WHERE
  start_station_id != end_station_id
GROUP BY
  start_station_id,
  end_station_id
ORDER BY
  num_trips DESC
LIMIT
  10

This gives the typical duration for the 10 most common one-way rentals.

SELECT
  wx.date,
  wx.value/10.0 AS prcp
FROM
  `bigquery-public-data.ghcn_d.ghcnd_2015` AS wx
WHERE
  id = 'USW00094728'
  AND qflag IS NULL
  AND element = 'PRCP'
ORDER BY
  wx.date

This query will return rainfall (in mm) for all days in 2015 from a weather station in New York whose id is provided in the query (the station corresponds to NEW YORK CNTRL PK TWR).

WITH bicycle_rentals AS (
  SELECT
    COUNT(starttime) as num_trips,
    EXTRACT(DATE from starttime) as trip_date
  FROM `bigquery-public-data.new_york_citibike.citibike_trips`
  GROUP BY trip_date
),

rainy_days AS
(
SELECT
  date,
  (MAX(prcp) > 5) AS rainy
FROM (
  SELECT
    wx.date AS date,
    IF (wx.element = 'PRCP', wx.value/10, NULL) AS prcp
  FROM
    `bigquery-public-data.ghcn_d.ghcnd_2015` AS wx
  WHERE
    wx.id = 'USW00094728'
)
GROUP BY
  date
)

SELECT
  ROUND(AVG(bk.num_trips)) AS num_trips,
  wx.rainy
FROM bicycle_rentals AS bk
JOIN rainy_days AS wx
ON wx.date = bk.trip_date
GROUP BY wx.rainy

Running the query shows that, yes, New Yorkers take about 47% fewer bicycle trips when it rains.

Building a data lake

What is a data lake?

A scalable and secure data platform that allows enterprises to ingest, store, process, and analyze any type or volume of information.

Data sinks: central data lake repository, data warehouse.

Let’s focus on data lakes

Cloud Storage is one option for a good data lake, but it is not the only one: BigQuery can also be considered a data lake.

Data lake versus data warehouse

A data lake is a capture of every aspect of your business operation. The data is stored in its natural/raw format, usually as object blobs or files.

  • Retain all data in its native format
  • Support all data types and all users
  • Adapt to changes easily
  • Tends to be application-specific

In contrast, a data warehouse typically has the following characteristics:

  • Loaded only when its use is defined
  • Processed/organized/transformed
  • Provide faster insights
  • Current/historical data for reporting
  • Tends to have consistent schema shared across applications

The path your data takes to get to the cloud depends on:

  • Where your data is now
  • How big your data is
  • Where it has to go
  • How much transformation is needed

The method you use to load data depends on how much transformation is needed:

  • EL: extract and load, with no transformation
  • ELT: extract, load, transform: load first, then transform inside the warehouse, usually when the transformation is light and can be expressed in SQL (see the sketch below)
  • ETL: extract, transform, load: apply processing in a pipeline, then load the result
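
As a small illustration of ELT (the bucket, dataset, and column names here are just placeholders): load the raw file as-is, then transform it with SQL inside BigQuery.

# Extract + Load: ingest the raw CSV unchanged into a staging table
bq load --source_format=CSV --autodetect mydataset.raw_events gs://my-bucket/events.csv

# Transform: clean it up inside the warehouse
bq query --use_legacy_sql=false \
  'CREATE OR REPLACE TABLE mydataset.events AS
   SELECT event_id, LOWER(country) AS country, event_ts
   FROM mydataset.raw_events
   WHERE event_id IS NOT NULL'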

Cloud Storage gives the advantage of persistence, durability, strong consistency, availability, and high throughput.

Once created, you cannot change the location of a bucket, so choose wisely! For disaster recovery, it can be beneficial to choose a multi-region location, to make sure the data is stored in multiple physical places.
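
For example, the location is fixed when the bucket is created (the bucket names and locations here are placeholders):

# Regional bucket: data lives in a single region
gsutil mb -l us-central1 gs://my-data-lake-bucket

# Multi-region bucket: data is stored redundantly across regions (useful for disaster recovery)
gsutil mb -l US gs://my-data-lake-dr-bucket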

Cloud storage has many object management features.

Optimizing cost with Google Cloud Storage

Objects can move to Nearline storage when they are not accessed for some time, and then to Coldline, reducing overall costs.
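
This is done with Object Lifecycle Management. A minimal sketch of a lifecycle policy (the age thresholds are just examples):

# lifecycle.json: move objects to Nearline after 30 days and Coldline after 365 days
{
  "rule": [
    {"action": {"type": "SetStorageClass", "storageClass": "NEARLINE"}, "condition": {"age": 30}},
    {"action": {"type": "SetStorageClass", "storageClass": "COLDLINE"}, "condition": {"age": 365}}
  ]
}

# Apply the policy to the bucket
gsutil lifecycle set lifecycle.json gs://my-data-lake-bucket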

Securing Cloud Storage

Cloud IAM (Identity and Access Management) + ACLs (access control lists).

IAM is set at the bucket level.

ACLs can be applied at the bucket level or to specific objects (more granularity).
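
A minimal sketch of both (the user, bucket, and object names are placeholders):

# IAM: grant a role at the bucket level
gsutil iam ch user:analyst@example.com:objectViewer gs://my-data-lake-bucket

# ACL: grant read access on one specific object
gsutil acl ch -u analyst@example.com:R gs://my-data-lake-bucket/reports/2018.csv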

There are data encryption options for many requirements: Google-managed keys (default), customer-managed keys (CMEK), customer-supplied keys (CSEK), and client-side encryption.

Storing all sorts of data types:

Cloud SQL as a relational data lake

Cloud SQL is a fully managed database service that makes it easy to set up and administer your relational MySQL and PostgreSQL databases.

These databases are accessible from other GCP services (App Engine, Compute Engine, external services, etc.).

Backup, recovery, scaling, and security are managed for you.

Lab: Loading Taxi Data into Google Cloud SQL

export PROJECT_ID=$(gcloud info --format='value(config.project)')
export BUCKET=${PROJECT_ID}-ml

#Create SQL instance
gcloud sql instances create taxi \
    --tier=db-n1-standard-1 --activation-policy=ALWAYS
    
#Set a password for the instance 
gcloud sql users set-password root --host % --instance taxi \
 --password Passw0rd

#create an environment variable with the IP address of the Cloud Shell:
export ADDRESS=$(wget -qO - http://ipecho.net/plain)/32

#Whitelist the Cloud Shell instance for management access to your SQL instance
gcloud sql instances patch taxi --authorized-networks $ADDRESS


#Get the IP address of your Cloud SQL instance by running:
MYSQLIP=$(gcloud sql instances describe \
taxi --format="value(ipAddresses.ipAddress)")

#Check the variable MYSQLIP:
echo $MYSQLIP

#Create the taxi trips table by logging into the mysql command line interface.
mysql --host=$MYSQLIP --user=root \
      --password --verbose

#create the schema for the trips table:
create database if not exists bts;
use bts;

drop table if exists trips;

create table trips (
  vendor_id VARCHAR(16),		
  pickup_datetime DATETIME,
  dropoff_datetime DATETIME,
  passenger_count INT,
  trip_distance FLOAT,
  rate_code VARCHAR(16),
  store_and_fwd_flag VARCHAR(16),
  payment_type VARCHAR(16),
  fare_amount FLOAT,
  extra FLOAT,
  mta_tax FLOAT,
  tip_amount FLOAT,
  tolls_amount FLOAT,
  imp_surcharge FLOAT,
  total_amount FLOAT,
  pickup_location_id VARCHAR(16),
  dropoff_location_id VARCHAR(16)
);

# Check the table schema by entering the following command:
describe trips;

#Query the trips table:
select distinct(pickup_location_id) from trips;

exit

#Now you'll copy the New York City taxi trips CSV files stored on Cloud Storage locally.
gsutil cp gs://cloud-training/OCBL013/nyc_tlc_yellow_trips_2018_subset_1.csv trips.csv-1
gsutil cp gs://cloud-training/OCBL013/nyc_tlc_yellow_trips_2018_subset_2.csv trips.csv-2


# Import the CSV file data into Cloud SQL using mysqlimport
mysqlimport --local --host=$MYSQLIP --user=root --password \
--ignore-lines=1 --fields-terminated-by=',' bts trips.csv-*

#Connect to the mysql interactive console:
mysql --host=$MYSQLIP --user=root  --password

#mysql interactive console select the database:
use bts;

#Query the trips table for unique pickup location regions:
select distinct(pickup_location_id) from trips;

# Let's start by digging into the trip_distance column. Enter the following query into the console:
select
  max(trip_distance),
  min(trip_distance)
from
  trips;
  
# How many trips in the dataset have a trip distance of 0?
select count(*) from trips where trip_distance = 0;
  
# We expect the fare_amount column to be positive. Enter the following query to see if this is true in the database:
select count(*) from trips where fare_amount < 0;



#Finally, let's investigate the payment_type column.
select
  payment_type,
  count(*)
from
  trips
group by
  payment_type;

Building a data warehouse

A data warehouse should consolidate data from many sources. It imposes a schema, unlike a data lake.

A data warehouse should be optimized for simplicity of access and high-speed query performance.

A modern data warehouse:

  • Gigabytes to petabytes
  • Serverless and no-ops, including ad hoc queries
  • Ecosystem of visualization and reporting tools
  • Ecosystem of ETL and data processing tools
  • Up-to-the-minute data
  • Machine learning

BigQuery is a great data warehouse:

  • Interactive SQL queries over large datasets in seconds
  • Serverless and no-ops, including ad hoc queries
  • Ecosystem of visualization and reporting tools
  • Ecosystem of ETL and data processing tools
  • Up-to-the-minute data
  • Machine learning
  • Security and collaboration

Every table has a schema.

BigQuery provides predefined roles for controlling access to resources.

IAM applies at the dataset level; use authorized views for more granularity over what is accessible inside a dataset (see the sketch below).
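
A sketch of the idea (the dataset and table names are hypothetical): expose only aggregated columns through a view in a dataset shared with analysts, while the source dataset stays private.

-- View lives in a shared dataset; the raw table stays in a private dataset
CREATE OR REPLACE VIEW shared_views.daily_revenue AS
SELECT order_date, SUM(amount) AS revenue
FROM private_data.orders
GROUP BY order_date;

-- The view is then granted access as an "authorized view" in the source dataset's sharing settings.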

BigQuery accepts data in the following formats and from the following sources:

  • CSV
  • JSON (newline delimited)
  • Avro
  • Datastore backup
  • Parquet
  • ORC
  • Cloud Dataflow (batch and streaming writes)
  • SaaS applications via the Data Transfer Service (API)

Automate the execution of queries based on a schedule

BigQuery Data Transfer Service helps you build and manage your data warehouse
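
Scheduled queries run on top of the Data Transfer Service and can be created from the console or the bq CLI. A hedged sketch (the destination table, schedule, and query are placeholders):

bq query \
  --use_legacy_sql=false \
  --destination_table=mydataset.daily_summary \
  --display_name='Daily summary' \
  --schedule='every 24 hours' \
  --replace=true \
  'SELECT CURRENT_DATE() AS run_date, COUNT(*) AS row_count FROM mydataset.raw_events'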

Lab: Loading data into BigQuery

Create a new dataset to store tables in BigQuery

Ingest a new dataset from a CSV -> select your dataset, create a table, specify CSV, auto-detect the schema, and upload.

Ingest a new dataset from Google Cloud Storage -> run the following code in the shell:

bq load \
--source_format=CSV \
--autodetect \
--noreplace  \
nyctaxi.2018trips \
gs://cloud-training/OCBL013/nyc_tlc_yellow_trips_2018_subset_2.csv

Create tables from other tables with DDL -> run a query in BigQuery to create a new table out of a subset of data from the first one:

#standardSQL
CREATE TABLE nyctaxi.january_trips AS
SELECT * FROM nyctaxi.2018trips
WHERE EXTRACT(Month FROM pickup_datetime) = 1;


Schema design

The schema can take advantage of nested and repeated columns in BigQuery.

Denormalizing gives faster reads, but be careful about the extra storage it can use.

With nested and repeated columns we solve that storage problem, since we do not repeat the same information many times:

Nested array fields and struct fields allow for differing data granularity in the same table.

Check Dremel white paper for further info

Nested and repeated fields work a bit like merged cells in Excel: they let you combine information from multiple tables into one WITHOUT repeating the same information multiple times. Queries are faster, the data processed is smaller, and the bytes shuffled are smaller too.

Use UNNEST() to query the data when you need to unpack arrays.

Normalized: multiple tables with relations.

Denormalized: with arrays, structs, etc. (see the sketch below).
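
A minimal sketch of the contrast (table and field names are hypothetical): instead of a separate orders table joined to customers, each customer row carries an ARRAY of order STRUCTs, and UNNEST() flattens it at query time.

-- Denormalized: one row per customer, orders nested as an ARRAY of STRUCTs
CREATE OR REPLACE TABLE mydataset.customers AS
SELECT
  'c001' AS customer_id,
  [STRUCT('o1' AS order_id, 25.0 AS amount),
   STRUCT('o2' AS order_id, 40.0 AS amount)] AS orders;

-- Flatten the array with UNNEST() to query individual orders
SELECT customer_id, o.order_id, o.amount
FROM mydataset.customers, UNNEST(orders) AS o;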

Lab: Working with JSON and Array data in BigQuery

Creating your own arrays with ARRAY_AGG()

SELECT
  fullVisitorId,
  date,
  v2ProductName,
  pageTitle
  FROM `data-to-insights.ecommerce.all_sessions`
WHERE visitId = 1501570398
ORDER BY date

--aggregate our string values into an array

SELECT
  fullVisitorId,
  date,
  ARRAY_AGG(v2ProductName) AS products_viewed,
  ARRAY_AGG(pageTitle) AS pages_viewed
  FROM `data-to-insights.ecommerce.all_sessions`
WHERE visitId = 1501570398
GROUP BY fullVisitorId, date
ORDER BY date

--ARRAY_LENGTH() function to count the number of pages and products that were viewed.

SELECT
  fullVisitorId,
  date,
  ARRAY_AGG(v2ProductName) AS products_viewed,
  ARRAY_LENGTH(ARRAY_AGG(v2ProductName)) AS num_products_viewed,
  ARRAY_AGG(pageTitle) AS pages_viewed,
  ARRAY_LENGTH(ARRAY_AGG(pageTitle)) AS num_pages_viewed
  FROM `data-to-insights.ecommerce.all_sessions`
WHERE visitId = 1501570398
GROUP BY fullVisitorId, date
ORDER BY date


--deduplicate the pages and products so we can see how many unique products were viewed

SELECT
  fullVisitorId,
  date,
  ARRAY_AGG(DISTINCT v2ProductName) AS products_viewed,
  ARRAY_LENGTH(ARRAY_AGG(DISTINCT v2ProductName)) AS distinct_products_viewed,
  ARRAY_AGG(DISTINCT pageTitle) AS pages_viewed,
  ARRAY_LENGTH(ARRAY_AGG(DISTINCT pageTitle)) AS distinct_pages_viewed
  FROM `data-to-insights.ecommerce.all_sessions`
WHERE visitId = 1501570398
GROUP BY fullVisitorId, date
ORDER BY date

Querying datasets that already have ARRAYs

SELECT
  *
FROM `bigquery-public-data.google_analytics_sample.ga_sessions_20170801`
WHERE visitId = 1501570398

--Use the UNNEST() function on your array field

SELECT DISTINCT
  visitId,
  h.page.pageTitle
FROM `bigquery-public-data.google_analytics_sample.ga_sessions_20170801`,
UNNEST(hits) AS h
WHERE visitId = 1501570398
LIMIT 10

Introduction to STRUCTs

You may have wondered why the field alias hit.page.pageTitle looks like three fields in one separated by periods. Just as ARRAY values give you the flexibility to go deep into the granularity of your fields, another data type allows you to go wide in your schema by grouping related fields together. That SQL data type is the STRUCT data type.

The easiest way to think about a STRUCT is to consider it conceptually like a separate table that is already pre-joined into your main table.

A STRUCT can have:

  • one or many fields in it
  • the same or different data types for each field
  • its own alias

Sounds just like a table right?

SELECT
  visitId,
  totals.*,
  device.*
FROM `bigquery-public-data.google_analytics_sample.ga_sessions_20170801`
WHERE visitId = 1501570398
LIMIT 10

Practice with STRUCTs and ARRAYs

--standardSQL
SELECT STRUCT("Rudisha" as name, 23.4 as split) as runner

--with an array 
SELECT STRUCT("Rudisha" as name, [23.4, 26.3, 26.4, 26.1] as splits) AS runner

--standardSQL
SELECT race, participants.name
FROM racing.race_results
CROSS JOIN
race_results.participants # full STRUCT name

--same :

SELECT race, participants.name
FROM racing.race_results AS r, r.participants

Recap of STRUCTs:

  • A SQL STRUCT is simply a container of other data fields which can be of different data types. The word struct means data structure. Recall the example from earlier:
  • STRUCT("Rudisha" AS name, [23.4, 26.3, 26.4, 26.1] AS splits) AS runner
  • STRUCTs are given an alias (like runner above) and can conceptually be thought of as a table inside of your main table.
  • STRUCTs (and ARRAYs) must be unpacked before you can operate over their elements. Wrap an UNNEST() around the name of the struct itself or the struct field that is an array in order to unpack and flatten it.

Unpacking arrays with UNNEST():

--standardSQL
SELECT COUNT(p.name) AS racer_count
FROM racing.race_results AS r, UNNEST(r.participants) AS p

--standardSQL
SELECT
  p.name,
  SUM(split_times) as total_race_time
FROM racing.race_results AS r
, UNNEST(r.participants) AS p
, UNNEST(p.splits) AS split_times
WHERE p.name LIKE 'R%'
GROUP BY p.name
ORDER BY total_race_time ASC;

The second query above answers the following exercise:

Write a query that will list the total race time for racers whose names begin with R. Order the results with the fastest total time first. Use the UNNEST() operator and start with the partially written query below.

  • You will need to unpack both the struct and the array within the struct as data sources after your FROM clause
  • Be sure to use aliases where appropriate

Filtering within ARRAY values

You happened to see that the fastest lap time recorded for the 800 M race was 23.2 seconds, but you did not see which runner ran that particular lap. Create a query that returns that result.

--standardSQL
SELECT
  p.name,
  split_time
FROM racing.race_results AS r
, UNNEST(r.participants) AS p
, UNNEST(p.splits) AS split_time
WHERE split_time = 23.2;