Slides:
Module 1: Introduction to Cloud Dataproc
What Qualifies as Unstructured Data?
Data without a schema is unstructured. Data that has a schema, or a partial schema, is also considered unstructured if the schema is not useful for the purposes of analysis or query.
Two options. The second one is infinitely scalable, but it is harder to program because it involves distributed processing.
Dataproc Eases Hadoop Management
Cloud Dataproc Architecture
Lab 1: Create a Dataproc Cluster
- Prepare a bucket and a cluster initialization script
- Create a Dataproc Hadoop Cluster customized to use Google Cloud API
- Enable secure access to Dataproc cluster
- Explore Hadoop operations
- Use Cloud Storage instead of HDFS so that you can shut down a Hadoop cluster when it is not actually running a job.
- Hive is designed for batch jobs and not for transactions. It ingests data into a data warehouse format requiring a schema. It does not support real-time queries, row-level updates, or unstructured data. Some queries may run much slower than others due to the underlying transformations Hive has to implement to simulate SQL.
- Pig provides SQL primitives similar to Hive, but in a more flexible scripting language format. Pig can also deal with semi-structured data, such as data having partial schemas, or for which the schema is not yet known. For this reason it is sometimes used for Extract Transform Load (ETL). It generates Java MapReduce jobs. Pig is not designed to deal with unstructured data.
Module 2: Running Dataproc jobs
Open-Source Tools on Dataproc
Data Center Power through GCP
Serverless Platform for Analytics Data Lifecycle Stages
Separation of Storage and Compute Enables Serverless
Separation of Storage and Compute for Spark Programs
Lab 2: Work with structured and semi-structured data
- Use the Hive CLI and run a Pig job
- Hive is used for structured data, similar to SQL
- Pig is used for semi-structured data, similar to SQL + scripting
Lab 3: Submit Dataproc jobs for unstructured data
- Explore HDFS and Cloud Storage
- Use interactive PySpark to learn about RDDs, Operations, and Lambda functions
File information – road-not-taken.txt
This shows that the file fits into a single HDFS block. Notice from the Block Information pulldown that the file is located only in Block 0, and that the block is replicated on both worker node 0 and worker node 1.
Connected, host fingerprint: ssh-rsa 2048 F3:7F:24:6D:E9:7B:B1:16:6C:D8:49:A7:CF:C0:7A:23:25:EB:72:AF
google710312_student@dataproc-cluster-m:~$ cd
google710312_student@dataproc-cluster-m:~$ cp -r /training .
google710312_student@dataproc-cluster-m:~$ ls
training
google710312_student@dataproc-cluster-m:~$ hadoop fs -ls /
18/07/18 18:32:03 INFO gcs.GoogleHadoopFileSystemBase: GHFS version: 1.6.7-hadoop2
Found 3 items
drwxrwxrwt - mapred hadoop 0 2018-07-18 18:21 /mapred
drwxrwxrwt - mapred hadoop 0 2018-07-18 18:21 /tmp
drwxrwxrwt - hdfs hadoop 0 2018-07-18 18:21 /user
google710312_student@dataproc-cluster-m:~$ cd ~/training
google710312_student@dataproc-cluster-m:~/training$ ls
road-not-taken.txt sherlock-holmes.txt training-data-analyst
google710312_student@dataproc-cluster-m:~/training$ hadoop fs -mkdir /sampledata
18/07/18 18:33:04 INFO gcs.GoogleHadoopFileSystemBase: GHFS version: 1.6.7-hadoop2
google710312_student@dataproc-cluster-m:~/training$ hadoop fs -copyFromLocal road-not-taken.txt /sampledata/.
18/07/18 18:33:07 INFO gcs.GoogleHadoopFileSystemBase: GHFS version: 1.6.7-hadoop2
google710312_student@dataproc-cluster-m:~/training$ hadoop fs -copyFromLocal sherlock-holmes.txt /sampledata/.
18/07/18 18:33:19 INFO gcs.GoogleHadoopFileSystemBase: GHFS version: 1.6.7-hadoop2
google710312_student@dataproc-cluster-m:~/training$ hadoop fs -ls /sampledata
18/07/18 18:34:27 INFO gcs.GoogleHadoopFileSystemBase: GHFS version: 1.6.7-hadoop2
Found 2 items
-rw-r--r-- 2 google710312_student hadoop 729 2018-07-18 18:33 /sampledata/road-not-taken.txt
-rw-r--r-- 2 google710312_student hadoop 574845 2018-07-18 18:33 /sampledata/sherlock-holmes.txt
google710312_student@dataproc-cluster-m:~/training$ pyspark
Python 2.7.9 (default, Jun 29 2016, 13:08:31)
[GCC 4.9.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
ivysettings.xml file not found in HIVE_HOME or HIVE_CONF_DIR,/etc/hive/conf.dist/ivysettings.xml will be used
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/
Using Python version 2.7.9 (default, Jun 29 2016 13:08:31)
SparkSession available as 'spark'.
>>> lines = sc.textFile("/sampledata/sherlock-holmes.txt")
>>> type(lines)
<class 'pyspark.rdd.RDD'>
>>> lines.count()
12652
>>> lines.take(15)
[u'', u'THE ADVENTURES OF SHERLOCK HOLMES by ARTHUR CONAN DOYLE', u'', u'', u'', u'', u'A Scandal in Bohemia', u'The Red-headed League', u'A Case of Identity', u'The Boscombe Valley Mystery', u'The Five Orange Pips', u'The Man with the Twisted Lip', u'The Adventure of the Blue Carbuncle', u'The Adventure of the Speckled Band', u"The Adventure of the Engineer's Thumb"]
>>> words = lines.flatMap(lambda x: x.split(' '))
>>> type(words)
<class 'pyspark.rdd.PipelinedRDD'>
>>> words.count()
107265
>>> words.take(15)
[u'', u'THE', u'ADVENTURES', u'OF', u'SHERLOCK', u'HOLMES', u'by', u'ARTHUR', u'CONAN', u'DOYLE', u'', u'', u'', u'', u'A']
>>> pairs = words.map(lambda x: (x,len(x)))
>>> type(pairs)
<class 'pyspark.rdd.PipelinedRDD'>
>>> pairs.count()
107265
>>> pairs.take(5)
[(u'', 0), (u'THE', 3), (u'ADVENTURES', 10), (u'OF', 2), (u'SHERLOCK', 8)]
>>> pairs = words.map(lambda x: (len(x),1))
>>> pairs.take(5)
[(0, 1), (3, 1), (10, 1), (2, 1), (8, 1)]
>>> from operator import add
>>> wordsize = pairs.reduceByKey(add)
>>> type(wordsize)
<class 'pyspark.rdd.PipelinedRDD'>
>>> wordsize.count()
22
>>> wordsize.take(5)
[(0, 2756), (2, 18052), (4, 19456), (6, 8622), (8, 4664)]
>>> output = wordsize.collect()
>>> type(output)
<type 'list'>
>>> for (size,count) in output: print(size, count)
...
(0, 2756)
(2, 18052)
(4, 19456)
(6, 8622)
(8, 4664)
(10, 1730)
(12, 585)
(14, 159)
(16, 31)
(18, 8)
(20, 4)
(1, 5141)
(3, 22939)
>>> from pyspark.sql import SparkSession
>>> lines = sc.textFile("/sampledata/sherlock-holmes.txt")
>>>
>>> words = lines.flatMap(lambda x: x.split(' '))
>>> pairs = words.map(lambda x: (len(x),1))
>>> wordsize = pairs.reduceByKey(add)
>>> output = wordsize.sortByKey().collect()
>>> output2 = lines.flatMap(lambda x: x.split(' ')).map(lambda x: (len(x),1)).reduceByKey(add).sortByKey().collect()
>>>
>>> for (size, count) in output2: print(size, count)
...
(0, 2756)
(1, 5141)
(2, 18052)
(3, 22939)
(4, 19456)
(5, 12044)
(6, 8622)
(7, 6615)
(8, 4664)
(9, 2980)
(10, 1730)
(11, 1035)
(12, 585)
(13, 352)
(14, 159)
(15, 75)
(16, 31)
(17, 12)
(18, 8)
(19, 4)
(20, 4)
(21, 1)
>>> exit()
google710312_student@dataproc-cluster-m:~/training$ vi wordcount.py
google710312_student@dataproc-cluster-m:~/training$ spark-submit wordcount.py
Okay Google.
18/07/18 19:07:06 INFO org.spark_project.jetty.util.log: Logging initialized @2865ms
18/07/18 19:07:06 INFO org.spark_project.jetty.server.Server: jetty-9.3.z-SNAPSHOT
18/07/18 19:07:06 INFO org.spark_project.jetty.server.Server: Started @2958ms
18/07/18 19:07:06 INFO org.spark_project.jetty.server.AbstractConnector: Started ServerConnector@29eeb21d{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
18/07/18 19:07:06 INFO com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase: GHFS version: 1.6.7-hadoop2
18/07/18 19:07:07 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at dataproc-cluster-m/10.128.0.2:8032
18/07/18 19:07:10 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1531938045693_0002
18/07/18 19:07:16 WARN org.apache.spark.sql.execution.streaming.FileStreamSink: Error while looking for metadata directory.
ABOUT = 1
AGES = 2
ALL = 1
AND = 9
ANOTHER = 1
AS = 5
BACK. = 1
BE = 2
BECAUSE = 1
BENT = 1
BETTER = 1
BLACK. = 1
BOTH = 2
BY, = 1
CLAIM, = 1
COME = 1
COULD = 2
DAY! = 1
DIFFERENCE. = 1
DIVERGED = 2
DOUBTED = 1
DOWN = 1
EQUALLY = 1
EVER = 1
FAIR, = 1
FAR = 1
FIRST = 1
FOR = 2
GRASSY = 1
HAD = 2
HAS = 1
HAVING = 1
HENCE: = 1
HOW = 1
I- = 1
IF = 1
IN = 4
IT = 2
JUST = 1
KEPT = 1
KNOWING = 1
LAY = 1
LEADS = 1
LEAVES = 1
LESS = 1
LONG = 1
LOOKED = 1
MADE = 1
MORNING = 1
NO = 1
NOT = 1
OH, = 1
ON = 1
ONE = 3
OTHER, = 1
PASSING = 1
PERHAPS = 1
REALLY = 1
ROADS = 2
SAME, = 1
SHALL = 1
SHOULD = 1
SIGH = 1
SOMEWHERE = 1
SORRY = 1
STEP = 1
STOOD = 1
TELLING = 1
THAT = 3
THE = 8
THEM = 1
THEN = 1
THERE = 1
THIS = 1
THOUGH = 1
TO = 2
TOOK = 2
TRAVEL = 1
TRAVELED = 1
TRAVELER, = 1
TRODDEN = 1
TWO = 1
UNDERGROWTH; = 1
WANTED = 1
WAS = 1
WAY = 1
WAY, = 1
WEAR; = 1
WHERE = 1
WITH = 1
WOOD, = 2
WORN = 1
YELLOW = 1
YET = 1
18/07/18 19:07:24 INFO org.spark_project.jetty.server.AbstractConnector: Stopped Spark@29eeb21d{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
google710312_student@dataproc-cluster-m:~/training$
- Spark can run both batch jobs and streaming jobs, and it can use in-memory processing for fast results. RDDs, or Resilient Distributed Datasets, hide the complexity of the location of data within the cluster as well as the complexity of replication.
- PySpark is a Read-Evaluate-Print-Loop (REPL) interpreter, also known as a language shell. The REPL reads a single expression from the user, evaluates it, and prints the result, then loops and processes another expression. The single-step immediate feedback of a REPL makes it useful for exploring and learning a language or system. The limitation is that the REPL holds no state context of its own, unlike more sophisticated shells.
- A Resilient Distributed Dataset (RDD) is an abstraction over data in storage. An RDD hides the location and replication of the data it contains. For reliability, RDDs are resilient (fault-tolerant) to data loss caused by node failures; an RDD lineage graph is used to recompute damaged or missing partitions. For efficiency, Spark may choose to process one part in one location or another, based on the availability of CPU at that location, network latency, or proximity to other resources.
- The benefit of this abstraction is that it enables operations on an RDD to treat the RDD as a single object and ignore the complexity of how data is located, replicated, and migrated inside the RDD. All those details are left to Spark.
- Spark doesn’t perform operations immediately. It uses an approach called “lazy evaluation”.
- There are two classes of operations: transformations and actions. A transformation takes one RDD as input and generates another RDD as output. You can think of a transformation as a request. It explains to Spark what you want done, but doesn’t tell Spark how to do it.
- Actions like “collect”, “count”, or “take” produce a result, such as a number or a list.
- When Spark receives transformations, it stores them in a DAG (Directed Acyclic Graph) but doesn’t perform them at that time. When Spark receives an action it examines the transformations in the DAG and the available resources (number of workers in the cluster), and creates pipelines to efficiently perform the work.
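To make the distinction concrete, here is a minimal PySpark sketch (assuming the sc SparkContext that the PySpark shell provides, as in the session above). The transformations only describe work; nothing executes until an action is called.

lines = sc.textFile("/sampledata/sherlock-holmes.txt")   # transformation: nothing is read yet
words = lines.flatMap(lambda x: x.split(' '))            # transformation: recorded in the DAG
pairs = words.map(lambda x: (len(x), 1))                 # transformation: still no work performed
print(pairs.count())                                     # action: triggers execution, returns a number
print(pairs.take(3))                                     # action: returns a list of the first three pairs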
Module 3: Leveraging GCP
Hadoop/Spark Jobs Read From BigQuery Through Temp GCS Storage
BigQuery does not natively know how to work with the Hadoop file system. Cloud Storage can act as an intermediary between BigQuery and Dataproc.
BigQuery Integration Using Python Pandas
- A Spark DataFrame is a distributed collection of data organized into named columns, conceptually similar to a table in a relational database. A Pandas DataFrame can be converted into a Spark DataFrame.
- There are many differences between Pandas and Spark DataFrames. In general, Pandas DataFrames are mutable and easier to work with, while Spark DataFrames are immutable and can be faster.
- Learn more about Pandas DataFrames: pandas.pydata.org
- Learn more about Spark DataFrames: spark.apache.org
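A rough sketch of the pattern described above, assuming the pandas-gbq package is installed and a SparkSession named spark is available (as PySpark provides); the project ID is hypothetical. A BigQuery result is read into a Pandas DataFrame and then handed to Spark.

import pandas as pd

# Read a BigQuery query result into a mutable, in-memory Pandas DataFrame.
query = "SELECT word, word_count FROM `bigquery-public-data.samples.shakespeare` LIMIT 10"
pdf = pd.read_gbq(query, project_id="my-example-project", dialect="standard")  # hypothetical project ID

# Convert it into an immutable, distributed Spark DataFrame, and back again.
sdf = spark.createDataFrame(pdf)
sdf.show()
pdf2 = sdf.toPandas()   # bring a (small) Spark result back into Pandas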
Lab 4: Leverage GCP
- Explore Spark using PySpark jobs
- Use Cloud Storage instead of HDFS
- Run a PySpark application from Cloud Storage
- Use Python Pandas to add BigQuery to a Spark application
Why would you want to use Cloud Storage instead of HDFS?
You can shut down the cluster when you are not running jobs. The storage persists even when the cluster is shut down, so you don’t have to pay for the cluster just to maintain data in HDFS. In some cases Cloud Storage provides better performance than HDFS. Cloud Storage does not require the administration overhead of a local file system.
You can make the cluster stateless by keeping all the persistent data off-cluster. And this means (a) the cluster can be shut down when not in use, solving the Hadoop utilization problem, and (b) a cluster can be created and dedicated to a single job, solving the Hadoop configuration and tuning problem.
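In practice the change is just the path scheme: the same PySpark code reads from HDFS or from Cloud Storage through the GCS connector installed on Dataproc (the bucket name below is hypothetical).

lines_hdfs = sc.textFile("hdfs:///sampledata/sherlock-holmes.txt")                  # data lives on the cluster
lines_gcs = sc.textFile("gs://my-example-bucket/sampledata/sherlock-holmes.txt")    # data persists off-cluster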
Dataproc OSS on GCP
Lab 5: Cluster automation using CLI commands
- Create a customized Dataproc cluster using Cloud Shell
Module 4: Analyzing Unstructured Data
Pretrained Models
TensorFlow is a multidimensional matrix solution service. It uses an exhaustion method to generate attempted solutions, and then it uses the error to refine its guess until it converges on a practical solution. It essentially assumes that a solution equation exists and then uses an exhaustion trial method to iteratively search for that solution. The result is called a machine learning model.
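As a toy illustration of that iterative, error-driven refinement (plain Python, not TensorFlow): fit y = w * x by repeatedly correcting a guess for w using the error of the current guess.

xs = [1.0, 2.0, 3.0, 4.0]
ys = [2.0, 4.0, 6.0, 8.0]            # generated by the "unknown" solution w = 2

w = 0.0                              # initial guess
learning_rate = 0.01
for step in range(1000):
    # gradient of the mean squared error with respect to w
    grad = sum(2 * (w * x - y) * x for x, y in zip(xs, ys)) / len(xs)
    w -= learning_rate * grad        # use the error to refine the guess
print(w)                             # converges to approximately 2.0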
Lab 6: Add Machine Learning
- Add Machine Learning (ML) to a Spark application
from pyspark.sql import SparkSession
from operator import add
import re

print("Okay Google.")

spark = SparkSession\
    .builder\
    .appName("CountUniqueWords")\
    .getOrCreate()

# Read the text file as an RDD of lines (each Row's first field is the line string).
lines = spark.read.text("/sampledata/road-not-taken.txt").rdd.map(lambda x: x[0])

# Split lines into words, keep tokens that contain at least one letter and are longer
# than one character, uppercase them, count occurrences, and sort by word.
counts = lines.flatMap(lambda x: x.split(' ')) \
    .filter(lambda x: re.sub('[^a-zA-Z]+', '', x)) \
    .filter(lambda x: len(x) > 1) \
    .map(lambda x: x.upper()) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(add) \
    .sortByKey()

output = counts.collect()
for (word, count) in output:
    print("%s = %i" % (word, count))

spark.stop()