本文最后更新于:2024年3月18日 凌晨
Java 线程通信
在Java程序中,synchronized解决了多线程竞争的问题,例如,对于一个任务管理器,多个线程同时往队列中添加任务,可以用synchronized加锁。
1 2 3 4 5 6 7 class TaskQueue { Queue<String> queue = new LinkedList<>(); public synchronized void addTask (String s) { this .queue.add(s); } }
但是synchronized并没有解决多线程协调的问题。
仍然以上面的TaskQueue为例,我们再编写一个getTask()方法取出队列的第一个任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 class TaskQueue { Queue<String> queue = new LinkedList<>(); public synchronized void addTask (String s) { this .queue.add(s); } public synchronized String getTask () { while (queue.isEmpty()) { } return queue.remove(); } }
上述代码看上去没有问题:getTask()内部先判断队列是否为空,如果为空,就循环等待,直到另一个线程往队列中放入了一个任务,while()循环退出,就可以返回队列的元素了。
但实际上while()循环永远不会退出,因为线程在执行while()循环时,已经在getTask()入口获取了this锁,其他线程根本无法调用addTask(),因为addTask()执行条件也是获取this锁。
因此,执行上述代码,线程会在getTask()中因为死循环而100%占用CPU资源。
如果深入思考一下,我们想要的执行效果是:
线程1可以调用addTask()不断往队列中添加任务。
线程2可以调用getTask()从队列中获取任务,如果队列为空,则getTask()应该等待,直到队列中至少有一个任务时再返回。
因此,多线程协调运行的原则就是:当条件不满足时,线程进入等待状态,当条件满足时,线程被唤醒,继续执行任务。
synchronized的线程通信
方法名
作用
wait()
表示线程会一直等待,直到其他线程通知,与sleep不同,会释放锁
wait(long timeout)
指定等待的毫秒数
notify()
唤醒一个处于等待状态的线程
notifyAll()
唤醒同一个对象上所有调用wait()方法的线程,优先级别高的线程优先调度
这两组方法配套使用,wait()方法使得线程进入阻塞状态,执行这两个方法时将释放相应对象占用的锁,从而可因对象资源锁定而处于等待的线程得到运行机会,wait()方法有两种形式,一种允许指定以毫秒为单位的一段时间作为参数,另一种没有参数,前者当对应的notify()方法被调用或者超出指定时间时使线程重新可执行状态,后者则必须由对应的notify()方法将线程唤醒,因调用wait()方法的对象在其他线程中调用notify()或notifyAll()方法,这种等待才能解除,这里要注意,notify()方式是从等待队列中随机选择一个线程唤醒,而notifyAll()方法则将使等待队列中的全部线程解除阻塞。
注意 :wait()方法与notify()方法在概念上有如下特征:
这对方法必须在synchronized方法或代码块中调用,只有在同步代码段中才存在资源锁定。
这对方法直接隶属于类Object类,而不是Thread类,也就是说,所有对象都拥有这一对方法。
生产者消费者模式
应用场景
假设仓库中只能存放一件产品,生产者将生产出来的产品放入仓库,消费者将仓库中产品取走消费。
如果仓库中没有产品,则生产者将产品放入仓库,否则停止生产并等待,直到仓库中的产品被消费者取走为止。
如果仓库中放有产品,则消费者可以将产品取走消费,否则停止消费并等待,直到仓库中再次放入产品为止。
这是一个线程同步问题,生产者和消费者共享同一个资源,并且生产者和消费者之间相互依赖,互为条件。
对于生产者,没有生产产品之前,要通知消费者等待。而生产了产品之后,又需要马上通知消费者消费。
对于消费者,在消费之后,要通知生产者已经结束消费,需要生产新的产品以供消费。
在生产者消费者问题中,仅有synchronized是不够的。
synchronized可阻止并发更新同一个共享资源,实现了同步。
synchronized不能用来实现不同线程之间的消息传递(通信)
管程法
缓冲区:生产者将生产好的数据放入缓冲区,消费者从缓冲区拿出数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 class TestMonitor { static class Productor extends Thread { SynContainer container; public Productor (SynContainer container) { this .container = container; } @Override public void run () { for (int i = 0 ; i < 100 ; i++) { System.out.println("生产了第" + i + "件产品" ); container.push(new Product(i)); } } } static class Consumer extends Thread { SynContainer container; public Consumer (SynContainer container) { this .container = container; } @Override public void run () { for (int i = 0 ; i < 100 ; i++) { System.out.println("消费了第:" + container.pop().id + "件产品" ); } } } static class Product { int id; public Product (int id) { this .id = id; } } static class SynContainer { Product[] products = new Product[10 ]; int count = 0 ; public synchronized void push (Product product) { while (count == products.length) { try { this .wait(); } catch (InterruptedException e) { e.printStackTrace(); } } products[count] = product; count++; this .notifyAll(); } public synchronized Product pop () { while (count == 0 ) { try { this .wait(); } catch (InterruptedException e) { e.printStackTrace(); } } count--; Product product = products[count]; this .notifyAll(); return product; } } public static void main (String[] args) { SynContainer synContainer = new SynContainer(); new Productor(synContainer).start(); new Consumer(synContainer).start(); } }
注意
线程也可以唤醒,而不会被通知,中断或超时,即所谓的虚假唤醒,虽然这在实践中很少会发生,但应用程序必须通过测试应该使线程被唤醒的条件来防范,并且如果条件不满足则继续等待,换句话说,等待应该总是出现在循环中。
所以在while()循环中调用wait(),而不是if语句,因为线程被唤醒时,需要再次获取this锁,多个线程被唤醒后,只有一个线程能获取this锁,此刻,该线程可以获取到队列的元素,然而,剩下的线程如果获取this锁后尝试获取队列中的元素,此刻队列可能已经没有任何元素了,所以,要始终在while循环中wait(),并且每次被唤醒后拿到this锁就必须再次判断。
信号灯法
[例12-3] :有一个南北向的桥,只能容纳一个人,现桥的两边分别有4人和3人,编制一个多线程程序让这些人到达对岸,在过桥的过程中显示谁在过桥及其走向。
基本思路 :每个人用一个线程代表,桥作为共享资源,引入一个标记变量表示桥的占用情况,取得上桥资格和下桥分别用两个方法模拟。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 class PersonPassBridge extends Thread { private Bridge bridge; String id; public PersonPassBridge (String id, Bridge b) { bridge = b; this .id = id; } public void run () { bridge.getBridge(); System.out.println(id + "正过桥..." ); try { Thread.sleep((int ) (Math.random() * 100 )); } catch (InterruptedException e) {} bridge.goDownBridge(); } }class Bridge { private boolean engaged = false ; public synchronized void getBridge () { while (engaged) { try { wait(); } catch (InterruptedException e) {} } engaged = true ; } public synchronized void goDownBridge () { engaged = false ; notifyAll(); } }public class 过桥问题 { public static void main (String[] args) { Bridge b = new Bridge(); PersonPassBridge x; for (int k = 1 ; k <= 4 ; k++) { x = new PersonPassBridge("南边,第" + k + "人" , b); x.start(); } for (int k = 1 ; k <= 3 ; k++) { x = new PersonPassBridge("北边,第" + k + "人" , b); x.start(); } } } 南边,第1 人正过桥... 北边,第3 人正过桥... 南边,第2 人正过桥... 北边,第2 人正过桥... 南边,第3 人正过桥... 北边,第1 人正过桥... 南边,第4 人正过桥...
PersonPassBridge类(第1~18行)通过线程的运行模拟人等待过桥动作过程。
Bridge类(20~36行)模拟共享的桥,因为每次只能一个人在桥上,所以用一个逻辑变量engaged模拟桥的占用情况,true表示占用,false表示未占用,Bridge类中包含两个方法,方法getBridge()用于取得上桥的资格,goDownBridge()方法模拟下桥动作,它将释放占用的桥,在getBridge()用goDownBridge()方法中定义均加有synchronized修饰,可保证执行方法时必须取得对象锁,从而避免多个线程同时执行该方法。
过桥问题类(第38~51行)提供了mian()方法来测试具体的应用,分别创建了Bridge对象和代表南北方向的7个PersonPassBridge线程并启动运行,由于线程调度的机会带有一定的随机性,因此程序的执行结果不固定。
Condition
Condition可以替代wait和notify
Condition对象必须从Lock对象获取。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 class TaskQueue { private final Lock lock = new ReentrantLock(); private final Condition condition = lock.newCondition(); private Queue<String> queue = new LinkedList<>(); public void addTask (String s) { lock.lock(); try { queue.add(s); condition.signalAll(); } finally { lock.unlock(); } } public String getTask () { lock.lock(); try { while (queue.isEmpty()) { condition.await(); } return queue.remove(); } finally { lock.unlock(); } } }
可见,使用Condition时,引用的Condition对象必须从Lock实例的newCondition()返回,这样才能获得一个绑定了Lock实例的Condition实例。
Condition提供的await(),signal(),signalAll()原理和synchronized锁对象的wait(),notify(),notifyAll()是一致的,并且其行为也是一样的:
await()会释放当前锁,进入等待状态。
signal()会唤醒某个等待线程。
signalAll()会唤醒所有等待线程。
唤醒线程从await()返回后需要重新获得锁。
此外,和tryLock()类似,await()可以在等待指定时间后,如果还没有被其他线程通过signal()或signalAll()唤醒,可以自己醒来:
1 2 3 4 5 if (condition.await(1 , TimeUnit.SECOND)) { } else { }
可见,使用Condition配合Lock,我们可以实现更灵活的线程同步。