[DevOps] k8s Log 수집 시스템 구성

안녕하세요? 정리하는 개발자 워니즈입니다. 이번시간에는 k8s에서의 로그 수집에에 대해서 정리를 해보도록 하겠습니다. 작년에 포스팅했던 글 중에서 k8s에서의 로깅이 어떤식으로 이뤄지는지에 대해서 정리한 내용이 있습니다.

Docker Container에 대해서 어플리케이션을 구동하게 되면, /var/lib/docker/containers라는 공간안에 도커의 UUID값을 갖고 폴더를 생성하게 됩니다. 그리고 하위로 들어가면 json포맷의 로그를 남기고 있습니다.

1. 로깅 아키텍처 구성

쿠버네티스에서 로그를 집계하는 방식에는 다음 3가지의 방법이 존재합니다.

1) Daemonset 사용

Daemonset은 특정 포드가 항상 모든 클러스터 노드에서 실행되고 있는지 확인합니다. 이 포드는 에이전트 이미지(예:fluentd)를 실행하고 노드에서 중앙 서버로 로그를 보내는 역햘을 합니다.

Logging_1

2) Sidecar 사용

사이드카는 응용 프로그램 컨테이너와 같은 포드에서 실행되는 컨테이너를 참조하는데 사용되는 용어입니다. 포드가 작동하는 방식으로 인해 사이드카 컨테이너는 동일한 볼륨에 액세스 할 수 있으며 다른 컨테이너와 동일한 네트워크 인터페이스를 공유 합니다. 사이드카 컨테이너는 애플리케이션에서 로그를 가져와 보낼 수 있습니다.

Loggin_2

3) 애플리케이션 사용

중앙 로그 서버에 주기적으로 로그를 보내도록 애플리케이션을 간단하게 설계할 수 있습니다. 그러나 이 방법은 응용 프로그램이 해당 로그 서버에 밀접하게 연결되어 있으므로 권장되지 않습니다. 서버에서 허용하는 특정 형식으로 로그를 생성해야 합니다.

Logging_3

설계한 아키텍처

Logging_4

필자가 구상한 아키텍처는 위의 그림과 가장 유사합니다. 사실 사내에 그려놓은 아키텍처가 있지만, 가져올 수 가 없어서 유사한 아키텍처로 대체를 합니다.

한가지 다른부분은 Buffer Layer입니다. 위의 그림에서 Kafka 이전 레이어에서 Springframework app을 통해서 fluentd로부터 받은 chunk들에 대해서 다시한번 queing을 진행합니다. 이후에 kafka로 보내서 다시한번 Buffer를 두고 저장소인 elasticsearch로전송 합니다.

  • Source : Fluentd를 DaemonSet으로 각 Worker Node에 배포 ( Log Aggregation )
  • Bridge : Fluentd Gateway로 호출 ( Buffer Layer )
  • Data Sink : Elasticsearch로 데이터 적재 ( Storage Layer )
  • Visualize : 전사 IU 시스템을 통해서 데이터 확인 ( View Layer )

2. 로그 포맷 확인

위의 아키텍처를 정의했고, Daemonset 구성으로 Fluentd어플리케이션을 각 Worker Node에 구동하여 수집하기로 했습니다. 그러면 수집하고자 하는 로그 포맷에 대한 정의가 필요합니다.

필자는 2가지의 로그를 수집하기 위해 다음과 같이 분석을 했습니다.

1) SpringBoot Log ( App )

수집하고자 하는 App Log의 기본 포맷은 스프링 부트의 기본 로그 appender를 사용하고있었습니다.

Spring boot Logging
스프링의 로그 포맷은 위의 하이퍼링크를 참고 부탁드립니다.

2019-03-05 10:57:51.112  INFO 45469 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet Engine: Apache Tomcat/7.0.52
2019-03-05 10:57:51.253  INFO 45469 --- [ost-startStop-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2019-03-05 10:57:51.253  INFO 45469 --- [ost-startStop-1] o.s.web.context.ContextLoader            : Root WebApplicationContext: initialization completed in 1358 ms
2019-03-05 10:57:51.698  INFO 45469 --- [ost-startStop-1] o.s.b.c.e.ServletRegistrationBean        : Mapping servlet: 'dispatcherServlet' to [/]
2019-03-05 10:57:51.702  INFO 45469 --- [ost-startStop-1] o.s.b.c.embedded.FilterRegistrationBean  : Mapping filter: 'hiddenHttpMethodFilter' to: [/*]

The following items are output:

  • Date and Time: Millisecond precision and easily sortable.
  • Log Level: ERROR, WARN, INFO, DEBUG, or TRACE.
  • Process ID.
  • A --- separator to distinguish the start of actual log messages.
  • Thread name: Enclosed in square brackets (may be truncated for console output).
  • Logger name: This is usually the source class name (often abbreviated).
  • The log message.

2) Nginx ingress Controller ( Access )

수집하고자 하는 Access Log의 기본 포맷은 Ingress Nginx의 기본 로그 포맷을 사용했습니다.

Ingress Nginx Logging
Ingress Nginx의 로그 포맷은 위의 하이퍼링크를 참고 부탁드립니다.

  • remote_addr : 192.xxx.xxx.0
  • 구분자 : –
  • remote_user : –
  • time_local : 02/Dec/2021:21:35:41 +0900
  • method : POST
  • request_uri : /test
  • status_body_bytes_sent : 0
  • http_referer –
  • http_user_agent : ReactorNetty/0.9.20.RELEASE
  • request_length : 1000
  • request_time : 0.006
  • proxy_updatem_name : test-upstream-20080
  • proxy_alternative_upstream_name : –
  • upstream_addr : 192.xxx.xx.xx:20080
  • upstream_reponse_lenth : 0
  • upstream_reponse_time : 0.006
  • upstream_status : 200
  • request_id : xxxx

3. Fluentd config 구성

위와 같이 분석을 한 내용을 토대로 Fluentd의 Config 작성을 수행했습니다. config안에서는 수집된 로그 라인을 어떤형태로 잘라서 어떤 키값에 매핑을 할것이고 어떤 형태대로 라벨링을 할것인지를 정의합니다.

마치 로그를 적절한 형태대로 요리를 해서 포장까지 수행한다음에 배달을 요청합니다. 이런형태가 Fluentd에서 하는 역할입니다.

1) source

   <source>
      @type tail
      path {{ .Values.appLogPath }}
      exclude_path ["/var/log/containers/fluentd*", "/var/log/containers/*istio*.log"]
      pos_file /var/log/fluentd-containers.log.pos
      tag kubernetes.*
      read_from_head true
      <parse>
        @type json
        time_format %Y-%m-%dT%H:%M:%S.%NZ
      </parse>
    </source>
    <source>
      @type tail
      path {{ .Values.accessLogPath }}
      exclude_path ["/var/log/containers/fluentd*", "/var/log/containers/*istio*.log"]
      pos_file /var/log/fluentd-containers-nginx.log.pos
      tag nginx.*
      read_from_head true
      <parse>
        @type json
      </parse>
    </source>

App Log, Access Log에 대한 Input(Source) 형태가 위와 같습니다. helm chart에서 values를 통해서 Input을 제어하도록 구성했습니다.

별도로 Istio, Fluentd에 대한 로그는 수집을 하지 않도록 설정했습니다.

2) filter

1. Concat

    <filter kubernetes.**>
      @type concat
      key   log
      multiline_start_regexp /^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}/
      timeout_label @NORMAL
    </filter>
    <filter nginx.**>
      @type concat
      key   log
      multiline_start_regexp /^\d{4}\/\d{2}\/\d{2} \d{2}:\d{2}:\d{2}/
      timeout_label @NORMAL
    </filter>

위와 같이 filter내에서는 시간을 기점으로 각 로그를 연결(Concat)을 하면서 로그 하나의 라인을 결정했습니다. Docker의 로그 형태를 보면 알겠지만, Spring에서 떨어지는 에러로그 같은 것들을 보면, 라인 바이 라인으로 한줄씩 로그가 떨어집니다.

이러한 로그들을 라인 별로 수집을 하는것이 아니라 떨어진 에러 로그를 하나의 메시지로 해서 붙여서 하나의 단위(Chunk)로 만들게 됩니다.

2. Parsing

      <filter kubernetes.**>
        @type parser
        key_name log
        <parse>
          @type regexp
          expression /^(?<timestamp_text>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})\s+(?<level>[^\s]+)\s+(?<pid>\d+)\s+---\s+\[\s*(?<thread>[^\s]+)\]\s+(?<class>[^\s]+)\s+:\s+(?<message>.+)/m
        </parse>
      </filter>
      <filter nginx.**>
        @type parser
        key_name log
        <parse>
          @type regexp
          expression /^(?<proxy_protocol_addr>[^ ]*) (?<remote_addr>[^ ]*) (?<remote_user>[^ ]*) \[(?<time_local>[^\]]*)\] "(?<method>\S+)(?: +(?<request>[^\"]*?)(?: +\S*)?)?" (?<status>[^ ]*) (?<body_bytes_sent>[^ ]*)(?: "(?<http_referer>[^\"]*)" "(?<http_user_agent>[^\"]*)"(?:\s+(?<request_length>[^ ]+))?) (?<request_time>[^ ]*) \[(?<proxy_upstream_name>[^ ]*)\] \[(?<proxy_alternative_upstream_name>[^ ]*)\] (?<upstream_addr>[^ ]*) (?<upstream_response_length>[^ ]*) (?<upstream_response_time>[^ ]*) (?<upstream_status>[^ ]*) (?<req_id>[^\"]*)$/m
        </parse>
      </filter>

parser를 통해서 로그 라인의 각 요소마다 자르는 작업을 진행합니다.

3. k8s metadata

      <filter **>
           @type kubernetes_metadata
           verify_ssl false
      </filter>

추가적으로 정리를 하겠지만, 위의 kubernetes_metadata라는 플러그인을 사용하게 되면 로그 라인이 어떤 k8s의 object로 부터 파생했는지에 대한 데이터를 추출할 수 있습니다. 예를들어 어떤 pod로 부터 온것인지, 어떤 네임스페이스인지를 추출할 수 있습니다.

4. Record transforming

    <filter kubernetes.**>
        @type record_transformer
        enable_ruby
        remove_keys timestamp_text, kubernetes, docker
        <record>
          server_id ${record.dig("kubernetes", "pod_name")}
          timestamp ${(Time.strptime(record["timestamp_text"] + " +09:00", "%Y-%m-%d %H:%M:%S.%L %z").to_f * 1000).to_i}
          marker ${record.dig("kubernetes", "labels", "phase")}-${record.dig("kubernetes", "labels", "project_name")}
          namespace ${record.dig("kubernetes", "namespace_name")}
        </record>
      </filter>
      <filter nginx.**>
        @type record_transformer
        enable_ruby
        remove_keys kubernetes, docker, proxy_protocol_addr, request, body_bytes_sent, proxy_upstream_name, proxy_alternative_upstream_name, upstream_addr, req_id
        <record>
          time ${(Time.strptime(record["time_local"], "%d/%b/%Y:%H:%M:%S %z").to_f * 1000).to_i}
          remote_addr ${record["remote_addr"]}
          remote_user ${record["remote_user"]}
          method ${record["method"]}
          request_uri ${record["request"]}
          status ${record["status"]}
          bytes_sent ${record["body_byte_sent"]}
          http_referer ${record["http_referer"]}
          http_user_agent ${record["http_user_agent"]}
          request_length ${record["request_length"]}
          request_time ${record["request_time"]}
          upstream_proxy_name ${record["proxy_upstream_name"]}
          upstream_response_length ${record["upstream_response_length"]}
          upstream_response_time ${record["upstream_response_time"]}
          upstream_response_status ${record["upstream_status"]}
          upstream_request_id ${record["req_id"]}
          pod_name ${record.dig("kubernetes", "pod_name")}
          namespace ${record.dig("kubernetes", "namespace_name")}
        </record>
      </filter>

파싱된 요소를 다시 elasticsearchschema에 맞게 key값을 변형 시켜줍니다.

3) output

      <match kubernetes.**>
          @type rewrite_tag_filter
          <rule>
            key marker
            pattern /(.+)/
            tag {{ .Values.appLogTable }}
          </rule>
      </match>
      <match nginx.**>
          @type rewrite_tag_filter
          <rule>
            key upstream_proxy_name
            pattern /(.+)/
            tag {{ .Values.accessLogTable }}
          </rule>
      </match>
      <match **>
        @type forward
        heartbeat_type tcp
        connect_timeout 10
        keepalive true
        expire_dns_cache 600
        dns_round_robin true
        <server>
          host {{ .Values.fluentHost }}
          port {{ .Values.fluentPort }}
        </server>
      </match>

위에서 매핑된 단위(Chunk)마다 로그가 수집을 위한 준비가 다 됐고, 이제 적절한 저장소로 호출(Foward)를 해주면 됩니다. 위와 같이 tagging을 통해서 elasticsearch로 입력될 내용을 정의하고 보내줍니다.

4. Fluentd 플러그인

1) fluent-plugin-concat

fluent-plugin-concat

앞서 정리한대로, 로그 라인의 첫 시간이 노출된것을 인지한 다음 계속해서 수집되는 로그중 다음 시간이 나타나는 지점까지를 하나의 로그로 계속해서 연결(Concat)을 수행합니다.

    <filter kubernetes.**>
      @type concat
      key   log
      multiline_start_regexp /^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}/
      timeout_label @NORMAL
    </filter>

2) fluent-plugin-kubernetes_metadata_filter

fluentd-plugin-kubernetes_metadata

로그를 수집할 때, 해당 로그가 어떤 Pod로부터왔는지, 어떤 Namespace를 쓰는지까지 수집이 된다면 로그 분석함에 있어서 중요한 정보로 사용 될 수 있습니다.

      <filter **>
           @type kubernetes_metadata
           verify_ssl false
      </filter>

위의 한줄을 통해서 docker의 로그 라인들이 어떤 k8s의 오브젝트로부터 추출이 되었는지를 획득 할 수 있습니다.

5. 마치며…

이번시간에는 k8s의 로그 수집에 대해서 정리를 해보았습니다. DevOps 엔지니어로서 어플리케이션들의 로그를 적절한 기간 보유하기 위해서 로그 수집 시스템을 구성하면서 수집만으로 끝나는것이 아니라 수집된 데이터를 기반으로 또다른 정보를 생산하면 더욱 좋을 것이라고 생각이 들었습니다.

다음시간에는 kibana에서 대시보드를 어떤식으로 구성하고 어떻게 표현을 하여 2차 정보를 생산할 수 있는지를 정리 하겠습니다.

6. 참조

수집부터 비주얼라이즈까지 로깅서비스 Amazon ElasticSearch
분산 로그 & 데이타 수집기 Fluentd
Fluentd 의 활용. ElasticSearch, Kibana 을 사용한 Nginx Log 수집
fluentd 사용사례
카카오의 전사 리소스 모니터링 시스템

답글 남기기

이메일 주소는 공개되지 않습니다. 필수 필드는 *로 표시됩니다