Java 线程通信

本文最后更新于:2024年3月18日 凌晨

Java 线程通信

  • 在Java程序中,synchronized解决了多线程竞争的问题,例如,对于一个任务管理器,多个线程同时往队列中添加任务,可以用synchronized加锁。
1
2
3
4
5
6
7
class TaskQueue {
Queue<String> queue = new LinkedList<>();

public synchronized void addTask(String s) {
this.queue.add(s);
}
}
  • 但是synchronized并没有解决多线程协调的问题。
  • 仍然以上面的TaskQueue为例,我们再编写一个getTask()方法取出队列的第一个任务:
1
2
3
4
5
6
7
8
9
10
11
12
13
class TaskQueue {
Queue<String> queue = new LinkedList<>();

public synchronized void addTask(String s) {
this.queue.add(s);
}

public synchronized String getTask() {
while (queue.isEmpty()) {
}
return queue.remove();
}
}
  • 上述代码看上去没有问题:getTask()内部先判断队列是否为空,如果为空,就循环等待,直到另一个线程往队列中放入了一个任务,while()循环退出,就可以返回队列的元素了。
  • 但实际上while()循环永远不会退出,因为线程在执行while()循环时,已经在getTask()入口获取了this锁,其他线程根本无法调用addTask(),因为addTask()执行条件也是获取this锁。
  • 因此,执行上述代码,线程会在getTask()中因为死循环而100%占用CPU资源。
  • 如果深入思考一下,我们想要的执行效果是:
    • 线程1可以调用addTask()不断往队列中添加任务。
    • 线程2可以调用getTask()从队列中获取任务,如果队列为空,则getTask()应该等待,直到队列中至少有一个任务时再返回。
  • 因此,多线程协调运行的原则就是:当条件不满足时,线程进入等待状态,当条件满足时,线程被唤醒,继续执行任务。

synchronized的线程通信

  • Java提供了几个方法解决线程之间的通信问题。
方法名 作用
wait() 表示线程会一直等待,直到其他线程通知,与sleep不同,会释放锁
wait(long timeout) 指定等待的毫秒数
notify() 唤醒一个处于等待状态的线程
notifyAll() 唤醒同一个对象上所有调用wait()方法的线程,优先级别高的线程优先调度
  • 这两组方法配套使用,wait()方法使得线程进入阻塞状态,执行这两个方法时将释放相应对象占用的锁,从而可因对象资源锁定而处于等待的线程得到运行机会,wait()方法有两种形式,一种允许指定以毫秒为单位的一段时间作为参数,另一种没有参数,前者当对应的notify()方法被调用或者超出指定时间时使线程重新可执行状态,后者则必须由对应的notify()方法将线程唤醒,因调用wait()方法的对象在其他线程中调用notify()notifyAll()方法,这种等待才能解除,这里要注意,notify()方式是从等待队列中随机选择一个线程唤醒,而notifyAll()方法则将使等待队列中的全部线程解除阻塞。
  • 注意:wait()方法与notify()方法在概念上有如下特征:
    • 这对方法必须在synchronized方法或代码块中调用,只有在同步代码段中才存在资源锁定。
    • 这对方法直接隶属于类Object类,而不是Thread类,也就是说,所有对象都拥有这一对方法。

生产者消费者模式

应用场景

  • 假设仓库中只能存放一件产品,生产者将生产出来的产品放入仓库,消费者将仓库中产品取走消费。

  • 如果仓库中没有产品,则生产者将产品放入仓库,否则停止生产并等待,直到仓库中的产品被消费者取走为止。

  • 如果仓库中放有产品,则消费者可以将产品取走消费,否则停止消费并等待,直到仓库中再次放入产品为止。

  • 这是一个线程同步问题,生产者和消费者共享同一个资源,并且生产者和消费者之间相互依赖,互为条件。

  • 对于生产者,没有生产产品之前,要通知消费者等待。而生产了产品之后,又需要马上通知消费者消费。

  • 对于消费者,在消费之后,要通知生产者已经结束消费,需要生产新的产品以供消费。

  • 在生产者消费者问题中,仅有synchronized是不够的。

    • synchronized可阻止并发更新同一个共享资源,实现了同步。
    • synchronized不能用来实现不同线程之间的消息传递(通信)

管程法

  • 缓冲区:生产者将生产好的数据放入缓冲区,消费者从缓冲区拿出数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
class TestMonitor {
// 生产者。
static class Productor extends Thread {
SynContainer container;

public Productor(SynContainer container) {
this.container = container;
}

@Override
public void run() {
for (int i = 0; i < 100; i++) {
System.out.println("生产了第" + i + "件产品");
container.push(new Product(i));
}
}
}

// 消费者。
static class Consumer extends Thread {
SynContainer container;

public Consumer(SynContainer container) {
this.container = container;
}

@Override
public void run() {
for (int i = 0; i < 100; i++) {
System.out.println("消费了第:" + container.pop().id + "件产品");
}
}
}

// 产品。
static class Product {
int id;

public Product(int id) {
this.id = id;
}
}

// 缓冲区。
static class SynContainer {
// 容器。
Product[] products = new Product[10];
// 容器计数器。
int count = 0;

// 生产者放入产品。
public synchronized void push(Product product) {
// 如果容器满了,就需要等待消费者消费。
while (count == products.length) {
// 等待消费者消费。
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 如果没有满,就需要丢入产品。
products[count] = product;
count++;
// 可以通知消费者消费。
this.notifyAll();
}

// 消费者取走商品。
public synchronized Product pop() {
// 判断能否消费。
while (count == 0) {
// 等待生产者生产。
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 如果可以消费。
count--;
Product product = products[count];
// 通知生产者生产。
this.notifyAll();
return product;
}
}

public static void main(String[] args) {
SynContainer synContainer = new SynContainer();
new Productor(synContainer).start();
new Consumer(synContainer).start();
}
}

注意

  • 线程也可以唤醒,而不会被通知,中断或超时,即所谓的虚假唤醒,虽然这在实践中很少会发生,但应用程序必须通过测试应该使线程被唤醒的条件来防范,并且如果条件不满足则继续等待,换句话说,等待应该总是出现在循环中。
  • 所以在while()循环中调用wait(),而不是if语句,因为线程被唤醒时,需要再次获取this锁,多个线程被唤醒后,只有一个线程能获取this锁,此刻,该线程可以获取到队列的元素,然而,剩下的线程如果获取this锁后尝试获取队列中的元素,此刻队列可能已经没有任何元素了,所以,要始终在while循环中wait(),并且每次被唤醒后拿到this锁就必须再次判断。

信号灯法

  • 通过标志位实现不同进程之间的通信。

[例12-3]:有一个南北向的桥,只能容纳一个人,现桥的两边分别有4人和3人,编制一个多线程程序让这些人到达对岸,在过桥的过程中显示谁在过桥及其走向。

基本思路:每个人用一个线程代表,桥作为共享资源,引入一个标记变量表示桥的占用情况,取得上桥资格和下桥分别用两个方法模拟。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class PersonPassBridge extends Thread {
private Bridge bridge; // 桥对象。
String id; // 人的标识。

public PersonPassBridge(String id, Bridge b) {
bridge = b;
this.id = id;
}

public void run() {
bridge.getBridge(); // 等待过桥。
System.out.println(id + "正过桥...");
try {
Thread.sleep((int) (Math.random() * 100)); // 模拟过桥时间。
} catch (InterruptedException e) {}
bridge.goDownBridge(); // 下桥。
}
}

class Bridge {
private boolean engaged = false; // 桥的占用状态。

public synchronized void getBridge() { // 取得上桥资格。
while (engaged) {
try {
wait(); // 如果桥被占用就循环等待。
} catch (InterruptedException e) {}
}
engaged = true; // 占用桥。
}

public synchronized void goDownBridge() { // 下桥。
engaged = false;
notifyAll(); // 唤醒其他等待进程。
}
}

public class 过桥问题 {
public static void main(String[] args) {
Bridge b = new Bridge();
PersonPassBridge x;
for (int k = 1; k <= 4; k++) {
x = new PersonPassBridge("南边,第" + k + "人", b);
x.start();
}
for (int k = 1; k <= 3; k++) {
x = new PersonPassBridge("北边,第" + k + "人", b);
x.start();
}
}
}

南边,第1人正过桥...
北边,第3人正过桥...
南边,第2人正过桥...
北边,第2人正过桥...
南边,第3人正过桥...
北边,第1人正过桥...
南边,第4人正过桥...
  • PersonPassBridge类(第1~18行)通过线程的运行模拟人等待过桥动作过程。
  • Bridge类(20~36行)模拟共享的桥,因为每次只能一个人在桥上,所以用一个逻辑变量engaged模拟桥的占用情况,true表示占用,false表示未占用,Bridge类中包含两个方法,方法getBridge()用于取得上桥的资格,goDownBridge()方法模拟下桥动作,它将释放占用的桥,在getBridge()goDownBridge()方法中定义均加有synchronized修饰,可保证执行方法时必须取得对象锁,从而避免多个线程同时执行该方法。
  • 过桥问题类(第38~51行)提供了mian()方法来测试具体的应用,分别创建了Bridge对象和代表南北方向的7个PersonPassBridge线程并启动运行,由于线程调度的机会带有一定的随机性,因此程序的执行结果不固定。

Condition

  • Condition可以替代waitnotify
  • Condition对象必须从Lock对象获取。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class TaskQueue {
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private Queue<String> queue = new LinkedList<>();

public void addTask(String s) {
lock.lock();
try {
queue.add(s);
condition.signalAll();
} finally {
lock.unlock();
}
}

public String getTask() {
lock.lock();
try {
while (queue.isEmpty()) {
condition.await();
}
return queue.remove();
} finally {
lock.unlock();
}
}
}
  • 可见,使用Condition时,引用的Condition对象必须从Lock实例的newCondition()返回,这样才能获得一个绑定了Lock实例的Condition实例。
  • Condition提供的await(),signal(),signalAll()原理和synchronized锁对象的wait(),notify(),notifyAll()是一致的,并且其行为也是一样的:
    • await()会释放当前锁,进入等待状态。
    • signal()会唤醒某个等待线程。
    • signalAll()会唤醒所有等待线程。
  • 唤醒线程从await()返回后需要重新获得锁。
  • 此外,和tryLock()类似,await()可以在等待指定时间后,如果还没有被其他线程通过signal()signalAll()唤醒,可以自己醒来:
1
2
3
4
5
if (condition.await(1, TimeUnit.SECOND)) {
// 被其他线程唤醒。
} else {
// 指定时间内没有被其他线程唤醒。
}
  • 可见,使用Condition配合Lock,我们可以实现更灵活的线程同步。

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!