자바 병렬 프로그래밍 - 5장 구성 단위

동기화된 컬렉션 클래스

동기화된 컬렉션 클래스는 Vector, HashTable이 존재한다. 이는 synchronizedXxx를 사용하지 않아도 동기화 된 기능을 제공해준다. 위와 같은 자료구조를 사용하다보면 여러개의 연산을 묶어 원자적인 하나의 단일 연산처럼 사용해야 할 때가 있다. 만약 하나의 자료구조에 여러 스레드가 위와 같은 단일 연산처럼 동작해야하는 메서드를 수행한다면 문제가 발생할 수도 있다. 아래의 예시를 보자.

public static Object getLast(Vector list) {
	int lastIndex = list.size() - 1;
	return list.get(lastIndex);
}

public static void deleteLast(Vector list) {
	int lastIndex = list.size() - 1;
	list.remove(lastIndex);
}

만약 위 vector에 10개의 요소가 담겨있고, 스레드 A가 getLast를 스레드 B가 deleteLast를 호출한다고 가정해보자. 스레드 A가 lastIndex를 받아오고 나서 remove()가 수행되고 get()이 수행된다고 하면 ArrayIndexOufOfBoundsException이 발생한다.

아래의 코드도 마찬가지로 문제가 발생할 수 있다. size를 얻어오고 get()을 호출하기 전에 만약 다른 스레드가 vector 스레드의 요소에 변경을 가하면 이 또한 마찬가지로 예외가 발생하게 된다.

for(int i = 0 ; i < vector.size(); i++) {
	doSomething(vector.get(i))
}

위 예제들의 요지는 컬렉션 클래스가 동기화 되더라도 이들의 api을 사용할 때는 동기화 문제가 발생할 수 있다. 위 두가지 예시 모두 락을 사용하면 간단하게 문제를 해결할 수 있다.

Collection 내에 들어있는 요소들을 반복하여 읽어내는 기본적인 방법은 Iterator이다. 만약 Iterator를 통해 요소들을 읽는 와중 해당 컬렉션에 다른 스레드가 변경을 가하면 어떻게 될까? 즉시 멈춤을 통해 이를 처리한다. 이러한 문제 또한 내부 요소를 반복하는 로직을 락으로 감싸면 해결 가능하다.

그리면 위와 같은 문제들이 발생하였을 때, 코드 전체를 동기화 시키는것은 문제가 있다. 해당 컬렉션에 많은 수의 요소가 들어있거나 요소마다 반복을 해야하는 로직이 있다거나 하면 스레드가 상당히 많은 시간을 대기상태에 머무르게 된다. 이러한 방안은 clone()을 통해 컬렉션을 복사해서 해결할 수도 있지만, 복제하는 비용 또한 고려해야한다.

이번에는 숨겨진 Iterator를 사용하는 예시도 살펴보자. 얼핏 보면 명시적으로 Iterator가 없기 때문에 사용되지 않는것처럼 보인다. 하지만, set 이라는 컬렉션 변수를 출력하면 toString()이 호출되고 컬렉션 클래스의 toString()을 살펴보면 iterator 메소드를 호출해 요소의 toString() 호출해준다. 이러한 경우에도 만약 해당 set 변수를 다른 스레드가 접근해서 변경을 가하면 ConcurrentModificationException이 발생할 수 있다.

public class MyClass {
	private final Set<Integer> set = new HashSet<>();

	public void print(){
		System.out.println(set);
	}
}

병렬 컬렉션

동기화된 컬렉션은 각 메서드를 수행할 때 항상 락을 확보해야한다. 그래서 HashMap의 get 또는 List의 contains와 같은 연산을 수행하면 훨씬 많은 비용이 들 수도 있다. HashMap을 고려해보면 객체들의 hashcode 값이 적절히 넓고 고르게 분포되어야 한쪽으로 데이터가 치우치는 현상이 발생하지 않을 것이다.

락을 사용한다는것이 위와 같은 잠재적인 문제를 포함한다. 이를 전혀 다른 방식으로 동기화를 수행하는 대표적인 병렬 컬렉션의 예시가 CoccurrentHashMap을 살펴본다. 이는 모든 연산에서 하나의 락을 공유하는 방식이 아닌 락 스트라이핑을 통해 세밀하게 동기화 방법을 구현했다. 값을 읽어가는 연산은 많은 스레드가 동시에 처리하게 하고 쓰게 연산은 제한된 수만큼 동시에 처리하도록 한다.

Iterator를 사용하는 부분도 다음과 같이 개선되었다. Iterator를 통해 순회를 하는 도중 변경이 가해지면 더이상 ConcurrentModificationException이 발생하지 않고 Iterator를 만든 시점을 기준으로 반복을 계속한다. 이를 미약한 일관성 전략이라고한다.

앞서 본바와 같이 동시 쓰기 연산이 가능하니 size() 등의 메서드의 의미가 조금 약해진다. 아무래도 정확한 값이기보다는 추정값이라고 할 수 있다.

다음은 CopyOnWriteArrayList이다. 이는 동기화된 List보다 훨씬 병렬성을 높여준다. 변경할 때마다 복사하는 컬렉션 클래스는 별 다른 동기화 작업 없이도 여러 스레드에 접근되어도 스레드 안정성을 확보한다. 해당 컬렉션에 변경을 가하면 복사본을 기준으로 반영되기에 이 시점에 Iterator등을 사용하더라도 동시 사용성에 문제가 없다. 물론 컬렉션의 데이터 수가 굉장히 많고 빈번히 변경되는 자료구조라면 비용이 클 수도 있다. 그렇기에 CopyOnWriteArrayList는 변경 작업보다 반복문으로 읽어내는 작업이 많은 로직에서 효과적이다. 예를 들면 이벤트 리스너들의 목록을 관리하는 리스트의 경우 적절하다.

블로킹 큐와 프로듀서-컨슈머 패턴

블로킹 큐는 put / take와 offer / poll 이라는 메서드를 가진다. 만약 큐가 가득 차 있다면 put 메서드는 값을 추가할 공간이 생길 때까지 대기한다. 반대로 큐가 비어있는 상태라면 take 메서드는 뽑아낼 값이 생길 때까지 대기한다. 큐의 구현체는 크기를 제한할수도, 제한하지 않을 수도 있다. 만약 큐에 제한을 두지 않는 구현체를 쓴다면 put 연산으로 인해 스레드가 블로킹 되지 않는다.

이러한 블로킹 큐는 생산자-소비자 패턴을 구현할 때 용이하다. 생산자-소비자 패턴은 해야할 일을 목록(큐)을 가운데에 두고 작업을 수행하는 주체와 작업을 생산하는 주체를 분리시킨다. 이러한 구조는 작업을 생산하는 곳과 소비하는곳을 각각 감당할 수 있는 부하를 조절할 수 있다. 또한 생산자와 소비자가 서로에 대한 정보를 가질 필요가 없다.

프로듀서가 컨슈머가 감당할 수 있는 작업량보다 많이 생산할 경우, 해당 작업을 쌓이게 되고 결국 메모리 오류가 발생할 수 있다. 하지만 큐의 크기를 제한한다면 put 메서드는 공간이 없을 경우 공간이 생길 때 까지 블로킹 된다. 반면 offer 메서드의 경우 큐에 값을 넣을 수 없을 때 대기하지 않고 곧바로 오류를 발생시킨다.

자바에서는 블로킹 큐를 구현하는 여러가지 구현체를 제공한다.

  • LinkedBlockingQueue
  • ArrayBlockingQueue
  • PriorityBlockingQueue : Comparator를 통해 정렬 기준 선정 가능
  • SyncronousQueue

이 중 SyncronousQueue는 큐에 요소가 쌓이지 않으며 큐 내부에 값을 저장할 수 있도록 공간을 할당하지도 않는다. 대신 큐에 값을 추가하려는 스레드나 값을 읽어가려는 스레드의 큐를 관리한다. 쉽게 말해 작업 생산 시, 곧바로 소비자에게 해당 작업을 직접 건네주는 형태이다. 프로듀서와 컨슈머가 직접 데이터를 주고 받을 때까지 대기하므로 프로듀서에서 컨슈머로 데이터가 넘어가는 레이턴시가 굉장히 짧다. 이러한 큐에는 추가된 데이터를 보관할 공간이 없기 때문에 put이나 take를 호출하면 호출한 메서드의 상대편 측에 해당하는 메서드가 호출될 때까지 대기하게 된다. 이러한 구조는 컨슈머가 충분히 프로듀서의 생산량을 감당하여 대기하는 경우에 사용하면 좋다.

프로듀서와 컨슈머 사이에서 사용되는 블로킹 큐들은 객체가 프로듀서 -> 컨슈머로 넘어가는 동작이 전부 동기화 되어있다. 이때 직렬 스레드 한정 기법을 사용한다. 이는 스레드에 한정된 객체는 특정 스레드 하나만이 소유권을 가지는 것을 의미한다, 프로듀서에서 컨슈머로 소유권이 넘어갈 경우 기존에 소유권을 가지던 프로듀서는 해당 객체의 상태에 대해 전혀 알 수 없다. 직렬 스레드 한정을 사용하는 또 다른 예시는 객체 풀이다. 객체 풀의 경우도 가지고 있는 객체를 외부 스레드에게 빌려주는 일이기 때문이다.

자바 6.0 에서는 Deque과 BlockingDeque가 추가된다. 덱이라는 것은 앞과 뒤 어느쪽으로도 객체를 삽입 또는 제거할 수 있다. 이러한 자료구조는 작업 가로채기(work stealing) 패턴을 사용할 때 덱을 활용할 수 있다. 앞서 살펴본 프로듀서-컨슈머 패턴에서는 모든 컨슈머가 하나의 큐를 공유해서 사용한다. 반면 작업 가로채기 패턴에서는 모든 컨슈머가 각자의 덱을 갖는다. 만약 특정 컨슈머가 자신의 덱에 있는 작업을 전부 소진하면 다른 컨슈머의 덱에서 제일 뒤에 있는 작업을 가로채 수행할 수 있다. 이러한 패턴은 하나의 큐를 바라보며 스레드간의 경합이 발생하지 않기에 규모가 큰 시스템에서 적절하다. 왜냐하면 덱을 소유하는 스레드는 제일 앞의 작업을 가로채기를 수행하는 스레드는 제일 뒤에서 작업을 가져오기 때문이다.

블로킹 메소드, 인터럽터블 메소드

스레드는 다음과 같은 다양한 이유들로 대기한다.

  • I/O 작업
  • 락 확보
  • sleep()
  • Future.get()

앞선 예시는 대표적인 예일 뿐이고 더 다양한 이유들로 스레드는 대기한다. 스레드가 블록되면 동작이 멈춰진 다음 다음 상태들 중 하나를 가진다.

  • BLOCKED
  • WAITING
  • TIMED-WAITING

블로킹 연산은 특정한 신호(I/O가 끝나거나, 락을 확보했거나, 작업결과를 받아오는 등)를 받아 신호가 확인되면 RUNNABLE 상태로 넘어가 다시 스케쥴링되어 CPU 연산을 수행할 수 있게 된다.

앞서 살펴본 블로킹 큐의 put과 take 메서드는 sleep 메서드와 같이 InterruptedException을 발생시킬 수 있다. 특정 메서드가 해당 예외를 발생시킬 수 있다는 것은 해당 메소드가 블로킹 메소드라는 의미이고 해당 메소드에 인터럽트가 걸리면 대기 중인 상태에서 풀려나고자 한다. 그러면 블로킹 상태인 스레드에 어떻게 인터럽트를 걸 수 있을까? Thread 클래스는 해당 스레드를 중단시킬 수 있도록 interrupt() 메서드를 제공한다. 또한 해당 스레드가 인터럽트가 걸린 상태인지를 알려주는 불리언 값이 있고 다른 스레드가 자신에게 인터럽트를 걸게 될 경우 해당 플래그가 true로 설정된다.

Java 인터럽트의 오해 중 하나가 인터럽트를 걸게 되면 인터럽트가 걸린 스레드는 수행중이던 작업을 멈춘다고 오해하는것이다. 사실은 인터럽트를 건다는 것은 해당 스레드에게 실행중인 작업을 멈추라고 요청을하는 것뿐이다. 이후 사용자가 작성한 인터럽트 핸들링 로직에 따라 들어온 요청을 처리해주면 되는 것이다. 일반적인 용례를 생각해보면 특정 메서드의 실행 시간이 IO 작업의 문제로 인해 너무 길어질 경우 일정 시간이 지난 뒤 실행을 중단 시킬 수 있다.

그러면 인터럽트가 걸렸을 때 어떻게 이를 처리하는지 알아보자.

  1. InterruptedException을 전달 : 그대로 상위 메소드에게 예외를 던짐
  2. 인터럽트를 무시하고 복구 : Thread.currentThread().interrupt()를 통해 해당 스레드에 인터럽트가 발생한 상황을 저장한다.

동기화 클래스들

스레드 간의 작업 흐름을 조절할 수 있는 클래스를 동기화 클래스라고 한다. 앞서 살펴본 블로킹 큐 또한 동기화 클래스이다. 모든 동기화 클래스들은 구조적인 특징이 있다. 이는 동기화 클래스에 접근하려는 스레드가 어느 경우에 통과하고 어느 경우에는 대기하도록 하는지 결정하는 상태 정보를 가진다. 또한 해당 상태를 변경하는 메서드를 제공한다.

Latch

래치는 스스로가 터미널 상태에 이를 때까지 스레드가 동작하는 과정을 늦출 수 있도록 하는 동기화 클래스이다. 여기서 말하는 터미널 상태라는 것을 스레드가 진입할 수 있는 조건을 만족하는 상태라고 이해할 수 있다. 즉, 특정 조건에 의해 래치가 닫혀 있으면 스레드는 블로킹 되어 래치 내로 진입할 수 없다. 래치의 구현체로는 CountDownLatch가 존재한다. 래치의 상태는 양의 정수 값으로 초기화한다. 이 값은 스레드가 대기되는 동안 발생되어야 하는 이벤트의 건수를 의미한다. countDown()이라는 메서드를 통해 설정된 카운터를 하나 낮추고 await() 메서드를 통해 래치 내부의 카운터가 0이 될 때까지 대기한다.

FutureTask

FutureTask 역시 래치와 비슷하게 동작한다. FutureTask의 생성자는 Callable를 필요로 하는데 이는 결과값을 리턴받는 Runnable이라 보면 된다. 이 클래스가 가질 수 있는 실행 상태는 시작전 대기, 시작됨, 종료됨과 같이 세 종류이다. 만약 FutureTask가 수행할 작업이 끝난다면 상태는 더 이상 변경되지 않는다.

Future.get()을 호출하는 경우에도 앞선 실행 상태에 따라 동작이 나뉜다. 작업이 종료 되었으면 그 즉시 결과를 리턴하고 그렇지 않으면 작업이 종료상태에 도달할 때까지 기다리다가 결과 또는 예외를 알려준다. 주로 시간이 오래 걸리는 작업을 미리 시작해두는 용도로 활용된다.

세마포어

카운팅 세마포어는 특정 자원이나 특정 연산을 동시에 사용하거나 호출할 수 있는 스레드의 수를 제한하고자 할 때 사용한다. 리소스 풀 또는 컬렉션의 크기에 제한을 두고자 할 때 사용할 수 있다. 세마포어 클래스는 가상의 permit 값을 통해 내부 상태를 관리한다. Permit은 생성자의 인자로 넘겨줄 수 있다. 각각의 스레드들은 acquire 또는 release 메서드들을 호출하며 permit을 획득 및 반납하며 내부 로직에 진입할 수 있다.

이러한 세마포어는 주로 DB 커넥션 풀과 같은 자원을 관리할 때 자주 사용된다. 예를 들어 자원을 요청했지만 남은 자원이 없을 때, 다른 스레드가 확보했던 자원을 반납받아 사용할 수 있을 때까지 대기하는 용도로 사용 할 수 있다. 또한 특정 컬렉션에서 크기를 제한하여 해당 컬렉션을 관리할 때도 세마포어가 활용 될 수 있다.

배리어

앞서서 래치를 활용해 여러 스레드가 하는 작업을 하나의 작업으로 묶어서 사용하는 방안을 보았다. 이러한 래치의 경우 한번 countdown이 되어 값이 0이 될 경우 해당 래치 객체를 재활용 할 수 없다. 즉, 터미널 상태에 진입하면 되돌릴 수 없게 된다.

지금 살펴볼 배리어의 경우 특정 이벤트가 발생할 때까지 여러개의 스레드를 대기 시킬 수 있다는 측면에서 래치와 비슷한 기능을 제공한다. 래치와의 차이점은 모든 스레드가 배리어 포인트에 동시에 이르러야 관문이 열리고 다음 로직을 실행할 수 있는것이다. 또한 배리어의 관문이 열리고 나서 await()를 다시 호출 할 경우 해당 배리어를 관문의 역할을 다시 수행할 수 있다.