2017-11-22 3 views
0

Ich versuche eine Aufgabe zu erledigen, bei der mehrere Produzenten und Konsumenten Zufallszahlen in einem beschränkten Puffer produzieren und konsumieren müssen. Hier ist mein Code:Producer-Consumer, gebundener Puffer-Eingabefehler?

import java.util.Deque; 
import java.util.LinkedList; 
import java.util.Random; 
import java.util.Scanner; 
import java.util.concurrent.Semaphore; 
import java.util.concurrent.Executors; 
import java.util.concurrent.ExecutorService; 
import java.nio.Buffer; 
import java.util.concurrent.TimeUnit; 

public class ProducerConsumer 
{ 
    interface Buffer 
    { 
     public abstract void insert(int item); 
     public abstract int remove(); 
    } 

    static class BoundedBuffer implements Buffer 
    { 
     private static final int maxSize = 5; 

     private Semaphore mutex; 
     private Semaphore empty; 
     private Semaphore full; 

     private int count; 
     private int in; 
     private int out; 
     private int[] buffer; 
     //private ArrayList<Integer> buffer = new ArrayList<Integer>(); 

     public BoundedBuffer() 
     { 
      mutex = new Semaphore(1); 
      empty = new Semaphore(5); 
      full = new Semaphore(0); 

      count = 0; 
      in = 0; 
      out = 0; 
     } 

     public synchronized int remove() 
     { 
      int item = 0; 

      while (count == 0) 
      { 

      } 

      try{ 
       full.acquire(); 
       mutex.acquire(); 
      }catch (InterruptedException e) { 
       System.out.println("REMOVAL ERROR: " + e); 
      } 

      count--; 
      item = buffer[out]; 
      out = (out+1) % maxSize; 

      System.out.println("  Consumer consumed: " + item); 

      mutex.release(); 
      empty.release(); 

      return item; 
     } 

     public synchronized void insert(int item) 
     { 
      while (count == maxSize) 
      { 

      } 

      try{ 
       empty.acquire(); 
       mutex.acquire(); 
      }catch (InterruptedException e) { 
       System.out.println("INSERTION ERROR: " + e); 
      } 

      count++; 
      buffer[in] = item; 
      in = (in+1) % maxSize; 

      System.out.println("Producer produced " + item); 

      mutex.release(); 
      full.release(); 
     } 
    } 

    static class Producer implements Runnable 
    { 
     private Buffer buffer; 

     public Producer(Buffer b) 
     { 
      buffer = b; 
     } 

     public void run() 
     { 
      Random proRand = new Random(); 
      Random sleepRand = new Random(); 

      for (int i = 0; i < 100; i++) 
      { 
       try{ 
        Thread.sleep(sleepRand.nextInt((500 - 0) + 0)); 
       }catch (InterruptedException e) { 
        System.out.println("PRODUCER INTERRUPT: " + e); 
       } 

       buffer.insert(proRand.nextInt((99999 - 10000) + 10000)); 
      } 

     } 
    } 

    static class Consumer implements Runnable 
    { 
     private Buffer buffer; 

     public Consumer(Buffer b) 
     { 
      buffer = b; 
     } 

     public void run() 
     { 
      Random sleepRand = new Random(); 

      for (int i = 0; i < 100; i++) 
      { 
       try{ 
        Thread.sleep(sleepRand.nextInt((500 - 0) + 0)); 
       }catch (InterruptedException e) { 
        System.out.println("CONSUMER INTERRUPT: " + e); 
       } 

       buffer.remove(); 
      } 
     } 
    } 

    public static void main(String[] args) 
    { 
     Scanner scanner = new Scanner(System.in); 
     int sleepTime = scanner.nextInt(); 
     int numPro = scanner.nextInt(); 
     int numCon = scanner.nextInt(); 
     scanner.close(); 

     System.out.println("Using arguments from command line"); 
     System.out.println("Sleep time = " + sleepTime); 
     System.out.println("Producer threads = " + numPro); 
     System.out.println("Consumer threads = " + numCon); 
     System.out.println(); 

     Buffer shared = new BoundedBuffer(); 

     /* proThread = new Thread(new Producer(shared)); 
     Thread conThread = new Thread(new Consumer(shared)); 

     proThread.start(); 
     conThread.start();*/ 


     ExecutorService proPool = Executors.newFixedThreadPool(numPro); 
     for (int i = 0; i < numPro; i++) 
     { 
      proPool.submit(new Producer(shared)); 

     } 
     proPool.shutdown(); 

     ExecutorService conPool = Executors.newFixedThreadPool(numCon); 
     for (int i = 0; i < numCon; i++) 
     { 
      conPool.submit(new Consumer(shared)); 
     } 
     conPool.shutdown(); 

     try{ 
      if (!proPool.awaitTermination(20, TimeUnit.SECONDS)) 
      { 
       proPool.shutdownNow(); 
      } 
     }catch (InterruptedException e) { 
      System.out.println("TERMINATION ERROR: " + e); 
     } 

     try{ 
      if (!conPool.awaitTermination(20, TimeUnit.SECONDS)) 
      { 
       conPool.shutdownNow(); 
      } 
     }catch (InterruptedException e) { 
      System.out.println("TERMINATION ERROR: " + e); 
     } 

     /*for (int i = 0; i < numPro; i++) 
     { 
      Runnable produce = new Producer(); 
     } 

     for (int i = 0; i < numCon; i++) 
     { 
      Runnable consume = new Consumer(); 
     }*/ 

     //Runnable produce = new Producer(); 
     //Runnable consume = new Consumer(); 

     //Thread pro = new Thread(produce, "pro"); 
     //Thread con = new Thread(consume, "con"); 
    } 

} 

Gerade jetzt, ich erhalte die folgende Ausgabe nach '20 5 1' Eingabe:

„20 5 1

Mit Argumenten von der Kommandozeile

Schlafzeit = 20

Produzent threads = 5

Consumer th liest = 1

INSERTION ERROR: java.lang.InterruptedException

INSERTION ERROR: java.lang.InterruptedException

INSERTION ERROR: java.lang.InterruptedException

AUS- ERROR: java.lang. InterruptedException

Einfügefehlers: java.lang.InterruptedException“

ich bin ein wenig verwirrt was das verursacht. Brauche ich eine andere Datenstruktur für meinen begrenzten Puffer?

Antwort

0

konnte ich die folgenden Fehler erkennen:

  • int [] Puffer initialisiert wird nie für "count", "in" und "out"
  • nicht verwenden
  • Verwendung flüchtiges Stichwort „synchronisiert“, wenn Sie bereits Schlösser mit
  • die „voll“ und „leer“ Semaphore sind nicht erforderlich, entfernen Sie sie
  • da Sie einfügen und 100-mal mit bis zu 500 ms Pausen zu entfernen, verwenden Sie am wenigsten 50sec in awaitTermination()

Die folgende Version des Programms funktioniert

public static class ProducerConsumer { 

     interface Buffer { 
      void insert(int item); 

      int remove(); 
     } 

     static class BoundedBuffer implements Buffer { 

      private static final int maxSize = 5; 

      private final int[] buffer = new int[maxSize]; 

      private final Semaphore mutex; 
      private volatile int count; 
      private volatile int in; 
      private volatile int out; 

      //private ArrayList<Integer> buffer = new ArrayList<Integer>(); 

      public BoundedBuffer() { 

       mutex = new Semaphore(1); 
       count = 0; 
       in = 0; 
       out = 0; 
      } 

      public int remove() { 

       int item = 0; 

       while (count == 0) { 
       } 

       try { 
        mutex.acquire(); 
       } catch (InterruptedException e) { 
        System.out.println("REMOVAL ERROR: " + e); 
       } 

       count--; 
       item = buffer[out]; 
       out = (out + 1) % maxSize; 

       System.out.println("Consumer consumed: " + item); 

       mutex.release(); 

       return item; 
      } 

      public void insert(int item) { 

       while (count == maxSize) { 
       } 

       try { 
        mutex.acquire(); 
       } catch (InterruptedException e) { 
        System.out.println("INSERTION ERROR: " + e); 
       } 

       count++; 
       buffer[in] = item; 
       in = (in + 1) % maxSize; 

       System.out.println("Producer produced " + item); 

       mutex.release(); 
      } 
     } 

     static class Producer implements Runnable { 

      private Buffer buffer; 

      public Producer(Buffer b) { 
       buffer = b; 
      } 

      public void run() { 

       Random proRand = new Random(); 
       Random sleepRand = new Random(); 

       for (int i = 0; i < 100; i++) { 
        try { 
         Thread.sleep(sleepRand.nextInt(500)); 
        } catch (InterruptedException e) { 
         System.out.println("PRODUCER INTERRUPT: " + e); 
        } 

        try { 
         buffer.insert(proRand.nextInt((99999 - 10000) + 10000)); 
        } catch (Exception e) { 
         System.out.println("Error while inserting " + e); 
        } 
       } 

      } 
     } 

     static class Consumer implements Runnable { 
      private Buffer buffer; 

      public Consumer(Buffer b) { 
       buffer = b; 
      } 

      public void run() { 

       Random sleepRand = new Random(); 

       for (int i = 0; i < 100; i++) { 
        try { 
         Thread.sleep(sleepRand.nextInt(500)); 
        } catch (InterruptedException e) { 
         System.out.println("CONSUMER INTERRUPT: " + e); 
        } 

        try { 
         buffer.remove(); 
        } catch (Exception e) { 
         System.out.println("Error while removing " + e); 
        } 
       } 
      } 
     } 

     public static void main(String[] args) { 

      int sleepTime = 20; 
      int numPro = 5; 
      int numCon = 1; 

      System.out.println("Using arguments from command line"); 
      System.out.println("Sleep time = " + sleepTime); 
      System.out.println("Producer threads = " + numPro); 
      System.out.println("Consumer threads = " + numCon); 
      System.out.println(); 

      Buffer shared = new BoundedBuffer(); 

     /* proThread = new Thread(new Producer(shared)); 
     Thread conThread = new Thread(new Consumer(shared)); 

     proThread.start(); 
     conThread.start();*/ 

      ExecutorService proPool = Executors.newFixedThreadPool(numPro); 
      for (int i = 0; i < numPro; i++) { 
       proPool.submit(new Producer(shared)); 

      } 
      proPool.shutdown(); 

      ExecutorService conPool = Executors.newFixedThreadPool(numCon); 
      for (int i = 0; i < numCon; i++) { 
       conPool.submit(new Consumer(shared)); 
      } 
      conPool.shutdown(); 

      try { 
       if (!proPool.awaitTermination(50, TimeUnit.SECONDS)) { 
        proPool.shutdownNow(); 
       } 
      } catch (InterruptedException e) { 
       System.out.println("TERMINATION ERROR: " + e); 
      } 

      try { 
       if (!conPool.awaitTermination(50, TimeUnit.SECONDS)) { 
        conPool.shutdownNow(); 
       } 
      } catch (InterruptedException e) { 
       System.out.println("TERMINATION ERROR: " + e); 
      } 
     } 

    } 
Verwandte Themen