본문 바로가기
정리 예정

[Kafka] #4 - 아파치 카프카 프로듀서(Apache Kafka Producer Partitioner) 파티셔너

by 왕 달팽이 2019. 1. 9.
반응형

카프카 프로듀서에서 생각해봐야 할 프로퍼티 중 하나로 파티셔너(Partitioner)다. 파티셔너는 카프카 프로듀서로 메시지를 전송 할 때 토픽의 어떤 파티션으로 전송해야 할지를 결정해주는 클래스다.


카프카의 생성자에 넘겨주는 인자 중 하나인 Properties 객체에 "partitioner.class"라는 이름의 프로퍼티로 설정하면 된다. 별도로 설정하지 않으면 kafka.producer.DefaultPartitioner 클래스가 할당된다. 


아파치 카프카 프로듀서의 파티셔너(Partitioner)

파티셔너로 사용할 수 있는 클래스는 org.apache.kafka.client.producer.Partitioner 인터페이스를 구현해야 한다. Partitioner 인터페이스는 다음과 같다. 


public interface Partitioner extends Configurable, Closeable { 


    int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster); 

    

    void close(); 

}


partition() 메소드의 입력 인자중 topic 은 메시지를 전송할 토픽을 의미한다. key, keyBytes 는 Key 클래스와 Key를 변환한 바이트 배열이다. value와 valuesBytes도 마찬가지로 키에 해당하는 데이터 클래스와 바이트 배열을 의미한다. Cluster 객체는 카프카 프로듀서가 넘겨주는 메타데이터로 토픽이 갖는 파티션 정보와 그 파티션이 어떤 브로커 서버에 있는지를 알 수 있게 한다. 


이런 정보를 이용해서 몇 번 파티션에 메시지를 전송해야하는지 반환해준다. 


기본 값인 kafka.producer.DefaultPartitioner 클래스는 다음과 같다. 


public class DefaultPartitioner implements Partitioner {


    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();


    public void configure(Map<String, ?> configs) {}


    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

        int numPartitions = partitions.size();

        if (keyBytes == null) {

            int nextValue = nextValue(topic);

            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);

            if (availablePartitions.size() > 0) {

                int part = Utils.toPositive(nextValue) % availablePartitions.size();

                return availablePartitions.get(part).partition();

            } else {

                // no partitions are available, give a non-available partition

                return Utils.toPositive(nextValue) % numPartitions;

            }

        } else {

            // hash the keyBytes to choose a partition

            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

        }

    }


    private int nextValue(String topic) {

        AtomicInteger counter = topicCounterMap.get(topic);

        if (null == counter) {

            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());

            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);

            if (currentCounter != null) {

                counter = currentCounter;

            }

        }

        return counter.getAndIncrement();

    }


    public void close() {}


}


사용자가 입력한 Key 바이트 배열의 해시 값을 구하여 파티션 개수로 모듈라(%) 연산을 하여 파티션 번호를 구해준다. 이 해시 값을 이용하여 메시지 키가 파티션들에 골고루 퍼질 수 있게 한다. 


사용자가 메시지의 키 값을 명시하지 않았을 때는 좀 다르게 동작한다. 해시 값을 구할 Key 바이트 배열이 없기 때문에 랜덤하게 파티션을 하나 선정해서 라운드로빈(Round-Robin) 스타일로 파티션을 사용하게 된다.


사용자가 필요하다면 커스텀 파티셔너를 구현할 수도 있다. 단순히 Partitioner 인터페이스를 구현한 다음 프로듀서 설정 중 "partitioner.class" 항목으로 입력하기만 하면 된다.



Integer 타입의 키를 받아서 파티션을 정해주는 커스텀 파티셔너의 예제 코드다. 키 값을 파티션 개수로 모듈라 연산하여 파티션 번호를 얻어내는 단순한 파티셔너다.


파티셔너는 카프카 프로듀서와 컨슈머의 로직에 맞게 잘 구현해 사용하면 된다. 




반응형

댓글