远程操作Spark集群

Java

源码:

1
2
3
4
5
6
7
8
9
10
11
12
public class RowCounter {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setMaster("spark://HADOOP-MASTER-153:7077");
JavaSparkContext sc = new JavaSparkContext("local[2]","first spark app",conf);
JavaRDD<String> lines = sc.textFile("C:\\data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);
System.out.println(totalLength);
}

}

  • 报错:

    1
    2
    3
    4
    Error:(16, 13) java: 无法访问scala.Cloneable
    找不到scala.Cloneable的类文件
    Error:(19, 45) java: 无法访问scala.Serializable
    找不到scala.Serializable的类文件
  • 解决:

    增加依赖包:spark-assembly-1.6.1-hadoop2.6.0
  • 报错:

    1
    Exception in thread "main" org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://HADOOP-MASTER-153:9000/user/xuyifei01/input/access.log
  • 解决:

    模拟设置用户

    1
    2
    System.setProperty("HADOOP_USER_NAME","hadoop");
    System.setProperty("user.name","hadoop");

最终Java代码:

1
2
3
4
5
6
7
8
9
10
11
12
public class RowCounter {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("spark://HADOOP-MASTER-153:7077").setAppName("统计行数");
conf.setJars(new String[]{"F:\\wftsync-hbase\\out\\artifacts\\wftsync_jar\\wftsync.jar"});
System.setProperty("HADOOP_USER_NAME","hadoop");
System.setProperty("user.name","hadoop");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("input/access.log");
int totalLength = lines.map(s -> s.length()).reduce((a, b) -> a + b);
System.out.println(totalLength);
}
}

Scala

IDEA需要安装Scala插件。
scala代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
object RowCounter {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("统计行数")
.setMaster("spark://HADOOP-MASTER-153:7077")
.setJars(List("F:\\wftsync-spark\\out\\artifacts\\wftsync_spark_jar\\wftsync-spark.jar"))
System.setProperty("HADOOP_USER_NAME","hadoop") //模拟设置用户防止权限问题
System.setProperty("user.name","hadoop")
val spark = new SparkContext(conf)
val rdd= spark.textFile("C:\\data.txt")
val count = rdd.map(line => line.length).reduce((a,b)=>a+b)
println("lines " + count )
spark.stop()
}
}

  • 报错:

    1
    Exception in thread "main" java.lang.NoSuchMethodError: scala.collection.immutable.HashSet$.empty()Lscala/collection/immutable/HashSet;
  • 解决

    差了下错误,应该是scala版本造成的,将scala版本会退到2.10.6问题解决。

  • warn:

    1
    WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
  • 解决:

    spark内存要求比较高,应该是运行的时候资源不足了。

报错:

1
16/03/30 15:59:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

解决:

这个问题主要是jre目录下缺少了libhadoop.so和libsnappy.so两个文件。具体是,spark-shell依赖的是scala,scala依赖的是JAVA_HOME下的jdk,libhadoop.so和libsnappy.so两个文件应该放到JAVA_HOME/jre/lib/amd64下面。要注意的是要知道真正依赖到的JAVA_HOME是哪一个,把两个.so放对地方。这两个so:libhadoop.so和libsnappy.so。前一个so可以在HADOOP_HOME下找到,比如hadoop\lib\native\Linux-amd64-64。第二个libsnappy.so需要下载一个snappy-1.1.0.tar.gz,然后./configure,make编译出来。snappy是google的一个压缩算法,在hadoop jira下https://issues.apache.org/jira/browse/HADOOP-7206记录了这次集成。
单替换了hadoop的core包后,可能还会出一些WARN或者ERROR的提示,主要牵扯到的是hadoop别的包的一些兼容啊,版本提升的问题。具体问题具体再解决吧。

参考链接