Saturday, February 18, 2023

Producer Consumer Problem with Wait and Notify - Thread Example Tutorial

The Producer-Consumer Problem is a classical concurrency problem and in fact, it is one of the most powerful concurrency design patterns which is used in most multithreaded Java applications. In the last article, I have shown you how to solve the Producer-Consumer problem in Java using blocking Queue but one of my readers emailed me and requested a code example and explanation of solving the Producer-Consumer problem in Java with the wait and notify method as well Since it's often asked as one of the top coding questions in Java. In this Java tutorial, I have put the code example of the wait notify version of the earlier producer-consumer concurrency design pattern. 

You can see this is a much longer code with explicit handling blocking conditions like when the shared queue is full and when the queue is empty which can be tricky for anyone. Since we have replaced BlockingQueue with Vector we need to implement blocking using wait and notify and that's why we have introduced the produce(int i) and consume() method.

If you see I have kept the consumer thread a little slow by allowing it to sleep for 50 Milliseconds to give an opportunity to the producer to fill the queue, which helps to understand that the Producer thread is also waiting when Queue is full.

By the way, if you are new to multithreading in Java then I also suggest you join a course like Multithreading and Parallel Computing in Java from Udemy. It's a great course to learn the multithreading basics and become a better Java developer.  





Java program to solve Producer-Consumer Problem in Java

Here is a complete Java program to solve the classic producer-consumer problem in the Java programming language. In this program, we have used the wait and notify method from java.lang.Object class instead of using BlockingQueue for flow control which makes implementing producer-consumer pattern really easy.

Before solving the problem, let's revisit what is the producer-consumer problem first?

This is a classical synchronization problem that involves a fixed size buffer or queue which can have items or task added to it or removed from it by different producer and consumer threads. 

The key is to solve the problem in such a way that the producer should wait if the queue is full and the consumer should wait if the queue is empty which involves inter-thread communication. 

This problem is also known by different names in the technical world like a consumer-producer problem, bounded buffer problem, or a blocking queue problem. I highly recommend you master this problem to improve your concurrency skills.

If you need more such problems I suggest solving the multithreading problems given in Java Multithreading for Senior Engineering Interviews on Educative. It's an interactive course that teaches you how to solve classical concurrency problems like producer-consumer, dining philosophers, Barber shop problems, and Uber Ride problems. 

How to solve Producer Consumer Problem with Wait and Notify - Thread Example

Now, let's see the code and try to understand how does it work:

import java.util.Vector;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Java program to solve Producer Consumer problem using wait and notify
 * method in Java. Producer Consumer is also a popular concurrency design pattern.
 *
 * @author Javin Paul
 */

public class ProducerConsumerSolution {

    public static void main(String args[]) {
        Vector sharedQueue = new Vector();
        int size = 4;
        Thread prodThread = new Thread(new Producer(sharedQueue, size), "Producer");
        Thread consThread = new Thread(new Consumer(sharedQueue, size), "Consumer");
        prodThread.start();
        consThread.start();
    }
}

class Producer implements Runnable {

    private final Vector sharedQueue;
    private final int SIZE;

    public Producer(Vector sharedQueue, int size) {
        this.sharedQueue = sharedQueue;
        this.SIZE = size;
    }

    @Override
    public void run() {
        for (int i = 0; i < 7; i++) {
            System.out.println("Produced: " + i);
            try {
                produce(i);
            } catch (InterruptedException ex) {
                Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
            }

        }
    }

    private void produce(int i) throws InterruptedException {

        //wait if the queue is full
        while (sharedQueue.size() == SIZE) {
            synchronized (sharedQueue) {
                System.out.println("The queue is full " + Thread.currentThread().getName()
                                    + " is waiting , size: " + sharedQueue.size());

                sharedQueue.wait();
            }
        }

        //producing element and notify consumers
        synchronized (sharedQueue) {
            sharedQueue.add(i);
            sharedQueue.notifyAll();
        }
    }
}

class Consumer implements Runnable {

    private final Vector sharedQueue;
    private final int SIZE;

    public Consumer(Vector sharedQueue, int size) {
        this.sharedQueue = sharedQueue;
        this.SIZE = size;
    }

    @Override
    public void run() {
        while (true) {
            try {
                System.out.println("Consumed: " + consume());
                Thread.sleep(50);
            } catch (InterruptedException ex) {
                Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
            }

        }
    }

    private int consume() throws InterruptedException {
        //wait if the queue is empty
        while (sharedQueue.isEmpty()) {
            synchronized (sharedQueue) {
                System.out.println("The queue is empty " + Thread.currentThread().getName()
                                    + " is waiting , size: " + sharedQueue.size());

                sharedQueue.wait();
            }
        }

        //Otherwise consume element and notify the waiting producer
        synchronized (sharedQueue) {
            sharedQueue.notifyAll();
            return (Integer) sharedQueue.remove(0);
        }
    }
}

Output:
Produced: 0
The queue is empty Consumer is waiting, size: 0
Produced: 1
Consumed: 0
Produced: 2
Produced: 3
Produced: 4
Produced: 5
The queue is full Producer is waiting, size: 4
Consumed: 1
Produced: 6
The queue is full Producer is waiting, size: 4
Consumed: 2
Consumed: 3
Consumed: 4
Consumed: 5
Consumed: 6
The queue is empty Consumer is waiting, size: 0


If you look at the output there are a couple of points which is worth noting:

1. Both Producer and Consumer thread can run in any order, even though you have started it doesn't mean that the producer will produce first and then the consumer will consume. Whenever the producer gets a chance it keeps producing until the queue is full, the same goes with the consumer. 

2. You can see that the Consumer is waiting when the queue is empty and the Producer is waiting when the queue is full which is the expected behavior. 

3. Any change to the shared object, here sharedQueue which is a vector is happening inside the synchronized block so that changes made by one thread are visible to the other. Remember whenever a thread exit or enter a synchronized block, the memory barrier is refreshed. 

This is the standard behavior of the Java memory model and I highly recommend you to read Java Concurrency in Practice - The Book at least once to learn more about memory barriers, Java Memory Model, and happens-before in Java. 

Producer consumer pattern in Java using wait and notify


How to solve Producer Consumer Problem in Java with ExampleThat’s all on How to solve the producer-consumer problem in Java using the wait and notify method. I still think that using BlockingQueue to implement producer-consumer design patterns is much better because of its simplicity and concise code. At the same time, this problem is an excellent exercise to understand the concept of the wait and notify method in Java.


 Other Java Multithreading and Concurrency Articles you may like
  • How to do inter-thread communication in Java using wait-notify? (answer)
  • How to pause a Thread in Java? (solution)
  • 5 Courses to Learn Java Multithreading in-depth (courses)
  • Difference between volatile, synchronized, and atomic variable in Java (answer)
  • 10 Java Multithreading and Concurrency Best Practices (article)
  • Top 5 Books to Master Concurrency in Java (books)
  • Top 50 Multithreading and Concurrency Questions in Java (questions)
  • How to avoid deadlock in Java? (answer)
  • Difference between CyclicBarrier and CountDownLatch in Java? (answer)
  • Difference between ForkJoinPool and Executor Framework in Java(answer)
  • 5 Essential Skills to Crack Java Interviews (skills)
  • Understanding the flow of data and code in Java program (answer)
  • How to join two threads in Java? (answer)
  • Difference between Executor and ExecutorService in Java? (answer)
  • How to stop a Thread in Java? (answer)
  • What is Happens Before in Java Concurrency? (answer)

Thanks a lot for reading this article so far. If you like this example of solving the producer-consumer problem using wait and notify in Java then please share it with your friends and colleagues. If you have any questions or feedback then please drop a note.

P. S. - If you are new but what to learn Java Multithreading and Concurrency and looking for a free course to start with then I also, suggest you check out this awesome free Java Multithreading course on Udemy. 

25 comments:

  1. Good one. refreshed forgotten concepts :)

    ReplyDelete
  2. There is a little bug here. In class Producer in method run should be
    "while(true){}" before for(..), because this program is ended after produce "7 producers".

    ReplyDelete
  3. There is another bug : Vector sharedQueue = new Vector();
    This initialization may not even be visible in the consumer/producer threads and may result in strange errors. This needs to initialized safely.

    ReplyDelete
  4. this is completely wrong. for example the first synchronized block in consume() checks that the vector is no longer empty, then it jumps out of the while loop. but now it's out of the sync block. so control goes to another consumer, which consumes the item. when control comes back to the original consumer, it tries to pull out of an empty vector, wrong.

    ReplyDelete
    Replies
    1. I agree, with you, synchronized block should come before while condition. I think, this is good example of How multithreading code can go wrong :)

      Delete
    2. Understood your point, but I am confused about your below statement. Only one consumer created in the sample, how comes "origin consumer" and "another consumer". Are you saying IF we added more consumers?

      //QUOTE
      ...control goes to another consumer, which consumes the item. when control comes back to the original consumer, it tries to pull out of an empty vector, wrong.
      //UNQUOTE

      Delete
  5. Your code will always run into a deadlock because the producer and consumer threads will not read the true queue size. Producer thread should be modified to :
    while (true) {
    synchronized (queue) {
    while (queue.size() == size) {
    try {
    System.out
    .println("Producer thread waiting for consumer to take something from queue");
    queue.wait();
    } catch (Exception ex) {
    ex.printStackTrace();
    }
    }

    Random random = new Random();
    int i = random.nextInt();
    System.out.println("Producer putting value : " + i
    + ": in the queue");
    queue.add(i);
    queue.notifyAll();
    }

    }

    And Consumer thread to :
    while (true) {
    synchronized (queue) {
    while (queue.isEmpty()) {
    System.out
    .println("Consumer thread waiting for producer to put something in queue");
    try {
    queue.wait();
    } catch (Exception ex) {
    ex.printStackTrace();
    }

    }
    System.out.println("Consumer taking value : "
    + queue.remove(0));
    queue.notifyAll();
    }

    }

    ReplyDelete
    Replies
    1. That's true, everything related to shared object should be inside synchronized block, so that changes made by one thread should be visible to other. In this example, there are two synchronized block whcih should be one and size shoudl be checked there as suggested by you.

      Delete
  6. class ConsumerProducer {
    private int count;

    public synchronized void consume()
    {

    while(count == 0)
    {
    try
    {
    wait();
    }catch(InterruptedException ie)
    {
    //keep trying
    }
    count --; //consumed
    }
    }

    private synchronized void produce() {
    count++;
    notify(); // notify the consumer that count has been increm ented.
    }
    }

    ReplyDelete
  7. How about solving Producer Consumer problem using Semaphore It actually can be solved using multiple way including BlockingQueue, wait and notify as shown above, but I am really interested in using Semaphore. A good exercise to learn Semaphore in Java

    ReplyDelete
  8. Nice example. You could make it even better by using an ArrayDeque, thereby facilitating the use of addFirst() and removeLast() methods. This would simulate a Queue much better. Also I noticed that the Consumer does not need the size of the Queue. This is a dead variable. Nice work on your blog, btw. Keep it up!!!!

    ReplyDelete
  9. I believe there is a race condition in the "produce()" method. If two threads were blocked because the queue is full, then a consumer's call to notifyAll() would unblock both of them, and they'd both add to the queue.

    ReplyDelete
    Replies
    1. I think in this program there is just one producer, but even in case of multiple producer, since adding to queue is happening inside synchornized block, only one thread will add value at a time, but I agree the code has bug. Size checking of queue should be inside first synchronized block, there is no need for one more synchronized block in this example.

      Delete
  10. You should try the example that is given here. http://javarevisited.blogspot.com/2015/07/how-to-use-wait-notify-and-notifyall-in.html

    ReplyDelete
  11. Hey can't we have the below code in the same synchronized block, the block in which we had written sharedQueue.wait()?

    synchronized (sharedQueue) {
    sharedQueue.add(i);
    sharedQueue.notifyAll();
    }

    ReplyDelete
  12. Very nice implementation i got full clarity.... Thank u so much Javin Paul

    ReplyDelete
  13. Please correct me if I am wrong.

    In the consumer part. I believe its better to notify the other threads only after it is consumed. Likewise, can it be written as below:

    synchronized(sharedQUeue){

    int x = sharedQUeue.remove(0);
    sharedQueue.notifyAll();
    return x;
    }

    ReplyDelete
    Replies
    1. @Kiran, yes, that would be better, but it doesn't affect the output of program because both statement are inside synchronized block and lock will be release only when thread goes output synchronized block, so even thread will get notification, they won't check again until you remove the element and release the lock.

      Having said that, I agree your solution is cleaner,

      Delete
  14. Please help me understand why we need to synchronize sharedQueue, when Vector is already thread safe.

    ReplyDelete
    Replies
    1. Hello Punit, you need to synchronize sharedQueue because you are using it for inter-thread communication i.e. for calling wait and notify. Vector is thread-safe for it's own operation e.g. add, remove or update, it will not help the producer to wait if queue is full or consumer to wait if queue is empty. Those are done by using wait and notify and there you need a common object to synchronize it. Since sharedQueue is common between Producer and Consumer we synchronize on that.

      Delete
  15. Vector itself is synchronized. Why do we need synchronized block for sharedQueue?

    ReplyDelete
  16. How can we stop the thread here. Even though i use boolean volatile variable to stop the thread, the program execution stops, but it still hold the sharedQueue resource.

    ReplyDelete
  17. Vector sharedQueue= new Vector();
    it is not visible to consumer and producer
    make it instance variable otherwise, you may give different locks to producer and consumer!

    ReplyDelete
  18. Please try this out:

    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;

    public class ProduserConsumerDemo {

    public static void main(String[] args) {
    List list = new CopyOnWriteArrayList<>();
    int size = 5;
    Producer producer = new Producer(list, size);
    Consumer consumer = new Consumer(list);
    Thread t1 = new Thread(producer, "Producer");
    Thread t2 = new Thread(consumer, "Consumer");
    t1.start();
    t2.start();
    }
    }

    class Producer implements Runnable {
    private final List list;
    private final int size;

    public Producer(List list, final int size) {
    this.list = list;
    this.size = size;
    }

    public void run() {
    try {
    produce();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }

    private void produce() throws InterruptedException {
    int i = 0;
    while (i >= 0) {
    synchronized (list) {
    while (list.size() == size) {
    System.out.println(
    "List is full." + Thread.currentThread().getName() + " is waiting. Size:" + list.size());
    list.wait();
    }
    System.out.println("Produce :" + i);
    list.add(i++);
    Thread.sleep(50);
    list.notify();
    }
    }
    }
    }

    class Consumer implements Runnable {
    private final List list;

    public Consumer(List list) {
    this.list = list;
    }

    public void run() {
    try {
    consume();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }

    private void consume() throws InterruptedException {
    while (true) {
    synchronized (list) {
    while (list.isEmpty()) {
    System.out.println(
    "List is empty. " + Thread.currentThread().getName() + " is waiting. Size:" + list.size());
    list.wait();
    }
    System.out.println("Consumed item:" + list.remove(0));
    Thread.sleep(50);
    list.notify();
    }
    }
    }
    }

    ReplyDelete

Feel free to comment, ask questions if you have any doubt.