阻塞队列版生产者消费者

过去的,未来的
2019-12-28 / 0 评论 / 0 点赞 / 855 阅读 / 0 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2019-12-28,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

生产者代码

package com.example.demo;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Producer half of the producer/consumer demo.
 *
 * <p>Each loop iteration sleeps for a random time below 3 seconds, wraps the next
 * value of the shared counter in a {@code Data} object and tries to enqueue it,
 * waiting at most 2 seconds for the queue to accept it. The loop ends when
 * {@link #stop()} is called or when the running thread is interrupted.
 */
public class Producer implements Runnable {
  /** Upper bound (exclusive), in milliseconds, of the simulated production time. */
  private static final int MAX_PRODUCE_MILLIS = 3000;
  /** How long a single {@code offer} waits for the queue to accept an element. */
  private static final long OFFER_TIMEOUT_SECONDS = 2;

  /** Created once instead of once per loop iteration. */
  private final Random random = new Random();
  /** Cleared by {@link #stop()}; volatile so the running thread sees the change. */
  private volatile boolean running = true;
  private final BlockingQueue<Data> queue;
  private final AtomicInteger count;

  /**
   * @param queue queue the produced items are offered to
   * @param count counter shared between producers; its incremented value is passed
   *              to the {@code Data} constructor
   */
  public Producer(BlockingQueue<Data> queue,AtomicInteger count) {
    this.queue = queue;
    this.count=count;
  }

  /** Produces items until {@link #stop()} is called or the thread is interrupted. */
  @Override
  public void run() {
    long id = Thread.currentThread().getId();
    System.out.println("start [producer], id=" + id);
    try {
      while (running) {
        // Simulate the time needed to build one item.
        Thread.sleep(random.nextInt(MAX_PRODUCE_MILLIS));
        Data data = new Data(count.incrementAndGet());
        boolean ok = queue.offer(data, OFFER_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        if (!ok) {
          System.err.println("failed to put data, data: " + data);
        } else {
          // Success is not an error: report it on stdout like the other progress lines.
          System.out.println(id + "生产:" + data);
        }
      }
    } catch (InterruptedException e) {
      // Interruption is a shutdown request, not a failure: restore the flag and exit.
      Thread.currentThread().interrupt();
    }
    System.out.println("stop [producer], id=" + id);
  }

  /** Asks the producer to exit; it does so once its current iteration completes. */
  public void stop() {
    this.running = false;
  }
}

消费者代码

package com.example.demo;

import java.text.MessageFormat;
import java.util.Random;
import java.util.concurrent.BlockingQueue;

/**
 * Consumer half of the producer/consumer demo.
 *
 * <p>Repeatedly takes one {@code Data} item from the queue, sleeps for a random
 * time below 1 second to simulate processing, and prints the item's value. The
 * loop has no stop flag; it ends only when the running thread is interrupted.
 */
public class Consumer implements Runnable {
  /** Upper bound (exclusive), in milliseconds, of the simulated processing time. */
  private static final int MAX_CONSUME_MILLIS = 1000;

  /** Created once instead of once per loop iteration. */
  private final Random random = new Random();
  private final BlockingQueue<Data> queue;

  /**
   * @param queue queue the items are taken from
   */
  public Consumer(BlockingQueue<Data> queue) {
    this.queue = queue;
  }

  /** Consumes items until the running thread is interrupted. */
  @Override
  public void run() {
    long id = Thread.currentThread().getId();
    System.out.println("start [consumer], id=" + id);
    try {
      while (!Thread.currentThread().isInterrupted()) {
        // take() blocks until an element is available and never returns null,
        // so no null check is needed.
        Data data = queue.take();
        Thread.sleep(random.nextInt(MAX_CONSUME_MILLIS)); // simulate the time consumption
        System.out.println(id + ":消费" + data.getIntDdata());
      }
    } catch (InterruptedException e) {
      // Interrupted while blocked in take() or sleep(): this is the stop signal.
      // Restore the flag so code further up the stack can observe it.
      Thread.currentThread().interrupt();
    }
    // Printed on every exit path, including an interrupt that arrives while blocked.
    System.out.println("stop [consumer], id=" + id);
  }
}


测试代码

package com.example.demo;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Demo driver: runs three producers and three consumers against one shared
 * queue for a fixed time, then shuts everything down.
 */
public class MyComusers {
	/** How long the producers are left running, in milliseconds. */
	private static final long PRODUCE_MILLIS = 10 * 1000;
	/** Time given to the consumers after the producers were told to stop, in milliseconds. */
	private static final long DRAIN_MILLIS = 3 * 1000;

	/**
	 * Starts the workers, lets them run, stops the producers, waits a little
	 * longer and then terminates the pool.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		LinkedBlockingQueue<Data> queue = new LinkedBlockingQueue<Data>();
		AtomicInteger count = new AtomicInteger(0);
		Consumer consumer1 = new Consumer(queue);
		Consumer consumer2 = new Consumer(queue);
		Consumer consumer3 = new Consumer(queue);
		Producer producer1 = new Producer(queue, count);
		Producer producer2 = new Producer(queue, count);
		Producer producer3 = new Producer(queue, count);
		ExecutorService pool = new ThreadPoolExecutor(6, 6, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
		try {
			pool.execute(producer1);
			pool.execute(producer2);
			pool.execute(producer3);
			pool.execute(consumer1);
			pool.execute(consumer2);
			pool.execute(consumer3);
			Thread.sleep(PRODUCE_MILLIS);
			producer1.stop();
			producer2.stop();
			producer3.stop();
			Thread.sleep(DRAIN_MILLIS);
		} catch (InterruptedException e) {
			// Keep the interrupt visible to the caller and fall through to the shutdown.
			Thread.currentThread().interrupt();
		} finally {
			// shutdown() only rejects new tasks; the consumers would stay blocked in
			// queue.take() forever and keep the JVM alive. shutdownNow() interrupts
			// the worker threads so every task can finish.
			pool.shutdownNow();
		}
	}
}


备注:控制台输出的先后顺序不一定与实际的生产/消费顺序一致,因为打印处没有添加同步代码。

0

评论区