一、并发vwin
首先,用1000个客户端进程来模拟并发,并使用信号量Semaphore 控制同时100个线程并发执行,采用同步器CountDownLatch 确保并发线程总数执行完成。模拟代码如下:
// 请求总数
public static int clientTotal = 1000;
// 同时并发执行的线程数
public static int threadTotal = 100;
public static int count = 0;
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal ; i++) {
executorService.execute(() - > {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}", count);
}
private static void add() {
count++;
}
- 同步器CountDownLatch
同步器CountDownLatch是计数器向下减的闭锁;保证线程执行完再进行其他的处理。工作原理图如下:
主线程调用await()方法后进入等待状态,其他线程每次调用countDown()会使同步器计数减1,当同步器计数为0时,主线程继续执行。
- 同步器Semaphore
同步器Semaphore的作用是阻塞线程,并且控制同一时间的请求的并发量。工作原理同CountDownLatch,不同的是Semaphore计数是向上计数,使用前需要指定一个目标数值。
上述并发模拟代码我们多次执行,发现是线程不安全的,原因是i++不是原子性的。我们反编译如下代码:
public void inc() {
++i;
}
public void inc() {
Code:
0: aload_0
1: dup
2: getfield
5: lconst_1
6: ladd
7: putfield
10: return
}
由此可见,简单的++i由2,5,6,7四步组成:
- 2是获取当前i的值并放入栈顶
- 5是把常量1放入栈顶
- 6是把当前栈顶中两个值相加并把结果放入栈顶
- 7是把栈顶的结果赋给i变量
因此,java中简单的一句++i被转换成汇编后就不具有原子性了。这里保证多个操作的原子性可以使用synchronized来实现i的内存可见性,但更好的方式是使用非阻塞的CAS算法实现的原子性操作类。下面我们使用cas把它改造成线程安全的。
二、CAS原理
1.cas保证原子性
并发模拟代码修改:
public static AtomicLong count = new AtomicLong(0);
private static void add() {
count.incrementAndGet();
}
incrementAndGet源码:
public final long incrementAndGet() {
return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
}
getAndAddLong源码:
public final long getAndAddLong(Object var1, long var2, long var4) {
long var6;
do {
var6 = this.getLongVolatile(var1, var2);
} while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));
return var6;
}
// 用native标识的方法,代表的是java底层的方法,不是用java实现的
public native long getLongVolatile(Object var1, long var2);
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
方法调用过程:
- var1指的是当前传过来的count对象
- var2指的是当前的值
- var4指的是数字1
- var6指的是调用底层方法得到底层当前的值,如果没有其他线程处理count变量的时候,正常返回var2的值
对于var1这个对象,如果当前的值var2跟底层的值var6相同的话,就把底层的值var6更新成var6 + var4。那么为什么会出现当前的值var2跟底层的值var6不相同的情况呢?答案是和java内存模型有关,关于java内存模型我们下次再详细介绍。
2.cas底层原理
分析getAndAddLong源码:CAS有四个操作数,分别为:对象内存位置,对象中的变量的偏移量,变量预期值和新值。其操作含义是,如果对象obj中内存偏移量为valueOffset的变量值为expect,则使用新的值update替换旧值expect。此操作具有 volatile 读和写的内存语义。
下面分别从编译器和处理器的角度来分析,CAS 如何同时具有 volatile 读和 volatile 写的内存语义。
编译器
编译器不会对 volatile 读与 volatile 读后面的任意内存操作重排序;编译器不会对 volatile 写与 volatile 写前面的任意内存操作重排序。组合这两个条件,意味着为了同时实现 volatile 读和 volatile 写的内存语义,编译器不能对 CAS 与 CAS 前面和后面的任意内存操作重排序。
处理器
下面是 sun.misc.Unsafe 类的 compareAndSwapLong() 方法的源代码:
public final native boolean compareAndSwapLong(Object var1,
long var2,
long var4,
long var6);
下面是对应于 intel x86 处理器的源代码的片段:
// Adding a lock prefix to an instruction on MP machine
// VC++ doesn't like the lock prefix to be on a single line
// so we can't insert a label after the lock prefix.
// By emitting a lock prefix, we can define a label after it.
#define LOCK_IF_MP(mp) __asm cmp mp, 0 \\
__asm je L0 \\
__asm _emit 0xF0 \\
__asm L0:
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
// alternative for InterlockedCompareExchange
int mp = os::is_MP();
__asm {
mov edx, dest
mov ecx, exchange_value
mov eax, compare_value
LOCK_IF_MP(mp)
cmpxchg dword ptr [edx], ecx
}
}
可以看到调用了“Atomic::cmpxchg”方法:
mp是 os::is_MP() 的返回结果, os::is_MP() 是一个内联函数,用来判断当前系统是否为多处理器。如果当前系统是多处理器,该函数返回1。否则,返回0。 LOCK_IF_MP(mp) 会根据mp的值来决定是否 为cmpxchg指令添加lock前缀 。如果通过mp判断当前系统是多处理器(即mp值为1),则为cmpxchg指令添加lock前缀。否则,不加lock前缀。(单处理器自身会维护单处理器内的顺序一致性,不需要 lock 前缀提供的内存屏障效果)。
intel 的手册对 lock 前缀的说明如下:
- 确保对内存的读 - 改 - 写操作原子执行。在 Pentium 及 Pentium 之前的处理器中,带有 lock 前缀的指令在执行期间会锁住总线,使得其他处理器暂时无法通过总线访问内存。很显然,这会带来昂贵的开销。从 Pentium 4,Intel Xeon 及 P6 处理器开始,intel 在原有总线锁的基础上做了一个很有意义的优化:如果要访问的内存区域(area of memory)在 lock 前缀指令执行期间已经在处理器内部的缓存中被锁定(即包含该内存区域的缓存行当前处于独占或以修改状态),并且该内存区域被完全包含在单个缓存行(cache line)中,那么处理器将直接执行该指令。由于在指令执行期间该缓存行会一直被锁定,其它处理器无法读 / 写该指令要访问的内存区域,因此能保证指令执行的原子性。这个操作过程叫做缓存锁定(cache locking),缓存锁定将大大降低 lock 前缀指令的执行开销,但是当多处理器之间的竞争程度很高或者指令访问的内存地址未对齐时,仍然会锁住总线。
- 禁止该指令与之前和之后的读和写指令重排序。
- 把写缓冲区中的所有数据刷新到内存中。
上面的第 2 点和第 3 点所具有的内存屏障效果足以同时实现 volatile 读和volatile 写的内存语义。
三、CAS存在的问题
1.ABA问题
ABA问题是指在CAS操作的时候,其他线程将变量的值A改成了B,又改回了A,当前线程使用期望值A与当前变量A进行比较的时候发现A变量值没有变,于是CAS就将A值进行了交换操作。这个时候,其实该值已经被其他线程改变过,这与设计思想是不符合的。ABA问题的解决思路:每次变量更新的时候,把变量的版本号加1,那么之前的就变成了1A2B3A,从而解决了ABA问题。AtomicStampedReference的compareAndSet
:
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair< V > current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}
这个方法相对于之前的compareAndSet方法,多了一个stamp的比较,stamp的值由每次更新的时候来维护的。
2.循环时间长开销大
自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。CAS的底层实现,是在一个死循环内不断的尝试修改目标值,直到修改成功。如果竞争不激烈的情况下,它修改成功的几率很高,竞争激烈的情况下,修改失败的几率就很高,在大量修改失败的时候,这些原子操作就会进行多次的循环尝试,因此性能会受到影响。
基于这个原因,jdk1.8新增了一个原子类LongAdder用来解决这个问题。新增LongAdder与原始AtomicLong工作原理对比如下如所示:
这里有个知识点:对于普通类型的long和double变量,JVM允许将64位的读操作或写操作,拆成两个32位的操作
。那么LongAdder的实现是基于什么思想呢?它的核心其实是将热点数据分离,比如说,它可以将AtomicLong内部核心数据value分离成一个数组,每个线程访问时,通过hash等算法,定位到其中一个数字进行计数,而最终的计数结果是这个数组的求和累加,其中热点数据value会被分割成多个单元的cell,每个cell独自维护内部的值,当前对象的实际值,由多有的cell累计合成,这样的话热点就进行了有效的分离,并提高了并行度。这样一来呢,LongAdder相当于是在AtomicLong的基础上,将单点的更新压力,分散到各个节点上;在低并发的时候,通过对base的直接更新,可以很好的保证和AtomicLong性能基本一致,而在高并发的时候,则通过分散提高了性能。
LongAdder缺点:在统计的时候,如果有并发更新,可能会导致统计的数据有些误差。所以如果是序列号生成等需要准确的数值,全局唯一的AtomicLong才是正确的选择。
3.只能保证一个共享变量的原子操作
当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如有两个共享变量i=2,j=a,合并一下ij=2a,然后用CAS来操作ij。从Java1.5开始JDK提供了**AtomicReference**类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行CAS操作。
四、CAS在锁机制中的应用
1.乐观锁
乐观锁在Java中是通过使用无锁编程来实现,最常采用的是CAS算法,Java原子类中的递增操作就通过CAS自旋实现的:
public static AtomicLong atomicLong = new AtomicLong();
atomicLong.incrementAndGet();
2.自旋锁&自适应自旋锁
自旋锁的实现原理同样也是CAS,AtomicInteger中调用unsafe进行自增操作的源码中的do-while循环就是一个自旋操作,如果修改数值失败则通过循环来执行自旋,直至修改成功:
public final long getAndAddLong(Object var1, long var2, long var4) {
long var6;
do {
var6 = this.getLongVolatile(var1, var2);
} while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));
return var6;
}
自适应意味着自旋的时间(次数)不再固定,而是由前一次在同一个锁上的自旋时间及锁的拥有者的状态来决定。如果在同一个锁对象上,自旋等待刚刚成功获得过锁,并且持有锁的线程正在运行中,那么虚拟机就会认为这次自旋也是很有可能再次成功,进而它将允许自旋等待持续相对更长的时间。如果对于某个锁,自旋很少成功获得过,那在以后尝试获取这个锁时将可能省略掉自旋过程,直接阻塞线程,避免浪费处理器资源。
3.无锁
无锁没有对资源进行锁定,所有的线程都能访问并修改同一个资源,但同时只有一个线程能修改成功。无锁的特点就是修改操作在循环内进行,线程会不断的尝试修改共享资源。如果没有冲突就修改成功并退出,否则就会继续循环尝试。如果有多个线程修改同一个值,必定会有一个线程能修改成功,而其他修改失败的线程会不断重试直到修改成功。CAS原理及应用即是无锁的实现
4.轻量级锁
是指当锁是偏向锁的时候,被另外的线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,从而提高性能。
这里说明一下,轻量级锁和偏向锁都是JDK1.6对synchronized的优化,优化后存在四种锁状态。关于java中的锁我们下次再详细介绍,下面给出这四种锁状态:
5.ReentrantLock
ReentrantLock主要利用CAS+CLH队列来实现,基本实现可以概括为:先通过CAS尝试获取锁。如果此时已经有线程占据了锁,那就加入CLH队列并且被挂起。当锁被释放之后,排在CLH队列队首的线程会被唤醒,然后CAS再次尝试获取锁。在这个时候,如果:
- 非公平锁:如果同时还有另一个线程进来尝试获取,那么有可能会让这个线程抢先获取;
- 公平锁:如果同时还有另一个线程进来尝试获取,当它发现自己不是在队首的话,就会排到队尾,由队首的线程获取到锁。
CLH队列:带头结点的双向非循环链表。结构图如下:
结语
cas原理先介绍这些,关于java线程安全的三个特性:原子性、可见性、有序性以及cas中涉及到的java内存模型以及cpu多级缓存,后面会详细说明。
-
JAVA
+关注
关注
19文章
2966浏览量
104702 -
计数器
+关注
关注
32文章
2256浏览量
94476 -
CAS
+关注
关注
0文章
34浏览量
15201 -
VaR
+关注
关注
0文章
39浏览量
11336 -
同步器
+关注
关注
1文章
98浏览量
14629
发布评论请先 登录
相关推荐
评论