saay, hi

중복 취득 확인 테스트 본문

kafka 테스트/데이터 테스트

중복 취득 확인 테스트

saay-hi 2024. 6. 27. 13:55
TEST 목적 개별 consumer의 데이터 취득 확인 및 중복 취득 확인
TEST 시나리오  1)    Producer : 10만 개 topic 전송
2)    Consumer : consumer1 실행 후 topic 모두 받지 않고, 중단 후 consumer 2 실행
3)    Consumer group 조회 : consumer1 실행 후, 중단 시, consumer group의 lag 및 consumer id 확인, Conusmer2 실행 시작 전, 시작 후 consumer group의 lag 및 consumer id 확인

* 정확도를 높이기 위해 같은 test를 총 6번 진행하였으며, test5부턴 consumer1,2에게 개별 instance.id ( consumer.id와 쓰임이 유사하며 실제 group.instance.id가 개별 consumer.id 역할을 함 )를 주어 이전 consumer의 offset을 중복으로 받는지 여부를 확인하기 위해 진행됨 (뒤에 변경된 consumer 소스 추가됨)
TEST 구성

(옵션) [ 설정값 변경 ( topic 취득을 육안으로 확인하기 위해 데이터 취득하는 시간을 sleep 을 이용하여 천천히 취득) ]

broker / zookeeper default
consumer1.java

public class consumerTest1 {
    //public void receiveFromKafka() {
        public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.56.105:9092");
        props.put("group.id", "basic-consumer");
        props.put("group.instance.id", "conusmer1");
        props.put("enable.auto.commit", "true");
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");


try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                System.out.println("records:"+records.count());
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Topic: %s, Partition: %s, Offset: %d, Key: %s, Value: %n\n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
                Thread.sleep(10000);
            }
        } catch(Exception ex){
        } finally {
            consumer.close();
        }
consumer2.java public class consumerTest1 {
    //public void receiveFromKafka() {
        public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.56.105:9092");
        props.put("group.id", "basic-consumer");
        props.put("group.instance.id", "conusmer2");
        props.put("enable.auto.commit", "true");
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");


try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                System.out.println("records:"+records.count());
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Topic: %s, Partition: %s, Offset: %d, Key: %s, Value: %n\n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
                Thread.sleep(10000);
            }
        } catch(Exception ex){
        } finally {
            consumer.close();
        }

 


[ test 방법 ]

1.consumer1 기동 후 일시정지 후 마지막 log에 남은 offset 확인


Topic: basic-consumer, Partition: 0, Offset: 4376997, Key: 46999, Value:

2.consumer 2 기동

records:500
Topic: basic-consumer, Partition: 0, Offset: 4376498, Key: 46500, Value:

 

3.consumer group 조회하는 cli 명령어를 통해 consumer 1  /2 상태 조회

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
basic-consumer  basic-consumer  0          4376498         4929998         553500         1stConsumer-4af995b0-956e-432a-ae60-7b3866032270 /182.168.56.105 consumer-basic-consumer-1stConsumer



GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
basic-consumer  basic-consumer  0          4376498         4929998         553500         2ndConsumer-cff0e8c8-42af-46c1-b60d-8892588aba99 /192.168.56.105 consumer-basic-consumer-2ndConsumer

[ 결과 및 의견 ]

consumer1이 중단된 뒤, consumer2를 실행할 경우 바로 다음 offset으로 데이터를 취득하는 경우도 있었으나 consume1의 -500~2500 가량의 이전 offset부터 데이터를 취득하기도 함.

그러므로 같은 그룹 내에서 consumer2는 consumer1에서 끊긴 offset을 이어서 습득하는 것은 맞으나, 바로 다음 offset부터 데이터 취득하는 것만은 아님. 

따라서 topic을 어느정도 중복취득을 할 수 있음. consumer AP에서 enable.auto.commit을 false로 변경할 경우 수동 commit으로 전환 가능

다만 offset의 어느정도 단위가 고정적으로 중복되는지는 test마다 달라서 중복 크기는 고정적이지 않기때문에 중복 취득으로 인한 topic log관리는 취득하고자 하는 것보다 조금 더 여유롭게 준비해야한다고 생각함.