2015-06-24 2 views
6

Ich benutze die neue Apache Spark Version 1.4.0 Daten-Frames-API, um Informationen aus Twitter Status JSON zu extrahieren, vor allem auf die Entities Object konzentriert - die relevanten Teil auf diese Frage wird im folgenden gezeigt:So extrahieren Sie komplexe JSON-Strukturen mit Apache Spark 1.4.0 Daten Frames

{ 
    ... 
    ... 
    "entities": { 
    "hashtags": [], 
    "trends": [], 
    "urls": [], 
    "user_mentions": [ 
     { 
     "screen_name": "linobocchini", 
     "name": "Lino Bocchini", 
     "id": 187356243, 
     "id_str": "187356243", 
     "indices": [ 3, 16 ] 
     }, 
     { 
     "screen_name": "jeanwyllys_real", 
     "name": "Jean Wyllys", 
     "id": 111123176, 
     "id_str": "111123176", 
     "indices": [ 79, 95 ] 
     } 
    ], 
    "symbols": [] 
    }, 
    ... 
    ... 
} 

Es gibt mehrere Beispiele, wie Extraktion von Informationen aus Primitiven Typen wie string, integer, etc. - aber ich kann nichts finden, wie man diese Art von Komplex verarbeiten Strukturen.

ich den Code versucht, unten, aber es ist immer noch nicht funktioniert, wirft es eine Ausnahme

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) 

val tweets = sqlContext.read.json("tweets.json") 

// this function is just to filter empty entities.user_mentions[] nodes 
// some tweets doesn't contains any mentions 
import org.apache.spark.sql.functions.udf 
val isEmpty = udf((value: List[Any]) => value.isEmpty) 

import org.apache.spark.sql._ 
import sqlContext.implicits._ 
case class UserMention(id: Long, idStr: String, indices: Array[Long], name: String, screenName: String) 

val mentions = tweets.select("entities.user_mentions"). 
    filter(!isEmpty($"user_mentions")). 
    explode($"user_mentions") { 
    case Row(arr: Array[Row]) => arr.map { elem => 
    UserMention(
     elem.getAs[Long]("id"), 
     elem.getAs[String]("is_str"), 
     elem.getAs[Array[Long]]("indices"), 
     elem.getAs[String]("name"), 
     elem.getAs[String]("screen_name")) 
    } 
} 

mentions.first 

Ausnahme, wenn ich versuche mentions.first zu nennen:

scala>  mentions.first 
15/06/23 22:15:06 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 8) 
scala.MatchError: [List([187356243,187356243,List(3, 16),Lino Bocchini,linobocchini], [111123176,111123176,List(79, 95),Jean Wyllys,jeanwyllys_real])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema) 
    at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34) 
    at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34) 
    at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:55) 
    at org.apache.spark.sql.catalyst.expressions.UserDefinedGenerator.eval(generators.scala:81) 

Was ist hier falsch? Ich verstehe, dass es mit den Typen zusammenhängt, aber ich konnte es noch nicht herausfinden.

Als zusätzlicher Kontext abgebildet die Struktur automatisch ist:

scala> mentions.printSchema 
root 
|-- user_mentions: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- id: long (nullable = true) 
| | |-- id_str: string (nullable = true) 
| | |-- indices: array (nullable = true) 
| | | |-- element: long (containsNull = true) 
| | |-- name: string (nullable = true) 
| | |-- screen_name: string (nullable = true) 

Anmerkung 1: Ich weiß es möglich ist, diese HiveQL mit zu lösen, aber ich mag noch einmal Daten-Frames verwenden, es gibt so viel Momentum um es herum.

SELECT explode(entities.user_mentions) as mentions 
FROM tweets 

Anmerkung 2: die UDFval isEmpty = udf((value: List[Any]) => value.isEmpty) eine hässliche Hack ist, und ich bin hier fehlt etwas, aber es war der einzige Weg, ich

+0

Ich denke, Ihr 'Fall Row (arr: Array [Row])' nicht Ihre Eingabe nicht überein. – elmalto

+0

Hallo @elmalto, habe ich sowohl "List" und "Array" versucht, aber so oder so bekomme ich den gleichen Fehler. – arjones

Antwort

4

hier eine NPE kam zu vermeiden, ist eine Lösung, funktioniert, mit nur einem kleinen Hack.

Die Hauptidee ist durch die Deklaration einer Liste [Zeichenfolge] um den Typ Problem zu arbeiten, anstatt List [Row]:

val mentions = tweets.explode("entities.user_mentions", "mention"){m: List[String] => m} 

Diese eine zweite Spalte „erwähnen“ vom Typ „Struct“ genannt erstellt:

|   entities|    mention| 
+--------------------+--------------------+ 
|[List(),List(),Li...|[187356243,187356...| 
|[List(),List(),Li...|[111123176,111123...| 

Jetzt machen Sie eine Karte(), um die Felder innerhalb der Erwähnung zu extrahieren. Die getStruct (1) Aufruf wird der Wert in Spalte 1 jeder Reihe:

case class Mention(id: Long, id_str: String, indices: Seq[Int], name: String, screen_name: String) 
val mentionsRdd = mentions.map(
    row => 
    { 
     val mention = row.getStruct(1) 
     Mention(mention.getLong(0), mention.getString(1), mention.getSeq[Int](2), mention.getString(3), mention.getString(4)) 
    } 
) 

Und die RDD wieder in einen Datenrahmen konvertieren:

val mentionsDf = mentionsRdd.toDF() 

Dort gehen Sie!

|  id| id_str|  indices|   name| screen_name| 
+---------+---------+------------+-------------+---------------+ 
|187356243|187356243| List(3, 16)|Lino Bocchini| linobocchini| 
|111123176|111123176|List(79, 95)| Jean Wyllys|jeanwyllys_real| 
+0

Dank Xinh Huynh, meine Sorge mit diesem Hack, ist, dass ich durch den ganzen Dataset gehe 'Row.toString()' vor dem Extrahieren der Elemente, ich habe keine spezifischen Benchmarks, aber scheint, wir würden viel verschwenden der Maschinenzeit, um diesen Schritt zu machen. Dies ist der einzige Grund, warum ich deine Frage nicht für die richtige halte! – arjones

-1

versuchen dies zu tun:

case Row(arr: Seq[Row]) => arr.map { elem => 
+1

Bitte fügen Sie einige Kommentare zu Ihrer Lösung hinzu, warum und wie das Problem gelöst wird –

Verwandte Themen