Die folgenden mein Schema DatenrahmenWie kann ich die verschachtelten Felder in dem Datenrahmen .proto zuzugreifen, ScalaPB
root
|-- name: string (nullable = true)
|-- addresses: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- street: string (nullable = true)
| | |-- city: string (nullable = true)
Ich mag Ausgabenamen und Stadt. Das Folgende ist meine Spark-Streaming-App, die Namen und Adressen ausgibt, aber ich möchte Namen und Städte in der Ausgabe. Schätzen Sie Ihre Hilfe. Vielen Dank.
object PersonConsumer {
import org.apache.spark.sql.{SQLContext, SparkSession}
import com.example.protos.demo._
def main(args : Array[String]) {
val spark = SparkSession.builder.
master("local")
.appName("spark session example")
.getOrCreate()
import spark.implicits._
val ds1 = spark.readStream.format("kafka").
option("kafka.bootstrap.servers","localhost:9092").
option("subscribe","person").load()
val ds2 = ds1.map(row=> row.getAs[Array[Byte]]("value")).map(Person.parseFrom(_)).select($"name", $"addresses")
ds2.printSchema()
val query = ds2.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
}
}