카프카 프로듀서를 생성할 때 프로퍼티를 명시한다. 이전에 봤던 "bootstrap.servers"와 "key.serialized", "value.serializer"를 비롯한 설정가능한 다른 프로퍼티들도 많이 있다. 이번 포스트에서는 카프카 프로듀서에 설정가능한 프로퍼티들을 알아보겠다.
bootstrap.servers
카프카 프로듀서가 최초로 접속할 때 필요한 주소들이다. 모든 카프카 클러스터를 입력할 필요는 없지만 두 개 이상 입력하기를 권장한다. 입력한 브로커 주소 중 하나라도 가용 상태면 프로듀서의 동작에 문제는 없다.
key.serializer
카프카 메시지의 키(key)를 바이트 배열로 만들어 줄 클래스를 명시한다. org.apache.kafka.common.serialization 인터페이스를 구현한 클래스를 사용할 수 있다. 카프카에서는 기본적으로 StringSerializer, IntegerSeriailzer, ByteSerializer 를 제공하고 있다.
value.serializer
카프카 메시지의 데이터 값(value)을 바이트 배열로 만들어 줄 클래시를 명시한다. 마찬가지로 org.apache.kafka.common.serialization 인터페이스를 구현한 클래스를 사용할 수 있다.
acks
카프카 프로듀서의 프로퍼티 중 가장 중요한 프로퍼티라고 할 수 있다. acks=0, acks=1, akcs=all 중 한 값을 가질 수 있다. 이 값은 카프카 브로커 서버로 전송된 메시지의 유실 가능성과 매우 밀접한 관계가 있다.
설명을 위해 Replication Factor가 2인 카프카 토픽을 예로 들겠다. Replication Factor가 2이기 때문에 브로커로 전송된 메시지는 1개의 리더 리플리카(Leader Replica)와 1개의 팔로워 리플리카(Follower Replica)로 저장된다. 리더 리플리카와 팔로워 리플리카는 서로 다른 브로커 서버에 저장된다.
설정 값 |
성능 |
유실 가능성 | 설명 |
acks=0 |
높음 |
높음 | 프로듀서는 브로커로 전송하기만 하고 응답을 기다리지 않는다 |
acks=1 |
중간 |
중간 | 프로듀서는 리더 리플리카를 가지고 있는 브로커의 응답만 기다린다 |
acks=all(-1) |
낮음 |
낮음 | 프로듀서는 ISR을 구성하는 리플리카들이 메시지를 복사해 갈 때까지 기다린다. |
각 옵션의 동작을 그림으로 설명해보면 다음과 같다.
acks=0
카프카 프로듀서는 브로커 서버로 메시지를 전송하기만 한다. 브로커가 메시지를 받아서 제대로 처리했는지 응답을 기다리지 않는다.
카프카 프로듀서는 리더 리플리카를 가지고 있는 브로커에 메시지를 전달한다. 하지만 브로커의 응답을 기다리지는 않는다. 따라서 카프카 프로듀서가 메시지를 보내는 도중 리더 리플리카를 가지고 있는 브로커에 장애가 발생하거나 다운되면 메시지가 유실될 수 있다. 브로커가 안전하게 처리했는지 여부를 카프카 프로듀서는 모르기 때문이다.
응답을 안 받아서 메시지 유실의 위험이 있는 대신 속도는 빠르다.
acks=1
프로듀사가 리더 리플리카를 가지고 있는 브로커 서버의 응답 만을 기다린다.
리더 브로커는 프로듀서로부터 메시지를 받아서 안전하게 처리한 다음 응답을 프로듀서로 보낸다. 이 후 팔로워 리플리카를 담당하는 브로커들이 이 메시지를 읽어서 복제한다. 적어도 리더는 안전하게 메시지를 보관했기 때문에 acks=0으로 저장한 것보다는 안전하다. 대신 응답을 기다리기 때문에 느릴 수 있다.
하지만 여전히 메시지 유실의 가능성은 존재한다. (1) 프로듀서가 리더 리플리카에게 메시지를 전송하고 리더 리플리카가 안전하게 메시지를 보관하고 (2) 응답을 전송한다. 카프카 프로듀서는 카프카 브로커 서버들이 안전하게 자신의 메시지를 저장했다고 생각한다. 이제 팔로워 리플리카가 리더의 메시지를 복제해가려는 순간 리더에 장애가 발생한다. 리더의 브로커가 다운되었기 때문에 팔로워는 데이터를 읽어갈 수 없다. 시간이 흐르고 가용성을 위해 리더가 다시 선출되고, 팔로워가 리더로 선정되어 새로운 리더가 된다.
하지만 새로운 리더는 프로듀서가 전송했고 응답까지 받은 메시지를 가지고 있지 않는다. 프로듀서가 보낸 메시지는 유실된 것이다.
acks=all(-1)
akcs의 마지막 설정은 all 혹은 -1 이다.
all 설정의 경우에도 프로듀서는 메시지 전송 이후 리더 리플리카의 응답을 기다린다. 대신 리더 리플리카는 ISR(In-Sync Replica)를 구성하는 팔로워 리플리카들이 메시지를 복제해 갈때까지 기다린 후 팔로워들이 안전하게 저장함을 확인한 다음 프로듀서에게 응답을 보낸다. 프로듀서가 보낸 메시지가 ISR을 구성하는 팔로워들에 복제됨이 보장되기 때문에 안정성이 더 높다.
ISR은 리더의 내용을 모두 복제하고 있는 리플리카들을 의미한다.(In-Sync라는 단어를 생각해보면 된다) 리더에 장애가 발생해서 새로운 리더를 선출 할 때, ISR을 구성하고 있는 리플리카들만이 리더로 선정될 자격을 얻는다. 즉, acks=all 로 설정한 상태에서 응답을 받았다면, 리더의 장애 시나리오에서도 메시지 유실이 없게 된다.
(더 정확히 말하자면, acks=all 설정인 경우 리더 리플리카는 브로커의 프로퍼티 중 "min.insync.replica"에 명시된 숫자의 리플리카가 복제된 경우 응답을 보낸다. 즉, Replication Factor가 3이면 3개의 브로커 서버에 메시지를 저장하게 된다. 그런데 "min.insync.replica"가 2로 설정되었다면, 2개의 리플리카만 안전하게 저장되면 리더가 응답을 보낸다. 2개에는 리더도 포함이므로 나머지 한 팔로워가 복제를 하지 못 한 상황에서도 리더는 응답을 준다.)
buffer.size
브로커들에게 전송될 메시지들을 저장할 버퍼로 사용될 메모리의 양을 설정한다. 만약 프로듀서의 Sender 쓰레드가 브로커 서버로 데이터를 전송하는 속도보다 빠르게 데이터를 추가하면 "max.block.ms"까지 기다렸다가 가용 공간이 없으면 TimeoutException을 발생시키게 된다.
compression.type
기본적으로 카프카 프로듀서는 브로커 서버로 메시지를 전송할 때 압축되지 않은 데이터를 그대로 전송한다. 하지만 전송할 데이터가 너무 많아서 네트워크 대역폭(Network Bandwidth)에 부하를 주는 상황이거나 카프카 저장 공간이 부족해지는 상황이라면 데이터를 압축하는 것을 고려해볼만하다.
카프카 프로듀서는 "snappy", "gzip", "lz4" 중 하나의 알고리즘을 사용해서 압축을 수행할 수 있다. snappy 알고리즘은 적당한 압축률에 CPU 부하가 적고 성능이 좋다. gzip 알고리즘은 CPU 부하와 압축 시간을 많이 소모하지만 뛰어난 압축률을 얻을 수 있다. 자신에게 적당한 알고리즘을 선택하면된다.
압축한 데이터는 카프카 브로커에 압축 알고리즘에 대한 메타 정보와 함께 압축된 상태로 저장된다. 이후 컨슈머가 데이터를 읽어 갈 때 압축을 풀어서 원본 데이터를 복원하게 된다. 컨슈머 입장에서 압축은 Transparent 하게 동작된다. 즉, 컨슈머는
retries
프로듀서가 브로커 서버로 메시지를 보낼 때 발생한 에러는 일시적일 수 있다. 예를 들어 파티션의 리더 리플리카가 순간적인 장애로 사용할 수 없을 경우, 조금만 기다리면 장애 상황이 해소되거나 새로운 리더 리플리카가 선출되어 정상적으로 메시지 전송을 할 수 있게 된다. "retries" 프로퍼티는 프로듀서가 일시적인 장애에 대해 몇 번 재시도 할 것인가를 의미한다. 이 재시도는 프로듀서 라이브러리 내부에서 일어나기 때문에 에러가 사용자에게 전파되지 않고, 몇 번 재시도하다가 성공하게 된다.
재시도 간에는 "retry.backoff.ms"만큼의 대기시간을 두어 의미없이 많은 요청을 하지 않게 컨트롤 할 수 있다. 일반적으로 retry.backoff.ms 프로퍼티는 운영중에 발생하는 브로커 서버의 장애 복구시간을 설정하는게 좋다. 즉, 장애가 발생했을 때 새로운 리더 리플리카가 선출되는데 걸리는 시간을 이 프로퍼티의 값으로 설정하는 것을 권장한다.
"retries" 프로퍼티와 관련해서 주의해야 할 점이 있다. 또 다른 프로퍼티인 "max.in.flight.requests.per.connection"이 2 이상인 경우 일시적인 에러로 인한 재시도때문에 메시지 전송 순서가 바뀔 수 있다는 점이다. 예를 들어 프로듀서가 "메시지1", "메시지 2" 순서로 전송했다고 하자. max.in.flight.requests.per.connection 값이 1이면 한 번에 한 개의 메시지만 보낸다. 따라서 "메시지 1"이 정상적으로 전송되고 나서 "메시지 2"를 보낸다. max.in.flight.requests.per.connection 값이 2면 한 번에 두 개의 메시지를 보내게 된다. "메시지 1"과 "메시지 2"를 전송했는데 타이밍 이슈로 "메시지 2"는 전송에 성공하고 "메시지 1"만 실패한 경우 프로듀서는 "메시지 1"을 재전송하게 된다. 결국 브로커는 "메시지 2", "메시지 1" 순으로 메시지를 받게 된다.
batch.size
프로듀서는 같은 토픽-파티션으로 전송되는 메시지를 "배치(batch)"라는 단위로 모아서 전송한다. "batch.size"는 이 배치의 사이즈를 결정하는 프로퍼티로 메시지를 어느정도 모았다가 브로커로 전송할 것인지를 결정하는 설정 값이다. 기본적으로 프로듀서는 배치 사이즈만큼 데이터가 모이면 브로커로 한번에 전송한다.
물론 프로듀서가 무조건 배치가 가득 찰 때까지 기다리는 것은 아니다. "linger.ms"에 설정된 시간이 지나면 배치가 가득차지 않아도 브로커로 메시지를 전송한다. 따라서 batch.size 값을 너무 크게 잡으면 batch.size 만큼의 메시지가 모이기도 전에 전송하기 때문에 사용하지도 못하는 메모리를 할당받는 낭비를 발생시킬 수도 있다. 반대로 너무 작게 잡으면 너무 자주 배치 전송이 발생해서 비효율적인 동작을 보일 수도 있다.
max.in.flight.requests.per.connection
이 프로퍼티는 한 번에 몇 개의 요청(Request)을 전송할 것인가를 결정한다. 이 프로퍼티가 1로 설정되어 있으면 프로듀서는 한 번에 하나의 요청을 전송하고 응답을 받은 이후 다음 요청을 전송한다. 이 프로퍼티가 2 이상으로 설정되어 있으면 설정된 만큼 요청을 전송하고 응답을 기다린다.
위에서 "retries" 프로퍼티와 함께 사용할 때 메시지 전송 순서가 뒤바뀔 수 있는 문제가 있다.
timeout.ms, request.timeout.ms
"request.timeout.ms" 설정은 "acks" 설정과 관련이 있다. 프로듀서는 브로커 서버에 메시지를 전송하고 acks 설정에 따라 기다리게 된다. 이 때, 얼마나 기다려야 하는가를 " request.timeout.ms" 설정으로 컨트롤 할 수 있다. "timeout.ms" 설정 역시 비슷한 역할을 한다. 하지만 이 설정은 deprecated 되어 삭제될 예정이므로 "request.timeout.ms" 를 사용하자.
metadata.fetch.timeout.ms
프로듀서가 동작하기 위해서는 메타 데이터가 필요하다. 메타 데이터란 어떤 파티션이 어떤 브로커 서버에 저장되어 있는지, 어떤 토픽이 어떤 파티션을 가지고 있는지 등이다. 이 메타 데이터는 카프카 프로듀서에 캐싱되어 있으며 주기적으로 갱신된다. 만일 만료되었거나 정합성이 훼손된 메타데이터가 있다면 카프카 브로커 서버에 최신 메타 데이터를 요청해야 한다. 이 때, 메타 데이터에 대한 요청을 전송하고 대기한다. 이 설정 값은 언제까지 기다릴 것인가를 컨트롤한다. 하지만 이 설정 값 역시 deprecated 되어 삭제 될 예정이며, "max.block.ms" 설정으로 컨트롤 할 것을 권장하고 있다.
max.block.ms
카프카 프로듀서가 동작하면서 기다려야 할 일이 있을 때, 최대한 이 값으로 설정된 시간까지 기다린다. 버퍼가 모자랄 때, 메타 데이터 응답을 기다릴 때 등 다양한 곳에서 이 설정이 사용된다. 여기에 명시한 시간이 지나면 TimeoutExceptiopn 이 발생한다.
max.request.size
프로듀서가 브로커 서버에 전송하는 요청(Request)의 최대 크기. 프로듀서는 사용자로부터 ProducerRecord 형태로 메시지를 받는다. 이 메시지는 배치(Batch) 형태로 모았다가 브로커로 보낸다. 배치를 브로커로 보낼 때 전송할 브로커 서버에 해당하는 배치들을 Request로 또 다시 모아서 전송하는데 이 Request의 크기를 컨트롤 하는 설정값이다. (부연 설명을 하자면, 1번 브로커가 받을 준비가 되었을 때, 1번 브로커로 보내야 하는 배치들을 모아서 전송한다. 배치들은 각 파티션 별로 하나씩 존재하기 때문에 1번 브로커가 여러 배치를 받아서 처리할 수 있다.)
receive.buffer.bytes, send.buffer.bytes
이 설정은 카프카 프로듀서가 브로커 서버와 통신할 때 사용하는 소켓의 TCP 버퍼 크기를 의미한다. -1로 설정하면 운영체제의 기본 설정 값을 사용한다.
댓글