2016-03-18 11 views
0

Ich habe eine Spark-Streaming-Anwendung, um Ereignisse zu analysieren, die von einem Kafka-Broker eingehen. Ich habe Regeln wie unten und neue Regeln können durch die Kombination bestehender generiert werden:Spark Streaming Replay

If this event type occurs raise an alert. 
If this event type occurs more than 3 times in a 5-minute interval, raise an alert. 

Parallel ich alle eingehenden Daten Cassandra speichern. Was ich tun möchte, ist diese Streaming App für historische Daten von Cassandra laufen zu lassen. Zum Beispiel

<This rule> would have generated <these> alerts for <last week>. 

Gibt es eine Möglichkeit, dies in Spark zu tun, oder ist es in der Roadmap? Zum Beispiel hat Apache Flink eine Ereigniszeitverarbeitung. Aber die Migration der vorhandenen Codebase scheint schwierig und ich möchte dieses Problem lösen, indem ich meinen vorhandenen Code wiederverwende.

Antwort

0

Dies ist ziemlich geradlinig, mit einigen Vorbehalten. Zunächst hilft es zu verstehen, wie das von der Kafka-Seite aus funktioniert.

Kafka verwaltet sogenannte Offsets - jede Nachricht in Kafka hat einen Offset relativ zu ihrer Position in einer Partition. (Partitionen sind logische Unterteilungen eines Themas.) Die erste Nachricht in einer Partition hat einen Offset von 0L, die zweite ist 1L usw. Abgesehen davon, dass 0L wegen Log-Rollover und möglicherweise Topic-Compaction nicht immer der früheste Offset in a ist Partition.

Das erste, was Sie tun müssen, ist, die Offsets für alle Partitionen zu sammeln, die Sie von Anfang an lesen möchten. Hier ist eine Funktion, die dies tut:

val (firstOffset,nextOffset) = getOffsets(consumer, "MyTopicName", 0) 

Für alles, was Sie schon immer über die Beschaffung von Offsets von Kafka wissen wollten, read this:

def getOffsets(consumer: SimpleConsumer, topic: String, partition: Int) : (Long,Long) = { 
    val time = kafka.api.OffsetRequest.LatestTime 
    val reqInfo = Map[TopicAndPartition,PartitionOffsetRequestInfo](
    (new TopicAndPartition(topic, partition)) -> (new PartitionOffsetRequestInfo(time, 1000)) 
) 
    val req = new kafka.javaapi.OffsetRequest(
    reqInfo, kafka.api.OffsetRequest.CurrentVersion, "test" 
) 
    val resp = consumer.getOffsetsBefore(req) 
    val offsets = resp.offsets(topic, partition) 
    (offsets(offsets.size - 1), offsets(0)) 
} 

Sie es so nennen würde. Es ist kryptisch, um es gelinde auszudrücken. (Lassen Sie mich wissen, wenn Sie voll und ganz auf das zweite Argument zu PartitionOffsetRequestInfo, zum Beispiel verstehen.)

Nun, da Sie firstOffset und lastOffset der Partition, die Sie auf einem historisch betrachten möchten, können Sie dann den fromOffset Parameter von createDirectStream, benutzen, die ist vom Typ: fromOffset: Map[TopicAndPartition, Long]. Sie würden den Long/Wert auf firstOffset setzen, die Sie von getOffsets() erhalten haben.

Wie für nextOffset - Sie können das verwenden, um in Ihrem Stream zu bestimmen, wenn Sie von der Verarbeitung historischer Daten zu neuen Daten wechseln. Wenn msg.offset == nextOffset dann verarbeiten Sie den ersten nicht historischen Datensatz innerhalb der Partition.

Nun zu den Vorbehalten, direkt from the documentation:

  • Sobald ein Kontext gestartet wurde, keine neue Streaming-Berechnungen können , um es einzurichten oder hinzugefügt werden.
  • Sobald ein Kontext gestoppt wurde, kann er nicht erneut gestartet werden.
  • Nur ein StreamingContext kann gleichzeitig in einer JVM unter aktiv sein.
  • stop() auf StreamingContext stoppt auch den SparkContext.Um nur den StreamingContext zu stoppen, legen Sie den optionalen Parameter stop() mit dem Namen stopSparkContext auf false fest.
  • A SparkContext kann wiederverwendet werden erstellen mehrere StreamingContexts, solange der vorhergehende Streaming gestoppt wird (ohne die SparkContext zu ANHALTEN) bevor der nächste Streaming erstellt wird.

es wegen dieser Einschränkungen ist, dass ich nextOffset zugleich als firstOffset greifen - präsentieren Zeitverarbeitung so dass ich den Strom nach oben halten kann, aber den Kontext von historischen ändern.

+0

Danke für die ausführliche Antwort. Ich habe es gelesen und werde es noch einmal lesen, um es besser zu verstehen. In der Zwischenzeit benutze ich 'Spark'-Schiebefenster, um nach der Zeit zu zählen. Wie kann ich das erreichen? – yolgun