侧边栏壁纸
博主头像
再见理想博主等级

只争朝夕,不负韶华

  • 累计撰写 112 篇文章
  • 累计创建 64 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

线程池的结构和原理

再见理想
2022-05-29 / 0 评论 / 0 点赞 / 456 阅读 / 2,009 字

一,java多线程池的支持——ThreadPoolExecutor

java的线程池支持主要通过ThreadPoolExecutor来实现,我们使用的ExecutorService的各种线程池策略都是基于ThreadPoolExecutor实现的,所以ThreadPoolExecutor十分重要。
线程池的创建:

具体解释一下上述参数:

corePoolSize 核心线程池大小:

核心池的大小,如果调用了prestartAllCoreThreads()或者prestartCoreThread()方法,会直接预先创建corePoolSize的线程,否则当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;这样做的好处是,如果任务量很小,那么甚至就不需要缓存任务,corePoolSize的线程就可以应对;

maximumPoolSize 线程池最大容量大小:

线程池最大线程数,表示在线程池中最多能创建多少个线程,如果运行中的线程超过了这个数字,那么相当于线程池已满,新来的任务会使用RejectedExecutionHandler 进行处理;

keepAliveTime 线程池空闲时,线程存活的时间:

表示线程没有任务执行时最多保持多久时间会终止,然后线程池的数目维持在corePoolSize 大小;

TimeUnit 时间单位:

参数keepAliveTime的时间单位;

ThreadFactory 线程工厂:

主要用来创建线程,比如可以指定线程的名字;

BlockingQueue任务队列 一个阻塞队列:

用来存储等待执行的任务,如果当前对线程的需求超过了corePoolSize大小,才会放在这里;

RejectedExecutionHandler 线程拒绝策略:

如果线程池已满,执行拒绝策略。

二,线程池处理的流程

① 先判断线程池中的核心线程们是否空闲,如果空闲,就把这个新的任务指派给某一个空闲线程去执行。如果没有空闲,并且当前线程池中的核心线程数还小于 corePoolSize,那就通过 addWorker() 再创建一个核心线程。

② 如果线程池的线程数已经达到核心线程数,并且这些线程都繁忙,workQueue.offer() 方法就把这个新来的任务放到等待队列中去。

③ 如果等待队列又满了,那么查看一下当前线程数是否到达 maximumPoolSize,如果还未到达,就继续创建线程。如果已经到达了,就交给 RejectedExecutionHandler 来决定怎么处理这个任务(拒绝策略)。

关键代码:

	public void execute(Runnable command) {
		if (command == null)
			throw new NullPointerException();
		int c = ctl.get();
		if (workerCountOf(c) < corePoolSize) {
			if (addWorker(command, true))
				return;
			c = ctl.get();
		}
		if (isRunning(c) && workQueue.offer(command)) {
			int recheck = ctl.get();
			if (! isRunning(recheck) && remove(command))
				reject(command);
			else if (workerCountOf(recheck) == 0)
				addWorker(null, false);
		}
		else if (!addWorker(command, false))
			reject(command);
	}

三,拒绝策略 RejectedExecutionHandler

①**AbortPolicy**: (默认)丢弃任务并抛出RejectedExecutionException异常。

②**CallerRunsPolicy**: 由调用线程(提交任务的线程,比如main主线程)直接执行此任务。

③**DiscardOledestPolicy**:丢弃队列最前面的任务,然后重新提交被拒绝的任务。

④**DiscardPolicy**:丢弃任务,但是不抛出异常。

以上策略都实现了RejectedExecutionHandler接口。当然,可自定义拒绝策略。

下面看看CallerRunsPolicy代码:

public static class CallerRunsPolicy implements RejectedExecutionHandler {
     //Creates a {@code CallerRunsPolicy}.
    public **CallerRunsPolicy**() { }
     //Executes task r in the caller's thread, unless the executor
     // has been shut down, in which case the task is discarded.
     //@param r the runnable task requested to be executed
     // @param e the executor attempting to execute this task
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
          ** r.run();**
        }
    }
}

如果线程池和队列都满了,那做为调用者,就自己去执行 r.run(),有帮于减轻线程池的压力。

四,介绍几种线程池

①**newFixedThreadPool**(): 返回一个固定线程数量的线程池。等待队列Queue使用LinkedBlockingQueue无界队列实现。

②**newSingleThreadExecutor**(): 返回只有一个线程的线程池。它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。等待队列Queue使用LinkedBlockingQueue实现。

③**newCacheThreadPool**(): 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。Queue使用SynchronousQueue实现。

④**newScheduledThreadPool**(): 返回一个ScheduledExecutorService对象,可以根据时间需要对线程进行调度。

方法:schedule():在给定时间,对任务进行一次调度。

scheduleAtFixedRate()

任务调度的频率的一定的。

schedulWithFixedDelay()

创建并执行一个周期性任务。

五,等待队列分类

WorkQueue指被提交但未执行的任务队列,是一个BlockingQueue接口的对象,仅存放Runnable对象。按照功能分类:

直接提交的队列(SynchronousQueue:容量大小为0)

提交的任务不会被真是保存,而总是将新任务提交给线程执行,如果没有空闲的线程,则尝试创建新的线程,如果线程数量已经达到最大值,则执行拒绝策略。

有界任务队列(ArrayBlockingQueue)

仅当在任务队列满时,才可能将线程数提升到 corePoolSize 以上;若总线程数大于maxmumPoolSize,则执行拒绝策略

无界任务队列(LinkedBlockingQueue)

优先任务队列(PriorityBlockingQueue:)

带有执行优先级的队列。

六,工作线程和等待队列

工作线程里的逻辑都封装在Worker这个内部类里了。代码如下所示:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock();  // allow interrupts
    boolean completedAbruptly = true;
	
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            /** If pool is stopping, ensure thread is interrupted;
            if not, ensure thread is not interrupted.  This
            requires a recheck in second case to deal with
            shutdownNow race while clearing interrupt**/
			
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

有几个地方要注意的,第一,我们可以使用beforeExecute和afterExecute这两个方法去监控任务的执行情况,这些方法在ThreadPoolExecutor里都是空方法,可以重写这些方法来实现线程池的监控。
第二,就是线程的逻辑是不断地执行一个循环,去调用 getTask 方法来获得任务:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {       //**相对while(true),指令更少,不占用寄存器,而且没有判断跳转,效率更高。**
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || **workQueue.isEmpty()**)) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

这个方法的核心点在于workQueue.poll,workQueue的类型是:

private final BlockingQueue<Runnable> workQueue;

这是一个阻塞队列!!这就意味着,如果某一个线程试图从这个队列中取数据,而这个队列里没有数据的时候,线程就会进入休眠了。
至于前边的那些逻辑,加加减减的,并不重要,感兴趣的可以自己查看这个方法的注释,总而言之,就是返回一个null,让worker线程退出。所以,getTask里能返回 null 的分支都是满足线程退出条件的

0

评论区