0. 들어가며

ElasticStack을 구축하며 beat와 logstash를 연동하여 운영하는데, beat를 통해 데이터를 수집하는 서버에서 아래 이미지와 같은 에러가 지속적으로 발생함을 알수있었다.

해당 글은 지속적으로 발생하는 해당 에러를 조치하는 방안을 작성한 글이다.

 

1. 원인 파악

원인을 파악하기 위해 클라이언트(beat)에서 로그스태시로 telnet 접속시도를 했는데 정상적으로 커넥션이 되는걸 확인할 수 있었다. 즉 통신에는 문제가 없음을 확인하였고, 로그를 확인해보니 내용에 나와있는 connection reset by peer의 경우 클라이언트가 아닌 서버측에서 RST를 던져서 접속이 끊길때 발생하는 에러이기에, RST를 던진쪽이 서버(logstash)인지 

상단 또는 네트워크장비인지 확인을 하기위해 방화벽장비에서 패킷을 캡쳐하여 와이어샤크로 확인을해봤다.

 위 이미지와 서버내 tcpdump와 같은 추가적인 확인을 통해 서버(logstash)에서 RST를 던지고 있음을 확인하였고, 추가로 서버에서 통신을 끊는 이유를 확인하기 위하여 logstash에서  타임아웃관련 옵션을 공식 docs를 통해 확인해봤다.

Client_inactivity_timeout
 

Beats input plugin | Logstash Reference [8.15] | Elastic

If you configure the plugin to use 'TLSv1.1' on any recent JVM, such as the one packaged with Logstash, the protocol is disabled by default and needs to be enabled manually by changing jdk.tls.disabledAlgorithms in the $JDK_HOME/conf/security/java.security

www.elastic.co


위 공식 Docs 링크에 나와있듯이 logstash의 input plugin의 옵션중 client_inactivity_timeout이라는 옵션이 있으며,

해당 옵션은 클라이언트가 inactivity상태라면 X초후에 끊는다 라는 의미로 보인다.

* Close Idle clients after X seconds of inactivity.

기본값은 60초이므로 즉 예상하건데, beat에서 60초내에 데이터를 보내지않는다면 접속을 끊는다는 의미인거같다.

 

2. 해결방안

 

이미 위에 내용에서 예상했겠지만, 간단한 조치방법은 해당 옵션을 활성화 하여 알맞은 시간을 할당하는 방법이다. 필자는 타임아웃값을 30분(1800초)으로 줬으나, 만약 빈번하게 데이터흐름이 발생한다면 좀더시간을 줄여도 괜찮을듯하다.

 

vi /etc/logstash.conf


input {
  beats {
    port => 5044
    client_inactivity_timeout => 1800
  }
}
####중략######

 

알람 및 키바나 대시보드 구성과 ES성능 향상을 위해서 로그에서 필요한 부분만 가져오게끔 하는 파싱룰이 정말 중요하다.  logstash에는 grok이라는 filter가 있는데 해당 filter를 통해서 원하는 데로 로그를 파싱 할 수 있다.

grok filter란?
 

Grok filter plugin | Logstash Reference [8.12] | Elastic

Variable substitution in the id field only supports environment variables and does not support the use of values from the secret store.

www.elastic.co

정규화된 패턴을 통하여 로그를 파싱 할 수 있는데, 기본적인 패턴은 grok-patterns 파일을 참조하면 알 수 있다.
*7 버전에서는 grok-patterns 파일이 기본패키지에 patterns하단에 저장되어 있던 거로 기억하는데,
 8 버전부터는 다른 경로에 있는 것으로 보인다. 

기본적으로 아래와 같이 패턴 구문으로 구성된다.

%{SYNTAX:SEMANTIC}

* SYNTAX = 패턴명 : 정규화된 패턴 혹은 커스텀패턴의 이름이다.

* SEMANTIC= 식별자명 : 인덱스에 쌓을 때의 field명이다.

아래는 grok filter 예시문이다.

#로그
Feb 17 14:45:11 logmonitortest sshd[138557]: PAM service(sshd) ignoring max retries; 6 > 3
#파싱룰
%{SYSLOGBASE} PAM service\(sshd\) ignoring max retries\; %{INT:[failcount]} \> %{INT}

해당 룰을 아래 Test grok pattern에 접속하여 돌려보면 결괏값이 나온다.

Test Grok Pattern

SYSLOGBASE라는 패턴을 통해 기본적인 timestamp와 program pid 등은 저장이 되었고,

%{INT:[failcount]} 패턴을 통하여 실패 횟수를 failcount라는 field에 저장했다. 추후 키바나에서 해당 로그를 통하여 시각화를 할 수 있다. 
*만약 특정 패턴 데이터를 인덱스에 굳이 쌓고 싶지 않다면 식별자를 부여하지 않으면 된다. (파싱룰 끝자리 %{INT} 참고) 

 

만약 정규화된 패턴 이외에 커스텀패턴을 원한다면 grok 옵션의 patterns_dir을 통해 커스텀패턴을 만들 수 있다.

위에 예시로 생성한 파싱룰을 ZOSYSPATTERN이라는 커스텀 패턴으로 선언하여 진행해 보겠다.

 

커스텀패턴을 통하여 파싱 하더라도 기존과 같이 동일한 결괏값이 나오는 걸 확인할 수 있다.

 

로그파싱룰을 잡을 때 최대한 정규화된 패턴을 이용하되, 필요시 커스텀하는 게 좋아 보인다.

처음 테스트를 진행할 때 모든 로그에 대해 수집하여 각 로그별로 룰을 하나씩 잡다 보니 굉장히 많은 시간이 소요되었는데, %{SYSLOGBASE}라는 시스로그 기본 패턴을 알게 되어 룰 작성 시간과 룰 길이가 상당 부분 줄어들었다.

 

PCI-DSS 인증을 받게 되어, 기존에 없던 실시간 로그모니터링 시스템구축 프로젝트를 개인적으로 진행하게 되었다. 

로그모니터링 시스템은 상용툴도 있지만, 비용적인 측면을 고려하여 오픈소스 기반의 툴을 찾아보게 되었다.

여러 툴을 알아보던중 가장 유명한 ElasticSearch+Kibana+Logstash+Beats를 이용한 Elastic Stack으로 진행하게되었다.

 

1. 어떤방식으로 구성할것인가?

기본적으로 내부적으로 rsyslog를 통한 로그 수집 서버가 존재하여, 해당 서버에 beats를 올린뒤

로그모니터링 서버에 모니터링을 진행 할 특정 로그를 보내어,  

logstash를 통해 필요한 부분만 파싱하여 ElasticSearch에 보내고 Kibana를 통해 시각화 구성,

모니터링 룰에 따라 ElastAlert을 통해 Telegram 메시지를 발송하려고한다.

추가로 각서버에 MetricBeat를 통해 Metric 모니터링도 진행하고자한다. 로그분석서버의 경우 Docker-Compose를 통해 ELK를 같은 컨테이너 네트워크상에 구성해볼 예정이다.

 

 

2. 어떤 로그를 모니터링할것인가?

PCI-DSS 요건을 살펴보면  아래와 같이 매일 보안로그 및 보안이벤트가 검토되고있는지를 체크하는부분이 있다. 

PCI-DSS version 3.2.1 일부 발췌

모든 보안 장비 및 보안로그에 대한 모니터링이 이루어져야하지만,

1차적으로 운용중인 서버의 OS 보안 로그를 통해 적용해보고자 하며, 추후엔 보안 장비 로그까지 모니터링하고자한다.

 

 

+ Recent posts