2016-06-07 4 views
1

Ich habe gerade angefangen Multithreading zu lernen. Ich habe 5 Produzenten und 2 Konsumenten in mehreren Threads. Grundsätzlich fügt dieses Programm 100 Elemente in die Warteschlange ein. Der Producer beendet das Hinzufügen, wenn die Warteschlangengröße 100 beträgt. Ich möchte, dass der Consumer den Producer benachrichtigt, wenn der Consumer alle Elemente aus der Warteschlange entfernt, sodass der Producer das Hinzufügen erneut starten kann. Momentan wartet der Produzent, wird aber vom Verbraucher nie benachrichtigt.Warten und benachrichtigen in Consumer- und Producer-Threads

Hersteller:

public class Producer implements Runnable { 

private BlockingQueue sharedQueue; 
private final int queueSize; 
private Object lock = new Object(); 

    public Producer(BlockingQueue sharedQueue, int queueSize){ 
    this.sharedQueue = sharedQueue; 
    this.queueSize = queueSize; 
    } 

    public void run() { 
    while(true) { 
     if(sharedQueue.size()== queueSize){ 

       try { 
        synchronized (lock) { 
        sharedQueue.wait(); 
        } 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
     } 

     try { 
      sharedQueue.put("Producer: " + sharedQueue.size()); 
      Thread.sleep(500); 
      System.out.println("Producer: Queue Size " + sharedQueue.size() + " Current Thread " + Thread.currentThread()); 

      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
    } 
} 

Verbraucher:

public class Consumer implements Runnable{ 

private BlockingQueue sharedQueue; 
private final int queueSize; 
private final int queueEmpty=0; 
private Object lock = new Object(); 

    public Consumer(BlockingQueue sharedQueue, int queueSize){ 
    this.sharedQueue = sharedQueue; 
    this.queueSize = queueSize; 
    } 
//Notify awaiting thread if the sharedQueue is empty 
    public void run() { 
    while (true) { 
     if(sharedQueue.size()==queueEmpty){ 
      synchronized (lock) { 
      this.notifyAll(); 
      } 
     } 
      try { 

        sharedQueue.take(); 
        Thread.sleep(800); 
        System.out.println("Consumer: Queue Size " + sharedQueue.size() + " Current Thread" + Thread.currentThread()); 

      }catch(InterruptedException e){ 
       e.printStackTrace(); 
      } 
    } 

    } 
} 

Hauptklasse

public class App{ 

//A simple program to illustrate how producer and consumer pattern works with blocking queue using executor service 
public static void main(String[] args) 
{ 
    final BlockingQueue<String> sharedQueue = new ArrayBlockingQueue<String> (100); 
    final int queueSize =100; 
    final int producerNum = 5; 
    final int consumerNum = 2; 

    final ExecutorService executorProducer = Executors.newFixedThreadPool(producerNum); 
    final ExecutorService executorConsumer = Executors.newFixedThreadPool(consumerNum); 

    for(int i=0;i<producerNum;i++){ 
     Producer producer = new Producer(sharedQueue,queueSize); 
     executorProducer.execute(producer); 
    } 

    for(int j=0;j<consumerNum;j++){ 
     Consumer consumer = new Consumer(sharedQueue,queueSize); 
     executorConsumer.execute(consumer); 
    } 



    } 
} 
+0

Was denkst du 'this.notifyAll();' tut? Warum denkst du das? –

+0

Dieser Code sollte IllegalMonitorStateException auslösen, wenn notifyAll aufgerufen wird. –

+0

Ich denke, notifyAll wird alle Threads, die auf dieses Objekt warten, aufwachen. Die erwachten Threads können nicht fortfahren, bis der aktuelle Thread die Sperre für dieses Objekt aufgibt. Also mein Fehler hier ist, dass warten und benachrichtigen müssen für das gleiche Objekt verwendet werden? – hao

Antwort

2

von Oracle Dokumentation page:

BlockingQueue-Implementierungen sind Thread-sicher. Alle Warteschlangen Methoden erzielen ihre Wirkung atomar interne Sperren oder andere Formen der Gleichzeitigkeitssteuerung mit

Da Sie bereits BlockingQueues verwenden, können Sie loswerden wait() und notify() APIs erhalten.

Beispielcode für mehrere Hersteller und Verbraucher mit BlockingQueue:

import java.util.concurrent.*; 

public class ProducerConsumerDemo { 

    public static void main(String args[]){ 

    BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>(); 

     Thread prodThread1 = new Thread(new Producer(sharedQueue,1)); 
     Thread prodThread2 = new Thread(new Producer(sharedQueue,2)); 
     Thread consThread1 = new Thread(new Consumer(sharedQueue,1)); 
     Thread consThread2 = new Thread(new Consumer(sharedQueue,2)); 

     prodThread1.start(); 
     prodThread2.start(); 
     consThread1.start(); 
     consThread2.start(); 
    } 

} 

class Producer implements Runnable { 

    private final BlockingQueue<Integer> sharedQueue; 
    private int threadNo; 

    public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) { 
     this.threadNo = threadNo; 
     this.sharedQueue = sharedQueue; 
    } 

    @Override 
    public void run() { 
     for(int i=1; i<= 5; i++){ 
      try { 
       int number = i+(10*threadNo); 
       System.out.println("Produced:" + number + ":by thread:"+ threadNo); 
       sharedQueue.put(number); 
      } catch (Exception err) { 
       err.printStackTrace(); 
      } 
     } 
    } 

} 

class Consumer implements Runnable{ 

    private final BlockingQueue<Integer> sharedQueue; 
    private int threadNo; 
    public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) { 
     this.sharedQueue = sharedQueue; 
     this.threadNo = threadNo; 
    } 

    @Override 
    public void run() { 
     while(true){ 
      try { 
       int num = sharedQueue.take(); 
       System.out.println("Consumed: "+ num + ":by thread:"+threadNo); 
      } catch (Exception err) { 
       err.printStackTrace(); 
      } 
     } 
    } 
} 

Wie funktioniert es?

  1. Produzent Thread 1 legt von 11 bis hin Ganze Zahlen - 15into BlockingQueue
  2. Produzent Thread 2 von 21 ganzen Zahlen im Bereich setzt - 25 in BlockingQueue
  3. Alle Verbraucher Themen - Thema 1 oder Thema 2 Werte von BlockingQueue liest (Integer in diesem Beispiel)

Beispielausgabe:

Produced:21:by thread:2 
Produced:11:by thread:1 
Produced:12:by thread:1 
Produced:13:by thread:1 
Produced:14:by thread:1 
Produced:22:by thread:2 
Produced:23:by thread:2 
Produced:24:by thread:2 
Produced:25:by thread:2 
Consumed: 21:by thread:1 
Consumed: 12:by thread:1 
Consumed: 13:by thread:1 
Consumed: 14:by thread:1 
Consumed: 22:by thread:1 
Consumed: 23:by thread:1 
Consumed: 24:by thread:1 
Consumed: 25:by thread:1 
Produced:15:by thread:1 
Consumed: 11:by thread:2 
Consumed: 15:by thread:1 
Verwandte Themen