1. Build Spark for CDH5
> cd /home/david/tools/spark-0.9.0-incubating-bin-hadoop2
> sbt/sbt clean
> SPARK_HADOOP_VERSION=2.2.0-cdh5.0.0-beta-2 SPARK_YARN=true sbt/sbt assembly
Note: the Spark source is downloaded from http://spark.incubator.apache.org/downloads.html
Note: run 'hadoop version' to find out the value to set for SPARK_HADOOP_VERSION
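For illustration, the first line of 'hadoop version' on this CDH5 beta install should look like the line below (exact output varies per install); the part after "Hadoop " is the value to export:
> hadoop version
Hadoop 2.2.0-cdh5.0.0-beta-2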
2. Run it
> cd /home/david/codes/spark/examples/WordCount
> sbt assembly
> java -jar /home/david/codes/spark/examples/WordCount/target/scala-2.10/WordCount-assembly-1.0.jar
Note: the runnable jar WordCount-assembly-1.0.jar is generated through 'sbt assembly', run in the WordCount project directory
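During development you can also skip the assembly step and launch the main object straight from sbt ('sbt run' compiles and runs the 'wc' object with the same local master):
> sbt run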
# build.sbt
import AssemblyKeys._

assemblySettings

name := "WordCount"

version := "1.0"

scalaVersion := "2.10.3"

libraryDependencies += "ch.qos.logback" % "logback-classic" % "0.9.24" % "compile"

libraryDependencies += "ch.qos.logback" % "logback-core" % "0.9.24" % "compile"

libraryDependencies += "org.slf4j" % "log4j-over-slf4j" % "1.6.1"

libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.0-incubating"

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.2.0"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

// Drop jars that would collide with (or duplicate) classes already
// present in the Spark assembly when building the fat jar.
excludedJars in assembly <<= (fullClasspath in assembly) map { cp =>
  cp filter { c =>
    List("javax.servlet-2.5.0.v201103041518.jar",
         "slf4j-log4j12-1.7.2.jar",
         "commons-beanutils-1.7.0.jar",
         "commons-collections-3.2.1.jar",
         "minlog-1.2.jar",
         "asm-3.2.jar",
         "hadoop-yarn-common-2.2.0.jar",
         "slf4j-log4j12-1.7.5.jar",
         "log4j-over-slf4j-1.6.1.jar"
    ) exists { c.data.getName contains _ }
  }
}

// Record the main class in the manifest so the jar is runnable with 'java -jar'.
packageOptions in (Compile, packageBin) +=
  Package.ManifestAttributes("Main-Class" -> "wc")
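If the assembly still fails on duplicate files, sbt-assembly 0.x also supports overriding its merge strategy instead of excluding whole jars; a minimal sketch, assuming the same plugin version (the META-INF rule here is only an illustrative example, not part of the original build):
mergeStrategy in assembly <<= (mergeStrategy in assembly) { old =>
  {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard // example rule: drop duplicated META-INF entries
    case x => old(x)                                            // fall back to the default strategy
  }
}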
# wc.scala
import org.apache.spark.SparkContext
object wc extends App {
  // SparkContext(master, app name, Spark home, jars to ship) - the Spark 0.9 constructor
  val sc = new SparkContext("local", "Simple App", "/home/david/tools/spark",
    List("/home/david/tools/spark-0.9.0-incubating-bin-hadoop2/assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-hadoop2.2.0-cdh5.0.0-beta-2.jar"))
  // Use the hdfs:// protocol to access the file - you need to install Hue first.
  // In /etc/hue/conf.empty/hue.ini you can find that port 8020 is defined by "fs_defaultfs=hdfs://localhost:8020".
  val textFile2 = sc.textFile("hdfs://localhost:8020/user/david/data/output2.csv")
  println(textFile2.count())                                             // total number of lines
  println(textFile2.filter(line => line.contains("01/01/1980")).count()) // lines mentioning 01/01/1980
}
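Despite the name, wc only counts lines. A minimal sketch of an actual word count over the same RDD, assuming whitespace-delimited words; these lines would go inside the wc object after textFile2 is defined (in Spark 0.9, the import of SparkContext._ supplies the pair-RDD implicits that make reduceByKey available):
import org.apache.spark.SparkContext._  // pair-RDD implicits for reduceByKey

val wordCounts = textFile2
  .flatMap(line => line.split("\\s+"))  // split each line into words
  .map(word => (word, 1))               // pair each word with a count of 1
  .reduceByKey(_ + _)                   // sum the counts per word
wordCounts.take(10).foreach(println)    // print a small sample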
Note: the file output2.csv was uploaded to HDFS through the command 'hadoop fs -put output2.csv data'
Note2: Hue may not be needed, based on the quote "To interact with Hadoop Distributed File System (HDFS), you need to use a Spark version that is built against the same version of Hadoop as your cluster" from Fast Data Processing with Spark, p. 16.
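For completeness, the upload sequence would look roughly like this (the 'data' directory is created first; relative paths resolve under /user/david):
> hadoop fs -mkdir data
> hadoop fs -put output2.csv data
> hadoop fs -ls data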
# logback.xml
<configuration>
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <!-- encoders are assigned the type ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
    <encoder>
      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} %line --- %msg%n</pattern>
    </encoder>
  </appender>
  <root level="error">
    <appender-ref ref="STDOUT" />
  </root>
</configuration>
Note: place logback.xml under src/main/resources so sbt puts it on the classpath.