EL, ELT, ETL
EL : Extract and load
ELT : Extract, Load, Transform
ETL : Extract, Transform, Load
What are the purposes of Data Quality processing?
- Validity : Data conforms to your business rules
- Accuracy : Data conforms to an objective true value
- Completeness : All the records and values that should be present are present; nothing is missing
- Consistency : Data does not contradict itself across datasets and operations
- Uniformity : Values use the same units and representations throughout
We can use views and queries in BigQuery to filter, identify, and isolate valid data through conditions (WHERE, HAVING, IS NOT NULL, COUNT, GROUP BY, ...).
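For example, a minimal sketch of such a quality check using the BigQuery Python client; the dataset, table, and column names are made-up assumptions:

# Sketch of validity / completeness / uniqueness checks in BigQuery.
# Assumes a hypothetical table `mydataset.orders` with columns
# order_id, quantity; adjust to your own schema.
from google.cloud import bigquery

client = bigquery.Client()

query = """
    SELECT
      COUNT(*) AS total_rows,
      COUNTIF(order_id IS NULL) AS missing_ids,        -- completeness
      COUNTIF(quantity <= 0) AS invalid_quantities,    -- validity (business rule)
      COUNT(DISTINCT order_id) AS distinct_ids         -- uniqueness
    FROM `mydataset.orders`
"""
for row in client.query(query).result():
    print(dict(row))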
What if the transformations cannot be expressed in SQL, or are too complex to do in SQL? -> ETL
Build ETL pipelines in Dataflow and then land the data in BigQuery, as in the sketch below:
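A minimal sketch of such a pipeline in Apache Beam (Python), not the course's exact code; the bucket, table, and schema are hypothetical:

# Extract CSV lines from GCS, apply a transform that is awkward in SQL,
# and load the result into BigQuery.
import apache_beam as beam

def parse_and_clean(line):
    # Example of custom parsing / normalization logic that is hard in SQL.
    name, amount = line.split(',')
    yield {'name': name.strip().title(), 'amount': float(amount)}

with beam.Pipeline() as p:
    (p
     | 'Extract' >> beam.io.ReadFromText('gs://my-bucket/input/*.csv')
     | 'Transform' >> beam.FlatMap(parse_and_clean)
     | 'Load' >> beam.io.WriteToBigQuery(
           'my_project:my_dataset.my_table',
           schema='name:STRING,amount:FLOAT',
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))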
Executing Spark on Cloud Dataproc
The Hadoop ecosystem
The Hadoop ecosystem developed because of the need to analyze large datasets: distribute the processing and store the data with the processors.
Cluster data processing : Spark or MapReduce
Apache Spark is a popular, flexible, powerful way to process large datasets. A simple explanation of Spark is that it can mix different kinds of applications and adjust how it uses the available resources on your cluster.
To give Spark the flexibility it needs to decide how to use the available resources, you describe what you want to do and let Spark determine how to make it happen. This is declarative programming, as opposed to imperative programming: in imperative programming you tell the system exactly what to do and how to do it; in declarative programming you tell the system what you want and it figures out how to implement it.
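As an illustration of the declarative style, in PySpark you describe the result you want (filter, group, count) and Spark's optimizer decides how to execute it across the cluster; the file path and column names below are hypothetical:

# Declarative: describe WHAT you want; Spark plans HOW to execute it.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("declarative-demo").getOrCreate()
df = spark.read.csv("gs://my-bucket/events/*.csv", header=True, inferSchema=True)

# No loops or explicit distribution logic: Spark builds an execution plan,
# pushes filters down, and parallelizes the aggregation itself.
result = (df
          .filter(df.status == "ERROR")
          .groupBy("service")
          .count()
          .orderBy("count", ascending=False))
result.show()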
So where does a Hadoop cluster store its data? Well, within the Hadoop distributed file system, or HDFS. HDFS is the main file system Hadoop uses for distributing work to the nodes on its cluster.
If you’re using on-premises Hadoop, you’re the one responsible for managing cluster resources via the YARN resource manager that Hadoop provides.
On-premises Hadoop clusters can be difficult:
- Not elastic
- Hard to scale fast
- Have capacity limits
- Have no separation between storage and compute resources
Cloud Dataproc simplifies Hadoop workloads on GCP
- Built in support for Hadoop
- managed hardware and configuration
- Simplified version management
- Flexible job configuration
Running Hadoop on Cloud Dataproc
The clusters are highly available: you can run clusters with multiple master nodes and set jobs to restart on failure. There are developer tools: you can manage your cluster in multiple ways, including the GCP Console, the Cloud SDK, RESTful APIs, and direct SSH access. There are initialization actions: you can run these to install or customize the settings and libraries you need when your cluster is first created.
Using Cloud Dataproc :
- Setup : create a cluster through the Console, the gcloud command, a Deployment Manager template, or the Cloud SDK REST API (see the Python sketch after this list)
- Configure : single VM or not, optional components, master node options, worker nodes, preemptible nodes, region and zone..
- Optimize : lower costs, efficient allocation of resources, faster boot time, faster processing for some workloads..
- Utilize : Console, gcloud command, REST API, orchestration services
- Monitor : job driver output, logs, Stackdriver Monitoring, cluster details graphs
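As a sketch of the setup step done programmatically, the Dataproc Python client can create a cluster; the project ID, region, cluster name, and machine types below are placeholder assumptions:

# Create a Dataproc cluster with the Python client library (sketch).
from google.cloud import dataproc_v1

project_id = "my-project"     # assumption
region = "us-central1"        # assumption

client = dataproc_v1.ClusterControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"})

cluster = {
    "project_id": project_id,
    "cluster_name": "example-cluster",
    "config": {
        "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
        "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
    },
}

operation = client.create_cluster(
    request={"project_id": project_id, "region": region, "cluster": cluster})
print(operation.result().cluster_name)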
GCS instead of HDFS
Hadoop was built on Google’s original MapReduce design from the 2004 white paper, which was written in a world where data was local to the compute machine. Back in 2004, network speeds were relatively slow, which is why data was kept as close as possible to the processor. Now, with petabit networking speeds, you can treat storage and compute independently and still move traffic quickly over the network.
Google’s network delivers petabit-scale bandwidth, making it easy to connect processing and storage.
You can switch your Hadoop jobs from hdfs:// to gs:// (Cloud Storage), even on-premises.
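In practice the switch is often just the storage URI, as in this small sketch (the bucket name is hypothetical):

# Same Spark code; only the storage URI changes.
from pyspark.sql import SparkSession

sc = SparkSession.builder.appName("hdfs-to-gcs").getOrCreate().sparkContext

hdfs_rdd = sc.textFile("hdfs:///data/kddcup.data_10_percent.gz")    # data on the cluster
gcs_rdd = sc.textFile("gs://my-bucket/kddcup.data_10_percent.gz")   # data in Cloud Storage
print(gcs_rdd.count())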
Using Cloud storage :
- Hadoop compatible
- Faster than HDFS in many cases
- Requires less maintenance
- Enables use of the whole range of GCP services
- Considerably less expensive
Best practices :
- Avoid small reads; use large block sizes where possible
- Avoid iterating sequentially over many nested directories in a single job
Optimizing Dataproc
- Where is your data and where is your cluster ? Location is important; keeping them in the same regional location improves efficiency
- Is your network traffic being funneled ? Make sure your GCP networking configuration doesn’t throttle bandwidth by sending traffic through a bottleneck
- How many input files and Hadoop partitions are you trying to deal with ? Make sure you’re not dealing with more than around 10,000 input files or 50,000 Hadoop partitions
- Is the size of your persistent disk limiting your throughput?
- Did you allocate enough virtual machines (VMs) to your cluster ?
Optimizing Dataproc Storage
You can use HDFS if :
- Your jobs require a lot of metadata operations
- you modify the HDFS data frequently or rename directories often
- you heavily use the append operations on HDFS files
- you have workloads that involve very heavy I/O
- you have I/O workloads that are especially sensitive to latency.
But in general, Cloud Storage is the preferred option.
Cloud Dataproc + Cloud storage allows you to reduce the disk requirements and save costs.
Geographical regions can impact the efficiency of your solution
With ephemeral clusters, you only pay for what you use.
- Persistent clusters : Resources are active at all times. You are constantly paying for all available clusters
- Ephemeral clusters : Required resources are active only when being used. You only pay for what you use.
You can as well split clusters and jobs on GCP.
Finally : you can minimize the cost of a persistent or long-lived cluster by creating the smallest cluster you can, scoping your work on that cluster to the smallest number of jobs, and scaling the cluster to the minimum number of worker nodes, adding more dynamically to meet demand.
Optimizing Dataproc Templates and Autoscaling
After you submit your jobs to the cluster, one of the features you can take advantage of with Dataproc is autoscaling the number of nodes in your cluster to meet the demands of your job.
Optimizing Dataproc Monitoring
Use Stackdriver Logging and Stackdriver Monitoring for logs and performance monitoring.
Lab : Running Apache Spark jobs on Cloud Dataproc
Part 1: Lift and Shift
Migrate existing Spark jobs to Cloud Dataproc : create a new Cloud Dataproc cluster and then run an imported Jupyter notebook that uses the cluster’s default local Hadoop Distributed File System (HDFS) to store source data, then process that data just as you would on any Hadoop cluster using Spark. This demonstrates that many existing analytics workloads, such as Jupyter notebooks containing Spark code, require no changes when they are migrated to a Cloud Dataproc environment.
Clone the source repository for the lab
git -C ~ clone https://github.com/GoogleCloudPlatform/training-data-analyst

export DP_STORAGE="gs://$(gcloud dataproc clusters describe sparktodp --region=us-central1 --format=json | jq -r '.config.configBucket')"
gsutil -m cp ~/training-data-analyst/quests/sparktobq/*.ipynb $DP_STORAGE/notebooks/jupyter
Log in to the Jupyter Notebook and execute it
# In this cell Spark SQL is initialized and Spark is used to read
# in the source data as text and then returns the first 5 rows.
from pyspark.sql import SparkSession, SQLContext, Row

spark = SparkSession.builder.appName("kdd").getOrCreate()
sc = spark.sparkContext
data_file = "hdfs:///kddcup.data_10_percent.gz"
raw_rdd = sc.textFile(data_file).cache()
raw_rdd.take(5)

# Each row is split, using , as a delimiter, and parsed
# using a prepared inline schema in the code.
csv_rdd = raw_rdd.map(lambda row: row.split(","))
parsed_rdd = csv_rdd.map(lambda r: Row(
    duration=int(r[0]),
    protocol_type=r[1],
    service=r[2],
    flag=r[3],
    src_bytes=int(r[4]),
    dst_bytes=int(r[5]),
    wrong_fragment=int(r[7]),
    urgent=int(r[8]),
    hot=int(r[9]),
    num_failed_logins=int(r[10]),
    num_compromised=int(r[12]),
    su_attempted=r[14],
    num_root=int(r[15]),
    num_file_creations=int(r[16]),
    label=r[-1])
)
parsed_rdd.take(5)

# A Spark SQL context is created, and a Spark dataframe using that
# context is created from the parsed input data of the previous
# stage. Row data can be selected and displayed using the dataframe's
# .show() method.
sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame(parsed_rdd)
connections_by_protocol = df.groupBy('protocol_type').count().orderBy('count', ascending=False)
connections_by_protocol.show()

# Output:
# +-------------+------+
# |protocol_type| count|
# +-------------+------+
# |         icmp|283602|
# |          tcp|190065|
# |          udp| 20354|
# +-------------+------+

# SparkSQL can also be used to query the parsed data stored in
# the dataframe.
df.registerTempTable("connections")
attack_stats = sqlContext.sql("""
    SELECT
      protocol_type,
      CASE label
        WHEN 'normal.' THEN 'no attack'
        ELSE 'attack'
      END AS state,
      COUNT(*) as total_freq,
      ROUND(AVG(src_bytes), 2) as mean_src_bytes,
      ROUND(AVG(dst_bytes), 2) as mean_dst_bytes,
      ROUND(AVG(duration), 2) as mean_duration,
      SUM(num_failed_logins) as total_failed_logins,
      SUM(num_compromised) as total_compromised,
      SUM(num_file_creations) as total_file_creations,
      SUM(su_attempted) as total_root_attempts,
      SUM(num_root) as total_root_acceses
    FROM connections
    GROUP BY protocol_type, state
    ORDER BY 3 DESC
""")
attack_stats.show()

# Output (truncated):
# +-------------+---------+----------+--------------+--
# |protocol_type|    state|total_freq|mean_src_bytes|
# +-------------+---------+----------+--------------+--
# |         icmp|   attack|    282314|        932.14|
# |          tcp|   attack|    113252|       9880.38|
# |          tcp|no attack|     76813|       1439.31|
# ...
# |          udp|   attack|      1177|          27.5|
# +-------------+---------+----------+--------------+--

# Uses the %matplotlib inline Jupyter magic function to redirect
# matplotlib to render a graphic figure inline in the notebook
# instead of just dumping the data into a variable.
%matplotlib inline
ax = attack_stats.toPandas().plot.bar(x='protocol_type', subplots=True, figsize=(10,25))
Part 2: Separate Compute and Storage
Modify Spark jobs to use Cloud Storage instead of HDFS
The only thing we did was to delete the first section and replace its code with the following:
from pyspark.sql import SparkSession, SQLContext, Row

gcs_bucket = '[Your-Bucket-Name]'
spark = SparkSession.builder.appName("kdd").getOrCreate()
sc = spark.sparkContext
data_file = "gs://"+gcs_bucket+"//kddcup.data_10_percent.gz"
raw_rdd = sc.textFile(data_file).cache()
raw_rdd.take(5)
with my bucket name in place of the placeholder.
Part 3: Deploy Spark Jobs
Added the following code :
%%writefile spark_analysis.py
import matplotlib
matplotlib.use('agg')
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--bucket", help="bucket for input and output")
args = parser.parse_args()
BUCKET = args.bucket
The %%writefile spark_analysis.py Jupyter magic command creates a new output file to contain your standalone Python script. You will add a variation of this to the remaining cells to append the contents of each cell to the standalone script file.
For the remaining cells, insert %%writefile -a spark_analysis.py at the start of each Python code cell.
At the end of the notebook :
%%writefile -a spark_analysis.py
ax[0].get_figure().savefig('report.png');

%%writefile -a spark_analysis.py
import google.cloud.storage as gcs
bucket = gcs.Client().get_bucket(BUCKET)
for blob in bucket.list_blobs(prefix='sparktodp/'):
    blob.delete()
bucket.blob('sparktodp/report.png').upload_from_filename('report.png')

%%writefile -a spark_analysis.py
connections_by_protocol.write.format("csv").mode("overwrite").save(
    "gs://{}/sparktodp/connections_by_protocol".format(BUCKET))
Test automation :
You now test that the PySpark code runs successfully as a file by calling the local copy from inside the notebook
BUCKET_list = !gcloud info --format='value(config.project)'
BUCKET=BUCKET_list[0]
print('Writing to {}'.format(BUCKET))

!/opt/conda/anaconda/bin/python spark_analysis.py --bucket=$BUCKET
!gsutil ls gs://$BUCKET/sparktodp/**
!gsutil cp spark_analysis.py gs://$BUCKET/sparktodp/spark_analysis.py
Run the cells
Run the job from Cloud Shell :
gsutil cp gs://$PROJECT_ID/sparktodp/spark_analysis.py spark_analysis.py
nano submit_onejob.sh

# Paste the following in the script:
#!/bin/bash
gcloud dataproc jobs submit pyspark \
       --cluster sparktodp \
       --region us-central1 \
       spark_analysis.py \
       -- --bucket=$1

# Make the script executable:
chmod +x submit_onejob.sh

# Launch the PySpark analysis job:
./submit_onejob.sh $PROJECT_ID
The job runs and then the image is created in the new bucket
Cloud Data Fusion
Cloud Data Fusion is a fully managed, cloud-native, enterprise data integration service for quickly building and managing data pipelines.
It is a graphical, no-code tool for building data pipelines.
Data fusion creates ephemeral execution environments to run pipelines.
But normally you will not even touch those components, you will use the graphical interface to build everything.
UI overview :
- Control Center
- Pipelines
- Wrangler
- Metadata
- Hub
- Entities
- Administration
Wrangler is a code free visual environment for transforming data in data pipelines.
Lab – Cloud Data Fusion
- Task 1: Creating a Cloud Data Fusion instance
- Task 2: Loading the data
- Task 3: Cleaning the data
- Task 4: Creating the pipeline
- Task 5: Adding a data source
- Task 6: Joining two sources
- Task 7: Storing the output to BigQuery
- Task 8: Deploying and running the pipeline
- Task 9: Viewing the results
Cloud Composer
Cloud Composer orchestrates automated workflows.
Use Apache Airflow DAGs to orchestrate GCP services (DAG = Directed Acyclic Graph)
Airflow is a platform to programmatically author, schedule and monitor workflows.
Use Airflow to author workflows as directed acyclic graphs (DAGs) of tasks. The Airflow scheduler executes your tasks on an array of workers while following the specified dependencies.
Airflow workflows are written in Python. The Python file creates a DAG.
Apache Airflow is open source and uses operators to run and process the data workflow.
In the course’s example DAG, the first two operators fetch fresh training data and the last two retrain and redeploy the model.
Two scheduling options for cloud composer workflows :
- Periodic
- Event-driven
Two types of workflow ETL patterns :
- Push (event-triggered)
- Pull (scheduled workflow run)
You can use Cloud Functions to implement the event-triggered (push) pattern, as in the sketch below.
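A rough sketch of that push pattern, assuming a Cloud Function triggered by a file landing in Cloud Storage that asks the Composer environment's Airflow web server to start a DAG run. The web server URL and DAG id are assumptions, authentication (IAP) is omitted, and `requests` would need to be listed in the function's requirements:

# Sketch only: GCS-triggered Cloud Function that triggers an Airflow DAG run.
# Real Composer environments require an authenticated (IAP) request rather
# than a plain POST; this only illustrates the event-driven flow.
import requests

AIRFLOW_WEB_SERVER = "https://example-airflow-webserver.appspot.com"  # assumption
DAG_ID = "gcs_to_bq_etl"                                              # assumption

def trigger_dag(event, context):
    """Background function for a google.storage.object.finalize event."""
    payload = {"conf": {"bucket": event["bucket"], "name": event["name"]}}
    resp = requests.post(
        f"{AIRFLOW_WEB_SERVER}/api/experimental/dags/{DAG_ID}/dag_runs",
        json=payload)
    resp.raise_for_status()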
Core concepts
DAG : A Directed Acyclic Graph is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.
Operator : the description of a single task; it is usually atomic. For example, the BashOperator is used to execute a bash command.
Task : a parameterised instance of an Operator; a node in the DAG.
Task Instance : a specific run of a task, characterized as a DAG, a task, and a point in time. It has an indicative state: running, success, failed, skipped, …
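A minimal sketch tying these concepts together (Airflow 1.x style, matching the lab code below); the DAG id, schedule, and commands are arbitrary:

# Two BashOperator tasks in one DAG; `extract >> transform` declares the dependency.
import datetime
from airflow import models
from airflow.operators.bash_operator import BashOperator

default_args = {
    'start_date': datetime.datetime(2020, 1, 1),
    'retries': 1,
}

with models.DAG('minimal_example',                      # the DAG
                schedule_interval=datetime.timedelta(days=1),
                default_args=default_args) as dag:

    extract = BashOperator(task_id='extract',           # a task (Operator instance)
                           bash_command='echo extracting')
    transform = BashOperator(task_id='transform',
                             bash_command='echo transforming')

    extract >> transform   # transform runs only after extract succeeds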
Lab: An Introduction to Cloud Composer
Below is the hadoop_tutorial.py workflow code, also referred to as the DAG:
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop wordcount example, and deletes the cluster. This DAG relies on three Airflow variables https://airflow.apache.org/concepts.html#variables * gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster. * gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be created. * gcs_bucket - Google Cloud Storage bucket to used as output for the Hadoop jobs from Dataproc. See https://cloud.google.com/storage/docs/creating-buckets for creating a bucket. """ import datetime import os from airflow import models from airflow.contrib.operators import dataproc_operator from airflow.utils import trigger_rule # Output file for Cloud Dataproc job. output_file = os.path.join( models.Variable.get('gcs_bucket'), 'wordcount', datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep # Path to Hadoop wordcount example available on every Dataproc cluster. WORDCOUNT_JAR = ( 'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar' ) # Arguments to pass to Cloud Dataproc job. wordcount_args = ['wordcount', 'gs://pub/shakespeare/rose.txt', output_file] yesterday = datetime.datetime.combine( datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()) default_dag_args = { # Setting start date as yesterday starts the DAG immediately when it is # detected in the Cloud Storage bucket. 'start_date': yesterday, # To email on failure or retry set 'email' arg to your email and enable # emailing here. 'email_on_failure': False, 'email_on_retry': False, # If a task fails, retry it once after waiting at least 5 minutes 'retries': 1, 'retry_delay': datetime.timedelta(minutes=5), 'project_id': models.Variable.get('gcp_project') } with models.DAG( 'composer_sample_quickstart', # Continue to run DAG once per day schedule_interval=datetime.timedelta(days=1), default_args=default_dag_args) as dag: # Create a Cloud Dataproc cluster. create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator( task_id='create_dataproc_cluster', # Give the cluster a unique name by appending the date scheduled. # See https://airflow.apache.org/code.html#default-variables cluster_name='quickstart-cluster-{{ ds_nodash }}', num_workers=2, zone=models.Variable.get('gce_zone'), master_machine_type='n1-standard-1', worker_machine_type='n1-standard-1') # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster # master node. run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator( task_id='run_dataproc_hadoop', main_jar=WORDCOUNT_JAR, cluster_name='quickstart-cluster-{{ ds_nodash }}', arguments=wordcount_args) # Delete Cloud Dataproc cluster. delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator( task_id='delete_dataproc_cluster', cluster_name='quickstart-cluster-{{ ds_nodash }}', # Setting trigger_rule to ALL_DONE causes the cluster to be deleted # even if the Dataproc job fails. trigger_rule=trigger_rule.TriggerRule.ALL_DONE) # Define DAG dependencies. create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
Cloud Dataflow
In Dataflow, your data lives in a PCollection. The more data there is, the more it is simply distributed in parallel across more workers. For streaming data, the PCollection is simply unbounded; it has no end. Each element inside a PCollection can be individually accessed and processed; this is how distributed processing of the PCollection is implemented. You define the pipeline and the transforms on the PCollection, and the runner handles implementing the transforms on each element, distributing the work as needed for scale and with all available resources.
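A tiny sketch of that model: you declare the transforms on a PCollection and the runner (the DirectRunner locally, or Dataflow in the cloud) handles the per-element work; the input values here are made up:

# A PCollection of words; each element is processed independently by the runner.
import apache_beam as beam

with beam.Pipeline() as p:   # default DirectRunner; Dataflow is selected via pipeline options
    (p
     | 'Create' >> beam.Create(['alpha', 'beta', 'gamma'])
     | 'Lengths' >> beam.Map(lambda word: (word, len(word)))
     | 'Print' >> beam.Map(print))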
Key good points :
- Graph is optimized for best execution path
- Autoscaling mid job
- Strong streaming semantics
Lab Building a Simple Dataflow Pipeline (Python)
Task 1. Preparation + Task 2. Open Dataflow project
Create a bucket, enable the Dataflow API, copy files locally, install packages
git clone https://github.com/GoogleCloudPlatform/training-data-analyst
cd ~/training-data-analyst/courses/data_analysis/lab2/python
sudo ./install_packages.sh
pip3 -V
Task 3. Pipeline filtering
Check the python file :
#!/usr/bin/env python
"""
Copyright Google Inc. 2016
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import apache_beam as beam
import sys

def my_grep(line, term):
    if line.startswith(term):
        yield line

if __name__ == '__main__':
    p = beam.Pipeline(argv=sys.argv)
    input = '../javahelp/src/main/java/com/google/cloud/training/dataanalyst/javahelp/*.java'
    output_prefix = '/tmp/output'
    searchTerm = 'import'

    # find all lines that contain the searchTerm
    (p
       | 'GetJava' >> beam.io.ReadFromText(input)
       | 'Grep' >> beam.FlatMap(lambda line: my_grep(line, searchTerm))
       | 'write' >> beam.io.WriteToText(output_prefix)
    )

    p.run().wait_until_finish()
Task 4. Execute the pipeline locally
cd ~/training-data-analyst/courses/data_analysis/lab2/python
python3 grep.py
ls -al /tmp
cat /tmp/output-*
Task 5. Execute the pipeline on the cloud
gsutil cp ../javahelp/src/main/java/com/google/cloud/training/dataanalyst/javahelp/*.java gs://$BUCKET/javahelp

# Change the variables for bucket and project in grepc.py
python3 grepc.py
Aggregating with GroupByKey and Combine
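A small sketch of the difference between the two (the key/value data is made up; not the course's exact example):

# GroupByKey collects all values per key; CombinePerKey reduces them with a
# combining function, which is usually cheaper because values can be combined
# locally before the shuffle.
import apache_beam as beam

sales = [('cats', 1), ('dogs', 5), ('cats', 2), ('dogs', 3)]

with beam.Pipeline() as p:
    kv = p | 'Create' >> beam.Create(sales)

    grouped = kv | 'GroupByKey' >> beam.GroupByKey()           # ('cats', [1, 2]), ('dogs', [5, 3])
    totals = kv | 'CombinePerKey' >> beam.CombinePerKey(sum)   # ('cats', 3), ('dogs', 8)

    grouped | 'PrintGrouped' >> beam.Map(print)
    totals | 'PrintTotals' >> beam.Map(print)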
Lab MapReduce in Cloud Dataflow (Python)
Useless lab
Python file :
#!/usr/bin/env python3
"""
Copyright Google Inc. 2016
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import apache_beam as beam
import argparse

def startsWith(line, term):
    if line.startswith(term):
        yield line

def splitPackageName(packageName):
    """e.g. given com.example.appname.library.widgetname
       returns com
               com.example
               com.example.appname
    etc.
    """
    result = []
    end = packageName.find('.')
    while end > 0:
        result.append(packageName[0:end])
        end = packageName.find('.', end+1)
    result.append(packageName)
    return result

def getPackages(line, keyword):
    start = line.find(keyword) + len(keyword)
    end = line.find(';', start)
    if start < end:
        packageName = line[start:end].strip()
        return splitPackageName(packageName)
    return []

def packageUse(line, keyword):
    packages = getPackages(line, keyword)
    for p in packages:
        yield (p, 1)

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Find the most used Java packages')
    parser.add_argument('--output_prefix', default='/tmp/output', help='Output prefix')
    parser.add_argument('--input', default='../javahelp/src/main/java/com/google/cloud/training/dataanalyst/javahelp/', help='Input directory')

    options, pipeline_args = parser.parse_known_args()
    p = beam.Pipeline(argv=pipeline_args)

    input = '{0}*.java'.format(options.input)
    output_prefix = options.output_prefix
    keyword = 'import'

    # find most used packages
    (p
       | 'GetJava' >> beam.io.ReadFromText(input)
       | 'GetImports' >> beam.FlatMap(lambda line: startsWith(line, keyword))
       | 'PackageUse' >> beam.FlatMap(lambda line: packageUse(line, keyword))
       | 'TotalUse' >> beam.CombinePerKey(sum)
       | 'Top_5' >> beam.transforms.combiners.Top.Of(5, key=lambda kv: kv[1])
       | 'write' >> beam.io.WriteToText(output_prefix)
    )

    p.run().wait_until_finish()