์ด๋ฒ ์๊ฐ์๋ yamlํ์ผ์ ์ฌ์ฉํ์ฌ Docker์์ Kafka๋ฅผ ์คํํด ๋ณด์.
โ . GCP์ Docker์ปจํ ์ด๋๋ก Kafka ํ๊ฒฝ ๊ตฌ์ถํ๊ธฐ
1. GCP VM ์ธ์คํด์ค ์ค์
์ด์ ์ airflow์ ๋น์ทํ ๊ณผ์ ์ด๋ค.
full-stack Kafka yamlํ์ผ์ ์คํํ ๊ฒ์ด๊ธฐ ๋๋ฌธ์ ์ปดํจํ ์์์ด ๋ง์ vm์์ง์ ์ ํํ์๋ค.
- c2-standard-4
- Ubuntu-20.04 30GB
2. ์๋ฒ์ Docker ์ค์น
โ ์์คํ ์ ํจํค์ง ๋ชฉ๋ก์ ์ต์ ์ํ๋ก ์ ๋ฐ์ดํธ
โก Docker๋ฅผ ๋ค์ด๋ก๋ํ๊ณ ์ค์นํ๊ธฐ ์ํด ํ์ํ ์ถ๊ฐ์ ์ธ ํจํค์ง๋ฅผ ์ค์น(apt-transport-https, ca-certificates, curl, software-properties-common)
โข Docker์ ๊ณต์ GPG ํค๋ฅผ ๋ค์ด๋ก๋ํ์ฌ APT ํจํค์ง ๊ด๋ฆฌ์์ ์ถ๊ฐ
โฃ Docker ํจํค์ง๋ฅผ ์ ๊ณตํ๋ ์ ์ฅ์๋ฅผ ์์คํ ์ ์ถ๊ฐ
โค ์๋ก ์ถ๊ฐ๋ Docker ์ ์ฅ์ ์ ๋ณด๋ฅผ ํฌํจํ์ฌ ํจํค์ง ๋ชฉ๋ก์ ์ ๋ฐ์ดํธ
โฅ ์ค์น ๊ฐ๋ฅํ Docker ๋ฒ์ ์ ๋ชฉ๋ก์ ํ์ โฆ ์์ ์ค์ ํ ์ ์ฅ์๋ฅผ ํตํด Docker Community Edition (CE)๋ฅผ ์ค์น + Docker ์์ง ํ์ฑํ
sudo apt update
sudo apt install apt-transport-https ca-certificates curl software-properties-common
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -
sudo add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu bionic stable"
sudo apt update
apt-cache policy docker-ce
sudo apt install docker-ce
3. ์๋ฒ์ Docker-compose ์ค์น
โ Docker compose ํจํค์ง ๋ค์ด๋ก๋ (2 ๋ฒ์ ์ด์์ ๋ค์ด๋ก๋ํ์. 1 ๋ฒ์ ์ ์ค๋ฅ๊ฐ ๋ง์ด ๋๋ค)
โก ๋ค์ด๋ก๋ํ Docker Compose ๋ฐ์ด๋๋ฆฌ ํ์ผ์ ์คํ ๊ถํ์ ๋ถ์ฌ
โข ํ์ฌ ์ฌ์ฉ์๋ฅผ Docker ๊ทธ๋ฃน์ ์ถ๊ฐ
sudo curl -L https://github.com/docker/compose/releases/download/v2.4.1/docker-compose-`uname -s`-`uname -m` -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose
sudo usermod -aG docker $USER
4. Docker image๋ก ํ๊ฒฝ ๊ตฌ์ถํ๊ธฐ
1) ๋์ปค ์ด๋ฏธ์ง๊ฐ ์๋ ๊นํ๋ธ ๋ ํฌ์งํ ๋ฆฌ ํด๋ก
git clone https://github.com/conduktor/kafka-stack-docker-compose
2) Kafka ๋์ปค ์ด๋ฏธ์ง ๋ค์ด๋ก๋ ๋ฐ ์ปจํ ์ด๋ ์คํ
kafka-stack-docker-compose ํด๋๋ก ์ด๋ํ ๋ค, ๋์ปค ์ด๋ฏธ์ง๋ฅผ ๋ค์ด๋ก๋ํ๊ณ ์ปจํ ์ด๋๋ฅผ ์คํ์ํจ๋ค.
cd kafka-stack-docker-compose
docker-compose -f full-stack.yml pull
docker compose -f full-stack.yml up
โ ก. Kafka ์นํ์ด์ง ์ ์ํ๊ธฐ
gcp-vm๋จธ์ ์ธ๋ถ ip:8080 ์ฃผ์๋ก ์ ์ํ๋ฉด ๋ค์๊ณผ ๊ฐ์ด kafka conduktor๋ฅผ ์น์ผ๋ก ์ ์ํ ์ ์๋ค.
๋ก๊ทธ์ธ ์์ด๋, ํจ์ค์๋๋ ๋ค์๊ณผ ๊ฐ๋ค.
login: admin@admin.io password: admin
โ ข. python์ ์ด์ฉํด Kafka ์์ ์ํํ๊ธฐ
1. python ๋ชจ๋ ์ค์นํ๊ธฐ
pip3 install kafka-python
2. Producer ๋ง๋ค๊ธฐ ์์
๋ก์ปฌ Kafka ์ธ์คํด์ค๋ฅผ ์ฐ๊ฒฐํ๋ KafkaProducer ๊ฐ์ฒด๋ฅผ ์์ฑ
์ ์กํ๋ ค๋ ๋ฐ์ดํฐ๋ฅผ json ๋ฌธ์์ด๋ก ๋ณํํ ๋ค์ UTF-8๋ก ์ธ์ฝ๋ฉํ์ฌ ์ง๋ ฌํํ๋ ๋ฐฉ๋ฒ์ ์ ์
from json import dumps
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=[Broker์ ๋ชฉ๋ก],
value_serializer=lambda x: dumps(x).encode('utf-8')
)
data={key:value}
producer.send('topic ์ด๋ฆ', value=data)
Topic์ด ์ ๋ง๋ค์ด์ง ๊ฒ์ ์น UI์์ ํ์ธํ ์ ์๋ค.
3. Consumer ๋ง๋ค๊ธฐ
consumer = KafkaConsumer(
'์๋นํ topic์ด๋ฆ',
bootstrap_servers=[brokerip:8080์ ์งํฉ],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='๋์ผํ ์์์ ์๋นํ๋ consumer์ ๊ทธ๋ฃน์ผ๋ก ์ง์ ',
value_deserializer=lambda x: loads(x.decode('utf-8'))
)
- auto_offset_reset
- earlist : ํ์ ์ ์ฅ๋ ์์๋๋ก ๋ฐ์ดํฐ๋ฅผ ์ฝ๊ฒ ๋ค
- latest : ํ์ ์๋ก ์๊ธด ๋ฐ์ดํฐ๋ถํฐ ์ฝ๊ฒ ๋ค
- enable_auto_commit
- True : kafka consumer object๊ฐ ์์์ offset๊ฐ์ ๊ธฐ๋ก
- False : kafka consumer object๊ฐ ์์์ offset๊ฐ์ ๊ธฐ๋กํ์ง ์์(๋ณดํต ํ์ ์์๋ ์ํํด์ ์ ์ฌ์ฉํ์ง ์์. ๋์ , Kafka consumer ํจ์์ค commit์ ์ฌ์ฉํด์ ๋ช ์์ ์ผ๋ก ์์น๋ฅผ ๊ธฐ๋ก)