saay, hi

3) consumer configs(consumer.properties) 본문

kafka 설치 및 구성/참고) configuration

3) consumer configs(consumer.properties)

saay-hi 2024. 6. 25. 13:32

[ default 옵션]

  옵션명 default 설명
1 bootstrap.servers localhost:9092 초기 kafka cluster에게 연결하기 위한 host/port 쌍의 list.
bootstrap.servers 를 등록해야 외부 서버에서 연결이 가능하며, 2개 이상 권장 ( consumer AP 끊기게 될 경우, 다른 consumer 로 연결하기 위해)

default : ""
2 group.id test-consumer-group 각 consumer가 속해있는 컨슈머 그룹들을 구별하기 위해 사용. 
이 속성은 컨슈머가 susbscribe(topic)사용에 의한 그룹 관리 기능이거나  카프카 기반의 offset 관리 정책를 사용할 경우 요청됨
dafult  : null

[ consumer AP 옵션 ]

  옵션명 default 이름
1 key.serializer   인터페이스를 구현하는 키에 대한 직렬 변환기 클래스
2 value.serializer   인터페이스를 구현하는 값에 대한 직렬 변환기 클래스
3 bootstrap.server   kafka cluster에 대한 초기 연결을 설정하는 데 사용할 호스트/포트 쌍 목록
클라이언트는 bootstraping을 위해 여기에 지정된 서버에 관계없이 모든 서버를 사용함. 이 목록은 전체 서버 세트를 검색하는 데 사용되는 초기 호스트에만 영향을 미침. 기본형식은 host1:port1. 이러한 서버는 전체 클러스터 구성원(동적으로 변경될 수 있음) 을 검색하기 위한 초기 연결에만 사용되므로 이 목록에는 전체 서버 집합이 포함될 필요가 없으나, 서버가 다운된 경우에는 두개 이상 필요
4 group.id   consumer가 속한 consumer group을 식별하는 고유 문자열
5 group.instance.id   최종 사용자가 제공한 consumer instance의 고유 식별자.
비어있지 않은 문자열만 허용되며,
이 값이 설정된 경우 consumer는 정적멤버로 처리됨.
이 id를 가진 인스턴스는 항상 consumer group에서 하나만 허용됨.
이 값을 설정하지않으면 consumer 는 전통적인 동작인
동적 구성원으로 그룹에 참여하게 됨
6 enable.auto.commit TRUE true인 경우, consumer의 offset이 백그라운드에서 주기적으로 커밋됨
7 auto.offset.reset. latest kafka에 초기 offset이 없거나 현재 offset이 서버에 더 이상 존재하지 않는 경우(ex 해당데이터가 삭제된 경우) 수행할 작업:
• offset을 가장 빠른 offset으로 자동 재설정함
• latest : offset을 최신 offset으로 자동 재설정함
• 없음 : consumer group 에 대해 이전 offset이 발견되지 않은 경우, consumer에게 예외를 발생시킴
• consumer 에게 예외를 던짐
이 구성은 latest로 설정하는 동안 partition번호를 변경하면 consumer가 offset을 재설정하기 전에 생성자가 새로 추가된 partition(즉, 초기 offset이 아직 존재하지 않음) 에 메시지를 보내기 시작할 수 있으므로 메시지 전달 손실이 발생할 수 있음 유의

 

[ default 그 외 공식문서 정리]

옵션명 default 설명
importance : high
key.serializer   인터페이스를 구현하는 키에 대한 직렬 변환기 클래스
value.serializer   인터페이스를 구현하는 값에 대한 직렬 변환기 클래스
bootstrap.server   kafka cluster에 대한 초기 연결을 설정하는 데 사용할 호스트/포트 쌍 목록
클라이언트는 bootstraping을 위해 여기에 지정된 서버에 관계없이 모든 서버를 사용함. 이 목록은 전체 서버 세트를 검색하는 데 사용되는 초기 호스트에만 영향을 미침. 기본형식은 host1:port1. 이러한 서버는 전체 클러스터 구성원(동적으로 변경될 수 있음) 을 검색하기 위한 초기 연결에만 사용되므로 이 목록에는 전체 서버 집합이 포함될 필요가 없으나, 서버가 다운된 경우에는 두개 이상 필요
group.id   consumer가 속한 consumer group을 식별하는 고유 문자열
enable.auto.commit TRUE true인 경우, consumer의 offset이 백그라운드에서 주기적으로 커밋됨
auto.offset.reset. latest kafka에 초기 offset이 없거나 현재 offset이 서버에 더 이상 존재하지 않는 경우(ex 해당데이터가 삭제된 경우) 수행할 작업:
• offset을 가장 빠른 offset으로 자동 재설정함
• latest : offset을 최신 offset으로 자동 재설정함
• 없음 : consumer group 에 대해 이전 offset이 발견되지 않은 경우, consumer에게 예외를 발생시킴
• consumer 에게 예외를 던짐
이 구성은 latest로 설정하는 동안 partition번호를 변경하면 consumer가 offset을 재설정하기 전에 생성자가 새로 추가된 partition(즉, 초기 offset이 아직 존재하지 않음) 에 메시지를 보내기 시작할 수 있으므로 메시지 전달 손실이 발생할 수 있음 유의
유효한 값 : latest, earliest, none
     
fetch.min.bytes 1 서버가 가져오기 요청에 대해 반환해야 하는 최소 데이터 양
사용할 수 있는 데이터가 충분하지 않은 경우 요청은 요청에 응답하기 전에 그만큼의 데이터가 누적될 때까지 기다림
기본 설정인 1바이트는 해당 바이트의 데이터를 사용할 수 있게 되자마자 가져오기 요청에 응답하거나 데이터가 도착할 때까지 기다리는 동안 가져오기 요청이 시간 초과됨을 의미함
이 옵션의 값을 더 큰 값으로 설정하면 서버가 더 많은 양의 데이터가 누적될 때까지 기다리게 되므로 대기시간이 추가되는 대신, 서버 처리량이 약간 향상될 수 있음
유효한 값 : 0,...
group.protocol classic group protocol consumer 는 사용해야하는 옵션.
현재 'classic','consumer'를 지원함
만약 'consumer'로 지정할 경우, consumer group protocol은 사용되어지지만,
그렇지 않으면 classic group protocol이 사용됨
유효한 값 : consumer, classic
heartbeat.interval.ms 3000 (3 seconds) Kafka의 그룹 관리 기능을 사용할 때 consumer 코디네이터에 대한 하트비트 사이의 예상 시간
하트비트는 consumer의 세션이 활성 상태로 유지되도록 하고 새 consumer가 그룹에 가입하거나 떠날 떄 재조정을 용이하게 하는 데 사용됨. 값은 session.timeout.ms보다 낮게 설정해야 하지만, 일반적으로 해당 값의 1/3보다 높게 설정해서는 안됨. 정상적인 재조정에 대한 예상 시간을 제어하기 위해 더 낮게 조정할 수 있음
max.partition.fetch.bytes 1048576 (1 mebibyte) 서버가 반환할 파티션당 최대 데이터 양
레코드는 consumer가 일괄적으로 가져옴. Fetch(가져오기)의 비어있지 않은 첫 번째 파티션에 있는 첫 번째 레코드 일괄처리가 이 제한보다 큰 경우,
consumer가 진행할 수 있도록 일괄 처기라 계속 반환됨. 브로커가 허용하는 최대 레코드 배치 크기는 message.max.bytes(broker config) 또는 max.message.bytes를 통해 정의됨. Consumer요청 크기그를 제한하는 방법은 fetch.max.byte참조
session.timeout.ms 45000 (45 seconds) kafka의 그룹 관리 기능을 사용할 때 클라이언트 오류를 감지하는데 사용되는 시간 제한.
클라이언트는 주기적인 하트비트를 전송하여 브로커에 활성 상태를 나타냄.
이 세션 제한시간이 만료되기 전에 브로커가 하트비트를 수신하지 않으면, 브로커는 이 클라이언트를 그룹에서 제거하고 재조정을 시작함.
이 값은 브로커 구성에서 group.min.session.timeout.ms  group.max.session.timeout.ms로 구성된 허용 범위 내에 있어야 함
importance : medium
group.instance.id   최종 사용자가 제공한 consumer instance의 고유 식별자.
비어있지 않은 문자열만 허용되며,
이 값이 설정된 경우 consumer는 정적멤버로 처리됨.
이 id를 가진 인스턴스는 항상 consumer group에서 하나만 허용됨을 의미.
이는 일시적인 사용불가(ex: 프로세스 재시작)으로 인한 그룹 재조정을 방지하기 위해 더 큰 세션 시간 제한과 함께 사용할 수 있음.
이 값을 설정하지않으면 consumer 는 기존 동작인
동적 구성원으로 그룹에 참여하게 됨.
allow.auto.create.topics TRUE topic을 구독하거나 할당할 때 브로커에서 자동 topic 생성을 허요함.
구독중인 topic은 브로커가 'auto.create.topics.enable'의 브로커 구성을 사용하여 허용하는 경우에만 자동으로 생성됨.
0.11.0 이전 브로커를 사용하는 경우 'false'로 설정해야 함
client.dns.lookup use_all_dns_ips 클라이언트가 DNS 조회를 사용하는 방법을 제어함
use_all_dns_ips로 설정돼 있을 경우, 성공적으로 연결될 때까지 반환된 각 IP 주소에 순서대로 연결함
연결이 끊기면 다음 IP가 사용되며, 모든 IP가 한 번 사용되면 클라이언트는 호스트 이름에서 IP를 다시 확인함
(다만 JVM 및 OS 캐시 DNS 이름 모두 조회함)
resolve_canonical_bootstrap_servers_only로 설정된 경우,
각 부트스트랩 주소를 정식 이름 목록으로 확인하며, 부트스트랩 단계 이후에는 use_all_dns_ips와 동일하게 동작함
유효한 값 : use_all_dns_ips, resolve_canonical_bootstrap_servers_only
connections.max.idle.ms 540000 (9 minut) 이 옵션은 지정된 시간(밀리초) 후에 유휴 연결을 닫음
default.api.timeout.ms 60000(1minutes) 클라이언트 API에 대한 시간 제한(밀리초)을 지정함
이 옵션은 timeout 매개 변수를 지정하지 않는 모든 클라이언트 작업에 대한 기본시간 제한으로 사용됨
enable.auto.commit TRUE true이면 소비자의 오프셋이 백그라운드에서 주기적으로 커밋됨
exclude.internal.topics TRUE 구독 패턴과 일치하는 내부 항목을 구독에서 제외해야 하는지 여부
fetch.max.bytes 52428800 (50 mebibytes) 서버가 가져오기 요청에 대해 반환해야 하는 최대 데이터 양
레코드는 소비자에 의해 일괄적으로 fetch되며, fetch의 비어있지 않은 첫 번째 파티션에 있는 첫 번째 레코드 배치가 이 값보다 큰 경우 소비자가 진행할 수 있도록 레코드 batch가 계속 반황됨
따라서 이것은 절대적인 최대값이 아님
브로커가 허용하는 최대 레코드 배치크기는 message.max.bytes(broker config) 또는 max.message.bytes(topic config)를 통해 정의됨.
consumer는 여러 가져오기를 병렬로 수행함
group.remote.assignor null 사용할 서버 측 할당자
할당자가 지정되지 않은 경우 그룹 코디네이터가 지정함.
이 옵션은 group.protocol가 "consumer"설정된 경우에만 적용됨
isolation.level read_uncommitted 트랜잭션으로 작성된 메시지를 읽는 방법을 제어
read_committed로 설정하면 consumer.poll()은 커밋된 트랜잭션 메시지만 반환함
read_uncommitted (기본값)으로 설정하면, consumer.poll()은 중단된 트랜잭션 메시지를 포함한 모든 메시지를 반환함
비트랜잭션 메시지는 두 모드 중 하나에서 무조건 반환됨
메시지는 항상 오프셋 순서로 반홤됨. 따라서 read_committed mode에서 consumer.poll()은 첫 번째 열린 트랜잭션의 오프셋보다 작은 마지막 안정 오프셋(LSO)까지만 메시지를 반환함
특히, 진행 중인 트랜잭션에 속한 메시지 이후에 나타나는 모든 메시지는 해당 트랜잭션이 완료될 때까지 보류됨.
그 결과 read_committed consumer는 항공편 거래가 있을 때 상위 워터마크까지 읽을 수 없음
또한 read_committed일 때, seekToEnd 메서드에서 LSO를 반환함
유효한 값 : read_committed, read_uncommitted
max.poll.interval.ms 300000 (5 minutes) 이용자 그룹 관리를 사용할 때 poll() 호출 사이의 최대 지연
이렇게하면, consumer가 더 많은 레코드를 가져오기 전에 유휴 상태일 수 있는 시간의 상한이 설정됨
이 제한시간이 만료되기 전에 poll()을 호출하지 않으면 소비자가 실패한 것으로 간주되고 그룹은 파티션을 다른 멤버에 재할당하기 위해 재조정함
이 시간 제한에 도달하는 null이 아닌 값을 group.instance.id를 사용하는 소비자의 경우 파티션이 즉시 다시 할당되지 않음
대신 consumer는 하트비트 전송을 중지하고 session.timeout.ms값이 만료된 후 파티션이 다시 할당됨
이는 종료된 정적 소비자의 동작을 미러링함
max.poll.records 500 poll()에 대한 단일 호출에서 반환되는 최대 레코드 수
이 max.poll.records는 기본 가져오기 동작에 영향을 주지 않음
consumer는 각 가져오기 요처에서 레코드를 캐시하고 폴링에서 점진적으로 반환함
partition.assignment.strategy class org.apache.kafka.clients.consumer.RangeAssignor,class org.apache.kafka.clients.consumer.CooperativeStickyAssignor 그룹 관리를 사용할 때 클라이언트가 consumer 인스턴스 간에 파티션 소유권을 분산하는데 사용할 지원되는 파티션 할당 전략의 클래스 이름 또는 클래스 형식목록( 기본 설정에 따라 다름)
[사용가능한 옵션]
1) org.apache.kafka.clients.consumer.RangeAssignor: 토픽별로 파티션을 할당
2) org.apache.kafka.clients.consumer.RoundRobinAssignor: 라운드 로빈 방식으로 소비자에게 파티션을 할당
3) org.apache.kafka.clients.consumer.StickyAssignor: 가능한 한 많은 기존 파티션 할당을 유지하면서 최대한으로 균형 잡힌 할당을 보장
4) org.apache.kafka.clients.consumer.CooperativeStickyAssignor: 동일한 StickyAssignor 논리를 따르지만 협조적 재조정을 허용
기본 할당자는 기본적으로 RangeAssignor를 사용하지만 목록에서 RangeAssignor를 제거하는 단일 롤링 바운스만으로 CooperativeStickyAssignor로 업그레이드할 수 있는 [RangeAssignor, CooperativeStickyAssignor]
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor인터페이스를 구현하면 사용자 지정 할당 전략을 연결할 수 있음
유효한 값 : non-null string
receive.buffer.bytes 65536 (64 kibibytes) 데이터를 읽을 때 사용할 TCP 수신 버퍼(SO_RCVBUF)의 크기
값이 -1이면 OS 기본값이 사용됨
유효한 값 : -1,…
request.timeout.ms 30000 (30 seconds) 클라이언트가 요청 응답을 기다리는 최대 시간을 제어하는 옵션
제한 시간이 경과하기 전에 응답이 수신되지 않으면 클라이언트는 필요한 경우 요청을 다시 보내거나 재시도가 소진되면 요청에 실패함
security.protocol PLAINTEXT 브로커와 통신하는 데 사용되는 프로토콜
유효한 값 : SASL_SSL, PLAINTEXT, SSL, SASL_PLAINTEXT
send.buffer.bytes 131072 (128 kibibytes) 데이터를 보낼 때 사용할 TCP 송신 버퍼(SO_SNDBUF)의 크기
값이 -1이면 OS 기본값을 사용함
유효한 값 : -1…
socket.connection.setup.timeout.max.ms 30000 (30 seconds) 소켓 연결이 설정될 때까지 클라이언트가 대기하는 최대 시간
연결 설정 시간 제한은 이 최대값까지 각 연속 연결 실패에 대해 기하급수적으로 증가함
연결 스톰을 방지하기 위해 시간 제한에 임의화 계수 0.2가 적용되어 계산된 값보다 20% 낮거나 20% 높은 임의 범위가 됨
socket.connection.setup.timeout.ms 10000 (10 seconds) 소켓 연결이 설정될 때까지 클라이언트가 대기하는 시간
제한시간이 경과하기 전에 연결이 구축되지 않으면 클라이언트는 소켓 채널을 닫음
이 값은 초기 백오프 값이며 각 연속 연결 실패에 대해 이 socket.connection.setup.timeout.max.ms값까지 기하급수적으로 증가함
importance : low
auto.commit.interval.ms 5000 (5 seconds) enable.auto.commit가 true 로 설정된 경우 소비자 오프셋이 Kafka에 자동 커밋되는 빈도(밀리초)
auto.include.jmx.reporter TRUE 사용되지 않음
check.crcs TRUE 사용된 레코드의 CRC32를 자동으로 확인함.
이렇게 하면 메시지에 대한 온더와이어 또는 온디스크 손상이 발생하지 않음
이 검사는 약간의 오버헤드를 추가하므로 극한의 성능을 원하는 경우 사용하지 않도록 설정할 수 있음
client.id "" 요청을 할 때 서버에 전달할 id 문자열
이것의 목적은 논리적 애플리케이션 이름이 서버 측 요청 로깅에 포함되도록 허용하여 IP/포트 이외의 요청 소스를 추적할 수 있도록 하는 것임
client.rack "" 이 클라이언트의 랙 식별자
 이 클라이언트가 물리적으로 있는 위치를 나타내는 문자열 값일 수 있습니다. 브로커 구성 'broker.rack'에 해당함
enable.metrics.push TRUE 클러스터에 이 클라이언트와 일치하는 클라이언트 메트릭 구독이 있는 경우 클러스터에 클라이언트 메트릭을 푸시할 수 있는지 여부
fetch.max.wait.ms 500 fetch.min.bytes에 제공된 요구 사항을 즉시 충족하기에 충분한 데이터가 없는 경우 가져오기 요청에 응답하기 전에 서버가 차단하는 최대 시간
interceptor.classes "" 인터셉터로 사용할 클래스 목록
org.apache.kafka.clients.consumer.ConsumerInterceptor인터페이스를 구현하면 소비자가 수신한 레코드를 가로챌 수 있습니다(그리고 변경할 수도 있음).
기본적으로 인터셉터는 없음
유효한 값 : non-null
metadata.max.age.ms 300000 (5 minutes) 파티션 리더십 변경 사항이 없는 경우에도 새 브로커 또는 파티션을 사전에 검색하기 위해 메타데이터를 강제로 새로 고치는 시간(밀리초)
metric.reporters "" 메트릭 리포터로 사용할 클래스 목록
org.apache.kafka.common.metrics.MetricsReporter인터페이스를 구현하면 새 메트릭 생성에 대한 알림을 받을 클래스를 연결할 수 있음
JMX 통계를 등록하기 위해 JmxReporter가 항상 포함됨
유효한 값 : non-null
metrics.num.samples 2 메트릭을 계산하기 위해 유지되는 샘플 수
metrics.recording.level INFO 메트릭에 대한 가장 높은 기록 수준
유효한 값 : INFO, DEBUG, TRACE
metrics.sample.window.ms 30000 (30 seconds) 메트릭 샘플이 계산되는 기간
유효한 값 : 0,...
reconnect.backoff.max.ms 1000 (1 second) 반복적으로 연결에 실패한 브로커에 다시 연결할 때 대기하는 최대 시간(밀리초)
제공된 경우 호스트당 백오프는 각 연속 연결 실패에 대해 이 최대값까지 기하급수적으로 증가하며 백오프를 계산한 후 연결 폭풍을 피하기 위해 20% 임의 지터가 추가됨
유효한 값 : 0,...
reconnect.backoff.ms 50 반복적으로 연결에 실패한 브로커에 다시 연결할 때 대기하는 최대 시간(밀리초)
제공된 경우 호스트당 백오프는 각 연속 연결 실패에 대해 이 최대값까지 기하급수적으로 증가하며 백오프를 계산한 후 연결 폭풍을 피하기 위해 20% 임의 지터가 추가됨
유효한 값 : 0,...
retry.backoff.max.ms 1000 (1 second) 반복적으로 실패한 브로커에 대한 요청을 재시도할 때 대기하는 최대 시간(밀리초)
제공된 경우 클라이언트당 백오프는 실패한 각 요청에 대해 이 최대값까지 기하급수적으로 증가함.
재시도시 모든 클라이언트가 동기화되지 않도록 하기 위해 계수가 0.2인 임의 지터가 백오프에 적용되어
백오프가 계산된 값보다 20% 낮거나 20% 높은 범위 내에 속함.
retry.backoff.ms retry.backoff.max.ms보다 높게 설정된 경우,retry.backoff.max.ms는 기하급수적 증가없이 처음부터 일정한 백오프로 사용됨
유효한 값 : 0,...
retry.backoff.ms 100 지정된 topic 파티션에 대해 실패한 요청을 재시도하기 전에 대기하는 시간.
이렇게하면 일부 실패 시나리오에서 긴필한 루프로 요청을 반복적으로 보내는 것을 방지할 수 있음
이 값은 초기 백오프 값이며 실패한 각 요청에 대해 해당 retry.backoff.max.ms값까지 기하급수적으로 증가함
security.providers null 각각 보안 알고리즘을 구현하는 공급자를 반환하는 구성 가능한 작성자 클래스 목록
org.apache.kafka.common.security.auth.SecurityProviderCreator 인터페이스를 구현해야함

[ ssl.key 관련 ]

properties설명
ssl.key.password 키 저장 파일이나 'ssl.keystore.key'에 명시되어진 PEM key 의 개인 key의 비밀번호
ssl.keystore.certificate.chacin 'ssl.keystore.type'에 의해 명시되어진 포맷의 certificate chain
default : null
ssl.keystore.key 'ssl.keystore.type'에 의해 명시되어진 포맷안에 개인 키
default : null
ssl.keystore.location key 저장 파일의 위치
dafault : null
ssl.keystore.password key 저장 파일의 저장 비밀번호
default : null
ssl.truststore.certificates 'ssl.truststore.type'에 의해 지정된 포맷에서 신뢰된 certificate

ssl.truststore.location trust store 파일 위치
defulat : null
ssl.truststore.password trust store 파일의 비밀번호
default : null