In this Q&A, we'll go over a couple of ways in which we can implement a producer/consumer pattern. Both approaches take advantage of the concurrency classes introduced in Java 5. For this Q&A, I'm not considering pre Java 5 solution(s).
I - Implement a producer/consumer pattern using BlockingQueue
In this approach, producer adds elements to the BlockingQueue and consumer will consume elements from BlockingQueue. See example below
package com.javahowdoi.thread;
import java.util.concurrent.*;
public class ProducerConsumer {
private static class QueueProducer implements Runnable{
BlockingQueue<Integer> bq ;
public QueueProducer(BlockingQueue<Integer> bq) {
this.bq = bq;
}
@Override
public void run() {
for(int i =0; i < 100; ++i )
bq.add(i);
}
}
private static class QueueConsumer implements Runnable{
BlockingQueue<Integer> bq ;
public QueueConsumer(BlockingQueue<Integer> bq) {
this.bq = bq;
}
@Override
public void run() {
try {
while(true)
System.out.println(bq.take());
} catch(InterruptedException ie ) {
System.out.println("thread interrupted");
}
}
}
private static void producerConsumer() throws InterruptedException {
BlockingQueue<Integer> bq = new LinkedBlockingQueue<>();
Thread t1 = new Thread(new QueueProducer(bq));
Thread t2 = new Thread(new QueueConsumer(bq));
t1.start();
t2.start();
t1.join();
t2.interrupt();
}
public static void main(String[] args) throws InterruptedException {
producerConsumer();
}
}
II- Implement a producer/consumer pattern using Executor Service
In this approach, producer submits Runnable tasks to ExecutorService thread pool. Consumer executes the Tasks in ExecutorService thread pool. See example below
package com.javahowdoi.thread;
import java.util.concurrent.*;
public class ProducerConsumer {
private static class Task<T> implements Runnable{
private T e;
public Task(T e) {
this.e = e;
}
@Override
public void run() {
System.out.println(e);
}
}
private static void withExecutorService() throws InterruptedException {
ExecutorService es = Executors.newSingleThreadExecutor();
es.submit(new Task<Integer>(100));
es.submit(new Task<Integer>(200));
es.shutdown();
}
public static void main(String[] args) throws InterruptedException {
withExecutorService();
}
}
Comments
Post a Comment