2017-07-03 1 views
0

Ich versuche, Flink Cassandra SINK-Stecker mit CassandraPojoSink Klasse zu schreiben. Ich erhalte keine Fehler/Ausnahme, aber keine Datensätze in der Cassandra-Tabelle.CassandraPojoSink kein Fehler, aber Daten werden nicht in die Cassandra geschrieben

Ich verwende folgenden Code.

========= Sink Steckverbindercode snapshot ==================

DataStream<Event> stream = eventStream.flatMap(new EventTransformation()); 

    try { 
     stream.addSink(new CassandraPojoSink<>(Event.class, new ClusterBuilder() { 

      private static final long serialVersionUID = -2485105213096858846L; 

      @Override 
      public Cluster buildCluster(Cluster.Builder builder) { 
      return builder.addContactPoint("localhost").withPort(9042).build(); 
     } 
     })); 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } 

====== POJO CLASS ================

@Table(keyspace= "cloud", name = "event") 
public class Event implements Serializable { 

    private static final long serialVersionUID = 3284839826384795926L; 

    @Column(name = "name") 
    private String name; 

    @Column(name = "msg") 
     private String msg; 

    public Event(){ 

    } 

    //...... 

} 

Antwort

0

Es gibt viele Gründe, warum ein Flink Job eine Ausgabe zu erzeugen, könnte scheitern. Einige der häufigsten Gründe sind:

  • die App nicht nennen env.execute()
  • die App wird Ereigniszeit verwenden, aber es gibt keine Wasserzeichen Generator
  • die Watermarking Logik irgendwie verwirrt ist, und
0

auf POJO zu Tuple Ändern, Hinzufügen Zeitstempel Wasserzeichen Code funktioniert perfekt keine Wasserzeichen erzeugt werden (zu spät kommen zB die App Wasserzeichen auf der Grundlage der CPU-Takt zu erzeugen, anstatt Ereignis Zeitstempel, jedes Ereignis verursacht). Ich kann sehen, dass meine Daten in die Cassandra-Datenbank geschrieben werden.

Datastream> Veranstaltungen = event_stream.flatMap (neu EventTransformation()). AssignTimestampsAndWatermarks ( neue AssignerWithPeriodicWatermarks>() {

 private static final long serialVersionUID = 1L; 
     private final long maxOutOfOrderness = 1_000L; // 1 
     // second 
     private long currentMaxTimestamp = 0; 

     @Override 
     public long extractTimestamp(Tuple3<String, String, Long> arg0, 
              long arg1) { 
       long timestamp = arg0.f3; // get 
       currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp); 
       return timestamp; 
     } 

     @Override 
     public Watermark getCurrentWatermark() { 
      return new Watermark(currentMaxTimestamp - maxOutOfOrderness); 
     } 
    }); 

       event_stream.addSink(new CassandraTupleSink<Tuple3<String, String, Long>("INSERT INTO cloud.condition (name, msg, time) VALUES (?,?,?);", new ClusterBuilder() { 

          /** 
          * 
          */ 
          private static final long serialVersionUID = 1L; 

          @Override 
          protected Cluster buildCluster(Builder builder) { 
           return builder.addContactPoint("localhost").withPort(9042).build(); 
          } 
         })); 

       env.setParallelism(2); 

       env.execute(); 
Verwandte Themen