EVENT
EVENT
Event 는 특정 시점에 발생한 의미 있는 상태 변화나 행위를 의미한다. 그리고 다음과 같은 특징을 지닌다.
- 시간성(Temporality): 특정 시점에 발생
- 불변성(Immutability): 이미 발생한 사실
- 의미성(Significance): 시스템에 의미 있는 변화
- 관찰 가능성(Observability): 감지하고 반응할 수 있음
Event vs Command vs State:
Type | Event | Command | State |
---|---|---|---|
시제 | 과거형 (OrderCreated) | 명령형 (CreateOrder) | 현재형 (OrderStatus) |
변경 가능성 | 불변 | 실행 전까지 변경 가능 | 변경 가능 |
실패 가능성 | 이미 발생한 사실 | 실패 가능 | N/A |
목적 | 알림, 기록 | 행위 요청 | 현재 상태 표현 |
Event Sourcing
이벤트 소싱(event sourcing) 은 Domain-Driven Design 에서 개발한 기법이다. CDC(change data capture)와 유사하게 애플리케이션의 상태 변화를 모두 변경 이벤트 로그(immutable)로 저장한다. 저장만 가능하며 갱신이나 삭제는 불가능하다. CDC 와 다른 점은 이 아이디어를 적용하는 ABSTRACTION 레벨이 다르다는 것이다.
이벤트 소싱은 Data Modeling 에 쓸 수 있는 강력한 기법이다. 새로 발생하는 Side Effect 를 기존 이벤트에서 쉽게 분리할 수 있다.
이벤트 소싱의 핵심은 Command 와 Event 를 분리하는 것이다. 명령은 수행되기 전에 Validation 을 거치게 된다. 이벤트는 생성 시점에 사실(fact) 가 된다.
Naming Rules:
// 좋은 이벤트 명명
public class OrderCreatedEvent { } // 과거형, 명확한 의미
public class PaymentProcessedEvent { } // 구체적인 행위
public class InventoryLevelChangedEvent { } // 상태 변화 명시
// 피해야 할 명명
public class OrderEvent { } // 너무 일반적
public class CreateOrder { } // 명령형 (Command와 혼동)
public class OrderInfo { } // 이벤트임이 불분명
Examples:
// 이벤트 스토어
@Repository
public class EventStore {
private final JdbcTemplate jdbcTemplate;
public void saveEvents(String aggregateId, List<DomainEvent> events) {
events.forEach(event -> {
jdbcTemplate.update(
"INSERT INTO events (aggregate_id, event_type, event_data, version, timestamp) VALUES (?, ?, ?, ?, ?)",
aggregateId,
event.getClass().getSimpleName(),
serialize(event),
getNextVersion(aggregateId),
event.getOccurredAt()
);
});
}
public List<DomainEvent> getEvents(String aggregateId) {
return jdbcTemplate.query(
"SELECT * FROM events WHERE aggregate_id = ? ORDER BY version",
new EventRowMapper(),
aggregateId
);
}
}
// 이벤트로부터 상태 재구성
public class BankAccount {
private String accountId;
private BigDecimal balance;
private List<DomainEvent> uncommittedEvents = new ArrayList<>();
// 이벤트로부터 계좌 상태 재구성
public static BankAccount fromEvents(List<DomainEvent> events) {
BankAccount account = new BankAccount();
events.forEach(account::apply);
return account;
}
public void withdraw(BigDecimal amount) {
if (balance.compareTo(amount) < 0) {
throw new InsufficientFundsException();
}
MoneyWithdrawnEvent event = new MoneyWithdrawnEvent(
accountId, amount, balance.subtract(amount)
);
apply(event);
uncommittedEvents.add(event);
}
private void apply(DomainEvent event) {
if (event instanceof AccountOpenedEvent) {
apply((AccountOpenedEvent) event);
} else if (event instanceof MoneyDepositedEvent) {
apply((MoneyDepositedEvent) event);
} else if (event instanceof MoneyWithdrawnEvent) {
apply((MoneyWithdrawnEvent) event);
}
}
private void apply(MoneyWithdrawnEvent event) {
this.balance = event.getNewBalance();
}
}
// 사용 예시
public class BankAccountService {
private final EventStore eventStore;
public void withdraw(String accountId, BigDecimal amount) {
// 1. 이벤트로부터 현재 상태 재구성
List<DomainEvent> events = eventStore.getEvents(accountId);
BankAccount account = BankAccount.fromEvents(events);
// 2. 비즈니스 로직 실행
account.withdraw(amount);
// 3. 새로운 이벤트 저장
eventStore.saveEvents(accountId, account.getUncommittedEvents());
}
}
CQRS 와 조합하여 사용할 수 도 있다.
// Command Side에서 이벤트 발생
@Service
public class OrderCommandService {
private final EventStore eventStore;
private final EventPublisher eventPublisher;
public void createOrder(CreateOrderCommand command) {
Order order = new Order(command);
// 이벤트 저장
eventStore.saveEvents(order.getId(), order.getUncommittedEvents());
// 이벤트 발행 (Query Side 업데이트용)
order.getUncommittedEvents().forEach(eventPublisher::publish);
}
}
// Query Side 에서 이벤트 구독하여 읽기 모델 업데이트
@EventHandler
public class OrderViewUpdater {
private final OrderViewRepository viewRepository;
@EventListener
public void on(OrderCreatedEvent event) {
OrderView view = OrderView.builder()
.orderId(event.getOrderId())
.customerId(event.getCustomerId())
.status("CREATED")
.createdAt(event.getOccurredAt())
.build();
viewRepository.save(view);
}
@EventListener
public void on(OrderShippedEvent event) {
OrderView view = viewRepository.findById(event.getOrderId());
view.updateStatus("SHIPPED");
view.setShippedAt(event.getOccurredAt());
viewRepository.save(view);
}
}
Audit Event
// 감사 이벤트
public class AuditEvent extends DomainEvent {
private final String userId;
private final String action;
private final String resource;
private final String resourceId;
private final Map<String, Object> beforeState;
private final Map<String, Object> afterState;
private final String ipAddress;
private final String userAgent;
// 민감한 정보 마스킹
public Map<String, Object> getMaskedAfterState() {
Map<String, Object> masked = new HashMap<>(afterState);
if (masked.containsKey("password")) {
masked.put("password", "***");
}
if (masked.containsKey("creditCard")) {
masked.put("creditCard", maskCreditCard((String) masked.get("creditCard")));
}
return masked;
}
}
// 감사 이벤트 자동 생성
@Aspect
@Component
public class AuditAspect {
private final EventPublisher eventPublisher;
@Around("@annotation(Auditable)")
public Object auditMethod(ProceedingJoinPoint joinPoint) throws Throwable {
String methodName = joinPoint.getSignature().getName();
Object[] args = joinPoint.getArgs();
// 실행 전 상태 캡처
Object beforeState = captureState(args);
try {
Object result = joinPoint.proceed();
// 실행 후 상태 캡처
Object afterState = captureState(result);
// 감사 이벤트 발행
AuditEvent auditEvent = new AuditEvent(
getCurrentUserId(),
methodName,
getResourceType(args),
getResourceId(args),
beforeState,
afterState,
getCurrentUserIp(),
getCurrentUserAgent()
);
eventPublisher.publish(auditEvent);
return result;
} catch (Exception e) {
// 실패 감사 이벤트
AuditEvent failureEvent = new AuditEvent(
getCurrentUserId(),
methodName + "_FAILED",
getResourceType(args),
getResourceId(args),
beforeState,
Map.of("error", e.getMessage()),
getCurrentUserIp(),
getCurrentUserAgent()
);
eventPublisher.publish(failureEvent);
throw e;
}
}
}
// 사용 예시
@Service
public class UserService {
@Auditable
public User updateUser(String userId, UpdateUserRequest request) {
User user = userRepository.findById(userId);
user.update(request);
return userRepository.save(user);
}
}
Outbox Pattern
이벤트 유실 방지를 위해 Outbox Pattern 을 구현할 수 있다.
// Outbox Pattern 구현
@Entity
public class OutboxEvent {
@Id
private String id;
private String aggregateId;
private String eventType;
private String eventData;
private LocalDateTime createdAt;
private boolean processed;
}
@Service
@Transactional
public class OrderService {
private final OrderRepository orderRepository;
private final OutboxEventRepository outboxRepository;
public void createOrder(CreateOrderCommand command) {
// 1. 비즈니스 로직 실행
Order order = new Order(command);
orderRepository.save(order);
// 2. 같은 트랜잭션에서 Outbox에 이벤트 저장
OutboxEvent outboxEvent = new OutboxEvent(
UUID.randomUUID().toString(),
order.getId(),
"OrderCreated",
serialize(new OrderCreatedEvent(order)),
LocalDateTime.now(),
false
);
outboxRepository.save(outboxEvent);
// 트랜잭션 커밋 시 둘 다 저장됨
}
}
// 별도 프로세스에서 Outbox 이벤트 발행
@Component
public class OutboxEventPublisher {
private final OutboxEventRepository outboxRepository;
private final EventPublisher eventPublisher;
@Scheduled(fixedDelay = 1000) // 1초마다 실행
public void publishPendingEvents() {
List<OutboxEvent> pendingEvents = outboxRepository.findByProcessedFalse();
for (OutboxEvent outboxEvent : pendingEvents) {
try {
DomainEvent domainEvent = deserialize(outboxEvent.getEventData());
eventPublisher.publish(domainEvent);
// 발행 성공 시 처리 완료 표시
outboxEvent.setProcessed(true);
outboxRepository.save(outboxEvent);
} catch (Exception e) {
log.error("Failed to publish event: {}", outboxEvent.getId(), e);
// 재시도 로직 또는 Dead Letter Queue 처리
}
}
}
}
Immutable Event
거래(transaction)가 일어나면 거래 정보를 원장(ledger) 에 append-only 방식으로 기록한다. 원장은 본질적으로 돈, 상품, 서비스를 교환한 정보를 설명한 이벤트 로그다. 따라서, 거래 실수에 대한 내역도 추가가된다. 이러한 내용은 회계 감사에 중요한 사유가 된다.
불변 이벤트(immutable event) 는 훨씬 많은 정보를 포함한다. 예를 들어 사용자가 장바구니에 항목 하나를 넣었다가 제거한 경우, 주문 이행 관점에서는 첫 이벤트를 취소한 것에 불과하지만, 데이터 분석가에게는 고객이 특정 항목을 구매하려 했다가 하지 않았다는 것을 알 수 있다.
References
- Designing Data-Intensive Application / Martin Kleppmann