Java HashMap 과 동시성 #2 [Pub/Pub] (ConcurrentModificationException)
운영 중인 서버에서 ConcurrentModificationException 가 발생하여 원인 파악을 시작하였다.
예외가 발생한 코드 재현
Map<Integer, Integer> reduce = IntStream.range(0, 100)
.parallel()
.mapToObj((num) -> {
Map<Integer, Integer> map = new HashMap<>();
map.put(num, num);
return map;
})
.reduce(new HashMap<>(), (a, b) -> {
a.putAll(b);
return a;
});
에러가 발생한 로직을 간단하게 요약하면 위와 같이 정리가 되는데 운영 코드에서는 mapToObj 부분에서 I/O 를 실행하고 있다.
I/O 를 Parallel 하게 실행하기 위하여 Stream 의 Parallel 을 사용하고 있다.
각 숫자를 key 와 value 로 가지는 Map 을 100개 생성하고 이 Map 들을 다시 하나의 Map 으로 합치는 코드이다.
Exception in thread "main" java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1445)
at java.util.HashMap$EntryIterator.next(HashMap.java:1479)
at java.util.HashMap$EntryIterator.next(HashMap.java:1477)
at java.util.HashMap.putMapEntries(HashMap.java:512)
at java.util.HashMap.putAll(HashMap.java:785)
at ConcurrentMain.lambda$main$1(ConcurrentMain.java:48)
at java.util.stream.ReduceOps$1ReducingSink.combine(ReduceOps.java:85)
at java.util.stream.ReduceOps$1ReducingSink.combine(ReduceOps.java:72)
at java.util.stream.ReduceOps$ReduceTask.onCompletion(ReduceOps.java:754)
at java.util.concurrent.CountedCompleter.tryComplete(CountedCompleter.java:577)
at java.util.stream.AbstractTask.compute(AbstractTask.java:317)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool.helpComplete(ForkJoinPool.java:1881)
at java.util.concurrent.ForkJoinPool.externalHelpComplete(ForkJoinPool.java:2478)
at java.util.concurrent.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:324)
at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:405)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
at java.util.stream.ReduceOps$ReduceOp.evaluateParallel(ReduceOps.java:714)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:474)
at ConcurrentMain.main(ConcurrentMain.java:47)
재현한 코드에서 동일하게 에러가 발생하는 것을 확인할 수 있다.
예외가 발생한 원인 분석
StackTrace 를 따라가보면 HashMap 의 어느 부분에서 예외를 발생시켰는지 확인할 수 있다.
final Node<K,V> nextNode() {
Node<K,V>[] t;
Node<K,V> e = next;
if (modCount != expectedModCount)
throw new ConcurrentModificationException();
if (e == null)
throw new NoSuchElementException();
if ((next = (current = e).next) == null && (t = table) != null) {
do {} while (index < t.length && (next = t[index++]) == null);
}
return e;
}
HashMap 의 InnerClass 인 HashIterator 의 nextNode 함수에서 예외를 발생시켰다.
우선 HashIterator.nextNode() 는 HashMap.putAll() 내부에서 아래와 같이 Iterator 를 돌리면서 하나씩 put 하고 있는 것을 볼 수 있다. for 문을 수행하여 다음 Map.Entry 를 가져오기 위해 HashIterator.nextNode() 가 호출되었다.
for (Map.Entry<? extends K, ? extends V> e : m.entrySet()) {
K key = e.getKey();
V value = e.getValue();
putVal(hash(key), key, value, false, evict);
}
그렇다면 HasIterator 의 modCount 와 expectedModCount 가 무엇이고 이 값들이 달리졌을때 예외를 던지는 이유는 무엇일까
modCount
- HashMap 의 지역변수
- HashMap 이 생성될때 항상 0 으로 값이 초기화
- put , remove 와 같은 Map 의 변경이 있을때 항상 값을 증가
expectedModCount
- HashIterator 의 지역변수
- HashIterator 의 생성자에서 HashMap 의 modCount 와 같은 값으로 할당
HashIterator() {
expectedModCount = modCount;
...
}
즉, 이 두 값이 다르다는 의미는 HashIterator 가 생성된 이후에 아직 순회가 진행중인 와중에 HashMap 에 어떠한 변경사항이 생겼다는 것을 의미한다.
예제 코드에서는 Stream.parallel() 를 사용하여 reduce() 절에서 여러 쓰레드가 하나의 Map 에 putAll 을 하는 상황이므로 putAll() 내부에서 Iterator 가 순회중인 와중에 Map 의 변경사항이 생겨서 예외가 발생한 것이다.
이러한 로직은 비단 HashMap 의 Iterator 에만 존재하는 것이 아니라 ArrayList 의 Iterator 에서도 같은 로직을 확인할 수 있다
final void checkForComodification() {
if (modCount != expectedModCount)
throw new ConcurrentModificationException();
}
Iterator 가 실행되는 동시에 누군가 Collection 의 값을 변경하는 것은 의도된 대로 동작하지 않는 다고 판단한 것으로 보인다.
극단적인 예시로 Iterator 가 동작중에 다른 쓰레드에서 계속해서 값을 put 한다면 Iterator 가 끝나지 않는 문제가 발생할 수 있다.
실제로 Iterator 를 돌리는 와중에 HashMap 에 변경이 가해지만 같은 예외가 발생하는 것을 확인할 수 있다.
Map<Integer, Integer> map = new HashMap<>();
for (int i = 0 ; i < 10000 ; i ++) {
map.put(i, i);
}
new Thread() {
@Override
public void run() {
super.run();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
map.remove(0);
}
}.start();
for (Map.Entry e : map.entrySet()) {
Thread.sleep(1000);
System.out.println(e);
}
해결
Map 을 ConcurrentHashMap 으로 변경하면 하나의 쓰레드만 putAll() 을 실행할 수 있으므로 ConcurrentModificationException 예외는 더 이상 발생하지 않는다. 또 다른 해결책으로는 Map 을 합치는 작업은 굳이 Parallel 로 실행되지 않아도 되므로 reduce 는 하나의 쓰레드에서 실행되도록 해도 해결될것이다.