2017-06-16 3 views
1

Ich bin neu in Apache Spark und versuchen, eine Datei aus dem lokalen Dateisystem zu laden. Ich folge Hadoop - The Definitve Guide Book.Apache Spark: Laden Sie Datei von lokalen anstelle von HDFS und Laden lokaler Datei mit IllegalArguementException

Hier sind die Umgebungsvariablen, die ich gesetzt haben:

export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_91.jdk/Contents/Home 
export HADOOP_HOME=/Users/bng/Documents/hadoop-2.6.4 
export PATH=$PATH:$HADOOP_HOME/bin 
export PATH=$PATH:$HADOOP_HOME/sbin 
export HADOOP_PREFIX=/Users/bng/Documents/hadoop-2.6.4 
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop 
export HADOOP_MAPRED_HOME=$HADOOP_HOME 
export HADOOP_COMMON_HOME=$HADOOP_HOME 
export HADOOP_HDFS_HOME=$HADOOP_HOME 
export YARN_HOME=$HADOOP_HOME 

export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native 
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib" 
export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar 

export PATH=/usr/local/mysql/bin:/Users/bng/Documents/mongodb/bin:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH 
export GOOGLE_APPLICATION_CREDENTIALS=/Users/bng/Downloads/googleCredentials 

export FLUME_HOME=/Users/bng/Documents/apache-flume-1.7.0-bin 
export PATH=$PATH:$FLUME_HOME/bin 

export SQOOP_HOME=/Users/bng/Documents/sqoop-1.4.6.bin__hadoop-2.0.4-alpha 
export PATH=$PATH:$SQOOP_HOME/bin 

export PIG_HOME=/Users/bng/Documents/pig-0.16.0 
export PATH=$PATH:$PIG_HOME/bin 

export HIVE_HOME=/Users/bng/Documents/apache-hive-1.2.2-bin 
export PATH=$PATH:$HIVE_HOME/bin 

export SPARK_HOME=/Users/bng/Documents/spark-1.6.3-bin-hadoop2.6 
export PATH=$PATH:$SPARK_HOME/bin 

Und hier sind die Befehle, die ich ausführen bin:

val lines = sc.textFile("Users/bng/Documents/hContent/input/ncdc/micro-tab/sample.txt"); 
val records = lines.map(_.split("\t")); 
val filters = records.filter(rec => (rec(1) != "9999" && rec(2).matches("[01459]"))); 
val tuples = filters.map(rec => (rec(0).toInt, rec(1).toInt)); 
val maxTemps = tuples.reduceByKey((a,b) => Math.max(a,b)); 
maxTemps.foreach(println(_)); 

Die oben sc.textFile Befehle haben den Weg von meinem lokalen Dateisystem, aber einige, wie es auf die hdfs zeigt, für die ich den folgenden Fehler erhielt:

Also dachte ich, es würde auf mein hdfs-Dateisystem verweisen, also habe ich manuell eine Datei in hdfs im Verzeichnis "/ user/hive/warehouse/records" hinzugefügt und versucht, folgendes auszuführen: val lines = sc.textFile ("/ Benutzer/Bienenstock/Lager/Aufzeichnungen");

Und alles hat gut funktioniert.

Aber ich wollte die Datei vom lokalen System laden, so dass nach der Suche auf ich fand, dass ich brauche das hinzufügen „file: //“ uri, so habe ich versucht, den unter Befehl:

val localLines = sc.textFile("file://Users/bng/Documents/hContent/input/ncdc/micro-tab/sample.txt"); 
localLines.foreach(println(_)); 

aber noch, ich habe die folgende Ausnahme:

java.lang.IllegalArgumentException: Wrong FS: file://Users/bng/Documents/hContent/input/ncdc/micro-tab/sample.txt 
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:645) 
at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:80) 
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:529) 
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:747) 
at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:524) 
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409) 
at org.apache.hadoop.fs.Globber.getFileStatus(Globber.java:57) 
at org.apache.hadoop.fs.Globber.glob(Globber.java:252) 
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1644) 
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:257) 
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228) 
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313) 
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:202) 
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
at scala.Option.getOrElse(Option.scala:120) 
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) 
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) 
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) 
at scala.Option.getOrElse(Option.scala:120) 
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929) 
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:912) 
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
at org.apache.spark.rdd.RDD.foreach(RDD.scala:910) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:37) 
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39) 
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:41) 
at $iwC$$iwC$$iwC.<init>(<console>:43) 
at $iwC$$iwC.<init>(<console>:45) 
at $iwC.<init>(<console>:47) 
at <init>(<console>:49) 
at .<init>(<console>:53) 
at .<clinit>(<console>) 
at .<init>(<console>:7) 
at .<clinit>(<console>) 
at $print(<console>) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:498) 
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) 
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346) 
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) 
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) 
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) 
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857) 
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902) 
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814) 
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657) 
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665) 
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670) 
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997) 
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) 
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) 
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945) 
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059) 
at org.apache.spark.repl.Main$.main(Main.scala:31) 
at org.apache.spark.repl.Main.main(Main.scala) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:498) 
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) 
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) 
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) 
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 

Bitte legen nahe, was das Problem hier sein könnte ...

Antwort

3

ich den Haken bekam, das Problem mit der war „file: //“ uri. Von "file: //" musste ich "file: ///" uri verwenden und alles hat gut funktioniert.

Intead von:

val localLines = sc.textFile("file://Users/bng/Documents/hContent/input/ncdc/micro-tab/sample.txt"); 

Ich brauchte das folgende zu verwenden:

val localLines = sc.textFile("file:///Users/bng/Documents/hContent/input/ncdc/micro-tab/sample.txt"); 
1

Sie haben soeben

val localLines = sc.textFile("/Users/bng/Documents/hContent/input/ncdc/micro-tab/sample.txt"); 

ohne file:// am Anfang

+0

ich das denken können wird auf "hd" zeigen fs'. 'file: //' ist notwendig in neueren Spark-Versionen (1.6+ vielleicht) – philantrovert

+0

@philantrovert es ist nicht wahr. Ich verwende nur absoluten Pfad, um Dateien in Spark 2.0 + ohne Probleme zu zeigen –

+0

überprüfen Sie auch [Dokumentation] (https://spark.apache.org/docs/latest/programming-guide.html#external-datasets) –

Verwandte Themen