TEST 목적 | 개별 consumer의 데이터 취득 확인 및 중복 취득 확인 |
TEST 시나리오 | 1) Producer : 10만 개 topic 전송 2) Consumer : consumer1 실행 후 topic 모두 받지 않고, 중단 후 consumer 2 실행 3) Consumer group 조회 : consumer1 실행 후, 중단 시, consumer group의 lag 및 consumer id 확인, Conusmer2 실행 시작 전, 시작 후 consumer group의 lag 및 consumer id 확인 * 정확도를 높이기 위해 같은 test를 총 6번 진행하였으며, test5부턴 consumer1,2에게 개별 instance.id ( consumer.id와 쓰임이 유사하며 실제 group.instance.id가 개별 consumer.id 역할을 함 )를 주어 이전 consumer의 offset을 중복으로 받는지 여부를 확인하기 위해 진행됨 (뒤에 변경된 consumer 소스 추가됨) |
TEST 구성 |
![]() |
(옵션) [ 설정값 변경 ( topic 취득을 육안으로 확인하기 위해 데이터 취득하는 시간을 sleep 을 이용하여 천천히 취득) ]
- consumer 소스에 group.instance.id && sleep 추가
broker / zookeeper | default |
consumer1.java |
public class consumerTest1 { //public void receiveFromKafka() { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.56.105:9092"); props.put("group.id", "basic-consumer"); props.put("group.instance.id", "conusmer1"); props.put("enable.auto.commit", "true"); props.put("auto.offset.reset", "latest"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); … try { while (true) { ConsumerRecords<String, String> records = consumer.poll(100); System.out.println("records:"+records.count()); for (ConsumerRecord<String, String> record : records) { System.out.printf("Topic: %s, Partition: %s, Offset: %d, Key: %s, Value: %n\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } Thread.sleep(10000); } } catch(Exception ex){ } finally { consumer.close(); } … |
consumer2.java | public class consumerTest1 { //public void receiveFromKafka() { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.56.105:9092"); props.put("group.id", "basic-consumer"); props.put("group.instance.id", "conusmer2"); props.put("enable.auto.commit", "true"); props.put("auto.offset.reset", "latest"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); … try { while (true) { ConsumerRecords<String, String> records = consumer.poll(100); System.out.println("records:"+records.count()); for (ConsumerRecord<String, String> record : records) { System.out.printf("Topic: %s, Partition: %s, Offset: %d, Key: %s, Value: %n\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } Thread.sleep(10000); } } catch(Exception ex){ } finally { consumer.close(); } … |
[ test 방법 ]
1.consumer1 기동 후 일시정지 후 마지막 log에 남은 offset 확인
… Topic: basic-consumer, Partition: 0, Offset: 4376997, Key: 46999, Value: |
2.consumer 2 기동
records:500 Topic: basic-consumer, Partition: 0, Offset: 4376498, Key: 46500, Value: |
3.consumer group 조회하는 cli 명령어를 통해 consumer 1 /2 상태 조회
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID basic-consumer basic-consumer 0 4376498 4929998 553500 1stConsumer-4af995b0-956e-432a-ae60-7b3866032270 /182.168.56.105 consumer-basic-consumer-1stConsumer GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID basic-consumer basic-consumer 0 4376498 4929998 553500 2ndConsumer-cff0e8c8-42af-46c1-b60d-8892588aba99 /192.168.56.105 consumer-basic-consumer-2ndConsumer |
[ 결과 및 의견 ]
consumer1이 중단된 뒤, consumer2를 실행할 경우 바로 다음 offset으로 데이터를 취득하는 경우도 있었으나 consume1의 -500~2500 가량의 이전 offset부터 데이터를 취득하기도 함.
그러므로 같은 그룹 내에서 consumer2는 consumer1에서 끊긴 offset을 이어서 습득하는 것은 맞으나, 바로 다음 offset부터 데이터 취득하는 것만은 아님.
따라서 topic을 어느정도 중복취득을 할 수 있음. consumer AP에서 enable.auto.commit을 false로 변경할 경우 수동 commit으로 전환 가능
다만 offset의 어느정도 단위가 고정적으로 중복되는지는 test마다 달라서 중복 크기는 고정적이지 않기때문에 중복 취득으로 인한 topic log관리는 취득하고자 하는 것보다 조금 더 여유롭게 준비해야한다고 생각함.