11 min read

<Java> redis를 곁들여 Selector를 사용해보기

본 포스팅은 java의 nio 패키지의 Selector를 활용하여 Redis와 간단한 연산을 통해 통신하는 법을 알아봅니다. 또한 논블로킹과 블로킹 처리의 초당 요청 처리수의 비교도 해보는 포스팅입니다. 단순히 생각해보면 무조건 논블로킹이 더 많은 초당 요청 수를 가져야 할 것 같은데 그렇지 않았던 과정을 살펴보겠습니다.

Selector란?

Selector는 비동기적으로 여러 채널의 I/O 이벤트를 관리할 수 있는 클래스입니다. 이를 활용한다면 단일 스레드에서 여러 네트워크 채널의 I/O 이벤트들을 효율적으로 처리할 수 있습니다. 주로 Selector와 함께 나오는 키워드들이 멀티플렉싱과 비동기인데 간단히 알아보고 사용법으로 넘어가겠습니다.

멀티플렉싱이란 앞서 언급된것처럼 하나의 Selector 객체가 여러 채널 객체의 I/O를 동시에 관리하는 것입니다. 이렇게 될 경우 자바 프로세스로부터 연결된 N개의 네트워크 연결을 Selector 객체 하나로 관리할 수 있습니다. 대표적인 예로 캐시 클러스터와의 연결이 있습니다. 보통 클러스터의 형태는 하나의 WAS가 N개의 캐시 노드와 TCP 연결을 맺어 캐시 API 대한 요청과 응답을 처리합니다. 이러한 클러스터와의 연결을 구현할 때, Selector를 효과적으로 사용할 수 있습니다.

두번째로 비동기 논블로킹 처리인데, 이는 I/O 연산의 요청을 보낸 뒤, 이에 대한 응답을 기다리지 않고 곧바로 다음 응답을 또 다시 보내고 만약 특정한 채널로부터 READ 신호가 들어오면 들어온 데이터를 버퍼를 통해 읽어들일 수 있습니다. 결국 스레드가 블로킹 되는 시간이 없으니까 효율적인 I/O 이벤트 처리가 가능해집니다.

Selector 사용 방법 알아보기

Java의 Selector 클래스는 넌블로킹 I/O 연산을 관리하는 데 사용됩니다. Selector를 활용하면 단일 스레드로 여러 네트워크 채널의 I/O 이벤트를 효율적으로 처리할 수 있습니다. 이는 서버가 동시에 여러 클라이언트 연결을 효율적으로 관리할 수 있게 해주며, 자원 사용을 최적화하고 성능을 향상시킵니다.

네트워크 연결을 맺는 과정에서 Selector의 역할과 사용 방법은 아래와 같습니다.사용법이 익숙치 않다면 간단히 목차 정도만 읽으시고 뒤에 나올 코드 구현과 연결지어 읽으시면 됩니다.

1. SelectorChannel 생성 및 설정

  • 먼저, Selector 인스턴스를 생성합니다. Selector.open() 메서드를 사용하여 이를 수행할 수 있습니다.
  • 네트워크 연결에 사용될 Channel을 생성합니다. 넌블로킹 모드에서만 Selector와 함께 사용될 수 있는 SocketChannel 또는 ServerSocketChannel이 존재하는데 이는 TCP 연결 기반에서 사용됩니다.
  • 생성된 Channel을 넌블로킹 모드로 설정합니다. .configureBlocking(false) 메서드를 사용하여 이를 설정할 수 있습니다.

2. ChannelSelector에 등록

  • ChannelSelector에 등록합니다. 이때, Channel이 관심을 가지는 연산 집합을 지정합니다. 예를 들어, 서버 소켓 채널에서는 연결 요청을 받을 준비가 되었는지(SelectionKey.OP_ACCEPT), 소켓 채널에서는 읽기(SelectionKey.OP_READ) 또는 쓰기(SelectionKey.OP_WRITE) 준비가 되었는지 등을 지정할 수 있습니다.
  • Channel.register(selector, ops) 메서드를 사용하여 ChannelSelector에 등록합니다. 여기서 ops는 관심 있는 연산 집합입니다.

3. Selector를 사용한 이벤트 처리

  • Selectorselect() 메서드를 호출하여 준비된 이벤트가 있는지 확인합니다. 이 메서드는 선택된 키(이벤트가 발생한 채널과 연산 집합을 나타냄)의 수를 반환합니다.
  • select() 메서드가 0이 아닌 값을 반환하면, 하나 이상의 채널이 I/O 연산을 수행할 준비가 되었다는 의미입니다.
  • SelectorselectedKeys() 메서드를 사용하여 준비된 이벤트의 집합을 가져옵니다. 이벤트가 발생한 각 채널에 대해 반복하여 해당 이벤트를 처리합니다.
  • 예를 들어, SelectionKey.OP_ACCEPT 이벤트가 발생한 경우 서버 소켓 채널에서는 클라이언트의 연결 요청을 수락하고, SelectionKey.OP_READ 이벤트가 발생한 경우 소켓 채널에서는 데이터를 읽습니다.

4. 연결 해제 및 정리

  • 연결이 더 이상 필요하지 않거나 클라이언트가 연결을 종료한 경우, 관련된 ChannelSelector 리소스를 정리해야 합니다.
  • Channel을 닫고, Selector에서 해당 키를 제거합니다. 필요한 경우, Selector도 닫을 수 있습니다.

이제 Selector를 사용하기 위해 어떤 클래스들과 메서드들이 사용되는지 대략적으로 눈에 익히셨을 겁니다. 앞서 나온 내용을 바탕으로 실제 레디스 서버에 요청을 보내는Selector를 구현해봅시다.Redis 프로세스의 경우 로컬호스트의 6379 포트에 띄워놓았습니다.

Selector 사용하기

redis에는 “PING”이라는 api가 존재합니다. 이는 해당 캐시 노드가 정상적인 상태라면 “PONG”이라는 리턴값을 반환하는 아주 간단한 연산입니다.Selector를 통해 레디스 프로세스와 TCP 연결을 맺고 해당 네트워크 연결에 ping 연산을 보내는 구현을 만들어 보겠습니다. 참고로 1초동안 Ping 연산을 계속해서 보내도록 구현하였습니다. 코드는 아래와 같습니다.

한개의 메인 스레드가 계속해서 연산을 쓰고 읽고를 1초동안 반복합니다. 그러면 아래의 경우 Selector의 장점을 활용한 구현일까요? 사실 상 아래와 같이 하나의 연결만을 수행할 경우 동기적으로 PING 연산이 수행되어 Selector의 장점을 가져갈 수 없습니다.

public class NonBlockingPingPong {
    static Integer count = 0;
    public static void main(String[] args) throws IOException {
        // Selector 생성
        Selector selector = Selector.open();

        // Redis서버에 연결할 SocketChannel 생성
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false); // non-blocking 모드로 설정
        channel.connect(new InetSocketAddress("localhost", 6379)); // Redis 서버 주소

        // 연결 작업을 위한 SelectionKey 등록
        channel.register(selector, SelectionKey.OP_CONNECT);

        long start = System.currentTimeMillis();
        while (System.currentTimeMillis() < start + 1000) {
            selector.select(); // 준비된 이벤트가 있는지 확인

            Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                keyIterator.remove();

                if (key.isConnectable()) {
                    // 연결 완료 처리
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    if (socketChannel.finishConnect()) {
                        // 연결 성공 후, 쓰기 작업을 위한 SelectionKey 등록
                        socketChannel.register(selector, SelectionKey.OP_WRITE);
                        // Redis 서버에 PING 명령 전송
                        socketChannel.write(ByteBuffer.wrap("PING\r\n".getBytes()));
                    }
                } else if (key.isReadable()) {
                    // 데이터 읽기 처리
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    int bytesRead = socketChannel.read(buffer);
                    if (bytesRead > 0) {
                        buffer.flip();
                        byte[] bytes = new byte[buffer.limit()];
                        buffer.get(bytes);
//                        System.out.println("Received: " + new String(bytes));
                        count += 1;
                        socketChannel.register(selector, SelectionKey.OP_WRITE);
                    }
                } else if (key.isWritable()) {
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    // PING 명령 보내기
                    socketChannel.write(ByteBuffer.wrap("PING\r\n".getBytes()));
                    // 다시 읽기 상태로 전환
                    socketChannel.register(selector, SelectionKey.OP_READ);
                }
            }
        }
        System.out.println("count = " + count);
    }
}

비교를 위해 BlockingPingPong 이라는 구현을 통해 Selector를 통해 채널을 관리하지 않고 직접 채널을 만들고 TCP 연결을 맺는 테스트를 실행시켜봅시다.

public class BlockingPingPong {

    static Integer count = 0;

    public static void main(String[] args) {
        try {
            // Redis 서버에 연결할 SocketChannel 생성 및 Blocking 모드로 설정
            SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", 6379));
            channel.configureBlocking(true);

            long start = System.currentTimeMillis();
            while (System.currentTimeMillis() - start < 1000) { // 1초 동안 반복
                // Redis 서버에 PING 명령 전송
                channel.write(ByteBuffer.wrap("PING\r\n".getBytes()));

                // 데이터 읽기 처리
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                while (channel.read(buffer) > 0) {
                    buffer.flip();
                    byte[] bytes = new byte[buffer.limit()];
                    buffer.get(bytes);
//                    System.out.println("Received: " + new String(bytes));
                    count += 1;
                }
            }

            // 작업 완료 후 자원 해제
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("count = " + count);
    }
}

결과를 비교해보면 집계된 count 수가 Selector 구현으로 처리 했을 때 더 적은 요청을 수행한 결과도 도출됩니다.

그러면 어떤식으로 구현을 가져가야 온전히 Selector의 장점을 활용할 수 있을까요?

  1. 여러개의 채널을 사용하는 구현
  2. Ping 연산보다는 무거운 I/O 작업을 수행하는 연산 수행
  3. local에서 Local로의 요청이 아닌, Network 레이턴시 적용

대략 위의 3가지 정도의 방안을 적용 시키면 확실히 Selector의 성능을 확인해볼 수 있을 것 같습니다.

Selector 완전히 활용하기

우선 무료로 진행할 수 사항인 1번과 2번을 적용 시켜 봅시다. 먼저 redis 프로세스를 6380 포트에도 띄우는 작업이 선행되어야합니다. 해당 포트로 새로운 연결을 생성해야하기 때문입니다.

하나의 프로세스를 더 띄웠다고 가정하고 이번에는 명령어를 ping이 아닌 set 명령어를 사용하도록 변경해야합니다. 대략 아래와 같은 String을 보내야합니다. *3은 해당 명령어의 인자의 개수 입니다. 여기서는 SET, <key값>, <value값> 입니다. 그리고 $3해당 인자의 길이를 나타냅니다.

    "*3\r\n$3\r\nSET\r\n" + "$43\r\ntoLongestMyKey123456789year2024month04day02\r\n"
    + "$7\r\nmyValue\r\n";

위 두가지 사항을 적용 시킨 구현은 아래와 같습니다.

public class NonBlockingDoubleSet {
    static Integer count = 0;
    private static final String SET_BASE = "*3\r\n$3\r\nSET\r\n";
    private static final String SET_KEY = "$43\r\ntoLongestMyKey123456789year2024month04day02\r\n";
    private static final String SET_VALUE = "$7\r\nmyValue\r\n";
    public static void main(String[] args) throws IOException {

        StringBuilder sb = new StringBuilder();
        sb.append(SET_BASE + SET_KEY + SET_VALUE);
        String cmd = sb.toString();
        // Selector 생성
        Selector selector = Selector.open();

        // Redis 서버에 연결할 SocketChannel 생성
        SocketChannel channel1 = SocketChannel.open();
        channel1.configureBlocking(false); // non-blocking 모드로 설정
        channel1.connect(new InetSocketAddress("localhost", 6379)); // Redis 서버 주소

        // Redis 서버에 연결할 SocketChannel 생성
        SocketChannel channel2 = SocketChannel.open();
        channel2.configureBlocking(false); // non-blocking 모드로 설정
        channel2.connect(new InetSocketAddress("localhost", 6380)); // Redis 서버 주소

        // 연결 작업을 위한 SelectionKey 등록
        channel1.register(selector, SelectionKey.OP_CONNECT);
        channel2.register(selector, SelectionKey.OP_CONNECT);

        long start = System.currentTimeMillis();
        while (System.currentTimeMillis() < start + 1000) {
            selector.select(); // 준비된 이벤트가 있는지 확인

            Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                keyIterator.remove();

                if (key.isConnectable()) {
                    // 연결 완료 처리
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    if (socketChannel.finishConnect()) {
                        // 연결 성공 후, 읽기 작업을 위한 SelectionKey 등록
                        socketChannel.register(selector, SelectionKey.OP_WRITE);
                        // Redis 서버에 PING 명령 전송
                        socketChannel.write(ByteBuffer.wrap(cmd.getBytes()));
                    }
                } else if (key.isReadable()) {
                    // 데이터 읽기 처리
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    int bytesRead = socketChannel.read(buffer);
                    if (bytesRead > 0) {
                        buffer.flip();
                        byte[] bytes = new byte[buffer.limit()];
                        buffer.get(bytes);
//                        System.out.println("Received: " + new String(bytes));
                        count += 1;
                        socketChannel.register(selector, SelectionKey.OP_WRITE);
                    }
                } else if (key.isWritable()) {
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    // PING 명령 보내기
                    socketChannel.write(ByteBuffer.wrap(cmd.getBytes()));
                    // 다시 읽기 상태로 전환
                    socketChannel.register(selector, SelectionKey.OP_READ);
                }
            }
        }
        System.out.println("count = " + count);
    }
}

만약 1초동안 두개의 채널을 Selector를 사용하지 않고 관리하려면 어떻게 해야할까요? 앞서 본 BlockingPingPong 클래스 내의 작업을 2개의 스레드를 만들어 작업을 맡기는 형태로 수행해야합니다. 그러면 2개가 아닌 n개로 채널이 늘어나면 그만큼의 스레드의 수도 늘어나겠죠. 최적화를 위해 스레드풀을 사용한다해도 결국엔 스레드를 유지해야하는 비용은 절감시키지 못합니다.

서두에 언급된 것처럼 Selector가 모든 상황에서의 절대 우위을 가진 마법같은 구현체는 아닙니다. 하나의 채널만을 관리하는 구현에서는 오히려 기존의 Channel을 직접 생성하는 방식이 요청 수가 많은 경우도 있었습니다. 하지만 구동된 java process와 네트워크 연결의 수가 증가하면 할 수록 Selctor라는 클래스가 가지는 장점을 최대한 활용할 수 있습니다.