线程池核心代码
package com.app.lock.app;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * A minimal thread pool: tasks are first handed to new "core" workers, then
 * queued, then handed to extra workers up to the maximum size, and finally
 * passed to the configured reject policy when none of those succeed.
 *
 * <p>Workers never time out; a worker only exits when it is interrupted while
 * waiting on the queue or when a task throws out of {@code run()}.
 */
public class MyThreadPool implements Executor {
    /** Pool name; used as the prefix of worker thread names. */
    private final String poolName;
    /** Upper bound on the number of worker threads. */
    private final int maxSize;
    /** Number of workers that are created before tasks start being queued. */
    private final int coreSize;
    /** Number of live worker threads. */
    private final AtomicInteger count;
    /** Queue holding tasks that are waiting for a free worker. */
    BlockingQueue<Runnable> takeQueue;
    /** Strategy invoked when a task can neither be queued nor given a new worker. */
    private final RejectPolcy rejectPolcy;

    /**
     * Creates a pool.
     *
     * @param poolName     name used to label worker threads
     * @param maxSize      maximum number of worker threads
     * @param coreSize     number of core worker threads
     * @param count        counter tracking live workers; when {@code null} a fresh
     *                     zero-valued counter is used instead
     * @param takeQueue    queue for tasks waiting to run
     * @param rejectPolicy strategy called when the pool is saturated
     */
    public MyThreadPool(String poolName, int maxSize, int coreSize, AtomicInteger count,
                        BlockingQueue<Runnable> takeQueue, RejectPolcy rejectPolicy) {
        this.poolName = poolName;
        this.maxSize = maxSize;
        this.coreSize = coreSize;
        // A null counter used to overwrite the field initializer and fail later in execute().
        this.count = (count != null) ? count : new AtomicInteger(0);
        this.takeQueue = takeQueue;
        this.rejectPolcy = rejectPolicy;
    }

    /**
     * Submits a task: start a core worker, else queue it, else start a non-core
     * worker, else invoke the reject policy.
     *
     * @param command the task to run
     * @throws NullPointerException if {@code command} is {@code null}
     */
    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException("command");
        }
        if (addWork(command, true)) {
            return;
        }
        if (takeQueue.offer(command)) {
            // If no worker is alive (e.g. coreSize is 0), start one so the queued task gets drained.
            if (count.get() == 0) {
                addWork(null, false);
            }
            return;
        }
        // Reject only when the extra worker could NOT be created (the original test was inverted).
        if (!addWork(command, false)) {
            rejectPolcy.reject(this);
        }
    }

    /**
     * Tries to start one new worker thread whose first task is {@code run}.
     *
     * @param run  first task for the new worker; may be {@code null}, in which
     *             case the worker goes straight to the queue
     * @param core {@code true} to bound the worker count by the core size,
     *             {@code false} to bound it by the maximum size
     * @return {@code true} if a worker was started, {@code false} if the
     *         applicable limit has already been reached
     */
    public boolean addWork(Runnable run, boolean core) {
        final int limit = core ? coreSize : maxSize;
        for (;;) {
            int current = count.get();
            // ">=" so the pool never exceeds the limit (">" allowed limit + 1 workers).
            if (current >= limit) {
                return false;
            }
            // Reserve a slot with CAS; on a lost race re-read the counter and retry.
            if (count.compareAndSet(current, current + 1)) {
                startWorker(run, current + 1);
                return true;
            }
        }
    }

    /** Starts a worker thread, releasing the reserved slot if the thread cannot be started. */
    private void startWorker(Runnable firstTask, int workerId) {
        Thread worker = new Thread(() -> runWorker(firstTask), poolName + "-worker-" + workerId);
        boolean started = false;
        try {
            worker.start();
            started = true;
        } finally {
            if (!started) {
                count.decrementAndGet();
            }
        }
    }

    /** Worker loop: runs the first task, then keeps taking tasks from the queue until none is returned. */
    private void runWorker(Runnable firstTask) {
        Runnable task = firstTask;
        try {
            while (task != null || (task = getTask()) != null) {
                try {
                    task.run();
                } finally {
                    task = null;
                }
            }
        } finally {
            // Single place where a worker gives its slot back, whether it exits
            // because of an interrupt or because a task threw.
            count.decrementAndGet();
        }
    }

    /**
     * Blocks until a queued task is available.
     *
     * @return the next task, or {@code null} if the calling thread was
     *         interrupted while waiting (the interrupt flag is restored)
     */
    public Runnable getTask() {
        try {
            return takeQueue.take();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller; the worker loop handles the count.
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
拒绝策略接口
package com.app.lock.app;
/**
 * Strategy hook a {@link MyThreadPool} calls from {@code execute}, passing itself.
 */
public interface RejectPolcy {
    /**
     * Handles a rejection reported by the given pool.
     *
     * @param threadPool the pool that invoked this policy
     */
    void reject(MyThreadPool threadPool);
}
策略简单实现
package com.app.lock.app;
/**
 * Simple {@link RejectPolcy} that only reports the rejection on standard output.
 */
public class MyRejectPolicy implements RejectPolcy {
    /**
     * Prints a fixed "pool is full" message; the pool argument is not used.
     *
     * @param threadPool the pool that invoked this policy
     */
    @Override
    public void reject(MyThreadPool threadPool) {
        final String notice = "线程池满了";
        System.out.println(notice);
    }
}
评论区