2017-06-30 1 views
0

Ich versuche, ein haselnussbraunes Jet 0.3 DAG-System zu überarbeiten, das ich vor ein paar Wochen in v0.4 geschrieben habe, als ersten Schritt, es von Batch zu Stream zu ändern. Interessanterweise habe ich auf einmal ein merkwürdiges Verhalten, bei dem ich nicht sicher sein kann, dass die Scheitelpunkte wie erwartet funktionieren. Wenn ich versuche, auf das, was gerade passiert, herunterzukommen, finde ich keine Möglichkeit, in die inneren Funktionen jedes Eckpunktes hineinzuschauen. Gibt es eine Möglichkeit, um zumindest einige Fehlermeldungen aus ihnen zu bekommen?Wie kann man in die Funktionsweise eines Hazelcast Jet Vertex sehen?

In einem Versuch, das Problem zu isolieren, habe ich versucht, es zu einem sehr vereinfachten "lese aus der Liste, map es auf eine Karte schreiben auf Karte" DAG zu verdummen. Aber immer noch kein Erfolg, etwas rauszubekommen.

Unter meinem verdummten Beispiel mache ich vielleicht einen sehr einfachen Fehler, den jemand, der besser Bescheid weiß, sofort sehen wird?

Verlag:

// every second via executorservice: 
final IStreamMap<Long, List<byte[]>> data = jet.getMap("data"); 
data.set(jet.getHazelcastInstance().getAtomicLong("key").getAndIncrement(), myByteArray); 

Analyzer:

jet.getList(key.toString()).addAll((List<byte[]>) jet.getMap("data").get(key)); 
jet.getMap("data").remove(key); 
logger.debug("List {} has size: {}", key, jet.getList(key.toString()).size()); 

final Vertex sourceDataMap = this.newVertex("sourceDataMap", readList(key.toString())).localParallelism(1); 
final Vertex parseByteArrayToMap = this.newVertex("parseByteArrayToMap", map(
    (byte[] e) -> new AbstractMap.SimpleEntry<>(jet.getHazelcastInstance().getAtomicLong("counter").getAndIncrement(), e))); 
final Vertex sinkIntoResultMap = this.newVertex("sinkIntoResultMap", writeMap("result")); 

this.edge(between(sourceDataMap, parseByteArrayToMap)) 
    .edge(between(parseByteArrayToMap, sinkIntoResultMap)); 

Zuhörer:

jet.getMap("result").addEntryListener((EntryAddedListener<Long, byte[]>) 
     (EntryEvent<Long, byte[]> entryEvent) 
      -> logger.debug("Got result: {} at {}",entryEvent.getValue().length, System.currentTimeMillis()) 
     , true); 

Die Datengenerierung funktioniert ganz gut, alles ist ok bis die DAG übernehmen soll ... aber keine Fehlermeldungen oder irgendetwas kommt von der DAG. Irgendwelche Vorschläge?

+0

Können Sie zeigen uns eine [MCVE] (https://stackoverflow.com/help/mcve) das dein Problem zeigen kann? Und Sie können auch die Klasse 'DiagnosticProcessors' mit den von Ihnen beschriebenen Tools überprüfen. –

+0

Sie haben ein Lambda, das sich auf "Jet" bezieht. Die JetInstance ist nicht serialisierbar und kann daher nicht als Teil des Lambda verwendet werden. Wenn Sie eindeutige IDs generieren möchten, können Sie etwas wie 'UUID.randomUUID()' verwenden. –

+0

Ich habe versucht, einen Job von der DAG auszuführen, die Sie gepostet haben, und er löst eine "IllegalArgumentException" in der Zeile aus, die den Scheitelpunkt Can erwähnt. Es schlägt also schnell (und laut), bevor man überhaupt die Möglichkeit hat, einen Job zu beginnen. –

Antwort

1

Hier ist der Code leicht hygienisiert, die auf meiner Seite funktioniert:

public class Main { 
    public static void main(String[] args) throws Exception { 
     JetInstance jet = Jet.newJetInstance(); 
     try { 
      HazelcastInstance hz = jet.getHazelcastInstance(); 
      ILogger logger = hz.getLoggingService().getLogger("a"); 

      // every second via executorservice: 
      final IStreamMap<Long, List<byte[]>> data = jet.getMap("data"); 
      List<byte[]> myByteArray = asList(new byte[1], new byte[2]); 
      IAtomicLong keyGen = hz.getAtomicLong("key"); 
      Long key = keyGen.getAndIncrement(); 
      data.set(key, myByteArray); 

      String stringKey = key.toString(); 
      hz.getList(stringKey).addAll((List<byte[]>) jet.getMap("data").get(key)); 
      jet.getMap("data").remove(key); 
      logger.severe(String.format("List %s has size: %d", key, jet.getList(stringKey).size())); 

      hz.getMap("result").addEntryListener((EntryAddedListener<Long, byte[]>) 
        (EntryEvent<Long, byte[]> entryEvent) -> logger.severe(String.format(
          "Got result: %d at %d", entryEvent.getValue().length, System.currentTimeMillis())), 
        true); 

      DAG dag = new DAG(); 
      Vertex sourceDataMap = dag.newVertex("sourceDataMap", readList(stringKey)).localParallelism(1); 
      Vertex parseByteArrayToMap = dag.newVertex("parseByteArrayToMap", map(
        (byte[] e) -> entry(randomUUID(), e))); 
      Vertex sinkIntoResultMap = dag.newVertex("sinkIntoResultMap", writeMap("result")); 

      dag.edge(between(sourceDataMap, parseByteArrayToMap)) 
       .edge(between(parseByteArrayToMap, sinkIntoResultMap)); 

      jet.newJob(dag).execute().get(); 
      Thread.sleep(1000); 
     } finally { 
      Jet.shutdownAll(); 
     } 
    } 
} 

in der Konsolen ich sehe:

SEVERE: [192.168.5.12]:5701 [jet] [0.4-SNAPSHOT] [3.8.2] List 0 has size: 2 
SEVERE: [192.168.5.12]:5701 [jet] [0.4-SNAPSHOT] [3.8.2] Got result: 2 at 1498822322228 
SEVERE: [192.168.5.12]:5701 [jet] [0.4-SNAPSHOT] [3.8.2] Got result: 1 at 1498822322228 
Verwandte Themen