카프카 프로듀서에서 생각해봐야 할 프로퍼티 중 하나로 파티셔너(Partitioner)다. 파티셔너는 카프카 프로듀서로 메시지를 전송 할 때 토픽의 어떤 파티션으로 전송해야 할지를 결정해주는 클래스다.
카프카의 생성자에 넘겨주는 인자 중 하나인 Properties 객체에 "partitioner.class"라는 이름의 프로퍼티로 설정하면 된다. 별도로 설정하지 않으면 kafka.producer.DefaultPartitioner 클래스가 할당된다.
아파치 카프카 프로듀서의 파티셔너(Partitioner)
파티셔너로 사용할 수 있는 클래스는 org.apache.kafka.client.producer.Partitioner 인터페이스를 구현해야 한다. Partitioner 인터페이스는 다음과 같다.
public interface Partitioner extends Configurable, Closeable {
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
void close();
}
partition() 메소드의 입력 인자중 topic 은 메시지를 전송할 토픽을 의미한다. key, keyBytes 는 Key 클래스와 Key를 변환한 바이트 배열이다. value와 valuesBytes도 마찬가지로 키에 해당하는 데이터 클래스와 바이트 배열을 의미한다. Cluster 객체는 카프카 프로듀서가 넘겨주는 메타데이터로 토픽이 갖는 파티션 정보와 그 파티션이 어떤 브로커 서버에 있는지를 알 수 있게 한다.
이런 정보를 이용해서 몇 번 파티션에 메시지를 전송해야하는지 반환해준다.
기본 값인 kafka.producer.DefaultPartitioner 클래스는 다음과 같다.
public class DefaultPartitioner implements Partitioner {
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
public void configure(Map<String, ?> configs) {}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
public void close() {}
}
사용자가 입력한 Key 바이트 배열의 해시 값을 구하여 파티션 개수로 모듈라(%) 연산을 하여 파티션 번호를 구해준다. 이 해시 값을 이용하여 메시지 키가 파티션들에 골고루 퍼질 수 있게 한다.
사용자가 메시지의 키 값을 명시하지 않았을 때는 좀 다르게 동작한다. 해시 값을 구할 Key 바이트 배열이 없기 때문에 랜덤하게 파티션을 하나 선정해서 라운드로빈(Round-Robin) 스타일로 파티션을 사용하게 된다.
사용자가 필요하다면 커스텀 파티셔너를 구현할 수도 있다. 단순히 Partitioner 인터페이스를 구현한 다음 프로듀서 설정 중 "partitioner.class" 항목으로 입력하기만 하면 된다.
Integer 타입의 키를 받아서 파티션을 정해주는 커스텀 파티셔너의 예제 코드다. 키 값을 파티션 개수로 모듈라 연산하여 파티션 번호를 얻어내는 단순한 파티셔너다.
파티셔너는 카프카 프로듀서와 컨슈머의 로직에 맞게 잘 구현해 사용하면 된다.
'정리 예정' 카테고리의 다른 글
[Kafka] #6 아파치 카프카 컨슈머와 컨슈머 그룹(Apache Kafka Consumer & Consumer Group) (2) | 2019.01.30 |
---|---|
[Kafka] #5 - 아파치 카프카 프로듀서(Apache Kafka Producer Partitioner) 설정 값 (0) | 2019.01.10 |
[Kafka] #3 - 아파치 카프카 프로듀서 (Apache Kafka Producer) 실행과 간단한 설정 예제(Configuration Example) (1) | 2019.01.07 |
[Kafka] #2 - 아파치 카프카(Apache Kafka) 설치 및 실행, Quickstart (1) | 2018.12.19 |
[Kafka] #1 - 아파치 카프카(Apache Kafka)란 무엇인가? (3) | 2018.12.18 |
댓글