11 min read

자바 병렬 프로그래밍 - 7장 작업 실행

스레드가 작업 중일 때 자바에서 해당 작업을 강제로 멈추게 하는 방법은 없다. 다만 앞선 포스팅에서 본 것처럼 인터럽트를 통해 특정 스레드에게 해당 작업을 멈춰달라고 요청 할 수 있다. 이러한 멈춤을 요청할 때는 작업을 수행하던 스레드가 현재 작업에 대한 정리를 한 다음 종료 하도록 유도할 수 있다. 이러한 방안이 아무래도 안정성을 높일 수 있다.

작업 중단

작업을 중단시키는 구체적인 예시 상황은 아래와 같다.

  1. 사용자가 직접 취소 요청
  2. 시간이 제한된 작업
  3. 애플리케이션 이벤트
  4. 오류
  5. 애플리케이션 또는 서비스 종료

이러한 중단 요청이 발생했을 경우 사용할 수 있는 가장 합리적인 방법은 다음과 같다. 작업 취소 요청에 대한 플래그를 설정해두고 실행 중인 작업은 주기적으로 해당 플래그를 확인하고 만약 플래그가 true일 경우 실행하던 작업을 멈추도록 설계할 수 있다.

만약 cancel 플래그를 통해 작업을 중단시키는 로직을 구현할 경우 아래와 같은 상황에서는 문제가 발생한다. Cancel 플래그를 매번 확인하는 스레드가 블로킹이 걸려 대응을 하지 못하는 상황에서 작업 취소 요청이 발생하였다. 이러한 경우엔 취소 요청이 들어온 사실을 제 때 알아차리지 못하고 처리하지 못한다. 이러한 상황을 예방 할 수 있는 방안이 인터럽트이다.

앞선 포스팅에서 살짝 살펴보았는데 자바에서는 스레드에 인터럽트을 걸 수 있다. 스레드에 거는 인터럽트는 특정 스레드에게 적당한 상황이고 작업을 멈추려는 의지가 있는 상황이면 현재 실행중인 작업을 멈추고 다른 일을 하도록 신호를 보내는 것이다.(표현이 참 어렵다) 실제로 Java 명세를 보더라도 인터럽트가 작업을 취소하는 과정에서 특정한 역할을 해야한다라고 기술된 문서가 없다. 관용적으로 인터럽트는 작업을 중단하고자 하는 경우 사용된다고 한다.

모든 스레드들은 내부에 불리언 타입의 플래그로 인터럽트 여부를 가지고 있다. 해당 플래그를 조절할 수 있는 메서드는 3가지가 제공된다.

  • void interrupt();
  • boolean isInterrupted();
  • static boolean interrupted();

첫번째는 특정 스레드에 인터럽트를 가하는 메서드이다. isInterrupted는 현재 스레드가 인터럽트가 걸린 상황인지를 앞서 언급한 플래그 값을 통해 확인한다. interrupted는 동작이 특이하다. 현재 스레드의 인터럽트 상태를 해제하고 해제하기 전의 플래그 값을 리턴시켜준다. 해당 메서드가 유일하게 걸린 인터럽트를 해제할 수 있는 메서드이다.

Thread.sleep() 등과 같은 블로킹 메서드의 경우는 인터럽트 상태를 확인하다가 인터럽트가 걸리게 되면 InterruptException을 던진다. 반면 스레드가 블록되지 않은 상태에서 인터럽트가 걸린다면 인터럽트 플래그에 대한 확인 및 인터럽트가 걸렸을 경우 이에 대응하는 로직을 해당 스레드에 구현해줘야한다. 즉, interrupt 요청을 보낸다고 해도 해당 스레드가 처리중인 작업을 멈춘다는 보장은 없다. 단지 플래그 값을 true로 변경할 뿐이다. 정리하자면 실행 중인 스레드에 인터럽트를 건다라는 뜻은 해당하는 스레드가 상황을 스스로 판단하여 멈춰주기를 요청하는 것뿐이다.

아래 로직은 인터럽트의 상태를 확인하는 예시 코드이다. 스레드의 run 메서드 내에서 인터럽트 상태를 확인하는 부분이 두 군데 존재한다. 하나의 while문 내의 조건이고 하나는 put() 메서드를 호출할 때이다. put() 메서드의 경우 인터럽트가 발생했을 때 exception을 던져서 이를 감지한다. 반면 isInterrupted의 경우 직접적으로 해당 플래그를 확인하여 이를 감지한다. While 문을 통해 직접적으로 이를 감지하는 경우엔 while 문 내부에서 시작하는 작업들을 시작하지 않아 CPU 자원을 효율적으로 아낄 수 있다는 장점이 있다.

public class PrimeProducer extends Thread {
    private final BlockingQueue<BigInteger> queue;

    PrimeProducer(BlockingQueue<BigInteger> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            BigInteger p = BigInteger.ONE;
            while (!Thread.currentThread().isInterrupted())
                queue.put(p = p.nextProbablePrime());
        } catch (InterruptedException consumed) {
            /* Allow thread to exit */
        }
    }

    public void cancel() {
        interrupt();
    }
}

일반적으로 인터럽트를 처리하는 정책은 스레드 수준이나 서비스 수준에서 작업 중단 기능을 제공하는 것이다. 보통 InterruptException이 발생했을 때 가장 피해야할 구현은 catch로 해당 예외를 잡아서 아무 동작을 하지 않는것이다. 예외를 상위 스택으로 던져서 알리던, 현재 스레드의 인터럽트 플래그를 변경해서 알리든 어떠한 형태로든 상위 메소드로 인터럽트가 발생했다는 사실 자체를 넘겨주는것이 중요하다.

다음으로 자바 라이브러리에서 블로킹을 수행하는 여러 메서드들이 있다. 이 중에서 대부분은 인터럽트가 발생하면 즉시 InterruptException을 발생시켜 작업 중단 요청에 대응하도록 구현된다. 하지만 모든 메서드가 위와 같이 대응되도록 구현되어있지는 않다. 아래의 동작들은 즉시 InterruptException을 발생시키지 않는다. 이에 대한 방안도 다음과 같다.

  • java.io 패키지의 동기적 소켓 I/O : stream과 연결되 소켓 해제 -> SocketException
  • java.nio 패키지의 동기적 소켓 I/O : 인터럽트 시 ClosedByInterruptException 발생
  • Selector를 사용한 비동기적 I/O : close 호출 시 ClosedSelectorException
  • Lock 점유를 위한 대기 : lockInterruptibly 메서드 구현을 통해 응답하도록 변경

데몬 스레드

자바 프로세스 내에서 하나의 스레드에 하나의 기능을 부여하고 싶지만 해당 스레드가 동작한다는 이유만으로 JVM이 종료되지 않는것을 방지하고 싶은 경우가 존재한다. 이럴 때 사용할 수 있는 스레드가 데몬 스레드이다.

스레드는 두 종류로 나뉜다. 하나는 일반 스레드이고 나머지는 데몬 스레드이다. JVM이 구동될 때 main 스레드를 제외하고는 JVM 내부적으로 사용되는 스레드들은 모두 데몬 스레드이다. 새로운 스레드가 생성되면 해당 스레드의 부모 스레드의 데몬 설정을 그대로 사용한다. 그래서 main 스레드가 생성한 스레드들은 모두 데몬 스레드라고 할 수 있다.

이 둘 간은 종료 방식에 약간의 차이가 있다. 스레드 하나가 종료되면 JVM은 남아 있는 모든 스레드 중 일반 스레드가 있는 지 확인한다. 만약 남은 스레드가 모두 데몬 스레드라면 즉시 JVM 종료 절차를 진행한다. 결론적으로 데몬 스레드는 언제든 예기치 않게 종료될 가능성이 있다.

스레드 기반 서비스 중단

자바에서 스레드를 기반으로 하는 스레드 풀을 통해 기능을 구현하는 것을 일반적이다. 스레드 풀을 사용하면 해당 풀 내의 스레드들은 스레드풀에 소유된다. 즉, 풀 내의 스레드들에게 작업을 중단하라는 인터럽트를 발생 시킬 경우 스레드풀은 이를 적절히 처리하는 구현을 가지고 있어야 한다. 이를 위한 ExecutorService에서는 shutdown 메서드와 shutdownNow 메서드를 제공해준다.

아래의 LogWriter 클래스는 생산자-소비자 패턴을 활용하여 로그를 생성하는 스레드와 로깅을 하는 전담 스레드를 별도로 두어 로그 서비스를 구현하고 있다.

public class LogWriter {
    private final BlockingQueue<String> queue;
    private final LoggerThread logger;
    private static final int CAPACITY = 1000;

    public LogWriter(Writer writer) {
        this.queue = new LinkedBlockingQueue<String>(CAPACITY);
        this.logger = new LoggerThread(writer);
    }

    public void start() {
        logger.start();
    }

    public void log(String msg) throws InterruptedException {
        queue.put(msg);
    }

    private class LoggerThread extends Thread {
        private final PrintWriter writer;

        public LoggerThread(Writer writer) {
            this.writer = new PrintWriter(writer, true); // autoflush
        }

        public void run() {
            try {
                while (true)
                    writer.println(queue.take());
            } catch (InterruptedException ignored) {
            } finally {
                writer.close();
            }
        }
    }
}

해당 서비스를 안전하게 종료 시키기 위해서는 몇가지 방안이 있다. 우선 출력 전담 스레드가 take()를 통해 블락킹이 되기 때문에 인터럽트를 발생 시키고 던져진 예외를 잡아서 해당 스레드를 종료시킬 수 있다. 다만 이 방법은 큐에 쌓여있는 로그들에 대한 추후 처리가 불가능해진다. 또한 큐가 가득찬 경우 해당 큐에 로그를 put하기 위해 진입한 생성자 스레드가 영원히 블락되는 문제가 발생할 수 있다.

이를 해결하려면 로그를 큐에 넣는 과정에 동기화를 걸고 매번 shutdown 플래그를 확인하면서 로그를 넣도록 구현할 수 있다.

이러한 구현은 ExecutorSevice를 활용하여도 해결 가능하다. 해당 클래스는 앞서 살펴본 것처럼 shutdown 메서드와 shutdownNow 메서드를 제공한다. shutdown을 안전하게 하던 작업을 모두 마친 뒤 종료하는 것이고 shutdownNow는 강제로 종료시키는 방안이다.이를 호출하면 먼저 실행 중인 모든 작업을 중단하도록 하고 아직 시작하지 않은 쌓여있는 작업의 목록을 그 결과로 리턴해준다.

shutdownNow 메서드는 즉시 동작을 종료하도록 구현되었기에 현재 진행중인 작업의 정보에 대해서 알려주지 않는다. 즉, 시작은 되었지만 정상적으로 종료되지 않은 작업에 대한 처리가 불가능하다. 이러한 점을 해결하기 위해 아래와 같이 스레드풀을 구현하여 사용해볼 수 있다. 작업을 실행 시키는 execute 메서드 내에서 현재 진행 중이 작업 정보를 자료구조에 추가해두어서 활용할 수 있다.

public void execute(final Runnable runnable) {
        exec.execute(new Runnable() {
            public void run() {
                try {
                    runnable.run();
                } finally {
                    if (isShutdown()
                            && Thread.currentThread().isInterrupted())
                        tasksCancelledAtShutdown.add(runnable);
                }
            }
        });
    }

비정상적인 스레드 종료 상황 처리

스레드가 제공하는 기능 중 UncachutExceptionHandler 라는 기능이 있다. 이는 사용자가 처리하지 못한 예외 상황이 발생할 경우 호출되어 해당 예외에 대한 처리를 할 수 있게 한다. 주로 해당 인터페이스의 구현은 예외 로그를 남기는 정도를 제공해준다. 참고로 해당 핸들러를 호출하여 예외를 처리하려면 작업을 실행시킬 때 반드시 submit()이 아닌 execute()를 통해 작업을 실행시켜야한다. submit에서는 예외가 발생하면 ExecutionException으로 감싸진 상태로 던져진다.