Ich habe eine Scala-Liste: fileNames, die die Dateinamen in einem lokalen Verzeichnis enthält. Ex:Fehler beim Verschieben einer Datei in HDFS von Local mit Scala
fileNames(2)
res0: String = file:///tmp/audits/xx_user.log
Ich versuche, eine Datei aus der Liste zu verschieben: Dateinamen von der lokalen bis HDFS Scala mit. Um das zu tun, befolgte ich die folgenden Schritte:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.commons.io.IOUtils;
val hadoopconf = new Configuration();
hadoopconf.addResource(new Path("/etc/hadoop/conf/core-site.xml"));
val fs = FileSystem.get(hadoopconf);
val outFileStream = fs.create(new Path("hdfs://mydev/user/devusr/testfolder"))
Der Code funktioniert bis hier. Wenn ich versuche, die input hinzuzufügen, erhalte ich Fehlermeldung wie folgt:
val inStream = fs.open(new Path(fileNames(2)))
java.lang.IllegalArgumentException: Wrong FS: file:/tmp/audits/xx_user.log, expected: hdfs://mergedev
Ich habe auch versucht, indem Sie die Dateinamen direkt spezifizieren und das Ergebnis ist dasselbe:
val inStream = fs.open(new Path("file:///tmp/audits/xx_user.log"))
java.lang.IllegalArgumentException: Wrong FS: file:/tmp/audits/xx_user.log, expected: hdfs://mergedev
Aber wenn ich zu laden versuchen die Datei direkt in Funken, funktioniert es gut:
val localToSpark = spark.read.text(fileNames(2))
localToSpark: org.apache.spark.sql.DataFrame = [value: string]
localToSpark.collect
res1: Array[org.apache.spark.sql.Row] = Array([[Wed Dec 20 06:18:02 UTC 2017] INFO: ], [*********************************************************************************************************], [ ], [[Wed Dec 20 06:18:02 UTC 2017] INFO: Diagnostic log for xx_user.]
jemand kann mir sagen, was der Fehler ist, dass ich in diesem Punkt tue: val inStream = fs.open(new Path(fileNames(2)))
f oder was ich bekomme den Fehler.
Wenn Sie Spark haben, warum schreiben Sie nicht einfach die Datei von dort in HDFS? –