Sunday, March 30, 2014

Spark reads input from HDFS through HUE

I use Cloudera CDH5 with Spark 0.9.0 built for CDH5.

1. Build Spark for CDH5
> cd /home/david/tools/spark-0.9.0-incubating-bin-hadoop2
> sbt/sbt clean
> SPARK_HADOOP_VERSION=2.2.0-cdh5.0.0-beta-2 SPARK_YARN=true sbt/sbt assembly

Note: use the 'hadoop version' command to find out what SPARK_HADOOP_VERSION should be set to.
The Spark source is downloaded from http://spark.incubator.apache.org/downloads.html

2. Run it
> java -jar /home/david/codes/spark/examples/WordCount/target/scala-2.10/WordCount-assembly-1.0.jar

Note: the runnable jar WordCount-assembly-1.0.jar is generated through 'sbt assembly'
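The 'sbt assembly' task comes from the sbt-assembly plugin, which has to be registered in the project before build.sbt below will compile. A minimal project/plugins.sbt is sketched here (the version number is an assumption; pick the release that matches your sbt installation):

# project/plugins.sbt
// registers the sbt-assembly plugin that provides the 'assembly' task and AssemblyKeys
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.10.2")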

# build.sbt
import AssemblyKeys._

assemblySettings

name := "WordCount"

version := "1.0"

scalaVersion := "2.10.3"

libraryDependencies += "ch.qos.logback" % "logback-classic" % "0.9.24" % "compile"

libraryDependencies += "ch.qos.logback" % "logback-core" % "0.9.24" % "compile"

libraryDependencies += "org.slf4j" % "log4j-over-slf4j" % "1.6.1"

libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.0-incubating"

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.2.0"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

// drop jars that conflict with the Spark assembly or pull in duplicate classes
excludedJars in assembly <<= (fullClasspath in assembly) map { cp =>
    cp filter { c =>
        List("javax.servlet-2.5.0.v201103041518.jar",
            "slf4j-log4j12-1.7.2.jar",
            "commons-beanutils-1.7.0.jar",
            "commons-collections-3.2.1.jar",
            "minlog-1.2.jar",
            "asm-3.2.jar",
            "hadoop-yarn-common-2.2.0.jar",
            "slf4j-log4j12-1.7.5.jar",
            "log4j-over-slf4j-1.6.1.jar"
        ) exists { c.data.getName contains _ }
    }
}

// make the assembled jar runnable with 'java -jar'
packageOptions in (Compile, packageBin) +=
    Package.ManifestAttributes("Main-Class" -> "wc")


# wc.scala
import org.apache.spark.SparkContext

object wc extends App {
    // master = "local", app name, Spark home, and the jars shipped to the executors
    val sc = new SparkContext("local", "Simple App", "/home/david/tools/spark",
        List("/home/david/tools/spark-0.9.0-incubating-bin-hadoop2/assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-hadoop2.2.0-cdh5.0.0-beta-2.jar"))

    // use the hdfs protocol to access the file - you need to install HUE first
    // In /etc/hue/conf.empty/hue.ini, port 8020 is defined as "fs_defaultfs=hdfs://localhost:8020"
    val textFile2 = sc.textFile("hdfs://localhost:8020/user/david/data/output2.csv")
    println(textFile2.count())
    println(textFile2.filter(line => line.contains("01/01/1980")).count())

    sc.stop()
}


Note: the file output2.csv was saved to HDFS through the command 'hadoop fs -put output2.csv data'
Note2: HUE may not be needed, based on this quote from Fast Data Processing with Spark, p. 16: "To interact with Hadoop Distributed File System (HDFS), you need to use a Spark version that is built against the same version of Hadoop as your cluster".
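The program above is called WordCount but only counts lines. For an actual word count over the same file, plus writing the result back to HDFS, a sketch like this would go inside the wc object (the output path is a hypothetical example; flatMap, reduceByKey and saveAsTextFile are standard Spark 0.9 RDD operations):

// classic word count on the textFile2 RDD defined above
import org.apache.spark.SparkContext._   // brings in reduceByKey via PairRDDFunctions

val counts = textFile2
    .flatMap(line => line.split("\\s+")) // split each line into words
    .map(word => (word, 1))              // pair each word with a count of 1
    .reduceByKey(_ + _)                  // sum the counts per word

// write the (word, count) pairs back to HDFS; the output path is a hypothetical example
counts.saveAsTextFile("hdfs://localhost:8020/user/david/data/wordcounts")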

# logback.xml
<configuration>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <!-- encoders are assigned the type ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} %line --- %msg%n</pattern>
        </encoder>
    </appender>

    <root level="error">
        <appender-ref ref="STDOUT" />
    </root>
</configuration>
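Because build.sbt excludes slf4j-log4j12 and adds log4j-over-slf4j, Spark's log4j calls are bridged into slf4j and end up handled by logback with the configuration above. A minimal sketch of logging from the application side, using the standard slf4j API:

import org.slf4j.LoggerFactory

// standard slf4j lookup; logback-classic serves as the backing implementation
val log = LoggerFactory.getLogger("wc")
log.error("visible - the root level above is 'error'")
log.info("suppressed - below the configured root level")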
