2016-08-29 3 views
2

Ich finke von IDE. Das Speichern von Daten in der abfragbar ist arbeitet, aber irgendwie, wenn ich es abfragen, wirft es AusnahmeFlink Abfragbarer Status funktioniert nicht

Exeception

Failure(akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://[email protected]:6123/), Path(/user/jobmanager)]) 

Mein Code:

config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,"localhost") 
config.setString(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,"6123") 

@throws[Throwable] 
def recover(failure: Throwable): Future[Array[Byte]] = if (failure.isInstanceOf[AssertionError]) return Futures.failed(failure) 
else { 
    // At startup some failures are expected 
    // due to races. Make sure that they don't 
    // fail this test. 
    return Patterns.after(retryDelay, TEST_ACTOR_SYSTEM.scheduler, TEST_ACTOR_SYSTEM.dispatcher, new Callable[Future[Array[Byte]]]() { 
    @throws[Exception] 
    def call: Future[Array[Byte]] = return getKvStateWithRetries(queryName, key, serializedKey) 
    }) 
} 
} 

    @SuppressWarnings(Array("unchecked")) 
    private def getKvStateWithRetries(queryName: String, 
           keyHash: Int, 
           serializedKey: Array[Byte]): Future[Array[Byte]] = { 

val kvState = client.getKvState(jobID, queryName, keyHash, serializedKey) 
kvState.recoverWith(recover(queryName, keyHash, serializedKey)) 
    } 

def onSuccess = new OnSuccess[Array[Byte]]() { 
@throws(classOf[Throwable]) 
override def onSuccess(result: Array[Byte]): Unit = { 
    println("found record ") 
    val value = KvStateRequestSerializer.deserializeValue(result, valueSerializer) 
    println(value) 
} 
} 


override def invoke(query: QueryMetaData): Unit = { 
println("getting inside querystore"+query.record) 
val serializedResult = flinkQuery.getResult(query.record, queryName) 
serializedResult.onSuccess(onSuccess) 

ich nicht Laichen ein neuer Mini-Cluster oder cluster.submit wie https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java wie ich das im selben Cluster in der Sam wollen e environment als Hauptanwendung, die mit env.execute ausgeführt wird. Ist dieser Schritt notwendig?

Aus der Dokumentation von deault flink auf localhost läuft: 6123 Gibt es Probleme mit Verbindung. Muss ich einen Job in einem separaten Cluster einreichen?

+0

ist auch ein Weg, es zu wissen, wo Jobmanager läuft.Ich bin nicht in der Lage, die API zu finden –

+0

Wie reichen Sie Ihre Arbeit ein? Kannst du deine Logs von Jobeinträgen teilen? –

+1

Ich führe meinen Job von der IDE, ich denke, dass es keine Möglichkeit gibt, sich mit dem Jobmanager zu verbinden, wenn Sie von IDE laufen. Ich tworks mit Garn/Cluster-Modus –

Antwort

1

Nach viel googlen fand ich eine Lösung.

Ich benutze LocalStreamEnvironment und bekomme den gleichen Fehler, bis ein Thread gefunden RemoteEnv connect failed. Der beschriebene Fehler bezieht sich auf ein anderes Setup (nicht lokal), aber das Beispiel gist in dem zum Testen verwendeten Thema erstellt den LocalFlinkMiniCluster mit dem Parameter "useSingleActorSystem", der auf false festgelegt ist.

Mit Blick auf die Implementierung von LocalStreamEnvironment wird der MiniCluster erstellt mit "useSingleActorSystem" auf true.

Ich habe einfach eine Klasse LocalQueryableStreamEnvironment LocalStreamEnvironment erstreckt, wo der Mini-Cluster mit „useSingleActorSystem“ auf wahr erstellt wird, und alles funktioniert von IDE.

Jetzt ist mein Code wie folgt:

Konfiguration:

Configuration config = new Configuration(); 
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 6); 
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); 
config.setInteger(JobManagerOptions.WEB_PORT, JobManagerOptions.WEB_PORT.defaultValue()); 
config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true); 
config.setString(JobManagerOptions.ADDRESS, "localhost"); 
config.setInteger(JobManagerOptions.PORT,JobManagerOptions.PORT.defaultValue()); 
**config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);** 

HINWEIS: QueryableState nur mit dieser Config LOCAL_NUMBER_TASK_MANAGER arbeitet gesetzt mehr als 1 zu schätzen!

Instantiate/Umgebung ausführen:

LocalQueryableStreamEnvironment env = LocalQueryableStreamEnvironment.createLocalEnvironment(3, config); 
... 
env.addSource(anySource) 
    .keyby(anyAtribute) 
    .flatmap(new UpdateMyStateToBeQueriedLaterMapper()) 
    .addSink(..); //etc 
... 
env.execute("JobNameHere"); 

und dem Client zu erstellen:

final Configuration config = new Configuration(); 
config.setString(JobManagerOptions.ADDRESS, "localhost"); 
config.setInteger(JobManagerOptions.PORT, JobManagerOptions.PORT.defaultValue()); 

HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils 
    .createHighAvailabilityServices(
        config, 
        Executors.newSingleThreadScheduledExecutor(), 
        HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION 
    ); 
return new QueryableStateClient(config,highAvailabilityServices); 

Für weitere Informationen Zugang:

Queryable States in ApacheFlink - Implementation

Queryable State Client with 1.3.0-rc0

Meine Abhängigkeiten:

compile group: 'org.apache.flink', name: 'flink-java', version: '1.3.1' 
compile group: 'org.apache.flink', name: 'flink-jdbc', version: '1.3.1' 
compile group: 'org.apache.flink', name: 'flink-streaming-java_2.11', version: '1.3.1' 
compile group: 'org.apache.flink', name: 'flink-clients_2.11', version: '1.3.1' 
compile group: 'org.apache.flink', name: 'flink-cep_2.11', version: '1.3.1' 
compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.10_2.11', version: '1.3.1' 
compile 'org.apache.flink:flink-runtime-web_2.11:1.3.1' 
Verwandte Themen