Using the HDFS Connector with Spark

Introduction

This article walks through using the Hadoop Distributed File System (HDFS) connector with the Spark application framework. For the walkthrough, we use the Oracle Linux 7.4 operating system and run Spark in standalone mode on a single computer.

Prerequisites

Following are prerequisites for completing the walkthrough:

  • You must have permission to create a compute instance. For guidance, see Creating an Instance.
  • You must be able to connect to the service instance that you've launched. For guidance, see Connecting to an Instance.
  • You must have the appropriate OCID, fingerprint, and private key for the Identity and Access Management (IAM) user that you will use to interact with Object Storage. For guidance, see Setup and Prerequisites.
  • You must have an Object Storage bucket that you can connect to.
  • The IAM user must be able to read and write to that bucket using the Console.

Using Spark

Install Spark and Dependencies

Note

For the purposes of this example, install Spark into the current user's home directory. For production scenarios, you would not do this.
Note

Versions 2.7.7.0 and later of the HDFS connector no longer install all of the required third-party dependencies. The required third-party dependencies are bundled under the third-party/lib folder in the ZIP archive and must be installed manually; one way to do this is sketched after the download step in the next section.
  1. Create an instance of your Compute service. For guidance, see Creating an Instance.
  2. Ensure that your service instance has a public IP address so that you can connect using a Secure Shell (SSH) connection. For guidance, see Connecting to an Instance.
  3. Connect to your service instance using an SSH connection.
  4. Install Spark and its dependencies, Java and Scala, by using the code examples that follow.
# We'll use wget to download some of the artifacts that need to be installed
sudo yum install wget
 
# First install Java
sudo yum install java-1.8.0-openjdk.x86_64
export JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk
# Should be something like: OpenJDK Runtime Environment (build 1.8.0_161-b14)
java -version
 
# Then install Scala
wget https://downloads.lightbend.com/scala/2.12.4/scala-2.12.4.rpm
sudo yum install scala-2.12.4.rpm
# Should be something like: Scala code runner version 2.12.4 -- Copyright 2002-2017, LAMP/EPFL and Lightbend, Inc.
scala -version
 
# Then download Spark
wget https://archive.apache.org/dist/spark/spark-2.2.1/spark-2.2.1-bin-hadoop2.7.tgz
tar xvf spark-2.2.1-bin-hadoop2.7.tgz
export SPARK_HOME=$HOME/spark-2.2.1-bin-hadoop2.7
export PATH=$PATH:$SPARK_HOME/bin
 
# Start a Spark master
cd $SPARK_HOME
./sbin/start-master.sh
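
At this point the standalone master should be running. As an optional sanity check (not part of the original steps), you can tail the master log, which prints the spark://host:port URL, or fetch the master web UI, which the standalone master serves on port 8080 by default:

# The master writes its log under $SPARK_HOME/logs
tail -n 20 $SPARK_HOME/logs/spark-*-org.apache.spark.deploy.master.Master-*.out
 
# The master web UI listens on port 8080 by default
curl -s http://localhost:8080 | grep -o '<title>[^<]*</title>'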

Download the HDFS Connector and Create Configuration Files

Note

For the purposes of this example, place the JAR and key files in the current user's home directory. For production scenarios you would instead put these files in a common place that enforces the appropriate permissions (that is, readable by the user under which Spark and Hive are running).

Download the HDFS Connector to the service instance and add the relevant configuration files by using the following code example. For additional information, see HDFS Connector for Object Storage.

wget https://github.com/oracle/oci-hdfs-connector/releases/download/v2.9.2.1/oci-hdfs.zip
unzip oci-hdfs.zip -d oci-hdfs
 
cd $HOME
mkdir .oci
# Create or copy your API key into the $HOME/.oci directory
 
cd $SPARK_HOME/conf
# Create a core-site.xml (e.g. by transferring one you have, using vi etc.). Consult
# https://docs.oracle.com/iaas/Content/API/SDKDocs/hdfsconnector.htm#Properties
# for what this should look like
 
# Create a spark-defaults.conf file from the template
cp spark-defaults.conf.template spark-defaults.conf
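
The note in the previous section mentioned that recent connector versions bundle their third-party dependencies under the third-party/lib folder; one way to install them manually is to copy them alongside the connector JAR, as in the sketch below. The sketch also writes a skeleton core-site.xml: the property names are the ones documented at the link above, but every value is a placeholder that you must replace with your own region endpoint, OCIDs, fingerprint, and key path.

# Install the bundled third-party dependencies next to the connector JAR
cp $HOME/oci-hdfs/third-party/lib/*.jar $HOME/oci-hdfs/lib/
 
# Write a skeleton core-site.xml; every value below is a placeholder
cat > $SPARK_HOME/conf/core-site.xml <<'EOF'
<?xml version="1.0"?>
<configuration>
  <property>
    <name>fs.oci.client.hostname</name>
    <value>https://objectstorage.us-phoenix-1.oraclecloud.com</value>
  </property>
  <property>
    <name>fs.oci.client.auth.tenantId</name>
    <value>ocid1.tenancy.oc1..exampleuniqueid</value>
  </property>
  <property>
    <name>fs.oci.client.auth.userId</name>
    <value>ocid1.user.oc1..exampleuniqueid</value>
  </property>
  <property>
    <name>fs.oci.client.auth.fingerprint</name>
    <value>aa:bb:cc:dd:ee:ff:00:11:22:33:44:55:66:77:88:99</value>
  </property>
  <property>
    <name>fs.oci.client.auth.pemfilepath</name>
    <value>/home/opc/.oci/oci_api_key.pem</value>
  </property>
</configuration>
EOF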

In the spark-defaults.conf file, add the following line at the bottom. This setting adds the connector's packages (including its shaded dependencies) to the list of class prefixes loaded by the classloader that is shared between Spark SQL and Hive, which avoids class-loading conflicts when the Hive metastore is initialized:

spark.sql.hive.metastore.sharedPrefixes=shaded.oracle,com.oracle.bmc
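
For example, you can append the line from the shell (editing the file directly works just as well):

echo "spark.sql.hive.metastore.sharedPrefixes=shaded.oracle,com.oracle.bmc" >> $SPARK_HOME/conf/spark-defaults.conf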

Prepare Data

For test data, we use the MovieLens data set.

  1. Download the latest data set from https://grouplens.org/datasets/movielens/latest/. Be sure to download the "Small" data set.
  2. Unzip the downloaded file.
  3. Upload the movies.csv file to your Object Storage bucket. (One scripted way to perform all three steps is sketched after this list.)
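
If the OCI command line interface is installed and configured on the instance, the following sketch performs all three steps from the shell. The data set URL and the bucket name are assumptions; substitute your own bucket:

# Download and unzip the "Small" MovieLens data set
wget https://files.grouplens.org/datasets/movielens/ml-latest-small.zip
unzip ml-latest-small.zip
 
# Upload movies.csv to your Object Storage bucket (bucket name is a placeholder)
oci os object put --bucket-name PipedUploadTest --file ml-latest-small/movies.csv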

Test Using the Spark Shell

With the data ready, we can now launch the Spark shell and test it using a sample command:

cd $SPARK_HOME
./bin/spark-shell
 
scala> sc.wholeTextFiles("oci://PipedUploadTest@sampletenancy/")
java.io.IOException: No FileSystem for scheme: oci

You receive an error at this point because the oci:// file system scheme is not available. We need to reference the JAR file before starting the Spark shell. Here's an example of doing so:

./bin/spark-shell --jars $HOME/oci-hdfs/lib/oci-hdfs-full-2.9.2.1.jar --driver-class-path $HOME/oci-hdfs/lib/oci-hdfs-full-2.9.2.1.jar
 
scala> sc.wholeTextFiles("oci://PipedUploadTest@sampletenancy/")
res0: org.apache.spark.rdd.RDD[(String, String)] = oci://PipedUploadTest@sampletenancy/ MapPartitionsRDD[1] at wholeTextFiles at <console>:25
 
scala> sc.textFile("oci://PipedUploadTest@sampletenancy/movies.csv").take(20).foreach(println)
movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action
10,GoldenEye (1995),Action|Adventure|Thriller
11,"American President, The (1995)",Comedy|Drama|Romance
12,Dracula: Dead and Loving It (1995),Comedy|Horror
13,Balto (1995),Adventure|Animation|Children
14,Nixon (1995),Drama
15,Cutthroat Island (1995),Action|Adventure|Romance
16,Casino (1995),Crime|Drama
17,Sense and Sensibility (1995),Drama|Romance
18,Four Rooms (1995),Comedy
19,Ace Ventura: When Nature Calls (1995),Comedy

The command succeeds, so we are able to connect to Object Storage. Note that if you do not want to pass the --jars argument each time you launch the shell, you can instead copy the oci-hdfs-full JAR file into the $SPARK_HOME/jars directory, as shown below.
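
For example (a one-line sketch; the wildcard matches the single connector JAR from the download above):

cp $HOME/oci-hdfs/lib/oci-hdfs-full-*.jar $SPARK_HOME/jars/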

Start the Spark Thrift Server

Start the Spark Thrift Server on port 10015 and use the Beeline command line tool to establish a JDBC connection and then run a basic query, as shown here:

cd $SPARK_HOME
./sbin/start-thriftserver.sh --hiveconf hive.server2.thrift.port=10015
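
Optionally, before connecting, you can confirm that the Thrift server came up and is listening on the chosen port (a quick check, not part of the original steps):

# The Thrift server logs to $SPARK_HOME/logs; confirm the port is listening
ss -ltn | grep 10015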

Once the Thrift server is running, we can launch Beeline, as shown here:

cd $SPARK_HOME
./bin/beeline
Beeline version 1.2.1.spark2 by Apache Hive
beeline>

Next, connect to the server, as shown here:

Note

For the purposes of this example, we have not configured any security, so any user name and password will be accepted. For production scenarios you would not do this.
beeline> !connect jdbc:hive2://localhost:10015 testuser testpass
Connecting to jdbc:hive2://localhost:10015
log4j:WARN No appenders could be found for logger (org.apache.hive.jdbc.Utils).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Connected to: Spark SQL (version 2.2.1)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://localhost:10015>

If we now check to see what tables exist, we see the following:

0: jdbc:hive2://localhost:10015> show tables;
+-----------+------------+--------------+--+
| database  | tableName  | isTemporary  |
+-----------+------------+--------------+--+
+-----------+------------+--------------+--+
No rows selected (0.724 seconds)

None exist presently; however, we can create a table and link it to the movies.csv file that we uploaded to the Object Storage bucket, as shown here:

0: jdbc:hive2://localhost:10015> create table test_table (movieId integer, title string, genres string) using csv options (path "oci://PipedUploadTest@sampletenancy/movies.csv", header "true", delimiter ",");
 
0: jdbc:hive2://localhost:10015> describe formatted test_table;
+-------------------------------+------------------------------------------------------------+----------+--+
|           col_name            |                         data_type                          | comment  |
+-------------------------------+------------------------------------------------------------+----------+--+
| movieId                       | int                                                        | NULL     |
| title                         | string                                                     | NULL     |
| genres                        | string                                                     | NULL     |
|                               |                                                            |          |
| # Detailed Table Information  |                                                            |          |
| Database                      | default                                                    |          |
| Table                         | test_table                                                 |          |
| Owner                         | opc                                                        |          |
| Created                       | Thu Mar 01 20:45:18 GMT 2018                               |          |
| Last Access                   | Thu Jan 01 00:00:00 GMT 1970                               |          |
| Type                          | EXTERNAL                                                   |          |
| Provider                      | csv                                                        |          |
| Table Properties              | [transient_lastDdlTime=1519937118]                         |          |
| Location                      | oci://PipedUploadTest@sampletenancy/movies.csv             |          |
| Serde Library                 | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe         |          |
| InputFormat                   | org.apache.hadoop.mapred.SequenceFileInputFormat           |          |
| OutputFormat                  | org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat  |          |
| Storage Properties            | [delimiter=,, header=true, serialization.format=1]         |          |
+-------------------------------+------------------------------------------------------------+----------+--+

Note that the table stores its data externally in Object Storage and the data can be accessed using the HDFS Connector (the oci:// file system scheme). Now that we have a table, we can query it:

0: jdbc:hive2://localhost:10015> select * from test_table limit 10;
+----------+-------------------------------------+----------------------------------------------+--+
| movieId  |                title                |                    genres                    |
+----------+-------------------------------------+----------------------------------------------+--+
| 1        | Toy Story (1995)                    | Adventure|Animation|Children|Comedy|Fantasy  |
| 2        | Jumanji (1995)                      | Adventure|Children|Fantasy                   |
| 3        | Grumpier Old Men (1995)             | Comedy|Romance                               |
| 4        | Waiting to Exhale (1995)            | Comedy|Drama|Romance                         |
| 5        | Father of the Bride Part II (1995)  | Comedy                                       |
| 6        | Heat (1995)                         | Action|Crime|Thriller                        |
| 7        | Sabrina (1995)                      | Comedy|Romance                               |
| 8        | Tom and Huck (1995)                 | Adventure|Children                           |
| 9        | Sudden Death (1995)                 | Action                                       |
| 10       | GoldenEye (1995)                    | Action|Adventure|Thriller                    |
+----------+-------------------------------------+----------------------------------------------+--+
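
As a final note, Beeline can also run queries non-interactively through its standard -u, -n, -p, and -e options, which is convenient for scripting. The aggregate query below is only an illustration against the table created above:

cd $SPARK_HOME
./bin/beeline -u jdbc:hive2://localhost:10015 -n testuser -p testpass \
  -e "SELECT genres, COUNT(*) AS n FROM test_table GROUP BY genres ORDER BY n DESC LIMIT 5"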