publicLinkedBlockingQueue(int capacity){ if (capacity <= 0) thrownew IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); // Node是链表中一个节点,包含一个元素和下一个元素的引用 }
/** * 插入指定元素到队列的尾部,如果没有空间的话,等待。 * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ publicvoidput(E e)throws InterruptedException { if (e == null) thrownew NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node<E> node = new Node(e); //创建插入链表的节点node final ReentrantLock putLock = this.putLock; // 使用自旋锁,确保插入时线程安全 final AtomicInteger count = this.count; // 原子类型整型 putLock.lockInterruptibly();// 可中断加锁 try { /* * 在这里的count并没有使用锁来保护,这是因为这里只有递减操作,并且我 * 们在容量大小更改的时候将会发送信号,这和在其他等待guard计数相似。 */ while (count.get() == capacity) { notFull.await(); //等待,直到有空间插入元素 } enqueue(node); // 插入元素 c = count.getAndIncrement(); //插入成功 if (c + 1 < capacity) notFull.signal(); //释放信号,队列未满 } finally { putLock.unlock(); //释放锁 } if (c == 0) signalNotEmpty(); //发送信号,表明当前队列为空。使用全局takeLock 自旋锁来加锁设置发送信号 } public E take()throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); //发送信号告知当前队列已满,使用全局putLock 自旋锁来加锁发送信号。 return x; }
Note: put在插入的时候,会一直等待插入成功;如果需要设置等待超时时间,需要使用offer(E e, long timeout, TimeUnit unit)来插入元素。 此外,take方法和put方法整体流程基本一样。
3.2 ThreadFactory 介绍
ThreadFactory,线程工厂,顾名思义,就是采用工厂模式来创建线程实例。使用ThreadFactory方式构建线程,可以不调用{@link Thread#Thread(Runnable) new Thread}方法来new 一个新的线程,这样可以更方便的让应用使用定制好了的线程子类,属性等。 ThreadFactory接口,只有一个需要实现的方法,接口定义为:
1 2 3 4 5 6 7 8 9 10 11 12
publicinterfaceThreadFactory{
/** * Constructs a new {@code Thread}. Implementations may also initialize * priority, name, daemon status, {@code ThreadGroup}, etc. * * @param r a runnable to be executed by new thread instance * @return constructed thread, or {@code null} if the request to * create a thread is rejected */ Thread newThread(Runnable r); }
RejectedExecutionHandler类是对线程池中不能被执行的任务,所需要采用的处理策略的指定。当 executor 不能接受某个任务时,可以由 ThreadPoolExecutor 调用RejectedExecutionHandler指定的处理方法。这种不能接受任务的情况,很容易就发生了,比如当超出其界限而没有更多可用的线程或队列池时,或者关闭 Executor 时。默认情况下,private static final RejectedExecutionHandler defaultHandler = new AbortPolicy()。
1 2 3 4 5 6 7 8 9 10 11 12 13
publicinterfaceRejectedExecutionHandler{
/** * <p>In the absence of other alternatives, the method may throw * an unchecked {@link RejectedExecutionException}, which will be * propagated to the caller of {@code execute}. * * @param r the runnable task requested to be executed * @param executor the executor attempting to execute this task * @throws RejectedExecutionException if there is no remedy */ voidrejectedExecution(Runnable r, ThreadPoolExecutor executor); }
/** * A handler for rejected tasks that discards the oldest unhandled * request and then retries {@code execute}, unless the executor * is shut down, in which case the task is discarded. */ publicstaticclassDiscardOldestPolicyimplementsRejectedExecutionHandler{ /** * Creates a {@code DiscardOldestPolicy} for the given executor. */ publicDiscardOldestPolicy(){ }
/** * Obtains and ignores the next task that the executor * would otherwise execute, if one is immediately available, * and then retries execution of task r, unless the executor * is shut down, in which case task r is instead discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ publicvoidrejectedExecution(Runnable r, ThreadPoolExecutor e){ if (!e.isShutdown()) { e.getQueue().poll(); // 丢弃最老的元素 e.execute(r); //重试执行该拒绝任务 } } }
/** * * @param core if true use corePoolSize as bound, else * maximumPoolSize. (A boolean indicator is used here rather than a * value to ensure reads of fresh values after checking other pool * state). * @return true if successful */ privatebooleanaddWorker(Runnable firstTask, boolean core){ retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); //运行的状态
// Check if queue empty only if necessary.非running状态 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) returnfalse;
for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) returnfalse; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } }
boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); // 新建worker,并且指定第一个任务 final Thread t = w.thread; if (t != null) { mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); int rs = runStateOf(c);
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {//正常的线程状态 if (t.isAlive()) // precheck that t is startable thrownew IllegalThreadStateException(); workers.add(w); //非core线程数时,加入到任务队列中 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); //执行worker任务,详细见下面分析 workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
这段代码比较复杂,其主要就是判断新建worker线程的环境条件,如果可以创建,则执行相应地任务w = new Worker(firstTask);final Thread t = w.thread; t.start(),否则返回false;execute方法会执行相关拒绝策略的操作。
在run方法完成之后,就会调用afterExecute方法,这也会抛出一个异常,当然也会导致线程down掉。According to JLS Sec 14.20, this exception is the one that will be in effect even if task.run throws.
finalvoidrunWorker(Worker w){ Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts。使该worker状态为0,即可以运行新的任务。 boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { //如果task为null的时候,则从队列中获取任务 w.lock(); //设置0为1,表示该worker不可用,原子操作。 // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) // 在这些情况下,需要中断当前的线程。 wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run();//执行任务 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; thrownew Error(x); } finally { afterExecute(task, thrown);//执行后处理异常等信息 } } finally { task = null; w.completedTasks++; w.unlock();//恢复当前worker可工作 } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly);//为脏worker执行清扫工作和记账工作,true时,方法会把worker的线程移出,或者替换worker等 } }
/** * 从queue中获取需要执行的任务 * Performs blocking or timed wait for a task, depending on * current configuration settings, or returns null if this worker * must exit because of any of: * 1. There are more than maximumPoolSize workers (due to * a call to setMaximumPoolSize). * 2. The pool is stopped. * 3. The pool is shutdown and the queue is empty. * 4. This worker timed out waiting for a task, and timed-out * workers are subject to termination (that is, * {@code allowCoreThreadTimeOut || workerCount > corePoolSize}) * both before and after the timed wait. * * @return task, or null if the worker must exit, in which case * workerCount is decremented */ private Runnable getTask(){ boolean timedOut = false; // Did the last poll() time out?
List<String> list = Lists.newArrayList("thread-11", "thread-21", "thread-31", "thread-41"); final List<String> results = Collections.synchronizedList(new ArrayList<String>()); final CountDownLatch latch = new CountDownLatch(list.size());