Thursday, June 4, 2015

Spark Cluster Computing - Part 1


One of the things I am interested in is the challenges big data poses and the opportunities it provides to help us understand our data better and make better decisions. One of the tools embraced by data engineers and scientists over the last few years is Spark. It piqued my interest, so I have been toying around with it, and I thought I would blog about it as my reading progresses. You are welcome to be part of this journey.

What is Spark?
Spark is a cluster computing platform designed to be fast and general purpose. It builds on the MapReduce model and can be used for a variety of computations, be it batch processing, stream processing or interactive queries.
Spark's implementation is based on the RDD (Resilient Distributed Dataset). An RDD is an immutable collection of elements (of any data type) that is partitioned across the nodes of a cluster so that it can be processed in parallel.
Every operation in Spark is one of the following:
1) Creating a new RDD – e.g. reading from a local file, or reading files from S3 or any HDFS storage
2) Transforming an existing RDD – e.g. mapping it to another form of RDD, or filtering an RDD to get a subset of it
3) Action – an operation that computes a result from the RDD, e.g. count or reduce
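As a rough analogy only (this is plain Java 8 streams, not the Spark API), the three kinds of operations line up with creating a stream, applying intermediate operations, and calling a terminal operation. The class name, method name and sample data below are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java analogy for the three kinds of Spark operations (not Spark code).
final class RddAnalogy {
    static long countVisitsFrom(List<String> lines, String state) {
        return lines.stream()                               // 1) create: comparable to sc.textFile(...)
                .map(line -> line.split("\t"))              // 2) transform: comparable to map
                .filter(parts -> parts[0].equals(state))    // 2) transform: comparable to filter
                .count();                                   // 3) action: comparable to count()
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("MD\tnews", "KY\tentertainment", "CA\tsport");
        System.out.println(countVisitsFrom(lines, "MD")); // prints 1
    }
}
```

As with streams, Spark transformations are lazy: nothing is computed until an action such as count runs.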
What is the series about?
This series is intended to give you a jump start so you can toy around with Spark and solve some simple big data problems. A basic understanding of MapReduce is helpful but not necessary. It is not intended to give you an in-depth theoretical foundation of the Spark framework and its features.
Spark provides APIs in Python, Scala and Java. I will use Java in this series.
I am just sharing what I did as I read along and learned.
The examples in this series are based on a hypothetical high-volume news website catering to users across all the states of the United States. For every user that visits the site we log the state they are visiting from. This can be done by looking up their IP address against a service like Digital Envoy to get their state or zip code. Assume we dump the state of the visitor and the section of the website (e.g. entertainment, news, sport etc.) into a TSV (tab-separated values) file where each page visit is logged on a new line. In practice these log files would be stored in scalable cloud-based storage like S3 (we will see how to use S3 as a data source for Spark later in this series).
The files content looks like the following.
MD news
KY entertainment
CA sport 
A sample file for your exercise can be downloaded from here
As we go through this series we will try to answer the questions a data engineer would be interested in, like showing a heat map of our visitor count by state, or finding which state has the most visitors to the sport section. The outcome can be used, for example, to decide to add more local news from that state to increase visitor traffic.
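To make the second question concrete before bringing Spark in, here is a minimal plain-Java sketch that answers it for a handful of in-memory lines. The class and method names are hypothetical, the sample lines are invented, and ties between states are broken arbitrarily.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Spark): which state sends the most visits to a given section?
final class TopStateForSection {
    static String topState(List<String> lines, String section) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            String[] parts = line.split("\t");
            // Skip malformed lines and lines that belong to other sections.
            if (parts.length < 2 || !parts[1].trim().equals(section)) {
                continue;
            }
            counts.merge(parts[0], 1, Integer::sum);
        }
        String best = null;
        int bestCount = 0;
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            if (entry.getValue() > bestCount) {
                best = entry.getKey();
                bestCount = entry.getValue();
            }
        }
        return best; // null when no line matches the section
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "MD\tnews", "CA\tsport", "KY\tentertainment", "CA\tsport", "MD\tsport");
        System.out.println(topState(lines, "sport")); // prints CA
    }
}
```

This only works while the data fits in one machine's memory, which is exactly the limit Spark is meant to remove.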

Let's see some code. I am using Maven to build my application and manage dependencies. You need to add Spark as a dependency in your pom.xml file.

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.3.1</version>
</dependency>
In the main function of your app, you need to initialize the Spark context and set the master to "local" (your application runs in a local instance) as opposed to a cluster (which we will see in later parts of this series). You need to give your application a unique name so that it can be identified when it is running in a shared cluster environment.
I have added the file attached above to the root folder of my application. If you are running this in a cluster environment, the file should be stored at the same location on every node in the cluster. However, in real life you would store the file in shared storage like S3 where all nodes can read it.

SparkConf conf = new SparkConf().setMaster("local").setAppName("regionRequestCounter");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("visitordata.txt");
        
Example 1) Count the number of lines in the file
long linesCount = lines.count(); // should set linesCount to 1000

Example 2) Counting the page visits by state. As discussed above, each Spark operation either creates an RDD, transforms an RDD or performs an action on an RDD. In this example we will see all three.
sc.textFile("visitordata.txt"); // creates a JavaRDD of the lines at the given path (the path could also be a folder of compressed (e.g. .gz) or uncompressed files, although we are using a single file in this example)


Mapping Phase
In this step we transform the original RDD of lines into an RDD of key/value pairs, splitting each line on the TAB separator to extract each column (in this case the state and the section of the entry).
Since we are only interested in counting the visits by state, we use the state as the key and 1 as the value (representing one visit).

JavaPairRDD<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String s) throws Exception {
        // the first column is the state; emit (state, 1) for each visit
        String[] parts = s.split("\t");
        return new Tuple2<String, Integer>(parts[0], 1);
    }
});

Reduce Phase 
In this step we take the pairs defined above and reduce them by key, adding up the values of pairs that share the same key. Although the syntax of the call method may seem bizarre at first, you will get used to it as you use it more often.
JavaPairRDD<String, Integer> visitorCountByState = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer x, Integer y) throws Exception {
                return x + y;
            }
        });
Finally we write the output of the reduce step using
visitorCountByState.saveAsTextFile("visitorsByState.txt");

Note that saveAsTextFile creates a directory with that name and writes the results into part files (e.g. part-00000) inside it. If you run this application using the file I provided above, you will see this result when you inspect the output.
(NY,90)
(IA,108)
(VA,94)
(CA,90)
(MD,108)
(AZ,107)
(,1)
(SC,100)
(KY,93)
(DC,109)
(WV,101)
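To see what the two phases compute without running Spark, here is a small plain-Java sketch of the same idea. The class and method names are hypothetical and the input lines are invented. It also shows that a blank input line produces an empty key, which is one plausible explanation for an entry like (,1).

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of the mapping and reduce phases (no Spark involved).
final class VisitsByStateSketch {
    // Mapping phase: one (state, 1) pair per input line.
    static List<Map.Entry<String, Integer>> toPairs(List<String> lines) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : lines) {
            pairs.add(new SimpleEntry<>(line.split("\t")[0], 1));
        }
        return pairs;
    }

    // Reduce phase: combine the values of pairs that share a key using x + y.
    static Map<String, Integer> reduceByKey(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            totals.merge(pair.getKey(), pair.getValue(), (x, y) -> x + y);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("MD\tnews", "KY\tentertainment", "", "MD\tsport");
        System.out.println(reduceByKey(toPairs(lines))); // prints {=1, KY=1, MD=2}
    }
}
```

Spark does the same combining, but per partition and across machines, which is why the function passed to reduceByKey only ever sees two values at a time.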

Putting it all together
package SparkOnLocalFile;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

/**
 * Spark – Exercises 1 and 2
 *
 */
public class App 
{
    public static void main( String[] args )
    {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("regionRequestCounter");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("visitordata.txt");
        // Exercise 1
        // long countLines = lines.count();
        // Exercise 2
        JavaPairRDD<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                // the first column is the state; emit (state, 1) for each visit
                String[] parts = s.split("\t");
                return new Tuple2<String, Integer>(parts[0], 1);
            }
        });
        JavaPairRDD<String, Integer> visitorCountByState = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer x, Integer y) throws Exception {
                return x + y;
            }
        });
        visitorCountByState.saveAsTextFile("visitorsByState");
    }
}
In the next post we will see how to work with S3 files and Spark Streaming.



Monday, June 30, 2014

Future in Java - Asynchronous Computation

These days I am doing a bit of Java programming. One of the features I learned about recently is Future.

A Future is a representation of the result of an asynchronous computation. A Future has methods to check if the computation has completed and retrieve the result of the task. You can also cancel a computation if you choose to, unless the computation has already completed.

A couple of notable methods in Future

  • isDone() returns a boolean indicating whether the task is done. This method can be used for polling.
  • get() returns the final result of the Future. It is a blocking method that waits until the computation is done. An overloaded version takes a timeout and a time unit, which specify the maximum amount of time to wait for the computation to complete. If the task is not done within that time, a TimeoutException is thrown.
  • cancel() attempts to abort the computation.
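Here is a minimal sketch of these three methods using only the JDK. The class name, the 5 second sleep and the 100 ms timeout are arbitrary choices for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class FutureMethodsDemo {
    // Submits a slow task, waits briefly for it, then cancels it and reports what happened.
    static String runDemo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executor.submit(() -> {
                Thread.sleep(5000); // stand-in for slow work
                return "finished";
            });
            try {
                // Wait at most 100 ms for the result.
                return future.get(100, TimeUnit.MILLISECONDS);
            } catch (TimeoutException ex) {
                // Passing true asks for the running task to be interrupted.
                boolean cancelled = future.cancel(true);
                return "timed out, cancelled=" + cancelled + ", isDone=" + future.isDone();
            } catch (InterruptedException | ExecutionException ex) {
                return "failed: " + ex;
            }
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints timed out, cancelled=true, isDone=true
    }
}
```

Note that isDone() returns true after a cancellation as well, so it means "no longer running" rather than "completed successfully".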

Let’s look at a context in which Futures are useful. Think of a hypothetical scenario in which we have to fire an HTTP request asynchronously and do some other task while waiting for the response to come.

The steps involved look like the following:
  • Fire the HTTP request and get the Future representing the outcome (i.e. no waiting for the request to finish).
  • Do some other task.
  • Check if the response is ready. Here we use the get() method to block and retrieve the response.
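The three steps can be sketched with the JDK alone. In this sketch fakeHttpGet is a hypothetical stand-in that only sleeps and returns a canned string, so no real request is made, and CompletableFuture is just one convenient way to obtain a Future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class FireAndCollect {
    // Hypothetical stand-in for an HTTP call: sleeps, then returns a canned body.
    static String fakeHttpGet(String url) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return "response from " + url;
    }

    static String run(String url) {
        // Step 1: fire the request; this returns immediately with a Future.
        Future<String> response = CompletableFuture.supplyAsync(() -> fakeHttpGet(url));

        // Step 2: do unrelated work while the request is in flight.
        int busyWork = 0;
        for (int i = 1; i <= 100; i++) {
            busyWork += i;
        }

        // Step 3: block until the response is ready and retrieve it.
        try {
            return busyWork + " / " + response.get();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException ex) {
            return "failed: " + ex.getCause();
        }
    }

    public static void main(String[] args) {
        // prints 5050 / response from http://www.example.com
        System.out.println(run("http://www.example.com"));
    }
}
```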

To demonstrate this with code I will use the AsyncHttpClient from NING. This client allows us to send HTTP requests and receive the responses asynchronously. The executeRequest method of this client returns a Future.

If you are using Maven for your dependency management, you can add NING to your project by adding this dependency.

<dependency>
<groupId>com.ning</groupId>
<artifactId>async-http-client</artifactId>
<version>1.8.11</version>
</dependency>


Then your code will look like the following snippet. I am using the RequestBuilder utility from the same library, which allows me to set the URL, headers and other properties of the HTTP request.

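import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.RequestBuilder;
import com.ning.http.client.Response;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

RequestBuilder requestBuilder = new com.ning.http.client.RequestBuilder();
requestBuilder.setUrl("http://www.example.com");
AsyncHttpClient client = new AsyncHttpClient();
// executeRequest returns immediately with a Future for the response
Future<Response> response = client.executeRequest(requestBuilder.build());

// do some task here while waiting for the response

// check if the task is done; if not, do some other task
if (!response.isDone()) {
    // do some other task
}
// at this point, block and wait for the response
// (the 30 second limit is an arbitrary value chosen for illustration)
try {
    Response value = response.get(30, TimeUnit.SECONDS);
} catch (TimeoutException ex) {
    // log the exception or bubble it
} catch (ExecutionException ex) {
    // log the exception or bubble it
} catch (Exception ex) {
    // log the exception or bubble it
}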

Saturday, February 1, 2014

Using Node.js to build a site that returns JSON data


I am working on a project that integrates with multiple vendors over HTTP endpoints. We send HTTP requests with a JSON payload, and the vendors have to consume that and respond with a JSON payload of their own. I wanted a simple dummy site that takes the request and responds with the same kind of data I expect from the vendors. The whole objective is to mock the vendor and test the integration. Moreover, I wanted an easy and fast process to create, deploy and start multiple mock vendor sites.

Node.js can be used to achieve this. For those of you who are already familiar with JavaScript, the code needed to create and run these sites is minimal.


Note: this post is not supposed to be a tutorial on Node.js. That would be a topic for another day. Once I am comfortable enough with the nitty-gritty details of Node.js, I hope to come back with a post.

I am using a Windows machine and here are the steps I followed:

1. Install Node.js from here

  • Node.js will install a console window that you can use as a playground or IDE. Here you can write and run any JavaScript code.
  • The Node.js root directory is added to the PATH system environment variable, so you can invoke the node command from your command prompt.
2. I created a folder c:\projects\nodejsfiles where I keep all my Node.js related files.
    Note: You can keep your node files in any directory you want.

3. Copy the code shown below (after step 6) into Notepad and save it as myhttpendpoint.js under the directory I created above.

4. Open Command Prompt and navigate to the path where myhttpendpoint.js is located, in my case c:\projects\nodejsfiles.

5. Run the command
     node myhttpendpoint.js

6. Voila! The server should be up and running now. If you hit the URL http://127.0.0.1:1330 from a browser, you should see the JSON response being returned.

The code looks like the following:

var http = require('http');
http.createServer(function (req, res) {
  // application/json is the registered media type for JSON
  res.writeHead(200, {'Content-Type': 'application/json'});
  var responseObj = {
    id: null,
    detail: [
      {
        part1: [
          {
            property1: 55,
            property2: "hello there!",
            property3: 12.0,
            property4: null,
            property5: 0,
            property6: true
          }
        ]
      }
    ]
  };
  res.end(JSON.stringify(responseObj));
}).listen(1330, '127.0.0.1');
console.log('Server running at http://127.0.0.1:1330/');

Wednesday, June 26, 2013

IIS Servant - a web based management tool for IIS Manager

Big kudos to Jonas Hovgaard for building it and making it available to the public. IIS Servant gives you access to IIS Manager, and you can do most of the basic tasks related to web site creation and maintenance that you would normally do through IIS Manager.

It saves you the trouble of opening a remote desktop session to your servers just to open IIS Manager and manage your web sites.
It allows you to do all these management actions through a beautiful user interface.

I installed it on some of our web server boxes. The installation is very simple, and you can install it and start using it in a matter of minutes.

I had a CNAME ready to use for the IIS manager site it creates, and I used that as the 'servant url' during installation.

You can download it from servant io
Here is a screen shot of one of my servant IIS configurations ...


Tuesday, April 23, 2013

The PRG /POST-REDIRECT-GET/ Pattern using ASP.NET MVC

This pattern is used to solve the double submit problem. When a user refreshes the result page of a POST request, the page is resubmitted, resulting in undesirable outcomes like a double withdrawal from an account, multiple orders etc.

If a POST request is supposed to return a page, it should instead perform its side effect (e.g. saving the order) and redirect to a GET request that returns the page. The result of the GET request can then be bookmarked and requested as needed without a side effect (undesired additional orders).


The double submit problem is explained in depth in this article by Michael Jouravlev.
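At the HTTP level the pattern is just a redirect response to the POST. The following is a minimal, framework-neutral sketch in Java using the JDK's built-in HttpServer, not ASP.NET MVC code. The class name, the /order path and the id value are made up, and it answers with 303 See Other where a framework may use 302 Found instead.

```java
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.charset.StandardCharsets;

final class PrgSketch {
    // Starts a throwaway server on a free local port with a single /order resource.
    static HttpServer start() throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        server.createContext("/order", PrgSketch::handle);
        server.start();
        return server;
    }

    static void handle(HttpExchange exchange) throws IOException {
        if ("POST".equals(exchange.getRequestMethod())) {
            // POST: the side effect (e.g. saving the order) would happen here, then redirect.
            exchange.getResponseHeaders().add("Location", "/order?id=1");
            exchange.sendResponseHeaders(303, -1); // 303 See Other, no response body
        } else {
            // GET: only renders the page, so it is safe to refresh or bookmark.
            byte[] page = "order 1 saved".getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(200, page.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(page);
            }
        }
        exchange.close();
    }

    // Sends one POST without following the redirect; returns "<status> <Location header>".
    static String postOnce() {
        HttpServer server = null;
        try {
            server = start();
            URL url = new URL("http://127.0.0.1:" + server.getAddress().getPort() + "/order");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setInstanceFollowRedirects(false);
            conn.setRequestMethod("POST");
            return conn.getResponseCode() + " " + conn.getHeaderField("Location");
        } catch (IOException ex) {
            return "error: " + ex;
        } finally {
            if (server != null) {
                server.stop(0);
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(postOnce()); // prints 303 /order?id=1
    }
}
```

A browser follows the Location header with a GET, so refreshing the resulting page repeats only the GET and never the POST.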

Using the ASP.NET MVC framework it is simple to implement the PRG pattern. Below you will find a small code snippet that uses the PRG pattern.

The Step1 (POST) controller action redirects to Step1 (GET) on error, or, if successful, redirects to the next step, Step2 (GET). Step1 (POST) doesn't build and return pages itself.
[HttpGet]
[ImportModelStateFromTempData]
public ActionResult Step1(int id)
{
    var m = _orderService.GetOrder(id);
    return View(m);
}

[HttpPost]
[ExportModelStateToTempData]
public ActionResult Step1(Order m)
{
    // If the model is valid (satisfies all the validation requirements set for it),
    // persist it and redirect to the next step;
    // otherwise redirect back to the GET action for this step.
    if (ModelState.IsValid)
    {
        // save
        _orderService.Save(m);
        // Go to the next step (Step 2 here is a HttpGet ActionResult)
        return RedirectToAction("Step2", new { id = m.OrderId });
    }

    // redirect to the GET request
    return RedirectToAction("Step1", new { id = m.OrderId });
}

[HttpGet]
[ImportModelStateFromTempData]
public ActionResult Step2(int id)
{
    var m = _orderService.GetOrder(id);
    return View(m);
}

Friday, March 22, 2013

Lunch and Learn Session on SignalR .Net - Realtime for Web


Here is the PowerPoint for the presentation I gave my colleagues a few weeks back on SignalR.

Tuesday, January 15, 2013

Extending the default Command Timeout in Nhibernate



There was an NHibernate query that was taking more than 30 seconds, 34 seconds to be exact. The query involves a view that joins many views and tables. It started taking more than 30 seconds last week. Reviewing the execution plan and throwing indexes here and there didn't help. The next immediate solution was to extend the NHibernate default timeout (i.e. 30 seconds).

I believe extending the timeout should be a last resort, used as an immediate fix until you come up with a long-term solution such as redesigning your data architecture or optimizing your queries.

NHibernate allows you to set the command timeout for a query. This timeout is the length of time the query is allowed to execute and finish. If the query takes longer than this, it is aborted and a SQL timeout exception is thrown.

Currently you can set the timeout on a query if you are using ICriteria. The NHibernate team is working to make this method (setting the timeout) available on QueryOver in the next release (3.3.x).

I was using NHibernate QueryOver for my query, so I had to rewrite it using ICriteria. I know it is a bummer :( . Assuming you are using a unit of work pattern, the code will be along the lines of the following:


// SetTimeout takes the timeout in seconds
var query = _unitOfWork.currentSession.CreateCriteria<MyView>().SetTimeout(TIMEOUT_IN_SECONDS);


I will leave it like that until we completely remove that view and replace it with a roll up table in our next release.