kafka 설치 및 구성/참고) configuration
2) connect (connect-distributed.properties)
saay-hi
2024. 6. 25. 13:31
[ default 옵션 ]
옵션명 | default | 설명 | |
1 | bootstrap.servers | localhost:9092 | 카프카 클러스터에 대한 초기 커넥션을 구축하는 데 사용할 호스트/포트 쌍 리스트.kafka broker 서버 = bootstrap 서버. |
2 | group.id | connect-cluster | connect를 같은 클러스터로 묶기 위한 그룹 설정. |
3 | key.converter | org.apache.kafka.connect.json.JsonConverter | 카프카 커넥트 포맷과 카프카에 기록한 직렬화된 포맷 간을 변환할 때 사용할 컨버터 클래스. 카프카에서 쓰거나 읽은 메세지 키 포맷은 이 클래스가 제어하며, 컨버터는 커넥터와는 독립적이기 때문에, 사용하는 커넥터와는 무관하게 어떤 직렬화 포맷으로도 작업할 수 있다. |
4 | value.converter | org.apache.kafka.connect.json.JsonConverter | |
5 | key.converter.schemas.enable | true | key/value값이 내부 schema와 data를 모두 포함하는 복합 객체로 처리되도록 하는 설정 |
6 | value.converter.schemas.enable | true | |
7 | offset.storage.topic | connect-offsets | connect offset을 저장할 kafka topic명. |
8 | offset.storage.replication.factor | 1 | offset을 저장하는 토픽을 생성할때 사용할 replication factor. default는 3 |
9 | config.storage.topic | connect-configs | connector 설정을 저장할 kafka topic명 |
10 | config.storage.replication.factor | 1 | 설정을 저장하는 topic을 생성할 때 사용할 replication factor. default는 3 |
11 | status.storage.topic | connect-status | connector와 task 상태를 저장할 topic명. |
12 | status.storage.replication.factor | 1 | 상태를 저장하는 topic을 생성할 때 사용할 replication factor. default는 3 |
13 | offset.flush.interval.ms | 10000 | task에서 offset 커밋을 시도하는 간격 |
[ 그 외 주석처리된 옵션 값]
옵션명 | default | 설명 | |
1 | offset.storage.partition | 25 | offset을 저장에 사용할 topic 의 partitions 수. 사용된 최대 복제 계수만큼 적어도 브로커가 있어야 함 단일 broker cluster에서 실행하기 위해 복제 계수를 1로 설정함 |
2 | status.storage.partition | 5 | 상태 저장에 사용할 topic의 partition 수. kafka connect는 필요할 때 자동으로 항목을 작성하려고 시도하지만 항상 수동으로 작성할 수 도 있음 사용된 최대 복제 계수만큼 적어도 브로커가 있어야 함 단일 broker cluster에서 실행하기 위해 복제 계수를 1로 설정함 |
3 | listeners | HTTP://:8083 | REST API가 수신할 쉼표로 구분된 URT 목록. 지왼되는 프로토콜은 HTTP와 HTTPS임 만일 호스트이름을 0.0.0.0으로 지정하면 모든 인터페이스에 바인딩함 기본 인터페이스에 바인딩하려면 host 이름을 비워둬야함 ex) http://myhost:8083, https://myhost:8084 |
4 | rest.advertised.host.name | 다른 서버에서 라우팅 가능한 URL 등에 연결하기 위해 다른 작업자에게 제공되는 Hostaname과 port 설정되지 않고 구성된 경우 listener값을 사용함 |
|
5 | rest.advertised.port | ||
6 | rest.advertised.listener | ||
7 | plugin.path | plugin이 들어있는 path 리스트. |
[ 그 외 공식 문서 옵션 ]
옵션명 | default 값 | 설명 |
importance : high | ||
config.storage.topic | connection 옵션에 저장되는 kafka topic이름 | |
exactly.once.source.support | disabled | 트랜잭션을 사용하여 소스 레코드 및 해당 소스 오프셋을 작성하고 새 작업 세대를 가져오기 전에 이전 작업 세대를 사전에 차단함으로써 클러스터의 소스 커넥터에 대한 정확히 한 번의 지원을 활성화할지 여부 새 클러스터에서 정확히 한 번만 소스 지원을 활성화하려면 이 속성을 'enabled'로 설정 기존 클러스터에 대한 지원을 활성화하려면 먼저 클러스터의 모든 작업자에 대해 'preparing'로 설정한 다음, 'enabled'로 설정 권장 Valid Values: (case insensitive) DISABLED, ENABLED, PREPARING |
heartbeat.interval.ms | 3000 (3 seconds) | Kafka의 그룹 관리 기능을 사용할 때 그룹 코디네이터에 대한 하트비트 사이의 예상 시간 하트비트는 worker 세션이 활성 상태를 유지하도록 하고 새 구성원이 그룹에 가입하거나 탈퇴할 떄 재조정을 용이하게 하는 데 사용됨. 값은 session.timeout.ms보다 낮게 설정되어야하며 일반적으로 1/3이하로 설정해야 함 . 정상적인 재조정에 대한 예상 시간을 제어하기 위해 1/3보다 더 낮게 조정할 수도 있음 |
rebalance.timeout.ms | 60000 (1 minute) | 재조정이 시작된 후 각 작업자가 그룹에 합류하는 데 허용되는 최대 시간 이는 기본적으로 모든 작업이 보류 중이 ㄴ데이터를 플러시하고 오프켓을 커밋하는 데 필요한 시간에 대한 제한. 시간이 초과되면 작업자가 그룹에서 제거되어 오프셋 커밋이 실패하게 됨 |
session.timeout.ms | 10000 (10 seconds) | 작업자 오류를 감지하는 데 사용되는 제한 시간 작업자는 주기적인 하트비트를 보내 브로커에게 활성 상태를 나타냄. 이 옵션의 시간 초과가 만료되기전에 브로커가 하트비트를 수신하지 않으면 브로커는 그룹에서 작업자를 제거하고 재조정을 시작함. 값은 group.min.session.timeout.ms및group.max.session.timeout.ms에 의해 브로커 구성에 구성된 허용 범위 내에 있어야 함 |
ssl.key.password | null | 키 저장소 파일에 있는 개인 키의 비밀번호 또는 'ssl.keystore.key'에 지정된 PEM 키 |
… | ||
importance : medium | ||
client.dns.lookup | use_all_dns_ips | 클라이언트가 DNS 조회를 사용하는 방법을 제어함 use_all_dns_ip가 설정된 경우, 성공적인 연결이 설정될 때까지 반환된 각 ip주소에 순차적으로 연결함. 연결이 끊어지면 다음 ip가 사용됨. 모든 ip가 한 번 사용되면 클라이언트는 호스트 이름에서 ip를 다시 확인함(JVM 및 OS 캐시 DNS 이름 조회 모두) resolve_canonical_bootstrap_servers_only로 설정된 경우, 각 bootstrap 주소를 정식 이름목록으로 확인함. bootstrap 단계 이후에는,use_all_dns_ip와 비슷함 |
connections.max.idle.ms | 540000 (9 minutes) | 이 옵션에 지정된 시간 후에 유휴연결을 닫음 |
connector.client.config.override.policy | All | ConnectorClientConfigOverridePolicy 구현의 클래스 이름 또는 별칭 어떤 클라이언트 configuration이 connector에 의해 재정의될 수 있는지 정의내림. 기본값인 'ALL'은 connector configuration이 모든 clinet 속성에 재정의할 수 있다는 것을 의미함. 다른 가능성 있는 정책은 'none'인데, 이는 클라이언트 속성이 재정의되는것으로부터 승인되지않으며, 'Principal'이라는 정책은 이와 달리 승인함 |
receive.buffer.bytes | 32768 (32 kibibytes) | 데이터를 읽을 때 사용되는 TCP recceive 버퍼의 사이즈 만약 값이 -1이면, OS 기본값으로 사용됨 |
request.timeout.ms | 40000 (40 seconds) | 이 옵션은 클라이언트가 요청 응답을 기다리는 최대 시간의 양을 통제함 만약의 시간 초과 전에 응답을 받지 못하면, 클라이언트는 필욯ㄴ 경우엔 요청을 다시 보내는데 재시도가 모두 소진되면 요청이 실패함 |
sasl.client.callback.handler.class | null | AuthenticateCallbackHandler 인터페이스를 구현하는 salsl client callback handler class완전한 이름임 |
… | ||
security.protocol | PLAINTEXT | 브로커와 통신하는 데 사용되는 프로토콜. 유효한 값은 PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL |
send.buffer.bytes | 131072 (128 kibibytes) | 데이터를 보낼 때 사용할 TCP 전송 버퍼(SO_SNDBUF)의 크기 값이 -1이면 os기본값이 사용됨 |
ssl.enabled.protocols | TLSv1.2,TLSv1.3 | SSL 연결에 활성화된 프로토콜 목록. Java 11 이상으로 실행하는 경우 기본값은 'TLSv1.2,TLSv1.3'이고, 그렇지 않으면 'TLSv1.2' Java 11의 기본값을 사용하면 클라이언트와 서버가 둘 다 지원하는 경우 TLSv1.3을 선호하고 그렇지 않으면 TLSv1.2로 대체(둘 다 TLSv1.2 이상을 지원한다고 가정) 대부분의 경우, 기본값이 적합함 |
… | ||
worker.sync.timeout.ms | 3000 (3 seconds) | 작업자가 다른 작업자와 동기화되지 않고 재 동기화 설정이 필요할 때, 포기하고 group을 떠나기 전에 이 옵션에 지정된 시간까지 기다렸다가 다시 참여하기 전에 backoff 시간을 기다려야 함. |
worker.unsync.backoff.ms | 300000 (5 minutes) | 작업자가 다른 작업자와 동기화되지 않고 worker.sync.timeout.ms 내에서 따라잡지 못하는 경우 다시 참여하기 전에 오래동안 Connect 클러스터를 종료 권장 |
importance : low | ||
access.control.allow.methods | "" | Access-Control-Allow-Methods 헤더를 설정하여 교차 출처 요청에 지원되는 방법을 설정함. Access-Control-Allow-Methods 헤더의 기본값은 GET, POST 및 HEAD에 대한 교차 출처 요청을 허용 |
access.control.allow.origin | "" | REST API 요청에 대해 Access-Control-Allow-Origin 헤더를 설정하는 값 원본 간 액세스를 활성화하려면 API에 액세스하도록 허용해야 하는 애플리케이션의 도메인으로 설정하거나, 모든 애플리케이션에서 액세스를 허용하려면 '*'로 설정 권장 기본값은 EST API 도메인에서의 액세스만 허용함 |
admin.listeners | null | Admin REST API가 수신 대기할 쉼표로 구분된 URI 목록 지원되는 프로토콜 : HTTP, HTTPS 비어있거나 빈 문자열을 사용하면 이 기능이 비활성화됨. 기본 동작은 일반 listener(listener속성으로 지정)을 사용 |
auto.include.jmx.reporter | TRUE | [더 이상 사용되지 않음] |
client.id | "" | 요청할 때 서버에 전달할 ID 문자열 이 옵션의 목적은 논리적 애플리케이션 이름이 서버 측 요청 로깅에 포함되도록 허용하여 ip/port이외의 요청 소스를 추적할 수 있도록 하는 것. |
config.providers | "" | ConfigProvider classes의 쉼표로구분된 클래스 이름으로, 지정된 순서대로 로드됨 configProvider 인터페이스를 구현하면, 외부화된 비밀과 같은 커넥터 구성의 변수 참조를 바꿀 수 있음 |
config.storage.replication.factor | 3 | 구성 저장소 주제를 생성할 때 사용되는 복제 요소 |
connect.protocol | sessioned | Kafka Connect 프로토콜의 호환성 모드 |
header.converter | org.apache.kafka.connect.storage.SimpleHeaderConverter | Kafka Connect 형식과 Kafka에 기록된 직렬화된 형식 간에 변환하는 데 사용되는 HeaderConverter 클래스 이는 Kafka에 쓰거나 Kafka에서 읽는 메시지의 헤더 값 형식을 제어하며 커넥터와 독립적이므로 모든 커넥터가 모든 직렬화 형식으로 작동할 수 있음 일반적인 형식의 예로는 JSON 및 Avro가 있으며, 기본적으로 SimpleHeaderConverter는 헤더 값을 문자열로 직렬화하고 스키마를 유추하여 역직렬화하는 데 사용됨 |
inter.worker.key.generation.algorithm | HmacSHA256 | 내부 요청 키를 생성하는 데 사용하는 알고리즘 'HmacSHA256' 알고리즘은 이를 지원하는 JVM에서 기본값으로 사용됨 다른 JVM에서는 기본값이 사용되지 않으며 이 속성의 값은 작업자 구성에서 수동으로 지정되어야 함 |
inter.worker.key.size | null | 내부 요청 서명에 사용할 키의 크기(비트) null인 경우 키 생성 알고리즘의 기본 키 크기가 사용됨 |
inter.worker.key.ttl.ms | 3600000 (1 hour) | 내부 요청 검증에 사용되는 생성된 세션 키의 TTL(밀리초) |
inter.worker.signature.algorithm | HmacSHA256 | 내부 요청에 서명하는 데 사용되는 알고리즘. 이 알고리즘은 이를 지원하는 JVM에서 기본값으로 사용됨. 다른 JVM에서는 기본값이 사용되지 않으며 이 속성의 값은 작업자 구성에서 수동으로 지정되어야 함 |
inter.worker.verification.algorithms | HmacSHA256 | inter.worker.signature.algorithm 속성에 사용되는 알고리즘을 포함해야 하는 내부 요청 확인을 위해 허용되는 알고리즘 목록 이 알고리즘은 이를 지원하는 JVM에서 기본값으로 사용됨. 다른 JVM에서는 기본값이 사용되지 않으며 이 속성의 값은 작업자 구성에서 수동으로 지정되어야 함 |
listeners | http://:8083 | REST API가 수신할 쉼표로 구분된 URI 목록 지원되는 프로토콜 : HTTP, HTTPS 모든 인터페이스에 바인딩하려면 호스트 이름을 0.0.0.0으로 지정 기본 인터페이스에 지정하려면 호스트 이름을 비워둬야 함 ex) HTTP://myhost:8083,HTTPS://myhost:8084 |
metadata.max.age.ms | 300000 (5 minutes) | 새로운 브로커나 파티션을 사전에 검색하기 위해 파티션 리더십 변경이 확인되지 않은 경우에도 메타데이터를 강제로 새로 고치는 데 걸리는 시간(밀리초) |
metric.reporters | "" | 측정항목 보고자로 사용할 클래스 목록 org.apache.kafka.common.metrics.MetricsReporter인터페이스를 구현하면 새 메트릭 생성에 대한 알림을 받을 클래스를 연결할 수 있음 JMX 통계를 등록하기 위해 JmxReporter가 항상 포함됨 |
metrics.num.samples | 2 | 지표를 계산하기 위해 유지되는 샘플 수 |
metrics.recording.level | INFO | 메트릭에 대한 가장 높은 기록 level |
metrics.sample.window.ms | 30000 (30 seconds) | 측정항목 샘플이 계산되는 기간 |
offset.flush.interval.ms | 60000 (1 minute) | 작업에 대한 오프셋 커밋을 시도하는 간격 |
offset.flush.timeout.ms | 5000 (5 seconds) | 프로세스를 취소하고 향후 시도에서 커밋될 오프셋 데이터를 복원하기 전에 레코드가 플러시되고 오프셋 데이터가 오프셋 스토리지에 커밋될 때까지 기다리는 최대 시간(밀리초) 이 옵션은 정확히 1회 지원으로 실행되는 소스 커넥터에는 영향을 주지않음 |
offset.storage.replication.factor | 25 | 오프셋 저장 주제 생성 시 사용되는 복제 인자 |
plugin.discovery | hybrid_warn | 클래스 경로 및 플러그인.경로 구성에 있는 플러그인을 검색하는 데 사용하는 방법 * only_scan: 리플렉션을 통해서만 플러그인을 검색. ServiceLoader에서 검색할 수 없는 플러그인은 작업자 시작에 영향을 미치지 않음. * hybrid_warn: 반사적으로 그리고 ServiceLoader를 통해 플러그인을 검색. ServiceLoader에서 검색할 수 없는 플러그인은 작업자 시작 중에 경고를 알림 * hybrid_fail: 반사적으로 ServiceLoader를 통해 플러그인을 검색. ServiceLoader에서 검색할 수 없는 플러그인은 작업자 시작에 실패하게 만듦 * service_load: ServiceLoader로만 플러그인을 검색. 다른 모드보다 시작 속도가 빠름. ServiceLoader에서 검색할 수 없는 플러그인은 사용하지 못할 수도 있음. |
reconnect.backoff.max.ms | 1000 (1 second) | 반복적으로 연결에 실패한 브로커에 다시 연결할 때 대기하는 최대 시간(밀리초) 제공되는 경우 연속 연결이 실패할 때마다 호스트당 백오프가 이 최대값까지 기하급수적으로 증가함. 백오프 증가를 계산한 후 연결 폭풍을 방지하기 위해 20% 랜덤 지터가 추가됨 |
reconnect.backoff.ms | 50 | 지정된 호스트에 다시 연결을 시도하기 전에 대기하는 기본 시간 긴밀한 루프에서 호스트에 반복적으로 연결되는 것을 방지할 수 있음 backoff는 클라이언트가 브로커에 시도하는 모든 연결에 적용됨. 이 값은 초기 backoff값이며 연속 연결이 실패할 떄마다 해당 reconnect.backoff.max.ms값까지 기하급수적으로 증가함 |
response.http.headers.config | "" | REST API HTTP 응답 헤더에 대한 규칙 |
rest.advertised.host.name | null | 이옵션이 설정될 경우, 연결할 다른 작업자에게 제공되는 host이름임 |
rest.advertised.listener | null | 다른 작업자가 사용할 수 있도록 제공되는 광고 리스너(HTTP 또는 HTTPS)를 설정 |
rest.advertised.port | null | 다른 작업자에게 연결을 위해 제공되는 포트 |
rest.extension.classes | "" | ConnectRestExtension classes의 콤마로 분리된 이름으로, 지정된 순서대로 로드됨 인터페이스를 구현하면 ConnectRestExtension필터와 같은 Connect의 REST API 사용자 정의 리소스에 삽입할 수 있음 일반적으로 로깅, 보안 등과 같은 사용자 정의 기능을 추가하는 데 사용됨 |
retry.backoff.max.ms | 1000 (1 second) | 반복적으로 실패한 브로커에 대한 요청을 재시도할 때 대기하는 최대 시간(밀리초) 제공된 경우 클라이언트당 백오프는 실패한 각 요청에 대해 이 최대값까지 기하급수적으로 증가함 재시도 시 모든 클라이언트가 동기화되는 것을 방지하기 위해 0.2배의 무작위 지터가 백오프에 적용되어 백오프가 계산된 값보다 20% 아래에서 20% 사이의 범위 내에 들어가게됨 retry.backoff.ms가 retry.backoff.max.ms보다 높게 설정되면 retry.backoff.max.ms는 급수적인 증가없이 처음부터 지속적인 backoff로 사용됨 |
retry.backoff.ms | 100 | 지정된 주제 파티션에 대해 실패한 요청을 재시도하기 전에 대기하는 시간 이렇게 하면 일부 실패 시나리오에서 긴밀한 루프로 반복적으로 요청을 보내는 것을 방지할 수 있음. 이 값은 초기 backoff값이며 요청이 실패할 떄마다 해당 retry.backoff.max.ms값까지 기하급수적으로 증가함 |
sasl.kerberos.kinit.cmd | /usr/bin/kinit | Kerberos kinit 명령 경로. |
scheduled.rebalance.max.delay.ms | 300000 (5 minutes) | 커넥터와 작업을 그룹에 다시 밸런싱하고 재할당하기 전에 한 명 이상의 퇴사한 작업자가 돌아올 때까지 기다리도록 예약된 최대 지연을 의미하는 옵션. 이 기간 동안 퇴사한 작업자의 커넥터와 작업은 할당되지 않은 상태로 유지됨 |
socket.connection.setup.timeout.max.ms | 30000 (30 seconds) | 클라이언트가 소켓 연결이 설정될 때까지 기다리는 최대 시간 연결 설정 시간 제한은 이 최대값까지 연속 연결 실패가 발생할 때마다 기하급수적으로 증가함. 연결 폭주를 방지하기 위해 0.2의 무작위 인자가 시간 초과에 적용되어 계산된 값보다 20% 아래에서 20% 사이의 무작위 범위가 됨 |
socket.connection.setup.timeout.ms | 10000 (10 seconds) | 클라이언트가 소켓 연결이 설정될 때까지 기다리는 시간 제한 시간이 경과하기 전에 연결이 구축되지 않으면 클라이언트는 소켓 채널을 닫음 이 값은 초기 backoff값이며 연속 연결이 실패할 때마다 해당 socket.connection.setup.timeout.max.ms까지 기하 급후적으로 증가함 |
ssl.cipher.suites | null | 암호 제품군 목록 이는 TLS 또는 SSL 네트워크 프로토콜을 사용하여 네트워크 연결에 대한 보안 설정을 협상하는 데 사용되는 인증, 암호화, MAC 및 키 교환 알고리즘의 명명된 조합임 |
… | ||
status.storage.replication.factor | 3 | 상태 저장 토픽 생성 시 사용되는 복제 인자 |
task.shutdown.graceful.timeout.ms | 5000 (5 seconds) | 작업이 정상적으로 종료될 때까지 기다리는 시간 작업 별이 아닌 총시간을 의미하며, 모든 작업은 종료가 트리거된 후 순차적으로 대기함 |
topic.creation.enable | TRUE | 소스 커넥터가 `topic.creation.` 속성으로 구성된 경우 소스 커넥터에서 사용되는 주제의 자동 생성을 허용할지 여부 각 작업은 관리 클라이언트를 사용하여 topic을 생성하며 kafka브로커에 의존하여 자동으로 topic을 생성하진않음 |
topic.tracking.allow.reset | TRUE | true로 설정하면 사용자 요청이 커넥터당 활성 항목 집합을 재설정할 수 |
topic.tracking.enable | TRUE | 런타임 중에 커넥터당 활성 항목 집합을 추적할 수 있음 |