본문 바로가기
정리 예정

[Kafka] #2 - 아파치 카프카(Apache Kafka) 설치 및 실행, Quickstart

by 왕 달팽이 2018. 12. 19.
반응형



아파치 카프카의 웹 페이지를 보면 카프카를 처음 접하는 사람들도 쉽게 써볼 수 있는 "퀵 스타트(Quick Start)" 항목이 있다. 다운로드부터 간단한 실행까지 쉽게 따라할 수 있는 예제들이 정리되어 있다. 


1. 다운로드

카프카 다운로드 페이지에서 카프카 릴리즈 패키지를 다운로드 할 수 있다. 참고로 카프카 아카이브에서 이전 버전을 포함한 다양한 버전의 카프카 릴리즈 패키지를 내려받을 수 있다.


적당한 패키지를 다운로드하고 압축을 풀어준다.


1
2
$ tar -xzf kafka_2.11-2.1.0.tgz
$ cd kafka_2.11-2.1.0​
cs


2. 서버 시작

압축을 풀어내면 카프카를 위한 다양한 바이너리들과 설정 파일들이 들어있다. 


카프카는 주키퍼를 사용해 내부 클러스터 멤버십 관리를 한다. 따라서 카프카를 구동하기 전에 주키퍼가 먼저 실행되어 있어야 한다. 따로 운영중인 주키퍼 앙상블이 없다면 카프카 릴리즈 패키지에 포함되어 있는 주키퍼를 사용한다.


1
2
3
$ bin/zookeeper-server-start.sh config/zookeeper.properties
[2018-12-12 18:31:57,228] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)​
...
cs


bin 디렉토리에 있는 zookeeper-server-start.sh 스크립트는 1개의 서버로 구동되는 주키퍼를 구동시켜준다.


터미널을 하나 더 열어서 카프카 브로커를 구동해보자. 


1
2
3
$ bin/kafka-server-start.sh config/server.properties
[2018-12-12 18:32:31,657] INFO KafkaConfig values: ​
...
cs


카프카 서버가 부팅된다. 이제 카프카 브로커가 구동되었다. 


3. 카프카 토픽 생성

카프카에서 사용할 토픽을 하나 만들어 보자. 카프카는 토픽을 만들 수 있는 스크립트를 제공한다. 


1
2
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
cs


kafka-topics.sh 라는 쉘 스크립트는 카프카 토픽을 만들고 관련 정보를 볼 수 있는 도구다. --topic 옵션으로 명시한 이름으로 토픽이 생성된다. 여기에서는 test 라는 이름의 토픽을 만들었다.


이제 만들어진 토픽을 확인해보자. 


1
2
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
test
cs


카프카 클러스터에 만들어진 토픽 리스트를 보여주는 명령이다. 방금 만든 test 라는 토픽을 확인할 수 있다. 


만약 브로커를 구동할 때 사용한 설정파일(server.properties)에 "auto.create.topics.enable" 항목이 설정되어 있으면, 프로듀서가 존재하지 않는 토픽에 메시지를 전송할 때 자동으로 토픽이 생성된다. 다만 무분별하게 토픽이 생성될 수 있기 때문에 추천하는 설정은 아니다. 꼭 필요한 경우만 설정한다. 


4. 카프카 메시지 전송(프로듀서)

이제 구동 중인 카프카에 메시지를 전송해보겠다. 


카프카 릴리즈 패키지에는 간단하게 문자열을 카프카로 전송해 볼 수 있는 콘솔 프로듀서가 제공된다. (bin 디렉토리 밑에 kafka-console-producer.sh 스크립트가 이에 해당한다.) 콘솔 프로듀서는 사용자의 표준 입력(Standard Input)에서 받은 입력을 카프카로 전송한다. 


1
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test​
cs


그리고 전송하고 싶은 메시지를 입력한다. 


1
2
Test Message 1
Test Message 2
cs


"Test Message 1", "Test Message 2"라는 문자열을 카프카로 전송했다. 


5. 카프카 메시지 읽기(컨슈머)

이제 전송한 메시지를 읽어보겠다. 콘솔 프로듀서와 마찬가지로 카프카 릴리즈 패키지에는 콘솔 컨슈머도 제공된다. 카프카에서 메시지를 읽어 표준출력(Standard Out)으로 출력해주는 툴이다. 


다음 명령을 수행한다. 


1
2
3
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning​
Test Message 1
Test Message 2
cs


아까 콘솔 프로듀서가 생성해서 전송한 "Test Message 1" 과 "Test Message 2"라는 문자열이 확인되었다. 

6. 여러 브로커 구동하기 

여기까지는 하나의 브로커만 동작하는 상태였다. 브로커를 하나만 띄우면 재미없으니 두 개의 브로커를 더 띄워서 카프카 클러스터에 추가해보겠다. 또 한 카프카 클러스터에서 브로커가 다운되었을 때 제대로 동작하는지도 살펴보겠다.

이 테스트에서 새로운 서버 머신이 필요하지는 않고 첫 번째 구동했던 머신에 두 브로커를 추가로 구동해보겠다.

새로 구동할 두 개의 브로커를 위한 설정 파일을 생성한다. 첫 번째 브로커의 설정파일인 config/server.properties 를 복사해서 사용하겠다. 

1
2
$ cp config/server.properties config/server-1.properties 
$ cp config/server.properties config/server-2.properties

cs


이대로 추가 브로커를 구동하면 구동이 안된다. 각각 브로커들을 구분할 수 있도록 설정 파일의 일부를 수정해야한다. 각 파일의 일부 설정을 다음과 같이 변경한다. 

1
2
3
4
5
6
7
8
9
10
config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dirs=/tmp/kafka-logs-1
 
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dirs=/tmp/kafka-logs-2
cs

server.properties 파일은 다음과 같을 것이다. 

1
2
3
4
config/server.properties:
    broker.id=0
    listeners=PLAINTEXT://:9092
    log.dirs=/tmp/kafka-logs

cs


같은 머신에서 구동될 3개의 브로커가 각자 다른 설정값을 가져야 한다. 

broker.id 라는 설정은 카프카 클러스터를 이루는 브로커들을 구분할 수 있는 Unique 한 값이다. 반드시 순서대로 설정할 필요는 없고 같은 클러스터 내에서 겹치지만 않으면 된다. 
listeners 항목은 클라이언트의 요청을 받기 위한 카프카 브로커의 접속 정보다. 같은 머신에서 구동되기 때문에 서로 다른 포트를 사용하도록 설정해야한다. 
log.dirs 항목은 카프카 브로커가 전송받은 메시지를 저장하기 위한 파일 시스템 경로다. 프로듀서가 카프카에 메시지를 전송하면 브로커가 log.dirs에 저장된 디렉토리를 번갈아가면서 사용하여 파일로 메시지를 저장하게 된다. log.dirs는 1개 이상을 지정할 수 있다. 

설정 파일이 준비되면 2번째, 3번째 카프카 브로커를 구동한다. 주키퍼는 이미 구동중이기 때문에 카프카 브로커만 구동하면 된다. 

1
2
3
4
$  bin/kafka-server-start.sh config/server-1.properties &
... 
$ bin/kafka-server-start.sh config/server-2.properties &
...

cs


& 문자를 덧 붙여서 백그라운드로 카프카 브로커 서버를 구동한다. 구동된 카프카 브로커 서버는 kafka-server-stop.sh 스크립트로 종료시킬 수 있다. 이제 3대의 브로커가 구동되었다.

3대의 브로커를 이용하여 좀 더 향상된 가용성(Availability)을 지원하는 토픽을 만들어보자. "haTopic" 이라는 이름의 토픽을 만들어보자.

1
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic haTopic
cs

토픽 생성시 --replication-factor 옵션을 3으로 주었다. 이는 카프카로 입력된 데이터가 3군데 중복해서 저장된다는 의미다. 3개의 복사본이 저장되므로 2대의 브로커에 장애가 생겼을 때에도 해당 메시지가 서비스 될 수 있다.  

1
2
3
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
haTopic
test
cs

생성된 토픽을 조회해보면 haTopic 과 이전에 만들어둔 test 가 있음을 확인할 수 있다. 

1
2
3
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic haTopic
Topic:haTopic    PartitionCount:1    ReplicationFactor:3    Configs:
Topic: haTopic    Partition: 0    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1​

cs


haTopic 의 정보는 kafka-topics.sh의 --describe 옵션으로 확인 할 수 있다. 

haTopic 은 1개의 파티션으로 구성되어 있으며 RepliationFactor가 3으로 설정되어 있다. 생성할 때 준 값과 동일하다. 

haTopic의 유일한 파티션인 0번 파티션의 리더는 2로 설정되어 있다. 2는 브로커 설정에서 broker.id=2 로 설정되어 있는 브로커를 의미한다. 파티션 0은 3개의 사본으로 카프카에 저장되며 각각 2번 브로커, 0번 브로커, 1번 브로커에 저장된다. ISR 값은 In-Sync Replicas의 약자로 리더가 포함된 브로커가 죽거나 장애가 생겼을 때 새로운 리더로 선출 될 수 있는 브로커의 모임으로 생각하면 된다. 

아까 만들었던 test 토픽의 정보를 확인해보자. 

1
2
3
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test    PartitionCount:1    ReplicationFactor:1    Configs:
Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0​
cs

아까 만들었던 test 토픽의 ReplicationFactor 는 1 이며 처음 구동했던 0 번 브로커에 모두 저장되어 있음을 확인할 수 있다. 

새로 만든 haTopic 에 메시지를 보내보자. 

1
2
3
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic haTopic
Test message to haTopic1​
Test message to haTopic2
cs

콘솔 프로듀서에서 위와 같은 메시지를 보내면

1
2
3
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic haTopic
Test message to haTopic1​
Test message to haTopic2​
cs

콘솔 컨슈머에서 위와 같이 확인할 수 있다. 

브로커의 개수와 관련없이 동일하게 사용할 수 있다. 

이제 브로커 중 하나를 죽여보자. 

1
2
3
4
$ ps aux | grep server.properties
7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
...
$ kill -9 7564
cs

처음에 구동했던 카프카 브로커를 죽였다. 

1
2
3
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic  haTopic
Topic:haTopic    PartitionCount:1    ReplicationFactor:3    Configs:
Topic: haTopic    Partition: 0    Leader: 2    Replicas: 2,0,1    Isr: 2,1​
cs

haTopic 의 정보를 확인해보면 방금 죽인 0번 브로커가 ISR 에서 빠진 걸 볼 수 있다. 

하지만 haTopic 은 여전히 사용가능하다. 

1
2
3
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --from-beginning --topic haTopic
Test message to haTopic1​
Test message to haTopic2​
cs

이 때 --bootstrap-server 값으로는 살아있는 다른 서버를 명시하면 된다. 카프카에 저장된 메시지를 확인할 수 있다. 

퀵 스타트를 통해 간단하게 카프카 클러스터를 구축하고 콘솔 컨슈머, 콘솔 프로듀서를 이용해 테스트 해 봤다. 


반응형

댓글