본문 바로가기
KAFKA

[ KAFKA ] 설치 및 테스트

by 정윤재 2021. 3. 11.

여러개의 서버에 테스트를 할 수 있는 환경이 아니여서 한 서버에 전부 설치 하여 테스트 하였습니다.

 

1. https://kafka.apache.org/ 사이트로 접속 하여 파일 다운로드를 한다.

 

저의 경우 kafka_2.13-2.7.0.tgz 를 다운 받았습니다.

그리고 tar xvfz kafka_2.13-2.7.0.tgz 의 압축을 푼다

mv kafka_2.13-2.7.0 /app/kafka

로 내가 원하는 kafka 어플리케이션의 위치를 이동시켜서 지정해준다.

 

2. zookeeper 와 kafka의 데이터 디렉토리를 만든다.

 

zookeeper 의 경우 홀수로 구성 되어야 하는 문제가 있어서 3개의 구성(zookeeper1,zookeeper2,zookeeper3, kafka1, kafka2,kafka3)
으로 구성 합니다.

디렉토리는

/data/zookeeper1
/data/zookeeper2
/data/zookeeper3

 

3. zookeeper 의 노드 구분 ID 를 생성 해 줌

 

/data/zookeeper1
/data/zookeeper2
/data/zookeeper3

 

에 각각

echo 1 > /data/zookeeper1/myid
echo 2 > /data/zookeeper2/myid
echo 3 > /data/zookeeper3/myid

(myid에 넣을 수 있는 숫자는 1~255까지이다.)

을 실행 시켜서 데이터를 넣어 준다.

 

4. 카프카설치경로(/app/kafka)/config 에서 zookeeper 설정 파일을 수정 해 준다.

 

cd /app/kafka/config
cp zookeeper.properties zookeeper1.properties
cp zookeeper.properties zookeeper2.properties
cp zookeeper.properties zookeeper3.properties


vi zookeeper1.properties 와 같이 설정 파일을 수정 한다.

 

tickTime=2000
dataDir=/data/zookeeper1
clientPort=2181
initLimit=10
syncLimit=5
server.1=192.168.122.1:28881:38881
server.2=192.168.122.1:28882:38882
server.3=192.168.122.1:28883:38883

#server.(myid 의 id)= 서버 아이피 : peer 와 peer 끼리 연결 포트 : leader를 선출하기 위한 투표 포트
#원래 peer 연결 포트와 leader 선출 포트는 각 서버마다 동일하게 설정하지만 여기서는 동일 서버에 모두 설치 하기 때문에 다르게 설정함

 

vi zookeeper2.properties 와 같이 설정 파일을 수정 한다.

 

tickTime=2000
dataDir=/data/zookeeper2
clientPort=2182
initLimit=10
syncLimit=5
server.1=192.168.122.1:28881:38881
server.2=192.168.122.1:28882:38882
server.3=192.168.122.1:28883:38883

 

vi zookeeper3.properties 와 같이 설정 파일을 수정 한다.

 

tickTime=2000
dataDir=/data/zookeeper3
clientPort=2183
initLimit=10
syncLimit=5
server.1=192.168.122.1:28881:38881
server.2=192.168.122.1:28882:38882
server.3=192.168.122.1:28883:38883

 

zookeeper.properties 설정에 대한 참고 사항은 아래와 같다.
- dataDir : 인메모리 데이터베이스 스냅샷을 저장할 공간. 특정 directory가 명시되지 않으면, 트랜잭션 로그는 database에 기록된다
- clientPort : client의 연결을 listen할 port 번호
- maxClientCnxns : 하나의 클라이언트에서 동시접속하는 개수 제한, 기본값은 60이며, 0은 무제한
- tickTime : ZooKeeper가 사용하는 기본 시간 단위(milliseconds)로, heartbeats를 보내는데에 사용된다. 또한 세션 타임아웃 최소 단위는 tickTime의 2배로 설정
- initLimit : Zookeeper 서버들이 leader에 연결할 때 사용되는 최대 시간을 제한하기 위해 사용하는 timeout 단위
- syncLimit=: 서버가 leader로부터 얼마나 sync에 뒤쳐질 수 있는지를 제한하는 timeout 단위

 

5. zookeeper 실행, 중지 스크립트 작성

 

실행 스크립트 위치 : /app/kafka

 

vi start_zookeeper1.sh
nohup /app/kafka/bin/zookeeper-server-start.sh -daemon /app/kafka/config/zookeeper1.properties > /dev/null 2>&1 &
vi start_zookeeper2.sh
nohup /app/kafka/bin/zookeeper-server-start.sh -daemon /app/kafka/config/zookeeper2.properties > /dev/null 2>&1 &
vi start_zookeeper3.sh
nohup /app/kafka/bin/zookeeper-server-start.sh -daemon /app/kafka/config/zookeeper3.properties > /dev/null 2>&1 &


중지 스크립트 위치 : /app/kafka

 

vi stop_zookeeper1.sh
PID=`ps -ef | grep java | grep "zookeeper1" | grep -v grep | awk '{print $2}'`
echo "PID = $PID"
kill ${PID}

vi stop_zookeeper2.sh
PID=`ps -ef | grep java | grep "zookeeper2" | grep -v grep | awk '{print $2}'`
echo "PID = $PID"
kill ${PID}

vi stop_zookeeper3.sh
PID=`ps -ef | grep java | grep "zookeeper3" | grep -v grep | awk '{print $2}'`
echo "PID = $PID"
kill ${PID}

 

 

==================== 여기서부터는 kafka 설정

mkdir -p /app/logs/kafka1
mkdir -p /app/logs/kafka2
mkdir -p /app/logs/kafka3

으로 로그 디렉토리를 만들어 놓는다

 

6. $KAFKA_HOME/config 에서 server.properties 를 가지고

   3개의 properties 를 만든다.

 

cp server.properties server1.properties
cp server.properties server2.properties
cp server.properties server3.properties


7. server.properties 들에 대한 설정을 한다. (kafka의 broker 설정)

 

vi server1.properties

broker.id=1
listeners=PLAINTEXT://192.168.122.1:9091
advertised.listeners=PLAINTEXT://192.168.122.1:9091

log.dirs=/app/logs/broker1

log.retention.hours=168

log.flush.interval.ms=5000

log.flush.interval.messages=20000

log.segment.bytes=1073741824

group.initial.rebalance.delay.ms=10000

zookeeper.connect=192.168.122.1:2181,192.168.122.1:2182,192.168.122.1:2183

 

vi server2.properties

broker.id=2
listeners=PLAINTEXT://192.168.122.1:9092
advertised.listeners=PLAINTEXT://192.168.122.1:9092

log.dirs=/app/logs/broker2

log.retention.hours=168

log.flush.interval.ms=5000

log.flush.interval.messages=20000

log.segment.bytes=1073741824

group.initial.rebalance.delay.ms=10000

zookeeper.connect=192.168.122.1:2181,192.168.122.1:2182,192.168.122.1:2183

 

vi server3.properties

broker.id=3
listeners=PLAINTEXT://192.168.122.1:9093
advertised.listeners=PLAINTEXT://192.168.122.1:9093

log.dirs=/app/logs/broker3

log.retention.hours=168

log.flush.interval.ms=5000

log.flush.interval.messages=20000

log.segment.bytes=1073741824

group.initial.rebalance.delay.ms=10000

zookeeper.connect=192.168.122.1:2181,192.168.122.1:2182,192.168.122.1:2183


server.properties 설정에 대한 참고 사항은 아래와 같다.

- broker.id : 브로커간 구분 해야할id 로, 설정하지 않을 경우 유니크한 아이디로 설정 될 것이나 zookeeper 에서 아이디 설정 한 것의 충돌을 피하기 위해 설정해 주는 것이 좋다.
- listeners : 서비스 접속 가능 IP:PORT → Broker 에 접근할 수 있는 URL 정보 (listener.security.protocol.map : HOST : PORT)
- advertised.listeners : Producer와 Consumer가 접근할 호스트와 포트를 지정, 기본값은 listeners를 사용.
- log.retention.hours : 로그 보관 주기 (단위는 시간)
- log.flush.interval.ms : 로그파일 flush interval time
- log.flush.interval.messages : 로그파일 flush 전 보관 메세지 수
- zookeeper.connect : broker topic 정보 등을 저장하는 zookeeper 에 접근 하기 위한 접속 정보

* listener.security.protocol.map = INTERNAL:PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL


8. broker 기동 ,중지 스크립트를 생성 한다.

 

vi /app/kafka/start_kafka1.sh

nohup /app/kafka/bin/kafka-server-start.sh -daemon /app/kafka/config/server1.properties > /dev/null 2>&1 &

vi /app/kafka/start_kafka2.sh

nohup /app/kafka/bin/kafka-server-start.sh -daemon /app/kafka/config/server2.properties > /dev/null 2>&1 &

vi /app/kafka/start_kafka3.sh

nohup /app/kafka/bin/kafka-server-start.sh -daemon /app/kafka/config/server3.properties > /dev/null 2>&1 &

 

vi /app/kafka/stop_kafka1.sh

PID=`ps -ef | grep java | grep "server1" | grep -v grep | awk '{print $2}'`

echo "$NAME-PID = $PID"

kill ${PID}


vi /app/kafka/stop_kafka2.sh

PID=`ps -ef | grep java | grep "server2" | grep -v grep | awk '{print $2}'`

echo "$NAME-PID = $PID"

kill ${PID}


vi /app/kafka/stop_kafka3.sh

PID=`ps -ef | grep java | grep "server3" | grep -v grep | awk '{print $2}'`

echo "$NAME-PID = $PID"

kill ${PID}

==========================================

 

테스트

 

토픽 생성

/app/kafka/bin/kafka-topics.sh --create --bootstrap-server 192.168.122.1:9091 --replication-factor 1 --partitions 1 --topic test_topic

* kafka-topics.sh : 토픽 실행 파일
* --create : 토픽 생성 파라미터
* bootstarap.server : 호스트/포트 지정
* --replication-factor : replica 갯수 지정 (복제본의 수)
* --partitions : paritions 갯수 지정 (토픽을 몇개로 나눌 것인가)
* --topic [이름] : 생성될 토픽의 이름 지정


생성된 토픽 확인

/app/kafka/bin/kafka-topics.sh --list --bootstrap-server 192.168.122.1:9091

* bin/kafka-topics.sh : 토픽 실행 파일
* --list : 토픽 리스트 파라미터
* --bootstrap-server : 호스트/포트 지정


메시지 생성 (produce)

/app/kafka/bin/kafka-console-producer.sh --broker-list 192.168.122.1:9091 --topic test_topic
* kafka-console-producer.sh : 프로듀서 콘솔 실행파일
* --bootstrap-server : 호스트/포트 지정
* --topic : 메세지를 보낼 토픽 지정
- 위의 명령어를 입력 시 입력 콘솔로 변함
- 이때 기본적으로 한줄 단위로 분리된 메세지로 보내지게 됨


메시지 소비하기 (consume) : topic의 메시지 받아서 읽기
/app/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.122.1:9091 --topic test_topic --from-beginning

* bin/kafka-console-consumer.sh : 컨슈머 콘솔 실행파일
* --bootstrap-server : 호스트/포트 지정
* --topic : 메세지를 읽을 토픽 지정
* --from-beginning : 메세지를 어디서 부터 읽을 지 지정

 


댓글