Stream Processing
Message Broker and Event Sourcing
- Stream meaning
- Stream Processing
- Event
- Messaging System
- Direct Messaging System
- Message Broker
- Consumers
- Log-Based Message Broker
- Change data capture
- Event Sourcing
- Links
- References
Stream meaning
- 스트림이란 시간 흐름에 따라 점진적으로 생산된 데이터를 일컫는다.
- 이벤트 스트림은 일괄 처리 데이터와는 반대로 한정되지 않고 점진적으로 처리된다.
Stream Processing
스트림 처리의 기본 개념은 고정된 시간 조각이라는 개념을 완전히 버리고 단순히 이벤트가 발생할 때마다 처리 해야 한다는 것이다.
Event
스트림 처리 문맥에서 레코드(record) 는 보통 이벤트(event) 라고 하지만 특정 시점에 일어난 사건에 대한 세부 사항을 포함하는, 작고 독립된 불변 객체라는 점에서 본질적으로 동일하다. 이벤트는 일반적으로 이벤트 발생 타임 스탬프(event occurrence timestamp) 를 포함한다.
이벤트는 텍스트 문자열이나 JSON 또는 이진 형태로 부호화된다. 즉, 이벤트는 부호화 과정을 통해 JSON 과 같은 타입으로 저장된다. 또한 이벤트를 다른 노드에서 처리하게끔 네트워크를 통해 전송할 수 있다.
Producer and Consumer
생산자(Producer, [발행자(publisher), 발송자(sender)])가 이벤트를 한 번 만들면 해당 이벤트를 복수의 소비자(Consumer, [구독자(subscriber), 수신자(recipient)]) 가 처리할 수 있다.
파일 시스템에서는 관련 레코드 집합을 파일 이름으로 식별하지만 스트림 시스템에서는 대게 토픽(Topic) 이나 스트림(Stream) 으로 관련 이벤트를 묶는다.
- e.g Redis 를 활용하여 Producer 가 이벤트를 Redis 에 기록하고, Consumer 가 이를 소비하는 방식으로 스트림 처리를 구현할 수도 있다. 또는 MongoDB 를 통해서도 가능하다. 데이터베이스를 사용해야 하면 Trigger 를 사용할 수 있다.
Messaging System
메시징 시스템을 구축하는 가장 간단한 방법은 생산자와 소비자 사이에 Unix pipe 나 TCP Connection 과 같은 통신 채널을 사용하는 방법이다. Unix pipe 나 TCP 는 전송자와 수신자 1:1 매핑을 한다. 반면 메시징 시스템은 다수의 생산자 노드가 같은 토픽으로 메시지를 전송할 수 있고 다수의 소비자 노드가 토픽 하나에서 메시지를 받아갈 수 있다.
Backpressure
생산자가 소비자가 메시지를 처리하는 속도보다 빠르게 메시지를 전송하는 경우의 대안:
- 시스템은 메시지를 버리거나 큐에 메시지를 버퍼링하거나, 생산자가 메시지를 더 보내지 못하게 막는다. 이를 배압(Backpressure) 또는 흐름 제어(Flow control) 라고 한다.
메시지가 큐에 버퍼링될 때 큐 크기가 증가함에 따라 어떤 현상이 생기는지 이해하는 것이 중요하다. 큐 크기가 메모리 크기보다 커지면 시스템이 중단되는지?, 메시지를 디스크에 쓰는지?, 디스크에 쓴다면 디스크 접근이 메시징 시스템의 성능에 어떤 영향을 주는지 등
노드가 죽거나 일시적으로 오프라인이 되면 어떻게 되는지
- 디스크에 기록하거나 복제본을 생성하거나 둘 다 해야 한다.
Direct Messaging System
직접 메시징 시스템(생산자에서 소비자로 메시지를 직접 전달)에는 아래와 같은 것들이 있다.
- UDP 멀티캐스트
- 브로커가 필요 없는 메시징 라이브러리
- TCP 또는 IP 멀티캐스트 상에서 발행/구독 메시징을 구현
- UDP 메시징
- Webhook - 서비스 Callback URL 을 다른 서비스에 등록하는 방식
단점으로는 소비자가 오프라인이면 메시지를 전달하지 못하는 상태에 있는 동안 전송된 메시지를 유실할 수 있다.
Message Broker
직접 메시징 시스템의 대안으로 널리 사용되는 방법이 메시지 브로커(Message Broker) 이다. 메시지 브로커는 근본적으로 메시지 스트림 처리하는 데 최적화된 데이터베이스의 일종이다. 메시지 브로커는 서버로 구동된다.
생산자는 브로커로 메시지를 전송하고 소비자는 브로커에서 메시지를 읽는다. 생산자가 메시지를 보낼 때 생산자는 브로커가 해당 메시지를 버퍼에 넣었는지만 확인하고 소비자가 메시지를 처리하기까지 기다리지 않는다. 메시지를 소비자로 배달하는 것은 정해지지 않는 미래 시점이지만(대개는 순식간에) 때로는 큐에 백로그가 있다면 상당히 늦을 수도 있다.
어떤 메시지 브로커는 XA or JTA 를 이용해 2PC 를 수행하기도 한다.
Consumers
Load Balancing
각 메시지는 소비자 중 하나 로 전달된다. 따라서 소비자들은 해당 토픽의 메시지를 처리하는 작업을 공유한다. 브로커는 메시지를 전달할 소비자를 임의로 지정한다. 이 패턴은 메시지를 처리하는 비용이 비싸서 처리를 병렬화하기 위해 소비자를 추가 하고 싶을 때 유용하다. JMS 에서는 이 방식을 공유 구독(shared subscription) 이라 한다.
Fan out
각 메시지는 모든 소비자에게 전달된다. 팬 아웃 방식을 사용하면 여러 독립적인 소비자가 브로드캐스팅된 동일한 메시지를 서로 간섭 없이 청취할 수 있다. 이것은 같은 입력 파일을 읽어 여러 다른 일괄 처리 작업에서 사용하는 것 과 동일하다. 이 기능은 JMS 에서 Topic 구독으로 제공된다.
이 두 가지 패턴은 함께 사용 가능하다. 예를 들어 두 개의 소비자 그룹에서 하나의 토픽을 구독하고 각 그룹은 모든 메시지를 받지만 그룹 내에서는 각 메시지를 하나의 노드만 받게하는 식이다. 즉, 여러 컨슈머가 동일 토픽에서 메시지를 읽을 때 사용하는 주요 패턴 이 Load Balancing and Fan out 이다.
카프카의 경우, Consume 될 때 카프카에서 메시지가 제거되지 않아서 소비자를 여럿 추가 할 수 있고 각 컨슈머는 자체 메시지 오프셋을 유지 관리할 수 있다.
카프카에서는 컨슈머 그룹내에서 하나의 컨슈머가 토픽의 파티션을 구독하는 경우 그룹내의 다른 컨슈머가 파티션을 구독할 수 없다. 대신 다른 그룹이라면 가능하다.
Log-Based Message Broker
로그 기반 메시지 브로커(log-based message broker)의 기본 아이디어는 데이터베이스의 지속성 있는 저장 방법과 메시징 시스템의 지연 시간이 짧은 알림 기능을 조합 하려는 노력에서 파생되었다.
Producer 가 보낸 메시지는 로그 끝에 추가하고 소비자는 로그를 순차적으로 읽어 메시지를 받는다. 이것이 Kafka 의 기본 아이디어다. 소비자가 로그 끝에 도달하면 새 메시지가 추가됐다는 알림을 기다린다.
Partitioning
디스크 하나를 쓸 때보다 처리량을 높이기 위해 확장하는 방법으로 로그를 파티셔닝(Partitioning) 하는 방법이 있다. 각 파티션은 다른 파티션과 독립적으로 읽고 쓰기가 가능한 분리된 로그가 된다. 토픽은 같은 형식의 메시지를 전달하는 파티션들의 그룹 이다. 각 파티션 내에서 브로커는 모든 메시지에 오프셋(offset) 이라고 부르는 단조 증가하는 순번을 부여한다. 파티션이 추가 전용이고 따라서 파티션 내 전체 메시지는 전체 순서가 있기 때문에 순번을 부여하는 것은 타당하다. 단, 다른 파티션 간 메시지의 순서는 보장하지 않는다.
Apache Kafka 가 이러한 방식으로 동작하는 로그 기반 메시지 브로커다. 구글 클라우드 Pub/Sub 은 아키텍처는 비슷하지만 노출된 API 는 로그 추상화가 아닌 JMS 형식이다.
Load Balancing and Fan out
로그 기반 메시지 브로커는 팬아웃 방식을 제공하기 때문에 소비자가 서로 영향 없이 독립적으로 로그를 읽을 수 있고 메시지를 읽어도 로그에서 삭제되지 않는다.
Kafka Consumer Multi-Thread Strategy
하나의 파티션은 동일 컨슈머 중 최대 1개까지 할당된다. 그리고 하나의 컨슈머는 여러 파티션에 할당될 수 있다. 이런 특징을 가장 잘 살리는 방법은 1개의 애플리케이션에 구독하고자 하는 토픽의 파티션 개수만큼 컨슈머 스레드 개수를 늘려서 운영하는 것이다.
컨슈머 스레드를 늘려서 운영하면 각 스레드에 각 파티션이 할당되며 파티션의 레코드들을 병렬처리할 수 있다.
여기서 주의해야 할 점은 구독하고자 하는 토픽의 파티션 개수만큼만 컨슈머 스레드를 운영하는 것이다.
1개의 애플리케이션에 n 개의 컨슈머 스레드를 띄울 수 있다.
Consumer offset
파티션 하나를 순서대로 처리하면 메시지를 어디까지 처리했는지 알기 쉽다. 소비자의 현재 오프셋보다 작은 오프셋을 가진 메시지는 이미 처리한 메시지고 소비자의 현재 오프셋보다 큰 오프셋을 가진 메시지는 아직 처리하지 않은 메시지다. 따라서 브로커는 모든 개별 메시지마다 보내는 확인 응답을 추적할 필요가 없다. 단지 주기적으로 오프셋을 기록하면 된다.
확인 응답이란 메시지를 잃어버리지 않기 위해 클라이언트가 메시지 처리가 끝났을 때 브로커가 메시지를 큐에서 제거할 수 있게 브로커에게 명시적으로 알리는 것을 의미한다.
Change data capture
변경 데이터 캡처(Change data capture, CDC)는 데이터베이스에 기록하는 모든 데이터 변화를 관찰해 다른 시스템으로 데이터를 복제할 수 있는 형태로 추출하는 과정이다. CDC 는 데이터가 기록되자마자 변경 내용을 스트림으로 제공할 수 있으면 특히 유용하다. 예를 들면 데이터베이스의 변경 사항을 캡처해 검색 색인에 꾸준히 반영할 수 있다.
CDC 는 본질적으로 변경 사항을 캡처할 데이터베이스 하나를 리더로 하고 나머지를 팔로워로 한다. 로그 기반 메시지 브로커는 원본 데이터베이스에서 변경 이벤트를 전송하기에 적합하다. 메시지 순서를 유지하기 때문이다. CDC 를 구현하는데 데이터베이스 Trigger 를 사용하기도 한다.
Bottled Water 는 쓰기 전 로그를 복호화하는 API 를 사용해 PostgreSQL 용 CDC 를 구현하고 있다. Maxwell 과 Debezium 은 binlog 를 파싱해 유사한 방식으로 MySQL 용 CDC 를 구현한다.
Snapshot
데이터베이스 스냅숏은 변경 로그의 위치나 오프셋에 대응돼야 한다. 그래야 스냅숏 이후에 변경 사항을 적용할 시점을 알 수 있다.
Log Compaction
로그 컴팩션의 원리는 간단하다. 저장 엔진은 주기적으로 같은 키의 로그 레코드를 찾아 중복을 제거하고 각 키에 대해 가장 최근에 갱신된 내용만 유지한다. 컴팩션과 병합은 백그라운드로 실행된다. 로그 구조화 저장 엔진에서 특별한 널 값(툼스톤(tombstone)) 으로 갱신하는 것은 키의 삭제를 의미하고 로그 컴팩션을 수행할 때 실제로 값을 제거한다.
로그 컴팩션은 각 레코드의 최신 버전만을 보유하고 덮어 쓰여진 버전은 삭제한다.
Event Sourcing
이벤트 소싱(Event Sourcing)은 DDD 에서 개발한 기법이다. 이벤트 소싱은 변경 데이터 캡처와 유사하게 애플리케이션 상태 변화를 모두 변경 이벤트 로그로 저장한다. 이때 이벤트 저장은 단지 추가만 가능하고 갱신이나 삭제는 권장하지 않거나 금지한다. 즉, 애플리케이션 로직은 이벤트 로그에 기록된 불변 이벤트를 기반으로 구축된다.
이벤트 소싱을 사용하면 어떤 상황이 발생한 후에 상황 파악이 쉽기 때문에 디버깅에 도움이 되고 애플리케이션 버그를 방지한다.
이벤트 소싱의 철학은 이벤트(Event) 와 명령(Command) 을 구분하는 데 주의한다. 애플리케이션은 먼저 명령이 실행 가능한지 확인해야 한다. 무결성이 검증되고 명령이 승인되면 명령은 지속성 있는 불변 이벤트가 된다.
이벤트는 생성 시점에 사실(fact) 이 된다. 사용자가 나중에 예약을 변경하거나 취소하더라도 이전에 특정 좌석을 예약 했다는 사실은 달라지지 않는다.
Links
- Simple & Easy Notification Service #2 : 메시지 발송 처리를 위한 Kafka 사용기
- Exactly-Once Semantics Are Possible: Here’s How Kafka Does It
References
- 데이터 중심 애플리케이션 설계 / Martin Kleppmann 저 / 위키북스
- 아파치 카프카 애플리케이션 프로그래밍 / 최원영 저 / 비제이퍼블릭