Database/DynamoDB

DynamoDB Stream 중복 처리 문제

소농배 2024. 11. 15. 10:19

DynamoDB Stream 은 Item 의 변경이 생겼을때 변경에 대한 Record 가 Stream 으로 전달되고, AWS Lambda , Kinesis Client 등으로 Record 를 전달 받아 Process 할 수 있는 기능이다.

 

 

kinesis client version => com.amazonaws:amazon-kinesis-client:1.14.4

 

Kinesis Client 를 사용하여 Stream Record 를 전달받는 경우, 아래와 같이 CheckPointer 를 사용하여 전달받은 Record 처리를 완료했다는 것을 알리게 된다.

    @Override
    public void processRecords(List<Record> list, IRecordProcessorCheckpointer iRecordProcessorCheckpointer) {
        // do something
        iRecordProcessorCheckpointer.checkpoint();
    }

 

AWS 에서는 Process 를 처리한 후에 checkpoint 를 하기를 권장한다. 따라서, 아래 문서에 설명하고 있는 것 처럼 Process 를 진행한 후에 모종의 이유(배포)로 checkpoint() 가 호출되지 못 한 경우, 다른 Worker 가 해당 Lease 의 Owner 가 되면서 해당 Record 가 다시한번 처리될 가능성이 충분히 존재한다.

https://docs.aws.amazon.com/ko_kr/streams/latest/dev/kinesis-record-processor-duplicates.html

 

위와 같이 checkpoint() 가 호출되지 못하여 다른 Worker 가 해당 Record 를 처리하는 경우는 매우 짧은 시간에 발생하기 때문에 Record 의 순서를 보장하는 DynamoDB Stream 에서는 문제가 발생할 가능성이 매우 적다.

 

하지만, 아래와 같이 student 테이블의 데이터가 변경이 생겼을때 Stream 을 통해 count 를 집계하는 구조로 동작중인 아키텍쳐에서 student 테이블과 student_count 테이블의 데이터 불일치가 발생하여 원인을 파악하기 시작하였다.

 

 

불일치가 발생한 데이터가 발생한 시점에 특정 클라이언트의 네트워크의 문제가 있어서 약 12시간 동안 통신이 불가능했었다. 

 

 

네트워크의 문제가 발생한 Worker 는 12시간동안 Stream 을 처리하지 못 하였고 당연히 또 다른 Worker 가 Lease 를 할당 받아 기존 Worker 에 문제가 발생한 시점부터 Record 를 처리하고 있었다.

 

네트워크의 문제가 발생한 Worker 의 네트워크 문제가 해결되어 다시 통신이 가능해진 시점에, 네트워크에 문제가있던 시점인 12시간 전의 Record 를 읽어와서 처리하는 현상이 확인되었다. 12시간 전 데이터를 일부분만 가져와서 처리해버리니, student 테이블과 student_count 테이블의 데이터 불일치가 발생하게 된 것이다.

 

12시간 전 일부분 데이터만 읽어와서 처리하고 Worker 가 제거된 사유는 아래와 같다.

  1. 아래에서 설명할 이유로 12시간 이전 데이터를 읽어와서 처리함
  2. Stream Record 는 최대 1MB 의 데이터만 읽어올 수 있음. student 테이블의 Item 하나당 데이터 사이즈가 크기 때문에 150개 데이터만 Record 로 읽어올 수 있었음.
  3. checkpoint() 를 호출하였지만, 해당 lease 의 Owner 는 이미 다른 Worker 이므로 ShutdownException 발생
  4. ShutdownException 을 받고 Processor 종료.

 

네트워크 문제가 발생할 당시의 데이터를 복구된 후에 다시 불러온 이유

 

Kinesis Client 의 ProcessTask.getRecordsResult() 함수를 호출하여 Record 를 불러오게 된다.

    private ProcessRecordsInput getRecordsResult() {
        try {
            return this.getRecordsResultAndRecordMillisBehindLatest();
        } catch (ExpiredIteratorException var5) {
            LOG.info("ShardId " + this.shardInfo.getShardId() + ": getRecords threw ExpiredIteratorException - restarting after greatest seqNum passed to customer", var5);
            MetricsHelper.getMetricsScope().addData("ExpiredIterator", 1.0, StandardUnit.Count, MetricsLevel.SUMMARY);
            this.dataFetcher.advanceIteratorTo(this.recordProcessorCheckpointer.getLargestPermittedCheckpointValue().getSequenceNumber(), this.streamConfig.getInitialPositionInStream());

            try {
                return this.getRecordsResultAndRecordMillisBehindLatest();
            } catch (ExpiredIteratorException var4) {
                String msg = "Shard " + this.shardInfo.getShardId() + ": getRecords threw ExpiredIteratorException with a fresh iterator.";
                LOG.error(msg, var4);
                throw var4;
            }
        }
    }

 

네트워크가 복구될 당시에 ExpiredIteratorException 이 대량 발생하였고, 해당 예외가 발생하였을때 Iterator 값을 LagestPermittedCheckpointValue 로 변경하는 것을 확인할 수 있다. 

 

final ProcessRecordsInput processRecordsInput = getRecordsResult();
throttlingReporter.success();
List<Record> records = processRecordsInput.getRecords();

records = deaggregateRecords(records);

recordProcessorCheckpointer.setLargestPermittedCheckpointValue(
	filterAndGetMaxExtendedSequenceNumber(scope, records,
   	recordProcessorCheckpointer.getLastCheckpointValue(),
	recordProcessorCheckpointer.getLargestPermittedCheckpointValue()));

 

LargestPermittedCheckpointValue 값은 getReordsResult() 한 결과를 저장하는 것을 볼 수 있다.

즉, ExpiredIteratorException 이 발생할 경우 해당 Worker 가 마지막으로 불러온 Record 의 Checkpoint 를 로컬에 저장하고 있다가 그 지점부터 다시 Record 를 불러오는 것이다.

이로인해, 네트워크가 복구된 시점에 네트워크에 문제가 발생할 시점의 Record 를 다시 처리하게 되었고 그 결과 두 테이블간의 데이터 불일치가 발생하였다.

 

해결방법

  1. student_count 테이블을 처리하는 로직에서 중복 처리를 방지.
    • 이미 Worker 가 많은 Stream 을 처리하고 있다면 하나하나 적용하기 힘듬
    • Processor 가 DB 에 저장하는게 아니라면, 중복 처리를 방지하는데 한계가 있음. ex) Kafka message 발행
  2. Record.getApproximateArrivalTimeStamp() 값이 특정 시간 이후라면, 유효하지 않은 Record 로 판단하고 Processor 에서 처리하지 않고 Skip
    • 만약 Processor 의 처리가 지연되면서 적용한 조건보다 Processor 가 받은 Record 시간이 길어진다면 정상 Record 를 처리하지 않게되는 문제 발생
  3. Lease Table 에 접근하여 Owner 가 현재 Worker 인지 확인
    • Record 를 처리하기위해 매번 Lease 테이블에 접근하는건 매우 비효율적.