클라우드 엔지니어/클라우드 캠프과정

kafka + django + elasticsearch +logstash+kibana

해아's 2022. 10. 11. 16:53

파이썬 새프로젝트 생성

 

파이썬 > 카프카에게 메세지를 보내는 소스를 작성

 

아래소스는 파이썬코드로 만들어진 test 토픽인 프로듀서이다.

from kafka import KafkaProducer
import time

producer = KafkaProducer(
    bootstrap_servers=['192.168.179.100:9092']
)
start = time.time()

for i in range(100):
    producer.send('test', value="test".encode("utf-8"))
    #              토픽         메세지
    producer.flush()

print("elapsed :", time.time() - start)

실행하면 안된다

카프카 서버가 리스너가 #advertised.listeners=PLAINTEXT://your.host.name:9092

막혀있어서 아이피주소를 바꿔준다

그후

 

파이썬 코드로 만들어진 test토픽을 받아오는 컨슈머를 생성한다.

from kafka import KafkaConsumer
from json import loads

consumer = KafkaConsumer(
    'test',
    bootstrap_servers=['192.168.179.100:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: loads(x.decode('utf-8')),
    consumer_timeout_ms=10000
)

print('[begin] get consumer list')

for message in consumer:
    print("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s" % ( message.topic, message.partition, message.offset, message.key, message.value ))

print('[end] get consumer list')

이제 연습한것을 장고에 올려보자

from kafka import KafkaProducer
from json import dumps

# 토픽 방식은 아래 방식으로
'''
 logging.post.like
 logging.post.un_like
 logging.post.create

 보낼떄 제이슨 타입의 형식을 대체적으로 사용한다.

'''
producer = KafkaProducer(
    acks=0,
    compression_type='gzip',
    bootstrap_servers=['192.168.179.100:9092'],
    value_serializer=lambda x: dumps(x).encode('utf-8')
)

@login_required(login_url='/accounts/login')
def like(request, bid):
    post = Post.objects.get(id=bid)
    user = request.user
    if post.like.filter(id=user.id).exists():
        post.like.remove(user)
        data = {'user': user.id, 'post': post.id, 'like_cnt': post.like.count()}
        producer.send('logging.post.like', value=data)
        producer.flush()
        return JsonResponse({'message': 'deleted', 'like_cnt': post.like.count()})
    else:
        post.like.add(user)
        data = {'user': user.id, 'post': post.id, 'like_cnt': post.like.count()}
        producer.send('logging.post.like', value=data)
        producer.flush()
        return JsonResponse({'message': 'added', 'like_cnt': post.like.count()})

 

카프카에서 아래의 엔진(커스터머) 에게 보낸다.

Elasticsearch,Kibana,Logstash,Elastic Stack , Beats

https://www.elastic.co/kr/

 

Logstash > 카프카에게서 메세지를 받으면 Elasticsearch에 맞는 형태로 변환

Kibana > Elasticsearch 시각화툴

 

 

실습

가상머신 준비 3대

 

엘라스틱 서치 : 120

로그스테치 : 110

키바나 : 130

systemctl stop firewalld
sudo yum install net-tools

1번서버 셋팅(엘라스틱서치) ip 192.168.179.120

0. 가상머신 준비
		cpu	mem	프로그램
	centos8	2	4	엘라스틱서치 ip:120
	centos8	1	2	로그스태시 ip: 110
	centos8	1	2	키바나 ip: 130

1. 엘라스틱 서치
  1) 레포지토리 추가
cat > /etc/yum.repos.d/elasticsearch.repo <https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF

  2) 설치
	dnf -y install elasticsearch

  3) 실행
	systemctl enable elasticsearch
	systemctl restart elasticsearch

  4) 확인
	curl http://127.0.0.1:9200

 

https://msyu1207.tistory.com/entry/Elasticsearch-%EC%84%A4%EC%B9%98-%EB%B0%8F-%EC%99%B8%EB%B6%80-%ED%97%88%EC%9A%A9

 

Elasticsearch 설치 및 외부 허용 설정하기 #1

안녕하세요😁 유유자적한 개발자 유로띠 입니다 😉 오늘은 ELK 의 E에 해당하는 Elasticsearch에 대해 포스팅해보도록 하겠습니다 👏👏👏👏 What is Elasticsearch ❓ 텍스트, 숫자, 위치 기반 정보, 정

msyu1207.tistory.com

 

외부접속허용하기

vi /etc/elasticsearch/elasticsearch.yml

56라인 주석제거및 허용아이피
  network.host: 0.0.0.0
74라인주석제거
  cluster.initial_master_nodes: ["node-1", "node-2"]

systemctl restart elasticsearch

 

2번서버 셋팅 ip 192.168.179.110

 

input {
  file {
    type => "seucure_log" #보안로그타입
    path => "/var/log/secure" #로그 주소
  }
}
filter {
  grok {
    add_tag => [ "sshd_fail" ] #sshd 접속오류태그
    match => { "message" => "Failed %{WORD:sshd_auth_type} for %{USERNAME:sshd_invalid_user} from %{IP:sshd_client_ip} port %{NUMBER:sshd_port} %{GREEDYDATA:sshd_protocol}" }

 #메세지중 Failed 인녀석만 불러오는것이다.
  }
}

output {
  elasticsearch {
    hosts => ["http://엘라스틱서치IP주소:9200"]
    index => "sshd_fail-%{+YYYY.MM}"
  }
}

2. 로그스태시
  1) 레포지토리 추가
cat > /etc/yum.repos.d/elasticsearch.repo <https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF

  2) 설치
	dnf -y install logstash

  3) 간단한 파이프라인 설정
	chgrp logstash /var/log/secure
	chmod 640 /var/log/secure
	vi /etc/logstash/conf.d/sshd.conf


input {
  file {
    type => "seucure_log"
    path => "/var/log/secure"
  }
}
filter {
  grok {
    add_tag => [ "sshd_fail" ]
    match => { "message" => "Failed %{WORD:sshd_auth_type} for %{USERNAME:sshd_invalid_user} from %{IP:sshd_client_ip} port %{NUMBER:sshd_port} %{GREEDYDATA:sshd_protocol}" }
  }
}

output {
  elasticsearch {
    hosts => ["http://엘라스틱서치IP주소:9200"]
    index => "sshd_fail-%{+YYYY.MM}"
  }
}

  4) 실행
	systemctl enable logstash
	systemctl restart logstash

  5) 확인
	curl 엘라스틱서치IP주소:9200/_cat/indices?v

3번서버까지 설치하면아래와 같이 정보가 나오고

2번서버에 sshd에러 로그를 엘라스틱 서치에 저장하게 하였으니

2번서버에 일부로 로그인에러를 만들면 아래와 같이 생길것이다

 

 

 

3번서버 키바나 서버 ip : 192.168.179.130

 

3. 키바나
  1) 레포지토리 추가
cat > /etc/yum.repos.d/elasticsearch.repo <https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF

  2) 설치
	dnf -y install kibana

 
  3) 설정
	vi /etc/kibana/kibana.yml
		# 7번 라인 주석 해제 후 다음과 같이 설정
		server.host: "0.0.0.0"

		# 32번 라인 주석 해제 후 
		elasticsearch.hosts: ["http://    엘라스틱 서치의 IP    :9200"]

  4) 실행
	systemctl enable kibana
	systemctl restart kibana

설정후 시간이좀 지난뒤 접속해보면 대쉬보드형태로 뜬다

 

 

 

설정에서 로그스태쉬에서 설정한 sshd에러를 불러오게 한다

 

 

그러고나면 대쉬보드에서 나타난다

여기까지가 기본이고 다음에는  kafka에서 데이타를 받아오게 설정한다.

 

 

logstash 서버에 설정하기

vi /etc/logstash/conf.d/kafka.conf

input {
  kafka {
    bootstrap_servers => "192.168.179.100:9092"
    topics => ["logging.post.like"]
  }
}

output {
  file {
    path => "/etc/logstash/data/kafka.log"
  }
}


#kafka.log 로그파일 만들기
mkdir /etc/logstash/data
touch /etc/logstash/data/kafka.log
chmod 755 /etc/logstash/data/kafka.log
chown logstash:logstash /etc/logstash/data/kafka.log

systemctl stop logstash
systemctl start logstash

tail -f /etc/logstash/data/kafka.log <<계속읽어오기

output 수정하기

output {
  elasticsearch {
    hosts => ["http://192.168.179.120:9200"]
    index => "post-like-%{+YYYY.MM}"
  }
}

 

하고나면 완료....

고생하였다..

 

 

 

참고자료

더보기

https://needjarvis.tistory.com/607

 

[카프카] Python으로 Kafka에 전송(Producer)하고 가져오기(consumer)

카프카(Kafka)에서는 다양한 언어로 데이터를 주고 받는 기능을 제공하는데 본 포스팅은 파이썬(Python)으로 구현하는 프로듀서(producer)/컨슈머(consumer) 즉 데이터를 보내고 받는 방법을 설명한다.

needjarvis.tistory.com

 

https://jyoondev.tistory.com/185

 

카프카 - (3) 토픽

카프카 토픽 생성 먼저 지난번 포스팅에서 사용한 네트워크를 통한 통신으로 토픽을 생성해본다. 로컬 기기의 kafka_2.12-2.5.0 디렉토리에서 다음 커맨드를 실행해 준다. * 지난 포스팅에서 카프카

jyoondev.tistory.com

 

. 파이썬 producer, consumer
  1) 프로듀서
from kafka import KafkaProducer
import time

producer = KafkaProducer(
    bootstrap_servers=['192.168.100.201:9092']
)
start = time.time()

for i in range(100):
    producer.send('test', value="test".encode("utf-8"))
    producer.flush()

print("elapsed :", time.time() - start)



  2) 컨슈머
from kafka import KafkaConsumer
from json import loads

consumer = KafkaConsumer(
    'test',
    bootstrap_servers=['192.168.100.201:9092']
)

print('[begin] get consumer list')

for message in consumer:
    print("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s" % ( message.topic, message.partition, message.offset, message.key, message.value.decode('utf-8') ))

print('[end] get consumer list')




=========================================
	json 주고 받기
=========================================

1. 프로듀서
from kafka import KafkaProducer
from json import dumps
import time

producer = KafkaProducer(
    acks=0,
    compression_type='gzip',
    bootstrap_servers=['192.168.100.201:9092'],
    value_serializer=lambda x: dumps(x).encode('utf-8')
)

start = time.time()

for i in range(100):
    data = {'str' : 'result'+str(i)}
    producer.send('test', value=data)
    producer.flush()

print("elapsed :", time.time() - start)



2. 컨슈머
from kafka import KafkaConsumer
from json import loads

consumer = KafkaConsumer(
    'test',
    bootstrap_servers=['192.168.100.201:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: loads(x.decode('utf-8')),
    consumer_timeout_ms=10000
)

print('[begin] get consumer list')

for message in consumer:
    print("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s" % ( message.topic, message.partition, message.offset, message.key, message.value ))

print('[end] get consumer list')

 

728x90
반응형