2016-10-31 14 views
3

Ich bin ein verschachtelter komplexer Json und unten ist das Schema dafür.Spark für Json Daten

root 
|-- businessEntity: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- payGroup: array (nullable = true) 
| | | |-- element: struct (containsNull = true) 
| | | | |-- reportingPeriod: struct (nullable = true) 
| | | | | |-- worker: array (nullable = true) 
| | | | | | |-- element: struct (containsNull = true) 
| | | | | | | |-- category: string (nullable = true) 
| | | | | | | |-- person: struct (nullable = true) 
| | | | | | | |-- tax: array (nullable = true) 
| | | | | | | | |-- element: struct (containsNull = true) 
| | | | | | | | | |-- code: string (nullable = true) 
| | | | | | | | | |-- qtdAmount: double (nullable = true) 
| | | | | | | | | |-- ytdAmount: double (nullable = 

Meine Forderung ist ein hashmap mit Code mit qtdAmount als Schlüssel und Wert von qtdAmount als Wert verkettet zu erstellen. Map.put (Code + "qtdAmount", qtdAmount). Wie kann ich das mit Funken machen?

Ich habe versucht mit unten Shell Befehle.

import org.apache.spark.sql._ 
val sqlcontext = new SQLContext(sc) 
val cdm = sqlcontext.read.json("/user/edureka/CDM/cdm.json") 
val spark = SparkSession.builder().appName("SQL").config("spark.some.config.option","some-vale").getOrCreate() 
cdm.createOrReplaceTempView("CDM") 
val sqlDF = spark.sql("SELECT businessEntity[0].payGroup[0] from CDM").show() 
val address = spark.sql("SELECT businessEntity[0].payGroup[0].reportingPeriod.worker[0].person.address from CDM as address") 
val worker = spark.sql("SELECT businessEntity[0].payGroup[0].reportingPeriod.worker from CDM") 
val tax = spark.sql("SELECT businessEntity[0].payGroup[0].reportingPeriod.worker[0].tax from CDM") 
val tax = sqlcontext.sql("SELECT businessEntity[0].payGroup[0].reportingPeriod.worker[0].tax from CDM") 
tax.select("tax.code") 


val codes = tax.select(expode(tax("code")) 
scala> val codes = tax.withColumn("code",explode(tax("tax.code"))).withColumn("qtdAmount",explode(tax("tax.qtdAmount"))).withColumn("ytdAmount",explode(tax("tax.ytdAmount"))) 

Ich versuche, alle Codes und qtdAmount in eine Karte zu bekommen. Aber ich verstehe es nicht. Die Verwendung mehrerer explodierender Anweisungen für einen einzelnen DF erzeugt ein kartesisches Produkt der Elemente.

Könnte jemand bitte helfen, wie man den JSON von diesem viel Komplex in Funken parsen kann.

Antwort

1

Sie können code und qtyAmount auf diese Weise erhalten.

import sqlcontext.implicits._ 

    cdm.select(
     $"businessEntity.element.payGroup.element.reportingPeriod.worker.element.tax.element.code".as("code"), 
     $"businessEntity.element.payGroup.element.reportingPeriod.worker.element.tax.element.qtdAmount".as("qtdAmount") 
    ).show 

Ausführliche Informationen finden Sie in this