2017-05-02 6 views
0

Ich habe diesen Code in meinem Main() Funktion:Flink AsyncDataStream wie passieren und Verwendung Konfigurationsparameter

DataStream<OutputObject> asyncResultStream = AsyncDataStream.orderedWait(
      listOfData, 
      new CustomAsyncConnector(), 
      5, 
      TimeUnit.SECONDS, 
      10).setParallelism(3).startNewChain().uid("customUid"); 

die für die Anwendung AsyncDataStreams in 1.2 das einfache Format ist. Und der Code in dem CustomAsyncConnector ist wie jedes Beispiel, das Sie in ihrem Kern finden:

public class CustomAsyncConnector extends RichAsyncFunction<CustomObject, ResultObject> { 

private transient Session client; 

@Override 
public void open(Configuration parameters) throws Exception { 
    client = Cluster.builder().addContactPoint("<CustomUrl>") 
      .withPort(1234) 
      .build() 
      .connect("<thisKeyspace>"); 
} 
@Override 
public void close() throws Exception { 
    client.close(); 
} 
@Override 
public void asyncInvoke(final CustomObject ctsa, final AsyncCollector<ResultObject> asyncCollector) throws Exception { 

    //Custom code here... 

} 

}

Hier sind meine Fragen: 1.) Was ist der richtige Weg ist „Parameter“ passieren an die Funktion open() in CustomAsyncConnector() von wo es in meiner Main() -Funktion aufgerufen wird. 2.) Wie sollen die Parameter verwendet werden, um in der Funktion open() die Verbindung zum Client aufzubauen?

Meine Vermutung zur ersten Frage ist, eine neue Object-Instanz CustomAsyncConnector() in main zu erstellen und dann die Funktion open() direkt aufzurufen, das Parameter-Objekt an sie zu übergeben und diese Instanz dann in den AsysDataStream-Code einzufügen. Ich bin mir jedoch nicht sicher, ob dies der beste Weg ist oder, und was noch wichtiger ist, die richtige Art und Weise, die Felder in einem Objekt vom Typ Configuration zu setzen (wieder vorausgesetzt configParameters.setString ("contactPointUrl", "127.0.0.1") "ist richtig, bin mir aber nicht sicher." Und das führt zu meiner zweiten und ehrlich wichtigsten Frage.

Also in Bezug auf meine zweite Frage, die Parameter, die ich an die Funktion open() übergeben möchte, sind die contactPointUrl, die PortNumber und der Schlüsselbereich in .connect(). Ich kann jedoch nicht darauf zugreifen, indem ich etwas wie ".addContactPoint (parameters.getString (" contactPointUrl "))". Ich habe auch versucht zu sehen, ob oder wo ich Cluster.builder(). GetConfiguration (Parameter) tun sollte, aber ich schieße im Dunkeln, wohin das überhaupt gehört oder überhaupt und ob die Parameternamen etwas Bestimmtes sein müssen und so weiter.

Also ich hoffe, ich habe das nicht zu schlecht, aber jede und alle Hilfe würde sehr geschätzt werden.

Vielen Dank im Voraus!

Antwort

0

Hier ist, was am Ende funktionierte. Immer noch nicht sicher, wie die Konfigurationsparameter an die .open() - Methode übergeben werden, aber na ja.

hinzugefügt dies der CustomAsyncConnector Klasse:

private final CustomProps props; 

public CustomAsyncConnector(CustomProps props) { 
    super(); 
    this.props = props; 
} 

Und was gebe ich in der Methode main():

AsyncDataStream 
       .unorderedWait(
         dataToProcess, 
         new CustomAsyncConnector(props), 
         5, 
         TimeUnit.SECONDS, 
         10); 

und verwendet die Stützen in der .open() -Methode wie, wie ich wollte die Parameter verwenden.

Verwandte Themen