여러개의 서버에 테스트를 할 수 있는 환경이 아니여서 한 서버에 전부 설치 하여 테스트 하였습니다.
1. https://kafka.apache.org/ 사이트로 접속 하여 파일 다운로드를 한다.
저의 경우 kafka_2.13-2.7.0.tgz 를 다운 받았습니다.
그리고 tar xvfz kafka_2.13-2.7.0.tgz 의 압축을 푼다
mv kafka_2.13-2.7.0 /app/kafka
로 내가 원하는 kafka 어플리케이션의 위치를 이동시켜서 지정해준다.
2. zookeeper 와 kafka의 데이터 디렉토리를 만든다.
zookeeper 의 경우 홀수로 구성 되어야 하는 문제가 있어서 3개의 구성(zookeeper1,zookeeper2,zookeeper3, kafka1, kafka2,kafka3)
으로 구성 합니다.
디렉토리는
/data/zookeeper1
/data/zookeeper2
/data/zookeeper3
3. zookeeper 의 노드 구분 ID 를 생성 해 줌
/data/zookeeper1
/data/zookeeper2
/data/zookeeper3
에 각각
echo 1 > /data/zookeeper1/myid
echo 2 > /data/zookeeper2/myid
echo 3 > /data/zookeeper3/myid
(myid에 넣을 수 있는 숫자는 1~255까지이다.)
을 실행 시켜서 데이터를 넣어 준다.
4. 카프카설치경로(/app/kafka)/config 에서 zookeeper 설정 파일을 수정 해 준다.
cd /app/kafka/config
cp zookeeper.properties zookeeper1.properties
cp zookeeper.properties zookeeper2.properties
cp zookeeper.properties zookeeper3.properties
vi zookeeper1.properties 와 같이 설정 파일을 수정 한다.
tickTime=2000
dataDir=/data/zookeeper1
clientPort=2181
initLimit=10
syncLimit=5
server.1=192.168.122.1:28881:38881
server.2=192.168.122.1:28882:38882
server.3=192.168.122.1:28883:38883
#server.(myid 의 id)= 서버 아이피 : peer 와 peer 끼리 연결 포트 : leader를 선출하기 위한 투표 포트
#원래 peer 연결 포트와 leader 선출 포트는 각 서버마다 동일하게 설정하지만 여기서는 동일 서버에 모두 설치 하기 때문에 다르게 설정함
vi zookeeper2.properties 와 같이 설정 파일을 수정 한다.
tickTime=2000
dataDir=/data/zookeeper2
clientPort=2182
initLimit=10
syncLimit=5
server.1=192.168.122.1:28881:38881
server.2=192.168.122.1:28882:38882
server.3=192.168.122.1:28883:38883
vi zookeeper3.properties 와 같이 설정 파일을 수정 한다.
tickTime=2000
dataDir=/data/zookeeper3
clientPort=2183
initLimit=10
syncLimit=5
server.1=192.168.122.1:28881:38881
server.2=192.168.122.1:28882:38882
server.3=192.168.122.1:28883:38883
zookeeper.properties 설정에 대한 참고 사항은 아래와 같다.
- dataDir : 인메모리 데이터베이스 스냅샷을 저장할 공간. 특정 directory가 명시되지 않으면, 트랜잭션 로그는 database에 기록된다
- clientPort : client의 연결을 listen할 port 번호
- maxClientCnxns : 하나의 클라이언트에서 동시접속하는 개수 제한, 기본값은 60이며, 0은 무제한
- tickTime : ZooKeeper가 사용하는 기본 시간 단위(milliseconds)로, heartbeats를 보내는데에 사용된다. 또한 세션 타임아웃 최소 단위는 tickTime의 2배로 설정
- initLimit : Zookeeper 서버들이 leader에 연결할 때 사용되는 최대 시간을 제한하기 위해 사용하는 timeout 단위
- syncLimit=: 서버가 leader로부터 얼마나 sync에 뒤쳐질 수 있는지를 제한하는 timeout 단위
5. zookeeper 실행, 중지 스크립트 작성
실행 스크립트 위치 : /app/kafka
vi start_zookeeper1.sh
nohup /app/kafka/bin/zookeeper-server-start.sh -daemon /app/kafka/config/zookeeper1.properties > /dev/null 2>&1 &
vi start_zookeeper2.sh
nohup /app/kafka/bin/zookeeper-server-start.sh -daemon /app/kafka/config/zookeeper2.properties > /dev/null 2>&1 &
vi start_zookeeper3.sh
nohup /app/kafka/bin/zookeeper-server-start.sh -daemon /app/kafka/config/zookeeper3.properties > /dev/null 2>&1 &
중지 스크립트 위치 : /app/kafka
vi stop_zookeeper1.sh
PID=`ps -ef | grep java | grep "zookeeper1" | grep -v grep | awk '{print $2}'`
echo "PID = $PID"
kill ${PID}
vi stop_zookeeper2.sh
PID=`ps -ef | grep java | grep "zookeeper2" | grep -v grep | awk '{print $2}'`
echo "PID = $PID"
kill ${PID}
vi stop_zookeeper3.sh
PID=`ps -ef | grep java | grep "zookeeper3" | grep -v grep | awk '{print $2}'`
echo "PID = $PID"
kill ${PID}
==================== 여기서부터는 kafka 설정
mkdir -p /app/logs/kafka1
mkdir -p /app/logs/kafka2
mkdir -p /app/logs/kafka3
으로 로그 디렉토리를 만들어 놓는다
6. $KAFKA_HOME/config 에서 server.properties 를 가지고
3개의 properties 를 만든다.
cp server.properties server1.properties
cp server.properties server2.properties
cp server.properties server3.properties
7. server.properties 들에 대한 설정을 한다. (kafka의 broker 설정)
vi server1.properties
broker.id=1
listeners=PLAINTEXT://192.168.122.1:9091
advertised.listeners=PLAINTEXT://192.168.122.1:9091
log.dirs=/app/logs/broker1
log.retention.hours=168
log.flush.interval.ms=5000
log.flush.interval.messages=20000
log.segment.bytes=1073741824
group.initial.rebalance.delay.ms=10000
zookeeper.connect=192.168.122.1:2181,192.168.122.1:2182,192.168.122.1:2183
vi server2.properties
broker.id=2
listeners=PLAINTEXT://192.168.122.1:9092
advertised.listeners=PLAINTEXT://192.168.122.1:9092
log.dirs=/app/logs/broker2
log.retention.hours=168
log.flush.interval.ms=5000
log.flush.interval.messages=20000
log.segment.bytes=1073741824
group.initial.rebalance.delay.ms=10000
zookeeper.connect=192.168.122.1:2181,192.168.122.1:2182,192.168.122.1:2183
vi server3.properties
broker.id=3
listeners=PLAINTEXT://192.168.122.1:9093
advertised.listeners=PLAINTEXT://192.168.122.1:9093
log.dirs=/app/logs/broker3
log.retention.hours=168
log.flush.interval.ms=5000
log.flush.interval.messages=20000
log.segment.bytes=1073741824
group.initial.rebalance.delay.ms=10000
zookeeper.connect=192.168.122.1:2181,192.168.122.1:2182,192.168.122.1:2183
server.properties 설정에 대한 참고 사항은 아래와 같다.
- broker.id : 브로커간 구분 해야할id 로, 설정하지 않을 경우 유니크한 아이디로 설정 될 것이나 zookeeper 에서 아이디 설정 한 것의 충돌을 피하기 위해 설정해 주는 것이 좋다.
- listeners : 서비스 접속 가능 IP:PORT → Broker 에 접근할 수 있는 URL 정보 (listener.security.protocol.map : HOST : PORT)
- advertised.listeners : Producer와 Consumer가 접근할 호스트와 포트를 지정, 기본값은 listeners를 사용.
- log.retention.hours : 로그 보관 주기 (단위는 시간)
- log.flush.interval.ms : 로그파일 flush interval time
- log.flush.interval.messages : 로그파일 flush 전 보관 메세지 수
- zookeeper.connect : broker topic 정보 등을 저장하는 zookeeper 에 접근 하기 위한 접속 정보
* listener.security.protocol.map = INTERNAL:PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL
8. broker 기동 ,중지 스크립트를 생성 한다.
vi /app/kafka/start_kafka1.sh
nohup /app/kafka/bin/kafka-server-start.sh -daemon /app/kafka/config/server1.properties > /dev/null 2>&1 &
vi /app/kafka/start_kafka2.sh
nohup /app/kafka/bin/kafka-server-start.sh -daemon /app/kafka/config/server2.properties > /dev/null 2>&1 &
vi /app/kafka/start_kafka3.sh
nohup /app/kafka/bin/kafka-server-start.sh -daemon /app/kafka/config/server3.properties > /dev/null 2>&1 &
vi /app/kafka/stop_kafka1.sh
PID=`ps -ef | grep java | grep "server1" | grep -v grep | awk '{print $2}'`
echo "$NAME-PID = $PID"
kill ${PID}
vi /app/kafka/stop_kafka2.sh
PID=`ps -ef | grep java | grep "server2" | grep -v grep | awk '{print $2}'`
echo "$NAME-PID = $PID"
kill ${PID}
vi /app/kafka/stop_kafka3.sh
PID=`ps -ef | grep java | grep "server3" | grep -v grep | awk '{print $2}'`
echo "$NAME-PID = $PID"
kill ${PID}
==========================================
테스트
토픽 생성
/app/kafka/bin/kafka-topics.sh --create --bootstrap-server 192.168.122.1:9091 --replication-factor 1 --partitions 1 --topic test_topic
* kafka-topics.sh : 토픽 실행 파일
* --create : 토픽 생성 파라미터
* bootstarap.server : 호스트/포트 지정
* --replication-factor : replica 갯수 지정 (복제본의 수)
* --partitions : paritions 갯수 지정 (토픽을 몇개로 나눌 것인가)
* --topic [이름] : 생성될 토픽의 이름 지정
생성된 토픽 확인
/app/kafka/bin/kafka-topics.sh --list --bootstrap-server 192.168.122.1:9091
* bin/kafka-topics.sh : 토픽 실행 파일
* --list : 토픽 리스트 파라미터
* --bootstrap-server : 호스트/포트 지정
메시지 생성 (produce)
/app/kafka/bin/kafka-console-producer.sh --broker-list 192.168.122.1:9091 --topic test_topic
* kafka-console-producer.sh : 프로듀서 콘솔 실행파일
* --bootstrap-server : 호스트/포트 지정
* --topic : 메세지를 보낼 토픽 지정
- 위의 명령어를 입력 시 입력 콘솔로 변함
- 이때 기본적으로 한줄 단위로 분리된 메세지로 보내지게 됨
메시지 소비하기 (consume) : topic의 메시지 받아서 읽기
/app/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.122.1:9091 --topic test_topic --from-beginning
* bin/kafka-console-consumer.sh : 컨슈머 콘솔 실행파일
* --bootstrap-server : 호스트/포트 지정
* --topic : 메세지를 읽을 토픽 지정
* --from-beginning : 메세지를 어디서 부터 읽을 지 지정
댓글