Java性能 -- 并发设计模式

线程上下文模式

  1. 线程上下文指的是贯穿线程整个生命周期的对象中的一些全局信息,如Spring中的ApplicationContext
  2. 可以使用ThreadLocal实现上下文
    • ThreadLocal是线程本地变量,可以实现多线程的数据隔离,每个线程只能访问各自内部的副本变量

Thread-Per-Message模式

  1. 一个消息一个线程
    • 在Socket通信中,一个线程监听IO事件,每当监听到一个IO事件,就交给另一个处理线程执行IO操作
  2. 如果遇到高并发,就会出现严重的性能问题,因为线程在操作系统中也是昂贵的资源,不能无限制地创建
    • 如果针对每个IO请求都创建一个线程来处理,在有大量请求同时进来时,就会创建大量线程
    • 每次请求都需要创建销毁线程,性能开销很大
  3. 可以使用线程池来代替线程的创建和销毁

ServerHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@AllArgsConstructor
public class ServerHandler implements Runnable {
private Socket socket;

@Override
public void run() {
BufferedReader in = null;
PrintWriter out = null;
String msg;
try {
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);
while ((msg = in.readLine()) != null && msg.length() != 0) {
System.out.println("server received : " + msg);
out.print("received~\n");
out.flush();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
try {
out.close();
} catch (Exception e) {
e.printStackTrace();
}
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class Server {
private static final int DEFAULT_PORT = 12345;
private static ServerSocket server;

public static void start() throws IOException {
start(DEFAULT_PORT);
}

public static void start(int port) throws IOException {
if (server != null) {
return;
}

try {
server = new ServerSocket(port);
while (true) {
Socket socket = server.accept();
new Thread(new ServerHandler(socket)).start();
}
} finally {
if (server != null) {
server.close();
}
}
}

public static void main(String[] args) {
new Thread(() -> {
try {
Server.start();
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
}

Worker-Thread模式

  1. Worker是工人的意思,代表在Worker-Thread模式中,会有一些工人不断轮流处理过来的工作
    • 当没有工作时,工人会处于等待状态,直到有新的工作进来
    • 除了工人角色,Worker-Thread模式还包括了流水线产品
  2. 相比于Thread-Per-Message模式
    • 可以减少频繁创建、销毁线程所带来的性能开销
    • 也能避免无限制创建线程所带来的内存溢出风险

Package

1
2
3
4
5
6
7
8
9
@Data
public class Package {
private String name;
private String address;

public void execute() {
System.out.println(Thread.currentThread().getName() + " executed " + this);
}
}

Worker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Worker extends Thread {
private static final Random RANDOM = new Random(System.currentTimeMillis());
private final PackageChannel channel;

public Worker(String name, PackageChannel channel) {
super(name);
this.channel = channel;
}

@Override
public void run() {
while (true) {
channel.take().execute();
try {
Thread.sleep(RANDOM.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

PackageChannel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class PackageChannel {
private final static int MAX_PACKAGE_NUM = 100;

private final Package[] packageQueue;
private final Worker[] workerPool;
private int head;
private int tail;
private int count;

public PackageChannel(int workers) {
this.packageQueue = new Package[MAX_PACKAGE_NUM];
this.head = 0;
this.tail = 0;
this.count = 0;
this.workerPool = new Worker[workers];
this.init();
}

private void init() {
for (int i = 0; i < workerPool.length; i++) {
workerPool[i] = new Worker("Worker-" + i, this);
}
}

/**
* push switch to start all of worker to work
*/
public void startWorker() {
Arrays.asList(workerPool).forEach(Worker::start);
}

public synchronized void put(Package packageReq) {
while (count >= packageQueue.length) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.packageQueue[tail] = packageReq;
this.tail = (tail + 1) % packageQueue.length;
this.count++;
this.notifyAll();
}

public synchronized Package take() {
while (count <= 0) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Package request = this.packageQueue[head];
this.head = (this.head + 1) % this.packageQueue.length;
this.count--;
this.notifyAll();
return request;
}
}

Test

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Test {
public static void main(String[] args) {
// 新建8个工人
final PackageChannel channel = new PackageChannel(8);
// 开始工作
channel.startWorker();
// 为流水线添加包裹
for (int i = 0; i < 100; i++) {
Package packageReq = new Package();
packageReq.setAddress("test");
packageReq.setName("test");
channel.put(packageReq);
}
}
}

Future模式

Task

1
2
3
public interface Task<T, P> {
T doTask(P param);
}

TaskService

1
2
3
4
5
6
7
public interface TaskService<T, P> {
// 提交任务,不返回结果
Future<?> submit(Runnable runnable);

// 提交任务,返回结果
Future<T> submit(Task<T, P> task, P param);
}

Future

1
2
3
4
5
public interface Future<T> {
T get();

boolean done();
}

FutureTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class FutureTask<T> implements Future<T> {
private T result;
private boolean isDone = false;
private final Object Lock = new Object();

@Override
public T get() {
synchronized (Lock) {
while (!isDone) {
try {
Lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
return result;
}

@Override
public boolean done() {
return isDone;
}

public void finish(T result) {
synchronized (Lock) {
if (isDone) {
return;
}
this.result = result;
isDone = true;
Lock.notifyAll();
}
}
}

TaskServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class TaskServiceImpl<T, P> implements TaskService<T, P> {
@Override
public Future<?> submit(Runnable runnable) {
new Thread(() -> runnable.run(), Thread.currentThread().getName()).start();
return new FutureTask<Void>();
}

@Override
public Future<T> submit(Task<T, P> task, P param) {
FutureTask<T> future = new FutureTask<>();
new Thread(() -> {
T t = task.doTask(param);
future.finish(t);
}, Thread.currentThread().getName()).start();
return future;
}
}

TaskA

1
2
3
4
5
6
7
8
9
10
11
12
public class TaskA<T, P> implements Task<T, P> {

@Override
public T doTask(P param) {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
return (T) param;
}
}

App

1
2
3
4
5
6
7
8
public class App {
public static void main(String[] args) {
TaskService<String, String> taskService = new TaskServiceImpl<>();
Task<String, String> task = new TaskA<>();
Future<String> future = taskService.submit(task, "zhongmingmao");
System.out.println(future.get());
}
}
0%