kafka 설치 및 구성/참고) configuration

2) connect (connect-distributed.properties)

saay-hi 2024. 6. 25. 13:31

[ default 옵션 ]

  옵션명 default 설명
1 bootstrap.servers localhost:9092 카프카 클러스터에 대한 초기 커넥션을 구축하는 데 사용할 호스트/포트 쌍 리스트.kafka broker 서버 = bootstrap 서버.
2 group.id connect-cluster connect를 같은 클러스터로 묶기 위한 그룹 설정.
3 key.converter org.apache.kafka.connect.json.JsonConverter 카프카 커넥트 포맷과 카프카에 기록한 직렬화된 포맷 간을 변환할 때 사용할 컨버터 클래스. 카프카에서 쓰거나 읽은 메세지 키 포맷은 이 클래스가 제어하며, 컨버터는 커넥터와는 독립적이기 때문에, 사용하는 커넥터와는 무관하게 어떤 직렬화 포맷으로도 작업할 수 있다.
4 value.converter org.apache.kafka.connect.json.JsonConverter  
5 key.converter.schemas.enable true key/value값이 내부 schema와 data를 모두 포함하는 복합 객체로 처리되도록 하는 설정
6 value.converter.schemas.enable true  
7 offset.storage.topic connect-offsets connect offset을 저장할 kafka topic명.
8 offset.storage.replication.factor 1 offset을 저장하는 토픽을 생성할때 사용할 replication factor. default는 3
9 config.storage.topic connect-configs connector 설정을 저장할 kafka topic명
10 config.storage.replication.factor 1 설정을 저장하는 topic을 생성할 때 사용할 replication factor. default는 3
11 status.storage.topic connect-status connector와 task 상태를 저장할 topic명.
12 status.storage.replication.factor 1 상태를 저장하는 topic을 생성할 때 사용할 replication factor. default는 3
13 offset.flush.interval.ms 10000 task에서 offset 커밋을 시도하는 간격

[ 그 외 주석처리된 옵션 값]

  옵션명 default 설명
1 offset.storage.partition 25 offset을 저장에 사용할 topic 의 partitions 수.
사용된 최대 복제 계수만큼 적어도 브로커가 있어야 함
단일 broker cluster에서 실행하기 위해 복제 계수를 1로 설정함
2 status.storage.partition 5 상태 저장에 사용할 topic의 partition 수.
kafka connect는 필요할 때 자동으로 항목을 작성하려고 시도하지만 항상 수동으로 작성할 수 도 있음
사용된 최대 복제 계수만큼 적어도 브로커가 있어야 함
단일 broker cluster에서 실행하기 위해 복제 계수를 1로 설정함
3 listeners HTTP://:8083 REST API가 수신할 쉼표로 구분된 URT 목록. 지왼되는 프로토콜은 HTTP와 HTTPS임
만일 호스트이름을  0.0.0.0으로 지정하면 모든 인터페이스에 바인딩함
기본 인터페이스에 바인딩하려면 host 이름을 비워둬야함
ex) http://myhost:8083, https://myhost:8084
4 rest.advertised.host.name   다른 서버에서 라우팅 가능한 URL 등에 연결하기 위해 다른 작업자에게 제공되는 Hostaname과 port
설정되지 않고 구성된 경우 listener값을 사용함
5 rest.advertised.port
6 rest.advertised.listener
7 plugin.path   plugin이 들어있는 path 리스트.

[ 그 외 공식 문서 옵션 ]

옵션명 default 값 설명
importance : high
config.storage.topic   connection 옵션에 저장되는 kafka topic이름
exactly.once.source.support disabled 트랜잭션을 사용하여 소스 레코드 및 해당 소스 오프셋을 작성하고 새 작업 세대를 가져오기 전에 이전 작업 세대를 사전에 차단함으로써 클러스터의 소스 커넥터에 대한 정확히 한 번의 지원을 활성화할지 여부
새 클러스터에서 정확히 한 번만 소스 지원을 활성화하려면 이 속성을 'enabled'로 설정
기존 클러스터에 대한 지원을 활성화하려면 먼저 클러스터의 모든 작업자에 대해 'preparing'로 설정한 다음, 'enabled'로 설정 권장
Valid Values: (case insensitive) DISABLED, ENABLED, PREPARING
heartbeat.interval.ms 3000 (3 seconds) Kafka의 그룹 관리 기능을 사용할 때 그룹 코디네이터에 대한 하트비트 사이의 예상 시간
하트비트는 worker 세션이 활성 상태를 유지하도록 하고 새 구성원이 그룹에 가입하거나 탈퇴할 떄 재조정을 용이하게 하는 데 사용됨. 값은 session.timeout.ms보다 낮게 설정되어야하며 일반적으로 1/3이하로 설정해야 함 .
정상적인 재조정에 대한 예상 시간을 제어하기 위해 1/3보다 더 낮게 조정할 수도 있음
rebalance.timeout.ms 60000 (1 minute) 재조정이 시작된 후 각 작업자가 그룹에 합류하는 데 허용되는 최대 시간
이는 기본적으로 모든 작업이 보류 중이 ㄴ데이터를 플러시하고 오프켓을 커밋하는 데
필요한 시간에 대한 제한.
시간이 초과되면 작업자가 그룹에서 제거되어 오프셋 커밋이 실패하게 됨 
session.timeout.ms 10000 (10 seconds) 작업자 오류를 감지하는 데 사용되는 제한 시간
작업자는 주기적인 하트비트를 보내 브로커에게 활성 상태를 나타냄.
이 옵션의 시간 초과가 만료되기전에 브로커가 하트비트를 수신하지 않으면 브로커는 그룹에서 작업자를 제거하고 재조정을 시작함.
값은 group.min.session.timeout.msgroup.max.session.timeout.ms에 의해 브로커 구성에 구성된 허용 범위 내에 있어야 함
ssl.key.password null 키 저장소 파일에 있는 개인 키의 비밀번호 또는 'ssl.keystore.key'에 지정된 PEM 키
   
importance : medium
client.dns.lookup use_all_dns_ips 클라이언트가 DNS 조회를 사용하는 방법을 제어함
use_all_dns_ip가 설정된 경우, 성공적인 연결이 설정될 때까지 반환된 각 ip주소에 순차적으로 연결함. 연결이 끊어지면 다음 ip가 사용됨. 모든 ip가 한 번 사용되면 클라이언트는 호스트 이름에서 ip를 다시 확인함(JVM 및 OS 캐시 DNS 이름 조회 모두)
resolve_canonical_bootstrap_servers_only로 설정된 경우, 각 bootstrap 주소를 정식 이름목록으로 확인함.
bootstrap 단계 이후에는,use_all_dns_ip와 비슷함
connections.max.idle.ms 540000 (9 minutes) 이 옵션에 지정된 시간 후에 유휴연결을 닫음
connector.client.config.override.policy All ConnectorClientConfigOverridePolicy 구현의 클래스 이름 또는 별칭
어떤 클라이언트 configuration이 connector에 의해 재정의될 수 있는지 정의내림.
기본값인 'ALL'은 connector configuration이 모든 clinet 속성에 재정의할 수 있다는 것을 의미함.
다른 가능성 있는 정책은 'none'인데, 이는 클라이언트 속성이 재정의되는것으로부터 승인되지않으며, 'Principal'이라는 정책은 이와 달리 승인함 
receive.buffer.bytes 32768 (32 kibibytes) 데이터를 읽을 때 사용되는 TCP recceive 버퍼의 사이즈
만약 값이 -1이면, OS 기본값으로 사용됨
request.timeout.ms 40000 (40 seconds) 이 옵션은 클라이언트가 요청 응답을 기다리는 최대 시간의 양을 통제함
만약의 시간 초과 전에 응답을 받지 못하면, 클라이언트는 필욯ㄴ 경우엔 요청을 다시 보내는데 재시도가 모두 소진되면 요청이 실패함
sasl.client.callback.handler.class null AuthenticateCallbackHandler 인터페이스를 구현하는 salsl client callback handler  class완전한 이름임 
   
security.protocol PLAINTEXT 브로커와 통신하는 데 사용되는 프로토콜.
유효한 값은 PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
send.buffer.bytes 131072 (128 kibibytes) 데이터를 보낼 때 사용할 TCP 전송 버퍼(SO_SNDBUF)의 크기
값이 -1이면 os기본값이 사용됨
ssl.enabled.protocols TLSv1.2,TLSv1.3 SSL 연결에 활성화된 프로토콜 목록.
Java 11 이상으로 실행하는 경우 기본값은 'TLSv1.2,TLSv1.3'이고, 그렇지 않으면 'TLSv1.2'
Java 11의 기본값을 사용하면 클라이언트와 서버가 둘 다 지원하는 경우 TLSv1.3을 선호하고 그렇지 않으면 TLSv1.2로 대체(둘 다 TLSv1.2 이상을 지원한다고 가정)
대부분의 경우, 기본값이 적합함
   
worker.sync.timeout.ms 3000 (3 seconds) 작업자가 다른 작업자와 동기화되지 않고 재 동기화 설정이 필요할 때,
포기하고 group을 떠나기 전에 이 옵션에 지정된 시간까지 기다렸다가 다시 참여하기 전에 backoff 시간을 기다려야 함.
worker.unsync.backoff.ms 300000 (5 minutes) 작업자가 다른 작업자와 동기화되지 않고 worker.sync.timeout.ms 내에서 따라잡지 못하는 경우 다시 참여하기 전에 오래동안 Connect 클러스터를 종료  권장
importance : low
access.control.allow.methods "" Access-Control-Allow-Methods 헤더를 설정하여 교차 출처 요청에 지원되는 방법을 설정함. Access-Control-Allow-Methods 헤더의 기본값은 GET, POST 및 HEAD에 대한 교차 출처 요청을 허용
access.control.allow.origin "" REST API 요청에 대해 Access-Control-Allow-Origin 헤더를 설정하는 값
원본 간 액세스를 활성화하려면 API에 액세스하도록 허용해야 하는 애플리케이션의 도메인으로 설정하거나, 모든 애플리케이션에서 액세스를 허용하려면 '*'로 설정 권장
기본값은 EST API 도메인에서의 액세스만 허용함
admin.listeners null Admin REST API가 수신 대기할 쉼표로 구분된 URI 목록
지원되는 프로토콜 : HTTP, HTTPS
비어있거나 빈 문자열을 사용하면 이 기능이 비활성화됨.
기본 동작은 일반 listener(listener속성으로 지정)을 사용
auto.include.jmx.reporter TRUE [더 이상 사용되지 않음]
client.id "" 요청할 때 서버에 전달할 ID 문자열
이 옵션의 목적은 논리적 애플리케이션 이름이 서버 측 요청 로깅에 포함되도록 허용하여 ip/port이외의 요청 소스를 추적할 수 있도록 하는 것.
config.providers "" ConfigProvider classes의 쉼표로구분된 클래스 이름으로, 지정된 순서대로 로드됨
configProvider 인터페이스를 구현하면, 외부화된 비밀과 같은 커넥터 구성의 변수 참조를 바꿀 수 있음
config.storage.replication.factor 3 구성 저장소 주제를 생성할 때 사용되는 복제 요소
connect.protocol sessioned Kafka Connect 프로토콜의 호환성 모드
header.converter org.apache.kafka.connect.storage.SimpleHeaderConverter Kafka Connect 형식과 Kafka에 기록된 직렬화된 형식 간에 변환하는 데 사용되는 HeaderConverter 클래스
이는 Kafka에 쓰거나 Kafka에서 읽는 메시지의 헤더 값 형식을 제어하며 커넥터와 독립적이므로 모든 커넥터가 모든 직렬화 형식으로 작동할 수 있음
일반적인 형식의 예로는 JSON 및 Avro가 있으며, 기본적으로 SimpleHeaderConverter는 헤더 값을 문자열로 직렬화하고 스키마를 유추하여 역직렬화하는 데 사용됨
inter.worker.key.generation.algorithm HmacSHA256 내부 요청 키를 생성하는 데 사용하는 알고리즘
'HmacSHA256' 알고리즘은 이를 지원하는 JVM에서 기본값으로 사용됨
다른 JVM에서는 기본값이 사용되지 않으며 이 속성의 값은 작업자 구성에서 수동으로 지정되어야 함
inter.worker.key.size null 내부 요청 서명에 사용할 키의 크기(비트)
 null인 경우 키 생성 알고리즘의 기본 키 크기가 사용됨
inter.worker.key.ttl.ms 3600000 (1 hour) 내부 요청 검증에 사용되는 생성된 세션 키의 TTL(밀리초)
inter.worker.signature.algorithm HmacSHA256 내부 요청에 서명하는 데 사용되는 알고리즘. 이 알고리즘은 이를 지원하는 JVM에서 기본값으로 사용됨. 다른 JVM에서는 기본값이 사용되지 않으며 이 속성의 값은 작업자 구성에서 수동으로 지정되어야 함
inter.worker.verification.algorithms HmacSHA256 inter.worker.signature.algorithm 속성에 사용되는 알고리즘을 포함해야 하는 내부 요청 확인을 위해 허용되는 알고리즘 목록
이 알고리즘은 이를 지원하는 JVM에서 기본값으로 사용됨. 다른 JVM에서는 기본값이 사용되지 않으며 이 속성의 값은 작업자 구성에서 수동으로 지정되어야 함
listeners http://:8083 REST API가 수신할 쉼표로 구분된 URI 목록
지원되는 프로토콜 : HTTP, HTTPS
모든 인터페이스에 바인딩하려면 호스트 이름을 0.0.0.0으로 지정
기본 인터페이스에 지정하려면 호스트 이름을 비워둬야 함
ex) HTTP://myhost:8083,HTTPS://myhost:8084
metadata.max.age.ms 300000 (5 minutes) 새로운 브로커나 파티션을 사전에 검색하기 위해 파티션 리더십 변경이 확인되지 않은 경우에도 메타데이터를 강제로 새로 고치는 데 걸리는 시간(밀리초)
metric.reporters "" 측정항목 보고자로 사용할 클래스 목록
org.apache.kafka.common.metrics.MetricsReporter인터페이스를 구현하면 새 메트릭 생성에 대한 알림을 받을 클래스를 연결할 수 있음
JMX 통계를 등록하기 위해 JmxReporter가 항상 포함됨
metrics.num.samples 2 지표를 계산하기 위해 유지되는 샘플 수
metrics.recording.level INFO 메트릭에 대한 가장 높은 기록 level
metrics.sample.window.ms 30000 (30 seconds) 측정항목 샘플이 계산되는 기간
offset.flush.interval.ms 60000 (1 minute) 작업에 대한 오프셋 커밋을 시도하는 간격
offset.flush.timeout.ms 5000 (5 seconds) 프로세스를 취소하고 향후 시도에서 커밋될 오프셋 데이터를 복원하기 전에 레코드가 플러시되고 오프셋 데이터가 오프셋 스토리지에 커밋될 때까지 기다리는 최대 시간(밀리초)
이 옵션은 정확히 1회 지원으로 실행되는 소스 커넥터에는 영향을 주지않음
offset.storage.replication.factor 25 오프셋 저장 주제 생성 시 사용되는 복제 인자
plugin.discovery hybrid_warn 클래스 경로 및 플러그인.경로 구성에 있는 플러그인을 검색하는 데 사용하는 방법
* only_scan: 리플렉션을 통해서만 플러그인을 검색. ServiceLoader에서 검색할 수 없는 플러그인은 작업자 시작에 영향을 미치지 않음.
* hybrid_warn: 반사적으로 그리고 ServiceLoader를 통해 플러그인을 검색. ServiceLoader에서 검색할 수 없는 플러그인은 작업자 시작 중에 경고를 알림
* hybrid_fail: 반사적으로 ServiceLoader를 통해 플러그인을 검색. ServiceLoader에서 검색할 수 없는 플러그인은 작업자 시작에 실패하게 만듦
* service_load: ServiceLoader로만 플러그인을 검색. 다른 모드보다 시작 속도가 빠름. ServiceLoader에서 검색할 수 없는 플러그인은 사용하지 못할 수도 있음.
reconnect.backoff.max.ms 1000 (1 second) 반복적으로 연결에 실패한 브로커에 다시 연결할 때 대기하는 최대 시간(밀리초)
제공되는 경우 연속 연결이 실패할 때마다 호스트당 백오프가 이 최대값까지 기하급수적으로 증가함. 백오프 증가를 계산한 후 연결 폭풍을 방지하기 위해 20% 랜덤 지터가 추가됨
reconnect.backoff.ms 50 지정된 호스트에 다시 연결을 시도하기 전에 대기하는 기본 시간
긴밀한 루프에서 호스트에 반복적으로 연결되는 것을 방지할 수 있음
backoff는 클라이언트가 브로커에 시도하는 모든 연결에 적용됨.
이 값은 초기 backoff값이며 연속 연결이 실패할 떄마다 해당 reconnect.backoff.max.ms값까지 기하급수적으로 증가함
response.http.headers.config "" REST API HTTP 응답 헤더에 대한 규칙
rest.advertised.host.name null 이옵션이 설정될 경우, 연결할 다른 작업자에게 제공되는 host이름임
rest.advertised.listener null 다른 작업자가 사용할 수 있도록 제공되는 광고 리스너(HTTP 또는 HTTPS)를 설정
rest.advertised.port null 다른 작업자에게 연결을 위해 제공되는 포트
rest.extension.classes "" ConnectRestExtension classes의 콤마로 분리된 이름으로, 지정된 순서대로 로드됨
인터페이스를 구현하면 ConnectRestExtension필터와 같은 Connect의 REST API 사용자 정의 리소스에 삽입할 수 있음
일반적으로 로깅, 보안 등과 같은 사용자 정의 기능을 추가하는 데 사용됨
retry.backoff.max.ms 1000 (1 second) 반복적으로 실패한 브로커에 대한 요청을 재시도할 때 대기하는 최대 시간(밀리초)
제공된 경우 클라이언트당 백오프는 실패한 각 요청에 대해 이 최대값까지 기하급수적으로 증가함
재시도 시 모든 클라이언트가 동기화되는 것을 방지하기 위해 0.2배의 무작위 지터가 백오프에 적용되어 백오프가 계산된 값보다 20% 아래에서 20% 사이의 범위 내에 들어가게됨
retry.backoff.ms retry.backoff.max.ms보다 높게 설정되면 retry.backoff.max.ms는 급수적인 증가없이 처음부터 지속적인 backoff로 사용됨
retry.backoff.ms 100 지정된 주제 파티션에 대해 실패한 요청을 재시도하기 전에 대기하는 시간
이렇게 하면 일부 실패 시나리오에서 긴밀한 루프로 반복적으로 요청을 보내는 것을 방지할 수 있음. 이 값은 초기 backoff값이며 요청이 실패할 떄마다 해당 retry.backoff.max.ms값까지 기하급수적으로 증가함
sasl.kerberos.kinit.cmd /usr/bin/kinit Kerberos kinit 명령 경로.
scheduled.rebalance.max.delay.ms 300000 (5 minutes) 커넥터와 작업을 그룹에 다시 밸런싱하고 재할당하기 전에 한 명 이상의 퇴사한 작업자가 돌아올 때까지 기다리도록 예약된 최대 지연을 의미하는 옵션.
이 기간 동안 퇴사한 작업자의 커넥터와 작업은 할당되지 않은 상태로 유지됨
socket.connection.setup.timeout.max.ms 30000 (30 seconds) 클라이언트가 소켓 연결이 설정될 때까지 기다리는 최대 시간
연결 설정 시간 제한은 이 최대값까지 연속 연결 실패가 발생할 때마다 기하급수적으로 증가함. 연결 폭주를 방지하기 위해 0.2의 무작위 인자가 시간 초과에 적용되어 계산된 값보다 20% 아래에서 20% 사이의 무작위 범위가 됨
socket.connection.setup.timeout.ms 10000 (10 seconds) 클라이언트가 소켓 연결이 설정될 때까지 기다리는 시간
제한 시간이 경과하기 전에 연결이 구축되지 않으면 클라이언트는 소켓 채널을 닫음
이 값은 초기 backoff값이며 연속 연결이 실패할 때마다 해당 socket.connection.setup.timeout.max.ms까지 기하 급후적으로 증가함
ssl.cipher.suites null 암호 제품군 목록
이는 TLS 또는 SSL 네트워크 프로토콜을 사용하여 네트워크 연결에 대한 보안 설정을 협상하는 데 사용되는 인증, 암호화, MAC 및 키 교환 알고리즘의 명명된 조합임
   
status.storage.replication.factor 3 상태 저장 토픽 생성 시 사용되는 복제 인자
task.shutdown.graceful.timeout.ms 5000 (5 seconds) 작업이 정상적으로 종료될 때까지 기다리는 시간
작업 별이 아닌 총시간을 의미하며, 모든 작업은 종료가 트리거된 후 순차적으로 대기함
topic.creation.enable TRUE 소스 커넥터가 `topic.creation.` 속성으로 구성된 경우 소스 커넥터에서 사용되는 주제의 자동 생성을 허용할지 여부
각 작업은 관리 클라이언트를 사용하여 topic을 생성하며 kafka브로커에 의존하여 자동으로 topic을 생성하진않음
topic.tracking.allow.reset TRUE true로 설정하면 사용자 요청이 커넥터당 활성 항목 집합을 재설정할 수
topic.tracking.enable TRUE 런타임 중에 커넥터당 활성 항목 집합을 추적할 수 있음