0. 들어가며

ElasticStack을 구축하며 beat와 logstash를 연동하여 운영하는데, beat를 통해 데이터를 수집하는 서버에서 아래 이미지와 같은 에러가 지속적으로 발생함을 알수있었다.

해당 글은 지속적으로 발생하는 해당 에러를 조치하는 방안을 작성한 글이다.

 

1. 원인 파악

원인을 파악하기 위해 클라이언트(beat)에서 로그스태시로 telnet 접속시도를 했는데 정상적으로 커넥션이 되는걸 확인할 수 있었다. 즉 통신에는 문제가 없음을 확인하였고, 로그를 확인해보니 내용에 나와있는 connection reset by peer의 경우 클라이언트가 아닌 서버측에서 RST를 던져서 접속이 끊길때 발생하는 에러이기에, RST를 던진쪽이 서버(logstash)인지 

상단 또는 네트워크장비인지 확인을 하기위해 방화벽장비에서 패킷을 캡쳐하여 와이어샤크로 확인을해봤다.

 위 이미지와 서버내 tcpdump와 같은 추가적인 확인을 통해 서버(logstash)에서 RST를 던지고 있음을 확인하였고, 추가로 서버에서 통신을 끊는 이유를 확인하기 위하여 logstash에서  타임아웃관련 옵션을 공식 docs를 통해 확인해봤다.

Client_inactivity_timeout
 

Beats input plugin | Logstash Reference [8.15] | Elastic

If you configure the plugin to use 'TLSv1.1' on any recent JVM, such as the one packaged with Logstash, the protocol is disabled by default and needs to be enabled manually by changing jdk.tls.disabledAlgorithms in the $JDK_HOME/conf/security/java.security

www.elastic.co


위 공식 Docs 링크에 나와있듯이 logstash의 input plugin의 옵션중 client_inactivity_timeout이라는 옵션이 있으며,

해당 옵션은 클라이언트가 inactivity상태라면 X초후에 끊는다 라는 의미로 보인다.

* Close Idle clients after X seconds of inactivity.

기본값은 60초이므로 즉 예상하건데, beat에서 60초내에 데이터를 보내지않는다면 접속을 끊는다는 의미인거같다.

 

2. 해결방안

 

이미 위에 내용에서 예상했겠지만, 간단한 조치방법은 해당 옵션을 활성화 하여 알맞은 시간을 할당하는 방법이다. 필자는 타임아웃값을 30분(1800초)으로 줬으나, 만약 빈번하게 데이터흐름이 발생한다면 좀더시간을 줄여도 괜찮을듯하다.

 

vi /etc/logstash.conf


input {
  beats {
    port => 5044
    client_inactivity_timeout => 1800
  }
}
####중략######

 

0. 들어가며

auditbeat가 구동중인 linux 서버에서 rsyslog를 연동 할 경우 messages 로그에

설정한 auditbeat log가 남는것을 확인 할 수 있다. 이는 로그를 관리해야하는 입장에서 시스템 로그와 섞이기에 로그확인에 불편항 사항을 초래 할 수있다. 하지만 추가 설정 없이 기본 설정으로 운영 할 경우 rsyslog를 통하여 messages 로그에 남는 auditbeat 로그의 컨트롤 (ex.예외처리,파일분리 등)이 불가한데 아래는 이와같은 문제를 해결해나가는 과정이다.

1. auditbeat facility 확인

auditbeat 의 공식 docs를 찾아봐도 syslog의 facility에 대한 내용이 남겨져있는건 확인을 못했다.

(필자가 못찾은거일수도..)

서치중에 아래와 같이 libbeat내에 하드코딩된 내역을 확인 할 수 있었다. 확인된 내용으로 설정을 진행해보자.

 

2. 설정

위 내용을 토대로 syslog상에서 local0 퍼실리티에 대해 beat.log로 출력 및 messages에 예외처리 설정을 진행했다

[root@test ~]# vi /etc/rsyslog.conf

*.info;mail.none;authpriv.none;cron.none;local0.none                /var/log/messages
local0.*		/var/log/beat.log

그러나 여전히 messages log에 쌓이며 beat.log는 쌓이지않는 현상이 발생하였다.

관련하여 공식 문서를 좀더 확인해보던중 아래와 같은 옵션을 확인할수있었다.

 

messages에 자동으로 남는걸보고 syslog에 대한 설정이 기본적으로 되어있는거라 생각했는데,

해당 옵션이 있는걸 보고 auditbeat.yml에 아래와 같이 추가 설정을 진행해주었다.

[root@test ~]# vi /etc/auditbeat/auditbeat.yml

logging.to_syslog: true

위와 같이 진행 후 rsyslog에 설정된 내역대로 로그가 정상적 쌓이는걸 확인 할 수있었다. 

 

3. 결론

공식DOCS에 나와있는 여러 옵션 중 쓸모없는 옵션은 없다라는걸 느끼게 됐다.

messages에 남는걸보고 당연히 syslog 설정이 되어있다고 생각한점도 반성해야 될 부분중 하나인것 같다.

0. 들어가며

Elasticsearch는 기본적으로 UTC 타임존을 사용하여 데이터를 저장합니다. 이는 시스템 간 일관성을 유지하는 데 유리하지만, 사용자나 클라이언트가 있는 지역에 맞춰진 로컬 타임존으로 데이터를 보여줘야 하는 경우에는 불편할 수 있습니다. 특히 Kibana와 같은 시각화 프로그램은 타임존을 자동으로 변환하여 표시하지만,

ElastAlert2는 기본적으로 이 기능을 제공하지 않기 때문에 알람에 표시되는 시간이 UTC로 나타납니다.

키바나 Discover에 표시되는 timestamp와 실제 document에 보여지는 timestamp
alert발송시 UTC timestamp를 나타내고있다.

1. Enhancement Module

ElastAlert2에서는 알람 발송 전에 사용자의 환경 또는 니즈에 맞춰서 알람 match를 변경할 수 있는 enhancement 모듈을

지원합니다. 이를 통해 타임스탬프 데이터를 알람이 전송되기 전에 적절한 타임존(KST)으로 변환할 수 있습니다.

 

2. 디렉토리/파일 생성

먼저 ElastAlert2의 디렉터리에 아래와 같은 구조로 디렉터리를 생성합니다.

[root@test ~]# cd /elastalert
[root@test elastalert]# mkdir elasticalert_modules
[root@test elastalert]# cd elasticalert_modules/
[root@test elasticalert_modules]# touch __init__.py
[root@test elasticalert_modules]# touch time_enhancement.py
[root@logmonitor elasticalert_modules]# ll
합계 0
-rw-r--r-- 1 root root 0  9월 25 12:25 __init__.py
-rw-r--r-- 1 root root 0  9월 25 12:26 time_enhancement.py

 

 

3. time_enhancement.py 작성

이제 타임존을 변환하는 실제 코드를 작성합니다.앞서 미리 생성해 둔  time_enhancement.py 파일을 수정하여 , ElastAlert2의 elastalert_modules 디렉터리에 저장합니다.

[root@test elastalert]# vi elastalert_modules/time_enhancement.py


from datetime import timedelta, timezone 
from elastalert.enhancements import BaseEnhancement  
from elastalert.util import pretty_ts, ts_to_dt

class TimeEnhancement(BaseEnhancement):

    def process(self, match):
        # 매치에 '@timestamp' 키가 있으면 실행
        if '@timestamp' in match:
            # +9시간(KST: Korean Standard Time) 오프셋을 가진 타임존 생성 ('KST'라는 이름 지정)
            KST = timezone(timedelta(hours=+9), 'KST')
            
            # '@timestamp' 값을 datetime 형식으로 변환
            timestamp = ts_to_dt(match['@timestamp'])
            
            # UTC 시간에서 KST 시간으로 변환한 후, 사람이 읽기 쉽게 포맷하여 'timestamp_kst'에 추가
            match['timestamp_kst'] = pretty_ts(timestamp.astimezone(KST), False)

 

이 코드는 다음과 같이 작동합니다:

  1. 타임스탬프 확인: match 데이터에 @timestamp 필드가 존재하는지 확인합니다. 이 필드가 없으면 타임존 변환이 실행되지 않습니다.
  2. KST 타임존 생성: timezone과 timedelta를 사용하여 UTC+9 시간대에 해당하는 한국 표준시(KST)를 정의합니다.
  3. 타임스탬프 변환: ts_to_dt 함수로 @timestamp 값을 datetime 객체로 변환합니다.
  4. UTC → KST 변환: datetime 객체의 astimezone() 메서드를 사용해 UTC 시간을 KST 시간으로 변환합니다.
  5. KST 시간 추가: 변환된 KST 시간을 사람이 읽기 좋은 형식으로 포맷한 후, timestamp_kst라는 이름으로 match에 추가합니다. 

4. Rule 설정

ElastAlert2의 규칙(rule) 파일에 enhancements 옵션과 선언한 timestamp_kst를 추가하여

위에서 작성한 TimeEnhancement 클래스를 호출하도록 설정하면, 알람이 전송될 때 자동으로 로컬 타임존으로

변환된 시간이 포함됩니다.

[root@test elastalert]# vi rules/file_integrity_rule.yaml
###########중략###############

match_enhancements:
- "elastalert_modules.time_enhancement.TimeEnhancement"
# "filepath.filename.modulename(classname)"으로 작성

#@timestamp,timestamp_kst 각각 출력
alert_text_args: ["timestamp_kst","@timestamp"]
alert_text_type: alert_text_only
alert_text : |
  time : {0}
  UTC : {1}
###중략####

5. 마무리

위와 같이 기본적으로 UTC로 저장되는 Elasticsearch 데이터를 Elastalert2의 enhancement 모듈을 통해 

로컬 타임존으로 변환하여 사용자에게 알맞은 시간대에 맞춰 알람을 전송할 수 있습니다.

 

6. 참고 URL

https://elastalert2.readthedocs.io/en/latest/recipes/adding_enhancements.html#enhancements

 

알람 및 키바나 대시보드 구성과 ES성능 향상을 위해서 로그에서 필요한 부분만 가져오게끔 하는 파싱룰이 정말 중요하다.  logstash에는 grok이라는 filter가 있는데 해당 filter를 통해서 원하는 데로 로그를 파싱 할 수 있다.

grok filter란?
 

Grok filter plugin | Logstash Reference [8.12] | Elastic

Variable substitution in the id field only supports environment variables and does not support the use of values from the secret store.

www.elastic.co

정규화된 패턴을 통하여 로그를 파싱 할 수 있는데, 기본적인 패턴은 grok-patterns 파일을 참조하면 알 수 있다.
*7 버전에서는 grok-patterns 파일이 기본패키지에 patterns하단에 저장되어 있던 거로 기억하는데,
 8 버전부터는 다른 경로에 있는 것으로 보인다. 

기본적으로 아래와 같이 패턴 구문으로 구성된다.

%{SYNTAX:SEMANTIC}

* SYNTAX = 패턴명 : 정규화된 패턴 혹은 커스텀패턴의 이름이다.

* SEMANTIC= 식별자명 : 인덱스에 쌓을 때의 field명이다.

아래는 grok filter 예시문이다.

#로그
Feb 17 14:45:11 logmonitortest sshd[138557]: PAM service(sshd) ignoring max retries; 6 > 3
#파싱룰
%{SYSLOGBASE} PAM service\(sshd\) ignoring max retries\; %{INT:[failcount]} \> %{INT}

해당 룰을 아래 Test grok pattern에 접속하여 돌려보면 결괏값이 나온다.

Test Grok Pattern

SYSLOGBASE라는 패턴을 통해 기본적인 timestamp와 program pid 등은 저장이 되었고,

%{INT:[failcount]} 패턴을 통하여 실패 횟수를 failcount라는 field에 저장했다. 추후 키바나에서 해당 로그를 통하여 시각화를 할 수 있다. 
*만약 특정 패턴 데이터를 인덱스에 굳이 쌓고 싶지 않다면 식별자를 부여하지 않으면 된다. (파싱룰 끝자리 %{INT} 참고) 

 

만약 정규화된 패턴 이외에 커스텀패턴을 원한다면 grok 옵션의 patterns_dir을 통해 커스텀패턴을 만들 수 있다.

위에 예시로 생성한 파싱룰을 ZOSYSPATTERN이라는 커스텀 패턴으로 선언하여 진행해 보겠다.

 

커스텀패턴을 통하여 파싱 하더라도 기존과 같이 동일한 결괏값이 나오는 걸 확인할 수 있다.

 

로그파싱룰을 잡을 때 최대한 정규화된 패턴을 이용하되, 필요시 커스텀하는 게 좋아 보인다.

처음 테스트를 진행할 때 모든 로그에 대해 수집하여 각 로그별로 룰을 하나씩 잡다 보니 굉장히 많은 시간이 소요되었는데, %{SYSLOGBASE}라는 시스로그 기본 패턴을 알게 되어 룰 작성 시간과 룰 길이가 상당 부분 줄어들었다.

 

인프라 담당자로 경력을 쌓다 보니, 쿼리나 이런 부분에 대해서는 예전 호스팅업체를 다닐 때 mysql에서 select정도만 

사용해 보고 지식이 전무한 상태이다. Elastic Stack을 임시 구축하며 느끼기에 Elastic Stack에 대한 구축도 중요하지만

운영도 해야기에 껍질만 구축해선 안된다는 생각이 들었다.
그리고 정작 원하는 데이터를 가지고 알람 설정등을 진행해야하는데, 개발 인력의 도움 없이 혼자서 진행하려고 하니, 

구축은 어느정도 했으나  엘라스틱서치에 데이터 조회 부분부터 막막하여 , ElasticSearch의 기초적인 부분을 공부하고자 한다.

 

ElasticSearch 중요한 개념 두 가지

 

첫 번째. Documents 

데이터베이스적 관점에서 생각한다면 문서는 어떤 Entity를 나타내는 데이터베이스에서 한 줄의 행과 같은 역할을 하며 검색의 대상이다. 모든 문서는 유니크한 ID를 가지고 있으며 직접 할당할 수도 있다.

두 번째. Indices

 Index는 ES에서 쿼리 할 수 있는 가장 높은 단계의 Entity이며, 여러 문서를 포함할 수 있다. 데이터베이스로 치면 테이블과 같은 개념이다. Index는 데이터유형의 정의하는 체계가 있으며, 하나의 ES와 index에는 한 가지 유형의 Documents만 사용할 수 있다.

*기존 데이터베이스와 비교하자면, 클러스터(데이터베이스) 인덱스(테이블) 문서(행)로 생각하면 이해하기가 쉽다.

 

ElasticSearch 통신 방법 및 CRUD

*ElasticSearch는 RESTful API 기반으로  통신을 한다.

이는 HTTP요청과 응답으로 ElasticSearch와 소통을 한다는 의미이며, 아래와 같은 메서드 동작을 통해 소통한다.

HTTP 메서드 CRUD SQL
GET Read Select
PUT Update Update
POST Create Insert
DELETE Delete Delete

* CRUD란 (Create Read Update Delete)를 의미한다. 

 

Elastic Search 필터/쿼리 동작

필터명 필터동작 쿼리명 쿼리동작
terms 용어 필터 match_all 모두 반환되는 기본값,미지정시 모든결과반환
range gt(초과),gte(이상),lt(미만),lte(이하) match 특정 단어에 대한 검색
exist 존재여부 multi_match 여러필드 동시에 쿼리
missing 해당 필터가 없는문서 bool bool 필터와 동일
bool must(and),must not(not),shoud(or) match_phrase 구절 매치
(함께 붙어있어야함)
slop사용에 따라 변경

예시 )  
* index_test의 모든 값 반환

curl -XGET -H 'Content-Type: application/json' http://127.0.0.1:9200/index_test/_search?pretty -d '{ "query": { "match_all":{} } }'  | more


* secure_test index의 syslog에 pam이 포함되고, pid가 300000 이하인 값을 조회.

curl -XGET -H 'Content-Type: application/json' http://127.0.0.1:9200/secure-test/_search?pretty -d '{
"query": { "bool": { "must": { "term": {"syslog":"pam"}},"filter":{"range":{"pid": {"lte": 300000}}} } } }'

 

*구문 쿼리는 쿼리 블록으로 싸여있고, 필터는 필터 블록으로 싸여있으며 쿼리 내에서 필터블록 필터 내에서 쿼리를 사용할 수 있다.

 

* 구절 매치 쿼리 match_phrase의 경우 함께 붙어있어야 정상조회 되지만, "slop"을 사용 시에 단어 사이에 slop 수만큼 단어를 매치시킬 수 있다.

curl -XGET -H 'Content-Type: application/json' http://127.0.0.1:9200/secure-test-8.11.4-2024.02.01/_search?pretty -d '
{
"query": {
"match_phrase": {
"syslog_detail": {"query": "pam_unix(sshd:session): opened","slop":1}
}
}
}'

 * 결괏값 (pam_unix(sshd:session): 과 opened사이에 session이 들어갔으나 정상적으로 조회)

--중략--
"syslog_detail" : "pam_unix(sshd:session): session opened for user root by (uid=0)"

*추가로 반전된 구절도 매치할 수 있음, slop 1 설정 시 opend pam_unix(sshd:session): 와도 일치한다는 의미.

 

 

+ Recent posts