代理
代理对象通过invoke
,实现类与非核心功能的解耦。
public static void main(String[] args) { Payment payment = new Payment("AliPay"); Pay proxy = ProxyUtil.createProxy(payment);
proxy.pay(amount); }
class Payment implements Pay {
@Overide public payResp pay(BigDecimal payAmount) { } }
interface Pay {
abstract payResp pay(BigDecimal payAmount); }
class ProxyUtils { public static Pay createProxy(Payment payment) { return (Pay) Proxy.newProxyInstance( ProxyUtil.class.getClassLoader(), new Class[]{ Pay.class }, new InvocationHandler() {
@Overide public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { return method.invoke(payment, args); } } ); } }
|
反射
反射允许对成员变量、成员方法、构造方法信息进行编程访问。
例如,IDE的智能补全、参数提示,就是使用反射实现。
Class字节码获取
Class clazz; clazz = Class.forName("com.yourpackage.TargetClass");
clazz = TargetClass.class;
clazz = instance.getClass();
|
场景
反射可以与配置文件结合,从而动态地创建对象。例如application.yml
里数据库的配置、端口号等。
Properties prop = new Properties(); FileInputStream fis = new FileInputStream(ROOT + "src/main/resources/application.properties"); String dataSourceUrl = (String) perp.get("dataSource"); String dbName = (String) extractDbName(dataSourceUrl);
Class clazz = Class.forName(dbName); Constructor con = clazz.getDeclaredConstructor(); Object o = con.newInstance();
...
|
HashMap
jdk1.7
jdk1.7的HashMap数据结构是:数组 + 单向链表
当哈希后,得到的数组槽位已经存放了其他元素,这时候就需要运用指针在同一个槽位存放多个元素。
头插法
jdk1.7使用的方法是头插法,这样就不需要遍历到链表尾部再插入,性能高。
void createEntry(int hash, K key, V value, int bucketIdx) { Entry<K, V> e = table[bucketIdx]; table[bucketIdx] = new Entry<>(hash, key, value, e); size++; }
public Entry(int h, K k, V v, Entry<K, V> n) { this.value = v; this.next = n; this.key = k; this.hash = h; }
|
这种方法需要最终更新槽位指向新插入的节点,否则单向链表找不到新插入的元素。
利用2次方机器特性
HashMap实际初始化时,不是根据用户传入的容量,而是向上取整2的次方。
这是因为2次方可以将取模优化为位运算,避免除法(在机器中是暴力试除)
2的幂次取模小技巧:Xmod2n=X & (2n−1)
static int indexFor(int h, int length) { return h & (length-1); }
|
// 以2^4 = 16为例 0101 **** mod 0001 0000 // 优化为位运算 0101 **** & 0000 1111 // ****与操作后,一定小于16
|
位运算取最高位算法,翻倍(number - 1) << 1
后取高位1
private static int roundUpToPowerOf2(int number) { return number >= MAXIMUM_CAPACITY ? MAXIUM_CAPACITY : (number > 1) ? Integer.highestOneBit((number - 1) << 1) : 1; }
public static int highestOneBit(int i) { i |= (i >> 1); i |= (i >> 2); i |= (i >> 4); i |= (i >> 8); i |= (i >> 16); return i - (i >> 1); }
|
highestOneBit()
位运算原理
// 任意一个数 0001 **** // i |= (i >> 1); 0001 **** | 0000 1*** = 0001 1*** // i |= (i >> 2); 0001 1*** | 0000 011* = 0001 111* // i |= (i >> 4); 0001 111* | 0000 0001 = 0001 1111 // 至此,*全部变为1 // i |= (i >> 8); // i |= (i >> 16); // int类型最多4Byte = 32bit,此时原数i最大已经覆盖了16位1,右移即可全部覆盖
// return i - (i >> 1); 0001 1111 - 0000 1111 = 0001 0000 // 得到最高位的1
|
搅动算法扩散哈希差异
实际上,HashMap除了调用hashCode()
方法以外,还会使用位运算进行搅动,达到均匀分布的效果。
jdk1.7通过位运算混合哈希码的高低位信息,从而减少哈希冲突。
(如0x10000000
和0x20000000
),扰动后差异会扩散到更多低位(0x12000000
和0x24000000
),避免仅依赖低位导致冲突。
final int hash(Object k) { int h = hashSeed; if (0 != h && k instanceOf String) { return stringHash32((String) k); }
h ^= k.hashCode(); h ^= (h >>> 20) ^ (h >>> 12); return h ^ (h >>> 7) ^ (h >>> 4); }
|
jdk1.8优化了这里的搅动逻辑(h = key.hashCode()) ^ (h >>> 16)
,保证搅动足够均匀的情况下减少运算,并结合红黑树进行优化。
resize重新哈希
resize直接*2,扩容成原来的两倍。由于初始化就设置capacity为2的幂次,所有扩容后仍然为2的幂次。
void transfer(Entry[] newTable, boolean rehash) { int newCapacity = newTable.length; for (Entry<K, V> e: table) { while (null != e) { Entry<K, V> next = e.next; if (rehash) { e.hash = (null == e.key ? 0 : hash(e.key)); } int i = indexFor(e.hash, newCapacity); e.next = newTable[i]; newTable[i] = e; e = e.next; } } }
|
transfer()
仍然使用头插法,这样会调转原来的顺序
// 原来可能是这样的 e -> 1 1 -> 2 -> 3 -> null
// 头插法, e.next = newTable[i]; 1 -> null(newTable[i]初始化的Entry默认为null) // 调整i指针, newTable[i] = e; i -> 1 e -> 2
// 头插法 // 此时 1 -> null, 头部插入 2 -> 1 2 -> 1 -> null i -> 2 e -> 3 ...
// 最终结果 3 -> 2 -> 1 -> null
|
从transfer()
核心的2行代码可以看出,HashMap扩容时操作不原子,并发执行有问题。
Thread1: e.next = newTable[i]; Thread2: e.next = newTable[i];
Thread2: newTable[i] = e; Thread1: newTable[i] = e;
|
更严重的问题是,并发transfer()
会出现循环链表
1 -> 2 -> 3 -> null
3 -> 2 -> 1 -> null
i -> 1 -> old
i -> 1 -> 3 -> 2 -> 1 -> 3 ...
|
rehash规律
在rehash里有一个规律:由于扩容后还是2的幂次,因此rehash的结果要么和原来相同;要么是原来的位置+高位
// 假设原来capacity是16,hash后使用indexFor取模 // 原来的slot 0101 0101 & 0000 1111 = 0000 0101 // capacity >> 1,变成32后 0101 0101 & 0001 1111 = 0001 0101 // 取模结果是原来加上高位16;
// 如果原数在高位不为1,那rehash结果和原来一致 0110 0101 & 0000 1111 & 0001 1111 = 0000 0101
|
jdk1.8
jdk1.8的HashMap数据结构是:数组 + (单向+双向)两种链表 + 红黑树
说用到双向链表是因为红黑树会记录父节点和子节点,相当于双向的。
当数组长度达到64且哈希冲突使链表长度达到8,该槽位会改用红黑树结构。
如果数组未达到64,只是链表长度达到8,那么会扩容。
拓展:为什么是8和64
- 红黑树效率高,但是数组大小小于64的时候,红黑树频繁使用平衡算法开销比较大,而且红黑树存储空间是数组+链表的2倍(例如,树存储父、子节点,2个指针而不是1个)
- 泊松分布:链表长度为0的概率是60%,为1的概率是30%,为2的概率是7%…为8的概率几乎可以忽略,是非常极端的情况;并且达到8的时候,链表查询的平均开销数学上大于红黑树。
更简洁的搅动算法
final int hash(Object key) { int h; return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16); }
|
懒初始化
jdk1.8只有在第一次调用put()
时才真正为HashMap分配内存。
final V putVal(int hash, K key, V value, boolean onlyIfAbsent, boolean evict) { Node<K, V>[] tab; Node<K, V> p; int n, i; if ((tab = table) == null || (n = tab.length) == 0) n = (tab = resize()).length; ... }
|
尾插法
jdk1.8使用尾插法,避免头插法在并发下的死循环问题
final V putVal(int hash, K key, V value, boolean onlyIfAbsent, boolean evict) { ... for (int binCount = 0; ; ++binCount) { if ((e = p.next) == null) { p.next = newNode(hash, key, value, null); if (binCount >= TREEIFY_THRESHOLD - 1) treeifyBin(tab, hash); break; } if (e.hash == hash && ((k = e.kay) == key || (key != null && key.equals(k)))) break; p = e; } ... }
final void treeifyBin(Node<K, V>[] tab, int hash) { int n, index; Node<K, V> e; if (tab == null || (n = tab.length) < MIN_TREEIFY_CAPACITY) resize(); else ... }
|
注意,尾插法仍然不能保证原子性,在并发情况下仍然会发生更新丢失。
resize批量迁移
在链表情况下,hash后的槽位依然符合规律:
rehash的结果要么和原来相同;要么是原来的位置+高位
final Node<K, V>[] resize() { ... if ((e = oldTab[j]) != null) { oldTab[j] = null; if (e.next == null) newTab[e.hash & (newCap - 1)] = e; else if (e instanceof TreeNode) ((TreeNode<K, V>) e).split(this, newTab, j, oldCap); else { Node<K, V> loHead = null, loTail = null; Node<K, V> hiHead = null, hiTail = null; Node<K, V> next;
do { next = e.next; if ((e.hash & oldCap) == 0) { if (loTail == null) loHead = e; else loTail.next = e; loTail = e; } else { } } while ((e = next) != null); if (loTail != null) { loTail.next = null; newTab[j] = loHead; } if (hiTail != null) { hiTail.next = null; newTab[j + oldCap] = hiHead; } } } ... return newTab; }
|
对于树的迁移,因为hash取模newCapacity导致槽位变化,也要分割。分割逻辑和链表一样,只不过多了判断:
- 分割后,是否需要退化为链表?红黑树节点少于6就退化;否则重新生成红黑树
- 如果全部hash取模后都在低位/高位,那直接迁移整棵红黑树。
Concurrent HashMap
ConcurrentHashMap通过分段Segment锁控制并发级别,只对核心put()
逻辑上锁,并且在获取锁失败时预创建节点,实现了高效的原子更新。
HashTable的性能问题
HashTable通过使用synchronized
关键字来使put()
同步。
这时使用table.put(key, value)
,会直接为实例table
上锁,从而保证put()
原子性,解决并发下数据丢失问题。
然而,这样做锁的粒度太大了,
- 一些不需要原子执行的操作(如
hash()
、indexFor()
、潜在的resize()
)都串行执行,
- 并且即使两个
put()
操作没有竞争也会被上锁,
因此,使用HashTable效率低下。
Segment分段锁
static final class Segment<K, V> extends ReentrantLock implements Serializable { transient volatile HashEntry<K, V>[] table; transient int count; transient int modCount; transient int threshold; final float loadFactor;
... }
static final class HashEntry<K, V> { final int hash; final K key; volatile V value; volatile HashEntry<K, V> next;
... }
|
使用ConcurrentHashMap时,会这样执行put()
:
- 首先通过
hash()
得到槽位index
- 对槽位上锁,
segment[index].lock();
- 生成Entry,
new Entry(key, value, hashCode);
- 将Entry放入
Segment[]
- 释放槽位锁,
segment[index].unlock();
通过这样的操作:
- 保证了
putVal()
的原子性,解决了数据丢失问题
- 在两个线程hash槽位不同时,保证了并发性
可重入锁put
final V put(K key, int hash, V value, boolean onlyIfAbsent) { HashEntry<K, V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { else { if (node != null) node.setNext(first); else node = new HashEntry<K, V>(hash, key, value, first); ... } } finally { unlock(); } return oldValue; }
|
预创建node
在tryLock()
失败时,线程会尝试预创建node
private HashEntry<K, V> scanAndLockForPut(K key, int hash, V value) { HashEntry<K, V> first = entryForHash(this, hash); HashEntry<K, V> e = first; HashEntry<K, V> node = null;
int retries = -1; while (!tryLock()) { HashEntry<K, V> f; if (retries < 0) { if (e == null) { if (node == null) { node = new HashEntry<K, V>(hash, key, value, null); } retries = 0; } else if (key.equals(e.key)) retries = 0; else e = e.next; } else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { e = first = f; retries = -1; } else if (++retries > MAX_SCAN_RETRIES) { lock(); break; } } return node; }
|
concurrencyLevel并发级别
每个Segment有多少个Entry?ConcurrentHashMap通过initialCapacity / concurrencyLevel
来控制。
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } ... int c = initialCapacity / ssize; if (c * ssize < initalCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1;
Segment<K, V> s0 = new Segment<K, V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K, V>[]) new HashEntry[cap]); Segment<K, V>[] ss = (Segment<K, V>[]) new Segment[ssize]; }
|