Advanced Java

代理

代理对象通过invoke,实现类与非核心功能的解耦。

public static void main(String[] args) {
Payment payment = new Payment("AliPay");
Pay proxy = ProxyUtil.createProxy(payment);

proxy.pay(amount);
}

class Payment implements Pay {

@Overide
public payResp pay(BigDecimal payAmount) {
// Payment Business...
}
}

interface Pay {

abstract payResp pay(BigDecimal payAmount);
}

class ProxyUtils {
public static Pay createProxy(Payment payment) {
return (Pay) Proxy.newProxyInstance(
ProxyUtil.class.getClassLoader(),
new Class[]{ Pay.class },
new InvocationHandler() {

@Overide
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// Payment Environment Init...
// Payment Safe Guard...
return method.invoke(payment, args);
}
}
);
}
}

反射

反射允许对成员变量、成员方法、构造方法信息进行编程访问。

例如,IDE的智能补全、参数提示,就是使用反射实现。

Class字节码获取

Class clazz;
clazz = Class.forName("com.yourpackage.TargetClass");
// 参数
clazz = TargetClass.class;
// 有实例时
clazz = instance.getClass();

场景

反射可以与配置文件结合,从而动态地创建对象。例如application.yml里数据库的配置、端口号等。

Properties prop = new Properties();
FileInputStream fis = new FileInputStream(ROOT + "src/main/resources/application.properties");
String dataSourceUrl = (String) perp.get("dataSource");
String dbName = (String) extractDbName(dataSourceUrl);

Class clazz = Class.forName(dbName);
Constructor con = clazz.getDeclaredConstructor();
Object o = con.newInstance();

...

HashMap

jdk1.7

jdk1.7的HashMap数据结构是:数组 + 单向链表

当哈希后,得到的数组槽位已经存放了其他元素,这时候就需要运用指针在同一个槽位存放多个元素。

头插法

jdk1.7使用的方法是头插法,这样就不需要遍历到链表尾部再插入,性能高。

void createEntry(int hash, K key, V value, int bucketIdx) {
Entry<K, V> e = table[bucketIdx];
// 这里使用头插法,在槽位头部插入新元素,并指向e,成为新的槽位引用
table[bucketIdx] = new Entry<>(hash, key, value, e);
size++;
}

public Entry(int h, K k, V v, Entry<K, V> n) {
this.value = v;
this.next = n;
this.key = k;
this.hash = h;
}

这种方法需要最终更新槽位指向新插入的节点,否则单向链表找不到新插入的元素。

利用2次方机器特性

HashMap实际初始化时,不是根据用户传入的容量,而是向上取整2的次方。

这是因为2次方可以将取模优化为位运算,避免除法(在机器中是暴力试除)
2的幂次取模小技巧:Xmod2n=X & (2n1)X \mod 2^n = X \text{ \& } (2^n - 1)

static int indexFor(int h, int length) {
return h & (length-1);
}
// 以2^4 = 16为例
0101 ****
mod 0001 0000
// 优化为位运算
0101 ****
& 0000 1111
// ****与操作后,一定小于16

位运算取最高位算法,翻倍(number - 1) << 1后取高位1

private static int roundUpToPowerOf2(int number) {
return number >= MAXIMUM_CAPACITY
? MAXIUM_CAPACITY
: (number > 1) ? Integer.highestOneBit((number - 1) << 1) : 1;
}

public static int highestOneBit(int i) {
i |= (i >> 1);
i |= (i >> 2);
i |= (i >> 4);
i |= (i >> 8);
i |= (i >> 16);
return i - (i >> 1);
}

highestOneBit()位运算原理

// 任意一个数
0001 ****
// i |= (i >> 1);
0001 ****
| 0000 1***
= 0001 1***
// i |= (i >> 2);
0001 1***
| 0000 011*
= 0001 111*
// i |= (i >> 4);
0001 111*
| 0000 0001
= 0001 1111
// 至此,*全部变为1
// i |= (i >> 8);
// i |= (i >> 16);
// int类型最多4Byte = 32bit,此时原数i最大已经覆盖了16位1,右移即可全部覆盖

// return i - (i >> 1);
0001 1111
- 0000 1111
= 0001 0000
// 得到最高位的1

搅动算法扩散哈希差异

实际上,HashMap除了调用hashCode()方法以外,还会使用位运算进行搅动,达到均匀分布的效果。

jdk1.7通过​​位运算混合哈希码的高低位信息​​,从而减少哈希冲突。
(如0x100000000x20000000),扰动后差异会扩散到更多低位(0x120000000x24000000),避免仅依赖低位导致冲突。

final int hash(Object k) {
int h = hashSeed;
if (0 != h && k instanceOf String) {
return stringHash32((String) k);
}

h ^= k.hashCode();
// 确保“仅在每个比特位存在常数倍差异”的哈希码,其冲突次数有上限(默认负载因子下约8次)​。
// '>>>' 是无符号右移
h ^= (h >>> 20) ^ (h >>> 12);
return h ^ (h >>> 7) ^ (h >>> 4);
}

jdk1.8优化了这里的搅动逻辑(h = key.hashCode()) ^ (h >>> 16),保证搅动足够均匀的情况下减少运算,并结合红黑树进行优化。

resize重新哈希

resize直接*2,扩容成原来的两倍。由于初始化就设置capacity为2的幂次,所有扩容后仍然为2的幂次。

void transfer(Entry[] newTable, boolean rehash) {
int newCapacity = newTable.length;
for (Entry<K, V> e: table) {
while (null != e) {
Entry<K, V> next = e.next;
if (rehash) {
// 基于新capacity,重新哈希
e.hash = (null == e.key ? 0 : hash(e.key));
}
int i = indexFor(e.hash, newCapacity);
// 头插法,e对应对象的next指向到newTable
e.next = newTable[i];
// newTable[i]这个槽位指向e对应对象,保证能遍历到新插入对象
newTable[i] = e;
e = e.next;
}
}
}

transfer()仍然使用头插法,这样会调转原来的顺序

// 原来可能是这样的
e -> 1
1 -> 2 -> 3 -> null

// 头插法, e.next = newTable[i];
1 -> null(newTable[i]初始化的Entry默认为null)
// 调整i指针, newTable[i] = e;
i -> 1
e -> 2

// 头插法
// 此时 1 -> null, 头部插入 2 -> 1
2 -> 1 -> null
i -> 2
e -> 3
...

// 最终结果
3 -> 2 -> 1 -> null

transfer()核心的2行代码可以看出,HashMap扩容时操作不原子,并发执行有问题。

Thread1: e.next = newTable[i]; // old
Thread2: e.next = newTable[i]; // old

Thread2: newTable[i] = e; // i -> 1 -> old
Thread1: newTable[i] = e; // i -> 2 -> old

更严重的问题是,并发transfer()会出现循环链表

// 原来的链表
1 -> 2 -> 3 -> null

// 并发resize()
// Thread1完成resize()
3 -> 2 -> 1 -> null
// 这时Thread2刚开始transfer,但是拿到的是旧链表 1 -> 2 -> 3
// 从1开始
i -> 1 -> old // 实际上old已经变成3 -> 2 -> 1了
// 死循环!
i -> 1 -> 3 -> 2 -> 1 -> 3 ...

rehash规律

在rehash里有一个规律:由于扩容后还是2的幂次,因此rehash的结果要么和原来相同;要么是原来的位置+高位

// 假设原来capacity是16,hash后使用indexFor取模
// 原来的slot
0101 0101
& 0000 1111
= 0000 0101
// capacity >> 1,变成32后
0101 0101
& 0001 1111
= 0001 0101
// 取模结果是原来加上高位16;

// 如果原数在高位不为1,那rehash结果和原来一致
0110 0101
& 0000 1111
& 0001 1111
= 0000 0101

jdk1.8

jdk1.8的HashMap数据结构是:数组 + (单向+双向)两种链表 + 红黑树

说用到双向链表是因为红黑树会记录父节点和子节点,相当于双向的。
当数组长度达到64且哈希冲突使链表长度达到8,该槽位会改用红黑树结构。
如果数组未达到64,只是链表长度达到8,那么会扩容。

拓展:为什么是8和64

  • 红黑树效率高,但是数组大小小于64的时候,红黑树频繁使用平衡算法开销比较大,而且红黑树存储空间是数组+链表的2倍(例如,树存储父、子节点,2个指针而不是1个)
  • 泊松分布:链表长度为0的概率是60%,为1的概率是30%,为2的概率是7%…为8的概率几乎可以忽略,是非常极端的情况;并且达到8的时候,链表查询的平均开销数学上大于红黑树。

更简洁的搅动算法

final int hash(Object key) {
int h;
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

懒初始化

jdk1.8只有在第一次调用put()时才真正为HashMap分配内存。

final V putVal(int hash, K key, V value, boolean onlyIfAbsent, boolean evict) {
Node<K, V>[] tab; Node<K, V> p; int n, i;
// 如果为空就初始化
if ((tab = table) == null || (n = tab.length) == 0)
n = (tab = resize()).length;
...
}

尾插法

jdk1.8使用尾插法,避免头插法在并发下的死循环问题

final V putVal(int hash, K key, V value, boolean onlyIfAbsent, boolean evict) {
...
// 使用for循环,binCount充当计数器,哈希冲突导致链表长度为8,就转为红黑树
for (int binCount = 0; ; ++binCount) {
// 遍历到尾节点
if ((e = p.next) == null) {
p.next = newNode(hash, key, value, null);
if (binCount >= TREEIFY_THRESHOLD - 1)
// 实际上会检查数组容量是否大于64,小于则只调用resize()
treeifyBin(tab, hash);
break;
}
// 遍历过程中发现相同Key,那么跳出循环,在代码最后做一个替换操作,并返回oldValue
if (e.hash == hash && ((k = e.kay) == key || (key != null && key.equals(k))))
break;
// 否则 p = p.next
p = e;
}
// 替换返回旧值
...
}

final void treeifyBin(Node<K, V>[] tab, int hash) {
int n, index; Node<K, V> e;
// 数组长度小于64,只扩容不转红黑树
if (tab == null || (n = tab.length) < MIN_TREEIFY_CAPACITY)
resize();
else ...
}

注意,尾插法仍然不能保证原子性,在并发情况下仍然会发生更新丢失。

resize批量迁移

在链表情况下,hash后的槽位依然符合规律
rehash的结果要么和原来相同;要么是原来的位置+高位

final Node<K, V>[] resize() {
...
// 槽位不为空才需要迁移
if ((e = oldTab[j]) != null) {
oldTab[j] = null;
// e.next为空,只有1个元素,直接迁移
if (e.next == null)
// hash并取模newCap,得到新槽位
newTab[e.hash & (newCap - 1)] = e;
else if (e instanceof TreeNode)
((TreeNode<K, V>) e).split(this, newTab, j, oldCap);
// 槽位引用为列表
else {
// 因为容量变了,hash()取模后不一定得到原来的槽位,需要全部重新hash
// 要么hash和原来一致
Node<K, V> loHead = null, loTail = null;
// 要么hash为原值 + 高位
Node<K, V> hiHead = null, hiTail = null;
Node<K, V> next;

do {
next = e.next;
// hash后高位不为1,和原来一致
if ((e.hash & oldCap) == 0) {
if (loTail == null)
// 初始化更新头节点
loHead = e;
else
// 其他时候更新尾节点
loTail.next = e;
// loTail = loTail.next
loTail = e;
}
else {
// 同样的逻辑处理高位
}
} while ((e = next) != null);
if (loTail != null) {
loTail.next = null;
newTab[j] = loHead;
}
if (hiTail != null) {
hiTail.next = null;
// 容量都是2的幂次,等于+高位
newTab[j + oldCap] = hiHead;
}
}
}
...
return newTab;
}

对于树的迁移,因为hash取模newCapacity导致槽位变化,也要分割。分割逻辑和链表一样,只不过多了判断:

  1. 分割后,是否需要退化为链表?红黑树节点少于6就退化;否则重新生成红黑树
  2. 如果全部hash取模后都在低位/高位,那直接迁移整棵红黑树。

Concurrent HashMap

ConcurrentHashMap通过分段Segment锁控制并发级别,只对核心put()逻辑上锁,并且在获取锁失败时预创建节点,实现了高效的原子更新。

HashTable的性能问题

HashTable通过使用synchronized关键字来使put()同步。
这时使用table.put(key, value),会直接为实例table上锁,从而保证put()原子性,解决并发下数据丢失问题。

然而,这样做锁的粒度太大了,

  1. 一些不需要原子执行的操作(如hash()indexFor()、潜在的resize())都串行执行,
  2. 并且即使两个put()操作没有竞争也会被上锁
    因此,使用HashTable效率低下。

Segment分段锁

static final class Segment<K, V> extends ReentrantLock implements Serializable {
// 注意,每个Segment代表HashEntry数组,因此叫做Segment段
transient volatile HashEntry<K, V>[] table;
transient int count;
transient int modCount;
transient int threshold;
final float loadFactor;

...
}

static final class HashEntry<K, V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K, V> next;

...
}

使用ConcurrentHashMap时,会这样执行put()

  1. 首先通过hash()得到槽位index
  2. 对槽位上锁,segment[index].lock();
  3. 生成Entry,new Entry(key, value, hashCode);
  4. 将Entry放入Segment[]
  5. 释放槽位锁,segment[index].unlock();

通过这样的操作:

  1. 保证了putVal()的原子性,解决了数据丢失问题
  2. 在两个线程hash槽位不同时,保证了并发性

可重入锁put

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 上锁
HashEntry<K, V> node = tryLock() ? null :
// 没有拿到锁的时候,如果没有相同Key,预先创建node
scanAndLockForPut(key, hash, value);
V oldValue;
try {
// 遍历 HashEntry e
// 发现相同Key,覆盖Value,并保存oldValue
// 遍历完,e == null,插入新HashEntry
else {
// 等待时已经创建好,直接插入node
if (node != null) node.setNext(first);
// jdk1.7 头插法
else node = new HashEntry<K, V>(hash, key, value, first);
// 其他逻辑,如扩容
...
}
} finally {
unlock();
}
return oldValue;
}

预创建node

tryLock()失败时,线程会尝试预创建node

private HashEntry<K, V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K, V> first = entryForHash(this, hash);
HashEntry<K, V> e = first;
HashEntry<K, V> node = null;

int retries = -1;
// 遍历HashEntry链表
while (!tryLock()) {
HashEntry<K, V> f;
if (retries < 0) {
// 遍历完Entry,尝试创建新节点
if (e == null) {
if (node == null) {
// 创建新节点
node = new HashEntry<K, V>(hash, key, value, null);
}
retries = 0;
}
// 如果发现相同key,不创建
else if (key.equals(e.key)) retries = 0;
// 遍历
else e = e.next;
}
// 头节点有变化(新插入了节点),重置状态
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f;
retries = -1;
}
// 饥饿,直接上锁
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
}
return node;
}

concurrencyLevel并发级别

每个Segment有多少个Entry?ConcurrentHashMap通过initialCapacity / concurrencyLevel来控制。

public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
int ssize = 1;
// 这里主要用于计算sshift,ssize最后是concurrencyLevel向上取2幂次
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
...
// 实际总容量initialCapacity = concurrencyLevel向上取2次幂 * 每段Entry数量
int c = initialCapacity / ssize;
if (c * ssize < initalCapacity) ++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
// cap = c向上取整2次幂
while (cap < c) cap <<= 1;

Segment<K, V> s0 =
new Segment<K, V>(loadFactor, (int)(cap * loadFactor),
// Segment容量为cap
(HashEntry<K, V>[]) new HashEntry[cap]);
// 2幂次Capacity
Segment<K, V>[] ss = (Segment<K, V>[]) new Segment[ssize];
}