1

Die Frage hier ist, wie Sie Objekte eines UDF wiederverwenden, aber Race Conditions vermeiden?Spark 1.5.2 DataFramesUDF Race Condition bei Objektwiederverwendung vermeiden

Ich verwende ein UDF innerhalb meiner Spark-Anwendung und die Unit-Tests scheinen aufgrund einer Race-Bedingung nicht deterministisch. Manchmal passieren sie manchmal, sie scheitern ...

Ich habe versucht, die Wiederverwendung von Objekten zu erzwingen, indem ich sie erstelle und sie zur Effizienz an die UDF übergebe. Es scheint jedoch, dass separate "Tests", die denselben Funkenkontext und JVM teilen, diese Objekte verwenden und Fehler verursachen.

def reformatDate(input:String,sdfIn:SimpleDateFormat,sdfOut:SimpleDateFormat): String ={ 
    sdfOut.format(sdfIn.parse(input)) 
    } 

    val datePartitionFormat = new SimpleDateFormat("yyyyMMdd") 
    val dTStampFormat = new SimpleDateFormat("yyyy/MM/dd") 
    val validDateFormat = new SimpleDateFormat("yyyy-MM-dd") 

    val partitionToDateUDF = udf(reformatDate(_:String,datePartitionFormat,validDateFormat)) 
    val dTStampToDateUDF= udf(reformatDate(_:String,dTStampFormat,validDateFormat)) 

Manchmal, wenn ich meine Unit-Tests laufen bekomme ich folgende Fehler mit dieser Funktion:

17/01/13 11:45:45 ERROR Executor: Ausnahme in Aufgabe 0.0 in der Stufe 2.0 (TID 2) java.lang.NumberFormatException: mehrere Punkte bei sun.misc.FloatingDecimal.readJavaFormatString (FloatingDecimal.java:1890) bei sun.misc.FloatingDecimal.parseDouble (FloatingDecimal.java:110) um java.lang. Double.parseDouble (Double.java:538) unter java.text.DigitList.getDouble (DigitList. Java: 169) bei java.text.DecimalFormat.parse (DecimalFormat.java:2056) bei java.text.SimpleDateFormat.subParse (SimpleDateFormat.java:1867) bei java.text.SimpleDateFormat.parse (SimpleDateFormat.java: 1514) bei java.text.DateFormat.parse (DateFormat.java:364) bei com.baesystems.ai.engineering.threatanalytics.microbatch.processor.transformers.metric.mDnsPreviouslySeenDomainsStartOfDayDF $ .reformatDate (mDnsPreviouslySeenDomainsStartOfDayDF.scala: 22)

ich benutze die Funktion wie folgt:

val df = df2 
    .filter(
    datediff(
     to_date(partitionToDateUDF($"dt")) 
     ,to_date(dTStampToDate($"d_last_seen")) 
    ) < 90 
) 

und auf Fehlersuche hat der Eingang „DF2“ gefunden zu sein:

+-----------+--------+-------------------------+--------------------------------+ 
|d_last_seen|  dt|partitionToDateUDF($"dt")|dTStampToDateUDF($"d_last_seen")| 
+-----------+--------+-------------------------+--------------------------------+ 
| 2016/11/02|20161102|2016-11-02    |2016-11-02      | 
| 2016/11/01|20161102|2016-11-02    |2016-11-01      | 
+-----------+--------+-------------------------+--------------------------------+ 

Ich benutze conf.setMaster („local [2]“), ist es, dass Funken benutzt Threads sein könnte und daher teilen die gleiche JVM beim Laufen lokal, jedoch würde dies nicht passieren, wenn die separaten Executoren ihre eigene JVM und somit ihre eigenen Instanziierungen der Objekte haben würden.

Antwort

2

SimpleDateFormat ist nicht Thread-sicher (siehe zum Beispiel Why is Java's SimpleDateFormat not thread-safe?). Das bedeutet, dass wenn Sie es in einer UDF verwenden (sogar in einem einzigen Spark-Job) Sie möglicherweise unerwartete Ergebnisse erhalten, weil Spark Ihre UDF in mehreren Aufgaben verwendet, die auf separaten Threads enden mit mehreren Threads, die darauf zugreifen die selbe Zeit. Dies gilt sowohl für den lokalen Modus als auch für tatsächlich verteilte Cluster - eine einzelne Kopie wird von mehreren Threads auf jedem Executor verwendet.

Um dies zu überwinden - verwenden Sie einfach einen anderen Formatierer, der ist Thread-sicher, z. Jodas .

+1

Danke Tzach, Ich würde hier die Antwort auf die allgemeine Frage hinzufügen, dass ist: Sie haben innerhalb Funken UDF sicher Thread als mehrere Aufgaben auf mehrere Threads auf jedem Testamentsvollstrecker laufen. Die Lösung, die mir Tzach bietet, ist Threadssafe. –