生产者代码
package com.example.demo;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Producer task that repeatedly creates {@code Data} items and offers them to a shared queue.
 *
 * <p>Each iteration sleeps for a random time below 3 seconds, builds a {@code Data} from the
 * next value of the shared counter, and waits up to 2 seconds for the queue to accept it.
 * The loop ends once {@link #stop()} has been called or the running thread is interrupted.
 */
public class Producer implements Runnable {
    /** Upper bound (exclusive), in milliseconds, of the simulated time needed to build an item. */
    private static final int MAX_BUILD_MILLIS = 3000;

    /** Loop flag; volatile so a {@link #stop()} from another thread is seen by {@link #run()}. */
    private volatile boolean running = true;
    private final BlockingQueue<Data> queue;
    private final AtomicInteger count;
    /** Reused across iterations instead of allocating a new generator every time. */
    private final Random random = new Random();

    /**
     * @param queue queue that produced items are offered to
     * @param count counter whose incremented value is passed to each new {@code Data}
     */
    public Producer(BlockingQueue<Data> queue, AtomicInteger count) {
        this.queue = queue;
        this.count = count;
    }

    /**
     * Produces items until {@link #stop()} is called or the thread is interrupted.
     * The flag is checked only at the top of the loop, so an iteration that has already
     * started still runs to completion after {@code stop()}.
     */
    @Override
    public void run() {
        long threadId = Thread.currentThread().getId();
        System.out.println("start [producer], id=" + threadId);
        try {
            while (running) {
                // Simulate the time it takes to build a data item.
                Thread.sleep(random.nextInt(MAX_BUILD_MILLIS));
                Data data = new Data(count.incrementAndGet());
                boolean ok = queue.offer(data, 2, TimeUnit.SECONDS);
                if (!ok) {
                    System.err.println("failed to put data, data: " + data);
                } else {
                    System.err.println(threadId + "生产:" + data);
                }
            }
        } catch (InterruptedException e) {
            // Interruption is a shutdown request: restore the flag for the caller and exit.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            // Any other failure is reported but must not mark the thread as interrupted.
            e.printStackTrace();
        }
    }

    /** Asks the producer to finish; takes effect the next time the loop condition is checked. */
    public void stop() {
        this.running = false;
    }
}
消费者代码
package com.example.demo;
import java.text.MessageFormat;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
/**
 * Consumer task that takes {@code Data} items from a shared queue and prints them.
 *
 * <p>The task runs until its thread is interrupted. Interruption while blocked in
 * {@code take()} or in the simulated processing delay is treated as the normal stop signal.
 */
public class Consumer implements Runnable {
    /** Upper bound (exclusive), in milliseconds, of the simulated processing time per item. */
    private static final int MAX_PROCESS_MILLIS = 1000;

    private final BlockingQueue<Data> queue;
    /** Reused across iterations instead of allocating a new generator every time. */
    private final Random random = new Random();

    /**
     * @param queue queue that items are taken from
     */
    public Consumer(BlockingQueue<Data> queue) {
        this.queue = queue;
    }

    /**
     * Consumes items until the thread is interrupted, then prints a stop message.
     */
    @Override
    public void run() {
        long threadId = Thread.currentThread().getId();
        System.out.println("start [consumer], id=" + threadId);
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // take() blocks until an item is available and never returns null,
                // so no null check is needed.
                Data data = queue.take();
                Thread.sleep(random.nextInt(MAX_PROCESS_MILLIS)); // simulate the time consumption
                System.out.println(threadId + ":消费" + data.getIntDdata());
            }
        } catch (InterruptedException e) {
            // Interrupted while blocked: this is the expected way to stop; restore the flag.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            // Unexpected failure: report it without marking the thread as interrupted.
            e.printStackTrace();
        } finally {
            // Printed on every exit path, including interruption inside take()/sleep().
            System.out.println("stop [consumer], id=" + threadId);
        }
    }
}
测试代码
package com.example.demo;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Demo entry point: runs three producers and three consumers against one shared queue,
 * lets them work for a while, then stops the producers and shuts the pool down.
 */
public class MyComusers {
    /** How long the producers and consumers run before the producers are stopped. */
    private static final long RUN_MILLIS = 10 * 1000;
    /** Grace period after stopping the producers, before the consumers are interrupted. */
    private static final long DRAIN_MILLIS = 3 * 1000;

    public static void main(String[] args) {
        LinkedBlockingQueue<Data> queue = new LinkedBlockingQueue<Data>();
        AtomicInteger count = new AtomicInteger(0);
        Consumer consumer1 = new Consumer(queue);
        Consumer consumer2 = new Consumer(queue);
        Consumer consumer3 = new Consumer(queue);
        Producer producer1 = new Producer(queue, count);
        Producer producer2 = new Producer(queue, count);
        Producer producer3 = new Producer(queue, count);

        // Six long-running tasks need six threads; fewer would leave some tasks queued forever.
        ExecutorService pool =
                new ThreadPoolExecutor(6, 6, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        pool.execute(producer1);
        pool.execute(producer2);
        pool.execute(producer3);
        pool.execute(consumer1);
        pool.execute(consumer2);
        pool.execute(consumer3);

        pause(RUN_MILLIS);

        producer1.stop();
        producer2.stop();
        producer3.stop();

        // A producer checks its flag only at the top of its loop, so it may still finish a
        // sleep of just under 3 seconds and publish one last item; give it and the consumers
        // time to do so.
        pause(DRAIN_MILLIS);

        // shutdown() alone never interrupts running tasks, so consumers blocked in
        // queue.take() would keep the JVM alive forever. shutdownNow() interrupts them.
        pool.shutdownNow();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                System.err.println("thread pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Sleeps for the given time; if interrupted, restores the interrupt flag and returns early. */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
备注:控制台输出的先后顺序并不一定与实际执行顺序一致,因为打印处没有添加同步代码。
评论区