0

Hier ist mein Code für die ConcurrentApp-Klasse, die die Quelle meines Problems ist:Java-Programm-Laufzeit unverändert, wenn mehr Threads hinzufügen und zu unterschiedlichen Ergebnissen

class Processor implements Runnable { 

    private int id; 
    private Integer interaction; 
    private Set<Integer> subset; 
    private Set<Integer> y; 
    private Object lock = new Object(); 

    public DCP<BSN> dcp; 



    public Processor(int id, Integer interaction, Set<Integer> subset, DCP<BSN> dcp, Set<Integer> y) { 
     this.id = id; 
     this.interaction = interaction; 
     this.subset= subset; 
     this.dcp = dcp; 
     this.y = y; 
    } 

    public void run() { 
     //System.out.println("Starting: " + this.id); 
     if (this.y.contains(this.interaction)){ 
      this.subset.add(this.interaction); 
      processRemoval(this.subset); 
     } 

     //System.out.println("Completed: " + this.id); 
    } 

    public void processRemoval(Set<Integer> collection){ 
     synchronized(Processor.lock) { 
      for (Iterator<Integer> iter = this.y.iterator(); iter.hasNext();) { 
       int element = iter.next(); 
       while(element != this.interaction){ 
        element = iter.next(); 
       } 
       this.dcp.increaseScore(collection); 
       if (!collection.contains(this.interaction)) { 
        System.out.println(element + " WAS REMOVED by thread " + this.id); 
        iter.remove(); 
       } 
      } 
     } 
    } 

} 


public class ConcurrentApp { 

    public void multiRP (DCP<BSN> dcp, int threads) { 

     ConcurrentHashMap<Integer,Boolean> x = new ConcurrentHashMap<Integer,Boolean>(); 
     ConcurrentHashMap<Integer,Boolean> z = new ConcurrentHashMap<Integer,Boolean>(); 
     Set<Integer> y = (Set<Integer>) Collections.newSetFromMap(x); 
     y.addAll(dcp.PA); 
     Set<Integer> zeta = (Set<Integer>) Collections.newSetFromMap(z); 
     ExecutorService executor = Executors.newFixedThreadPool(threads); 

     int i =1; 
     while ((y.size() > i) && (i <= dcp.R)){ 
      for (Iterator<Integer> iterator = y.iterator(); iterator.hasNext();){ 
       zeta.addAll(y); 
       Integer interaction = iterator.next(); 
       zeta.remove(interaction); 
       ArrayList<Set<Integer>> subsets = dcp.getSubsets(zeta, i); 
       for (int j = 0; j< subsets.size(); j++){ 
        executor.submit(new Processor(j, interaction, subsets.get(j), dcp, y)); 
       } 
      } 
      i++; 
     } 
     executor.shutdown(); 
     System.out.println("All tasks submitted"); 
     try { 
      executor.awaitTermination(1, TimeUnit.DAYS); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
     System.out.println(y); 
     dcp.PA = new ArrayList<Integer>(y); 
     System.out.println("All tasks completed"); 
    } 
} 

und zugehörige Code für die Klasse DCP, die einige enthält Hilfsfunktionen, ist hier:

public class DCP<E> extends CondInd<E> { 
    public final int R;    //assumed maximum # of nodes blocking any node from target 
    public ArrayList <Integer> PA; //set of all parents of the target node (should be static) 
    //public Node NULL;  //NULL is model with no parents of target (should be final static) 
    public E scoreType = null; 
    public ScoringFunction<? super E> scoringFunction; 

    public double calcScore(E sT, Set<Integer> parentIndices) { 
     ArrayList<Integer> list = new ArrayList<Integer>(parentIndices); 
     return this.scoringFunction.score(sT, list); 
    } 

    public double calcScore(E sT, ArrayList<Integer> parentIndices) { 
     return this.scoringFunction.score(sT, parentIndices); 
    } 


    //helper for actual subsets method 
    private void getSubsets(ArrayList<Integer> input, int length, int start_index, Set<Integer> acc, ArrayList<Set<Integer>> sol){ 
     //base case 
     if (acc.size() == length){ 
      sol.add(new HashSet<>(acc)); 
      return; 
     } 
     //recursive solution 
     if (start_index == input.size()) return; 
     int x = input.get(start_index); 
     acc.add(x); 
     //first recursion 
     getSubsets(input, length, start_index+1, acc, sol); 
     acc.remove(x); 
     //second recursion, after x removed 
     getSubsets(input, length, start_index+1, acc, sol); 
    } 

    //different arguments and returns a list of subsets 
    public ArrayList<Set<Integer>> getSubsets(ArrayList<Integer> input, int length){ 
     ArrayList<Set<Integer>> sol = new ArrayList<>(); 
     getSubsets(input, length, 0, new HashSet<Integer>(), sol); 
     return sol; 
    } 

    //different arguments and returns a list of subsets 
    public ArrayList<Set<Integer>> getSubsets(Set<Integer> input, int length){ 
     ArrayList<Set<Integer>> sol = new ArrayList<>(); 
     ArrayList<Integer> copy = new ArrayList<Integer>(input); 
     getSubsets(copy, length, 0, new HashSet<Integer>(), sol); 
     return sol; 
    } 


    //removes the element from the input that increases the score by the highest value 
    public void increaseScore(Set<Integer> input){ 
     int index = -1; 
     double score = calcScore(scoreType,input); 
     List<Integer> list = new ArrayList<Integer>(input); 
     for (Integer element : list) { 
      ArrayList<Integer> copy_list = new ArrayList<Integer>(list); 
      copy_list.remove(element); 
      if (calcScore(scoreType,copy_list) > score){ 
       index = list.indexOf(element); 
       score = calcScore(scoreType,copy_list); 
      } 
     } 
     if (index != -1) 
      input.remove(list.get(index)); 
    } 

    public DCP(int maximumNodes, E scoreType, ScoringFunction<? super E> scoringFunction, ArrayList<Integer> parents){ 
     this.R = maximumNodes; 
     this.scoreType = scoreType; 
     this.scoringFunction = scoringFunction; 
     this.PA = parents; 
    } 
} 

Wenn ich mit threads = 1 mein Code in ConcurrentApp laufen, erhalte ich die folgende Ausdruck meiner Konsole basiert auf meiner print-Anweisungen:

All tasks submitted 
0 WAS REMOVED by thread 1 
1 WAS REMOVED by thread 0 
2 WAS REMOVED by thread 0 
3 WAS REMOVED by thread 0 
4 WAS REMOVED by thread 1 
5 WAS REMOVED by thread 0 
6 WAS REMOVED by thread 0 
[7, 8] 
All tasks completed 
Program completed in :22 seconds 

wobei die erste Zahl der aus meiner Liste entfernten Ganzzahl entspricht (z. "0 wurde von Thread 2 entfernt" bedeutet, dass der Wert 0 aus der Hauptliste entfernt wurde y). Diese Ausgabe ist sinnvoll, da jeder Wert, der gelöscht werden musste, einmal gelöscht wurde und das erwartete Ergebnis [7, 8] liefert. Dies sind die einzigen zwei Werte, die in diesem Fall nicht gelöscht wurden.

Allerdings, wenn ich meinen Code mit> 1 Thread ausgeführt, erhalte ich die folgende Ausgabe:

All tasks submitted 
0 WAS REMOVED by thread 2 
1 WAS REMOVED by thread 1 
2 WAS REMOVED by thread 0 
2 WAS REMOVED by thread 1 
3 WAS REMOVED by thread 1 
3 WAS REMOVED by thread 2 
4 WAS REMOVED by thread 0 
4 WAS REMOVED by thread 1 
5 WAS REMOVED by thread 0 
5 WAS REMOVED by thread 1 
6 WAS REMOVED by thread 0 
6 WAS REMOVED by thread 1 
7 WAS REMOVED by thread 1 
7 WAS REMOVED by thread 2 
8 WAS REMOVED by thread 1 
8 WAS REMOVED by thread 0 
[] 
All tasks completed 
Program completed in :0 seconds 

Wie Sie mehrere Male entfernt, da mehrere Threads in einigen Fällen sehen können, wurde der gleiche Wert, dass beschlossen Wert musste entfernt werden. Ein weiteres Problem ist, dass dies auch mein Ergebnis ändert, indem ich ein [] anstelle von [7, 8] gebe, weil das Programm fälschlicherweise entscheidet, dass 7 und 8 aus der Hauptliste y gelöscht werden müssen, wenn ich mehrere Threads verwende. I fixed dieses Problem des Mehrfachfaden Löschens durch statische auf das Sperrfeld Zugabe:

private static Object lock = new Object(); 

jedoch jetzt ich das Problem, dass die Laufzeit nicht ändert, wenn ich die Anzahl der Threads erhöhen. Die Ausgabe von Threads> = 1 nach der Zugabe von statischen unter:

All tasks submitted 
0 WAS REMOVED by thread 1 
1 WAS REMOVED by thread 1 
2 WAS REMOVED by thread 1 
3 WAS REMOVED by thread 0 
4 WAS REMOVED by thread 1 
5 WAS REMOVED by thread 0 
6 WAS REMOVED by thread 0 
[7, 8] 
All tasks completed 
Program completed in :22 seconds 

Die Anzahl der Threads nicht die Laufzeit verbessern, aber ich bekomme das richtige Ergebnis. Dieses Ergebnis und die Laufzeit sind genau gleich, egal ob ich 1 Thread oder viele verwende.

Frage: Wie ich es sehe, gibt es zwei mögliche Lösungen:

1) entfernen Sie das Schlüsselwort static auf das Schloss und einen Weg für einen Thread finden die Entfernung der Durchführung der anderen Threads zu erzählen, die überspringen entfernter Wert

2) behalte das statische Schlüsselwort und finde heraus, warum mein Programm nur 1 Thread benutzt, wenn mehr verfügbar sind.

Alle Ideen werden sehr geschätzt!

+5

Es ist schwer zu glauben, dass der ganze Code für die Frage relevant ist. Könnten Sie versuchen, es auf einen [mcve] zu reduzieren? – shmosel

+0

@smossel der erste Codeblock ist der Hauptcode, den Sie für das Problem benötigen. Ich habe vorher ähnlichen Code gepostet und mir wurde gesagt, dass ich auch den zweiten Codeblock einbauen soll, um die Dinge einfacher zu machen. Bitte konzentrieren Sie sich auf den ersten Code-Block, aber beziehen Sie sich auf den zweiten Code-Block, wenn Sie einige der Funktionen/Klassen im ersten Block verstehen müssen. Dies ist, glaube ich, ein vollständiges Beispiel, jedoch wird ein überprüfbares Beispiel aufgrund der verschiedenen Pakete und Abhängigkeiten, die zum Ausführen des Codes erforderlich sind, viel mehr Code benötigen. –

Antwort

0

(Sie müssen wirklich lernen, einen minimalen Teil des Codes zu posten, mein Freund).

Meine Diagnose: Ihr Programm verhält sich das gleiche in Multithreading weil processRemoval ist mit dem Rest der Fäden Synchronisierung vor der Verarbeitung des ganzen Satzes, also kein Wunder, warum der Satz nur durch den ersten Thread verarbeitet wird. In diesen Fällen ist es üblich, die Threads vor der Verarbeitung jedes Elements zu synchronisieren.So, es scheint, dass Sie Ihren synchronize Block in die Schleife verschieben sollten.

In diesem Fall ändern Sie jedoch den Satz innerhalb der Schleife, und dies wird wahrscheinlich ConcurrentModificationException produzieren. Um dies zu vermeiden, schlage ich vor, dass Sie auch die Verwendung von HashSet durch eine andere gleichzeitige Implementierung von Set ersetzen. Zum Beispiel, ConcurrentSkipListSet oder CopyOnWriteArraySet, treffen Sie Ihre Wahl.

+0

Schätzen Sie die Eingabe. Ich habe versucht, das 'synchronize' Schlüsselwort zu verschieben, aber scheinbar nichts zu tun (ich habe es in die for-Schleife verschoben und verschiedene Stellen innerhalb' ProcessRemoval' ausprobiert). Ich bekomme auch keine 'ConcurrentModificationException', weil meine Sets, auf die mehrere Threads zugreifen, alle von threadsicheren Implementierungen von' ConcurrentHashMap' unterstützt werden –

Verwandte Themen