0

brauche Hilfe bei meinem ersten Versuch, json auf Kafka zu parsen, um strukturiertes Streaming auszulösen.Wie konvertiere ich Sparks-Streaming verschachtelte JSON kommt auf Kafka zu flachen Datenrahmen?

Ich habe Schwierigkeiten, den eingehenden JSON zu konvertieren und ihn in einen flachen Datenrahmen für die weitere Verarbeitung zu konvertieren.

ist mein Eingang json

[ 
    { "siteId": "30:47:47:BE:16:8F", "siteData": 
     [ 
      { "dataseries": "trend-255", "values": 
       [ 
        {"ts": 1502715600, "value": 35.74 }, 
        {"ts": 1502715660, "value": 35.65 }, 
        {"ts": 1502715720, "value": 35.58 }, 
        {"ts": 1502715780, "value": 35.55 } 
       ] 
      }, 
      { "dataseries": "trend-256", "values": 
       [ 
        {"ts": 1502715840, "value": 18.45 }, 
        {"ts": 1502715900, "value": 18.35 }, 
        {"ts": 1502715960, "value": 18.32 } 
       ] 
      } 
     ] 
    }, 
    { "siteId": "30:47:47:BE:16:FF", "siteData": 
     [ 
      { "dataseries": "trend-255", "values": 
       [ 
        {"ts": 1502715600, "value": 35.74 }, 
        {"ts": 1502715660, "value": 35.65 }, 
        {"ts": 1502715720, "value": 35.58 }, 
        {"ts": 1502715780, "value": 35.55 } 
       ] 
      }, 
      { "dataseries": "trend-256", "values": 
       [ 
        {"ts": 1502715840, "value": 18.45 }, 
        {"ts": 1502715900, "value": 18.35 }, 
        {"ts": 1502715960, "value": 18.32 } 
       ] 
      } 
     ] 
    } 
] 

Spark-Schema

data1_spark_schema = ArrayType(
StructType([ 
    StructField("siteId", StringType(), False), 
    StructField("siteData", ArrayType(StructType([ 
    StructField("dataseries", StringType(), False), 
    StructField("values", ArrayType(StructType([ 
     StructField("ts", IntegerType(), False), 
     StructField("value", StringType(), False) 
    ]), False), False) 
    ]), False), False) 
]), False 
) 

Meine sehr einfachen Code ist:

from pyspark.sql import SparkSession 
from pyspark.sql.functions import * 

from config.general import kafka_instance 
from config.general import topic 
from schemas.schema import data1_spark_schema 

spark = SparkSession \ 
      .builder \ 
      .appName("Structured_BMS_Feed") \ 
      .getOrCreate() 

stream = spark \ 
     .readStream \ 
     .format("kafka") \ 
     .option("kafka.bootstrap.servers", kafka_instance) \ 
     .option("subscribe", topic) \ 
     .option("startingOffsets", "latest") \ 
     .option("max.poll.records", 100) \ 
     .option("failOnDataLoss", False) \ 
     .load() 

stream_records = stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING) as bms_data1") \ 
         .select(from_json("bms_data1", data1_spark_schema).alias("bms_data1")) 

sites = stream_records.select(explode("bms_data1").alias("site")) \ 
         .select("site.*") 

sites.printSchema() 

stream_debug = sites.writeStream \ 
          .outputMode("append") \ 
          .format("console") \ 
          .option("numRows", 20) \ 
          .option("truncate", False) \ 
          .start() 


stream_debug.awaitTermination() 

Wenn ich laufen Schema gedruckt wird wie folgt den Code I:

root 
|-- siteId: string (nullable = false) 
|-- siteData: array (nullable = false) 
| |-- element: struct (containsNull = false) 
| | |-- dataseries: string (nullable = false) 
| | |-- values: array (nullable = false) 
| | | |-- element: struct (containsNull = false) 
| | | | |-- ts: integer (nullable = false) 
| | | | |-- value: string (nullable = false) 

Ist es möglich, dieses Schema in einer Weise, in der ich alle Felder in einem flachen Datenframe anstelle von verschachtelten Json erhalten. Also für jeden ts und Wert sollte es mir eine Zeile mit seinen Elterndaten und Site-ID geben.

+0

Scheinen doppelte Frage: [siehe this]: (https://stackoverflow.com/questions/35027966/elegant-json-flatten-in-spark) –

Antwort

0

Beantworten meiner eigenen Frage. Ich schaffte es mit folgenden Zeilen zu glätten:

sites_flat = stream_records.select(explode("bms_data1").alias("site")) \ 
          .select("site.siteId", explode("site.siteData").alias("siteData")) \ 
          .select("siteId", "siteData.dataseries", explode("siteData.values").alias("values")) \ 
          .select("siteId", "dataseries", "values.*") 
Verwandte Themen