2016-10-29 5 views
4

Verwenden von Spark 1.6.1 Ich möchte die Anzahl der Aufrufe einer UDF aufrufen. Ich möchte dies tun, weil ich eine sehr teure UDF (~ 1sec pro Anruf) und habe Ich vermute, dass die UDF häufiger als die Anzahl der Datensätze in meinem Datenframe aufgerufen wird, so dass mein Funke Job langsamer als notwendig.Anzahl Aufrufe von UDF in Spark

Obwohl ich diese Situation nicht reproduzieren konnte, kam ich zu einem einfachen Beispiel, das zeigt, dass die Anzahl der Aufrufe an die UDF anders zu sein scheint (hier: weniger) als die Anzahl der Zeilen, wie kann das sein?

import org.apache.spark.sql.SQLContext 
import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.sql.functions.udf 

object Demo extends App { 
    val conf = new SparkConf().setMaster("local[4]").setAppName("Demo") 
    val sc = new SparkContext(conf) 
    sc.setLogLevel("WARN") 
    val sqlContext = new SQLContext(sc) 
    import sqlContext.implicits._ 


    val callCounter = sc.accumulator(0) 

    val df= sc.parallelize(1 to 10000,numSlices = 100).toDF("value") 

    println(df.count) // gives 10000 

    val myudf = udf((d:Int) => {callCounter.add(1);d}) 

    val res = df.withColumn("result",myudf($"value")).cache 

    println(res.select($"result").collect().size) // gives 10000 
    println(callCounter.value) // gives 9941 

} 

Wenn die Verwendung eines Akkumulators nicht der richtige Weg ist, die Zählerstände der UDF aufzurufen, wie sonst könnte ich das tun?

Hinweis: In meiner aktuellen Spark-Job, erhalten Sie eine Anruf-Zählung, die etwa 1,7 mal höher ist als die tatsächliche Anzahl der Datensätze.

+0

Ich habe versucht, den gleichen Code, seine Druck 10000 als Callcounter, alle 'Println's sind die gleiche Nummer drucken, ich benutze funke 2.0 – Shankar

+0

: Ich kann neu produzieren, wenn ich meinen Master als' local [ändern] *] 'anstelle von lokalen, wenn ich versuchte, mit lokalen Drucken richtig. Wenn ich mit local [*] versuchte, druckte es 9996 anstelle von 10000 – Shankar

+0

Ist es ein bekanntes Problem, wenn wir Akku für diese Art von Fall verwenden? Wenn wir local [*] verwenden, warum zählt es dann nicht richtig? – Shankar

Antwort

1

Spark-Anwendungen sollten eine main() - Methode definieren, anstatt scala.App zu erweitern. Unterklassen von scala.App funktionieren möglicherweise nicht richtig.

import org.apache.spark.sql.SQLContext 
import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.sql.functions.udf 

object Demo extends App { 
    def main(args: Array[String]): Unit = { 
     val conf = new SparkConf().setAppName("Simple Application").setMaster("local[4]") 
     val sc = new SparkContext(conf) 
     // [...] 
    } 
} 

Dies sollte Ihr Problem lösen.

+0

danke, das löst das Problem. Aber in meiner realen Anwendung habe ich keine Hauptmethode (wir benutzen Spark Notebook). Vielleicht gelingt es mir, dieses Verhalten in einem kleinen Beispiel zu reproduzieren, ich werde dann eine neue Frage stellen. Kurz gesagt, ich schließe zwei Datenrahmen an (Standard-Innenverbindung) und rufe dann mein udf mit withColumn auf. Das udf wird mehrmals für dieselbe Zeile aufgerufen –

+0

Ich bin nicht sicher, wie Spark-Notebooks funktioniert, um ehrlich zu sein. Ich werde es mir ansehen müssen, wenn ich Zeit habe. – eliasah

+0

Hier ist die neue Frage: http://stackoverflow.com/questions/40320563/spark-udf-called-more-than-once-per-record-when-df-has-too-many-columns –

Verwandte Themen