The Producer Consumer problem is a classical concurrency problem and is also
one of the well-known concurrency design patterns. In the last article we saw
how to solve the Producer Consumer problem in Java using BlockingQueue, but one
of my readers emailed me and requested a code example and explanation of
solving it with the wait and notify methods as well, since it is often asked
as one of the top coding questions in Java. In this Java tutorial, I have put
together the wait and notify version of the earlier producer consumer design
pattern. You can see that this is much longer code, with explicit handling of
the blocking conditions, i.e. when the shared queue is full and when it is
empty. Since we have replaced BlockingQueue with Vector, we need to implement
blocking ourselves using wait and notify, and that's why we have introduced
the produce(int i) and consume() methods.
Notice that I have kept the consumer thread a little slow by letting it sleep for 50 milliseconds after each item. This gives the producer a chance to fill the queue, which helps show that the producer thread also waits when the queue is full.
Java program to solve the Producer Consumer problem using wait and notify

import java.util.Vector;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Java program to solve Producer Consumer problem using wait and notify
 * method in Java. Producer Consumer is also a popular concurrency design pattern.
 *
 * @author Javin Paul
 */
public class ProducerConsumerSolution {

    public static void main(String args[]) {
        Vector sharedQueue = new Vector();
        int size = 4;
        Thread prodThread = new Thread(new Producer(sharedQueue, size), "Producer");
        Thread consThread = new Thread(new Consumer(sharedQueue, size), "Consumer");
        prodThread.start();
        consThread.start();
    }
}

class Producer implements Runnable {

    private final Vector sharedQueue;
    private final int SIZE;

    public Producer(Vector sharedQueue, int size) {
        this.sharedQueue = sharedQueue;
        this.SIZE = size;
    }

    @Override
    public void run() {
        for (int i = 0; i < 7; i++) {
            System.out.println("Produced: " + i);
            try {
                produce(i);
            } catch (InterruptedException ex) {
                Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }

    private void produce(int i) throws InterruptedException {
        //wait if queue is full
        while (sharedQueue.size() == SIZE) {
            synchronized (sharedQueue) {
                System.out.println("Queue is full " + Thread.currentThread().getName()
                        + " is waiting , size: " + sharedQueue.size());
                sharedQueue.wait();
            }
        }

        //producing element and notify consumers
        synchronized (sharedQueue) {
            sharedQueue.add(i);
            sharedQueue.notifyAll();
        }
    }
}

class Consumer implements Runnable {

    private final Vector sharedQueue;
    private final int SIZE;

    public Consumer(Vector sharedQueue, int size) {
        this.sharedQueue = sharedQueue;
        this.SIZE = size;
    }

    @Override
    public void run() {
        while (true) {
            try {
                System.out.println("Consumed: " + consume());
                Thread.sleep(50);
            } catch (InterruptedException ex) {
                Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }

    private int consume() throws InterruptedException {
        //wait if queue is empty
        while (sharedQueue.isEmpty()) {
            synchronized (sharedQueue) {
                System.out.println("Queue is empty " + Thread.currentThread().getName()
                        + " is waiting , size: " + sharedQueue.size());
                sharedQueue.wait();
            }
        }

        //Otherwise consume element and notify waiting producer
        synchronized (sharedQueue) {
            sharedQueue.notifyAll();
            return (Integer) sharedQueue.remove(0);
        }
    }
}

Output:
Produced: 0
Queue is empty Consumer is waiting , size: 0
Produced: 1
Consumed: 0
Produced: 2
Produced: 3
Produced: 4
Produced: 5
Queue is full Producer is waiting , size: 4
Consumed: 1
Produced: 6
Queue is full Producer is waiting , size: 4
Consumed: 2
Consumed: 3
Consumed: 4
Consumed: 5
Consumed: 6
Queue is empty Consumer is waiting , size: 0
That's all on how to solve the producer-consumer problem in Java using the
wait and notify methods. I still think that using BlockingQueue to
implement the producer consumer design pattern is much better because of its
simplicity and concise code. At the same time, this problem is an excellent
exercise for understanding the concept of wait and notify in Java.
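For comparison, here is a minimal sketch of the same producer and consumer written with BlockingQueue. It is not the code from the earlier article; the class name BlockingQueueSketch, the run helper and the POISON end marker are assumptions made for this illustration. put() blocks while the queue is full and take() blocks while it is empty, so no explicit wait and notify is needed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingQueueSketch {

    // Hypothetical end-of-stream marker, used only in this sketch.
    private static final int POISON = -1;

    // Sends 0..count-1 through a bounded queue and returns what the consumer received.
    static List<Integer> run(int capacity, int count) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        List<Integer> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put(i); // blocks while the queue is full
                }
                queue.put(POISON); // tell the consumer there is nothing more to read
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Producer");

        Thread consumer = new Thread(() -> {
            try {
                int item;
                while ((item = queue.take()) != POISON) { // blocks while the queue is empty
                    consumed.add(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Consumer");

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // after join, the consumer's writes are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println("Consumed: " + run(4, 7)); // prints "Consumed: [0, 1, 2, 3, 4, 5, 6]"
    }
}
```

With one producer, one consumer and a FIFO queue, the items come out in the order they were produced.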
Good one. refreshed forgotten concepts :)
There is a little bug here. In class Producer, the run method should have "while(true){}" before for(..), because the program ends after producing 7 items.
There is another bug: Vector sharedQueue = new Vector();
This initialization may not even be visible in the consumer/producer threads and may result in strange errors. It needs to be initialized safely.
This is completely wrong. For example, the first synchronized block in consume() checks that the vector is no longer empty, then it jumps out of the while loop. But now it's out of the sync block, so control goes to another consumer, which consumes the item. When control comes back to the original consumer, it tries to pull out of an empty vector, which is wrong.
I agree with you, the synchronized block should come before the while condition. I think this is a good example of how multithreading code can go wrong :)
Understood your point, but I am confused about your statement below. Only one consumer is created in the sample, so how come there is an "original consumer" and "another consumer"? Are you saying IF we added more consumers?
//QUOTE
...control goes to another consumer, which consumes the item. when control comes back to the original consumer, it tries to pull out of an empty vector, wrong.
//UNQUOTE
Agree
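To make the multiple-consumer scenario concrete, here is a minimal sketch in which the condition is checked in a while loop inside the same synchronized method that adds or removes the item. The names BoundedBuffer and GuardedWaitDemo, the ArrayDeque storage and the -1 stop marker are assumptions for illustration, not part of the original program. Because a consumer re-checks isEmpty() after every wakeup while still holding the lock, a second consumer cannot slip in between the check and the remove.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;

class BoundedBuffer {
    private final Queue<Integer> items = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    synchronized void put(int value) throws InterruptedException {
        while (items.size() == capacity) { // check and wait under the same lock
            wait();
        }
        items.add(value);
        notifyAll(); // wake consumers waiting for an item
    }

    synchronized int take() throws InterruptedException {
        while (items.isEmpty()) { // re-checked after every wakeup
            wait();
        }
        int value = items.remove();
        notifyAll(); // wake producers waiting for a free slot
        return value;
    }
}

class GuardedWaitDemo {

    // One producer and several consumers; returns every consumed value, sorted.
    static List<Integer> run(int capacity, int count, int consumers) {
        BoundedBuffer buffer = new BoundedBuffer(capacity);
        List<Integer> consumed = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();

        threads.add(new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    buffer.put(i);
                }
                for (int c = 0; c < consumers; c++) {
                    buffer.put(-1); // one stop marker per consumer (sketch-only convention)
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Producer"));

        for (int c = 0; c < consumers; c++) {
            threads.add(new Thread(() -> {
                try {
                    int value;
                    while ((value = buffer.take()) != -1) {
                        consumed.add(value);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Consumer-" + c));
        }

        threads.forEach(Thread::start);
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        List<Integer> sorted = new ArrayList<>(consumed);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println("Consumed " + run(4, 100, 3).size() + " items"); // prints "Consumed 100 items"
    }
}
```

With three consumers, each item should still be consumed exactly once, which is what the sorted result lets you check.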
Your code will always run into a deadlock because the producer and consumer threads will not read the true queue size. The producer thread should be modified to:
while (true) {
    synchronized (queue) {
        while (queue.size() == size) {
            try {
                System.out
                        .println("Producer thread waiting for consumer to take something from queue");
                queue.wait();
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
        Random random = new Random();
        int i = random.nextInt();
        System.out.println("Producer putting value : " + i
                + ": in the queue");
        queue.add(i);
        queue.notifyAll();
    }
}
And the consumer thread to:
while (true) {
    synchronized (queue) {
        while (queue.isEmpty()) {
            System.out
                    .println("Consumer thread waiting for producer to put something in queue");
            try {
                queue.wait();
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
        System.out.println("Consumer taking value : "
                + queue.remove(0));
        queue.notifyAll();
    }
}
That's true, everything related to the shared object should be inside a synchronized block, so that changes made by one thread are visible to the other. In this example, there are two synchronized blocks which should be one, and the size should be checked there, as you suggested.
class ConsumerProducer {
    private int count;

    public synchronized void consume() {
        while (count == 0) {
            try {
                wait();
            } catch (InterruptedException ie) {
                // keep trying
            }
        }
        count--; // consumed, only after the wait loop has finished
    }

    public synchronized void produce() {
        count++;
        notify(); // notify the consumer that count has been incremented
    }
}
How about solving the Producer Consumer problem using Semaphore? It can actually be solved in multiple ways, including BlockingQueue and wait and notify as shown above, but I am really interested in using Semaphore. It would be a good exercise for learning Semaphore in Java.
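One possible shape of a Semaphore based solution is sketched below, following the usual textbook arrangement: one semaphore counts empty slots, one counts filled slots, and a third with a single permit guards the queue itself. The class names SemaphoreBuffer and SemaphoreDemo are made up for this example and do not come from the article.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Semaphore;

class SemaphoreBuffer {
    private final Queue<Integer> items = new ArrayDeque<>();
    private final Semaphore emptySlots;                     // permits = free capacity
    private final Semaphore filledSlots = new Semaphore(0); // permits = items available
    private final Semaphore mutex = new Semaphore(1);       // guards the queue itself

    SemaphoreBuffer(int capacity) {
        this.emptySlots = new Semaphore(capacity);
    }

    void put(int value) throws InterruptedException {
        emptySlots.acquire(); // blocks while the buffer is full
        mutex.acquire();
        try {
            items.add(value);
        } finally {
            mutex.release();
        }
        filledSlots.release(); // signal that one more item is available
    }

    int take() throws InterruptedException {
        filledSlots.acquire(); // blocks while the buffer is empty
        mutex.acquire();
        int value;
        try {
            value = items.remove();
        } finally {
            mutex.release();
        }
        emptySlots.release(); // signal that one more slot is free
        return value;
    }
}

class SemaphoreDemo {

    // One producer and one consumer exchanging 0..count-1 through the buffer.
    static List<Integer> run(int capacity, int count) {
        SemaphoreBuffer buffer = new SemaphoreBuffer(capacity);
        List<Integer> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Producer");

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    consumed.add(buffer.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Consumer");

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println("Consumed: " + run(4, 7)); // prints "Consumed: [0, 1, 2, 3, 4, 5, 6]"
    }
}
```

The counting semaphores are acquired before the mutex on purpose: a thread that has to block on a full or empty buffer does so without holding the mutex, so the other side can still get in.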
Nice example. You could make it even better by using an ArrayDeque, thereby facilitating the use of addFirst() and removeLast() methods. This would simulate a Queue much better. Also I noticed that the Consumer does not need the size of the Queue. This is a dead variable. Nice work on your blog, btw. Keep it up!!!!
I believe there is a race condition in the "produce()" method. If two threads were blocked because the queue is full, then a consumer's call to notifyAll() would unblock both of them, and they'd both add to the queue.
I think in this program there is just one producer, but even in the case of multiple producers, since adding to the queue happens inside a synchronized block, only one thread will add a value at a time. But I agree the code has a bug: the size check of the queue should be inside the first synchronized block, and there is no need for a second synchronized block in this example.
You should try the example that is given here. http://javarevisited.blogspot.com/2015/07/how-to-use-wait-notify-and-notifyall-in.html
Hey, can't we have the code below in the same synchronized block, the block in which we had written sharedQueue.wait()?
synchronized (sharedQueue) {
    sharedQueue.add(i);
    sharedQueue.notifyAll();
}
Very nice implementation i got full clarity.... Thank u so much Javin Paul
Please correct me if I am wrong.
In the consumer part, I believe it's better to notify the other threads only after the item is consumed. Can it be written as below?
synchronized (sharedQueue) {
    int x = (Integer) sharedQueue.remove(0);
    sharedQueue.notifyAll();
    return x;
}
@Kiran, yes, that would be better, but it doesn't affect the output of the program because both statements are inside the synchronized block and the lock is released only when the thread leaves the synchronized block. So even if other threads get the notification, they can't check again until you remove the element and release the lock.
Having said that, I agree your solution is cleaner.
Please help me understand why we need to synchronize sharedQueue, when Vector is already thread safe.
Hello Punit, you need to synchronize on sharedQueue because you are using it for inter-thread communication, i.e. for calling wait and notify. Vector is thread-safe for its own operations, e.g. add, remove or update, but that will not help the producer to wait if the queue is full or the consumer to wait if the queue is empty. Those are done by using wait and notify, and there you need a common object to synchronize on. Since sharedQueue is common between Producer and Consumer, we synchronize on that.
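A small experiment, as a sketch: Vector's own methods are synchronized, but wait(), notify() and notifyAll() may only be called by a thread that currently holds the monitor of that object, otherwise Java throws IllegalMonitorStateException. The class and method names below are hypothetical and exist only for this demonstration.

```java
import java.util.Vector;

class MonitorOwnershipDemo {

    // Returns true if calling notifyAll() without holding the lock is rejected.
    static boolean notifyWithoutLockFails() {
        Vector<Integer> sharedQueue = new Vector<>();
        sharedQueue.add(1); // thread-safe on its own, no synchronized block needed
        try {
            sharedQueue.notifyAll(); // current thread does not own the monitor here
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    // Returns true if notifyAll() inside a synchronized block completes normally.
    static boolean notifyWithLockSucceeds() {
        Vector<Integer> sharedQueue = new Vector<>();
        synchronized (sharedQueue) {
            sharedQueue.notifyAll(); // allowed: the monitor is held
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("Without lock rejected: " + notifyWithoutLockFails()); // prints "Without lock rejected: true"
        System.out.println("With lock accepted: " + notifyWithLockSucceeds());    // prints "With lock accepted: true"
    }
}
```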
Vector itself is synchronized. Why do we need a synchronized block for sharedQueue?
How can we stop the thread here? Even though I use a volatile boolean variable to stop the thread, the program execution stops, but it still holds the sharedQueue resource.
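One common way to stop a thread that is blocked in wait() is to interrupt it, since a waiting thread never gets a chance to look at a volatile flag. The sketch below assumes a consumer that treats InterruptedException as a stop request; the class name InterruptShutdownDemo and its helper method are made up for this example. When the exception leaves the synchronized block, the lock on sharedQueue is released.

```java
import java.util.Vector;

class InterruptShutdownDemo {

    // Starts a consumer that waits on an empty queue, interrupts it,
    // and reports whether the consumer thread has terminated.
    static boolean consumerStopsOnInterrupt() {
        Vector<Integer> sharedQueue = new Vector<>();

        Thread consumer = new Thread(() -> {
            synchronized (sharedQueue) {
                try {
                    while (sharedQueue.isEmpty()) {
                        sharedQueue.wait(); // releases the lock while waiting
                    }
                } catch (InterruptedException e) {
                    // Treated as a stop request in this sketch; restore the flag and fall through.
                    Thread.currentThread().interrupt();
                }
            } // lock released here, whether the loop ended normally or by interrupt
        }, "Consumer");

        consumer.start();
        try {
            // Give the consumer time to block inside wait().
            while (consumer.getState() != Thread.State.WAITING) {
                Thread.sleep(10);
            }
            consumer.interrupt();  // wakes the waiting thread with InterruptedException
            consumer.join(5000);   // wait up to five seconds for it to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !consumer.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("Consumer stopped: " + consumerStopsOnInterrupt()); // prints "Consumer stopped: true"
    }
}
```

In the article's program the same idea would mean leaving the while (true) loop in run() when InterruptedException is caught, instead of only logging it.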
Vector sharedQueue = new Vector();
It is not visible to the consumer and producer.
Make it an instance variable, otherwise you may give different locks to the producer and consumer!
Please try this out:
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class ProduserConsumerDemo {
    public static void main(String[] args) {
        List list = new CopyOnWriteArrayList<>();
        int size = 5;
        Producer producer = new Producer(list, size);
        Consumer consumer = new Consumer(list);
        Thread t1 = new Thread(producer, "Producer");
        Thread t2 = new Thread(consumer, "Consumer");
        t1.start();
        t2.start();
    }
}

class Producer implements Runnable {
    private final List list;
    private final int size;

    public Producer(List list, final int size) {
        this.list = list;
        this.size = size;
    }

    public void run() {
        try {
            produce();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void produce() throws InterruptedException {
        int i = 0;
        while (i >= 0) {
            synchronized (list) {
                while (list.size() == size) {
                    System.out.println(
                            "List is full." + Thread.currentThread().getName() + " is waiting. Size:" + list.size());
                    list.wait();
                }
                System.out.println("Produce :" + i);
                list.add(i++);
                Thread.sleep(50);
                list.notify();
            }
        }
    }
}

class Consumer implements Runnable {
    private final List list;

    public Consumer(List list) {
        this.list = list;
    }

    public void run() {
        try {
            consume();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void consume() throws InterruptedException {
        while (true) {
            synchronized (list) {
                while (list.isEmpty()) {
                    System.out.println(
                            "List is empty. " + Thread.currentThread().getName() + " is waiting. Size:" + list.size());
                    list.wait();
                }
                System.out.println("Consumed item:" + list.remove(0));
                Thread.sleep(50);
                list.notify();
            }
        }
    }
}