2017-12-20 3 views
0

Ich habe eine Scala-Liste: fileNames, die die Dateinamen in einem lokalen Verzeichnis enthält. Ex:Fehler beim Verschieben einer Datei in HDFS von Local mit Scala

fileNames(2) 
res0: String = file:///tmp/audits/xx_user.log 

Ich versuche, eine Datei aus der Liste zu verschieben: Dateinamen von der lokalen bis HDFS Scala mit. Um das zu tun, befolgte ich die folgenden Schritte:

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 

import org.apache.commons.io.IOUtils; 
val hadoopconf = new Configuration(); 
hadoopconf.addResource(new Path("/etc/hadoop/conf/core-site.xml")); 
val fs = FileSystem.get(hadoopconf); 
val outFileStream = fs.create(new Path("hdfs://mydev/user/devusr/testfolder")) 

Der Code funktioniert bis hier. Wenn ich versuche, die input hinzuzufügen, erhalte ich Fehlermeldung wie folgt:

val inStream = fs.open(new Path(fileNames(2))) 
java.lang.IllegalArgumentException: Wrong FS: file:/tmp/audits/xx_user.log, expected: hdfs://mergedev 

Ich habe auch versucht, indem Sie die Dateinamen direkt spezifizieren und das Ergebnis ist dasselbe:

val inStream = fs.open(new Path("file:///tmp/audits/xx_user.log")) 
java.lang.IllegalArgumentException: Wrong FS: file:/tmp/audits/xx_user.log, expected: hdfs://mergedev 

Aber wenn ich zu laden versuchen die Datei direkt in Funken, funktioniert es gut:

val localToSpark = spark.read.text(fileNames(2)) 
localToSpark: org.apache.spark.sql.DataFrame = [value: string] 
localToSpark.collect 
res1: Array[org.apache.spark.sql.Row] = Array([[Wed Dec 20 06:18:02 UTC 2017] INFO: ], [*********************************************************************************************************], [ ], [[Wed Dec 20 06:18:02 UTC 2017] INFO: Diagnostic log for xx_user.] 

jemand kann mir sagen, was der Fehler ist, dass ich in diesem Punkt tue: val inStream = fs.open(new Path(fileNames(2))) f oder was ich bekomme den Fehler.

+0

Wenn Sie Spark haben, warum schreiben Sie nicht einfach die Datei von dort in HDFS? –

Antwort

2

Bei kleinen Dateien ist copyFromLocalFile() ausreichend:

fs.copyFromLocalFile(new Path(localFileName), new Path(hdfsFileName)) 

Für große Dateien, es effizienter ist Apache Commons-IO zu verwenden:

IOUtils.copyLarge(
    new FileInputStream(new File(localFileName)), fs.create(new Path(hdfsFileName))) 

Beachten Sie, dass die lokale Datei Name sollte nicht das Protokoll enthalten (also keine file:/// drin)

-1

Sie müssen einen lokalen InputStream in diesen OutputStream kopieren. Und Sie brauchen fs nicht, um den lokalen Stream zu erstellen.

import org.apache.commons.io.IOUtils; 

val inStream = new BufferedInputStream(new FileInputStream(fileNames(2))); 
IOUtils.copyBytes(inStream, outFileStream, 4096, true); 
+0

Es heißt: "Keine solche Datei oder Verzeichnis", wenn ich den Code versuchen val inStream = neue BufferedInputStream (neue FileInputStream ("file: ///tmp/audits/xx_user.log")) java.io.FileNotFoundException: Datei : /tmp/audits/xx_user.log (Keine solche Datei oder Verzeichnis). Die Liste enthält die Namen mit dem Wort "file: ///". Ex: file: ///tmp/hive_audits/xx_user.log " – Sidhartha

+0

Wenn ich den Pfad direkt im Code geben, heißt es copyBytes ist kein Mitglied des Objekts org.apache.commons.io.IOUtils: val inStream = neu BufferedInputStream (new Fileinputstream ("/ tmp/hive_audits/362650_logs/362675_xx_crm_user_diagnostic.log")) inStream: java.io.BufferedInputStream = [email protected] IOUtils.copyBytes (inStream, outFileStream, 4096, true) : 58: Fehler: Wert copyBytes ist kein Mitglied des Objekts org.apache.commons.io.IOUtils IOUtils.copyBytes (inStream, outFileStream, 4096, wahr) ^ – Sidhartha

Verwandte Themen