kafka 기술문서

kafka 3.8.0 connect

saay-hi 2024. 10. 24. 17:24
1.kafka connect란

kafka는 메시지를 발행하는 producer , 메시지를 소비하는 consumer, 중앙화된 전송 영역을 제공하는 broker로 구성됨
하나 이상의 broker는 kafka cluster에 저장되며 각 broker는 여러개의 topic을 가지며 topic에 메세지가 저장됨

실제 kafka 시스템을 분산 시스템에 적용할 때 개발자는 producer AP, consumer AP, broker를 구동해야하는데 여기서 직접 파이프라인을 구축해야함
구축해야할 파이프라인이 여러개라면 매번 반복적으로 파이프라인을 구성해야하는 번거로움 발생
또한, producer와 consumer가 코드를 작성할 수 없고 변경도 불가능한 시스템(ex. DB, storage service, Amazon S3, ES 등) 일 때 kafka 데이터 파이프라인을 직접 구성할 수 없음

이런한 문제점을 해결하기 위해 kafka connect 사용

결국, kafka connect는

1) 반복적인 파이프라인 구축의 번거로움을 쉽고 간편하게하고,
2) 직접 파이프라인 구축이 어려운 시스템을 위해 만들어진 apache kafka 프로젝트 중 하나

덧붙여, rest API를 제공하기 때문에 커넥터의 현재 상태를 확인 할 수 있음 → curl  명령어 이용 ( ex. curl http://localhost:8083/connectors/local-file-source | python -m json.tool )

(rest API를 사용할 경우 재시작하지 않고도 동적으로 커넥터의 설정과 옵션을 변경할 수 있음) 

 

2. 개념

 

1) connect 및 connector

 

용어설명

connect connector가 동작하게 하는 서버
connector data source(DB)의 데이터를 처리하는 소스가 들어있는 jar파일

worker(instance) kafka connect 프로세스가 실행되는 서버 또는 인스턴스를 의미함. connector나 task들이 워커에서 실행됨
분산 모드는 특정 worker에 장애가 발생하더라도 해당 worker에서 동작 중인 connector나 task들이 다른 worker로 이동해 연속해서 동작할 수 있다는 장점이 있지만, 단독 모드는 그렇지 않음
task 커넥터가 정의한 작업을 직접 수행하는 역할
커넥터는 데이터 전송에 관한 작업을 정의한 후 각 테스크들을 워커에 분산함. 그런 다음 워커에 분산 배치된 테크스들은 커넥터가 정의한 작업대로 데이터를 복사하게됨.
테스크 또한 소스 테스크와 싱크 테스크로 나뉨

즉, **Kafka Connect**는 **Apache Kafka**에서 외부 시스템과 데이터를 쉽게 주고받기 위한 도구
코드 없이 설정만으로 데이터 파이프라인을 구축할 수 있으며, 실시간으로 데이터를 처리하고 여러 시스템과 연동할 수 있도록 도와줌

 

2) 모드

모드 명 설명 config 파일 차이점
standalone 모드



하나의 connect만 사용하는 모드
1개의 connect Server을 구동시키는 모드이기때문에 가용성 측면에서 떨어짐
따라서 단일장애점(single point of failure)문제가 발생할 수 있으므로 개발환경이나 중요도가 낮은 데이터 파이프라인을 운영할 때 사용 권장


#config 설정 값
① : 브로커 주소 지정
② : 카프카로 데이터를 보내거나 가져올 때 사용하는 포맷을 지정하는데, 키와 밸류를 각각 지정함.
      org.apache.kafka.connect.json.JsonConverter 가 기본값이며 json 외에도 Avro, String, ByteArray 등이 있음

*kafka converter는 소스에서 카프카로 전송하 때의 직렬화와 카프카에서 싱크로 전송할 때의 역직렬화를 담당함
이로인해 불필요하게 컨버팅하는 코드를 추가로 작하지 않아도됨

③ : 스키마가 포함된 구조를 사용할 수 있음.
④ : 재처리 등을 목적으로 오프셋을 파일로 저장하는데, /tmp/connect.offsets 가 기본값임
⑤ : 오프셋 플러시 주기를 설정하며 10,000가 기본값임
⑥ : connect 관련 jar 파일을 기재해야하는데, 해당 파일 위치는 /kafka/kafka_2.13-3.7.0/libs 에 위치함. 기재하지 않을 시 connect가 기동할 때 필요한 class 파일을 찾지 못함
connect-standalone.properties


# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092 ①

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter ②
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true ③
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets ④
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000 ⑤

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path= /kafka/kafka_2.13-3.7.0/libs ⑥
distributed 모드








자동 리밸런싱 동작이 있어서, worker 4추가시,


커넥트는 즉시 확장됨 
여러개의 connect를 한개의 클러스트로 묶어서 사용하는 모드
2개 이상의 connect Server을 cluster형태로 운영함으로서 고가용성을 보장 
데이터 처리량의 변화에도 유연하게 대응할 수 있는 scale out을 지원하며 실제 상용환경에서 사용 권장
 
: 분산 모드의 그룹 아이디. 컨슈머 그룹의 그룹 아이디와 동일한 개념
② : 커넥터들의 오프셋 추적을 위해 저장하는 카프카 내부 토픽. 
③ : 커넥터들의 설정을 저장하는 카프카 내부 토픽. replication factor 수 3 지향
④ : 커넥터들의 상태를 저장하는 카프카 내부 토픽. replication factor 수 3 지향
connect-distributed.properties 1) 메타 정보의 위치
분산 모드는 메타 정보의 저장소로 카프카 내부 토픽을 이용함
이용 방법 : 컨슈머 그룹들의 오프셋 정보를 __consumer_offsets 토픽에 저장하는 방법과 유사함
장애 발생시 유연하게 대응할 수 있도록 카프카 커넥터는 안전한 저장소인 카프카의 내부 토픽을 메타 저장소로 이용함. 이로써 하나의 워커에 장애가 발생하더라도 남아 있는 모든 워커가 카프카의 내부 토픽으로부터 메타 정보를 얻을 수 있음.
그래서 더더욱 replication factor는 3으로 설정해야 함
2) 자동 리밸런싱 동작
워커 추가시 내부적으로 자동 리밸런싱 동작함
(ex. 워커 추가 시, 기존 워커에 있던 커넥터가 새 워커로 이동)

3) 미러모드 사용 시 유용

##
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##

# This file contains some of the configurations for the Kafka Connect distributed worker. This file is intended
# to be used with the examples, and some settings may differ from those used in a production system, especially
# the `bootstrap.servers` and those specifying replication factors.

# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers=localhost:9092 

# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-cluster

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
offset.storage.topic=connect-offsets ②
offset.storage.replication.factor=1
#offset.storage.partitions=25

# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated,
# and compacted topic. Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
config.storage.topic=connect-configs
config.storage.replication.factor=1

# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
status.storage.topic=connect-status ④
status.storage.replication.factor=1
#status.storage.partitions=5

# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# List of comma-separated URIs the REST API will listen on. The supported protocols are HTTP and HTTPS.
# Specify hostname as 0.0.0.0 to bind to all interfaces.
# Leave hostname empty to bind to default interface.
# Examples of legal listener lists: HTTP://myhost:8083,HTTPS://myhost:8084"
#listeners=HTTP://:8083

# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
# If not set, it uses the value for "listeners" if configured.
#rest.advertised.host.name=
#rest.advertised.port=
#rest.advertised.listener=

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors
plugin.path= /kafka/kafka_2.13-3.7.0/libs

 

3.활용 방안 구조

1) single mode → 세 가지 안을 먼저 single mode로 진행한다 

2) distributed mode → 세 가지 안 single 모드 환경 구성 및 설정 완료 후, distributed mode로 전환하여 진행한다.

1) DB to DB 
2) Java source to DB 
3) DB to java code

 

 

4. 구성 # AWS 환경에서

  • 환경 구성 (single mode)
  • jdk version : 
모드 구분 방향 테스트 서버 연관 파일  사전 준비 환경 구성 방법 확인 날짜




single mode
(connect-standalone.properties / 내부 connect)
connector 하나만 사용할 경우 Java source to local file broker3

connect-standalone.sh
connect-standalone.properties
connect-file-sink.properties
producer.sh
#connect-standalone.properties
-bootstrap.servers 를 broker3으로 변경
-plugin.path 는 kafka 엔진 아래 libs 위치로 수정


#connect-file-sink.properties
- file명과 topics명 변경


1)  기존 connect-standalone.properties, connect-file-sink.properties 파일 백업
$>kafcfg
$>cp -a connect-standalone.properties connect-standalone.properties_origin
$>cp -a connect-connect-file-sink.properties connect-connect-file-sink.properties_origin

2) connect-file-sink.properties 가 적용된 standalone 모드 실행
$>kafbin
$>./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-sink.properties

3) 8083 port 떠있는지 확인
$>netstat -an | grep 8083

4) producer 에서 data 전송
$>./producer.sh test-topic_241024

5) /kafka/src/에 json 파일 형태로 파일 생성되었는지 확인
$>cd /kafka/src
$>ls -al | grep test-topic_241024.txt

6) connect-file-sink.properties 단독 이용 불가 시, connect-file-source.properties를 daemon으로 하여 connect-standalone 기동 ( 2개의 connector 로 진행 )
$>kafbin
$>./connecst-standalone.sh ../config/connect-standalone.properties ../config/connect-file-source.properties 
24.10.24
Java source to java code connect-standalone.sh
connect-standalone.properties
connect-source-sink.properties
consumer.sh
#connect-standalone.properties
-bootstrap.servers 를 broker3으로 변경


#connect-file-source.properties
- file명과 topics명 변경


                                                                                                                                                                                                                                                                                                                                                                                        


connector 두 개 모두 사용할 경우
DB to DB
connect-standalone.sh
connect-standalone.properties
JDBC source connector json 파일
1.connect-distributed.properties 구동
2.kafka connect topic 확인
3.jdbc connector 설치
4.jdbc connector 관련 설정
5.plugin.path에 jdbc.jar 파일 경로로 변경

1) kafka connect 따로 설치하라고 하는데 bin에 있음 (conflient-community-6.10.tar.gz )
2) jdbc connector 설치
3) 설치한 jdbc connect dir 내부의 lib/kafka-connect-jdbc.jar파일 경로 복사
4) MariaDB 사용을 위해 드라이버 복사 → 왜지
(해당 드라이버를 kafka에 설정)
$> cp ./mariadb-java-client.jar