How can I produce and consume data in different threads?

In this Q&A, we'll go over two ways to implement the producer/consumer pattern.  Both approaches take advantage of the java.util.concurrent classes introduced in Java 5.  I'm not considering pre-Java 5 solutions here.

I - Implement a producer/consumer pattern using BlockingQueue

In this approach, the producer adds elements to a BlockingQueue and the consumer takes elements from it, blocking while the queue is empty.  See the example below.

package com.javahowdoi.thread;

import java.util.concurrent.*;

public class ProducerConsumer {

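    private static class QueueProducer implements Runnable {
        private final BlockingQueue<Integer> bq;

        public QueueProducer(BlockingQueue<Integer> bq) {
            this.bq = bq;
        }

        @Override
        public void run() {
            try {
                // put() blocks if the queue is bounded and full; add() would throw instead
                for (int i = 0; i < 100; ++i)
                    bq.put(i);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
        }
    }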

    private static class QueueConsumer implements Runnable {
        private final BlockingQueue<Integer> bq;

        public QueueConsumer(BlockingQueue<Integer> bq) {
            this.bq = bq;
        }

        @Override
        public void run() {
            try {
                // take() blocks until an element is available
                while (true)
                    System.out.println(bq.take());
            } catch (InterruptedException ie) {
                // the interrupt is used here as the signal to stop consuming
                System.out.println("thread interrupted");
            }
        }
    }

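    private static void producerConsumer() throws InterruptedException {
        BlockingQueue<Integer> bq = new LinkedBlockingQueue<>();
        Thread t1 = new Thread(new QueueProducer(bq));
        Thread t2 = new Thread(new QueueConsumer(bq));
        t1.start();
        t2.start();
        t1.join(); // wait for the producer to finish
        // Let the consumer drain the queue first; interrupting right away
        // could stop it before every element has been printed
        while (!bq.isEmpty())
            Thread.sleep(10);
        t2.interrupt(); // stop the consumer
        t2.join();
    }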

    public static void main(String[] args) throws InterruptedException {
        producerConsumer();
    }
}
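
Interrupting the consumer is one way to stop it. Another common option is a "poison pill": the producer puts a sentinel value on the queue as its last element, and the consumer exits when it sees that value. The sketch below is only an illustration under a few assumptions: the class name PoisonPillDemo, the run method, the sentinel Integer.MIN_VALUE (which assumes real data never contains that value) and the queue capacity of 10 are all made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class PoisonPillDemo {

    // Hypothetical sentinel: assumes real data never contains this value
    private static final Integer POISON_PILL = Integer.MIN_VALUE;

    // Produces 0..count-1 on one thread, consumes them on another,
    // and returns the consumed elements in the order they were taken
    static List<Integer> run(int count) throws InterruptedException {
        // Bounded queue: put() blocks when the consumer falls behind
        BlockingQueue<Integer> bq = new ArrayBlockingQueue<>(10);
        List<Integer> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; ++i)
                    bq.put(i);
                bq.put(POISON_PILL); // signal that no more data is coming
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Integer item = bq.take();
                    if (item.equals(POISON_PILL))
                        break; // producer is done, stop consuming
                    consumed.add(item);
                }
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join(); // returns only after the pill has been taken
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(100).size());
    }
}
```

Because the pill is queued after the last real element, the consumer processes everything the producer added before it stops, and no interrupt is needed.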

II - Implement a producer/consumer pattern using ExecutorService

In this approach, the producer submits Runnable tasks to an ExecutorService, and the service's thread pool acts as the consumer by executing those tasks.  See the example below.

package com.javahowdoi.thread;

import java.util.concurrent.*;

public class ProducerConsumer {

    private static class Task<T> implements Runnable {
        private final T e;

        public Task(T e) {
            this.e = e;
        }

        @Override
        public void run() {
            // "consume" the element by printing it
            System.out.println(e);
        }
    }

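    private static void withExecutorService() throws InterruptedException {
        // A single worker thread consumes the submitted tasks in order
        ExecutorService es = Executors.newSingleThreadExecutor();
        es.submit(new Task<>(100));
        es.submit(new Task<>(200));
        es.shutdown(); // stop accepting new tasks; queued tasks still run
        es.awaitTermination(5, TimeUnit.SECONDS); // wait for the queued tasks to finish
    }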

    public static void main(String[] args) throws InterruptedException {
        withExecutorService();
    }
}
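
If the producer needs a result back from the consumer, submitting a Callable instead of a Runnable returns a Future that carries the value. The sketch below is a rough illustration, not part of the example above: the class name ExecutorResultsDemo, the squares method, the pool size of 2 and the "square each value" work are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExecutorResultsDemo {

    // Hypothetical work: the pool squares each submitted value
    static List<Integer> squares(int count) throws InterruptedException, ExecutionException {
        ExecutorService es = Executors.newFixedThreadPool(2);
        try {
            // Producer side: submit one Callable per value and keep its Future
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < count; ++i) {
                final int n = i;
                futures.add(es.submit(() -> n * n));
            }

            // get() blocks until the corresponding task has finished
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> f : futures)
                results.add(f.get());
            return results;
        } finally {
            es.shutdown(); // always release the pool's threads
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(squares(5));
    }
}
```

The tasks may run on either pool thread in any order, but the results come back in submission order because the futures are read in the order they were created.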
