2017-03-28 2 views
1

Ich muss Datenströme von Kafka mit Flink als Streaming-Engine verarbeiten. Um die Analyse der Daten durchzuführen, muss ich einige Tabellen in Cassandra abfragen. Was ist der beste Weg, dies zu tun? Ich habe in Scala nach Beispielen für solche Fälle gesucht. Aber ich konnte keine finden. Wie können Daten von Cassandra in Flink mit Scala als Programmiersprache gelesen werden? Read & write data into cassandra using apache flink Java API hat eine andere Frage in den gleichen Zeilen. Es hat mehrere Ansätze in den Antworten erwähnt. Ich würde gerne wissen, was der beste Ansatz in meinem Fall ist. Außerdem sind die meisten verfügbaren Beispiele in Java. Ich suche nach Scala-Beispielen.Daten von Cassandra zur Verarbeitung in Flink lesen

Antwort

2
+0

Dieser Link enthält Anweisungen zum Schreiben an Cassandra.Ich muss in meinem Flink-Programm Daten von Cassandra lesen. – avidlearner

+1

Hier ist ein schönes Beispiel dafür, wie man mit Cassandra mit flink lesen und schreiben kann: http://stackoverflow.com/questions/42617575/read-write-data-into-cassandra- using-apache-flink-java-api –

+0

ja .. Das ist der gleiche Link, den ich in meiner Frage erwähnt habe. Es hat mehrere Ansätze in den Antworten erwähnt. Ich würde gerne wissen, was der beste Ansatz in meinem Fall ist. Außerdem sind die meisten online verfügbaren Beispiele in Java. Ich suche nach Scala-Beispielen. – avidlearner

1

ich zur Zeit von cassandra lesen 1.3 unter Verwendung von asyncIO in flink. Hier ist die Dokumentation darauf:

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html (wo es DatabaseClient hat, werden Sie die com.datastax.drive.core.Cluster anstelle)

Lassen Sie mich wissen, wenn Sie eine mehr in die Tiefe Beispiel müssen für die Verwendung von Es ist speziell von Cassandra zu lesen, aber ich kann leider nur ein Beispiel in Java geben.

EDIT 1

Hier ist ein Beispiel für den Code, den ich von Cassandra zum Lesen mit flink dem Async I/O verwenden. Ich arbeite immer noch daran, ein Problem zu identifizieren und zu beheben, bei dem aus irgendeinem Grund (ohne tief hineinzugehen) große Datenmengen von einer einzelnen Abfrage zurückgegeben werden, das Zeitlimit des asynchronen Datenstroms wird ausgelöst, obwohl es von Cassandra als gut zurückgegeben wird und lange vor der Timeout-Zeit. Aber das unter der Annahme ist nur ein Bug mit anderen Sachen, die ich tue, und nicht, weil dieser Code, soll dies für Sie funktioniert (und seit Monaten auch für mich gut funktioniert):

public class GenericCassandraReader extends RichAsyncFunction<CustomInputObject, ResultSet> { 

    private final Properties props; 
    private Session client; 

    public GenericCassandraReader(Properties props) { 
     super(); 
     this.props = props; 
    } 

    @Override 
    public void open(Configuration parameters) throws Exception { 
     client = Cluster.builder() 
       .addContactPoint(props.cassandraUrl) 
       .withPort(props.cassandraPort) 
       .build() 
       .connect(props.cassandraKeyspace); 
    } 

    @Override 
    public void close() throws Exception { 
     client.close(); 
    } 

    @Override 
    public void asyncInvoke(final CustomInputObject customInputObject, final AsyncCollector<ResultSet> asyncCollector) throws Exception { 

     String queryString = "select * from table where fieldToFilterBy='" + customInputObject.id() + "';"; 

     ListenableFuture<ResultSet> resultSetFuture = client.executeAsync(queryString); 

     Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() { 

      public void onSuccess(ResultSet resultSet) { 
       asyncCollector.collect(Collections.singleton(resultSet)); 
      } 

      public void onFailure(Throwable t) { 
       asyncCollector.collect(t); 
      } 
     }); 
    } 
} 

Wieder leid die Verzögerung. Ich hatte gehofft, den Fehler beheben zu können, damit ich sicher sein konnte, aber ich dachte mir, dass ich zu diesem Zeitpunkt nur einen Hinweis hätte, wäre besser als nichts.

EDIT 2

So kamen wir schließlich zu bestimmen, dass das Problem nicht mit dem Code ist, aber mit dem Netzwerkdurchsatz. Viele Bytes versuchen, durch eine Pipe zu kommen, die nicht groß genug ist, um damit umzugehen, Zeug beginnt zu sichern, einige beginnen zu rinnen, aber (dank dataastax cassandra driver's QueryLogger konnten wir das sehen) die Zeit, die es brauchte, um das Ergebnis zu erhalten Jede Abfrage begann auf 4 Sekunden zu steigen, dann 6, dann 8 und so weiter.

TL; DR, Code ist in Ordnung, sei dir jedoch bewusst, dass es zu einem Netzwerkproblem kommen kann, wenn du timeoutExceptions von Flink asyncWaitOperator hast.

bearbeiten 2.5

erkannte auch, dass es von Vorteil sein könnte, dass aufgrund der Netzwerk-Latenz Problem zu erwähnen, wir bewegen endete eine RichMapFunction zu verwenden, die die Daten enthält, die wir von cassandra in Lesezustand wurden. Der Job verfolgt also nur alle Datensätze, die durch ihn gehen, anstatt jedes Mal, wenn ein neuer Datensatz durchkommt, aus der Tabelle zu lesen, um alle darin enthaltenen Informationen zu erhalten.

+0

Danke Jicaar. Ich habe den Datastax-Client in meinem Java-Code verwendet (mein Code wurde aufgrund einer geänderten Anforderung von Scala nach Java migriert) und es hat gut funktioniert, obwohl ich AsyncIO noch nicht implementiert habe. Wie können Sie in Ihrer Antwort ein Beispiel angeben, das asyncIO implementiert? – avidlearner

+0

Ich muss eigentlich darauf zurückkommen. Ich habe eine "funktionierende" Instanz, aber vor kurzem begann mit größeren Datenmengen zu testen und es wirft jetzt seltsame timeoutExceptions (würde hineinkommen, aber es wäre ehrlich gesagt verdient, seine eigene Frage hier zu sein). Sobald dies herausgefunden ist, werde ich die Antwort mit der Korrektur bearbeiten. – Jicaar

+0

@Jicaar Lesen Sie diesen Weg als Stream (AsyncDataStream)? Wenn ja, wie oft fragt dieser Code Cassandra? – user1870400

Verwandte Themen