2017-06-01 2 views
3

Hat jemand einen Millisekunden-Zeitstempel mit from_json in Spark 2+ analysiert? Wie ist es gemacht?Parsen von Epochen Millisekunden von JSON mit Spark 2

So Spark changed die TimestampType zu parse epoch numerische Werte als in Sekunden statt Millis in v2.

Meine Eingabe ist ein Bienenstock Tabelle, die eine json formatierten String in einer Spalte hat, die ich so zu analysieren, ich versuche:

val spark = SparkSession 
    .builder 
    .appName("Problematic Timestamps") 
    .enableHiveSupport() 
    .getOrCreate() 
import spark.implicits._ 
val schema = StructType(
    StructField("categoryId", LongType) :: 
    StructField("cleared", BooleanType) :: 
    StructField("dataVersion", LongType) :: 
    StructField("details", DataTypes.createArrayType(StringType)) :: 
    … 
    StructField("timestamp", TimestampType) :: 
    StructField("version", StringType) :: Nil 
) 
val item_parsed = 
    spark.sql("select * FROM source.jsonStrInOrc") 
    .select('itemid, 'locale, 
      from_json('internalitem, schema) 
       as 'internalitem, 
      'version, 'createdat, 'modifiedat) 
val item_flattened = item_parsed 
    .select('itemid, 'locale, 
      $"internalitem.*", 
      'version as'outer_version, 'createdat, 'modifiedat) 

Diese eine Zeile mit einer Spalte analysieren kann enthalten:

{ "Zeitstempel": 1494790299549, "gelöscht": false, "Version": "V1", "dataVersion": 2 "categoryId": 2641, "Details": [], ...}

Und das gibt mir timestamp Felder wie 49338-01-08 00:39:09.0 von einem Wert 1494790299549 die ich eher als gelesen hatte: 2017-05-14 19:31:39.549

Jetzt habe ich das Schema für Zeitstempel gesetzt könnte eine lang sein, dann wird der Wert dividieren von 1000 und Umwandlung in einen Zeitstempel, aber dann bin ich d haben 2017-05-14 19:31:39.000 nicht 2017-05-14 19:31:39.549. Ich habe Probleme, herauszufinden, wie ich konnte, entweder:

  • from_json An einen Millisekunden Zeitstempel zu analysieren (vielleicht durch die TimestampType in irgendeiner Weise Subklassen im Schema zu verwenden)
  • Verwenden Sie ein LongType in der Schema und Cast das zu einem Timestamp, der die Millisekunden bewahrt.

Antwort

2

Jetzt konnte ich das Schema für Zeitstempel gesetzt lange zu sein, dann wird der Wert dividieren von 1000

Eigentlich genau das, was Sie brauchen, nur die Typen rechts halten. Angenommen, Sie haben nur Longtimestamp Feld:

val df = spark.range(0, 1).select(lit(1494790299549L).alias("timestamp")) 
// df: org.apache.spark.sql.DataFrame = [timestamp: bigint] 

Wenn Sie mit 1000 teilen:

val inSeconds = df.withColumn("timestamp_seconds", $"timestamp"/1000) 
// org.apache.spark.sql.DataFrame = [timestamp: bigint, timestamp_seconds: double] 

Sie Zeitstempel in Sekunden als Doppel erhalten (beachten Sie, dass diese SQL ist, nicht Scala Verhalten).

Alles was übrig bleibt, ist cast:

inSeconds.select($"timestamp_seconds".cast("timestamp")).show(false) 
// +-----------------------+ 
// |timestamp_seconds  | 
// +-----------------------+ 
// |2017-05-14 21:31:39.549| 
// +-----------------------+ 
+0

Oh, also wird die Division nicht in eine andere Länge gerundet? Groß! – dlamblin

-1

Ich fand, dass die Teilung in der Auswahl zu tun versuchen, und dann habe Casting mir nicht sauber aussehen, obwohl es ein absolut gültige Methode ist. Ich entschied mich für eine UDF, die eine java.sql.timestamp verwendet, die tatsächlich in Epoch Millisekunden angegeben ist.

import java.sql.Timestamp 

import org.apache.spark.sql.SparkSession 
import org.apache.spark.sql.functions.{explode, from_json, udf} 
import org.apache.spark.sql.types. 
{BooleanType, DataTypes, IntegerType, LongType, 
StringType, StructField, StructType, TimestampType} 

val tsmillis = udf { t: Long => new Timestamp (t) } 

val spark = SparkSession 
    .builder 
    .appName("Problematic Timestamps") 
    .enableHiveSupport() 
    .getOrCreate() 
import spark.implicits._ 
val schema = StructType(
    StructField("categoryId", LongType) :: 
    StructField("cleared", BooleanType) :: 
    StructField("dataVersion", LongType) :: 
    StructField("details", DataTypes.createArrayType(StringType)) :: 
    … 
    StructField("timestamp", LongType) :: 
    StructField("version", StringType) :: Nil 
) 
val item_parsed = 
    spark.sql("select * FROM source.jsonStrInOrc") 
    .select('itemid, 'locale, 
      from_json('internalitem, schema) 
       as 'internalitem, 
      'version, 'createdat, 'modifiedat) 
val item_flattened = item_parsed 
    .select('itemid, 'locale, 
      $"internalitem.categoryId", $"internalitem.cleared", 
      $"internalitem.dataVersion", $"internalitem.details", 
      tsmillis($"internalitem.timestamp"), 
      $"internalitem.version", 
      'version as'outer_version, 'createdat, 'modifiedat) 

Sehen Sie, wie das in der Auswahl ist. Ich denke, es wäre lohnend, einen Leistungstest zu machen, um zu sehen, ob withcolumn Division und Gießen schneller ist als die udf.