realtime-chat 프로젝트의 문제: 채팅 메시지를 DB에 저장하면서 동시에 Elasticsearch에도 인덱싱해야 한다. 만약 서비스가 직접 ES를 호출하다가 ES가 일시 장애이면 — 메시지는 저장됐는데 검색 인덱스에는 없는 상태가 생긴다.
Outbox 패턴은 이 문제를 트랜잭션 경계를 DB 하나로 좁히는 방식으로 해결한다. 메시지 저장과 outbox 이벤트 기록을 같은 트랜잭션으로 묶고, 별도 프로세스(OutboxProcessor)가 나중에 ES에 인덱싱한다.
@Transactional
public SendResult sendMessage(SendMessageRequest req) {
Message msg = messageRepository.save(Message.create(...));
// ⚡ 여기서 ES가 죽거나 네트워크가 끊기면?
esClient.index(i -> i.index("messages").id(...).withJson(...));
// → DB엔 저장됐지만 검색에선 영영 안 보임 💀
}
두 시스템은 서로 다른 트랜잭션 경계를 가지므로 원자성을 보장할 수 없다. DB 커밋이 성공해도 ES 호출이 실패할 수 있고, 반대 순서도 마찬가지다.
BaseEntity가 Spring의 AbstractAggregateRoot를 상속한다. 덕분에 registerEvent()로 이벤트를 모아두고, repository.save() 시점에 Spring이 자동 발행한다.
// Message.java — 도메인 이벤트를 자신이 등록
public static Message create(ChatRoom chatRoom, User sender, ...) {
Message msg = new Message(chatRoom, sender, content, ...);
msg.registerEvent(new MessageDomainEvent(EventType.CREATED, msg));
return msg;
}
public void edit(Long userId, String newContent) {
this.content = newContent;
this.edited = true;
registerEvent(new MessageDomainEvent(EventType.UPDATED, this));
}
public void markDeleted(Long userId) {
this.deleted = true;
registerEvent(new MessageDomainEvent(EventType.DELETED, this));
}
MessageDomainEvent를 수신하면, Message를 JSON으로 직렬화하고 같은 트랜잭션 안에서 outbox_event 테이블에 저장한다.
// OutboxEventPublisher.java
@EventListener // 동일 트랜잭션에서 호출됨
public void handle(MessageDomainEvent domainEvent) {
Message message = domainEvent.message();
String payload = toJson(MessageDocument.from(message));
OutboxEvent outboxEvent = switch (domainEvent.eventType()) {
case CREATED -> OutboxEvent.messageCreated(message.getId(), payload);
case UPDATED -> OutboxEvent.messageUpdated(message.getId(), payload);
case DELETED -> OutboxEvent.messageDeleted(message.getId(), payload);
};
outboxEventRepository.save(outboxEvent); // DB 커밋과 함께 원자적으로 저장
// 즉시 처리 시도를 위한 이벤트 발행 (트랜잭션 밖에서 실행됨)
eventPublisher.publishEvent(new OutboxEventCreatedEvent(outboxEvent.getId()));
}
프로젝트는 즉시 처리 + 폴링 안전망 두 레이어로 처리한다.
// OutboxProcessor.java
// ① 즉시 처리: 트랜잭션 커밋 직후 비동기로 ES 인덱싱 시도
@Async
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void handleOutboxCreated(OutboxEventCreatedEvent event) {
outboxEventRepository.findById(event.outboxEventId())
.ifPresent(this::processEvent);
}
// ② 폴링 안전망: 5초마다 미처리 이벤트 재시도
@Scheduled(fixedDelay = 5000)
@Transactional
public void pollUnprocessedEvents() {
outboxEventRepository
.findByProcessedFalseAndRetryCountLessThanOrderByCreatedAtAsc(MAX_RETRY)
.forEach(this::processEvent);
}
private void processEvent(OutboxEvent event) {
try {
switch (event.getEventType()) {
case CREATED -> esIndexService.index(event.getPayload());
case UPDATED -> esIndexService.update(event.getPayload());
case DELETED -> esIndexService.delete(event.getPayload());
}
event.markProcessed(); // processed = true
} catch (Exception e) {
event.incrementRetry(); // retryCount++
if (event.getRetryCount() >= MAX_RETRY) { // 5회 초과
deadLetterEventRepository.save(DeadLetterEvent.from(event));
event.markProcessed(); // DLQ로 이동 후 종료
}
}
}
왜 즉시 처리와 폴링 둘 다 쓰나?
5회 재시도 후에도 실패하면 dead_letter_event 테이블로 이동한다. 이 레코드는 수동으로 확인하고 재처리하거나 알림을 발송하는 용도로 쓴다.
파란 점선 박스 안은 하나의 DB 트랜잭션 — 커밋이 성공하면 messages와 outbox_event가 동시에 저장된다.
이후 OutboxProcessor가 두 가지 경로로 Elasticsearch 인덱싱을 보장한다.
| 항목 | realtime-chat 구현 | 비고 |
|---|---|---|
| 신뢰성 | At-least-once 보장 | 폴링 안전망 덕분 |
| ES 동기화 지연 | 즉시 ~ 5초 이내 | 즉시 처리 성공 시 거의 즉시 |
| 중복 인덱싱 가능성 | 있음 | ES는 같은 doc_id로 덮어쓰므로 무해 |
| 재시도 전략 | 고정 5회 | Exponential backoff 적용 시 개선 가능 |
| 폴링 DB 부하 | 5초마다 쿼리 | processed=false 인덱스가 있으면 경량 |
| DLQ 처리 | 수동 확인 필요 | 알람/재처리 로직 추가 가능 |