0. 들어가며

Elasticsearch는 기본적으로 UTC 타임존을 사용하여 데이터를 저장합니다. 이는 시스템 간 일관성을 유지하는 데 유리하지만, 사용자나 클라이언트가 있는 지역에 맞춰진 로컬 타임존으로 데이터를 보여줘야 하는 경우에는 불편할 수 있습니다. 특히 Kibana와 같은 시각화 프로그램은 타임존을 자동으로 변환하여 표시하지만,

ElastAlert2는 기본적으로 이 기능을 제공하지 않기 때문에 알람에 표시되는 시간이 UTC로 나타납니다.

키바나 Discover에 표시되는 timestamp와 실제 document에 보여지는 timestamp
alert발송시 UTC timestamp를 나타내고있다.

1. Enhancement Module

ElastAlert2에서는 알람 발송 전에 사용자의 환경 또는 니즈에 맞춰서 알람 match를 변경할 수 있는 enhancement 모듈을

지원합니다. 이를 통해 타임스탬프 데이터를 알람이 전송되기 전에 적절한 타임존(KST)으로 변환할 수 있습니다.

 

2. 디렉토리/파일 생성

먼저 ElastAlert2의 디렉터리에 아래와 같은 구조로 디렉터리를 생성합니다.

[root@test ~]# cd /elastalert
[root@test elastalert]# mkdir elasticalert_modules
[root@test elastalert]# cd elasticalert_modules/
[root@test elasticalert_modules]# touch __init__.py
[root@test elasticalert_modules]# touch time_enhancement.py
[root@logmonitor elasticalert_modules]# ll
합계 0
-rw-r--r-- 1 root root 0  9월 25 12:25 __init__.py
-rw-r--r-- 1 root root 0  9월 25 12:26 time_enhancement.py

 

 

3. time_enhancement.py 작성

이제 타임존을 변환하는 실제 코드를 작성합니다.앞서 미리 생성해 둔  time_enhancement.py 파일을 수정하여 , ElastAlert2의 elastalert_modules 디렉터리에 저장합니다.

[root@test elastalert]# vi elastalert_modules/time_enhancement.py


from datetime import timedelta, timezone 
from elastalert.enhancements import BaseEnhancement  
from elastalert.util import pretty_ts, ts_to_dt

class TimeEnhancement(BaseEnhancement):

    def process(self, match):
        # 매치에 '@timestamp' 키가 있으면 실행
        if '@timestamp' in match:
            # +9시간(KST: Korean Standard Time) 오프셋을 가진 타임존 생성 ('KST'라는 이름 지정)
            KST = timezone(timedelta(hours=+9), 'KST')
            
            # '@timestamp' 값을 datetime 형식으로 변환
            timestamp = ts_to_dt(match['@timestamp'])
            
            # UTC 시간에서 KST 시간으로 변환한 후, 사람이 읽기 쉽게 포맷하여 'timestamp_kst'에 추가
            match['timestamp_kst'] = pretty_ts(timestamp.astimezone(KST), False)

 

이 코드는 다음과 같이 작동합니다:

  1. 타임스탬프 확인: match 데이터에 @timestamp 필드가 존재하는지 확인합니다. 이 필드가 없으면 타임존 변환이 실행되지 않습니다.
  2. KST 타임존 생성: timezone과 timedelta를 사용하여 UTC+9 시간대에 해당하는 한국 표준시(KST)를 정의합니다.
  3. 타임스탬프 변환: ts_to_dt 함수로 @timestamp 값을 datetime 객체로 변환합니다.
  4. UTC → KST 변환: datetime 객체의 astimezone() 메서드를 사용해 UTC 시간을 KST 시간으로 변환합니다.
  5. KST 시간 추가: 변환된 KST 시간을 사람이 읽기 좋은 형식으로 포맷한 후, timestamp_kst라는 이름으로 match에 추가합니다. 

4. Rule 설정

ElastAlert2의 규칙(rule) 파일에 enhancements 옵션과 선언한 timestamp_kst를 추가하여

위에서 작성한 TimeEnhancement 클래스를 호출하도록 설정하면, 알람이 전송될 때 자동으로 로컬 타임존으로

변환된 시간이 포함됩니다.

[root@test elastalert]# vi rules/file_integrity_rule.yaml
###########중략###############

match_enhancements:
- "elastalert_modules.time_enhancement.TimeEnhancement"
# "filepath.filename.modulename(classname)"으로 작성

#@timestamp,timestamp_kst 각각 출력
alert_text_args: ["timestamp_kst","@timestamp"]
alert_text_type: alert_text_only
alert_text : |
  time : {0}
  UTC : {1}
###중략####

5. 마무리

위와 같이 기본적으로 UTC로 저장되는 Elasticsearch 데이터를 Elastalert2의 enhancement 모듈을 통해 

로컬 타임존으로 변환하여 사용자에게 알맞은 시간대에 맞춰 알람을 전송할 수 있습니다.

 

6. 참고 URL

https://elastalert2.readthedocs.io/en/latest/recipes/adding_enhancements.html#enhancements

 

알람 및 키바나 대시보드 구성과 ES성능 향상을 위해서 로그에서 필요한 부분만 가져오게끔 하는 파싱룰이 정말 중요하다.  logstash에는 grok이라는 filter가 있는데 해당 filter를 통해서 원하는 데로 로그를 파싱 할 수 있다.

grok filter란?
 

Grok filter plugin | Logstash Reference [8.12] | Elastic

Variable substitution in the id field only supports environment variables and does not support the use of values from the secret store.

www.elastic.co

정규화된 패턴을 통하여 로그를 파싱 할 수 있는데, 기본적인 패턴은 grok-patterns 파일을 참조하면 알 수 있다.
*7 버전에서는 grok-patterns 파일이 기본패키지에 patterns하단에 저장되어 있던 거로 기억하는데,
 8 버전부터는 다른 경로에 있는 것으로 보인다. 

기본적으로 아래와 같이 패턴 구문으로 구성된다.

%{SYNTAX:SEMANTIC}

* SYNTAX = 패턴명 : 정규화된 패턴 혹은 커스텀패턴의 이름이다.

* SEMANTIC= 식별자명 : 인덱스에 쌓을 때의 field명이다.

아래는 grok filter 예시문이다.

#로그
Feb 17 14:45:11 logmonitortest sshd[138557]: PAM service(sshd) ignoring max retries; 6 > 3
#파싱룰
%{SYSLOGBASE} PAM service\(sshd\) ignoring max retries\; %{INT:[failcount]} \> %{INT}

해당 룰을 아래 Test grok pattern에 접속하여 돌려보면 결괏값이 나온다.

Test Grok Pattern

SYSLOGBASE라는 패턴을 통해 기본적인 timestamp와 program pid 등은 저장이 되었고,

%{INT:[failcount]} 패턴을 통하여 실패 횟수를 failcount라는 field에 저장했다. 추후 키바나에서 해당 로그를 통하여 시각화를 할 수 있다. 
*만약 특정 패턴 데이터를 인덱스에 굳이 쌓고 싶지 않다면 식별자를 부여하지 않으면 된다. (파싱룰 끝자리 %{INT} 참고) 

 

만약 정규화된 패턴 이외에 커스텀패턴을 원한다면 grok 옵션의 patterns_dir을 통해 커스텀패턴을 만들 수 있다.

위에 예시로 생성한 파싱룰을 ZOSYSPATTERN이라는 커스텀 패턴으로 선언하여 진행해 보겠다.

 

커스텀패턴을 통하여 파싱 하더라도 기존과 같이 동일한 결괏값이 나오는 걸 확인할 수 있다.

 

로그파싱룰을 잡을 때 최대한 정규화된 패턴을 이용하되, 필요시 커스텀하는 게 좋아 보인다.

처음 테스트를 진행할 때 모든 로그에 대해 수집하여 각 로그별로 룰을 하나씩 잡다 보니 굉장히 많은 시간이 소요되었는데, %{SYSLOGBASE}라는 시스로그 기본 패턴을 알게 되어 룰 작성 시간과 룰 길이가 상당 부분 줄어들었다.

 

PCI-DSS 인증을 받게 되어, 기존에 없던 실시간 로그모니터링 시스템구축 프로젝트를 개인적으로 진행하게 되었다. 

로그모니터링 시스템은 상용툴도 있지만, 비용적인 측면을 고려하여 오픈소스 기반의 툴을 찾아보게 되었다.

여러 툴을 알아보던중 가장 유명한 ElasticSearch+Kibana+Logstash+Beats를 이용한 Elastic Stack으로 진행하게되었다.

 

1. 어떤방식으로 구성할것인가?

기본적으로 내부적으로 rsyslog를 통한 로그 수집 서버가 존재하여, 해당 서버에 beats를 올린뒤

로그모니터링 서버에 모니터링을 진행 할 특정 로그를 보내어,  

logstash를 통해 필요한 부분만 파싱하여 ElasticSearch에 보내고 Kibana를 통해 시각화 구성,

모니터링 룰에 따라 ElastAlert을 통해 Telegram 메시지를 발송하려고한다.

추가로 각서버에 MetricBeat를 통해 Metric 모니터링도 진행하고자한다. 로그분석서버의 경우 Docker-Compose를 통해 ELK를 같은 컨테이너 네트워크상에 구성해볼 예정이다.

 

 

2. 어떤 로그를 모니터링할것인가?

PCI-DSS 요건을 살펴보면  아래와 같이 매일 보안로그 및 보안이벤트가 검토되고있는지를 체크하는부분이 있다. 

PCI-DSS version 3.2.1 일부 발췌

모든 보안 장비 및 보안로그에 대한 모니터링이 이루어져야하지만,

1차적으로 운용중인 서버의 OS 보안 로그를 통해 적용해보고자 하며, 추후엔 보안 장비 로그까지 모니터링하고자한다.

 

 

+ Recent posts