ZB-029-02生产者消费者模型

生产者消费者模型

  • 生产者和消费者之间有一个盘子
  • 盘子只能有一个包子
  • 生产者创造了一个包子 放置到盘子上。此时不会继续生产,而是等消费者把盘子上的包子吃了,再去生产 即盘子是空的菜生产
  • 消费者吃掉一个包子后,就不会在吃了,而是等待生产者 生产 即盘子上有包子才消费

1.Object.wait/notify解决

2.Lock/Condition解决

3.BlockingQueue解决

方式一:Object.wait/notify

Boss.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.github.hcsp.multithread;

public class Boss {
public static void main(String[] args) throws InterruptedException {
// 请实现一个生产者/消费者模型,其中:
// 生产者生产10个随机的整数供消费者使用(随机数可以通过new Random().nextInt()获得)
// 使得标准输出依次输出它们,例如:
// Producing 42
// Consuming 42
// Producing -1
// Consuming -1
// ...
// Producing 10086
// Consuming 10086
// Producing -12345678
// Consuming -12345678

Container container = new Container();
Object lock = new Object();

Producer producer = new Producer(container, lock);
Consumer consumer = new Consumer(container, lock);

producer.start();
consumer.start();

producer.join();
producer.join();
}
}

Container.java 放置包子的盘子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.github.hcsp.multithread;

import java.util.Optional;

public class Container {
private Optional<Integer> value = Optional.empty();

public Optional<Integer> getValue() {
return value;
}

public void setValue(Optional<Integer> value) {
this.value = value;
}
}

Consumer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.github.hcsp.multithread;

import java.util.Optional;

public class Consumer extends Thread {
Container container;
Object lock;

public Consumer(Container container, Object lock) {
this.container = container;
this.lock = lock;
}

@Override
public void run() {
for (int i = 0; i < 10; i++) {
synchronized (lock) {
// 只要盘子是空的 它就等待
while (!container.getValue().isPresent()) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

// 盘子不是空的就消费
Integer value = container.getValue().get();
// 清空盘子的内容
container.setValue(Optional.empty());
System.out.println("Consuming " + value);

// 唤醒生产者
lock.notify();
}
}
}
}

Producer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.github.hcsp.multithread;

import java.util.Optional;
import java.util.Random;

public class Producer extends Thread {
Container container;
Object lock;

public Producer(Container container, Object lock) {
this.container = container;
this.lock = lock;
}

@Override
public void run() {
for (int i = 0; i < 10; i++) {
synchronized (lock) {
// 只要盘子里有东西 它就必须等待
while (container.getValue().isPresent()) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

// 盘子是空的就生产
int r = new Random().nextInt();
System.out.println("Producing " + r);
container.setValue(Optional.of(r));
// 唤醒消费者
lock.notify();
}
}
}
}

方式二:Lock/Condition

Boss.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.github.hcsp.multithread;

import java.util.concurrent.locks.ReentrantLock;

public class Boss {
public static void main(String[] args) throws InterruptedException {
// 请实现一个生产者/消费者模型,其中:
// 生产者生产10个随机的整数供消费者使用(随机数可以通过new Random().nextInt()获得)
// 使得标准输出依次输出它们,例如:
// Producing 42
// Consuming 42
// Producing -1
// Consuming -1
// ...
// Producing 10086
// Consuming 10086
// Producing -12345678
// Consuming -12345678

ReentrantLock lock = new ReentrantLock();
Container container = new Container(lock);

Producer producer = new Producer(container, lock);
Consumer consumer = new Consumer(container, lock);

producer.start();
consumer.start();

producer.join();
producer.join();
}
}

Container.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.github.hcsp.multithread;

import java.util.Optional;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class Container {

private Condition notConsumedYet; // 还没被消费掉
private Condition notProducedYet; // 还没被生产出来
private Optional<Integer> value = Optional.empty();

public Container(ReentrantLock lock) {
this.notConsumedYet = lock.newCondition();
this.notProducedYet = lock.newCondition();
}

public Condition getNotConsumedYet() {
return notConsumedYet;
}

public Condition getNotProducedYet() {
return notProducedYet;
}

public Optional<Integer> getValue() {
return value;
}

public void setValue(Optional<Integer> value) {
this.value = value;
}
}

Consumer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.github.hcsp.multithread;

import java.util.Optional;
import java.util.concurrent.locks.ReentrantLock;

public class Consumer extends Thread {
Container container;
ReentrantLock lock;

public Consumer(Container container, ReentrantLock lock) {
this.container = container;
this.lock = lock;
}

@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
lock.lock();
// 只要盘子是空的 它就等待
while (!container.getValue().isPresent()) {
try {
container.getNotProducedYet().await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

// 盘子不是空的就消费
Integer value = container.getValue().get();
// 清空盘子的内容
container.setValue(Optional.empty());
System.out.println("Consuming " + value);

// 唤醒生产者
container.getNotConsumedYet().signal();
} finally {
lock.unlock();
}
}
}
}

Producer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.github.hcsp.multithread;

import java.util.Optional;
import java.util.Random;
import java.util.concurrent.locks.ReentrantLock;

public class Producer extends Thread {
Container container;
ReentrantLock lock;

public Producer(Container container, ReentrantLock lock) {
this.container = container;
this.lock = lock;
}

@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
lock.lock();
// 只要盘子里有东西 它就必须等待
while (container.getValue().isPresent()) {
try {
container.getNotConsumedYet().await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

// 盘子是空的就生产
int r = new Random().nextInt();
System.out.println("Producing " + r);
container.setValue(Optional.of(r));
// 唤醒消费者
container.getNotProducedYet().signal();
} finally {
lock.unlock();
}
}
}
}

方式三:BlockingQueue

signalQueue 是控制线程调度的 实际就是个开关

Boss.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.github.hcsp.multithread;

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

public class Boss {
public static void main(String[] args) throws InterruptedException {
// 请实现一个生产者/消费者模型,其中:
// 生产者生产10个随机的整数供消费者使用(随机数可以通过new Random().nextInt()获得)
// 使得标准输出依次输出它们,例如:
// Producing 42
// Consuming 42
// Producing -1
// Consuming -1
// ...
// Producing 10086
// Consuming 10086
// Producing -12345678
// Consuming -12345678

BlockingDeque<Integer> queue = new LinkedBlockingDeque<>(1);
// 控制线程的调度
BlockingDeque<Integer> signalQueue = new LinkedBlockingDeque<>(1);

Producer producer = new Producer(queue, signalQueue);
Consumer consumer = new Consumer(queue, signalQueue);

producer.start();
consumer.start();

producer.join();
producer.join();
}
}

Consumer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.github.hcsp.multithread;

import java.util.concurrent.BlockingDeque;

public class Consumer extends Thread {
BlockingDeque<Integer> queue;
BlockingDeque<Integer> signalQueue;

public Consumer(BlockingDeque<Integer> queue, BlockingDeque<Integer> signalQueue) {
this.queue = queue;
this.signalQueue = signalQueue;
}

@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
System.out.println("Consuming " + queue.take());
signalQueue.put(0);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

Producer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.github.hcsp.multithread;

import java.util.Random;
import java.util.concurrent.BlockingDeque;

public class Producer extends Thread {
BlockingDeque<Integer> queue;
BlockingDeque<Integer> signalQueue;

public Producer(BlockingDeque<Integer> queue, BlockingDeque<Integer> signalQueue) {
this.queue = queue;
this.signalQueue = signalQueue;
}

@Override
public void run() {
for (int i = 0; i < 10; i++) {
int r = new Random().nextInt();
System.out.println("Producing " + r);
try {
queue.put(r);
signalQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}