2017-09-15 2 views
0

„Could not ID für Eintrag“ Wenn Prüfpunkten auf einem einfachen CEP Schleifenmuster gedreht wirdAusgabe CEP während Prüfpunkte.

private Pattern<Tuple2<Integer, SimpleBinaryEvent>, ?> alertPattern = Pattern.<Tuple2<Integer, SimpleBinaryEvent>>begin("start").where(checkStatusOn) 
     .followedBy("middle").where(checkStatusOn).times(2) 
     .next("end").where(checkStatusOn).within(Time.minutes(5)) 

ich Ausfälle sehen.

SimpleBinaryEvent ist

public class SimpleBinaryEvent implements Serializable { 

private int id; 
private int sequence; 
private boolean status; 
private long time; 

public SimpleBinaryEvent(int id, int sequence, boolean status , long time) { 
    this.id = id; 
    this.sequence = sequence; 
    this.status = status; 
    this.time = time; 
} 
public int getId() { 
    return id; 
} 
public int getSequence() { 
    return sequence; 
} 
public boolean isStatus() { 
    return status; 
} 
public long getTime() { 
    return time; 
} 
@Override 
public boolean equals(Object o) { 
    if (this == o) return true; 
    if (o == null || getClass() != o.getClass()) return false; 

    SimpleBinaryEvent that = (SimpleBinaryEvent) o; 

    if (getId() != that.getId()) return false; 
    if (isStatus() != that.isStatus()) return false; 
    if (getSequence() != that.getSequence()) return false; 
    return getTime() == that.getTime(); 
} 

@Override 
public int hashCode() { 
    //return Objects.hash(getId(),isStatus(), getSequence(),getTime()); 
    int result = getId(); 
    result = 31 * result + (isStatus() ? 1 : 0); 
    result = 31 * result + getSequence(); 
    result = 31 * result + (int) (getTime()^(getTime() >>> 32)); 
    return result; 
} 

@Override 
public String toString() { 
    return "SimpleBinaryEvent{" + 
      "id='" + id + '\'' + 
      ", status=" + status + 
      ", sequence=" + sequence + 
      ", time=" + time + 
      '}'; 
} 

}

Fehlerursache:

Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> Map (1/1). 
... 6 more 
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper((1,SimpleBinaryEvent{id='1', status=true, sequence=95, time=1505503380000}), 1505503380000, 0),.... 

Ich bin sicher, dass ich die Gleichen haben() und hashCode() implementiert, um die Art und Weise sollte es sein. Ich habe auch den Objects.hashCode ausprobiert. In anderen Fällen hatte ich CircularReference (und damit stackOverflow) auf SharedBuffer.toString(), was wiederum auf Probleme mit Referenzen (Gleichheit und was nicht). Ohne aktivierten Checkpointing funktioniert es wie erwartet. Ich laufe auf einem lokalen Cluster. Ist die CEP-Produktion bereit?

ich die Bibliothek zum Ausprobieren und Berichterstattung dieser 1.3.2 Flink

+0

Es sehr ähnlich sieht http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Keyed-CEP-checkpoint-fails- td14795.html – VishalSan

+0

Bei Bedarf kann ich aus einem eigenständigen Junit heraus reproduzieren, so wie ich es erstellt habe. – VishalSan

+0

Es wäre toll, wenn Sie eine JIRA mit dem in sich geschlossenen Junit veröffentlichen könnten. Ich werde es mir dann ansehen. –

Antwort

0

Vielen Dank bin mit!

Die Bibliothek befindet sich in aktiver Entwicklung, da immer mehr Funktionen hinzugefügt werden. Das 1.3 war das erste Release der Bibliothek mit solch einer reichen Semantik, also erwarten wir, dass 1) wie die Leute es benutzen und 2) ob es irgendwelche Fehler gibt. Also ich würde sagen, dass es nicht 100% produktionsfertig ist, aber es ist nicht weit.

Jetzt für das Problem zur Hand, nehme ich an, dass Sie RocksDB für Checkpointing verwenden, richtig? Der Grund dafür ist, dass mit RocksDB bei jedem Wasserzeichen (in Ereigniszeit) der notwendige Zustand deserialisiert wird (z. B. NFA), einige Ereignisse verarbeitet und dann erneut serialisiert werden, bevor sie wieder in RocksDB übertragen werden.

Dies ist nicht der Fall für das Dateisystem-Status-Backend, wo Sie den Status nur beim Checkpointing serialisieren und ihn erst nach der Wiederherstellung lesen und deserialisieren. Wenn Sie also in diesem Fall gesagt haben, dass Ihr Job ohne Checkpointing einwandfrei funktioniert, sehen Sie dieses Problem erst nach der Wiederherstellung nach einem Fehler.

Die Wurzel des Problems kann entweder, dass equals()/hashcode() fehlerhaft sein (was der Fall zu sein scheint nicht), oder es gibt ein Problem auf dem Weg wir serialisiert/deserialisiert den Zustand CEP.

Könnten Sie auch eine minimale Eingabesequenz von Ereignissen angeben, die dies bewirken? Dies wird sehr hilfreich sein, um das Problem zu reproduzieren.

Vielen Dank, Kostas

+0

Ich laufe in einem lokalen Modus als ein Junit wie in einem LocalFlinkMiniCluster (config, false). Wird den Test fertig machen und abschicken. Wurden von anderen Dingen zugeschlagen. – VishalSan