Thursday, June 4, 2015

Spark Cluster Computing - Part 1


One of the things I am interested in is the challenges big data poses and the opportunities it provides for understanding our data better and making better decisions. One of the tools embraced by data engineers and data scientists over the last few years is Spark. It piqued my interest, so I have been toying around with it and decided to blog about it as my reading progresses. You are welcome to be part of this journey.

What is Spark?
Spark is a cluster computing platform designed to be fast and general purpose. It extends the MapReduce model and can be used for a variety of computations, be it batch processing, stream processing or interactive queries.
Spark's implementation is based on the RDD (Resilient Distributed Dataset). An RDD is an immutable collection of elements (of any data type) that is partitioned so it can be computed in parallel across the nodes of a cluster.
Every operation in Spark is one of the following:
1) Creating a new RDD – e.g. reading from a local file, from S3 or from HDFS storage
2) Transforming an existing RDD – e.g. mapping to another form of RDD, or filtering an RDD to get a subset of it
3) Action – an operation that computes a result from the RDD, e.g. count or reduce
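To make the three kinds of operation concrete before touching Spark itself, here is a rough plain-Java analogy using java.util.stream. It is only an analogy and uses no Spark classes: the class name RddStagesAnalogy and the method countVisitsForState are made up for illustration. Like Spark transformations, the stream's map and filter steps are lazy and only run when the final counting step asks for a result.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java analogy only: no Spark classes are used here.
class RddStagesAnalogy {

    // Counts the lines whose first tab-separated column equals the given state,
    // mirroring the create -> transform -> action sequence.
    static long countVisitsForState(List<String> lines, String state) {
        return lines.stream()                          // 1) create: a source of elements
                .map(line -> line.split("\t"))         // 2) transform: line -> columns (lazy)
                .filter(cols -> cols.length > 0
                        && cols[0].equals(state))      // 2) transform: keep a subset (lazy)
                .count();                              // 3) action: triggers the computation
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "MD\tnews", "KY\tentertainment", "CA\tsport", "MD\tsport");
        System.out.println(countVisitsForState(lines, "MD")); // prints 2
    }
}
```

The Spark version later in this post follows the same shape, except that the data is split into partitions and the work can be spread over many machines.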
What is the series about?
This series is intended to give you a jump start so you can toy around with Spark and solve some simple big data problems. A basic understanding of MapReduce is helpful but not necessary. It is not intended to give you an in-depth theoretical foundation of the Spark framework and its features.
Spark provides APIs for Python, Scala and Java. I will use Java in this series.
I am just sharing what I did as I read along and learn it.
The examples in this series are based on a hypothetical high-volume news website catering to users across all the states of the United States. For every user that visits this site we log the state they are visiting from. This can be done by looking up their IP address against a service like Digital Envoy to get their state or zip code. Assume we dump the state of the visitor and the section of the website (e.g. entertainment, news, sport etc.) into a TSV (tab-separated values) file where each page visit is logged on a new line. In practice these log files will be stored in scalable cloud-based storage like S3 (we will see how to use S3 as a data source for Spark in this series).
The file content looks like the following.
MD news
KY entertainment
CA sport 
A sample file for your exercise can be downloaded from here
As we go through this series we will try to answer the kinds of questions a data engineer might be interested in, such as showing a heat map of visitor counts by state, or finding which state has the most visitors to the sport section. The outcome could be used, for example, to decide to add more local news from that state to increase visitor traffic.
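As a small warm-up for the "which state has the most visitors to the sport section" question, here is a plain-Java sketch that answers it for a handful of in-memory lines. It uses no Spark classes, and the class name TopStateForSection, the method topState and the sample lines are all made up for illustration; the real data would come from the log file described above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch on in-memory sample lines; no Spark classes are used.
class TopStateForSection {

    // Returns the state with the most visits to the given section,
    // or null when no line matches. Ties are broken arbitrarily.
    static String topState(List<String> lines, String section) {
        Map<String, Integer> countsByState = new HashMap<>();
        for (String line : lines) {
            String[] cols = line.split("\t");
            // Skip malformed lines and lines for other sections.
            if (cols.length < 2 || !cols[1].trim().equals(section)) {
                continue;
            }
            countsByState.merge(cols[0], 1, Integer::sum);
        }
        String best = null;
        int bestCount = 0;
        for (Map.Entry<String, Integer> entry : countsByState.entrySet()) {
            if (entry.getValue() > bestCount) {
                best = entry.getKey();
                bestCount = entry.getValue();
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "MD\tnews", "CA\tsport", "KY\tentertainment", "CA\tsport", "MD\tsport");
        System.out.println(topState(lines, "sport")); // prints CA
    }
}
```

This works fine for a few lines on one machine. The point of Spark is to run the same kind of filter-and-count logic over log files that are too large for that.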

Let's see some code. I am using Maven to build my application and manage dependencies. You need to add the Spark core library as a dependency in your pom.xml file.

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.3.1</version>
</dependency>
In the main function of your app, you need to initialize the Spark context and set the master to "local" (your application runs in a local instance) as opposed to a cluster (which we will see in later parts of this series). You also need to give your application a name so that it can be identified when it is running in a shared cluster environment.
I have added the sample file mentioned above to the root folder of my application. If you are running this in a cluster environment, the file should be stored at the same location specified by the application on every node of the cluster. However, in real life you want to store the file in shared storage like S3 where all nodes can get it from.

SparkConf conf = new SparkConf().setMaster("local").setAppName("regionRequestCounter");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("visitordata.txt");
        
Example 1) Count the number of lines in the file
long linesCount = lines.count(); // should set linesCount to 1000

Example 2) Counting the page visits by state. As discussed above, each Spark operation either creates an RDD, transforms an RDD or performs an action on an RDD. In this example we will see all three.
sc.textFile("visitordata.txt"); // creates a JavaRDD of the lines at the given path (the path could also be a folder of compressed files, e.g. .gz, or uncompressed files, although we are using a single file in this example)


Mapping Phase
Here we transform the original RDD of lines into an RDD of key-value pairs. We split each line on the TAB separator to extract each column (in this case the state and the section of the entry).
Since we are only interested in counting the visits by state, we use the state as the key and 1 as the value (representing 1 visit).

JavaPairRDD<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String s) throws Exception {
        // Column 0 is the state, column 1 is the section.
        String[] parts = s.split("\t");
        // Emit (state, 1): one visit for this state.
        return new Tuple2<String, Integer>(parts[0], 1);
    }
});
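It is worth seeing what the split does to unusual lines, because every line in the file becomes a pair, including empty ones. The plain-Java sketch below mirrors the key extraction in call() without any Spark classes. The class StateKeySketch and the guard method hasState are hypothetical helpers, not part of the original application; a guard like this could be used to skip lines without a state if you prefer not to count them.

```java
// Plain-Java sketch of the key extraction used in the mapping phase.
class StateKeySketch {

    // Mirrors the body of call(): the first tab-separated column is the key.
    static String stateKey(String line) {
        return line.split("\t")[0];
    }

    // Hypothetical guard (not in the original code): true when the line
    // has a non-empty state column.
    static boolean hasState(String line) {
        // A limit of -1 keeps trailing empty columns, so even a line that
        // contains only a tab still yields a first column to inspect.
        String[] cols = line.split("\t", -1);
        return !cols[0].trim().isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(stateKey("MD\tnews"));          // prints MD
        System.out.println(stateKey("").isEmpty());        // prints true: a blank line gives an empty key
        System.out.println(hasState(""));                  // prints false
        System.out.println(hasState("KY\tentertainment")); // prints true
    }
}
```

So a blank line in the input does not fail; it simply produces a pair whose key is the empty string.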

Reduce Phase
Here we take the pairs defined above and reduce them by key, adding up the values of all pairs that share the same key. The syntax of the call method may look bizarre at first, but you will get used to it as you use it more often.
JavaPairRDD<String, Integer> visitorCountByState = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer x, Integer y) throws Exception {
        return x + y;
    }
});
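If the reduce step still feels abstract, the following plain-Java sketch shows roughly what reducing by key amounts to on a single machine: values that share a key are combined two at a time with the supplied function. It is not Spark's implementation (Spark also combines values within each partition and shuffles data between nodes), and the class ReduceByKeySketch with its pair and reduceByKey helpers is made up for illustration.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

// Single-machine sketch of reducing (key, value) pairs by key; no Spark classes are used.
class ReduceByKeySketch {

    // Small helper to build a (key, value) pair.
    static Map.Entry<String, Integer> pair(String key, int value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Combines the values of pairs that share a key using the given function.
    static Map<String, Integer> reduceByKey(List<Map.Entry<String, Integer>> pairs,
                                            BinaryOperator<Integer> reducer) {
        // TreeMap keeps keys sorted so the printed result is stable.
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            // First value for a key is stored as is; later ones are merged with the reducer.
            result.merge(p.getKey(), p.getValue(), reducer);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs =
                Arrays.asList(pair("MD", 1), pair("CA", 1), pair("MD", 1));
        // Same function as call(x, y) above: x + y.
        System.out.println(reduceByKey(pairs, (x, y) -> x + y)); // prints {CA=1, MD=2}
    }
}
```

The function passed in plays the same role as the call(Integer x, Integer y) method in the Spark code: it only needs to know how to combine two values.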
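Finally we write the output of the reduce step to disk. Note that saveAsTextFile creates a directory with the given name containing one part file per partition, not a single text file.
visitorCountByState.saveAsTextFile("visitorsByState");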

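If you run this application using the file I provided above, you will see the following result when you inspect the part files (e.g. part-00000) in the output directory. The entry with an empty key, (,1), most likely comes from a blank line in the input, since splitting an empty line yields an empty first column.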
(NY,90)
(IA,108)
(VA,94)
(CA,90)
(MD,108)
(AZ,107)
(,1)
(SC,100)
(KY,93)
(DC,109)
(WV,101)

Putting it all together
package SparkOnLocalFile;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

/**
 * Spark – Exercises 1 and 2
 *
 */
public class App 
{
    public static void main( String[] args )
    {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("regionRequestCounter");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("visitordata.txt");
        // Exercise 1
        //long countLines = lines.count();
        // Exercise 2
        JavaPairRDD<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                // Column 0 is the state; emit (state, 1) for each visit.
                String[] parts = s.split("\t");
                return new Tuple2<String, Integer>(parts[0], 1);
            }
        });
        JavaPairRDD<String, Integer> visitorCountByState = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer x, Integer y) throws Exception {
                return x + y;
            }
        });
        visitorCountByState.saveAsTextFile("visitorsByState");
        // Release the resources held by the Spark context.
        sc.stop();
    }
}
In the next post we will see how to work with S3 files and Spark Streaming.