Die Frage hier ist, wie Sie Objekte eines UDF wiederverwenden, aber Race Conditions vermeiden?Spark 1.5.2 DataFramesUDF Race Condition bei Objektwiederverwendung vermeiden
Ich verwende ein UDF innerhalb meiner Spark-Anwendung und die Unit-Tests scheinen aufgrund einer Race-Bedingung nicht deterministisch. Manchmal passieren sie manchmal, sie scheitern ...
Ich habe versucht, die Wiederverwendung von Objekten zu erzwingen, indem ich sie erstelle und sie zur Effizienz an die UDF übergebe. Es scheint jedoch, dass separate "Tests", die denselben Funkenkontext und JVM teilen, diese Objekte verwenden und Fehler verursachen.
def reformatDate(input:String,sdfIn:SimpleDateFormat,sdfOut:SimpleDateFormat): String ={
sdfOut.format(sdfIn.parse(input))
}
val datePartitionFormat = new SimpleDateFormat("yyyyMMdd")
val dTStampFormat = new SimpleDateFormat("yyyy/MM/dd")
val validDateFormat = new SimpleDateFormat("yyyy-MM-dd")
val partitionToDateUDF = udf(reformatDate(_:String,datePartitionFormat,validDateFormat))
val dTStampToDateUDF= udf(reformatDate(_:String,dTStampFormat,validDateFormat))
Manchmal, wenn ich meine Unit-Tests laufen bekomme ich folgende Fehler mit dieser Funktion:
17/01/13 11:45:45 ERROR Executor: Ausnahme in Aufgabe 0.0 in der Stufe 2.0 (TID 2) java.lang.NumberFormatException: mehrere Punkte bei sun.misc.FloatingDecimal.readJavaFormatString (FloatingDecimal.java:1890) bei sun.misc.FloatingDecimal.parseDouble (FloatingDecimal.java:110) um java.lang. Double.parseDouble (Double.java:538) unter java.text.DigitList.getDouble (DigitList. Java: 169) bei java.text.DecimalFormat.parse (DecimalFormat.java:2056) bei java.text.SimpleDateFormat.subParse (SimpleDateFormat.java:1867) bei java.text.SimpleDateFormat.parse (SimpleDateFormat.java: 1514) bei java.text.DateFormat.parse (DateFormat.java:364) bei com.baesystems.ai.engineering.threatanalytics.microbatch.processor.transformers.metric.mDnsPreviouslySeenDomainsStartOfDayDF $ .reformatDate (mDnsPreviouslySeenDomainsStartOfDayDF.scala: 22)
ich benutze die Funktion wie folgt:
val df = df2
.filter(
datediff(
to_date(partitionToDateUDF($"dt"))
,to_date(dTStampToDate($"d_last_seen"))
) < 90
)
und auf Fehlersuche hat der Eingang „DF2“ gefunden zu sein:
+-----------+--------+-------------------------+--------------------------------+
|d_last_seen| dt|partitionToDateUDF($"dt")|dTStampToDateUDF($"d_last_seen")|
+-----------+--------+-------------------------+--------------------------------+
| 2016/11/02|20161102|2016-11-02 |2016-11-02 |
| 2016/11/01|20161102|2016-11-02 |2016-11-01 |
+-----------+--------+-------------------------+--------------------------------+
Ich benutze conf.setMaster („local [2]“), ist es, dass Funken benutzt Threads sein könnte und daher teilen die gleiche JVM beim Laufen lokal, jedoch würde dies nicht passieren, wenn die separaten Executoren ihre eigene JVM und somit ihre eigenen Instanziierungen der Objekte haben würden.
Danke Tzach, Ich würde hier die Antwort auf die allgemeine Frage hinzufügen, dass ist: Sie haben innerhalb Funken UDF sicher Thread als mehrere Aufgaben auf mehrere Threads auf jedem Testamentsvollstrecker laufen. Die Lösung, die mir Tzach bietet, ist Threadssafe. –