Skip to content

Latest commit

 

History

History
206 lines (162 loc) · 8.28 KB

2.并发编程的问题.md

File metadata and controls

206 lines (162 loc) · 8.28 KB

1.并发编程问题

synchronized 实现原理,CAS无锁化的原理,AQS是什么,ConcurrentHashMap的分段锁原理,线程池的原理,Java内存模型,volatile原理,Java并发包。

1.1 synchronized实现原理

synchronized的底层原理是跟jvm指令和monitor有关系的。如果再代码中用了synchronized关键字,在底层编译后的jvm指令中,会有monitorenter和monitorexit两个指令。

// monitorenter 进入加锁
synchronized (myObject) {
    // 代码...
    synchronized (myObject) {
        // 代码...
    }
}
// monitorexit 释放锁

上述代码中,如果一个线程第一次执行到synchronized (myObject) {}那里,获取到myObject对象的monitor的锁,计数器加1,然后第二次synchronized那里,会再次获取myObject对象的monitor锁,这个就是重入锁,然后计数器会再次加1,变成2。

synchronized实现原理

1.2 对CAS的理解以及其底层实现原理

synchronized实现:

public class MyObject {
    int i = 0;
    public synchronized void increment() { // 在一个对象实例的方法上架synchronized
        i++;
    }
}
// 多个线程都同时基于myObject这一个对象来执行 increment
// 此时只有一个线程可以成功对myObject加锁,可以对他关联的计数器加1,一旦多个线程并发去进行synchronized加锁,就会导致线程串行化,效率并不是太高。
MyObject myObject = new MyObject();
myOject.incretment();

CAS实现:

public class MyObjectCAS {
    // 底层就是基于CAS来进行实现的
    AtomicInteger integer = new AtomicInteger(0);

    // 多个线程执行此代码,不需要 synchronize d加锁也是线程安全的
    public void increment() {
        integer.incrementAndGet();
    }
}

CAS实现原理

CAS在底层硬件级别给你保证一定是原子的,同一时间只有一个线程可以执行CAS,先比较在设置,其他的线程的CAS同时间去执行会失败。

// 提供自增易用的方法,返回增加1后的值
public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

// 额外提供的compareAndSet方法
public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

// Unsafe 类的提供的方法
public final int getAndAddInt (Object o,long offset, int delta){
        int v;
        do {
                v = getIntVolatile(o, offset);
        } while (!weakCompareAndSetInt(o, offset, v, v + delta));
        return v;
}

1.3 ConcurrentHashMap 实现线程安全的底层原理

JDK1.8以前,多个数组,分段加锁,一个数组一个锁。

JDK1.8以后,优化细粒度,一个数组,每个元素进行CAS,如果失败了说明元素已被使用,此时synchronized对数组元素加锁,链表+红黑树处理,对数组每个元素加锁。

jdk1.7

[数组1], [数组2], [数组3] => 每个数组对应一把锁分段加锁

ConcurrentHashMap采用 分段锁的机制,实现并发的更新操作,底层采用数组+链表的存储结构。 其包含两个核心静态内部类 Segment和HashEntry。

  1. Segment继承ReentrantLock用来充当锁的角色,每个 Segment 对象守护每个散列映射表的若干个桶。
  2. HashEntry 用来封装映射表的键 / 值对;
  3. 每个桶是由若干个 HashEntry 对象链接起来的链表。

一个 ConcurrentHashMap 实例中包含由若干个 Segment 对象组成的数组,下面我们通过一个图来演示一下 ConcurrentHashMap 的结构:

ConcurrentHashMap-JDK1.7

jdk1.8

jdk1.81.8的实现已经抛弃了Segment分段锁机制,利用CAS+Synchronized来保证并发更新的安全,底层采用数组+链表+红黑树的存储结构。

ConcurrentHashMap-jdk1.8

[一个大的数组]
//数组里每个元素进行put操作时,都有一个不同的锁。刚开始put的时候,如果两个线程都在数组[5]位置进行put,这个时候,对数组[5]这个位置进行put的时候,才去的CAS策略。
// 同一个时间只能有一个线程进行操作,其他线程执行CAS都会失败。

分段加锁,通过对数组每个元素执行CAS策略,如果是很多线程对数组里不同的元素执行put,map是没有影响的。如果其他线程执行失败了,说明数组[5]这个位置已经被放进去值了,这个时候就需要基于链表+红黑树来进行处理。

此时执行:synchronized(数组[5]),基于链表或者红黑树在这个位置插进去自己的数据。

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }

1.4 AQS 实现原理

ReentrantLock

state变量 -> CAS -> 失败后进入队列等待 -> 释放锁后唤醒

AQS原理

ReentrantLock lock = new RenntrantLock(true);

AQS原理-公平锁