EL, ELT, ETL

EL : Extract and Load

ELT : Extract, Load, Transform

ETL : Extract, Transform, Load

What are the purposes of Data Quality processing?

  • Validity : Data conforms to your business rules
  • Accuracy : Data conforms to an objective true value
  • Completeness : Data accounts for all the records and fields it should; nothing is missing
  • Consistency : Data does not contradict itself across datasets or systems
  • Uniformity : Data uses the same units and formats throughout

We can use views and queries in BigQuery to filter, identify and isolate valid data through conditions (WHERE, HAVING, NOT NULL, COUNT, GROUP BY, ...), as in the sketch below.
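
For example, a view can act as a reusable quality filter in front of a raw table. A minimal sketch using the BigQuery Python client, where the project, dataset, table and column names are hypothetical:

from google.cloud import bigquery

client = bigquery.Client()

# Keep only rows that satisfy the business rules (validity): non-null key,
# positive quantity, and a country code from an allowed list.
view_sql = """
CREATE OR REPLACE VIEW `my_project.my_dataset.valid_orders` AS
SELECT *
FROM `my_project.my_dataset.orders`
WHERE order_id IS NOT NULL
  AND quantity > 0
  AND country_code IN ('US', 'FR', 'DE')
"""
client.query(view_sql).result()  # downstream queries read the view, not the raw table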

What if the transformations cannot be expressed in SQL, or are too complex to do in SQL? Then use ETL.

Build ETL pipelines in Dataflow and then land the data in BigQuery.

Executing Spark on Cloud Dataproc

The Hadoop ecosystem

The Hadoop ecosystem developed because of the need to analyze large datasets: distribute the processing and store the data with the processors.

Cluster data processing : Spark or MapReduce

Apache Spark is a popular, flexible and powerful way to process large datasets. A simple explanation of Spark is that it is able to mix different kinds of applications and to adjust how it uses the available resources of your cluster.

To give Spark the flexibility it needs to decide how to use the available resources, you describe what you want to do and let Spark determine how to make it happen. This is declarative programming, as opposed to imperative programming: in imperative programming you tell the system exactly what to do and how to do it, whereas in declarative programming you tell the system what you want and it figures out how to implement it.
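
A minimal sketch of the declarative style in PySpark, with a hypothetical sales.csv file: you only declare the result you want (totals per region) and Spark's optimizer decides how to partition, shuffle and schedule the work across the cluster.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("declarative-demo").getOrCreate()

df = spark.read.csv("sales.csv", header=True, inferSchema=True)
totals = (df
    .groupBy("region")                              # what to group by
    .sum("amount")                                  # what to aggregate
    .orderBy("sum(amount)", ascending=False))       # how to order the result
totals.show()                                       # Spark decides the "how"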

So where does a Hadoop cluster store its data? In the Hadoop Distributed File System, or HDFS. HDFS is the main file system Hadoop uses: it stores the data on the nodes of the cluster so that work can be distributed to where the data lives.

If you’re using on-premises Hadoop, you are the one responsible for managing cluster resources via YARN, the resource manager that Hadoop provides.

On-premises clusters can be difficult :

  • Not elastic
  • Hard to scale fast
  • Have capacity limits
  • Have no separation between storage and compute resources

Cloud Dataproc simplifies Hadoop workloads on GCP :

  • Built-in support for Hadoop
  • Managed hardware and configuration
  • Simplified version management
  • Flexible job configuration

Running Hadoop on Cloud Dataproc

The clusters are highly available: you can run clusters with multiple master nodes and set jobs to restart on failure, so that your clusters and jobs stay highly available. There are developer tools: you can manage your cluster in several ways, including the GCP Console, the Cloud SDK, RESTful APIs and direct SSH access. There are initialization actions: you can run these actions to install or customize the settings and libraries you need when your cluster is first created.

Using Cloud Dataproc :

  • Setup : create a cluster through the Console, a gcloud command, a Deployment Manager template, the Cloud SDK or the REST API (see the sketch after this list)
  • Configure : single VM or not, optional components, master node options, worker nodes, preemptible nodes, region and zone...
  • Optimize : lower costs, efficient allocation of resources, faster time to results, faster boot time, faster processing for some workloads...
  • Utilize : Console, gcloud command, REST API, orchestration services
  • Monitor : job driver output, logs, Stackdriver Monitoring, cluster details graphs
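
As a sketch of the setup step, here is one way to create a small cluster programmatically with the Dataproc v2 Python client (google-cloud-dataproc); the project name, cluster name and machine types are illustrative assumptions, and a gcloud command or the Console achieve the same thing.

from google.cloud import dataproc_v1

region = "us-central1"
cluster_client = dataproc_v1.ClusterControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

cluster = {
    "project_id": "my-project",                # assumption
    "cluster_name": "analytics-cluster",       # assumption
    "config": {
        "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
        "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
    },
}

# create_cluster returns a long-running operation; result() waits for it.
operation = cluster_client.create_cluster(
    request={"project_id": "my-project", "region": region, "cluster": cluster}
)
operation.result()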

GCS instead of HDFS

Hadoop was built on Google’s original MapReduce design from the 2004 white paper, which was written in a world where data was local to the compute machine. Back in 2004, network speeds were pretty slow, and that is why data was kept as close as possible to the processor. Now, with petabit networking speeds, you can treat storage and compute independently and still move traffic quickly over the network.

Google’s network delivers petabit bandwidth, making it easy to connect processing and storage.

You can switch your Hadoop jobs from hdfs:// to gs:// (Cloud Storage), even on-premises.
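
A minimal before/after sketch, assuming an existing SparkContext sc and a hypothetical bucket name; Part 2 of the lab below applies the same change to a full notebook.

# Before: read from the cluster-local HDFS
raw_rdd = sc.textFile("hdfs:///kddcup.data_10_percent.gz")

# After: read the same file from Cloud Storage; nothing else in the job changes
raw_rdd = sc.textFile("gs://my-bucket/kddcup.data_10_percent.gz")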

Using Cloud storage :

  • Hadoop compatible
  • Faster than HDFS in many cases
  • Requires less maintenance
  • Enables use of the whole range of GCP services
  • Considerably less expensive

Best practices :

  • Avoid small reads; use large block sizes where possible
  • Avoid iterating sequentially over many nested directories in a single job

Optimizing Dataproc

  • Where is your data and where is your cluster ? Location is important; keeping them in the same regional location improves efficiency
  • Is your network traffic being funneled ? You don’t want to throttle bandwidth by sending traffic through a bottleneck in your GCP networking configuration
  • How many input files and Hadoop partitions are you dealing with ? Make sure you are not handling more than roughly 10,000 input files or 50,000 Hadoop partitions
  • Is the size of your persistent disk limiting your throughput?
  • Did you allocate enough virtual machines (VMs) to your cluster ?

Optimizing Dataproc Storage

You can use HDFS if :

  • Your jobs require a lot of metadata operations
  • You modify the HDFS data frequently or rename directories often
  • You heavily use the append operation on HDFS files
  • You have workloads that involve very heavy input/output (I/O)
  • You have I/O workloads that are especially sensitive to latency

But in general, Cloud Storage is the preferred option.

Cloud Dataproc + Cloud Storage allows you to reduce disk requirements and save costs.

Geographical regions can impact the efficiency of your solution

With ephemeral clusters, you only pay for what you use.

  • Persistent clusters : resources are active at all times; you are constantly paying for everything that is provisioned
  • Ephemeral clusters : required resources are active only when being used; you only pay for what you use (see the sketch after this list)
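
A sketch of the ephemeral pattern with the Dataproc v2 Python client, assuming the hypothetical cluster from the earlier sketch and a script already staged in Cloud Storage: create the cluster, run one job, delete the cluster, and pay only for the time in between.

from google.cloud import dataproc_v1

region = "us-central1"
endpoint = {"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
cluster_client = dataproc_v1.ClusterControllerClient(client_options=endpoint)
job_client = dataproc_v1.JobControllerClient(client_options=endpoint)

# 1. Create the cluster (config omitted here; see the earlier sketch).

# 2. Submit a PySpark job and wait for it to finish.
job = {
    "placement": {"cluster_name": "analytics-cluster"},                  # assumption
    "pyspark_job": {"main_python_file_uri": "gs://my-bucket/spark_analysis.py"},
}
job_client.submit_job_as_operation(
    request={"project_id": "my-project", "region": region, "job": job}
).result()

# 3. Delete the cluster so no idle workers keep billing.
cluster_client.delete_cluster(
    request={"project_id": "my-project", "region": region,
             "cluster_name": "analytics-cluster"}
).result()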

You can also split clusters and jobs on GCP.

Finally: you can minimize the cost of a persistent or long-lived cluster by creating the smallest cluster you can, scoping your work on that cluster to the smallest number of jobs, and scaling the cluster to the minimum number of worker nodes, adding more dynamically to meet demand.

Optimizing Dataproc Templates and Autoscaling

After you submit your jobs to the cluster, one of the features you can take advantage of with Dataproc is autoscaling the number of nodes in your cluster to meet the demands of your job.
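
A hedged sketch of what an autoscaling policy can look like with the Dataproc v2 Python client; the policy id, bounds and scaling factors are illustrative assumptions. Once created, the policy is attached to a cluster through its autoscaling_config.policy_uri setting (or the equivalent gcloud flag).

from google.cloud import dataproc_v1

region = "us-central1"
policy_client = dataproc_v1.AutoscalingPolicyServiceClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

policy = {
    "id": "my-autoscaling-policy",                      # assumption
    "basic_algorithm": {
        "yarn_config": {
            "scale_up_factor": 0.5,                     # add half the pending YARN capacity
            "scale_down_factor": 0.5,                   # remove half the idle YARN capacity
            "graceful_decommission_timeout": {"seconds": 3600},
        },
        "cooldown_period": {"seconds": 120},            # wait between scaling evaluations
    },
    "worker_config": {"min_instances": 2, "max_instances": 20},
}

policy_client.create_autoscaling_policy(
    request={"parent": f"projects/my-project/regions/{region}", "policy": policy}
)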

Optimizing Dataproc Monitoring

Use Stackdriver Logging and Stackdriver Monitoring.

Lab : Running Apache Spark jobs on Cloud Dataproc

Part 1: Lift and Shift

Migrate existing Spark jobs to Cloud Dataproc : create a new Cloud Dataproc cluster, then run an imported Jupyter notebook that uses the cluster’s default local Hadoop Distributed File System (HDFS) to store source data and process that data just as you would on any Hadoop cluster using Spark. This demonstrates that many existing analytics workloads, such as Jupyter notebooks containing Spark code, require no changes when they are migrated to a Cloud Dataproc environment.

Clone the source repository for the lab

git -C ~ clone https://github.com/GoogleCloudPlatform/training-data-analyst

export DP_STORAGE="gs://$(gcloud dataproc clusters describe sparktodp --region=us-central1 --format=json | jq -r '.config.configBucket')"

gsutil -m cp ~/training-data-analyst/quests/sparktobq/*.ipynb $DP_STORAGE/notebooks/jupyter

Log in to the Jupyter Notebook and execute it

#In this cell Spark SQL is initialized and Spark is used to read 
#in the source data as text and then returns the first 5 rows.
from pyspark.sql import SparkSession, SQLContext, Row

spark = SparkSession.builder.appName("kdd").getOrCreate()
sc = spark.sparkContext
data_file = "hdfs:///kddcup.data_10_percent.gz"
raw_rdd = sc.textFile(data_file).cache()
raw_rdd.take(5)

#each row is split, using , as a delimiter and parsed 
#using a prepared inline schema in the code.

csv_rdd = raw_rdd.map(lambda row: row.split(","))
parsed_rdd = csv_rdd.map(lambda r: Row(
    duration=int(r[0]),
    protocol_type=r[1],
    service=r[2],
    flag=r[3],
    src_bytes=int(r[4]),
    dst_bytes=int(r[5]),
    wrong_fragment=int(r[7]),
    urgent=int(r[8]),
    hot=int(r[9]),
    num_failed_logins=int(r[10]),
    num_compromised=int(r[12]),
    su_attempted=r[14],
    num_root=int(r[15]),
    num_file_creations=int(r[16]),
    label=r[-1]
    )
)
parsed_rdd.take(5)

#a Spark SQL context is created and a Spark dataframe using that 
#context is created using the parsed input data from the previous
#stage. Row data can be selected and displayed using the dataframe's
#.show() method

sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame(parsed_rdd)
connections_by_protocol = df.groupBy('protocol_type').count().orderBy('count', ascending=False)
connections_by_protocol.show()

#Output 
+-------------+------+
|protocol_type| count|
+-------------+------+
|         icmp|283602|
|          tcp|190065|
|          udp| 20354|
+-------------+------+

#SparkSQL can also be used to query the parsed data stored in
#the Dataframe.

df.registerTempTable("connections")
attack_stats = sqlContext.sql("""
    SELECT
      protocol_type,
      CASE label
        WHEN 'normal.' THEN 'no attack'
        ELSE 'attack'
      END AS state,
      COUNT(*) as total_freq,
      ROUND(AVG(src_bytes), 2) as mean_src_bytes,
      ROUND(AVG(dst_bytes), 2) as mean_dst_bytes,
      ROUND(AVG(duration), 2) as mean_duration,
      SUM(num_failed_logins) as total_failed_logins,
      SUM(num_compromised) as total_compromised,
      SUM(num_file_creations) as total_file_creations,
      SUM(su_attempted) as total_root_attempts,
      SUM(num_root) as total_root_acceses
    FROM connections
    GROUP BY protocol_type, state
    ORDER BY 3 DESC
    """)
attack_stats.show()

#Output
+-------------+---------+----------+--------------+--
|protocol_type|    state|total_freq|mean_src_bytes|
+-------------+---------+----------+--------------+--
|         icmp|   attack|    282314|        932.14|
|          tcp|   attack|    113252|       9880.38|
|          tcp|no attack|     76813|       1439.31|
...
...
|          udp|   attack|      1177|          27.5|
+-------------+---------+----------+--------------+--

#uses the %matplotlib inline Jupyter magic function to redirect
#matplotlib to render a graphic figure inline in the notebook 
#instead of just dumping the data into a variable.

%matplotlib inline
ax = attack_stats.toPandas().plot.bar(x='protocol_type', subplots=True, figsize=(10,25))

Part 2: Separate Compute and Storage

Modify Spark jobs to use Cloud Storage instead of HDFS

The only thing we did was delete the first section and replace its code with :

from pyspark.sql import SparkSession, SQLContext, Row

gcs_bucket='[Your-Bucket-Name]'
spark = SparkSession.builder.appName("kdd").getOrCreate()
sc = spark.sparkContext
data_file = "gs://"+gcs_bucket+"//kddcup.data_10_percent.gz"
raw_rdd = sc.textFile(data_file).cache()
raw_rdd.take(5)

With my bucket name in the placeholder.

Part 3: Deploy Spark Jobs

Added the following code :

%%writefile spark_analysis.py

import matplotlib
matplotlib.use('agg')

import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--bucket", help="bucket for input and output")
args = parser.parse_args()

BUCKET = args.bucket

The %%writefile spark_analysis.py Jupyter magic command creates a new output file to contain your standalone python script. You will add a variation of this to the remaining cells to append the contents of each cell to the standalone script file.

For the remaining cells insert %%writefile -a spark_analysis.py at the start of each Python code cell. 

At the end of the notebook :

%%writefile -a spark_analysis.py

ax[0].get_figure().savefig('report.png');

%%writefile -a spark_analysis.py

import google.cloud.storage as gcs
bucket = gcs.Client().get_bucket(BUCKET)
for blob in bucket.list_blobs(prefix='sparktodp/'):
    blob.delete()
bucket.blob('sparktodp/report.png').upload_from_filename('report.png')

%%writefile -a spark_analysis.py

connections_by_protocol.write.format("csv").mode("overwrite").save(
    "gs://{}/sparktodp/connections_by_protocol".format(BUCKET))

Test automation :

You now test that the PySpark code runs successfully as a file by calling the local copy from inside the notebook

BUCKET_list = !gcloud info --format='value(config.project)'
BUCKET=BUCKET_list[0]
print('Writing to {}'.format(BUCKET))
!/opt/conda/anaconda/bin/python spark_analysis.py --bucket=$BUCKET

!gsutil ls gs://$BUCKET/sparktodp/**
!gsutil cp spark_analysis.py gs://$BUCKET/sparktodp/spark_analysis.py

Run the cells

Run the job from Cloud Shell :

gsutil cp gs://$PROJECT_ID/sparktodp/spark_analysis.py spark_analysis.py
nano submit_onejob.sh

#Paste the following in the script : 
#!/bin/bash
gcloud dataproc jobs submit pyspark \
       --cluster sparktodp \
       --region us-central1 \
       spark_analysis.py \
       -- --bucket=$1
       
#make the script executable : 
chmod +x submit_onejob.sh

#Launch the PySpark Analysis job
./submit_onejob.sh $PROJECT_ID

The job runs and then the image is created in the new bucket

Cloud Data Fusion

Cloud Data Fusion is a fully managed, cloud-native, enterprise data integration service for quickly building and managing data pipelines.

It is a graphical, no-code tool for building data pipelines.

Data Fusion creates ephemeral execution environments to run the pipelines.

But normally you will not even touch those components, you will use the graphical interface to build everything.

UI overview :

  • Control Center
  • Pipelines
  • Wrangler
  • Metadata
  • Hub
  • Entities
  • Administration

Wrangler is a code-free, visual environment for transforming data in data pipelines.

Lab – Cloud Data Fusion

  • Task 1: Creating a Cloud Data Fusion instance
  • Task 2: Loading the data
  • Task 3: Cleaning the data
  • Task 4: Creating the pipeline
  • Task 5: Adding a data source
  • Task 6: Joining two sources
  • Task 7: Storing the output to BigQuery
  • Task 8: Deploying and running the pipeline
  • Task 9: Viewing the results

Cloud Composer

Cloud Composer automatically orchestrates your workflows.

Use Apache Airflow DAGs to orchestrate GCP services (DAG = Directed Acyclic Graph)

Airflow is a platform to programmatically author, schedule and monitor workflows.

Use Airflow to author workflows as directed acyclic graphs (DAGs) of tasks. The Airflow scheduler executes your tasks on an array of workers while following the specified dependencies.

Airflow workflows are written in Python; the Python file defines a DAG.

Apache Airflow is open source and uses operators to run and process the data workflow.

In the course example of a four-operator workflow, the first two operators fetch fresh training data and the last two retrain and redeploy the model.

Two scheduling options for Cloud Composer workflows :

  • Periodic
  • Event-driven

Two types of workflow ETL patterns :

  • Push (event-triggered)
  • Pull (scheduled workflow run)

You can use Cloud Functions for that, for example to trigger a workflow when a new file lands in a Cloud Storage bucket (see the sketch below).
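
As a hedged sketch of the push pattern, this is roughly what a Cloud Function triggered by a new file in a Cloud Storage bucket could look like; it calls the Airflow 1.x experimental REST API to start a DAG run. The web server URL and DAG id are assumptions, and in Cloud Composer the request would also need an IAP-signed identity token, which is omitted here for brevity.

import requests

WEB_SERVER_URL = "https://YOUR-COMPOSER-WEBSERVER.appspot.com"   # assumption
DAG_ID = "composer_sample_quickstart"                            # assumption

def trigger_dag(event, context):
    """Background Cloud Function fired by a google.storage.object.finalize event."""
    endpoint = "{}/api/experimental/dags/{}/dag_runs".format(WEB_SERVER_URL, DAG_ID)
    # Pass the name of the new file to the DAG run as its configuration.
    response = requests.post(endpoint, json={"conf": {"file": event["name"]}})
    response.raise_for_status()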

Core concepts

DAG : A Directed Acyclic Graph is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.

Operator : The description of a single task; it is usually atomic. For example, the BashOperator is used to execute a bash command.

Task : A parameterised instance of an Operator; a node in the DAG.

Task Instance : A specific run of a task, characterized as: a DAG, a task, and a point in time. It has an indicative state: running, success, failed, skipped, …
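
A minimal sketch that ties these concepts together (the DAG id, schedule and commands are illustrative): one DAG, two tasks instantiated from the BashOperator, and a dependency between them, using the Airflow 1.x import paths that Cloud Composer used at the time.

import datetime

from airflow import models
from airflow.operators.bash_operator import BashOperator

with models.DAG(
        'concepts_demo',                                  # assumption
        start_date=datetime.datetime(2021, 1, 1),
        schedule_interval=datetime.timedelta(days=1)) as dag:

    extract = BashOperator(task_id='extract', bash_command='echo "extract"')
    load = BashOperator(task_id='load', bash_command='echo "load"')

    # 'load' runs only after 'extract' succeeds; each scheduled run creates
    # one task instance per task.
    extract >> load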

Lab: An Introduction to Cloud Composer

Below is the hadoop_tutorial.py workflow code, also referred to as the DAG:

"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use as output for the Hadoop jobs from Dataproc.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
wordcount_args = ['wordcount', 'gs://pub/shakespeare/rose.txt', output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

with models.DAG(
        'composer_sample_quickstart',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='quickstart-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='quickstart-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='quickstart-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Cloud Dataflow

The more data there is, the more it is simply distributed in parallel across more workers. For streaming data, the PCollection is simply without bounds: it has no end. Each element inside a PCollection can be individually accessed and processed; this is how distributed processing of the PCollection is implemented. So you define the pipeline and the transforms on the PCollection, and the runner handles implementing the transforms on each element, distributing the work as needed for scale and using all available resources.
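
A minimal sketch of that separation of concerns, with hypothetical project and bucket names: the transforms on the PCollection stay the same whether the pipeline runs locally on the DirectRunner or at scale on the DataflowRunner; only the options change.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    runner='DataflowRunner',            # or 'DirectRunner' for local testing
    project='my-project',               # assumption
    region='us-central1',
    temp_location='gs://my-bucket/tmp', # assumption
)

with beam.Pipeline(options=options) as p:
    (p
     | 'Read' >> beam.io.ReadFromText('gs://my-bucket/input/*.txt')
     | 'Lengths' >> beam.Map(len)       # applied to each element, in parallel
     | 'Write' >> beam.io.WriteToText('gs://my-bucket/output/lengths'))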

Key strengths :

  • Graph is optimized for best execution path
  • Autoscaling mid job
  • Strong streaming semantics

Lab : Building a Simple Dataflow Pipeline (Python)

Task 1. Preparation + Task 2. Open Dataflow project

Create a bucket, enable the Dataflow API, copy the files locally and install the packages

git clone https://github.com/GoogleCloudPlatform/training-data-analyst
cd ~/training-data-analyst/courses/data_analysis/lab2/python
sudo ./install_packages.sh
pip3 -V

Task 3. Pipeline filtering

Check the python file :

#!/usr/bin/env python

"""
Copyright Google Inc. 2016
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import apache_beam as beam
import sys

def my_grep(line, term):
   if line.startswith(term):
      yield line

if __name__ == '__main__':
   p = beam.Pipeline(argv=sys.argv)
   input = '../javahelp/src/main/java/com/google/cloud/training/dataanalyst/javahelp/*.java'
   output_prefix = '/tmp/output'
   searchTerm = 'import'

   # find all lines that contain the searchTerm
   (p
      | 'GetJava' >> beam.io.ReadFromText(input)
      | 'Grep' >> beam.FlatMap(lambda line: my_grep(line, searchTerm) )
      | 'write' >> beam.io.WriteToText(output_prefix)
   )

   p.run().wait_until_finish()

Task 4. Execute the pipeline locally

cd ~/training-data-analyst/courses/data_analysis/lab2/python
python3 grep.py
ls -al /tmp
cat /tmp/output-*

Task 5. Execute the pipeline on the cloud

gsutil cp ../javahelp/src/main/java/com/google/cloud/training/dataanalyst/javahelp/*.java gs://$BUCKET/javahelp

#Change the variables for bucket and project in the grepc.py

python3 grepc.py

Aggregating with GroupByKey and Combine
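
A minimal sketch of the two aggregation styles, using in-memory data and the default local runner: GroupByKey collects every value for a key, while CombinePerKey reduces the values as it goes (here with sum), which is usually cheaper because values can be combined before the shuffle.

import apache_beam as beam

with beam.Pipeline() as p:
    pairs = p | 'Create' >> beam.Create([('a', 1), ('b', 2), ('a', 3)])

    (pairs
     | 'Group' >> beam.GroupByKey()
     | 'AsList' >> beam.MapTuple(lambda k, values: (k, sorted(values)))
     | 'PrintGroups' >> beam.Map(print))      # ('a', [1, 3]) and ('b', [2])

    (pairs
     | 'Sum' >> beam.CombinePerKey(sum)
     | 'PrintTotals' >> beam.Map(print))      # ('a', 4) and ('b', 2)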

Lab : MapReduce in Cloud Dataflow (Python)

Useless lab

Python file :

#!/usr/bin/env python3

"""
Copyright Google Inc. 2016
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import apache_beam as beam
import argparse

def startsWith(line, term):
   if line.startswith(term):
      yield line

def splitPackageName(packageName):
   """e.g. given com.example.appname.library.widgetname
           returns com
	           com.example
                   com.example.appname
      etc.
   """
   result = []
   end = packageName.find('.')
   while end > 0:
      result.append(packageName[0:end])
      end = packageName.find('.', end+1)
   result.append(packageName)
   return result

def getPackages(line, keyword):
   start = line.find(keyword) + len(keyword)
   end = line.find(';', start)
   if start < end:
      packageName = line[start:end].strip()
      return splitPackageName(packageName)
   return []

def packageUse(line, keyword):
   packages = getPackages(line, keyword)
   for p in packages:
      yield (p, 1)

if __name__ == '__main__':
   parser = argparse.ArgumentParser(description='Find the most used Java packages')
   parser.add_argument('--output_prefix', default='/tmp/output', help='Output prefix')
   parser.add_argument('--input', default='../javahelp/src/main/java/com/google/cloud/training/dataanalyst/javahelp/', help='Input directory')

   options, pipeline_args = parser.parse_known_args()
   p = beam.Pipeline(argv=pipeline_args)

   input = '{0}*.java'.format(options.input)
   output_prefix = options.output_prefix
   keyword = 'import'

   # find most used packages
   (p
      | 'GetJava' >> beam.io.ReadFromText(input)
      | 'GetImports' >> beam.FlatMap(lambda line: startsWith(line, keyword))
      | 'PackageUse' >> beam.FlatMap(lambda line: packageUse(line, keyword))
      | 'TotalUse' >> beam.CombinePerKey(sum)
      | 'Top_5' >> beam.transforms.combiners.Top.Of(5, key=lambda kv: kv[1])
      | 'write' >> beam.io.WriteToText(output_prefix)
   )

   p.run().wait_until_finish()