본문 바로가기
정리 예정

[Kafka] #3 - 아파치 카프카 프로듀서 (Apache Kafka Producer) 실행과 간단한 설정 예제(Configuration Example)

by 왕 달팽이 2019. 1. 7.
반응형

카프카에 대한 간단한 개념과 카프카 서버 구동 방법에 대해서 간략하게 알아봤다. 



이전 글에서는 콘솔에서 메시지를 전송하고 읽을 수 있는 콘솔 프로듀서와 콘솔 컨슈머를 사용해봤다. 이제 자바(Java)를 이용해서 카프카에 메시지를 전송할 수 있는 프로듀서를 구현해 볼 차례다.


카프카는 자바를 위한 프로듀서 클라이언트 라이브러리를 지원한다. 카프카 라이브러리를 사용하기 위한 메이븐(Maven) Dependency는 다음과 같다.


<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->

<dependency>

   <groupId>org.apache.kafka</groupId>

   <artifactId>kafka-clients</artifactId>

   <version>2.0.0</version>

</dependency>


메이븐 pom.xml 파일에 추가하고 관련 내용을 import 하면 카프카 클라이언트를 사용할 수 있다. 


테스트를 위해 Quickstart 포스트에서 봤듯이 로컬 환경에 주키퍼와 카프카 서버를 띄우겠다. 또 프로듀서가 전송한 데이터를 확인하기 위해 콘솔 컨슈머를 붙여 놓겠다.


Quickstart 포스트에서 다운받아 놓은 경로에서 다음을 수행한다. 우선


$ bin/zookeeper-server-start.sh config/zookeeper.properties


를 수행해서 주키퍼를 구동시키고


$ bin/kafka-server-start.sh config/server.properties


를 수행해서 카프카 브로커를 구동시킨다. "test"라는 이름의 토픽이 없다면 다음 명령을 수행해서 하나 만들어 준다. 


$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test


이제 "test"라는 토픽을 가지고 테스트를 진행한다.


$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning​


"test"라는 이름의 카프카 토픽에 메시지를 전송하는 가장 단순한 코드는 다음과 같다. 



코드를 하나씩 살펴보겠다. 


Properties props = new Properties(); 

props.put("bootstrap.servers", "localhost:9092"); 

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 

props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");


Producer<String, String> producer = new KafkaProducer<>(props);


카프카 프로듀서는 Properties 객체를 인자로 받는다. 카프카가 동작하기 위해 필요한 설정값들을 Properties 객체에 설정해서 프로듀서 생성자(Constructor)로 넘겨주게 된다. Properties 객체에 설정값을 입력하기 위해 put() 메소드를 사용했다. put() 메소드는 카프카 설정 이름과 그에 해당하는 값을 Key-value 형태로 입력받는다. 


첫 번째 설정인 "bootstrap.servers"는 카프카 프로듀서가 접속할 브로커의 주소다. 카프카 프로듀서가 생성되면 "localhost:9092" 필요한 정보를 요청하게 된다. 이 주소를 통해 특정 토픽-파티션을 어떤 브로커가 가지고 있는지와 같은 정보들을 처음으로 요청하게 된다. 

"bootstrap.servers" 설정은 2개 이상으로 구성될 수도 있다. 하나의 주소만 입력했을 경우, 입력한 브로커에 장애가 생기거나 여러 이유로 접속이 안되면 카프카 프로듀서는 동작하지 않게 된다. 하지만 2개 이상의 주소를 입력한 경우 둘 중 하나만 정상 접속이 되어도 정상적으로 프로듀서가 동작할 수 있게 된다. 


"key.serializer"와 "value.serializer"는 카프카에 입력되는 메시지를 어떤 형태로 저장할 것인가에 대한 내용이다. 카프카는 단순히 문자열을 전송하고 읽는데에 그치지 않고 다양한 클래스들을 전송할 수 있는 기능을 제공한다. 이 때, 전송되는 클래스를 어떤 방식에 따라 바이트 배열(byte array)로 변경할 것인지 명시해야 한다. (예를 들어, 한글 문자를 UNICODE로 인코딩 할지, UTF-8, UTF-16 혹은 EUC-KR로 인코딩 할지에 따라 생성되는 바이트 배열은 다르게 된다.)


key.serializer, value.serializer는 각각 카프카 메시지의 키(key)와 값(value)을 바이트 배열로 만들때 사용할 클래스가 입력된다. 이번 예제에서는 문자열(String)을 카프카로 전송할 것이기 때문에 카프카에 내장된 StringSerializer를 사용했다. 카프카에는 기본적으로 ByteArraySerializer, StringSerializer, IntegerSerializer 가 포함되어 있다.


ProducerRecord<String, String> record = new ProducerRecord<>("test", "String Key", "String Value");


이제 카프카 서버로 보낼 메시지를 만들어 보겠다. 카프카 프로듀서는 프로듀서 레코드(ProducerRecord) 객체를 입력받아 카프카 서버로 전송한다. 전송하고 싶은 메시지를 ProducerRecord 객체로 만들어야 한다. ProducerRecord 클래스는 두 개의 Parameterized type(형인자 타입)을 갖는다. 각각 키와 데이터의 클래스를 의미한다. 우리 예제에서는 둘 다 문자열이므로 <String, String>을 입력한다. 


ProducerRecord 생성자의 첫 번째 인자는 토픽(Topic) 이름이다. "test" 토픽에 메시지를 전송하기 위해 테스트를 했으므로 "test"를 명시한다. 두 번째 인자는 카프카 메시지의 키 값이다. 세 번째는 전송할 데이터를 쓰면된다.


try {

    producer.send(record); 

} catch (Exception e) { 

    e.printStackTrace(); 


producer.close();


이제 만들어진 ProducerRecord 객체를 Producer 객체의 send() 메소드로 전송한다. producer의 send() 메소드 처리 과정에서 네트워크 단절이나 브로커 장애 등의 다양한 상황이 발생할 수 있으므로 발생하는 Exception 들을 적절히 처리해야 한다. 이번에는 그냥 stackTrace만 찍고 넘어간다. 


이 간단한 프로듀서 예제에는 문제가 있다. producer.send() 메소드의 정의를 보면 Future 객체를 반환하게 되어 있다. 위 예제에서는 이 반환값을 무시하고 있다.


카프카는 내부적으로 Sender 쓰레드를 생성하여 카프카 브로커와의 통신을 맡긴다. producer.send() 메소드는 sender 쓰레드가 읽을 수 있는 위치에 ProducerRecord를 전달해놓고 바로 반환된다. 따라서 send() 메소드가 반환된 시점에 카프카 서버로의 전송이 보장되지 않는다. 심지어 백그라운드로 동작하는 Sender 쓰레드가 읽지 못 했을 수도 있다. 


이런 상황에서 장애가 발생했다면, 프로듀서 로직에서는 전송된 것으로 처리 되지만 실제로는 전송되지 않아 메시지 유실이 발생할 수 있다. 따라서 메시지가 전송됨을 보장할 방법이 필요하다. 



가장 간단한 방법은 프로듀서가 send() 메소드로 보낸 메시지를 Sender 쓰레드가 전송할 때까지 기다리는 것이다. send() 메소드가 Future 객체를 반환하기 때문에 이 객체에 get() 메소드를 호출해서 종료를 기다리면 된다. get() 메소드는 메시지가 실제로 전송되어 브로커의 소켓에 전송될 때까지 기다린다. 


이 방법은 메시지가 카프카 브로커로 전송됨을 보장 할 수 있지만 그 때까지 기다리기 때문에 약간의 비효율이 있을 수 있다. 이 비효율이 아깝다면 비동기(asynchronous)적인 방법으로 전송된 사실을 처리할 수 있다.



람다가 아닌 익명 클래스를 사용하면 producer.send() 메소드를 다음과 같이 구현할 수 있다.


try{

    producer.send(record,new Callback() {

        @Override

        public void onCompletion(RecordMetadata recordMetadata,Exception e){

            if(e!=null)

                e.printStackTrace();

        }

    });

} catch(Exception e) {

    e.printStackTrace();

}


이 방법은 처음에 구현했던 방법처럼 바로 반환된다. 하지만 나중에 메시지가 실제로 카프카 브로커 서버로 전송이 되었을 때, 세 번째로 입력된 익명 클래스의 onCompletion 메소드가 실행된다. 


따라서 실제로 메시지가 전송되었을 때 처리되어야 하는 작업을 이 콜백(Callback) 함수로 구현해 놓으면 언젠가 실제로 전송이 되었을 때 실행된다. 사용자 로직에서는 비동기 방식으로 메시지를 전송해 놓고 대기하는 대신 다른 일을 할 수 있어 좀 더 효율적이다. 하지만 전송되기 전에 장애가 발생하거나 전송되고 실행되어야 하는 작업이 있을 때 추가로 로직을 구현해야하는 복잡성(Complexity)이 증가한다는 단점이 있다. 


그 밖에도 카프카 프로듀서와 관련된 설정값들을 많이 있다. 각각의 설정에 따라 카프카 프로듀서의 효율이 달라지기도하고 전송된 메시지가 유실되는 상황을 막을 수도 있다. 카프카 프로듀서 설정에 대한 내용은 별도의 포스트를 할애하여 정리하도록 하겠다.




반응형

댓글