지난 포스트에서 자바를 이용한 카프카 컨슈머 프로그램을 작성해봤다. 그 중 오프셋 커밋과 관련된 기본적인 내용을 다뤘었다.
이번 포스트에서는 오프셋을 좀 더 상세히 다루는 방법에 대해서 정리해보겠다.
특정 오프셋으로 커밋하기
이전 포스트에서 다뤘던 오프셋 커밋 메소드들(commitSync(), commitAsync())은 poll() 메소드에서 가져온 마지막 오프셋을 커밋했다. 사용자는 "지금 오프셋을 커밋해줘"라고 요청을 보낼 수 있을 뿐이었다. 만약 한번에 가져오는 메시지의 양이 많거나 하나의 메시지를 처리하는데 오래걸린다면, poll() 메소드에서 가져온 메시지를 처리하는 중간중간에 처리가 완료된 오프셋을 커밋하고 싶을 수 있다.
카프카 컨슈머 API에는 특정 오프셋을 커밋할 수 있는 기능이 있어서 이런 동작을 구현할 수 있다. 예제를 살펴보자.
카프카의 커밋 메소드인 consumer.commitAsync() 메소드에 currentOffsets라는 인자를 넘겨준 부분을 볼 수 있다. currentOffsets는 TopicPartition 정보를 Key로 갖고 OffsetAndMetaData를 Value로 갖는 HashMap이다. 즉 커밋할 때 갱신할 토픽의 파티션 별 오프셋을 해시맵으로 만들어서 넘겨주는 부분이다.
위 코드에서는 매 100개의 메시지마다 한번씩 커밋을 요청하도록 코드를 작성했다.
특정 오프셋부터 읽기
특정 오프셋으로 커밋을 하는 기능이 있듯이 특정 오프셋부터 읽도록 설정하는 기능도 존재한다. 마치 파일을 열어서 lseek 함수를 이용해 현재 포지션을 변경하여 특정 부분부터 읽는게 카프카의 파티션 데이터를 읽을 때에도 적용된다.
public void seekToBeginning(Collection<TopicPartition> partitions)
public void seekToEnd(Collection<TopicPartition> partitions)
public void seek(TopicPartition partition, long offset)
consumer의 메소드들 중 위 메소드가 메시지를 읽을 포지션을 변경할 때 사용되는 메소드다. seekToBeginning() 메소드는 입력받은 토픽 파티션들의 처음부터 메시지를 읽기 위해 사용된다. seekToEnd() 메소드는 입력받은 토픽 파티션들의 마지막부터 메시지를 읽기 위해 사용된다. seek() 메소드는 특정 토픽 파티션을 offset으로 지정한 부분부터 읽도록 설정하는 메소드다.
리밸런스 리스너
컨슈머들은 자신들에게 문제가 생기지 않았더라도 리밸런스에 의해 파티션을 새로 할당받을 수 있다. 예를 들어 5개의 컨슈머로 구성된 컨슈머 그룹에서 하나의 컨슈머에 장애가 발생하여 리밸런스가 일어났다고 하자. 장애가 생긴 컨슈머가 소유하던 파티션을 다른 컨슈머에게 나눠주기만하는 방식으로 리밸런스를 할수도 있지만 전체 파티션 소유권을 4개의 컨슈머만 존재한다고 가정하고 전체 파티션을 다시 할당할 수도 있다.
이 경우 자신이 할당받아서 처리하던 컨텍스트(Context)를 정리해야할 필요가 있을 수 있다. 카프카에서 메시지를 읽어 처리할 때 데이터나 통계 정보 등을 캐싱하고 관리한 다음 주기적으로 파일 쓰기를 하거나 다른 통계 수집 장치로 전송한다고 했을 때, 리밸런스로 파티션 소유권을 빼앗기기 전에 지금까지 캐싱해놨던, 수집해놨던 정보를 처리해야 할 필요가 있다. 이 때 리밸런스 리스너(Rebalance Listener)를 사용할 수 있다.
리밸런스 리스너는 리밸런스 이벤트가 발생했을 때 동작하는 메소드를 정의한 클래스다.
public interface ConsumerRebalanceListener {
void onPartitionsRevoked(Collection<TopicPartition> partitions);
void onPartitionsAssigned(Collection<TopicPartition> partitions);
}
리밸런스 리스너는 ConsumerRebalanceListener 인터페이스를 구현한 클래스를 사용할 수 있다.
onPartitionRevoked() 메소드는 리밸런싱이 시작되기 전에 호출된다. 즉, 현재 캐싱하고 있는 데이터나 컨텍스트가 있다면 이 메소드에서 정리를 해줘야한다.
onPartitionAssigned() 메소드는 리밸런싱이 끝나서 파티션 소유권이 할당되고 나면 호출된다. 캐싱을 위한 초기화 작업이 필요하다면 이 메소드에 구현하면 된다.
consumer.subscribe(topics, new HandleRebalance());
HandleRebalance 클래스가 ConsumerRebalanceListener 메소드를 구현한 클래스라고 하자. 이 리밸런스 리스너를 consumer.subscribe() 메소드의 인자로 넘겨주면 컨슈머에 설치된다.
댓글