-1

Ich versuche ein Bild mit einem ForkJoinPool in Java zu verarbeiten. Ich habe Streams verwendet, um einige benutzerdefinierte Operationen auf dem Bild auszuführen. Ich versuche, ForkJoinPool für getRGB und setRGB Methoden zu verwenden. Wie erreiche ich Parallelität auf getRGB Methoden?ForkJoinPool BufferedImage Verarbeitungsstil

@Override 
    public int[] getRGB(int xStart, int yStart, int w, int h, int[] rgbArray,int offset, int scansize) { 

     int[][] sol = new int[h][w]; 

     int threshold = w; 

     class RecursiveSetter extends RecursiveAction { 
      int from; 
      int to; 
      FJBufferedImage image; 

      RecursiveSetter(int from, int to, FJBufferedImage image) { 
       this.from = from; 
       this.to = to; 
       this.image = image; 
      } 

      @Override 
      protected void compute() { 
       System.out.println("From : " + from + " To : " + to); 
       if (from >= to) return; 

       if (to - from == 1) { 
        computeDirectly(from); 
        return; 
       } else { 
        int mid = from + (to - from)/2; 
        System.out.println("From : " + from + " To : " + to + 
          "Mid :" + mid); 
        invokeAll(
          new RecursiveSetter(from, mid, image), 
          new RecursiveSetter(mid + 1, to, image)); 
        return; 
       } 
      } 

      void computeDirectly(int row) { 
       sol[from] = image.getRealRGB(from, 0, w, 1, null, offset, 
         scansize); 

      } 
     } 

     ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors()); 
     pool.invoke(new RecursiveSetter(0, h-1, this)); 
     return Arrays.stream(sol) 
       .flatMapToInt(Arrays::stream) 
       .toArray(); 
    } 

Die getRealRGB Proxys nur auf das Verfahren von BufferedImage. Ich verstehe, dass dies möglicherweise unpraktisch ist, aber ich möchte nur wissen, wie kann ich ForkJoinPool in diesem Zusammenhang verwenden. Und ja, der obige Code wirft ArrayIndexOutOfBound Exception. Bitte geben Sie einen Vorschlag, wie Sie die Arbeitsbelastung aufteilen (Reihe vs Spalte gegen kleines Gitter. Im Moment mache ich das spaltenweise) und wie man den Schwellenwert entscheidet.

Antwort

3

Zunächst einige Anmerkungen zu Ihrem Versuch:

int[][] sol = new int[h][w]; 

hier gibt es eine zweidimensionale Anordnung schaffen in Java, die mit Elementtyp ein eindimensionales Array ist int[], die bereits mit Subarray des int[] Art bevölkert . Da Sie die Elemente mit sol[from] = /* something returning an int[] array */ überschreiben werden, ist die Zuweisung dieser Sub-Arrays veraltet. In diesem Fall sollten Sie stattdessen

int[][] sol = new int[h][]; 

verwenden. Aber die Anerkennung der eindimensionale Natur der äußeren Anordnung ermöglicht erkennt auch, dass eine einfache Streaming-Lösung wird die Arbeit tun, das heißt

int[][] sol = IntStream.range(yStart, yStart+h) 
    .parallel() 
    .mapToObj(y -> getRealRGB(xStart, y, w, 1, null, 0, scansize)) 
    .toArray(int[][]::new); 

Das bereits erledigt die Arbeit auf den verfügbaren Kernen die Arbeitsbelastung zu verteilen. Es verwendet das Fork/Join-Framework hinter den Kulissen, genau wie Sie es versucht haben, aber das ist ein Implementierungsdetail. Sie könnten das mit der nächsten Stromoperation, z.

return IntStream.range(yStart, yStart+h) 
    .parallel() 
    .flatMap(y -> Arrays.stream(getRealRGB(xStart, y, w, 1, null, 0, scansize))) 
    .toArray(); 

aber, wenn ich die Methode Signatur richtig verstehe, wollen Sie tatsächlich

public int[] getRGB(
     int xStart, int yStart, int w, int h, int[] rgbArray, int offset, int scansize) { 

    final int[] result = rgbArray!=null? rgbArray: new int[offset+h*scansize]; 
    IntStream.range(yStart, yStart+h).parallel() 
     .forEach(y -> getRealRGB(xStart, y, w, 1, result, offset+y*scansize, scansize)); 
    return result; 
} 

zu erfüllen, den Vertrag zu tun. Dies minimiert auch die Anzahl der Kopiervorgänge. Da jede Abfrage in eine andere Region des Arrays schreibt, ist das direkte Schreiben in das Zielarray threadsicher.

Dies hält die Strategie der Teilung von Reihen von nur Zeilen. Das Unterteilen von Zeilen ist möglich, aber komplizierter, während es sich selten auszahlt. Es würde nur im Falle eines Anrufers helfen, der eine sehr kleine Anzahl von Zeilen, aber viele Werte pro Zeile anfordert. Aber selbst dann ist nicht klar, ob sich das komplizierte Unterspalten wegen der Speicherlokalität auszahlen würde.


Ihre ursprüngliche Frage angeht, wenn Sie eine ForkJoinTask direkt implementieren, können Sie getSurplusQueuedTaskCount() verwenden, um zu entscheiden, ob wieder aufzuspalten oder direkt berechnen.

Die Auswahl des Schwellenwerts ist ein Kompromiss zwischen Overhead aufgrund der Anzahl der Aufgabenobjekte, die synchronisiert werden müssen, und Core-Auslastung. Wenn die Arbeitslast perfekt ausgeglichen werden kann und kein anderer unabhängiger Thread oder Prozess CPU-Zeit verbraucht, wäre ein einzelner Artikel pro Kern perfekt.In der Praxis laufen die Tasks jedoch nie genau zur selben Zeit, so dass es wünschenswert ist, eine Spare-Split-Task zu verwenden, die von den Kernen ausgeführt wird, die zuerst fertig waren. Ein typischer Schwellenwert liegt zwischen 1 oder 3 (denken Sie daran, dass die Anzahl der Aufgaben in der Warteschlange pro Kern ist), für Ihre Art von Aufgaben mit sehr gleichmäßigen Arbeitsbelastungen könnte eine kleinere Anzahl verwendet werden, z. Beenden Sie die Aufteilung, sobald ein anderes Element der Warteschlange vorhanden ist.