본문 바로가기
정리 예정

[Kafka] #9 아파치 카프카 컨슈머(Kafka Consumer) - Shutdown Hook를 이용한 컨슈머 종료

by 왕 달팽이 2019. 2. 6.
반응형

C 프로그래밍을 작성할 때 종료 핸들러(Exit Handler)라는 것을 작성해야할 때가 있다. 원격 서버와의 통신시 클라이언트 연결이 서버 쪽의 리소스를 잡아먹고 있을 때, 적절히 정리 작업을 하지 않으면 서버측 리소스가 점유된 상태로 남아버리기 때문에 종료시에 적당한 정리 작업을 해줘야 할때가 있다.


카프카 컨슈머의 경우에도 쓰레드가 종료할 때 생성했던 consumer 객체를 consumer.close() 메소드로 정리를 해주는게 좋다. 그렇지 않으면 일정기간동안 브로커 서버에 종료되는 클라이언트와 관련된 내용이 남아있을 수 있기 때문이다.


C 프로그램에서 사용했던 atexit() 같은게 Java에서도 존재한다. 셧다운 후크(Shutdown hook) 라는 기능으로 자바 애플리케이션이 강제 혹은 정상적으로 종료될 때 JVM 런타임이 실행시켜주는 기능이다.


셧다운 후크를 이용한 정리 방법

셧다운 후크는 Runtime을 얻어와서 추가할 수 있다.


final Thread mainThread = Thread.currentThread();


Runtime.getRuntime().addShutdownHook(new Thread() {

public void run() { 

consumer.wakeup();


try {

mainThread.join();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

});


addShutdownHook() 메소드에 Thread 를 구현한 익명 클래스를 집어 넣으면, 런타임 JVM이 셧다운 될 때 ShutdownHook()으로 등록한 쓰레드가 start() & run() 된다. 정상종료뿐만 아니라 [Ctrl] + c 를 누르는 동작에서도 셧다운 훅이 동작한다.


run() 메소드에서 유의해야할 부분은 consumer.wakeup(); 부분이다. consumer.wakeup()을 실행하면 consumer 객체에 "넌 이제 종료해야한다"라는 정보를 남겨둔다. 이후 consumer.poll() 메소드를 실행할 때 이 정보를 보고 WakeupException을 발생시킨다.


main 쓰레드 쪽은


try {

while (true) {

ConsumerRecords<String, String> records = consumer.poll(100);

for (ConsumerRecord<String, String> record : records) {

// doSomeThing..

}

consumer.commitAsync();

}

} catch (WakeupException e) {

// ignore

} finally {

consumer.close();

}


이런식으로 구성하면 된다. 


while(true) 를 이용하여 무한 루프를 만들었기 때문에 이 루프를 안전하게 벗어날 방법이 필요하다. 만약 JVM이 셧다운 후크를 구동시켰다면 consumer 객체에 wakeup 정보를 남겼을 것이고 다음번 poll() 메소드에서 WakeupException을 발생시킨다. 이 예외는 무시하며 finally 구문에서 consumer 객체를 close() 한다.


consumer.close() 메소드에서는 필요하다면 오프셋을 커밋하고, 현재 컨슈머가 그룹을 떠난다고 그룹 코디네이터(Group Codinator)에게 메시지를 전송한다. 그룹 코디네이터는 세션 타임아웃까지 기다리지 않고 곧 바로 리밸런싱을 수행하여 의도치 않은 지연을 발생시키지 않게 된다. 



반응형

댓글