Ich benutze die neue Apache Spark Version 1.4.0 Daten-Frames-API, um Informationen aus Twitter Status JSON zu extrahieren, vor allem auf die Entities Object konzentriert - die relevanten Teil auf diese Frage wird im folgenden gezeigt:So extrahieren Sie komplexe JSON-Strukturen mit Apache Spark 1.4.0 Daten Frames
{
...
...
"entities": {
"hashtags": [],
"trends": [],
"urls": [],
"user_mentions": [
{
"screen_name": "linobocchini",
"name": "Lino Bocchini",
"id": 187356243,
"id_str": "187356243",
"indices": [ 3, 16 ]
},
{
"screen_name": "jeanwyllys_real",
"name": "Jean Wyllys",
"id": 111123176,
"id_str": "111123176",
"indices": [ 79, 95 ]
}
],
"symbols": []
},
...
...
}
Es gibt mehrere Beispiele, wie Extraktion von Informationen aus Primitiven Typen wie string
, integer
, etc. - aber ich kann nichts finden, wie man diese Art von Komplex verarbeiten Strukturen.
ich den Code versucht, unten, aber es ist immer noch nicht funktioniert, wirft es eine Ausnahme
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val tweets = sqlContext.read.json("tweets.json")
// this function is just to filter empty entities.user_mentions[] nodes
// some tweets doesn't contains any mentions
import org.apache.spark.sql.functions.udf
val isEmpty = udf((value: List[Any]) => value.isEmpty)
import org.apache.spark.sql._
import sqlContext.implicits._
case class UserMention(id: Long, idStr: String, indices: Array[Long], name: String, screenName: String)
val mentions = tweets.select("entities.user_mentions").
filter(!isEmpty($"user_mentions")).
explode($"user_mentions") {
case Row(arr: Array[Row]) => arr.map { elem =>
UserMention(
elem.getAs[Long]("id"),
elem.getAs[String]("is_str"),
elem.getAs[Array[Long]]("indices"),
elem.getAs[String]("name"),
elem.getAs[String]("screen_name"))
}
}
mentions.first
Ausnahme, wenn ich versuche mentions.first
zu nennen:
scala> mentions.first
15/06/23 22:15:06 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 8)
scala.MatchError: [List([187356243,187356243,List(3, 16),Lino Bocchini,linobocchini], [111123176,111123176,List(79, 95),Jean Wyllys,jeanwyllys_real])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:55)
at org.apache.spark.sql.catalyst.expressions.UserDefinedGenerator.eval(generators.scala:81)
Was ist hier falsch? Ich verstehe, dass es mit den Typen zusammenhängt, aber ich konnte es noch nicht herausfinden.
Als zusätzlicher Kontext abgebildet die Struktur automatisch ist:
scala> mentions.printSchema
root
|-- user_mentions: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: long (nullable = true)
| | |-- id_str: string (nullable = true)
| | |-- indices: array (nullable = true)
| | | |-- element: long (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- screen_name: string (nullable = true)
Anmerkung 1: Ich weiß es möglich ist, diese HiveQL
mit zu lösen, aber ich mag noch einmal Daten-Frames verwenden, es gibt so viel Momentum um es herum.
SELECT explode(entities.user_mentions) as mentions
FROM tweets
Anmerkung 2: die UDFval isEmpty = udf((value: List[Any]) => value.isEmpty)
eine hässliche Hack ist, und ich bin hier fehlt etwas, aber es war der einzige Weg, ich
Ich denke, Ihr 'Fall Row (arr: Array [Row])' nicht Ihre Eingabe nicht überein. – elmalto
Hallo @elmalto, habe ich sowohl "List" und "Array" versucht, aber so oder so bekomme ich den gleichen Fehler. – arjones