Ich habe diesen Code in meinem Main() Funktion:Flink AsyncDataStream wie passieren und Verwendung Konfigurationsparameter
DataStream<OutputObject> asyncResultStream = AsyncDataStream.orderedWait(
listOfData,
new CustomAsyncConnector(),
5,
TimeUnit.SECONDS,
10).setParallelism(3).startNewChain().uid("customUid");
die für die Anwendung AsyncDataStreams in 1.2 das einfache Format ist. Und der Code in dem CustomAsyncConnector ist wie jedes Beispiel, das Sie in ihrem Kern finden:
public class CustomAsyncConnector extends RichAsyncFunction<CustomObject, ResultObject> {
private transient Session client;
@Override
public void open(Configuration parameters) throws Exception {
client = Cluster.builder().addContactPoint("<CustomUrl>")
.withPort(1234)
.build()
.connect("<thisKeyspace>");
}
@Override
public void close() throws Exception {
client.close();
}
@Override
public void asyncInvoke(final CustomObject ctsa, final AsyncCollector<ResultObject> asyncCollector) throws Exception {
//Custom code here...
}
}
Hier sind meine Fragen: 1.) Was ist der richtige Weg ist „Parameter“ passieren an die Funktion open() in CustomAsyncConnector() von wo es in meiner Main() -Funktion aufgerufen wird. 2.) Wie sollen die Parameter verwendet werden, um in der Funktion open() die Verbindung zum Client aufzubauen?
Meine Vermutung zur ersten Frage ist, eine neue Object-Instanz CustomAsyncConnector() in main zu erstellen und dann die Funktion open() direkt aufzurufen, das Parameter-Objekt an sie zu übergeben und diese Instanz dann in den AsysDataStream-Code einzufügen. Ich bin mir jedoch nicht sicher, ob dies der beste Weg ist oder, und was noch wichtiger ist, die richtige Art und Weise, die Felder in einem Objekt vom Typ Configuration zu setzen (wieder vorausgesetzt configParameters.setString ("contactPointUrl", "127.0.0.1") "ist richtig, bin mir aber nicht sicher." Und das führt zu meiner zweiten und ehrlich wichtigsten Frage.
Also in Bezug auf meine zweite Frage, die Parameter, die ich an die Funktion open() übergeben möchte, sind die contactPointUrl, die PortNumber und der Schlüsselbereich in .connect(). Ich kann jedoch nicht darauf zugreifen, indem ich etwas wie ".addContactPoint (parameters.getString (" contactPointUrl "))". Ich habe auch versucht zu sehen, ob oder wo ich Cluster.builder(). GetConfiguration (Parameter) tun sollte, aber ich schieße im Dunkeln, wohin das überhaupt gehört oder überhaupt und ob die Parameternamen etwas Bestimmtes sein müssen und so weiter.
Also ich hoffe, ich habe das nicht zu schlecht, aber jede und alle Hilfe würde sehr geschätzt werden.
Vielen Dank im Voraus!