본문 바로가기
정리 예정

[Kafka] #7 아파치 카프카 컨슈머(Kafka Consumer) 자바(Java)예제 코드

by 왕 달팽이 2019. 2. 2.
반응형

카프카 컨슈머 그룹과 컨슈머의 동작 방식에 대해서 알아봤다. 이제 자바를 이용해서 카프카 데이터를 읽어가는 예제 프로그램을 작성해보겠다. 



이전 포스트에서 다뤘던 내용을 바탕으로 카프카 서버를 구동하고, "test"라는 이름의 토픽을 준비해보자.


카프카 컨슈머 예제

"test" 토픽에서 메시지를 소비하는 가장 간단한 샘플 코드는 다음과 같다. 



이 코드를 실행해 놓고, 이전에 만들었던 SimpleKafkaProducer를 구동하여 "test" 토픽에 데이터를 보내면 다음과 같은 결과를 보게 된다. 


String Value


SimpleKafkaProducer 에서 브로커에 전달한 데이터를 컨슈머가 읽어간 것을 확인할 수 있다. 


정상 동작하는 것을 봤으니 코드의 부분부분을 살펴보겠다.


1
2
3
4
5
6
7
Properties props = new Properties();
props.put("bootstrap.servers""localhost:9092");
props.put("group.id""ConsumerGroupA");
props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
 
KafkaConsumer<StringString> consumer = new KafkaConsumer<>(props);
cs


카프카 프로듀서를 만들었던 것처럼 카프카 컨슈머도 Properteisa 객체에 설정값들을 세팅해서 생성자를 호출 할 때 넘겨줘야 한다. 예제 코드에서 사용한 설정값들을 하나씩 살펴보자. 


"bootstrap.servers"는 카프카 컨슈머가 처음 구동될 때 접속할 브로커의 주소다. 위 예제에서는 로컬 환경에서 구축해놓은 하나의 브로커만 존재했기 때문에 bootstrap.servers 값에 하나의 주소만 세팅했다. 만일 카프카 브로커를 클러스터로 구축했다면 이 값을 두 개 이상 설정하는걸 권장한다. bootstrap.servers에 설정한 브로커 중 하나가 장애 상황일 때 다른 쪽으로 접속할 수 있어야 컨슈머가 정상 동작할 수 있기 때문이다. 


"group.id" 값은 카프카 컨슈머가 속할 컨슈머 그룹 이름을 의미한다. group.id 값을 명시한 컨슈머가 브로커에 접속하게 되면 group.id에 명시한 컨슈머 그룹으로 합류한 것으로 간주된다. group.id 값을 생략할 경우 어떤 그룹에도 속하지 않는 특이한 컨슈머로 동작하게 된다. 이런류의 컨슈머는 특정한 경우에만 사용한다.


"key.deserializer""value.deserializer"는 바이트로 표현된 Key, Value 값을 다시 객체로 만들어 주는 클래스다. 프로듀서에서 카프카로 메시지를 전달할 때 Key와 Value 값을 바이트로 변환하기 위해서 Serializer를 사용했던 것을 생각해보자. 그 동작의 반대 개념으로 바이트를 Key, Value 클래스로 되돌려줄 때 사용하는 클래스다. 이 예제에서는 String 타입의 Key, Value를 사용했으므로 StringDeserializer를 사용했다. 


이 설정들을 세팅한 Properties 객체를 인자로 넘겨서 KafkaConsumer 객체를 생성했다.


1
consumer.subscribe(Collections.singletonList("test"));
cs


컨슈머 객체를 만든 다음 subscribe() 메소드를 이용해 "test" 토픽에서 데이터를 읽어가도록 설정했다. subscribe() 메소드는 토픽의 이름을 담는 컬렉션 객체를 받을 수 있으며 여기서는 "test" 문자열 하나만을 담는 컬렉션 객체를 만들어 넘겼다. 


subscribe() 메소드는 카프카 컨슈머 객체의 구독정보를 덮어쓴다. 예를 들어 "test" 토픽을 subscribe() 메소드로 추가한 다음 consumer.subscribe(Collections.singletonList("test1")); 을 수행하면 "test"에 대한 구독 정보는 덮어쓰여지고 "test1" 토픽만 구독하게 된다. "test", "test1" 토픽을 모두 구독하고 싶은 경우 subscribe() 메소드의 인자로 Collection<String> 객체를 생성, 구독할 토픽을 추가해서 넘겨주면 된다.


만약 아무것도 구독하고 싶지 않은 경우 consumer.unsubscribe() 를 수행하면 된다.


1
consumer.subscribe("test*");
cs


토픽을 구독할 때 정규 표현식을 이용할 수도 있다. consumer.subscribe() 메소드에 "test*" 같은 패턴을 입력하면 test로 시작하는 모든 토픽을 구독할 수 있다. 


다음으로 카프카 컨슈머에서 가장 중요한 부분인 poll loop을 보겠다. 


1
2
3
4
5
6
while (true) {
    ConsumerRecords records = consumer.poll(100);
 
    for (ConsumerRecord record : records)
        System.out.println(record.value());
}
cs


while(true) 로 시작하는 반복문은 무한히 실행되며 브로커에서 메시지를 읽어 들인다. consumer.poll() 메소드를 실행하면 일정량의 ConsumerRecord 들이 담긴 ConsumerRecords 객체가 리턴된다. ConsumerRecords 객체의 각 Record들을 for each 구문으로 하나씩 돌면서 값을 출력한다. 


consumer.poll()의 인자로 입력된 100이라는 숫자는 100ms 정도를 기다렸다가 리턴하라는 의미이다. 100ms 정도만 기다렸다가 처리할 Record가 없으면 for each 문을 수행하지 않고 다음 consumer.poll()을 실행하게 된다.


사실 poll() 메소드는 카프카 브로커에서 메시지를 읽어오는 동작 이상의 기능을 수행한다. 카프카 컨슈머가 생성되고 최초로 poll() 메소드를 실행하게 되면 그룹 코디네이터(GroupCoordinator)를 호출해서 자신을 Consumer Group에 추가하고 파티션 소유권을 다시 받아온다. 또 poll() 메소드에서는 그룹 코디네이터에 하트비트(Heartbeat)를 전송하여 자신이 살아있음을 끊임없이 알리게 된다. (카프카 0.10.1 버전 이후에는 하트비트를 전송하는 전용 쓰레드가 만들어졌다.)


카프카 컨슈머와 관련해 유의해야 할 점이 하나있다. 하나의 쓰레드에서는 같은 그룹에 속하는 다수의 컨슈머를 함께 실행시킬 수 없다. 또 한 여러 쓰레드가 하나의 컨슈머 객체를 함께 사용할 수도 없다. 따라서 하나의 애플리케이션에서 같은 그룹의 여러 컨슈머를 실행시키고 싶으면 각 컨슈머를 별도의 쓰레드에서 실행하야 한다. 


카프카 컨슈머 설정

카프카 컨슈머에는 위에서 살펴봤던 기본적인 설정들 외에도 다양한 설정들이 있다.


fetch.min.bytes (default=1), fetch.max.wait.ms (default=500ms)

이 설정 값은 카프카 브로커 서버에서 한번에 받기를 원하는 데이터의 최소량을 의미한다. 즉 카프카 브로커가 컨슈머로 메시지를 보낼 때 이 설정값 만큼 데이터를 모았다가 한번에 전송한다. 데이터는 많지 않은데 CPU 사용량이 높거나 컨슈머의 숫자가 지나치게 많을 경우 이 값을 높여주면 효과가 있다.

브로커 서버가 데이터를 fetch.min.bytes 만큼 모았다가 전송 하긴 하지만 fetch.max.wait.ms 시간이 지나면 더 이상 기다리지 않고 지금까지 모아둔 데이터를 컨슈머에게 보낸다.  


max.partition.fetch.bytes (default = 1MB)

이 설정 값은 컨슈머가 파티션 별로 할당할 수 있는 메모리의 최대 값이다. 이 값은 브로커의 설정 값인 message.max.bytes 값보다 커야한다. 그렇지 않으면 브로커가 전송한 메시지를 컨슈머가 받지 못하게 된다. 하나의 컨슈머가 특정 토픽의 메시지를 처리하기 위해서 할당해야할 메모리 사이즈는 max.partition.fetch.bytes * (파티션 개수 / 컨슈머 개수)로 계산해 볼 수 있다. 다만 다른 컨슈머에 장애가 생겼을 때 리밸런싱이 일어나는 경우를 고려해야 할 수도 있다.

max.partition.fetch.bytes 값이 너무 큰 경우 카프카 브로커에 데이터가 많이 쌓여 있는 경우 consumer.poll() 에서 한번에 가져오는 데이터가 많아질 수 있다. 이 때 한번의 poll() 루프에서 처리해야하는 데이터가 많아 질 수 있으며 이 경우 session.timeout.ms 값으로 설정된 세션 타이아웃 시간을 넘어버려서 컨슈머가 의도치않은 장애상황을 맞이할 수도 있다. 이런 상황이 자주 발생한다면 max.partition.fetch.bytes 값을 줄이거나 session.timeout.ms 값을 늘리는 식으로 튜닝을 할 수 있다.


session.timeout.ms (default=10s)

이 설정 값은 카프카 브로커가 컨슈머에 장애가 생겼다고 판단하는데 걸리는 시간을 의미한다. 컨슈머는 주기적으로 그룹 코디네이터에게 하트비트를 전송하여 자신이 살아있음을 끊임없이 알린다. 컨슈머에게서 하트비트가 오지 않으면 그룹 코디네이터는 컨슈머에게 장애가 생겼는지 일시적인 지연인지 판단해야한다. 이 때 사용되는 값이 session.timeout.ms 값이다.

그룹 코디네이터는 컨슈머로부터 하트비트가 오지 않으면 session.timeout.ms까지 기다렸다가 컨슈머에 장애가 생겼다고 판단하게 된다. 이후 해당 컨슈머가 가지고 있었던 파티션 소유권을 다른 컨슈머에게 할당하기 위해 리밸런싱을 시작하게 된다. 이 설정 값은 컨슈머가 하트비트를 보내는 주기에 대한 설정값인 heartbeat.interval.ms와 밀접하게 관련이 있다. 일반적으로 heartbeat.interval.ms 값을 session.timeout.ms 값의 1/3 수준으로 설정한다. 

session.timeout.ms 값이 작은 경우 컨슈머에 조금이라도 이상상황이 발생했을 때, 빠르게 감지하여 리밸런싱을 수행할 수 있다. 하지만 처리할 데이터의 양이 많아 하트비트 전송이 지연되는 경우에 불필요한 리밸런싱이 자주 발생할 수 있다. 반대로 session.timeout.ms 값이 큰 경우 불필요한 리밸런싱은 줄어들지만 실제로 장애상황이 발생했을 때 현상을 해소하기까지 걸리는 시간이 늘어날 수 있다.


auto.offset.reset (default=latest)

이 설정값은 컨슈머가 읽어야 할 오프셋(Offset)이 없을 때의 동작을 지정한다. 이 설정은 "latest"와 "earliest"를 설정값으로 가질 수 있다. "latest"로 설정하면 최신 데이터부터 읽기 시작하며, "earliest"로 설정하면 브로커에 저장된 최초의 데이터부터 읽어오기 시작한다. 


enable.auto.commit (default=true)

이 설정값은 카프카 컨슈머의 오프셋(Offset) 정보를 어떻게 관리할 것인지를 지정할 때 사용한다. enable.auto.commit 설정 값이 true로 설정되어 있으면 사용자가 오프셋 커밋을신경쓰지 않아도 카프카 컨슈머 라이브러리에서 알아서 커밋을 해준다. 이 때 auto.commit.interval.ms 값으로 설정된 주기마다 브로커에 오프셋을 갱신해달라는 커밋 요청을 날리게 된다.

자동 커밋을 하게 되면 편리하지만 때에 따라서 데이터의 누락이 발생하거나 중복된 처리가 발생할 수 있다. 사용자가 스스로 오프셋 커밋을 관리하기 위해서는 enable.auto.commit 값을 false로 설정해야 한다.


partition.assignment.strategy (default=org.apache.kafka.clients.consumer.RangeAssignor)

이 설정값은 리밸런스가 발생했을 때 파티션의 소유권을 할당하는 로직을 지정하기 위해서 사용한다. 그룹 코디네이터가 리밸런스를 하기로 결정하면 컨슈머 그룹 리더(컨슈머)에게 파티션의 소유권을 재할당해달라고 요청을 보낸다. 그룹 리더는 이 설정값으로 입력된 클래스를 이용해서 파티션의 소유권을 재분배한다. 기본적으로는 RangeAssignor를 사용하지만 RoundRobinAssignor를 사용하거나 사용자가 생성한 Assignor를 이용해서 파티션 소유권 분배랄 할 수 있다.

RangeAssignor는 org.apache.kafka.clients.consumer.RangeAssignor 클랫로 구현되어있다. 이 Assignor는 각 컨슈머마다 토픽별 파티션을 연속적으로 할당하는 로직을 구현하고 있다. 예를들어 3개의 파티션으로 구성된 2개의 토픽(T1, T2)이 있다고 하자. 이 때, 2개의 컨슈머(C1, C2)에 다음과 같이 할당한다. 


C1 = {T1.P1, T1.P2, T2.P1, T2.P2}, C2={T1.P3, T2.P3}


3개의 파티션을 각각 절반씩 나눠서 2개 1개씩 나눠갖는 방식이다. 

RoundRobinAssignor를 사용하면 각 컨슈머들에 한번씩 번갈아가면서 파티션을 할당하게 된다. 위에서 할당했던 파티션 분배는 다음과 같이 이뤄지게 된다.

C1 = {T1.P1, T1.P3, T2.P2}, C2 = {T1. P2, T2.P1, T2.P3}

각 컨슈머들이 할당받는 파티션이 약간씩 다른 것을 볼 수 있다. 

사용자가 직접 Assignor를 구현할 수 있다. 이 때, 사용자는 org.apache.kafka.clients.consumer.internals.PartitionAssignor 인터페이스를 구현해야 한다. 

client.id

카프카 브로커가 컨슈머를 식별할 때 사용할 문자열이다. 

max.poll.records (default=500)

한번의 poll() 요청으로 가져올 records의 최대 개수를 의미한다. 이 값을 이용해서 한번에 처리할 수 있는 데이터의 양을 제어할 수 있다.

receive.buffer.bytes, send.buffer.bytes (default=64K)

컨슈머에서 사용할 TCP 송수신 버퍼의 크기를 의미한다. -1로 설정하면 운영체제에 설정된 기본값을 사용하게 된다. 카프키 브로커와 클라이언트 사이의 네트워크 홉이 많다면, 즉 다른 데이터 센터나 네트워크에 위치한다면 이 값을 올려보는 것도 좋다.


오프셋과 커밋

이전 포스트에서도 정리했듯이 오프셋(Offset)은 특정 컨슈머 그룹이 토픽의 파티션에서 어디까지 메시지를 읽어갔는지를 나타낸다고 했다. 컨슈머가 데이터를 읽어간 다음 "내가 여기까지 읽어갔어요"라고 브로커에 알리는 작업을 커밋(Commit)이라고 한다. 커밋 요청을 받으면 브로커는 __consumer_offset 이라는 내부 토픽에 오프셋 정보를 갱신한다. (이전 버전에서는 주키퍼에 이 정보를 저장했지만 write 오버헤드 때문에 신버전에서는 카프카 브로커의 내부 토픽으로 관리하게 바뀌었다.)

커밋을 어떻게 처리할 것인가에 따라서 데이터가 누락될 수도 있고 중복처리 될 수도 있다. 예를들어보자.


카프카의 특정 파티션에 12개의 메시지가 들어있다. 현재 컨슈머 그룹은 이전에 4번 메시지까지 읽어갔다고 브로커에 커밋되었고, 컨슈머는 5번부터 12번까지 메시지를 읽어가서 현재 10번 메시지까지 처리되고 있다. 가지고 온 12번 메시지까지 처리한 다음 브로커에 오프셋을 커밋할 예정이다.



하지만 10번 메시지를 처리하는 도중 컨슈머에 장애가 발생했다. 그룹 코디네이터는 이 컨슈머의 장애상황을 인지하고 이 파티션의 소유권을 다른 컨슈머에게 넘기기 위해 리밸런싱을 수행한다. 새로 이 파티션을 할당받은 이전에 수행되었던 컨슈머가 10번 메시지까지 처리했다는 사실을 알지 못하기 때문에 마지막으로 커밋된 오프셋인 4번 이후 메시지부터 다시 읽어서 처리하게 된다. 즉, 5번부터 9번까지의 메시지는 두 번 처리가 된다.


이런 중복처리를 방지하기 위해 데이터를 가져올 때 오프셋을 커밋하는 동작을 생각해볼 수있다. 



컨슈머가 poll() 메소드를 이용해서 12번 메시지까지 가져오는 동시에 오프셋을 커밋했다고 해보자. 5번부터 12번까지 메시지는 컨슈머가 읽어갔으며 브로커에는 12번까지 처리되었다고 오프셋이 커밋된다. 이제 컨슈머는 가져온 5번~12번 메시지를 처리한다.  그러다가 7번 메시지를 처리하는 과정에서 장애상황이 발생했다. 마찬가지로 그룹 코디네이터는 장애상황을 처리하기 위해 리밸런싱을 수행하고 새로운 컨슈머에게 이 파티션이 할당된다. 


하지만 새로 파티션을 할당받은 컨슈머는 이전 컨슈머가 7번 메시지까지만 처리했다는사실을 알지 못하기 때문에 마지막으로 커밋된 12번 이후 메시지부터 읽어서 처리하게 된다. 즉 8~12번 메시지는 처리되지 않고 누락되는 것이다. 


오프셋 커밋과 관련하여 중복처리와 누락이 발생할 수 있다는 점은 잘 인지하고 있어야 한다. 일반적으로는 중복처리를 허용하고 메시지 처리과정을 멱등적(idempotent)으로 작성하는 정도에서 타협한다. 분산환경에서 "Exactly-Once"를 구현하기 쉽지 않으니 "At-Least-Once" 정도로 만족하고 중복처리에 대응하는 방향으로 설계를 하는 것이다.


만약 "Exactly-Once"를 구현하고 싶다면 트랜잭션을 지원하는 RDBMS 같은 외부장치들을 추가로 이용하는 방안을 생각해 볼 수 있다.


자동 커밋(auto commit)

카프카의 오프셋 관리를 하는 가장 간단한 방법은 자동커밋(auto commit)을 이용하는 것이다. 카프카 컨슈머 사용자는 consumer.poll() 메소드를 이용해서 메시지들을 가져가기만 하면 되며, 카프카 컨슈머 라이브러리에서 자동으로 커밋 동작을 수행한다. 자동 커밋을 사용하려면 enable.auto.commit=true를 설정하여 컨슈머를 생성하면 된다.

 

자동 커밋의 커밋 주기는 auto.commit.interval.ms 값으로 설정할 수 있으며 기본값은 5초로 5초마다 오프셋을 커밋하게 된다. 이는 다음 5초가 지날때까지는 커밋이 발생하지 않는다는 것을 의미하며 그 사이에 장애가 발생할 경우 중복 처리가 발생할 수 있음을 나타낸다. 커밋 동작은 consumer.poll() 메소드 내부에서 발생하기 때문에 5초가 지나도 다음 poll() 메소드를 호출하지 않으면 커밋이 일어나지 않을 수 있다.

수동 커밋 - 현재 오프셋 커밋하기(동기적)

자동 커밋을 이용하지 않고 사용자가 수동으로 오프셋 커밋을 할 수도 있다. 우선 카프카 컨슈머를 생성할 때 enable.auto.commit=false 설정을 해야한다. 


다음은 사용자가 직접 오프셋 커밋을 수행하는 예제 코드다. 


1
2
3
4
5
6
7
8
9
10
11
while (true) {
    ConsumerRecords<StringString> records = consumer.poll(100);
    for (ConsumerRecord<StringString> record : records)
        System.out.println(record.value());
   
    try {
        consumer.commitSync();
    } catch (CommitFailedException e) {
        System.err.println("commit failed");
    }
}
cs


cunsumer.poll() 메소드를 이용해 메시지를 가져온 다음 각 메시지의 값을 출력한다. 그리고 consumer.commitSync() 메소드를 수행해서 consummer.poll() 메소드로 가지고 온 마지막 오프셋을 커밋한다. 메소드 이름에서 알 수 있듯이 커밋 요청을 날리고 수행이 완료될때까지 기다리는 동기적(synchronous) 방식의 커밋이다.

비동기적 커밋(Asynchronous Commit)

동기적 커밋은 커밋 요청을 브로커로 보내고 응답을 기다리기 때문에 애플리케이션의 수행이 일시적으로 중단될 수 있다는 단점이 있다. 즉 커밋을 자주 수행하게 되면 그 만큼 기다리는 시간도 늘어나서 컨슈머의 처리량이 줄어들게 된다.


1
2
3
4
5
6
7
while (true) {
    ConsumerRecords<StringString> records = consumer.poll(100);
    for (ConsumerRecord<StringString> record : records) 
        System.out.println(record.value());
 
    consumer.commitAsync();
}
cs


카프카 컨슈머는 오프셋 커밋 요청을 비동기적(Asynchronous)으로 보낼 수 있는 API를 제공한다. 동기적인 방식과 마찬가지로 enable.auto.commit=false로 설정하고 consumer.commitSync() 대신 consumer.commitAsync() 메소드를 요청하면 된다. 


다만 동기적인 방식의 경우 커밋 요청을 처리하다가 에러가 발생하면 commitSync() 메소드 안쪽에서 알아서 재시도를 하지만 consumer.commitAsync() 메소드는 재시도를 하지않는다. 재시도를 하지 않는 이유는 다음과 같다. 컨슈머가 오프셋 커밋을 2000과 3000에 수행한다고 해보자. 2000에 비동기적으로 브로커에 전송된 커밋 요청을 기다리지 않고 처리를 계속한 다음 3000에 브로커로 두 번째 커밋을 전송했다고 하자. 그런데 2000에 비동기적으로 전송한 커밋 요청에는 에러가 발생했고, 3000에 보낸 커밋은 정상적으로 처리가 되었다. 이 상황에서 에러가 생긴 2000을 다시 전송하게 되면 3000이라는 오프셋이 2000으로 다시 덮어쓰여지게 된다. 이와 같은 문제때문에 비동기적인 커밋 요청은 재시도를 하지 않는다. 


비동기적인 오프셋 커밋을 사용할 때, 콜백(callback)을 전달하여 브로커에서 응답이 정상적으로 처리되었을 때를 대비할 수 있다. 


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
while (true) {
    ConsumerRecords<StringString> records = consumer.poll(100);
    for (ConsumerRecord<StringString> record : records)
        System.out.printf(record.value());
    
    consumer.commitAsync(new OffsetCommitCallback() {
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
            if (e != null)
                System.err.println("Commit failed");
            else
                System.out.println("Commit succeeded");
            if (e != null)
                log.error("Commit failed for offsets {}", offsets, e);
        }
    });
}
cs


consumer.commitAsync() 메소드에 OffsetCommitCallback 인터페이스를 구현한 익명 클래스를 정의해서 넘겨줬다. 브로커로부터 커밋 처리를 완료했다는 응답을 받으면 여기에 등록된 콜백이 실행된다.


비동기적인 방식의 커밋에서 재시도를 콜백을 이용해 구현할 수도 있다. 예를 들어 비동기적인 커밋을 요청할 때 커밋을 요청한 최신 오프셋을 항상 저장한다. 만약 커밋 요청이 실패했다는 응답이 돌아오면 콜백에서 최신 오프셋과 실패한 오프셋을 비교해서 같으면 재시도를 하는 식의 구현을 생각해 볼 수 있다.

반응형

댓글