„Could not ID für Eintrag“ Wenn Prüfpunkten auf einem einfachen CEP Schleifenmuster gedreht wirdAusgabe CEP während Prüfpunkte.
private Pattern<Tuple2<Integer, SimpleBinaryEvent>, ?> alertPattern = Pattern.<Tuple2<Integer, SimpleBinaryEvent>>begin("start").where(checkStatusOn)
.followedBy("middle").where(checkStatusOn).times(2)
.next("end").where(checkStatusOn).within(Time.minutes(5))
ich Ausfälle sehen.
SimpleBinaryEvent ist
public class SimpleBinaryEvent implements Serializable {
private int id;
private int sequence;
private boolean status;
private long time;
public SimpleBinaryEvent(int id, int sequence, boolean status , long time) {
this.id = id;
this.sequence = sequence;
this.status = status;
this.time = time;
}
public int getId() {
return id;
}
public int getSequence() {
return sequence;
}
public boolean isStatus() {
return status;
}
public long getTime() {
return time;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SimpleBinaryEvent that = (SimpleBinaryEvent) o;
if (getId() != that.getId()) return false;
if (isStatus() != that.isStatus()) return false;
if (getSequence() != that.getSequence()) return false;
return getTime() == that.getTime();
}
@Override
public int hashCode() {
//return Objects.hash(getId(),isStatus(), getSequence(),getTime());
int result = getId();
result = 31 * result + (isStatus() ? 1 : 0);
result = 31 * result + getSequence();
result = 31 * result + (int) (getTime()^(getTime() >>> 32));
return result;
}
@Override
public String toString() {
return "SimpleBinaryEvent{" +
"id='" + id + '\'' +
", status=" + status +
", sequence=" + sequence +
", time=" + time +
'}';
}
}
Fehlerursache:
Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> Map (1/1).
... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper((1,SimpleBinaryEvent{id='1', status=true, sequence=95, time=1505503380000}), 1505503380000, 0),....
Ich bin sicher, dass ich die Gleichen haben() und hashCode() implementiert, um die Art und Weise sollte es sein. Ich habe auch den Objects.hashCode ausprobiert. In anderen Fällen hatte ich CircularReference (und damit stackOverflow) auf SharedBuffer.toString(), was wiederum auf Probleme mit Referenzen (Gleichheit und was nicht). Ohne aktivierten Checkpointing funktioniert es wie erwartet. Ich laufe auf einem lokalen Cluster. Ist die CEP-Produktion bereit?
ich die Bibliothek zum Ausprobieren und Berichterstattung dieser 1.3.2 Flink
Es sehr ähnlich sieht http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Keyed-CEP-checkpoint-fails- td14795.html – VishalSan
Bei Bedarf kann ich aus einem eigenständigen Junit heraus reproduzieren, so wie ich es erstellt habe. – VishalSan
Es wäre toll, wenn Sie eine JIRA mit dem in sich geschlossenen Junit veröffentlichen könnten. Ich werde es mir dann ansehen. –