2017-05-12 3 views
2

Gibt es eine Möglichkeit, die Anzahl der Datensätze zu erhalten, die bei der Verwendung von Spark geschrieben werden, um Datensätze zu speichern? Während ich weiß, dass es derzeit nicht in der Spezifikation ist, würde Ich mag Lage sein, etwas zu tun:Wie erhält man die Anzahl der geschriebenen Datensätze (mit der Sicherungsoperation von DataFrameWriter)?

val count = df.write.csv(path) 

Alternativ eine Inline-Zählung in der Lage zu tun (vorzugsweise ohne nur einen Standard-Akku verwendet wird) von die Ergebnisse eines Schrittes wären (fast) so effektiv. d. h .:

dataset.countTo(count_var).filter({function}).countTo(filtered_count_var).collect() 

Irgendwelche Ideen?

Antwort

0

Es gibt zwei Möglichkeiten, die ich kenne, um die Datensätze count zu erhalten, die geschrieben werden.

können sagen, wir haben ein dataset

import sqlContext.implicits._ 
val dataSet = sc.parallelize(1 to 10).toDS 

Der erste Weg, den eingebauten count funtion

dataSet.count() 

Die zweite Methode zu verwenden ist, ist

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} 

var recordsCount = 0L 

sc.addSparkListener(new SparkListener() { 
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { 
    recordsCount += taskEnd.taskMetrics.outputMetrics.recordsWritten 
    } 
}) 

individuelle Zuhörer zu schaffen und Holen Sie sich die count beim Schreiben auf die output
Hinweis: wir müssen die dataset zu rdd

dataSet.rdd.saveAsTextFile("outputlocation") 
println(recordsCount) 

Hoffnung wandeln diese

2

hilfreich Ich SparkListener verwenden würde, die onTaskEnd oder onStageCompleted Ereignisse abfangen können, die Sie für den Zugriff auf Task-Metriken nutzen könnten.

Task-Metriken geben Ihnen die Akkumulatoren, die Spark verwendet, um Metriken auf der Registerkarte SQL anzuzeigen (in Details für die Abfrage).

web UI/Details for Query

Zum Beispiel die folgende Abfrage:

spark. 
    read. 
    option("header", true). 
    csv("../datasets/people.csv"). 
    limit(10). 
    write. 
    csv("people") 

genau 10 Ausgabezeilen gibt so Funke weiß es (und man konnte auch).

Die Schnittstelle von Abfrageausführungs Listener, der verwendet werden kann, die Ausführung Metriken zu analysieren:

enter image description here


könnten Sie auch Spark-SQLs QueryExecutionListener erkunden.

können Sie registrieren ein QueryExecutionListenerExecutionListenerManager verwenden, die als spark.listenerManager verfügbar ist.

Ich denke, es ist näher an der "Bare Metal", aber das habe ich vorher nicht verwendet.

+1

Haben Sie 'SparkListener' mit SQL getestet? Ich habe vorher ein paar grobe Tests durchgeführt und SQL-Schreibmetriken nicht erfasst. 'ExecutionListenerManager' ist der Weg zu gehen, und hat ein schönes Beispiel in den Tests. – zero323

+1

Nein, ich habe keine Schnittstelle mit SQL verwendet. Etwas, das ich nur theoretisch kenne ... immer noch. –

Verwandte Themen