2016-08-15 5 views
0

Ich bin in der Lage Affinität Computer-Daten Kollokation mit Apache zu zünden. In den folgenden zwei Beispielen funktioniert es wie erwartet.Operate auf zwischengespeicherten Daten pro Knoten

// Works on all nodes 
    IgniteUtil.getIgnite().compute().broadcast(() -> { 
     System.out.println("Should happen on all nodes"); 
     cache.get(key).forEach(x -> { 
      System.out.println(x); 
     }); 
    }); 

    // Works on just the one node 
    IgniteUtil.getIgnite().compute().affinityRun(IgniteUtil.CACHE_NAME, key ,() -> { 
     System.out.println("Should only happen on one node"); 
     cache.get(key).forEach(x -> System.out.println(x)); 
    }); 

Allerdings möchte ich ein Lambda gegen alle Knoten Daten ausführen. So zum Beispiel, dass ich für jede Person alle Bestellungen von Amazon zwischengespeichert habe. Ich möchte wissen, wie hoch der Gesamtbetrag für jeden ist.

Ich vermisse wahrscheinlich nur ein Beispiel, aber laut den Dokumenten sehe ich nicht, wie man das macht. In den Beispielen, die ich gesehen habe, muss ich die Schlüssel angeben, mit denen ich rechnen möchte. In diesem Beispiel möchte ich nur etwas Lambda auf allen Knoten ausführen können, wobei jeder Knoten nur mit seinem eigenen Anteil an den Daten arbeitet.

Ich habe versucht, diese

IgniteUtil.getIgnite().compute().affinityRun(IgniteUtil.CACHE_NAME, key ,() -> { 
     System.out.println("Should only happen once per node"); 
     List<Integer> count = new ArrayList<Integer>(); 
     System.out.println("Size: " + Sets.newHashSet(cache.iterator()).size()); 
     cache.iterator().forEachRemaining(x -> {count.add(count.size());}); 
     System.out.println("Calculated Size: " + count.size()); 
     System.out.println("Values: "); 
     cache.get(key).forEach(x -> System.out.print(x)); 
     System.out.println(); 
    }); 

tun und es nur auf dem Knoten ausführt, der den Schlüssel hat, jedoch die Cache-Größe die volle Cache-Größe, nicht nur die Werte, die lokal sind.

Irgendwelche Vorschläge?

Antwort

1

Sie können einen Abschluss wie in Ihrem ersten Beispiel senden und die Methode IgniteCache.localEntries() verwenden, um die lokalen Daten zu durchlaufen.

+0

Nochmals vielen Dank! Ein weiterer Schritt, wenn ich dies auf eine fehlertolerante Weise tun wollte, gibt es einen eingebauten Mechanismus, um dies zu tun? Oder müsste ich etwas implementieren, um zu erkennen, wenn ein Knoten ausfällt und sich anpasst? –

+0

Ja, Sie müssen anpassen, wenn sich die Topologie mitten in der Ausführung ändert. Sie können auf EVT_NODE_LEFT- und EVT_NODE_FAILED-Ereignisse warten und entsprechend reagieren. –

Verwandte Themen