4.1 전체 처리 흐름 개요
로그는 6단계를 거쳐 생성·수집·정규화·저장·분석됩니다.
[1단계] 로그 생성
Application (LogLens 라이브러리)
→ /logs/be/app.log (JSON 형식)
↓
[2단계] 로그 수집
Fluent Bit (고객 서버)
→ tail input (파일 읽기)
→ JSON 파싱
↓
[3단계] 1차 정규화
Fluent Bit (transform.lua)
→ source_type 분류 (FE/BE/INFRA)
→ layer 정규화 (Controller/Service/Repository)
→ 불필요한 로그 필터링
↓
[4단계] 메시지 큐 버퍼링
Kafka (AI 서버)
→ application-logs 토픽
→ 7일간 보관 (168시간)
↓
[5단계] 2차 정규화 및 인덱싱
Logstash (AI 서버)
→ log_details 파싱
→ log_id 생성 (SHA256)
→ 인덱스 라우팅 (project_uuid_YYYY_MM)
↓
OpenSearch
→ 인덱스 생성
→ 5초 내 검색 가능
↓
[6단계] 조회 및 분석
FastAPI
→ OpenSearch 검색 쿼리
→ AI 분석 (선택적)
↓
LogLens Web (Spring Boot)
→ React Dashboard4.2 단계별 상세 흐름
4.2.1 [1단계] 로그 생성
주체: Spring Boot Application + LogLens 라이브러리
동작 원리
사용자 요청 (POST /api/users)
↓
Controller 메서드 호출
↓
LogLens AOP가 가로챔
↓
[Request 로그 생성]
- timestamp: 요청 시작 시각
- trace_id: UUID 자동 생성
- layer: "Controller"
- http_method: "POST"
- request_uri: "/api/users"
↓
메서드 실행
↓
[Response 로그 생성]
- execution_time: 실행 시간 (ms)
- response_status: 200
- response_body: 결과 데이터
↓
/logs/be/app.log 파일에 기록생성되는 로그 예시
{
"@timestamp": "2025-01-15T10:30:45.123Z",
"level": "INFO",
"logger": "com.example.UserController",
"trace_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"layer": "CONTROLLER",
"message": "Request received",
"log_details": {
"http_method": "POST",
"request_uri": "/api/users",
"request_body": {
"name": "John",
"email": "john@example.com"
}
}
}로그 파일 저장
/logs/
└── be/
├── app.log (현재 로그, 50MB)
├── app.log.2025-01-15.1.gz (압축 보관)
└── app.log.2025-01-15.2.gz4.2.2 [2단계] 로그 수집
주체: Fluent Bit (고객 서버)
동작 원리
[INPUT] tail
Path: /logs/be/*.log
Parser: json
Refresh_Interval: 5초
↓
파일 변경 감지
↓
새 라인 읽기
↓
오프셋 저장 (/var/log/fluentbit-be-offsets.db)
→ 재시작 시 중복 방지
↓
JSON 파싱
→ 구조화된 필드로 분해INPUT 설정
[INPUT]
Name tail
Path /logs/be/*.log
Tag app.logs
Parser json
Refresh_Interval 5
Read_from_Head true
DB /var/log/fluentbit-be-offsets.db처리 과정
- 파일 모니터링: 5초마다 파일 변경 확인
- 새 라인 읽기: 마지막 오프셋 이후 데이터 읽기
- JSON 파싱: 문자열 → 구조화된 객체
- 오프셋 저장: 읽은 위치를 DB에 저장
4.2.3 [3단계] 1차 정규화
주체: Fluent Bit (transform.lua)
동작 원리
파싱된 로그
↓
[FILTER] lua (transform.lua)
↓
source_type 자동 분류
- layer == "FE" → source_type = "FE"
- logger contains "mysql" → source_type = "INFRA"
- 그 외 → source_type = "BE"
↓
layer 정규화
- "CONTROLLER" → "Controller"
- "SERVICE" → "Service"
- "REPOSITORY" → "Repository"
↓
timestamp ISO 8601 변환
- "2025-01-15 10:30:45" → "2025-01-15T10:30:45.123Z"
↓
프로젝트 메타데이터 추가
- project_uuid: ${PROJECT_UUID}
- service_name: ${PROJECT_NAME}
↓
불필요한 로그 필터링
- trace_id == "unknown" → 제외
- component_name == "HealthCheckController" → 제외transform.lua 핵심 로직 예시
-- source_type 자동 분류
if record["layer"] == "FE" then
record["source_type"] = "FE"
elseif record["logger"] and string.find(record["logger"], "mysql") then
record["source_type"] = "INFRA"
else
record["source_type"] = "BE"
end
-- layer 정규화
local layer_map = {
["CONTROLLER"] = "Controller",
["SERVICE"] = "Service",
["REPOSITORY"] = "Repository"
}
record["layer"] = layer_map[record["layer"]] or record["layer"]
-- timestamp ISO 8601 변환
record["timestamp"] = os.date("!%Y-%m-%dT%H:%M:%S.000Z", record["@timestamp"] / 1000)정규화 전후 비교
Before (원본 로그):
{
"@timestamp": "2025-01-15 10:30:45",
"level": "INFO",
"logger": "com.example.UserController",
"layer": "CONTROLLER"
}After (정규화 후):
{
"timestamp": "2025-01-15T10:30:45.000Z",
"log_level": "INFO",
"logger": "com.example.UserController",
"layer": "Controller",
"source_type": "BE",
"project_uuid": "9911573f-8a1d-3b96-98b4-5a0def93513b",
"service_name": "Loglens"
}4.2.4 [4단계] 메시지 큐 버퍼링
주체: Kafka (AI 서버)
동작 원리
Fluent Bit (고객 서버)
↓
[OUTPUT] kafka
Brokers: ai.loglens.store:9092
Topic: application-logs
Format: json
Compression: snappy
↓
Kafka Producer
↓
Kafka Broker
- 파티션: 1개 (기본)
- 보관 기간: 7일 (168시간)
- 압축: snappy
↓
Logstash ConsumerKafka 설정
KAFKA_LOG_RETENTION_HOURS: 168 # 7일
KAFKA_ADVERTISED_LISTENERS:
- PLAINTEXT://kafka:19092 # 내부 통신
- PLAINTEXT_HOST://ai.loglens.store:9092 # 외부 통신Fluent Bit OUTPUT 설정
[OUTPUT]
Name kafka
Match *
Brokers ai.loglens.store:9092
Topics application-logs
Format json
rdkafka.compression.type snappy
rdkafka.batch.size 16384
rdkafka.batch.num.messages 1000버퍼링의 역할
(1) 트래픽 급증 대응
고객 서버 트래픽 급증 (초당 10,000건)
↓
Fluent Bit → Kafka (빠르게 전송)
↓
Kafka가 버퍼링 (임시 저장)
↓
Logstash가 처리 가능한 속도로 소비(2) 장애 시 데이터 유실 방지
Logstash 다운
↓
Kafka에 로그 계속 쌓임 (최대 7일)
↓
Logstash 복구
↓
마지막 오프셋부터 재소비4.2.5 [5단계] 2차 정규화 및 인덱싱
주체: Logstash + OpenSearch (AI 서버)
Logstash 파이프라인
[INPUT] kafka
bootstrap_servers: kafka:19092
topics: application-logs
group_id: logstash-consumer
↓
[FILTER] mutate
- timestamp 생성
- log_id 생성 (SHA256)
↓
[FILTER] ruby
- log_details JSON 파싱
- Java Map toString() → JSON 변환
↓
[FILTER] mutate
- log_level 대문자 변환
- trace_id 기본값 설정 ("unknown")
↓
[OUTPUT] opensearch
index: %{safe_project_uuid}_%{+YYYY_MM}
action: createlog_id 생성 (Ruby 코드)
# project_uuid + timestamp + message를 조합하여 SHA256 해시 생성
hash_input = "#{project_uuid}-#{timestamp}-#{message}"
log_id = Digest::SHA256.hexdigest(hash_input).to_i(16) % (2**63)log_details 파싱
(1) JSON 형식
# 이미 JSON인 경우
log_details = JSON.parse(log_details_str)(2) Java Map toString() 형식
# {key1=value1, key2=value2} 형식
log_details_str = "{http_method=POST, request_uri=/api/users}"
↓
정규식으로 파싱
↓
{
"http_method": "POST",
"request_uri": "/api/users"
}인덱스 라우팅
index => "%{safe_project_uuid}_%{+YYYY_MM}"
# 예시:
# project_uuid: "loglens"
# 날짜: 2025년 1월
# 결과: "loglens_2025_01"OpenSearch 인덱싱
Logstash Bulk Insert
↓
OpenSearch 인덱스 자동 생성
- loglens_2025_01 (없으면 생성)
↓
문서 저장
- log_id를 document ID로 사용
- 중복 방지 (같은 log_id는 덮어쓰기)
↓
Refresh (5초 내)
↓
검색 가능4.2.6 [6단계] 조회 및 분석
주체: FastAPI + LogLens Web + React Dashboard
검색 플로우
사용자 (React Dashboard)
↓
[검색 요청]
- 키워드: "NullPointerException"
- 시간 범위: 최근 1시간
- log_level: ERROR
↓
LogLens Web (Spring Boot)
↓
FastAPI REST API
GET /api/v2/logs/search
↓
OpenSearch 쿼리
GET /loglens_2025_01/_search
{
"query": {
"bool": {
"must": [
{ "match": { "message": "NullPointerException" } },
{ "term": { "log_level": "ERROR" } },
{ "range": { "timestamp": { "gte": "now-1h" } } }
]
}
}
}
↓
검색 결과 (10건)
↓
FastAPI 응답
↓
LogLens Web → React DashboardAI 분석 플로우 (선택적)
사용자가 특정 로그 선택
↓
[AI 분석 요청]
POST /api/v2-langgraph/logs/{log_id}/analysis
↓
FastAPI
1. OpenSearch에서 로그 조회
2. trace_id 기반 관련 로그 수집
3. LLM에 전달 (OpenAI GPT)
↓
AI 분석 결과
- 에러 원인
- 해결 방법
- 관련 로그 요약
↓
OpenSearch에 결과 저장 (ai_analysis 필드)
↓
FastAPI 응답
↓
React Dashboard 표시4.3 장애 대응 메커니즘
각 컴포넌트는 장애 발생 시 로그 유실을 방지하기 위한 복구 메커니즘을 갖추고 있습니다.
4.3.1 Fluent Bit 장애
복구 메커니즘:
- 로그 파일은 디스크에 계속 기록
- 오프셋 DB로 읽은 위치 추적
- 재시작 시 마지막 위치부터 수집 재개
결과: 로그 유실 없음
4.3.2 Kafka 장애
복구 메커니즘:
rdkafka.retry.backoff.ms: 100 # 100ms 간격 재시도
rdkafka.message.timeout.ms: 10000 # 10초 타임아웃- 단기 장애 시 자동 재시도
- 장기 장애 시 Fluent Bit 내부 버퍼에 보관
결과: 단기 장애 자동 복구, 장기 장애는 파일에서 재수집
4.3.3 Logstash 장애
복구 메커니즘:
KAFKA_LOG_RETENTION_HOURS: 168 # 7일 보관- Kafka가 최대 7일간 로그 보관
- Logstash 재시작 시 Consumer Offset부터 재소비
결과: 로그 유실 없음, 검색만 지연
4.4.4. OpenSearch 장애
복구 메커니즘:
- Logstash 내부 큐에 임시 보관
- Dead Letter Queue (DLQ) 활용
- OpenSearch 복구 시 재처리
결과: 로그 유실 최소화