2017-09-14 20 views
5

Szenario: Ich habe einen Dienst, die Ereignisse wie in diesem CSV Beispiel protokolliert:einkochen Ereignisse zu Zeitintervallen

#TimeStamp, Name, ColorOfPullover 
TimeStamp01, Peter, Green 
TimeStamp02, Bob, Blue 
TimeStamp03, Peter, Green 
TimeStamp04, Peter, Red 
TimeStamp05, Peter, Green 

Ereignisse, die zum Beispiel Peter trägt Grün wird sehr oft in Folge auftreten.

Ich habe zwei Ziele:

  1. die Daten halten so klein wie möglich
  2. Halten Sie die alle relevanten Daten

Relevant heißt: ich wissen muss, in denen Zeit Spannweite eine Person war tragen, welche Farbe. Z. B:

#StartTime, EndTime, Name, ColorOfPullover 
TimeStamp01, TimeStamp03, Peter, Green 
TimeStamp02, TimeStamp02, Bob, Blue 
TimeStamp03, TimeStamp03, Peter, Green 
TimeStamp04, TimeStamp04, Peter, Red 
TimeStamp05, TimeStamp05, Peter, Green 

In diesem Format kann ich beantworten Fragen wie: Welche Farbe wurde Peter zum Zeitpunkt TimeStamp02 tragen? (Ich kann sicher davon ausgehen, dass jede Person in die gleiche Farbe trägt zwischen zwei Ereignisse für die gleiche Farbe angemeldet.)

Haupt Frage: Kann ich eine bereits bestehende Technologie nutzen, das zu erreichen? I.e. Ich kann es mit einem kontinuierlichen Strom von Ereignissen versorgen und es extrahiert und speichert die relevanten Daten?


Um genau zu sein, muss ich einen Algorithmus wie diesen (Pseudocode) implementieren. Die Methode OnNewEvent wird für jede Zeile des CSV-Beispiels aufgerufen. Wobei der Parameter event bereits die Daten aus der Zeile als Member-Variablen enthält.

def OnNewEvent(even) 
    entry = Database.getLatestEntryFor(event.personName) 
    if (entry.pulloverColor == event.pulloverColor) 
     entry.setIntervalEndDate(event.date) 
     Database.store(entry) 
    else 
     newEntry = new Entry 
     newEntry.setIntervalStartDate(event.date) 
     newEntry.setIntervalEndDate(event.date) 
     newEntry.setPulloverColor(event.pulloverColor)) 
     newEntry.setName(event.personName) 
     Database.createNewEntry(newEntry) 
    end 
end 
+0

Es sollte möglich sein, sie mit logstash zu tun, aber das Problem ist, dass Sie für jede Zeile eine Elasticsearch Anfrage zu tun haben, werden die neuesten Eintrag abzurufen, die das machen Prozess sehr langsam. Deshalb halte ich Logstash nicht für das richtige Werkzeug. – baudsp

+0

Was sind Ihre Datenmengen und wie schnell müssen Sie reagieren, wenn ein neues Ereignis eintritt? Ist es in Ordnung, wenn einige Ereignisse verloren gehen? – ffeast

+0

Die Reaktion auf Ereignisse kann langsam sein. Z.B. 1 Tag Verspätung ist akzeptabel. Also, ein Cron-Job einen Tag könnte eine Option sein. Ereignisse können nicht verloren gehen, das ist geschäftskritisch. – fex

Antwort

0
This is typical scenario of any streaming architecture. 

There are multiple existing technologies which work in tandem to get what you want. 


1. NoSql Database (Hbase, Aerospike, Cassandra) 
2. streaming jobs Like Spark streaming(micro batch), Storm 
3. Run mapreduce in micro batch to insert into NoSql Database. 
4. Kafka Distriuted queue 

The end to end flow. 

Data -> streaming framework -> NoSql Database. 
OR 
Data -> Kafka -> streaming framework -> NoSql Database. 


IN NoSql database there are two ways to model your data. 
1. Key by "Name" and for every event for that given key, insert into Database. 
    While fetching u get back all events corresponding to that key. 

2. Key by "name", every time a event for key is there, do a UPSERT into a existing blob(Object saved as binary), Inside the blob you maintain the time range and color seen. 

Code sample to read and write to Hbase and Aerospike 

Hbase: http://bytepadding.com/hbase/

Aerospike: http://bytepadding.com/aerospike/

+0

beide Links sind gebrochen – ffeast

+0

Sorry Kumpel, Hacker hatten Spaß, haben gerade die Seite behoben. Fühlen Sie sich frei, durch die Beispiele zu gehen. und ich weiß, ob Sie mehr Klarheit brauchen – KrazyGautam

0

Eine Möglichkeit, es zu tun HiveMQ zu verwenden ist. HiveMQ ist eine MQTT-basierte Nachrichtenwarteschlangentechnologie. Schön daran ist, dass Sie benutzerdefinierte Plugins schreiben können, um eingehende Nachrichten zu verarbeiten. Um den neuesten Eintrag eines Ereignisses für eine Person zu erhalten, würde eine Hash-Tabelle im HiveMQ-Plugin funktionieren. Wenn die Anzahl der verschiedenen Personen sehr groß ist, würde ich einen Cache wie Redis in Betracht ziehen, um das neueste Ereignis für jede Person zwischenzuspeichern.

Ihr Dienst veröffentlicht Ereignisse in HiveMQ. Das HiveMQ-Plugin verarbeitet eingehende Ereignisse und aktualisiert Ihre Datenbank.

HiveMQ Plugin

Redis

Verwandte Themen