手写一个简单的线程池

手写一个简单的线程池

线程池核心代码

package com.app.lock.app;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * A minimal hand-written thread pool.
 *
 * <p>Submission order in {@link #execute(Runnable)}: start a worker while fewer than
 * {@code coreSize} workers exist, otherwise queue the task, otherwise start a worker up to
 * {@code maxSize}, otherwise hand the pool to the rejection policy.
 *
 * <p>Workers never time out: once started, a worker blocks on the queue until its thread is
 * interrupted or a task throws.
 */
public class MyThreadPool implements Executor {
    /**
     * Pool name; used as a prefix for worker thread names when non-null.
     */
    private final String poolName;
    /**
     * Upper bound on the number of worker threads.
     */
    private final int maxSize;
    /**
     * Number of workers that are started before tasks begin to be queued.
     */
    private final int coreSize;
    /**
     * Number of worker threads currently accounted for.
     */
    private final AtomicInteger count;

    /**
     * Tasks waiting for a free worker.
     */
    BlockingQueue<Runnable> takeQueue;

    /**
     * Called when a task can be neither queued nor given a new worker.
     */
    private final RejectPolcy rejectPolcy;


    /**
     * Creates a pool.
     *
     * @param poolName     name of the pool, may be {@code null}
     * @param maxSize      maximum number of worker threads
     * @param coreSize     number of workers started before queuing kicks in
     * @param count        counter holding the current worker count; a fresh zero counter is
     *                     used when {@code null}
     * @param takeQueue    queue holding tasks that wait for a worker
     * @param rejectPolicy policy invoked when the pool is saturated
     */
    public MyThreadPool(String poolName, int maxSize, int coreSize, AtomicInteger count, BlockingQueue<Runnable> takeQueue, RejectPolcy rejectPolicy) {
        this.poolName = poolName;
        this.maxSize = maxSize;
        this.coreSize = coreSize;
        this.count = count != null ? count : new AtomicInteger(0);
        this.takeQueue = takeQueue;
        this.rejectPolcy = rejectPolicy;
    }

    /**
     * Submits a task: core worker first, then the queue, then an extra worker, then rejection.
     *
     * @param command the task to run
     * @throws NullPointerException if {@code command} is {@code null}
     */
    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException("command");
        }
        if (addWork(command, true)) {
            return;
        }
        if (takeQueue.offer(command)) {
            // With coreSize == 0, or after every worker has exited, nobody would drain the
            // queue; start a worker without an initial task so the queued task still runs.
            if (count.get() == 0) {
                addWork(null, false);
            }
            return;
        }
        // Queue is full: try a non-core worker, and reject only when that fails as well.
        if (!addWork(command, false)) {
            rejectPolcy.reject(this);
        }
    }

    /**
     * Tries to start exactly one new worker thread.
     *
     * @param run  first task for the new worker; {@code null} makes it poll the queue at once
     * @param core {@code true} to bound the worker count by {@code coreSize},
     *             {@code false} to bound it by {@code maxSize}
     * @return {@code true} if a worker was started, {@code false} if the bound is reached
     */
    public boolean addWork(Runnable run, boolean core) {
        final int max = core ? coreSize : maxSize;
        // Reserve a slot with CAS; retry only when another thread changed the counter.
        for (;;) {
            int countSize = count.get();
            if (countSize >= max) {
                return false;
            }
            if (count.compareAndSet(countSize, countSize + 1)) {
                break;
            }
        }

        Runnable loop = () -> runWorker(run);
        Thread worker = poolName == null ? new Thread(loop) : new Thread(loop, poolName + "-worker");
        boolean started = false;
        try {
            worker.start();
            started = true;
        } finally {
            if (!started) {
                // Give the reserved slot back when the thread could not be started.
                count.decrementAndGet();
            }
        }
        return true;
    }

    /**
     * Worker loop: runs the first task, then keeps taking tasks from the queue.
     */
    private void runWorker(Runnable first) {
        Runnable task = first;
        boolean exitedViaGetTask = false;
        try {
            while (task != null || (task = getTask()) != null) {
                try {
                    task.run();
                } finally {
                    task = null;
                }
            }
            exitedViaGetTask = true;
        } finally {
            // getTask() already released the slot on the normal exit path; release it here
            // only when a task threw and is taking the worker thread down with it.
            if (!exitedViaGetTask) {
                count.decrementAndGet();
            }
        }
    }

    /**
     * Blocks until a queued task is available.
     *
     * @return the next task, or {@code null} if the calling thread was interrupted while
     *         waiting; in that case the worker count is decremented and the interrupt flag
     *         is restored
     */
    public Runnable getTask() {
        try {
            return takeQueue.take();
        } catch (InterruptedException e) {
            count.decrementAndGet();
            Thread.currentThread().interrupt();
            return null;
        }
    }
}

拒绝策略接口

package com.app.lock.app;

/**
 * Rejection strategy plugged into {@link MyThreadPool}.
 */
public interface RejectPolcy {
    /**
     * Rejection callback; {@code MyThreadPool.execute} passes the pool itself.
     *
     * @param threadPool the pool reporting the rejection
     */
    void reject(MyThreadPool threadPool);
}

策略简单实现

package com.app.lock.app;

/**
 * Simple {@link RejectPolcy} that only reports the rejection on standard output.
 */
public class MyRejectPolicy implements RejectPolcy {
    /**
     * Prints a fixed "thread pool is full" message; the pool argument is not inspected.
     *
     * @param threadPool the pool reporting the rejection (unused)
     */
    @Override
    public void reject(MyThreadPool threadPool) {
        final String message = "线程池满了";
        System.out.println(message);
    }
}

Copyright: 采用知识共享署名 4.0 国际许可协议进行许可

Links: https://www.fengpt.cn/archives/手写一个简单的线程池