카프카로 데이터를 전송하는 프로듀서와 다르게 카프카에서 데이터를 읽어가는 컨슈머는 컨슈머 그룹이라는 개념에 대해서 먼저 알아야 할 필요가 있다.
이전 포스트에서 간략하게 컨슈머의 메시지 읽기 동작에 대해 언급했었다. 이번 포스트에서는 좀 더 자세히 설명하여 카프카 컨슈머의 동작 방식에 대해서 확실하게 이해하고 넘어가도록 하겠다.
컨슈머 그룹(Consumer Group)
카프카 프로듀서가 전송한 메시지는 토픽의 파티션에 나눠서 저장된다. 파티션에 저장된 이 메시지들은 컨슈머(Consumer)들에 의해 읽혀진다. 하나 이상의 카프카 컨슈머들은 컨슈머 그룹(Consumer Group)을 형성한다. 컨슈머가 특정 파티션의 데이터 소비를 위한 개념이라면 컨슈머 그룹은 토픽 전체의 데이터 소비 관리를 위해 사용된다. ("컨슈머 그룹이 토픽을 소비한다." & "컨슈머가 파티션을 소비한다."라고 생각하면 된다.)
예를 들어보자. 4개의 파티션으로 구성된 "Topic A"라는 토픽을 생각해보자. 이를 "Consumer Group A"라는 컨슈머 그룹이 소비를 하고 있으며 이 컨슈머 그룹에는 "Consumer 1"이라는 컨슈머가 속해있다.
컨슈머 그룹에 속한 컨슈머는 하나뿐이기 때문에 "Consumer 1" 컨슈머가 모든 파티션의 데이터를 소비한다. 위 그림에서 보는 바와 같이 하나의 컨슈머가 4개의 파티션에서 데이터를 읽어간다. 참고로 카프카 토픽의 파티션과 컨슈머의 연결을 소유권(Ownership)이라고 하며, "Consumer 1"이 "Partition 1"을 소유(own)한다고 표현한다.
하지만 이렇게 컨슈머를 구성하면 프로듀서에서 생산하는 메시지를 하나의 컨슈머가 모두 처리하기 버거울 수 있다. 그렇게 되면 컨슈머 쪽이 병목점(Bottleneck)이 되어 카프카 파티션에 처리되지 않은 메시지가 쌓이게 된다. 이런 지연의 발생은 데이터 파이프라인의 실시간성을 떨어트리고 결국 장애를 발생시키게 될 가능성이 높다.
이런 문제를 해결하기 위해서 데이터를 읽어가는 컨슈머 쪽의 스케일 아웃(Scale-out)이 필요하다. 다시말하면 컨슈머를 하나 이상 추가하여 "Topic A"에 쌓이는 데이터의 처리량을 늘리는 작업이 필요하다는 것이다.
스케일 아웃을 위해 컨슈머 그룹에 하나의 컨슈머를 추가했다. 이제 컨슈머 그룹에는 기존에 존재하던 "Consumer 1"과 더불어 "Consumer 2"가 존재하게 되었다. 컨슈머 그룹에 컨슈머가 추가되면, 컨슈머 그룹에 속한 컨슈머들이 파티션들의 소유권을 나눠갖게 된다. 잠시후 알아보겠지만 이런 과정을 리밸런싱(Rebalancing)이라고 한다.
4개의 파티션의 소유권을 2개의 컨슈머가 2개씩 나눠갖는 이상적인 시나리오를 생각해볼 수 있다. 이렇게 구성되면 컨슈머가 토픽에 저장된 데이터를 읽어가는 속도가 2배 가까이 증가하기 때문에 지연이 덜 발생하게 된다.
그렇다면 컨슈머 그룹에 컨슈머를 계속 추가하면 그에 비례해서 계속 처리량(Throughput)이 증가할까? 그렇지는 않다.
4개의 컨슈머로 구성된 컨슈머 그룹을 생각해보자. 이 컨슈머 그룹이 "Topic A"를 소비한다면 위 그림처럼 소유권이 할당될 것이다. 아름답게 각자 하나씩 파티션을 담당해서 데이터를 읽어가게 된다.
이런 아름다운 그림에서도 지연이 발생한다면? 습관적으로 컨슈머 하나를 컨슈머 그룹에 추가해서 처리량을 증가시키려고 할 것이다.
하지만 효과는 없다. 카프카에서는 "하나의 파티션을 두개 이상의 컨슈머가 소비할 수 없다". 따라서 파티션이 4개인 토픽에는 최대 4개의 컨슈머만 동작할 수 있다.
카프카가 이와 같은 제약을 만들게 된 이유는 "파티션 내에서의 순서보장"을 위해서다. 카프카는 토픽 수준에서의 순서보장은 지원하지 않는다. 카프카 프로듀서가 첫 번째 메시지를 1번 파티션에 전송하고, 두 번째 메시지를 3번 파티션에 전송했다고 하자. 1번 파티션과 2번 파티션을 소비하는 컨슈머가 같지 않을 수 있기 때문에 나중에 전송된 두 번째 메시지가 먼저 컨슈머에 의해 소비될 수 있다.
하지만 카프카는 같은 파티션에서의 순서는 보장한다. 즉 프로듀서가 1번 파티션으로 첫 번째, 두 번째 메시지를 순차적으로 전송했다고 하면 컨슈머에서도 첫 번째, 두 번째 메시지 순으로 소비됨이 보장된다.
이러한 특성을 이용해 순서가 상관없는 메시지들은 서로 다른 파티션으로 전송하여 처리량과 고가용성을 도모하고, 순서가 중요한 메시지는 키(Key) 값과 필요하다면 파티셔너(Partitioner)를 이용해 한 파티션으로 몰아서 전송할 수 있다.
그럼 4개의 파티션과 4개의 컨슈머로 구성된 상황에서의 병목은 어떻게 해소해야 할까? 프로듀서의 전송량이 피크를 치고 다시 떨어지게 되면 자연스레 지연 상황이 해소되기도 한다. 하지만 프로듀서의 데이터 전송량이 서비스 증가 등으로 영구히 늘어났다면 토픽에 파티션을 추가하고 컨슈머도 추가하는 스케일 아웃 방법을 고려해봐야 한다. 단 추가된 파티션은 삭제 할 수 없으니 잘 고려해야 한다.
다중 컨슈머 그룹(Multiple Consumer Group)
카프카의 큰 장점 중 하나가 다중 컨슈머 그룹(Multiple Consumer Group)을 지원한다는 것이다. 일반적인 메시지 큐는 한 컨슈머가 읽어가면 다른 컨슈머는 그 다음 메시지를 읽어간다. 즉, 메시지 큐에 [1], [2], [3]이 있으면 첫 번째 컨슈머가 [1]을 읽어가고 두 번째 컨슈머는 [2]를 읽어가게 되는 것이다.
하지만 카프카는 컨슈머 그룹 단위로 다른 컨슈머 그룹과는 독립적으로 데이터를 읽어갈 수 있다. 다시말하면 [1], [2], [3]이 저장되어 있을 때, A 컨슈머 그룹이 [1]을 읽어간 상황에서 B 컨슈머 그룹도 [1]을 읽을 수 있다는 점이다.
그림으로 보자면 위 그림과 같다. "Consumer Group A"는 4개의 컨슈머, "Consumer Group B"는 2개의 컨슈머로 구성된 컨슈머 그룹이다. 이 컨슈머 그룹은 동시에 "Topic A"의 데이터를 소비할 수 있으며 서로에게 간섭하지 않는다. 소유권 구성도 "Consumer Group A"는 각 컨슈머들이 하나씩 파티션을 소유하고 있고, "Consumer Group B"는 2개씩 소유한다.
두 컨슈머 그룹의 컨슈머 구성이 다르기 때문에 아마도 두 컨슈머 그룹의 처리량도 다를 것이다. 하지만 한쪽이 느리게 읽어간다고 해서 다른 한쪽에 영향을 주지는 않는다. 각 컨슈머 그룹엔 남 신경쓰지 않고 자신이 읽어야 할 메시지를 묵묵히 읽기만 할 뿐이다.
리밸런싱(Rebalancing)
위에서 잠깐 언급했던 리밸런싱을 살펴보겠다.
4개의 파티션을 하나의 컨슈머가 처리하는 첫 번째 구성에서 병목점이 생겨 컨슈머 하나를 추가했다. 하나의 컨슈머가 추가된 이후 파티션의 소유권을 나눠 갖는 작업이 수행되는데 이 과정을 리밸런싱(Rebalancing)이라고 한다.
리밸런싱은 병목을 해소하기 위해 컨슈머가 추가되었을 때만 발생하는게 아니다. 장애 상황 등에서 컨슈머를 사용할 수 없을 때에도 발생할 수 있다.
4개의 파티션을 두 개의 컨슈머가 잘 소비하고 있다가 "Consumer 2"가 비정상적인 상황으로 동작하지 못하는 경우가 발생할 수 있다. 이 경우 "Consumer 2"에 할당되었던 파티션2, 파티션 4의 메시지는 소비되지 않고 남게된다.
결국 파티션2, 파티션 4의 메시지는 처리되지 않고 쌓이는 지연 현상이 발생하게 된다. 이런 상황은 바람직하지 않은 상황이기 때문에 살아있는 컨슈머들끼리 소유권을 다시 나눠갖는다. 이 경우 살아있는 컨슈머는 "Consumer 1" 밖에 없기 때문에 "Consumer 1"이 모든 소유권을 가져가게 된다.
리밸런싱(Rebalancing)이 일어나는 과정을 간략하게 살펴보자.
카프카 컨슈머 그룹은 그룹 코디네이터(Group Cordinator)라고 하는 카프카 브로커 서버를 하나 할당받게 된다. 컨슈머 그룹을 이루는 컨슈머들은 자신이 장애상황에 빠지지 않았다는 알림으로 그룹 코디네이터에게 하트비트(Hearthbeat)를 전송한다. 마치 심장박동과 같은 메시지를 주기적으로 그룹 코디네이터에게 전송하여 자신이 살아있음을 알리는 것이다.
만약 컨슈머가 일정 기간동안 하트비트를 전송하지 않는다면 그룹 코디네이터는 컨슈머에 장애상황이 발생했다고 판단, 리밸런싱 동작을 시작하게 된다. 컨슈머가 정상적으로 종료하는 상황의 경우 컨슈머가 그룹 코디네이터에게 종료함을 알려주고 리밸런싱이 발생하게 된다.
리밸런싱이 시작되면 그룹 코디네이터는 컨슈머 그룹 리더(Consumer Group Leader)에게 살아있는 컨슈머 목록을 전달하게 되며, 컨슈머 그룹 리더는 이 목록을 이용해 Partition Assignor 클래스를 이용해 소유권 분배를 수행한다. 컨슈머 그룹 리더는 컨슈머 그룹에 속한 컨슈머 중 최초로 접속한 컨슈머로 선정된다.
소유권 분배 정보가 결정되면 분배 결과를 그룹 코디네이터에게 전송하고, 그룹 코디네이터는 이 분배정보를 각각 컨슈머에게 전달한다. 결과적으로 컨슈머는 자신에게 할당된 파티션 정보만 알 수 있다.
오프셋(Offset)
리밸런싱과 관련하여 카프카 브로커 서버에 유지해야하는 정보가 있다. 바로 특정 토픽을 할당받아 메시지를 읽어가던 컨슈머가 어디까지 읽었는지에 대한 정보다. 예를 들어보자.
이전의 예와 마찬가지로 4개의 파티션으로 구성된 "Topic A"를 2개의 컨슈머로 구성된 컨슈머 그룹이 소비한다고 하자. "Topic A"의 1번 파티션에는 5개의 메시지가 저장되어 있고 그 중 4개의 메시지가 컨슈머에 의해 처리된 상황이다. 2번 파티션은 4개의 메시지 중 3개가 처리된 상황이며 3번은 4개 중 3개, 4번 파티션은 3개의 메시지 모두 처리된 상황이다.
이 상황에서 2번 컨슈머에 장애가 생겼다고 해보자. 그러면 2번 컨슈머가 담당했던 2번, 4번 파티션을 1번 컨슈머가 가져가는 리밸런싱이 발생하게 된다.
1번 컨슈머는 자신이 이전에 담당했던 1번, 3번 파티션 외에 2번, 4번 파티션을 새로 담당하게 되었다. 1번 컨슈머는 2번, 4번 파티션의 메시지를 어디서부터 읽어야 할까? 이 정보를 오프셋(Offset)이라고 하며 카프카 브로커 서버에 저장된다. (이전 버전에는 주키퍼에 저장되었지만 주키퍼의 write 오버헤드 때문에 카프카 내부 토픽으로 관리하는 방식으로 변경되었다.)
카프카의 오프셋은 "컨슈머 그룹 단위로 해당 파티션을 어디까지 읽었는지"를 저장한다. 즉, "1번 컨슈머가 1번 파티션의 4번 메시지까지 읽었다"가 아니라 "컨슈머 그룹A가 1번 파티션의 4번 메시지까지 읽었다"를 저장하는 것이다. 이렇게 저장하면 리밸런스 이후에 새로운 컨슈머가 파티션을 할당받아도 이전 파티션과 같은 컨슈머 그룹에 속해있을 것이기 때문에 문제 없이 5번 메시지부터 읽으면 된다.
그러면 두 개 이상의 컨슈머 그룹이 토픽을 소비하는 상황에서는 어떨까?
컨슈머 그룹 A의 경우 파티션 1,2,3,4에 대해 각각 4, 4, 3, 3번 메시지까지 읽었다고 오프셋이 저장되어있다. 반면 컨슈머 그룹 B의 경우 파티션 1, 2, 3, 4에 대해 3, 3, 3, 2번 메시지까지 읽었다고 오프셋이 저장되어 있다. 위에서 말했듯이 오프셋은 "컨슈머 그룹과 토픽 파티션 사이의 정보"다. 따라서 컨슈머 그룹이 다르면 별도의 오프셋 정보가 각각 저장 관리되며 다른 컨슈머 그룹의 오프셋은 영향을 미치지 않는다.
이 개념을 잘 이해했다면 카프카 컨슈머의 동작 방식을 잘 이해했다는 말이다. 이 개념을 기반으로 카프카 컨슈머 예제와 각종 설정값에 대해서 알아보겠다.
댓글