saay, hi
3) consumer configs(consumer.properties) 본문
[ default 옵션]
옵션명 | default | 설명 | |
1 | bootstrap.servers | localhost:9092 | 초기 kafka cluster에게 연결하기 위한 host/port 쌍의 list. bootstrap.servers 를 등록해야 외부 서버에서 연결이 가능하며, 2개 이상 권장 ( consumer AP 끊기게 될 경우, 다른 consumer 로 연결하기 위해) default : "" |
2 | group.id | test-consumer-group | 각 consumer가 속해있는 컨슈머 그룹들을 구별하기 위해 사용. 이 속성은 컨슈머가 susbscribe(topic)사용에 의한 그룹 관리 기능이거나 카프카 기반의 offset 관리 정책를 사용할 경우 요청됨 dafult : null |
[ consumer AP 옵션 ]
옵션명 | default | 이름 | |
1 | key.serializer | 인터페이스를 구현하는 키에 대한 직렬 변환기 클래스 | |
2 | value.serializer | 인터페이스를 구현하는 값에 대한 직렬 변환기 클래스 | |
3 | bootstrap.server | kafka cluster에 대한 초기 연결을 설정하는 데 사용할 호스트/포트 쌍 목록 클라이언트는 bootstraping을 위해 여기에 지정된 서버에 관계없이 모든 서버를 사용함. 이 목록은 전체 서버 세트를 검색하는 데 사용되는 초기 호스트에만 영향을 미침. 기본형식은 host1:port1. 이러한 서버는 전체 클러스터 구성원(동적으로 변경될 수 있음) 을 검색하기 위한 초기 연결에만 사용되므로 이 목록에는 전체 서버 집합이 포함될 필요가 없으나, 서버가 다운된 경우에는 두개 이상 필요 |
|
4 | group.id | consumer가 속한 consumer group을 식별하는 고유 문자열 | |
5 | group.instance.id | 최종 사용자가 제공한 consumer instance의 고유 식별자. 비어있지 않은 문자열만 허용되며, 이 값이 설정된 경우 consumer는 정적멤버로 처리됨. 이 id를 가진 인스턴스는 항상 consumer group에서 하나만 허용됨. 이 값을 설정하지않으면 consumer 는 전통적인 동작인 동적 구성원으로 그룹에 참여하게 됨 |
|
6 | enable.auto.commit | TRUE | true인 경우, consumer의 offset이 백그라운드에서 주기적으로 커밋됨 |
7 | auto.offset.reset. | latest | kafka에 초기 offset이 없거나 현재 offset이 서버에 더 이상 존재하지 않는 경우(ex 해당데이터가 삭제된 경우) 수행할 작업: • offset을 가장 빠른 offset으로 자동 재설정함 • latest : offset을 최신 offset으로 자동 재설정함 • 없음 : consumer group 에 대해 이전 offset이 발견되지 않은 경우, consumer에게 예외를 발생시킴 • consumer 에게 예외를 던짐 이 구성은 latest로 설정하는 동안 partition번호를 변경하면 consumer가 offset을 재설정하기 전에 생성자가 새로 추가된 partition(즉, 초기 offset이 아직 존재하지 않음) 에 메시지를 보내기 시작할 수 있으므로 메시지 전달 손실이 발생할 수 있음 유의 |
[ default 그 외 공식문서 정리]
옵션명 | default | 설명 |
importance : high | ||
key.serializer | 인터페이스를 구현하는 키에 대한 직렬 변환기 클래스 | |
value.serializer | 인터페이스를 구현하는 값에 대한 직렬 변환기 클래스 | |
bootstrap.server | kafka cluster에 대한 초기 연결을 설정하는 데 사용할 호스트/포트 쌍 목록 클라이언트는 bootstraping을 위해 여기에 지정된 서버에 관계없이 모든 서버를 사용함. 이 목록은 전체 서버 세트를 검색하는 데 사용되는 초기 호스트에만 영향을 미침. 기본형식은 host1:port1. 이러한 서버는 전체 클러스터 구성원(동적으로 변경될 수 있음) 을 검색하기 위한 초기 연결에만 사용되므로 이 목록에는 전체 서버 집합이 포함될 필요가 없으나, 서버가 다운된 경우에는 두개 이상 필요 |
|
group.id | consumer가 속한 consumer group을 식별하는 고유 문자열 | |
enable.auto.commit | TRUE | true인 경우, consumer의 offset이 백그라운드에서 주기적으로 커밋됨 |
auto.offset.reset. | latest | kafka에 초기 offset이 없거나 현재 offset이 서버에 더 이상 존재하지 않는 경우(ex 해당데이터가 삭제된 경우) 수행할 작업: • offset을 가장 빠른 offset으로 자동 재설정함 • latest : offset을 최신 offset으로 자동 재설정함 • 없음 : consumer group 에 대해 이전 offset이 발견되지 않은 경우, consumer에게 예외를 발생시킴 • consumer 에게 예외를 던짐 이 구성은 latest로 설정하는 동안 partition번호를 변경하면 consumer가 offset을 재설정하기 전에 생성자가 새로 추가된 partition(즉, 초기 offset이 아직 존재하지 않음) 에 메시지를 보내기 시작할 수 있으므로 메시지 전달 손실이 발생할 수 있음 유의 유효한 값 : latest, earliest, none |
fetch.min.bytes | 1 | 서버가 가져오기 요청에 대해 반환해야 하는 최소 데이터 양 사용할 수 있는 데이터가 충분하지 않은 경우 요청은 요청에 응답하기 전에 그만큼의 데이터가 누적될 때까지 기다림 기본 설정인 1바이트는 해당 바이트의 데이터를 사용할 수 있게 되자마자 가져오기 요청에 응답하거나 데이터가 도착할 때까지 기다리는 동안 가져오기 요청이 시간 초과됨을 의미함 이 옵션의 값을 더 큰 값으로 설정하면 서버가 더 많은 양의 데이터가 누적될 때까지 기다리게 되므로 대기시간이 추가되는 대신, 서버 처리량이 약간 향상될 수 있음 유효한 값 : 0,... |
group.protocol | classic | group protocol consumer 는 사용해야하는 옵션. 현재 'classic','consumer'를 지원함 만약 'consumer'로 지정할 경우, consumer group protocol은 사용되어지지만, 그렇지 않으면 classic group protocol이 사용됨 유효한 값 : consumer, classic |
heartbeat.interval.ms | 3000 (3 seconds) | Kafka의 그룹 관리 기능을 사용할 때 consumer 코디네이터에 대한 하트비트 사이의 예상 시간 하트비트는 consumer의 세션이 활성 상태로 유지되도록 하고 새 consumer가 그룹에 가입하거나 떠날 떄 재조정을 용이하게 하는 데 사용됨. 값은 session.timeout.ms보다 낮게 설정해야 하지만, 일반적으로 해당 값의 1/3보다 높게 설정해서는 안됨. 정상적인 재조정에 대한 예상 시간을 제어하기 위해 더 낮게 조정할 수 있음 |
max.partition.fetch.bytes | 1048576 (1 mebibyte) | 서버가 반환할 파티션당 최대 데이터 양 레코드는 consumer가 일괄적으로 가져옴. Fetch(가져오기)의 비어있지 않은 첫 번째 파티션에 있는 첫 번째 레코드 일괄처리가 이 제한보다 큰 경우, consumer가 진행할 수 있도록 일괄 처기라 계속 반환됨. 브로커가 허용하는 최대 레코드 배치 크기는 message.max.bytes(broker config) 또는 max.message.bytes를 통해 정의됨. Consumer요청 크기그를 제한하는 방법은 fetch.max.byte참조 |
session.timeout.ms | 45000 (45 seconds) | kafka의 그룹 관리 기능을 사용할 때 클라이언트 오류를 감지하는데 사용되는 시간 제한. 클라이언트는 주기적인 하트비트를 전송하여 브로커에 활성 상태를 나타냄. 이 세션 제한시간이 만료되기 전에 브로커가 하트비트를 수신하지 않으면, 브로커는 이 클라이언트를 그룹에서 제거하고 재조정을 시작함. 이 값은 브로커 구성에서 group.min.session.timeout.ms 및 group.max.session.timeout.ms로 구성된 허용 범위 내에 있어야 함 |
importance : medium | ||
group.instance.id | 최종 사용자가 제공한 consumer instance의 고유 식별자. 비어있지 않은 문자열만 허용되며, 이 값이 설정된 경우 consumer는 정적멤버로 처리됨. 이 id를 가진 인스턴스는 항상 consumer group에서 하나만 허용됨을 의미. 이는 일시적인 사용불가(ex: 프로세스 재시작)으로 인한 그룹 재조정을 방지하기 위해 더 큰 세션 시간 제한과 함께 사용할 수 있음. 이 값을 설정하지않으면 consumer 는 기존 동작인 동적 구성원으로 그룹에 참여하게 됨. |
|
allow.auto.create.topics | TRUE | topic을 구독하거나 할당할 때 브로커에서 자동 topic 생성을 허요함. 구독중인 topic은 브로커가 'auto.create.topics.enable'의 브로커 구성을 사용하여 허용하는 경우에만 자동으로 생성됨. 0.11.0 이전 브로커를 사용하는 경우 'false'로 설정해야 함 |
client.dns.lookup | use_all_dns_ips | 클라이언트가 DNS 조회를 사용하는 방법을 제어함 use_all_dns_ips로 설정돼 있을 경우, 성공적으로 연결될 때까지 반환된 각 IP 주소에 순서대로 연결함 연결이 끊기면 다음 IP가 사용되며, 모든 IP가 한 번 사용되면 클라이언트는 호스트 이름에서 IP를 다시 확인함 (다만 JVM 및 OS 캐시 DNS 이름 모두 조회함) resolve_canonical_bootstrap_servers_only로 설정된 경우, 각 부트스트랩 주소를 정식 이름 목록으로 확인하며, 부트스트랩 단계 이후에는 use_all_dns_ips와 동일하게 동작함 유효한 값 : use_all_dns_ips, resolve_canonical_bootstrap_servers_only |
connections.max.idle.ms | 540000 (9 minut) | 이 옵션은 지정된 시간(밀리초) 후에 유휴 연결을 닫음 |
default.api.timeout.ms | 60000(1minutes) | 클라이언트 API에 대한 시간 제한(밀리초)을 지정함 이 옵션은 timeout 매개 변수를 지정하지 않는 모든 클라이언트 작업에 대한 기본시간 제한으로 사용됨 |
enable.auto.commit | TRUE | true이면 소비자의 오프셋이 백그라운드에서 주기적으로 커밋됨 |
exclude.internal.topics | TRUE | 구독 패턴과 일치하는 내부 항목을 구독에서 제외해야 하는지 여부 |
fetch.max.bytes | 52428800 (50 mebibytes) | 서버가 가져오기 요청에 대해 반환해야 하는 최대 데이터 양 레코드는 소비자에 의해 일괄적으로 fetch되며, fetch의 비어있지 않은 첫 번째 파티션에 있는 첫 번째 레코드 배치가 이 값보다 큰 경우 소비자가 진행할 수 있도록 레코드 batch가 계속 반황됨 따라서 이것은 절대적인 최대값이 아님 브로커가 허용하는 최대 레코드 배치크기는 message.max.bytes(broker config) 또는 max.message.bytes(topic config)를 통해 정의됨. consumer는 여러 가져오기를 병렬로 수행함 |
group.remote.assignor | null | 사용할 서버 측 할당자 할당자가 지정되지 않은 경우 그룹 코디네이터가 지정함. 이 옵션은 group.protocol가 "consumer"설정된 경우에만 적용됨 |
isolation.level | read_uncommitted | 트랜잭션으로 작성된 메시지를 읽는 방법을 제어 read_committed로 설정하면 consumer.poll()은 커밋된 트랜잭션 메시지만 반환함 read_uncommitted (기본값)으로 설정하면, consumer.poll()은 중단된 트랜잭션 메시지를 포함한 모든 메시지를 반환함 비트랜잭션 메시지는 두 모드 중 하나에서 무조건 반환됨 메시지는 항상 오프셋 순서로 반홤됨. 따라서 read_committed mode에서 consumer.poll()은 첫 번째 열린 트랜잭션의 오프셋보다 작은 마지막 안정 오프셋(LSO)까지만 메시지를 반환함 특히, 진행 중인 트랜잭션에 속한 메시지 이후에 나타나는 모든 메시지는 해당 트랜잭션이 완료될 때까지 보류됨. 그 결과 read_committed consumer는 항공편 거래가 있을 때 상위 워터마크까지 읽을 수 없음 또한 read_committed일 때, seekToEnd 메서드에서 LSO를 반환함 유효한 값 : read_committed, read_uncommitted |
max.poll.interval.ms | 300000 (5 minutes) | 이용자 그룹 관리를 사용할 때 poll() 호출 사이의 최대 지연 이렇게하면, consumer가 더 많은 레코드를 가져오기 전에 유휴 상태일 수 있는 시간의 상한이 설정됨 이 제한시간이 만료되기 전에 poll()을 호출하지 않으면 소비자가 실패한 것으로 간주되고 그룹은 파티션을 다른 멤버에 재할당하기 위해 재조정함 이 시간 제한에 도달하는 null이 아닌 값을 group.instance.id를 사용하는 소비자의 경우 파티션이 즉시 다시 할당되지 않음 대신 consumer는 하트비트 전송을 중지하고 session.timeout.ms값이 만료된 후 파티션이 다시 할당됨 이는 종료된 정적 소비자의 동작을 미러링함 |
max.poll.records | 500 | poll()에 대한 단일 호출에서 반환되는 최대 레코드 수 이 max.poll.records는 기본 가져오기 동작에 영향을 주지 않음 consumer는 각 가져오기 요처에서 레코드를 캐시하고 폴링에서 점진적으로 반환함 |
partition.assignment.strategy | class org.apache.kafka.clients.consumer.RangeAssignor,class org.apache.kafka.clients.consumer.CooperativeStickyAssignor | 그룹 관리를 사용할 때 클라이언트가 consumer 인스턴스 간에 파티션 소유권을 분산하는데 사용할 지원되는 파티션 할당 전략의 클래스 이름 또는 클래스 형식목록( 기본 설정에 따라 다름) [사용가능한 옵션] 1) org.apache.kafka.clients.consumer.RangeAssignor: 토픽별로 파티션을 할당 2) org.apache.kafka.clients.consumer.RoundRobinAssignor: 라운드 로빈 방식으로 소비자에게 파티션을 할당 3) org.apache.kafka.clients.consumer.StickyAssignor: 가능한 한 많은 기존 파티션 할당을 유지하면서 최대한으로 균형 잡힌 할당을 보장 4) org.apache.kafka.clients.consumer.CooperativeStickyAssignor: 동일한 StickyAssignor 논리를 따르지만 협조적 재조정을 허용 기본 할당자는 기본적으로 RangeAssignor를 사용하지만 목록에서 RangeAssignor를 제거하는 단일 롤링 바운스만으로 CooperativeStickyAssignor로 업그레이드할 수 있는 [RangeAssignor, CooperativeStickyAssignor] org.apache.kafka.clients.consumer.ConsumerPartitionAssignor인터페이스를 구현하면 사용자 지정 할당 전략을 연결할 수 있음 유효한 값 : non-null string |
receive.buffer.bytes | 65536 (64 kibibytes) | 데이터를 읽을 때 사용할 TCP 수신 버퍼(SO_RCVBUF)의 크기 값이 -1이면 OS 기본값이 사용됨 유효한 값 : -1,… |
request.timeout.ms | 30000 (30 seconds) | 클라이언트가 요청 응답을 기다리는 최대 시간을 제어하는 옵션 제한 시간이 경과하기 전에 응답이 수신되지 않으면 클라이언트는 필요한 경우 요청을 다시 보내거나 재시도가 소진되면 요청에 실패함 |
security.protocol | PLAINTEXT | 브로커와 통신하는 데 사용되는 프로토콜 유효한 값 : SASL_SSL, PLAINTEXT, SSL, SASL_PLAINTEXT |
send.buffer.bytes | 131072 (128 kibibytes) | 데이터를 보낼 때 사용할 TCP 송신 버퍼(SO_SNDBUF)의 크기 값이 -1이면 OS 기본값을 사용함 유효한 값 : -1… |
socket.connection.setup.timeout.max.ms | 30000 (30 seconds) | 소켓 연결이 설정될 때까지 클라이언트가 대기하는 최대 시간 연결 설정 시간 제한은 이 최대값까지 각 연속 연결 실패에 대해 기하급수적으로 증가함 연결 스톰을 방지하기 위해 시간 제한에 임의화 계수 0.2가 적용되어 계산된 값보다 20% 낮거나 20% 높은 임의 범위가 됨 |
socket.connection.setup.timeout.ms | 10000 (10 seconds) | 소켓 연결이 설정될 때까지 클라이언트가 대기하는 시간 제한시간이 경과하기 전에 연결이 구축되지 않으면 클라이언트는 소켓 채널을 닫음 이 값은 초기 백오프 값이며 각 연속 연결 실패에 대해 이 socket.connection.setup.timeout.max.ms값까지 기하급수적으로 증가함 |
importance : low | ||
auto.commit.interval.ms | 5000 (5 seconds) | enable.auto.commit가 true 로 설정된 경우 소비자 오프셋이 Kafka에 자동 커밋되는 빈도(밀리초) |
auto.include.jmx.reporter | TRUE | 사용되지 않음 |
check.crcs | TRUE | 사용된 레코드의 CRC32를 자동으로 확인함. 이렇게 하면 메시지에 대한 온더와이어 또는 온디스크 손상이 발생하지 않음 이 검사는 약간의 오버헤드를 추가하므로 극한의 성능을 원하는 경우 사용하지 않도록 설정할 수 있음 |
client.id | "" | 요청을 할 때 서버에 전달할 id 문자열 이것의 목적은 논리적 애플리케이션 이름이 서버 측 요청 로깅에 포함되도록 허용하여 IP/포트 이외의 요청 소스를 추적할 수 있도록 하는 것임 |
client.rack | "" | 이 클라이언트의 랙 식별자 이 클라이언트가 물리적으로 있는 위치를 나타내는 문자열 값일 수 있습니다. 브로커 구성 'broker.rack'에 해당함 |
enable.metrics.push | TRUE | 클러스터에 이 클라이언트와 일치하는 클라이언트 메트릭 구독이 있는 경우 클러스터에 클라이언트 메트릭을 푸시할 수 있는지 여부 |
fetch.max.wait.ms | 500 | fetch.min.bytes에 제공된 요구 사항을 즉시 충족하기에 충분한 데이터가 없는 경우 가져오기 요청에 응답하기 전에 서버가 차단하는 최대 시간 |
interceptor.classes | "" | 인터셉터로 사용할 클래스 목록 org.apache.kafka.clients.consumer.ConsumerInterceptor인터페이스를 구현하면 소비자가 수신한 레코드를 가로챌 수 있습니다(그리고 변경할 수도 있음). 기본적으로 인터셉터는 없음 유효한 값 : non-null |
metadata.max.age.ms | 300000 (5 minutes) | 파티션 리더십 변경 사항이 없는 경우에도 새 브로커 또는 파티션을 사전에 검색하기 위해 메타데이터를 강제로 새로 고치는 시간(밀리초) |
metric.reporters | "" | 메트릭 리포터로 사용할 클래스 목록 org.apache.kafka.common.metrics.MetricsReporter인터페이스를 구현하면 새 메트릭 생성에 대한 알림을 받을 클래스를 연결할 수 있음 JMX 통계를 등록하기 위해 JmxReporter가 항상 포함됨 유효한 값 : non-null |
metrics.num.samples | 2 | 메트릭을 계산하기 위해 유지되는 샘플 수 |
metrics.recording.level | INFO | 메트릭에 대한 가장 높은 기록 수준 유효한 값 : INFO, DEBUG, TRACE |
metrics.sample.window.ms | 30000 (30 seconds) | 메트릭 샘플이 계산되는 기간 유효한 값 : 0,... |
reconnect.backoff.max.ms | 1000 (1 second) | 반복적으로 연결에 실패한 브로커에 다시 연결할 때 대기하는 최대 시간(밀리초) 제공된 경우 호스트당 백오프는 각 연속 연결 실패에 대해 이 최대값까지 기하급수적으로 증가하며 백오프를 계산한 후 연결 폭풍을 피하기 위해 20% 임의 지터가 추가됨 유효한 값 : 0,... |
reconnect.backoff.ms | 50 | 반복적으로 연결에 실패한 브로커에 다시 연결할 때 대기하는 최대 시간(밀리초) 제공된 경우 호스트당 백오프는 각 연속 연결 실패에 대해 이 최대값까지 기하급수적으로 증가하며 백오프를 계산한 후 연결 폭풍을 피하기 위해 20% 임의 지터가 추가됨 유효한 값 : 0,... |
retry.backoff.max.ms | 1000 (1 second) | 반복적으로 실패한 브로커에 대한 요청을 재시도할 때 대기하는 최대 시간(밀리초) 제공된 경우 클라이언트당 백오프는 실패한 각 요청에 대해 이 최대값까지 기하급수적으로 증가함. 재시도시 모든 클라이언트가 동기화되지 않도록 하기 위해 계수가 0.2인 임의 지터가 백오프에 적용되어 백오프가 계산된 값보다 20% 낮거나 20% 높은 범위 내에 속함. retry.backoff.ms가 retry.backoff.max.ms보다 높게 설정된 경우,retry.backoff.max.ms는 기하급수적 증가없이 처음부터 일정한 백오프로 사용됨 유효한 값 : 0,... |
retry.backoff.ms | 100 | 지정된 topic 파티션에 대해 실패한 요청을 재시도하기 전에 대기하는 시간. 이렇게하면 일부 실패 시나리오에서 긴필한 루프로 요청을 반복적으로 보내는 것을 방지할 수 있음 이 값은 초기 백오프 값이며 실패한 각 요청에 대해 해당 retry.backoff.max.ms값까지 기하급수적으로 증가함 |
security.providers | null | 각각 보안 알고리즘을 구현하는 공급자를 반환하는 구성 가능한 작성자 클래스 목록 org.apache.kafka.common.security.auth.SecurityProviderCreator 인터페이스를 구현해야함 |
[ ssl.key 관련 ]
properties설명
ssl.key.password | 키 저장 파일이나 'ssl.keystore.key'에 명시되어진 PEM key 의 개인 key의 비밀번호 |
ssl.keystore.certificate.chacin | 'ssl.keystore.type'에 의해 명시되어진 포맷의 certificate chain default : null |
ssl.keystore.key | 'ssl.keystore.type'에 의해 명시되어진 포맷안에 개인 키 default : null |
ssl.keystore.location | key 저장 파일의 위치 dafault : null |
ssl.keystore.password | key 저장 파일의 저장 비밀번호 default : null |
ssl.truststore.certificates | 'ssl.truststore.type'에 의해 지정된 포맷에서 신뢰된 certificate |
ssl.truststore.location | trust store 파일 위치 defulat : null |
ssl.truststore.password | trust store 파일의 비밀번호 default : null |
'kafka 설치 및 구성 > 참고) configuration' 카테고리의 다른 글
6) zookeeper config (zookeeper.properties) (0) | 2024.06.25 |
---|---|
5) topic-Level configs (0) | 2024.06.25 |
4) producer configs(producer.properties) (0) | 2024.06.25 |
2) connect (connect-distributed.properties) (0) | 2024.06.25 |
1) broker configs(server.properties) (0) | 2024.06.21 |