T O P

[资源分享]     Condition实现原理

  • By - 楼主

  • 2021-07-19 20:02:12
  • Condition接口提供了与Object阻塞(wait())与唤醒(notify()notifyAll())相似的功能,只不过Condition接口提供了更为丰富的功能,如:限定等待时长等。Condition需要与Lock结合使用,需要通过锁对象获取Condition

    一、基本使用

    基于Condition实现生产者、消费者模式。代码基本与Object#wait()Object#notify()类似,只不过我们使用Lock替换了synchronized关键字。
    生产者

    public class Producer implements Runnable {
        private Lock lock;
        private Condition condition;
        private Queue<String> queue;
        private int maxSize;
    
        public Producer(Lock lock, Condition condition, Queue<String> queue, int maxSize) {
            this.lock = lock;
            this.condition = condition;
            this.queue = queue;
            this.maxSize = maxSize;
        }
    
        @Override
        public void run() {
            int i = 0;
            for (; ; ) {
                lock.lock();
                // 如果满了,则阻塞
                while (queue.size() == maxSize) {
                    System.out.println("生产者队列满了,等待...");
                    try {
                        condition.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                queue.add("一个消息:" + ++i);
                System.out.printf("生产者%s生产了一个消息:%s\n", Thread.currentThread().getName(), i);
                condition.signal();
                lock.unlock();
            }
        }
    }
    

    消费者

    public class Consumer implements Runnable {
        private Lock lock;
        private Condition condition;
        private Queue<String> queue;
        private int maxSize;
    
        public Consumer(Lock lock, Condition condition, Queue<String> queue, int maxSize) {
            this.lock = lock;
            this.condition = condition;
            this.queue = queue;
            this.maxSize = maxSize;
        }
    
        @Override
        public void run() {
            for (; ; ) {
                lock.lock();
                while (queue.isEmpty()) {
                    System.out.println("消费者队列为空,等待...");
                    try {
                        condition.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String obj = queue.remove();
                System.out.printf("消费者%s消费一个消息:%s\n", Thread.currentThread().getName(), obj);
                condition.signal();
                lock.unlock();
            }
        }
    }
    

    测试类

    public class ConditionProducerConsumer {
        public static void main(String[] args) {
            Lock lock = new ReentrantLock();
            Condition condition = lock.newCondition();
            Queue<String> queue = new LinkedBlockingQueue<>();
            int maxSize = 10;
    
            Producer producer = new Producer(lock, condition, queue, maxSize);
            Consumer consumer = new Consumer(lock, condition, queue, maxSize);
    
            new Thread(producer).start();
            new Thread(consumer).start();
    
        }
    }
    

    二、源码分析

    上述示例中使用的LockReentrantLock,关于它的lock方法与unlock方法的原理详见ReentrantLock实现原理。上述示例中的Condition对象是调用了Lock#newCondition()方法,源码如下:

    public class ReentrantLock implements Lock, java.io.Serializable {
    	...
    	public Condition newCondition() {
            return sync.newCondition();
        }
    	
    	abstract static class Sync extends AbstractQueuedSynchronizer {
    		...
    		final ConditionObject newCondition() {
                return new ConditionObject();
            }
    		...
    	}
    	...
    }
    

    上述的ConditionObject定义在AQS中,如下:

    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
    	...
    	public class ConditionObject implements Condition, java.io.Serializable {
    		...
    	}
    	...
    }
    

    首先来分析下Condition#await()方法

    public final void await() throws InterruptedException {
    	if (Thread.interrupted())
    		throw new InterruptedException();
    	Node node = addConditionWaiter();
    	int savedState = fullyRelease(node);
    	int interruptMode = 0;
    	while (!isOnSyncQueue(node)) {
    		LockSupport.park(this);
    		if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    			break;
    	}
    	if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    		interruptMode = REINTERRUPT;
    	if (node.nextWaiter != null) // clean up if cancelled
    		unlinkCancelledWaiters();
    	if (interruptMode != 0)
    		reportInterruptAfterWait(interruptMode);
    }
    
    private Node addConditionWaiter() {
    	Node t = lastWaiter;
    	// If lastWaiter is cancelled, clean out.
    	if (t != null && t.waitStatus != Node.CONDITION) {
    		unlinkCancelledWaiters();
    		t = lastWaiter;
    	}
    	Node node = new Node(Thread.currentThread(), Node.CONDITION);
    	if (t == null)
    		firstWaiter = node;
    	else
    		t.nextWaiter = node;
    	lastWaiter = node;
    	return node;
    }
    

    根据AQS队列的特性,若有多个线程执行lock#lock()方法,会将处于阻塞状态的线程维护到一个双向链表中,如下:
    image
    假设当前是线程A获取到锁,其他线程执行lock#lock()方法时,将会构建成一个上述链表。
    若获取锁的线程(线程A)执行Condition#await()方法,则会将当前线程添加至Condition队列中,如下:
    image
    然后在调用fullyRelease()方法时会释放当前线程的锁,然后唤醒处于阻塞队列中的下一个线程:
    image
    在调用isOnSyncQueue()方法时会检查当前节点是否在同步队列中,若不存在,则会调用LockSupport.park()进行阻塞。

    假设当前线程A是生产者线程,调用await()方法后,会释放锁,并且将当前线程加入到Condition队列中。此时,消费者能获取到锁资源,然后继续执行。假设线程B是消费者线程,当添加一个元素后会调用condition#signal()方法,定义如下:

    public final void signal() {
    	if (!isHeldExclusively())
    		throw new IllegalMonitorStateException();
    	Node first = firstWaiter;
    	if (first != null)
    		doSignal(first);
    }
    
    private void doSignal(Node first) {
           do {
               if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
               first.nextWaiter = null;
               } while (!transferForSignal(first) &&
                    (first = firstWaiter) != null);
    }
    
    final boolean transferForSignal(Node node) {
            /*
             * If cannot change waitStatus, the node has been cancelled.
             */
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
    
            /*
             * Splice onto queue and try to set waitStatus of predecessor to
             * indicate that thread is (probably) waiting. If cancelled or
             * attempt to set waitStatus fails, wake up to resync (in which
             * case the waitStatus can be transiently and harmlessly wrong).
             */
            Node p = enq(node);
            int ws = p.waitStatus;
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
    }
    

    执行signal()方法,会将Condition队列中的第一个节点移除,将其变为同步队列中的尾结点,如下:
    image
    至此,完成了Condition队列转换为同步队列的过程。后续流程基本就是重复以上操作。

    本文详细介绍了单个Condition队列的执行流程,其实一个Lock中可以有多个Condition队列,比如:JUC中提供的LinkedBlockingDequeArrayBlockingQueue

    本帖子中包含资源

    您需要 登录 才可以下载,没有帐号?立即注册