kafka 3.8.0 connect
1.kafka connect란
kafka는 메시지를 발행하는 producer , 메시지를 소비하는 consumer, 중앙화된 전송 영역을 제공하는 broker로 구성됨
하나 이상의 broker는 kafka cluster에 저장되며 각 broker는 여러개의 topic을 가지며 topic에 메세지가 저장됨
실제 kafka 시스템을 분산 시스템에 적용할 때 개발자는 producer AP, consumer AP, broker를 구동해야하는데 여기서 직접 파이프라인을 구축해야함
구축해야할 파이프라인이 여러개라면 매번 반복적으로 파이프라인을 구성해야하는 번거로움 발생
또한, producer와 consumer가 코드를 작성할 수 없고 변경도 불가능한 시스템(ex. DB, storage service, Amazon S3, ES 등) 일 때 kafka 데이터 파이프라인을 직접 구성할 수 없음
이런한 문제점을 해결하기 위해 kafka connect 사용
결국, kafka connect는
1) 반복적인 파이프라인 구축의 번거로움을 쉽고 간편하게하고,
2) 직접 파이프라인 구축이 어려운 시스템을 위해 만들어진 apache kafka 프로젝트 중 하나
덧붙여, rest API를 제공하기 때문에 커넥터의 현재 상태를 확인 할 수 있음 → curl 명령어 이용 ( ex. curl http://localhost:8083/connectors/local-file-source | python -m json.tool )
(rest API를 사용할 경우 재시작하지 않고도 동적으로 커넥터의 설정과 옵션을 변경할 수 있음)
2. 개념
1) connect 및 connector
용어설명
connect | connector가 동작하게 하는 서버 |
connector | data source(DB)의 데이터를 처리하는 소스가 들어있는 jar파일 |
worker(instance) | kafka connect 프로세스가 실행되는 서버 또는 인스턴스를 의미함. connector나 task들이 워커에서 실행됨 분산 모드는 특정 worker에 장애가 발생하더라도 해당 worker에서 동작 중인 connector나 task들이 다른 worker로 이동해 연속해서 동작할 수 있다는 장점이 있지만, 단독 모드는 그렇지 않음 |
task | 커넥터가 정의한 작업을 직접 수행하는 역할 커넥터는 데이터 전송에 관한 작업을 정의한 후 각 테스크들을 워커에 분산함. 그런 다음 워커에 분산 배치된 테크스들은 커넥터가 정의한 작업대로 데이터를 복사하게됨. 테스크 또한 소스 테스크와 싱크 테스크로 나뉨 |
즉, **Kafka Connect**는 **Apache Kafka**에서 외부 시스템과 데이터를 쉽게 주고받기 위한 도구
코드 없이 설정만으로 데이터 파이프라인을 구축할 수 있으며, 실시간으로 데이터를 처리하고 여러 시스템과 연동할 수 있도록 도와줌
2) 모드
모드 명 | 설명 | config 파일 | 차이점 |
standalone 모드 |
하나의 connect만 사용하는 모드 1개의 connect Server을 구동시키는 모드이기때문에 가용성 측면에서 떨어짐 따라서 단일장애점(single point of failure)문제가 발생할 수 있으므로 개발환경이나 중요도가 낮은 데이터 파이프라인을 운영할 때 사용 권장 #config 설정 값 ① : 브로커 주소 지정 ② : 카프카로 데이터를 보내거나 가져올 때 사용하는 포맷을 지정하는데, 키와 밸류를 각각 지정함. org.apache.kafka.connect.json.JsonConverter 가 기본값이며 json 외에도 Avro, String, ByteArray 등이 있음 *kafka converter는 소스에서 카프카로 전송하 때의 직렬화와 카프카에서 싱크로 전송할 때의 역직렬화를 담당함 이로인해 불필요하게 컨버팅하는 코드를 추가로 작하지 않아도됨 ③ : 스키마가 포함된 구조를 사용할 수 있음. ④ : 재처리 등을 목적으로 오프셋을 파일로 저장하는데, /tmp/connect.offsets 가 기본값임 ⑤ : 오프셋 플러시 주기를 설정하며 10,000가 기본값임 ⑥ : connect 관련 jar 파일을 기재해야하는데, 해당 파일 위치는 /kafka/kafka_2.13-3.7.0/libs 에 위치함. 기재하지 않을 시 connect가 기동할 때 필요한 class 파일을 찾지 못함 |
connect-standalone.properties | |
# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # These are defaults. This file just demonstrates how to override some settings. bootstrap.servers=localhost:9092 ① # The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will # need to configure these based on the format they want their data in when loaded from or stored into Kafka key.converter=org.apache.kafka.connect.json.JsonConverter ② value.converter=org.apache.kafka.connect.json.JsonConverter # Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply # it to key.converter.schemas.enable=true ③ value.converter.schemas.enable=true offset.storage.file.filename=/tmp/connect.offsets ④ # Flush much faster than normal, which is useful for testing/debugging offset.flush.interval.ms=10000 ⑤ # Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins # (connectors, converters, transformations). The list should consist of top level directories that include # any combination of: # a) directories immediately containing jars with plugins and their dependencies # b) uber-jars with plugins and their dependencies # c) directories immediately containing the package directory structure of classes of plugins and their dependencies # Note: symlinks will be followed to discover dependencies or plugins. # Examples: # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors, plugin.path= /kafka/kafka_2.13-3.7.0/libs ⑥ |
|||
distributed 모드 |
![]() 자동 리밸런싱 동작이 있어서, worker 4추가시, ![]() 커넥트는 즉시 확장됨 여러개의 connect를 한개의 클러스트로 묶어서 사용하는 모드 2개 이상의 connect Server을 cluster형태로 운영함으로서 고가용성을 보장 데이터 처리량의 변화에도 유연하게 대응할 수 있는 scale out을 지원하며 실제 상용환경에서 사용 권장 ① : 분산 모드의 그룹 아이디. 컨슈머 그룹의 그룹 아이디와 동일한 개념 ② : 커넥터들의 오프셋 추적을 위해 저장하는 카프카 내부 토픽. ③ : 커넥터들의 설정을 저장하는 카프카 내부 토픽. replication factor 수 3 지향 ④ : 커넥터들의 상태를 저장하는 카프카 내부 토픽. replication factor 수 3 지향 |
connect-distributed.properties | 1) 메타 정보의 위치 분산 모드는 메타 정보의 저장소로 카프카 내부 토픽을 이용함 이용 방법 : 컨슈머 그룹들의 오프셋 정보를 __consumer_offsets 토픽에 저장하는 방법과 유사함 장애 발생시 유연하게 대응할 수 있도록 카프카 커넥터는 안전한 저장소인 카프카의 내부 토픽을 메타 저장소로 이용함. 이로써 하나의 워커에 장애가 발생하더라도 남아 있는 모든 워커가 카프카의 내부 토픽으로부터 메타 정보를 얻을 수 있음. 그래서 더더욱 replication factor는 3으로 설정해야 함 2) 자동 리밸런싱 동작 워커 추가시 내부적으로 자동 리밸런싱 동작함 (ex. 워커 추가 시, 기존 워커에 있던 커넥터가 새 워커로 이동) 3) 미러모드 사용 시 유용 |
## # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. ## # This file contains some of the configurations for the Kafka Connect distributed worker. This file is intended # to be used with the examples, and some settings may differ from those used in a production system, especially # the `bootstrap.servers` and those specifying replication factors. # A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. bootstrap.servers=localhost:9092 # unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs group.id=connect-cluster ① # The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will # need to configure these based on the format they want their data in when loaded from or stored into Kafka key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply # it to key.converter.schemas.enable=true value.converter.schemas.enable=true # Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted. # Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create # the topic before starting Kafka Connect if a specific topic configuration is needed. # Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value. # Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able # to run this example on a single-broker cluster and so here we instead set the replication factor to 1. offset.storage.topic=connect-offsets ② offset.storage.replication.factor=1 #offset.storage.partitions=25 # Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated, # and compacted topic. Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create # the topic before starting Kafka Connect if a specific topic configuration is needed. # Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value. # Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able # to run this example on a single-broker cluster and so here we instead set the replication factor to 1. config.storage.topic=connect-configs ③ config.storage.replication.factor=1 # Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted. # Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create # the topic before starting Kafka Connect if a specific topic configuration is needed. # Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value. # Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able # to run this example on a single-broker cluster and so here we instead set the replication factor to 1. status.storage.topic=connect-status ④ status.storage.replication.factor=1 #status.storage.partitions=5 # Flush much faster than normal, which is useful for testing/debugging offset.flush.interval.ms=10000 # List of comma-separated URIs the REST API will listen on. The supported protocols are HTTP and HTTPS. # Specify hostname as 0.0.0.0 to bind to all interfaces. # Leave hostname empty to bind to default interface. # Examples of legal listener lists: HTTP://myhost:8083,HTTPS://myhost:8084" #listeners=HTTP://:8083 # The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers. # If not set, it uses the value for "listeners" if configured. #rest.advertised.host.name= #rest.advertised.port= #rest.advertised.listener= # Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins # (connectors, converters, transformations). The list should consist of top level directories that include # any combination of: # a) directories immediately containing jars with plugins and their dependencies # b) uber-jars with plugins and their dependencies # c) directories immediately containing the package directory structure of classes of plugins and their dependencies # Examples: # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors plugin.path= /kafka/kafka_2.13-3.7.0/libs |
3.활용 방안 구조
1) single mode → 세 가지 안을 먼저 single mode로 진행한다
2) distributed mode → 세 가지 안 single 모드 환경 구성 및 설정 완료 후, distributed mode로 전환하여 진행한다.
1) DB to DB
|
![]() |
2) Java source to DB |
![]() |
3) DB to java code |
![]() |
4. 구성 # AWS 환경에서
- 환경 구성 (single mode)
- jdk version :
모드 | 구분 | 방향 | 테스트 서버 | 연관 파일 | 사전 준비 | 환경 구성 방법 | 확인 날짜 |
single mode (connect-standalone.properties / 내부 connect) |
connector 하나만 사용할 경우 | Java source to local file | broker3 |
connect-standalone.sh connect-standalone.properties connect-file-sink.properties producer.sh |
#connect-standalone.properties -bootstrap.servers 를 broker3으로 변경 -plugin.path 는 kafka 엔진 아래 libs 위치로 수정 #connect-file-sink.properties - file명과 topics명 변경 |
1) 기존 connect-standalone.properties, connect-file-sink.properties 파일 백업 $>kafcfg $>cp -a connect-standalone.properties connect-standalone.properties_origin $>cp -a connect-connect-file-sink.properties connect-connect-file-sink.properties_origin 2) connect-file-sink.properties 가 적용된 standalone 모드 실행 $>kafbin $>./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-sink.properties 3) 8083 port 떠있는지 확인 $>netstat -an | grep 8083 4) producer 에서 data 전송 $>./producer.sh test-topic_241024 5) /kafka/src/에 json 파일 형태로 파일 생성되었는지 확인 $>cd /kafka/src $>ls -al | grep test-topic_241024.txt 6) connect-file-sink.properties 단독 이용 불가 시, connect-file-source.properties를 daemon으로 하여 connect-standalone 기동 ( 2개의 connector 로 진행 ) $>kafbin $>./connecst-standalone.sh ../config/connect-standalone.properties ../config/connect-file-source.properties |
24.10.24 |
Java source to java code | connect-standalone.sh connect-standalone.properties connect-source-sink.properties consumer.sh |
#connect-standalone.properties -bootstrap.servers 를 broker3으로 변경 #connect-file-source.properties - file명과 topics명 변경 |
|||||
connector 두 개 모두 사용할 경우 |
DB to DB | connect-standalone.sh connect-standalone.properties JDBC source connector json 파일 |
1.connect-distributed.properties 구동 2.kafka connect topic 확인 3.jdbc connector 설치 4.jdbc connector 관련 설정 5.plugin.path에 jdbc.jar 파일 경로로 변경 |
1) kafka connect 따로 설치하라고 하는데 bin에 있음 (conflient-community-6.10.tar.gz ) 2) jdbc connector 설치 3) 설치한 jdbc connect dir 내부의 lib/kafka-connect-jdbc.jar파일 경로 복사 4) MariaDB 사용을 위해 드라이버 복사 → 왜지 (해당 드라이버를 kafka에 설정) $> cp ./mariadb-java-client.jar |