Thursday, January 20, 2022

Spring Cache Concurrency Problems


1. @CacheEvict is invoked and completes while @Cacheable is running

Suppose the @Cacheable method reads some data at time 1 and the data changes at time 2. The @CacheEvict method then evicts the cache at time 3. When the @Cacheable method completes at time 4, it writes the old data into the cache, so the next call to the @Cacheable method is served the stale value.
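The four steps can be replayed in plain Java to make the stale write visible. This is only a sketch: the two maps and the class name StaleCacheRace are hypothetical stand-ins for the real data store and the Spring-managed cache, and the steps run in a single thread in the order described above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a @Cacheable / @CacheEvict pair; no Spring involved.
class StaleCacheRace {
    static final Map<String, String> database = new ConcurrentHashMap<>();
    static final Map<String, String> cache = new ConcurrentHashMap<>();

    /** Replays the four steps in order and returns what the cache holds afterwards. */
    static String replay() {
        database.put("k", "old");
        cache.clear();

        String loaded = database.get("k"); // time 1: the @Cacheable method reads "old"
        database.put("k", "new");          // time 2: the data changes
        cache.remove("k");                 // time 3: @CacheEvict completes, nothing is cached yet
        cache.put("k", loaded);            // time 4: the @Cacheable method returns, "old" is cached

        return cache.get("k");
    }

    public static void main(String[] args) {
        // prints "cached=old, database=new"
        System.out.println("cached=" + replay() + ", database=" + database.get("k"));
    }
}
```

The evict at time 3 has nothing to remove, which is why it cannot protect the cache from the write at time 4.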

1.1. Solving the problem

Use a high-priority AOP aspect that records time 1 (when the @Cacheable method starts) and time 3 (when the @CacheEvict method finishes). At time 4, if time 1 is earlier than time 3, evict the cache entry again.

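@Around("cacheableMethods(param)")
public Object aroundCache(ProceedingJoinPoint pjp, String param) throws Throwable {
    // time 1: the @Cacheable method is about to start
    long beginTime = System.nanoTime();

    // this aspect wraps the cache interceptor, so the cache is already populated when proceed() returns (time 4)
    Object retVal = pjp.proceed();
    // time 3, if an evict has been recorded for this key
    Long evictTime = cache.getIfPresent(param);
    // nanoTime values are compared by subtraction, as the System.nanoTime() Javadoc recommends
    if (evictTime != null && evictTime - beginTime > 0) {
        // an evict completed after this call started: remove the possibly stale entry
        Boolean deleted = redisTemplate.delete("note:test::" + param);
        log.info("deleted? {}", deleted);
    }
    return retVal;
}

@After("cacheEvictMethods(param)")
public void afterEvict(String param) {
    // time 3: remember when the latest evict for this key finished
    cache.put(param, System.nanoTime());
}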
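The same guard can be tried without Spring or Redis. The sketch below is a hypothetical plain-Java model of the two advices, not the aspect itself: the class EvictAwareCache, its maps, and the demo method are invented for illustration, and a logical counter stands in for System.nanoTime() so that the ordering is deterministic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical plain-Java model of the two advices; not Spring's API.
class EvictAwareCache {
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final Map<String, Long> lastEvict = new ConcurrentHashMap<>();
    // Logical clock standing in for System.nanoTime() to keep the demo deterministic.
    private final AtomicLong clock = new AtomicLong();

    /** Models the @Around advice wrapped around the cached method. */
    String get(String key, Supplier<String> loader) {
        long beginTime = clock.incrementAndGet();      // time 1
        String value = cache.get(key);
        if (value == null) {
            value = loader.get();                      // an evict may complete in here
            cache.put(key, value);                     // time 4: possibly stale
        }
        Long evictTime = lastEvict.get(key);
        if (evictTime != null && evictTime > beginTime) {
            cache.remove(key);                         // evicted mid-flight: drop the entry
        }
        return value;
    }

    /** Models the @CacheEvict method followed by the @After advice. */
    void evict(String key) {
        cache.remove(key);
        lastEvict.put(key, clock.incrementAndGet());   // time 3
    }

    /** Replays the race and returns what the next reader sees. */
    static String demo() {
        Map<String, String> database = new ConcurrentHashMap<>();
        database.put("k", "old");
        EvictAwareCache c = new EvictAwareCache();
        // First call: the data changes and the evict completes while the load is in flight.
        c.get("k", () -> {
            String read = database.get("k");
            database.put("k", "new");
            c.evict("k");
            return read;
        });
        // Second call: the stale entry was dropped, so this reloads from the database.
        return c.get("k", () -> database.get("k"));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "new"
    }
}
```

The in-flight caller still receives the old value. The guard only ensures that the old value does not stay in the cache for later callers.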

The Java volatile Keyword


This post analyzes the volatile keyword by looking at the generated assembly.

JVM options for printing assembly
-XX:+UnlockDiagnosticVMOptions
-XX:+PrintAssembly
-XX:+TraceClassLoading
-Xcomp
-XX:CompileCommand=dontinline,*VolatileTest.main
-XX:CompileCommand=compileonly,*VolatileTest.main
Example code without a volatile variable
package com.pohvii.note.basic.concurrent;

/**
 * @author zhanglei
 * @date 2021/5/31
 */
public class NoVolatileTest {
    static int a = 0;

    public static void main(String[] args) {
        if (a == 0) {
            a++;
        }
    }
}
Assembly for the example without a volatile variable
  0x000000010af3a220: sub    $0x18,%rsp
  0x000000010af3a227: mov    %rbp,0x10(%rsp)    ;*synchronization entry
                                                ; - com.pohvii.note.basic.concurrent.NoVolatileTest::main@-1 (line 11)

  0x000000010af3a22c: movabs $0x66ab68b18,%r10  ;   {oop(a 'java/lang/Class' = 'com/pohvii/note/basic/concurrent/NoVolatileTest')}
  0x000000010af3a236: mov    0x68(%r10),%r8d    ;*getstatic a
                                                ; - com.pohvii.note.basic.concurrent.NoVolatileTest::main@0 (line 11)

  0x000000010af3a23a: test   %r8d,%r8d
  0x000000010af3a23d: je     0x000000010af3a24b  ;*return
                                                ; - com.pohvii.note.basic.concurrent.NoVolatileTest::main@14 (line 14)

  0x000000010af3a23f: add    $0x10,%rsp
  0x000000010af3a243: pop    %rbp
  0x000000010af3a244: test   %eax,-0x5c9a24a(%rip)        # 0x00000001052a0000
                                                ;   {poll_return}
  0x000000010af3a24a: retq   
  0x000000010af3a24b: inc    %r8d
  0x000000010af3a24e: mov    %r8d,0x68(%r10)    ;*putstatic a
                                                ; - com.pohvii.note.basic.concurrent.NoVolatileTest::main@11 (line 12)
Example code with a volatile variable
package com.pohvii.note.basic.concurrent;

/**
 * @author zhanglei
 * @date 2021/5/31
 */
public class VolatileTest {
    static volatile int a = 0;

    public static void main(String[] args) {
        if (a == 0) {
            a++;
        }
    }
}
Assembly for the example with a volatile variable
  0x0000000109ca1360: sub    $0x18,%rsp
  0x0000000109ca1367: mov    %rbp,0x10(%rsp)    ;*synchronization entry
                                                ; - com.pohvii.note.basic.concurrent.VolatileTest::main@-1 (line 11)

  0x0000000109ca136c: movabs $0x66ab68ac0,%r10  ;   {oop(a 'java/lang/Class' = 'com/pohvii/note/basic/concurrent/VolatileTest')}
  0x0000000109ca1376: mov    0x68(%r10),%r8d    ;*getstatic a
                                                ; - com.pohvii.note.basic.concurrent.VolatileTest::main@0 (line 11)

  0x0000000109ca137a: test   %r8d,%r8d
  0x0000000109ca137d: je     0x0000000109ca138b  ;*return
                                                ; - com.pohvii.note.basic.concurrent.VolatileTest::main@14 (line 14)

  0x0000000109ca137f: add    $0x10,%rsp
  0x0000000109ca1383: pop    %rbp
  0x0000000109ca1384: test   %eax,-0x5d1038a(%rip)        # 0x0000000103f91000
                                                ;   {poll_return}
  0x0000000109ca138a: retq   
  0x0000000109ca138b: mov    0x68(%r10),%r8d
  0x0000000109ca138f: inc    %r8d
  0x0000000109ca1392: mov    %r8d,0x68(%r10)
  0x0000000109ca1396: lock addl $0x0,(%rsp)     ;*putstatic a
                                                ; - com.pohvii.note.basic.concurrent.VolatileTest::main@11 (line 12)

Source code: the two differ only on line 8, where one declares a as volatile and the other does not.

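Assembly: the volatile version performs one extra read before a++ (the field is loaded again from 0x68(%r10)) and is followed by one extra lock-prefixed instruction (lock addl $0x0,(%rsp)), which acts as a memory barrier after the write.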
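The listing also shows that a++ on a volatile field is still a separate load, increment, and store, so the barrier does not make the increment atomic. The sketch below illustrates that consequence under stated assumptions: the class VolatileIncrementDemo and its counters are hypothetical, and java.util.concurrent.atomic.AtomicInteger is used as the comparison because its incrementAndGet is a single atomic read-modify-write.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; compares a volatile int with an AtomicInteger under contention.
class VolatileIncrementDemo {
    static volatile int volatileCounter = 0;
    static final AtomicInteger atomicCounter = new AtomicInteger();

    /** Runs `threads` workers that each increment both counters `perThread` times. */
    static void run(int threads, int perThread) {
        volatileCounter = 0;
        atomicCounter.set(0);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    volatileCounter++;                 // load, inc, store: updates can be lost
                    atomicCounter.incrementAndGet();   // one atomic read-modify-write
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    public static void main(String[] args) {
        run(4, 100_000);
        // The atomic counter is always 400000; the volatile counter may be lower.
        System.out.println("volatile=" + volatileCounter + " atomic=" + atomicCounter.get());
    }
}
```

How many updates the volatile counter loses varies from run to run and may be zero on a lightly loaded machine, so the example does not state a fixed value for it.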

stack

The stack starts at a high memory address and grows downward

$

Immediate values are constants, and are prefixed by a $

%

Register names are prefixed by a %

rsp

stack pointer, points to the top of the current stack frame

rbp

base pointer, points to the base of the current stack frame

LOCK (prefix)

The LOCK prefix ensures that the CPU has exclusive ownership of the appropriate cache line for the duration of the operation, and provides certain additional ordering guarantees. This may be achieved by asserting a bus lock, but the CPU will avoid this where possible. If the bus is locked then it is only for the duration of the locked instruction.

The cache coherency mechanism automatically prevents two or more processors that have cached the same area of memory from simultaneously modifying data in that area.

Flink Leader Election Mechanism


Flink HA has two implementations, ZooKeeper and Kubernetes. Flink wraps both behind the following abstractions:

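// election service
interface LeaderElectionService {
    // start the election
    start(LeaderContender)
    // stop the election
    stop()
    // confirm leadership
    confirmLeadership(..)
}
// contender
interface LeaderContender {
    // callback after leadership is granted
    grantLeadership(..)
    // callback after leadership is revoked
    revokeLeadership()
    // callback on election errors
    handleError(Exception)
}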

1. Intermediate abstraction layer

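The default implementation of LeaderElectionService is DefaultLeaderElectionService. It runs the election by creating a LeaderElectionDriver. When the election succeeds, the driver calls back into LeaderElectionEventHandler; when it fails, the driver calls back into FatalErrorHandler. Both handlers internally call the methods of LeaderContender.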
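The call chain described above (driver, then handler, then contender) can be sketched in a few lines of plain Java. Everything in this block is a simplified, hypothetical shape that only mirrors the layering: the interfaces Contender, ElectionEventHandler and ErrorHandler and the classes SketchElectionService and SketchDriver are invented for illustration and are not Flink's real signatures. The sketch also folds both handlers into one class for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Simplified, hypothetical shapes that only mirror the call chain; not Flink's real signatures.
interface Contender {
    void grantLeadership(UUID sessionId);
    void revokeLeadership();
    void handleError(Exception e);
}

interface ElectionEventHandler {
    void onGrantLeadership();
    void onRevokeLeadership();
}

interface ErrorHandler {
    void onFatalError(Throwable t);
}

/** Middle layer: receives driver callbacks and forwards them to the contender. */
class SketchElectionService implements ElectionEventHandler, ErrorHandler {
    private final Contender contender;

    SketchElectionService(Contender contender) {
        this.contender = contender;
    }

    public void onGrantLeadership() { contender.grantLeadership(UUID.randomUUID()); }
    public void onRevokeLeadership() { contender.revokeLeadership(); }
    public void onFatalError(Throwable t) { contender.handleError(new Exception(t)); }
}

/** Stand-in for a ZooKeeper or Kubernetes driver; the backend events are triggered by hand. */
class SketchDriver {
    private final ElectionEventHandler events;
    private final ErrorHandler errors;

    SketchDriver(ElectionEventHandler events, ErrorHandler errors) {
        this.events = events;
        this.errors = errors;
    }

    void isLeader() { events.onGrantLeadership(); }
    void notLeader() { events.onRevokeLeadership(); }
    void unhandledError(Throwable t) { errors.onFatalError(t); }
}

class ElectionSketch {
    /** Fires one event of each kind and returns the callbacks the contender received. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Contender contender = new Contender() {
            public void grantLeadership(UUID sessionId) { log.add("granted"); }
            public void revokeLeadership() { log.add("revoked"); }
            public void handleError(Exception e) { log.add("error"); }
        };
        SketchElectionService service = new SketchElectionService(contender);
        SketchDriver driver = new SketchDriver(service, service);
        driver.isLeader();
        driver.notLeader();
        driver.unhandledError(new IllegalStateException("connection lost"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[granted, revoked, error]"
    }
}
```

Because the contender only ever sees the three callbacks, the backend behind the driver can be swapped without touching it.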

2. ZooKeeper implementation

The ZooKeeper implementation of LeaderElectionDriver is ZooKeeperLeaderElectionDriver. Internally it uses the LeaderLatch and NodeCache mechanisms provided by the Curator framework to run the election:

public ZooKeeperLeaderElectionDriver(..) {
    // on winning the election, isLeader is called back; otherwise notLeader
    leaderLatch = new LeaderLatch(client, checkNotNull(latchPath));
    // watches the node for changes; nodeChanged is called back
    cache = new NodeCache(client, leaderPath);
    // on election errors, unhandledError is called back
    client.getUnhandledErrorListenable().addListener(this);

    leaderLatch.addListener(this);
    leaderLatch.start();

    cache.getListenable().addListener(this);
    cache.start();
}

public void isLeader() {
    leaderElectionEventHandler.onGrantLeadership();
}

public void notLeader() {
    leaderElectionEventHandler.onRevokeLeadership();
}

public void unhandledError(String message, Throwable e) {
    fatalErrorHandler.onFatalError(..);
}


Hibernate First-Level Cache


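First-level cache

Hibernate maintains a repeatable-read persistence context at the application layer:

  1. When loading an entity by primary key, Hibernate first looks the entity up in the persistence context by that key. If it is there, it is returned; otherwise Hibernate queries the database.

  2. When loading an entity by anything other than the primary key, Hibernate queries the database, first extracts the entity's primary key from the result set, and then looks the entity up in the persistence context by that key. If it is there, it is returned; otherwise Hibernate goes on to parse the rest of the row.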
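The two rules can be modeled with a map keyed by primary key. The block below is a hypothetical miniature, not Hibernate's API: the class MiniPersistenceContext, its User type, the query counter, and the demo method are all invented for illustration, and a second map stands in for the database table.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical miniature of a persistence context; not Hibernate's API.
class MiniPersistenceContext {
    static final class User {
        final int id;
        final String username;
        final String password;

        User(int id, String username, String password) {
            this.id = id;
            this.username = username;
            this.password = password;
        }
    }

    final Map<Integer, User> database = new HashMap<>();         // stands in for the table
    private final Map<Integer, User> context = new HashMap<>();  // first-level cache by primary key
    int queries = 0;                                             // number of simulated SQL statements

    /** Rule 1: look in the context first and query only on a miss. */
    User findById(int id) {
        User managed = context.get(id);
        if (managed != null) {
            return managed;
        }
        queries++;
        User row = database.get(id);
        if (row != null) {
            context.put(id, row);
        }
        return row;
    }

    /** Rule 2: always query, then reuse the managed entity if its key is already known. */
    User findByUsername(String username) {
        queries++;
        for (User row : database.values()) {
            if (row.username.equals(username)) {
                User managed = context.get(row.id);
                if (managed != null) {
                    return managed;        // the rest of the row is ignored
                }
                context.put(row.id, row);
                return row;
            }
        }
        return null;
    }

    /** Detaches everything, similar in spirit to clearing the persistence context. */
    void clear() {
        context.clear();
    }

    static String demo() {
        MiniPersistenceContext pc = new MiniPersistenceContext();
        pc.database.put(1, new User(1, "admin", "p1"));
        String first = pc.findByUsername("admin").password;   // query 1
        pc.database.put(1, new User(1, "admin", "p2"));       // another transaction changes the row
        String second = pc.findByUsername("admin").password;  // query 2, managed entity wins
        pc.clear();
        String third = pc.findByUsername("admin").password;   // query 3, fresh entity
        return first + "," + second + "," + third + "," + pc.queries;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "p1,p1,p2,3"
    }
}
```

The second lookup still costs a query, yet it returns the instance that is already managed. That is the application-level repeatable read described above.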

1. Examples

1.1. findById

@Test
@Transactional
void l1CacheTest() {
    // query
    employeeRepository.findById(10001); // (1)
    // no query
    employeeRepository.findById(10001); // (2)
}
(1) The first call executes SQL
(2) The second call does not execute SQL

1.2. Queries other than findById

@Override
@Transactional(isolation = Isolation.READ_UNCOMMITTED) // (1)
public void readUncommitted() {
    Optional<User> firstReadUser = userRepository.findFirstByUsername("admin"); // (2)
    String firstReadPassword = firstReadUser.map(User::getPassword).get();
    log.info("first read user admin password is {}", firstReadPassword);
    // debug point below
    // entityManager.clear(); // (3)
    Optional<User> secondReadUser = userRepository.findFirstByUsername("admin"); // (2)
    String secondReadPassword = secondReadUser.map(User::getPassword).get();
    log.info("second read user admin password is {}", secondReadPassword);
    assertNotEquals(firstReadPassword, secondReadPassword); // (4)
}
(1) Sets the transaction isolation level to read uncommitted
(2) Both queries execute SQL
(3) After the first query, pause here, start another transaction, and modify the data
(4) The assertion fails: the second query resolves to the entity already held in the persistence context, so both reads return the same password
