4.1 전체 처리 흐름 개요

로그는 6단계를 거쳐 생성·수집·정규화·저장·분석됩니다.

[1단계] 로그 생성
  Application (LogLens 라이브러리)
    → /logs/be/app.log (JSON 형식)

[2단계] 로그 수집
  Fluent Bit (고객 서버)
    → tail input (파일 읽기)
    → JSON 파싱

[3단계] 1차 정규화
  Fluent Bit (transform.lua)
    → source_type 분류 (FE/BE/INFRA)
    → layer 정규화 (Controller/Service/Repository)
    → 불필요한 로그 필터링

[4단계] 메시지 큐 버퍼링
  Kafka (AI 서버)
    → application-logs 토픽
    → 7일간 보관 (168시간)

[5단계] 2차 정규화 및 인덱싱
  Logstash (AI 서버)
    → log_details 파싱
    → log_id 생성 (SHA256)
    → 인덱스 라우팅 (project_uuid_YYYY_MM)

  OpenSearch
    → 인덱스 생성
    → 5초 내 검색 가능

[6단계] 조회 및 분석
  FastAPI
    → OpenSearch 검색 쿼리
    → AI 분석 (선택적)

  LogLens Web (Spring Boot)
    → React Dashboard

4.2 단계별 상세 흐름

4.2.1 [1단계] 로그 생성

주체: Spring Boot Application + LogLens 라이브러리

동작 원리

사용자 요청 (POST /api/users)

Controller 메서드 호출

LogLens AOP가 가로챔

[Request 로그 생성]
    - timestamp: 요청 시작 시각
    - trace_id: UUID 자동 생성
    - layer: "Controller"
    - http_method: "POST"
    - request_uri: "/api/users"

메서드 실행

[Response 로그 생성]
    - execution_time: 실행 시간 (ms)
    - response_status: 200
    - response_body: 결과 데이터

/logs/be/app.log 파일에 기록

생성되는 로그 예시

{
  "@timestamp": "2025-01-15T10:30:45.123Z",
  "level": "INFO",
  "logger": "com.example.UserController",
  "trace_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "layer": "CONTROLLER",
  "message": "Request received",
  "log_details": {
    "http_method": "POST",
    "request_uri": "/api/users",
    "request_body": {
      "name": "John",
      "email": "john@example.com"
    }
  }
}

로그 파일 저장

/logs/
  └── be/
      ├── app.log                    (현재 로그, 50MB)
      ├── app.log.2025-01-15.1.gz    (압축 보관)
      └── app.log.2025-01-15.2.gz

4.2.2 [2단계] 로그 수집

주체: Fluent Bit (고객 서버)

동작 원리

[INPUT] tail
  Path: /logs/be/*.log
  Parser: json
  Refresh_Interval: 5초

파일 변경 감지

새 라인 읽기

오프셋 저장 (/var/log/fluentbit-be-offsets.db)
  → 재시작 시 중복 방지

JSON 파싱
  → 구조화된 필드로 분해

INPUT 설정

[INPUT]
    Name              tail
    Path              /logs/be/*.log
    Tag               app.logs
    Parser            json
    Refresh_Interval  5
    Read_from_Head    true
    DB                /var/log/fluentbit-be-offsets.db

처리 과정

  1. 파일 모니터링: 5초마다 파일 변경 확인
  2. 새 라인 읽기: 마지막 오프셋 이후 데이터 읽기
  3. JSON 파싱: 문자열 → 구조화된 객체
  4. 오프셋 저장: 읽은 위치를 DB에 저장

4.2.3 [3단계] 1차 정규화

주체: Fluent Bit (transform.lua)

동작 원리

파싱된 로그

[FILTER] lua (transform.lua)

source_type 자동 분류
  - layer == "FE" → source_type = "FE"
  - logger contains "mysql" → source_type = "INFRA"
  - 그 외 → source_type = "BE"

layer 정규화
  - "CONTROLLER" → "Controller"
  - "SERVICE" → "Service"
  - "REPOSITORY" → "Repository"

timestamp ISO 8601 변환
  - "2025-01-15 10:30:45" → "2025-01-15T10:30:45.123Z"

프로젝트 메타데이터 추가
  - project_uuid: ${PROJECT_UUID}
  - service_name: ${PROJECT_NAME}

불필요한 로그 필터링
  - trace_id == "unknown" → 제외
  - component_name == "HealthCheckController" → 제외

transform.lua 핵심 로직 예시

-- source_type 자동 분류
if record["layer"] == "FE" then
    record["source_type"] = "FE"
elseif record["logger"] and string.find(record["logger"], "mysql") then
    record["source_type"] = "INFRA"
else
    record["source_type"] = "BE"
end
 
-- layer 정규화
local layer_map = {
    ["CONTROLLER"] = "Controller",
    ["SERVICE"] = "Service",
    ["REPOSITORY"] = "Repository"
}
record["layer"] = layer_map[record["layer"]] or record["layer"]
 
-- timestamp ISO 8601 변환
record["timestamp"] = os.date("!%Y-%m-%dT%H:%M:%S.000Z", record["@timestamp"] / 1000)

정규화 전후 비교

Before (원본 로그):

{
  "@timestamp": "2025-01-15 10:30:45",
  "level": "INFO",
  "logger": "com.example.UserController",
  "layer": "CONTROLLER"
}

After (정규화 후):

{
  "timestamp": "2025-01-15T10:30:45.000Z",
  "log_level": "INFO",
  "logger": "com.example.UserController",
  "layer": "Controller",
  "source_type": "BE",
  "project_uuid": "9911573f-8a1d-3b96-98b4-5a0def93513b",
  "service_name": "Loglens"
}

4.2.4 [4단계] 메시지 큐 버퍼링

주체: Kafka (AI 서버)

동작 원리

Fluent Bit (고객 서버)

[OUTPUT] kafka
  Brokers: ai.loglens.store:9092
  Topic: application-logs
  Format: json
  Compression: snappy

Kafka Producer

Kafka Broker
  - 파티션: 1개 (기본)
  - 보관 기간: 7일 (168시간)
  - 압축: snappy

Logstash Consumer

Kafka 설정

KAFKA_LOG_RETENTION_HOURS: 168  # 7일
KAFKA_ADVERTISED_LISTENERS:
  - PLAINTEXT://kafka:19092        # 내부 통신
  - PLAINTEXT_HOST://ai.loglens.store:9092  # 외부 통신

Fluent Bit OUTPUT 설정

[OUTPUT]
    Name              kafka
    Match             *
    Brokers           ai.loglens.store:9092
    Topics            application-logs
    Format            json
    
    rdkafka.compression.type      snappy
    rdkafka.batch.size            16384
    rdkafka.batch.num.messages    1000

버퍼링의 역할

(1) 트래픽 급증 대응

고객 서버 트래픽 급증 (초당 10,000건)

Fluent Bit → Kafka (빠르게 전송)

Kafka가 버퍼링 (임시 저장)

Logstash가 처리 가능한 속도로 소비

(2) 장애 시 데이터 유실 방지

Logstash 다운

Kafka에 로그 계속 쌓임 (최대 7일)

Logstash 복구

마지막 오프셋부터 재소비

4.2.5 [5단계] 2차 정규화 및 인덱싱

주체: Logstash + OpenSearch (AI 서버)

Logstash 파이프라인

[INPUT] kafka
  bootstrap_servers: kafka:19092
  topics: application-logs
  group_id: logstash-consumer

[FILTER] mutate
  - timestamp 생성
  - log_id 생성 (SHA256)

[FILTER] ruby
  - log_details JSON 파싱
  - Java Map toString() → JSON 변환

[FILTER] mutate
  - log_level 대문자 변환
  - trace_id 기본값 설정 ("unknown")

[OUTPUT] opensearch
  index: %{safe_project_uuid}_%{+YYYY_MM}
  action: create

log_id 생성 (Ruby 코드)

# project_uuid + timestamp + message를 조합하여 SHA256 해시 생성
hash_input = "#{project_uuid}-#{timestamp}-#{message}"
log_id = Digest::SHA256.hexdigest(hash_input).to_i(16) % (2**63)

log_details 파싱

(1) JSON 형식

# 이미 JSON인 경우
log_details = JSON.parse(log_details_str)

(2) Java Map toString() 형식

# {key1=value1, key2=value2} 형식
log_details_str = "{http_method=POST, request_uri=/api/users}"

정규식으로 파싱

{
  "http_method": "POST",
  "request_uri": "/api/users"
}

인덱스 라우팅

index => "%{safe_project_uuid}_%{+YYYY_MM}"
 
# 예시:
# project_uuid: "loglens"
# 날짜: 2025년 1월
# 결과: "loglens_2025_01"

OpenSearch 인덱싱

Logstash Bulk Insert

OpenSearch 인덱스 자동 생성
  - loglens_2025_01 (없으면 생성)

문서 저장
  - log_id를 document ID로 사용
  - 중복 방지 (같은 log_id는 덮어쓰기)

Refresh (5초 내)

검색 가능

4.2.6 [6단계] 조회 및 분석

주체: FastAPI + LogLens Web + React Dashboard

검색 플로우

사용자 (React Dashboard)

[검색 요청]
  - 키워드: "NullPointerException"
  - 시간 범위: 최근 1시간
  - log_level: ERROR

LogLens Web (Spring Boot)

FastAPI REST API
  GET /api/v2/logs/search

OpenSearch 쿼리
  GET /loglens_2025_01/_search
  {
    "query": {
      "bool": {
        "must": [
          { "match": { "message": "NullPointerException" } },
          { "term": { "log_level": "ERROR" } },
          { "range": { "timestamp": { "gte": "now-1h" } } }
        ]
      }
    }
  }

검색 결과 (10건)

FastAPI 응답

LogLens Web → React Dashboard

AI 분석 플로우 (선택적)

사용자가 특정 로그 선택

[AI 분석 요청]
  POST /api/v2-langgraph/logs/{log_id}/analysis

FastAPI
  1. OpenSearch에서 로그 조회
  2. trace_id 기반 관련 로그 수집
  3. LLM에 전달 (OpenAI GPT)

AI 분석 결과
  - 에러 원인
  - 해결 방법
  - 관련 로그 요약

OpenSearch에 결과 저장 (ai_analysis 필드)

FastAPI 응답

React Dashboard 표시

4.3 장애 대응 메커니즘

각 컴포넌트는 장애 발생 시 로그 유실을 방지하기 위한 복구 메커니즘을 갖추고 있습니다.

4.3.1 Fluent Bit 장애

복구 메커니즘:

  • 로그 파일은 디스크에 계속 기록
  • 오프셋 DB로 읽은 위치 추적
  • 재시작 시 마지막 위치부터 수집 재개

결과: 로그 유실 없음

4.3.2 Kafka 장애

복구 메커니즘:

rdkafka.retry.backoff.ms: 100        # 100ms 간격 재시도
rdkafka.message.timeout.ms: 10000    # 10초 타임아웃
  • 단기 장애 시 자동 재시도
  • 장기 장애 시 Fluent Bit 내부 버퍼에 보관

결과: 단기 장애 자동 복구, 장기 장애는 파일에서 재수집

4.3.3 Logstash 장애

복구 메커니즘:

KAFKA_LOG_RETENTION_HOURS: 168  # 7일 보관
  • Kafka가 최대 7일간 로그 보관
  • Logstash 재시작 시 Consumer Offset부터 재소비

결과: 로그 유실 없음, 검색만 지연

4.4.4. OpenSearch 장애

복구 메커니즘:

  • Logstash 내부 큐에 임시 보관
  • Dead Letter Queue (DLQ) 활용
  • OpenSearch 복구 시 재처리

결과: 로그 유실 최소화