java并发优化-降低锁颗粒

java并发模型是基于内存共享,多线程共享变量一般就会涉及到加锁,介绍几种降低锁竞争的方式,最终效果都是降低锁的颗粒或者降低锁的竞争次数

常见锁优化方式

  1. 减少锁的持有时间。

    例如对一个方法加锁不如对其中的同步代码行加锁。

  2. 读写锁。

    可只对锁操作加锁,读不加锁。这样读、读之间不互斥, 读、写和写、读互斥,可使用J.U.C中的ReadWriteLock

  3. 减少锁颗粒。

    如ConcurrentHashMap中对segment加锁,而不是整个map加锁。比如map长度为128,分成8份,8份之间不互斥,8份内部才互斥,可以有效降低锁的竞争。

  4. 乐观锁。

    使用CAS算法,和期待值对比,如果一样则执行,不一样则重试等方式。

  5. 锁粗化。

    如果一个方法有好几行都是同步代码,对这几行单独加锁,不如对这个方法加锁,可以减少锁的竞争次数。

场景

服务A和服务B都会调用服务C的接口poll,同一个服务调用时互斥需要加锁,不同服务之间调用不互斥。

比如A服务有两个实例A1和A2,B服务有两个实例B1和B2,A1和B1共同调用poll接口时可以并行访问,A1和A2共同访问时必须串行执行,即必须有一个请求执行完才可以执行完下一个。

弱引用+synchronized

弱引用作用可以加快垃圾回收,这里的服务名很少,所以生成的锁对象很少可以不使用弱引用。可是其他场景可能要生成的锁对象有很多,可以使用弱引用加快垃圾回收。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@Component
public class StringLockProvider {

private final Map<Mutex, WeakReference<Mutex>> mutexMap = new WeakHashMap<>();

public Mutex getMutex(String id) {
if (id == null) {
throw new NullPointerException();
}
Mutex key = new MutexImpl(id);
synchronized (mutexMap) {
return mutexMap.computeIfAbsent(key, WeakReference::new).get();
}
}

public interface Mutex {
}

private static class MutexImpl implements Mutex {
private final String code;

private MutexImpl(String id) {
this.code = id;
}

public boolean equals(Object o) {
if (o == null) {
return false;
}
if (this.getClass() == o.getClass()) {
return this.code.equals(o.toString());
}
return false;
}

public int hashCode() {
return code.hashCode();
}

public String toString() {
return code;
}
}
}

@RestController
public class PollController {

@Autowired
private StringLockProvider lockProvider;

/**
* @param service 拉取的服务名
*/
@GetMapping("/v1/data/{service}")
public List<String> poll(@PathVariable("service") String service) {
List<String> data = new ArrayList<>(2 << 4);
synchronized (lockProvider.getMutex(service)) {
//同步代码,data.add
}
return data;
}
}

弱引用+ReentrantLock

此种效果和弱引用+synchronized一致, 此处不同主要在于ReentrantLock支持公平锁,谁也等待谁先获取,而synchronized非公平锁,随机选一个获取锁,当一个线程一直获取不到锁,需要等待较长时间,可能造成该接口超时。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@RestController
public class PollController {

private final Map<String, WeakReference<ReentrantLock>> mutexMap = new ConcurrentHashMap<>();

/**
* @param service 拉取的服务名
*/
@GetMapping("/v1/data/{service}")
public List<String> poll(@PathVariable("service") String service) {
List<String> data = new ArrayList<>(2 << 4);
ReentrantLock lock = getReentrantLock(service);
lock.lock();
try {
//同步代码。data.add
} finally {
lock.unlock();
}
return data;
}

private ReentrantLock getReentrantLock(String id) {
if (id == null) {
throw new NullPointerException();
}
return mutexMap.computeIfAbsent(id, it -> new WeakReference<>(new ReentrantLock(true))).get();
}
}

CAS乐观锁+循坏

此处使用了while循环,使用不当会造成线程阻塞,阻塞过多可能会造成死机

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@RestController
public class PollController {

private final Map<String, WeakReference<AtomicBoolean>> atomicMap = new ConcurrentHashMap<>();

/**
* @param service 拉取的服务名
*/
@GetMapping("/v1/data/{service}")
public List<String> poll(@PathVariable("service") String service) {
List<String> data = new ArrayList<>(2 << 4);
AtomicBoolean atomic = getAtomicBoolean(service);
//直到预期的值为true,才会成功,否则循环
while (!atomic.compareAndSet(true, false)) {
//Thread.sleep(100)
}
try {
//同步代码。data.add
}finally {
atomic.set(true);
}
return data;
}

private AtomicBoolean getAtomicBoolean(String id) {
if (id == null) {
throw new NullPointerException();
}
return atomicMap.computeIfAbsent(id, it -> new WeakReference<>(new AtomicBoolean(true))).get();
}
}