Sunday, March 30, 2014

Spark reads input from HDFS through HUE

I use Cloudera CDH5 + Spark 0.9.0 for CDH5

1. Build Spark for CDH5
> cd /home/david/tools/spark-0.9.0-incubating-bin-hadoop2
> sbt/sbt clean
> SPARK_HADOOP_VERSION=2.2.0-cdh5.0.0-beta-2 SPARK_YARN=true sbt/sbt assembly
> sbt assembly

Note: use the command 'hadoop version' to find out what SPARK_HADOOP_VERSION should be set to.
The Spark source is downloaded from http://spark.incubator.apache.org/downloads.html

2. Run it
> java -jar /home/david/codes/spark/examples/WordCount/target/scala-2.10/WordCount-assembly-1.0.jar

Note: the runnable jar WordCount-assembly-1.0.jar is generated by running 'sbt assembly' in the WordCount project directory
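The 'AssemblyKeys' import and 'assemblySettings' in build.sbt below come from the sbt-assembly plugin, so the WordCount project also needs the plugin declared in project/plugins.sbt. A minimal sketch (the plugin version here is an assumption for this sbt/Spark vintage):

# project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")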

# build.sbt
import AssemblyKeys._

assemblySettings

name := "WordCount"

version := "1.0"

scalaVersion := "2.10.3"

libraryDependencies += "ch.qos.logback" % "logback-classic" % "0.9.24" % "compile"

libraryDependencies += "ch.qos.logback" % "logback-core" % "0.9.24" % "compile"

libraryDependencies += "org.slf4j" % "log4j-over-slf4j" % "1.6.1"

libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.0-incubating"

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.2.0"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

excludedJars in assembly <<= (fullClasspath in assembly) map {
    cp =>
        cp filter {
            c => List("javax.servlet-2.5.0.v201103041518.jar",
                "slf4j-log4j12-1.7.2.jar",
                "commons-beanutils-1.7.0.jar",
                "commons-collections-3.2.1.jar",
                "minlog-1.2.jar",
                "asm-3.2.jar",
                "hadoop-yarn-common-2.2.0.jar",
                "slf4j-log4j12-1.7.5.jar",
                "log4j-over-slf4j-1.6.1.jar"
            ) exists { c.data.getName contains _ } }
}

packageOptions in (Compile, packageBin) +=
    Package.ManifestAttributes("Main-Class" -> "wc")


# wc.scala
import org.apache.spark.SparkContext

object wc extends App {
    val sc = new SparkContext("local", "Simple App", "/home/david/tools/spark",
        List("/home/david/tools/spark-0.9.0-incubating-bin-hadoop2/assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-hadoop2.2.0-cdh5.0.0-beta-2.jar"))

    // use the hdfs:// protocol to access the file - you need to install Hue first
    // in /etc/hue/conf.empty/hue.ini you can find that port 8020 is defined as "fs_defaultfs=hdfs://localhost:8020"
    val textFile2 = sc.textFile("hdfs://localhost:8020/user/david/data/output2.csv")
    println(textFile2.count())
    println(textFile2.filter(line => line.contains("01/01/1980")).count())
}


Note: the file output2.csv was saved to HDFS through the command 'hadoop fs -put output2.csv data'
Note2: HUE may not be needed, based on the quote "To interact with Hadoop Distributed File System (HDFS), you need to use a Spark version that is built against the same version of Hadoop as your cluster" from Fast Data Processing with Spark, p. 16.
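
The project is called WordCount, but wc.scala above only counts and filters lines. An actual word count over the same RDD would look roughly like the sketch below (it would live inside the same object wc; reduceByKey needs the pair-RDD implicits, so 'import org.apache.spark.SparkContext._' has to be added at the top of wc.scala):

    val counts = textFile2.flatMap(line => line.split("\\s+"))   // split each line into words
                          .map(word => (word, 1))                // pair every word with a count of 1
                          .reduceByKey(_ + _)                    // sum the counts per word
    counts.take(10).foreach(println)                             // print a small sample of (word, count) pairs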

# logback.xml
< configuration >

    < appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender" >
        < !-- encoders are assigned the type ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -- >
        < encoder >
            < pattern >%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} %line --- %msg%n< /pattern >
        < /encoder >
    < /appender >

    < root level="error" >
        < appender-ref ref="STDOUT" / >
    < /root >
< /configuration >

Friday, March 28, 2014

Spark for Big Analytics

http://blog.revolutionanalytics.com/2013/12/apache-spark.html

Spark seeks to address the critical challenges for advanced analytics in Hadoop.  First, Spark is designed to support in-memory processing, so developers can write iterative algorithms without writing out a result set after each pass through the data.  This enables true high performance advanced analytics; for techniques like logistic regression, project sponsors report runtimes in Spark 100X faster than what they are able to achieve with MapReduce. 
Second, Spark offers an integrated framework for advanced analytics, including a machine learning library (MLLib); a graph engine (GraphX); a streaming analytics engine (Spark Streaming) and a fast interactive query tool (Shark). This eliminates the need to support multiple point solutions, such as Giraph, GraphLab and Tez for graph engines; Storm and S4 for streaming; or Hive and Impala for interactive queries. A single platform simplifies integration, and ensures that users can produce consistent results across different types of analysis.
At Spark's core is an abstraction layer called Resilient Distributed Datasets, or RDDs.  RDDs are read-only partitioned collections of records created through deterministic operations on stable data or other RDDs.  RDDs include information about data lineage together with instructions for data transformation and (optional) instructions for persistence.  They are designed to be fault tolerant, so that if an operation fails it can be reconstructed.  
For data sources, Spark works with any file stored in HDFS, or any other storage system supported by Hadoop (including local file systems, Amazon S3, Hypertable and HBase). Spark supports text files, SequenceFiles and any other Hadoop InputFormat.
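
For illustration, here is roughly how those sources look from the Spark API - a sketch only, with placeholder paths and an existing SparkContext sc assumed (reading SequenceFiles also needs 'import org.apache.spark.SparkContext._' for the implicit Writable converters):

// all of these return RDDs, regardless of where the bytes live
val fromHdfs  = sc.textFile("hdfs://localhost:8020/user/david/data/output2.csv")
val fromLocal = sc.textFile("file:///home/david/data/output2.csv")
val fromS3    = sc.textFile("s3n://some-bucket/data/output2.csv")   // bucket name is hypothetical

// SequenceFiles and other Hadoop InputFormats are read through dedicated methods
val fromSeq = sc.sequenceFile[String, Int]("hdfs://localhost:8020/user/david/data/some-seqfile")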

Spark RDD

http://www.thecloudavenue.com/2014/01/resilient-distributed-datasets-rdd.html

Resilient Distributed Datasets (RDD) for the impatient

There were not many resources around RDDs, but this paper and presentation are the roots of RDD. Check this to learn more about RDDs from a Spark perspective.

Formally, an RDD is a read-only, partitioned collection of records. RDDs can only be created through deterministic operations on either (1) data in stable storage or (2) other RDDs.
To summarize, the MR model is less suited to iterative processing than the RDD model. Performance metrics around iterative and other kinds of processing are covered in detail in this paper around RDD.
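
A sketch of why RDDs suit iteration: the working set is loaded and cached in memory once, and every later pass reuses the cached partitions instead of re-reading the input, which is what a chain of MapReduce jobs would do (the path and the filter are placeholders; an existing SparkContext sc is assumed):

// load once and mark for in-memory caching; the first action materializes it
val data = sc.textFile("hdfs://localhost:8020/user/david/data/output2.csv").cache()

// repeated passes hit memory rather than HDFS,
// which is what makes iterative algorithms fast on RDDs
for (i <- 1 to 10) {
    println(s"pass $i: " + data.filter(_.contains("1980")).count())
}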


Thursday, March 27, 2014

R Language Installation


http://en.wikipedia.org/wiki/R_%28programming_language%29

How to install R on CentOS 6:
> sudo rpm -Uvh http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm

> sudo yum install R
 
Useful IDE:
http://www.rstudio.com/ide/download/desktop 

How to install packages for all users:

Part 1/3: First, run R as root

Launch R with sudo R, and enter your password.

Part 2/3: To install everything, just paste this single long line in R:

(Note that these packages will still NOT be available to all users until we fix it in part 3/3, because the default root permissions forbid other users from reading the installed packages.)
source("http://www.bioconductor.org/biocLite.R") ; biocLite() ; biocLite("limma",dependencies=TRUE); biocLite("statmod"); biocLite("ADaCGH", dependencies=TRUE); biocLite("arrayQuality"); biocLite("affxparser"); biocLite("makecdfenv"); biocLite("affycomp"); biocLite("multtest",dependencies=TRUE); biocLite("Genominator",dependencies=TRUE); biocLite("affy"); biocLite("HTqPCR");
install.packages(c("Agi4x44PreProcess", "aroma.affymetrix", "bitops", "caTools", "cluster", "CNTools", "cgh", "DBI", "digest", "DESeq", "getopt", "genefilter", "geneplotter", "gplots", "gtools", "grid", "gridBase", "GLAD", "GO.db", "hopach", "Hmisc", "hexbin", "IRanges", "KernSmooth", "KEGG.db", "mgcv", "methods", "matrixStats", "Matrix", "MASS", "marray", "nnet", "nlme", "org.Hs.eg.db", "preprocessCore", "pixmap", "parser", "qvalue", "qtl", "rpart", "R.huge", "R.filesets", "RColorBrewer", "RSQLite", "sandwich", "sets", "simpleaffy", "snapCGH", "spatial", "splines", "stats4", "strucchange", "survival", "tilingArray", "tcltk", "topGO", "vsn", "xterm256"), dependencies=TRUE);

Part 3/3: Finally, fix the permissions (required step!)

Now, if you installed things for all users using the sudo R command up above, you have an easy-to-solve problem: the new scripts aren't readable! They are installed by default into /usr/local/lib/R/. So you want to make everything there world-readable, and the directories world-executable (executable for a directory means it can be listed with ls).
If you skip this step, things will still work for YOU, but they will break for all other users.
Solution:
sudo chmod -R a+r /usr/local/lib/R ; sudo find /usr/local/lib/R -type d | sudo xargs chmod a+x
Now you're done installing the R package!

================

If you want to build your own R from the source at http://cran.rstudio.com/, make sure you do the following to install the required font:
- download the current package from 
http://mirrors.ctan.org/systems/texlive/tlnet/archive/inconsolata.tar.xz

- locate your personal texmf directory by

kpsewhich -var-value=TEXMFHOME

usually ~/texmf (and it may need to be created) and cd there.

- install the current version by something like

tar xf inconsolata.tar.xz

or use untar('inconsolata.tar.xz') in R;

(if there is an ls-R file in that directory, run

mktexlsr .

) then

sudo updmap --enable Map=zi4.map

Monday, March 24, 2014

Play framework, Akka: actor communication with controller

http://stackoverflow.com/questions/21779600/play-framework-akka-actor-communication-with-controller


In short - I'm building a Scala/Play application which will monitor certain files/folders for changes and use Server-Sent Events to push data to the browser.
I'm using the Swatch library to do the monitoring, which launches with the application. This is my Global object. It starts the monitoring by launching a worker actor (as far as I understand) and reports if there are any changes - it's working just fine:
import akka.actor.{Props, ActorSystem}
import play.api._
import play.api.libs.concurrent.Akka
import play.api.Play.current
import include.Swatch._
import include.SwatchActor

object Global extends GlobalSettings {

    override def onStart(app: Application) {
        Logger.info("Application has started")
        val swatch = Akka.system.actorOf(Props[SwatchActor])
        swatch ! Watch("/folder/path", Seq(Create, Modify, Delete), true)
    }

}
At the moment whenever a change happens the swatch actor just outputs this to the console:
[INFO] [02/14/2014 11:45:04.377] [application-akka.actor.default-dispatcher-3] [akka://application/deadLetters] Message [include.Swatch$Create] from Actor[akka://application/deadLetters] to Actor[akka://application/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
This is my controller (so far it's not doing much):
package controllers

import play.api._
import play.api.mvc._

object Application extends Controller {

    def index = Action {
        Ok("index")
    }

}
The browser should establish a connection with the controller, and then whenever my worker actor detects a change, a server push to the browser happens. My problem so far is: how do I get my worker actor to send messages to my index action in the controller? Would the approach I've described above work at all?
UPDATE: I'll be using WebSockets to communicate with the browser. Another important thing - the worker actor must be running all the time, regardless of whether there is an established connection with the browser or not.
UPDATE2: Thanks to drexin and others I've managed to scrape together a solution.

=======

I have found the solution.
Case class that has to be imported from a separate file (Concurrent comes from play.api.libs.iteratee):
import play.api.libs.iteratee.Concurrent

case class Start(out: Concurrent.Channel[String])
Global object (same imports as in the question, plus akka.actor.Actor and play.api.libs.iteratee.Concurrent):
object Global extends GlobalSettings {

    override def onStart(app: Application) {

        class Listener extends Actor {
            var out = {
                val (enum, chan) = Concurrent.broadcast[String]
                chan
            }
            def receive = {
                //Websocket channel out is set here
                case Start(out) => this.out = out
                //Pushing messages to Websocket
                case Create(path) => this.out.push(path.toString)
                case Delete(path) => this.out.push(path.toString)
                case Modify(path) => this.out.push(path.toString)
            }
        }

        val listener = Akka.system.actorOf(Props[Listener], "listener")
        val swatch = Akka.system.actorOf(Props[SwatchActor], "swatch")
        swatch ! Watch("/folder/path", Seq(Create, Modify, Delete), true, Option(listener))

    }

}
Play controller (the iteratee and Akka imports are spelled out here; the import for the Start case class depends on where it lives):
package controllers

import play.api._
import play.api.mvc._
import play.api.libs.iteratee.{Concurrent, Iteratee}
import play.api.libs.concurrent.Akka
import play.api.Play.current

object Application extends Controller {

    def test = WebSocket.using[String] { request =>

        val (out, channel) = Concurrent.broadcast[String]

        val listener = Akka.system.actorSelection("akka://application/user/listener")
        //This is where the websocket out channel is being passed to the listener actor
        listener ! Start(channel)

        val in = Iteratee.foreach[String] { msg =>
            channel push("RESPONSE: " + msg)
        }

        (in, out)

    }   

}


Sunday, March 23, 2014

A quick tour of relational database access with Scala

http://manuel.bernhardt.io/2014/02/04/a-quick-tour-of-relational-database-access-with-scala/

I started looking for a tool that would let me achieve the following:
  • library optimized for Scala (easy to add to an SBT workflow, ideally with a wrapper for the Play framework)
  • easy mapping to and from case classes
  • control over the resulting SQL
  • ability to take care of CRUD operations without too much dialogue
  • ability to build finders quickly (note: I’m talking about down-to-earth, readable finders, such as findById and findByEmail, not about monstrosities such as findByAgeBetweenEighteenAndTwentySevenAndWithColorBlue – I don’t really know where this comes from, but I’ve seen it, and it’s a terrible, terrible idea; see the sketch after this list)
  • last but not least: a certain level of maturity, and the perspective that it will be maintained in the future (unlike e.g. the once promising Circumflex ORM, which appears to be no longer maintained)
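
As a rough illustration of the CRUD-plus-finders surface described above, a hypothetical sketch (not the API of any particular Scala library; User and the method names are made up):

// hypothetical shapes only - not any specific library's API
case class User(id: Long, email: String, name: String)

trait UserRepository {
    // plain CRUD without much ceremony
    def create(user: User): User
    def update(user: User): Unit
    def delete(id: Long): Unit

    // down-to-earth, readable finders
    def findById(id: Long): Option[User]
    def findByEmail(email: String): Option[User]
}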

Friday, March 21, 2014

How to profile methods in Scala?

http://stackoverflow.com/questions/9160001/how-to-profile-methods-in-scala

import System.{currentTimeMillis => _time}
def profile[R](code: => R, t: Long = _time) = (code, _time - t)

// usage:
val (result, time) = profile { /* block of code to be profiled*/ }

val (result2, time2) = profile { methodToBeProfiled(foo) }
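
For finer-grained measurements, the same trick works with System.nanoTime - a sketch along the same lines (profileNanos is a made-up name), reporting elapsed wall-clock time in milliseconds:

import System.{nanoTime => _nano}

// the default argument captures the start time before the by-name block is evaluated
def profileNanos[R](code: => R, t: Long = _nano): (R, Double) =
    (code, (_nano - t) / 1e6)   // elapsed time in milliseconds

// usage:
val (res, millis) = profileNanos { (1 to 1000000).map(math.sqrt(_)).sum }
println(f"took $millis%.3f ms")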