2017-01-04 23 views
2

I json scala Bibliothek wurde mit einem json von einem lokalen Laufwerk in Funken Job zu analysieren:Parsing json in Funken

val requestJson=JSON.parseFull(Source.fromFile("c:/data/request.json").mkString) 
    val mainJson=requestJson.get.asInstanceOf[Map[String,Any]].get("Request").get.asInstanceOf[Map[String,Any]] 
    val currency=mainJson.get("currency").get.asInstanceOf[String] 

Aber wenn ich versuche, mit dem Hinweis auf hdfs Speicherort der Datei denselben Parser verwenden tut es Arbeit :

val requestJson=JSON.parseFull(Source.fromFile("hdfs://url/user/request.json").mkString) 

und gibt mir eine Fehlermeldung:

java.io.FileNotFoundException: hdfs:/localhost/user/request.json (No such file or directory) 
    at java.io.FileInputStream.open0(Native Method) 
    at java.io.FileInputStream.open(FileInputStream.java:195) 
    at java.io.FileInputStream.<init>(FileInputStream.java:138) 
    at scala.io.Source$.fromFile(Source.scala:91) 
    at scala.io.Source$.fromFile(Source.scala:76) 
    at scala.io.Source$.fromFile(Source.scala:54) 
    ... 128 elided 

Wie ich Json.parseFull Bibliothek verwenden können Daten her bekommen m hdfs Speicherort?

Dank

+0

Sie sollten hdfs' Lage wie diese 'hdfs provide': // cluster/path/to/file'' oder geben Sie einfach einen Verzeichnisnamen wie '/ path/to/file /'. Plz versuchen und lassen Sie mich wissen, ich werde ans. –

+0

Ja, ich habe versucht, den hdfs-Pfad zu Source.fromFile api, aber funktioniert nicht – baiduXiu

+0

Könnten Sie in der Lage, Fehlerprotokoll zu buchen? –

Antwort

1

Funken eine eingebaute Unterstützung für JSON Dokumente Parsen hat, die in spark-sql_${scala.version} Glas zur Verfügung stehen wird.

In Funken 2.0+:

import org.apache.spark.sql.SparkSession 

val spark: SparkSession = SparkSession.builder.master("local").getOrCreate 

val df = spark.read.format("json").json("json/file/location/in/hdfs") 

df.show() 

mit df Objekt können Sie alle unterstützten SQL-Operationen auf sie tun und es ist Datenverarbeitung unter den Knoten verteilt wird, während requestJson wird in einzelnen berechnet werden nur Maschine.

Abhängigkeiten Maven

<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-sql_2.11</artifactId> 
    <version>2.0.0</version> 
</dependency> 

Edit: (as per comment to read file from hdfs)

val hdfs = org.apache.hadoop.fs.FileSystem.get(
      new java.net.URI("hdfs://ITS-Hadoop10:9000/"), 
      new org.apache.hadoop.conf.Configuration() 
      ) 
val path=new Path("/user/zhc/"+x+"/") 
val t=hdfs.listStatus(path) 
val in =hdfs.open(t(0).getPath) 
val reader = new BufferedReader(new InputStreamReader(in)) 
var l=reader.readLine() 

code credits: from another SO question

Maven dependencies:

<dependency> 
    <groupId>org.apache.hadoop</groupId> 
    <artifactId>hadoop-hdfs</artifactId> 
    <version>2.7.2</version> <!-- you can change this as per your hadoop version --> 
</dependency> 
+0

die JSON-Datei ist nur ein paar kbs, so möchte ich in diesem Fall die Verwendung von Dataframe vermeiden und analysieren die JSON auf Dirver und nicht auf alle Arbeiter – baiduXiu

+0

wir Arbeiter einschränken können zu einem nach Änderungscode als 'master (" local [1] ")'. Wenn Sie im lokalen Modus arbeiten, befinden sich Arbeiter und Fahrer auf demselben Rechner. – mrsrinivas

+0

können Sie 'df.collect()' verwenden, um ganze Daten zum Treiber zu bekommen. – mrsrinivas

1

Es ist viel einfacher, in Funken 2,0

val df = spark.read.json("json/file/location/in/hdfs") 
df.show() 
+0

es erzeugt einen map reduce job für diesen .für einen kleinen json ist es ein overkill und daher wollte ich das mit scala ausführen – baiduXiu