2017-03-27 3 views
0

Während ich an meiner DAG in hazelcast Jet arbeitete, stolperte ich über ein seltsames Problem. Um nach dem Fehler zu suchen, habe ich meinen Ansatz komplett heruntergeduckt und es scheint, dass die Kanten nicht gemäß dem Tutorial funktionieren.Wie bekomme ich eine einfache DAG in Hazelcast Jet?

Der folgende Code ist fast so einfach wie es geht. Zwei Ecken (eine Quelle, eine Senke), eine Kante.

Die Quelle liest aus einer Karte, die Senke sollte in eine Karte eingefügt werden.

Der data.addEntryListener sagt mir richtig, dass die Karte mit 100 Listen (jede mit 25 Objekten bei 400 Byte) von einer anderen Anwendung gefüllt wird ... und dann nichts. Die Karte füllt sich, aber der Dag interagiert überhaupt nicht damit.

Irgendeine Idee, wo man nach dem Problem sucht?

package be.andersch.clusterbench; 

import com.fasterxml.jackson.databind.ObjectMapper; 
import com.hazelcast.config.Config; 
import com.hazelcast.config.SerializerConfig; 
import com.hazelcast.core.EntryEvent; 
import com.hazelcast.jet.*; 
import com.hazelcast.jet.config.JetConfig; 
import com.hazelcast.jet.stream.IStreamMap; 
import com.hazelcast.map.listener.EntryAddedListener; 
import be.andersch.anotherpackage.myObject; 

import java.util.List; 
import java.util.concurrent.ExecutionException; 

import static com.hazelcast.jet.Edge.between; 
import static com.hazelcast.jet.Processors.*; 

/** 
* Created by abernard on 24.03.2017. 
*/ 
public class Analyzer { 
    private static final ObjectMapper mapper = new ObjectMapper(); 
    private static JetInstance jet; 
    private static final IStreamMap<Long, List<String>> data; 
    private static final IStreamMap<Long, List<String>> testmap; 

    static { 
     JetConfig config = new JetConfig(); 
     Config hazelConfig = config.getHazelcastConfig(); 
     hazelConfig.getGroupConfig().setName("name").setPassword("password"); 
     hazelConfig.getNetworkConfig().getInterfaces().setEnabled(true).addInterface("my_IP_range_here"); 
     hazelConfig.getSerializationConfig().getSerializerConfigs().add(
       new SerializerConfig(). 
         setTypeClass(myObject.class). 
         setImplementation(new OsamKryoSerializer())); 
     jet = Jet.newJetInstance(config); 
     data = jet.getMap("data"); 
     testmap = jet.getMap("testmap"); 
    } 

    public static void main(String[] args) throws ExecutionException, InterruptedException { 

     DAG dag = new DAG(); 
     Vertex source = dag.newVertex("source", readMap("data")); 
     Vertex test = dag.newVertex("test", writeMap("testmap")); 

     dag.edge(between(source, test)); 

     jet.newJob(dag).execute()get(); 

     data.addEntryListener((EntryAddedListener<Long, List<String>>) (EntryEvent<Long, List<String>> entryEvent) -> { 
      System.out.println("Got data: " + entryEvent.getKey() + " at " + System.currentTimeMillis() + ", Size: " + jet.getHazelcastInstance().getMap("data").size()); 
     }, true); 

     testmap.addEntryListener((EntryAddedListener<Long, List<String>>) (EntryEvent<Long, List<String>> entryEvent) -> { 
      System.out.println("Got test: " + entryEvent.getKey() + " at " + System.currentTimeMillis()); 
     }, true); 

     Runtime.getRuntime().addShutdownHook(new Thread(() -> Jet.shutdownAll())); 
    } 
} 

Antwort

1

Der Jet Job ist bereits an der Linie jet.newJob(dag).execute().get() abgeschlossen, bevor Sie selbst den Eintrag Zuhörer erstellt. Dies bedeutet, dass der Job auf einer leeren Karte ausgeführt wird. Vielleicht ist Ihre Verwirrung über die Art dieses Jobs: es ist ein Stapel Job, kein unendlicher Stream, der einen verarbeitet. Jet Version 0.3 unterstützt noch keine unendliche Stream-Verarbeitung.

+0

Haben Sie auch einen Vorschlag, wie Sie es lösen können? Ich habe eine IStreamMap mit einem geplanten Exceltor, um den Job zu erledigen, aber es ist ziemlich langsam. Neil schlug eine DAG vor (was sinnvoll ist), deshalb probiere ich das aus. –

+1

Es kann ein Mikro-Batch-Schema geben, das zum Funktionieren gebracht werden könnte; Ansonsten entwickelt das Team aktiv die Unterstützung für echte unendliche Stream-Verarbeitung. –

Verwandte Themen