Ich finke von IDE. Das Speichern von Daten in der abfragbar ist arbeitet, aber irgendwie, wenn ich es abfragen, wirft es AusnahmeFlink Abfragbarer Status funktioniert nicht
Exeception
Failure(akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://[email protected]:6123/), Path(/user/jobmanager)])
Mein Code:
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,"localhost")
config.setString(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,"6123")
@throws[Throwable]
def recover(failure: Throwable): Future[Array[Byte]] = if (failure.isInstanceOf[AssertionError]) return Futures.failed(failure)
else {
// At startup some failures are expected
// due to races. Make sure that they don't
// fail this test.
return Patterns.after(retryDelay, TEST_ACTOR_SYSTEM.scheduler, TEST_ACTOR_SYSTEM.dispatcher, new Callable[Future[Array[Byte]]]() {
@throws[Exception]
def call: Future[Array[Byte]] = return getKvStateWithRetries(queryName, key, serializedKey)
})
}
}
@SuppressWarnings(Array("unchecked"))
private def getKvStateWithRetries(queryName: String,
keyHash: Int,
serializedKey: Array[Byte]): Future[Array[Byte]] = {
val kvState = client.getKvState(jobID, queryName, keyHash, serializedKey)
kvState.recoverWith(recover(queryName, keyHash, serializedKey))
}
def onSuccess = new OnSuccess[Array[Byte]]() {
@throws(classOf[Throwable])
override def onSuccess(result: Array[Byte]): Unit = {
println("found record ")
val value = KvStateRequestSerializer.deserializeValue(result, valueSerializer)
println(value)
}
}
override def invoke(query: QueryMetaData): Unit = {
println("getting inside querystore"+query.record)
val serializedResult = flinkQuery.getResult(query.record, queryName)
serializedResult.onSuccess(onSuccess)
ich nicht Laichen ein neuer Mini-Cluster oder cluster.submit wie https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java wie ich das im selben Cluster in der Sam wollen e environment als Hauptanwendung, die mit env.execute ausgeführt wird. Ist dieser Schritt notwendig?
Aus der Dokumentation von deault flink auf localhost läuft: 6123 Gibt es Probleme mit Verbindung. Muss ich einen Job in einem separaten Cluster einreichen?
ist auch ein Weg, es zu wissen, wo Jobmanager läuft.Ich bin nicht in der Lage, die API zu finden –
Wie reichen Sie Ihre Arbeit ein? Kannst du deine Logs von Jobeinträgen teilen? –
Ich führe meinen Job von der IDE, ich denke, dass es keine Möglichkeit gibt, sich mit dem Jobmanager zu verbinden, wenn Sie von IDE laufen. Ich tworks mit Garn/Cluster-Modus –