2017-02-08 3 views
0

In einer Spark-Streaming-Anwendung gibt es eine externe Datenquelle (relationale Datenbank), die ich alle 10 Minuten abfragen und die Ergebnisse für meine Stream-Verarbeitungspipeline bereitstellen muss.Spark Streaming: Aktualisierbare Ergebnismenge für die Stream-Verarbeitungslogik verfügbar machen

Ich verstehe nicht ganz, was der richtige Weg ist, es zu tun. nur Accumulators werden hängen (wie in der Dokumentation beschrieben), aber ich fand dies:

/** 
    * Set the accumulator's value; only allowed on master 
    */ 
    def setValue(newValue: R) { 
    this.value = newValue 
    } 

und Broadcast variables sind nur write-once https://spark.apache.org/docs/1.6.2/programming-guide.html#broadcast-variables

Die Planung Aspekt mir auch nicht klar.

Gibt es eine Möglichkeit, aktualisierte Ergebnismenge für die Stream-Verarbeitungslogik verfügbar zu machen?

PS Es scheint sehr ähnlich zu sein, was ich brauche How can I update a broadcast variable in spark streaming?

+0

„Ich muss alle 10 Minuten abgefragt und die Ergebnisse für meine Stream Processing-Pipeline zur Verfügung stellen:“ Was meinst du denn hier? Es gibt eine Tabelle, an die angehängt wird, und Sie müssen die letzten Zeilen lesen? Eine Abfrage, die Sie ausführen müssen? Welche anderen Datenquellen befinden sich in Ihrer Stream-Verarbeitungspipeline? Weitere Informationen zu dem, was Sie hier erreichen möchten, würden helfen. –

+0

Es gibt tatsächlich einen Dienst mit API oben auf der Datenbank, so dass ich nur den Dienst abfrage und es eine aktuelle Version der Daten zurückgibt. Ich benutze Kafka Direct Consumer Approach und muss den Stream mit der aktuellsten Version "anreichern", aber die API jedes Mal abzufragen (selbst wenn sie innerhalb des Abschnitts "foreachPartition" ist) ist sehr teuer. –

+1

Mögliches Duplikat von [Aktualisieren einer globalen Variablen in Spark] (http://stackoverflow.com/questions/33748528/updating-a-global-variable-periodically-in-spark) – Aastha

Antwort

0

ich das in Java speichern tue, und es ist recht funktioniert. Es gibt ähnliche Antwort hier Updating a global variable periodically in Spark und auch in Frage erwähnt

public Broadcast<Map<String, List<String>>> updateBroadcastVariable(
    SparkContext sparkContext, DatabaseService databaseService) { 
Date d = Calendar.getInstance().getTime(); 
long diff = d.getTime()-mysqlLastUpdatedAt.getTime(); 
if (updatedVar == null || diff > 60000) { 
    if (var != null) 
    updatedVar.unpersist(); 
    mysqlLastUpdatedAt = new Date(System.currentTimeMillis()); 
    Map<String, List<String>> map = new LinkedHashMap<>(); 

    List<String> dbData = databaseService.refreshData(JavaSparkContext.fromSparkContext(sparkContext)); 
    } 
    updatedVar = JavaSparkContext.fromSparkContext(sparkContext).broadcast(dbData); 
} 
return updatedVar; 
}