A presentation at Big Data Conference Europe by Robin Moffatt
@rmoff From Zero to Hero with Kafka Connect a.k.a. A practical guide to becoming l33t with Kafka Connect
What is Kafka Connect? @rmoff
Streaming Integration with Kafka Connect: sources (e.g. syslog) → Kafka Connect → Kafka Brokers @rmoff
Streaming Integration with Kafka Connect: Kafka Brokers → Kafka Connect → sinks (e.g. Amazon S3, Google BigQuery) @rmoff
Streaming Integration with Kafka Connect: syslog → Kafka Connect → Kafka Brokers → Kafka Connect → Amazon S3, Google BigQuery @rmoff
Look Ma, No Code!
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:mysql://asgard:3306/demo",
  "table.whitelist": "sales,orders,customers"
}
@rmoff
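Configuration like this is applied through the Kafka Connect REST API. A minimal sketch, assuming a worker listening on localhost:8083 and a hypothetical connector name source-jdbc-demo (a real JDBC source typically needs a few more settings, such as a polling mode):

# PUT is idempotent: it creates the connector if absent, updates it otherwise
curl -s -X PUT http://localhost:8083/connectors/source-jdbc-demo/config \
     -H "Content-Type: application/json" \
     -d '{
           "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
           "connection.url": "jdbc:mysql://asgard:3306/demo",
           "table.whitelist": "sales,orders,customers"
         }'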
Streaming Pipelines: RDBMS → Kafka Connect → Kafka → Kafka Connect → Amazon S3 / HDFS @rmoff
Writing to data stores from Kafka: App → Kafka → Kafka Connect → Data Store @rmoff
Evolve processing from old systems to new: Existing App → RDBMS → Kafka Connect → New App @rmoff
Demo! Kafka Connect streaming data into Elasticsearch: http://rmoff.dev/kafka-connect-code @rmoff
Configuring Kafka Connect Inside the API - connectors, transforms, converters @rmoff
Kafka Connect basics: Source → Kafka Connect → Kafka @rmoff
Connectors: a Connector within Kafka Connect handles the integration with the source (Source → Connector → Kafka) @rmoff
Connectors
"config": {
  [...]
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url": "jdbc:postgresql://postgres:5432/",
  "topics": "asgard.demo.orders"
}
@rmoff
Connectors: the Connector reads native data from the source and produces a Connect Record (Source → Connector → Connect Record → Kafka) @rmoff
Converters: the Converter serialises the Connect Record to bytes[] for Kafka (Source → Connector → Connect Record → Converter → bytes[] → Kafka) @rmoff
Serialisation & Schemas JSON Avro Protobuf Schema JSON CSV 👍 👍 👍 😬 https://rmoff.dev/qcon-schemas @rmoff
The Confluent Schema Registry: Kafka Connect on the source side registers the Avro schema with the Schema Registry and writes Avro messages to Kafka; Kafka Connect on the target side reads the Avro messages and fetches the schema from the Schema Registry @rmoff
Converters
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
Set as a global default per-worker; optionally can be overridden per-connector
@rmoff
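To override the worker-wide default for one connector, set the same converter properties inside that connector's own config. A sketch, assuming the topic for this connector holds schemaless JSON:

"config": {
  [...]
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"
}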
What about internal converters?
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
key.internal.value.converter=org.apache.kafka.connect.json.JsonConverter
value.internal.value.converter=org.apache.kafka.connect.json.JsonConverter
key.internal.value.converter.bork.bork.bork=org.apache.kafka.connect.json.JsonConverter
key.internal.value.please.just.work.converter=org.apache.kafka.connect.json.JsonConverter
(The internal.* settings are deprecated since Apache Kafka 2.0: just leave them alone, only value.converter matters here.)
@rmoff
Single Message Transforms: Source → Connector → Transform(s) → Converter → Kafka (applied within Kafka Connect) @rmoff
Single Message Transforms
"config": {
  [...]
  "transforms": "addDateToTopic,labelFooBar",
  "transforms.addDateToTopic.type": "org.apache.kafka.connect.transforms.TimestampRouter",
  "transforms.addDateToTopic.topic.format": "${topic}-${timestamp}",
  "transforms.addDateToTopic.timestamp.format": "yyyyMM",
  "transforms.labelFooBar.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.labelFooBar.renames": "delivery_address:shipping_address"
}
The "transforms" property lists the transforms to apply; each named transform then gets its own config.
@rmoff
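A sketch of the effect of these two transforms, with illustrative values:

before: topic=orders          value={"delivery_address": "22 Acacia Avenue", ...}
after:  topic=orders-202011   value={"shipping_address": "22 Acacia Avenue", ...}
(record timestamp: November 2020)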
Extensible: Connectors, Transforms, and Converters are all pluggable, so you can write your own @rmoff
Confluent Hub hub.confluent.io @rmoff
Deploying Kafka Connect Connectors, Tasks, and Workers @rmoff
Connectors and Tasks: a connector's work is split into tasks, e.g. a JDBC Source running as JDBC Task #1 and JDBC Task #2, and an S3 Sink running as S3 Task #1 @rmoff
Tasks and Workers: tasks (S3 Task #1, JDBC Task #1, JDBC Task #2) execute within a Worker process @rmoff
Kafka Connect Standalone Worker: Fault-tolerant? Nope. A single Worker runs all the tasks (S3 Task #1, JDBC Task #1, JDBC Task #2) and keeps its offsets in a local file @rmoff
"Scaling" the Standalone Worker: Fault-tolerant? Nope. You just get multiple standalone Workers (e.g. one running JDBC Task #1 and JDBC Task #2, another running S3 Task #1), each with its own local offsets file @rmoff
Kafka Connect Distributed Worker: Fault-tolerant? Yeah! The tasks (S3 Task #1, JDBC Task #1, JDBC Task #2) run on a Worker in a Kafka Connect cluster, with Offsets, Config, and Status all stored in Kafka topics @rmoff
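A minimal sketch of the distributed-mode worker properties behind this (topic names are the conventional ones; converter settings as shown earlier are also required):

bootstrap.servers=kafka:9092
# All workers sharing the same group.id form one Kafka Connect cluster
group.id=connect-cluster-01
# Connector offsets, configs, and status live in these Kafka topics
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status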
Scaling the Distributed Worker: Fault-tolerant? Yeah! Add another Worker to the Kafka Connect cluster and the tasks are rebalanced across the Workers; Offsets, Config, and Status stay in Kafka @rmoff
Distributed Worker - fault tolerance: a Worker in the Kafka Connect cluster fails, leaving its task unassigned… @rmoff
Distributed Worker - fault tolerance: …and the task (JDBC Task #2) is restarted on a remaining Worker; Offsets, Config, and Status are safe in Kafka @rmoff
Multiple Distributed Clusters: Kafka Connect cluster #1 (running S3 Task #1 and JDBC Task #1) and Kafka Connect cluster #2 (running JDBC Task #2), each with its own Offsets, Config, and Status topics @rmoff
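Each cluster needs its own group.id and its own trio of storage topics; if two clusters shared a group.id, their workers would simply join the same cluster. A sketch for the second cluster (names illustrative):

group.id=connect-cluster-02
offset.storage.topic=connect-offsets-02
config.storage.topic=connect-configs-02
status.storage.topic=connect-status-02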
Containers @rmoff
Kafka Connect images on Docker Hub confluentinc/cp-kafka-connect-base @rmoff
Adding connectors to a container: install the connector JAR from Confluent Hub into the confluentinc/cp-kafka-connect-base image @rmoff
At runtime
kafka-connect:
  image: confluentinc/cp-kafka-connect-base:6.0.0
  environment:
    CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components'
  command:
    - bash
    - -c
    - |
      confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:1.0.0
      /etc/confluent/docker/run
http://rmoff.dev/ksln19-connect-docker
@rmoff
Build a new image
FROM confluentinc/cp-kafka-connect-base:6.0.0
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
RUN confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:1.0.0
@rmoff
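Building the image is then standard Docker (the tag is illustrative); the usual CONNECT_* environment variables (bootstrap servers, converters, and so on) are still supplied at run time:

docker build -t my-kafka-connect-neo4j:1.0.0 .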
Automating connector creation
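One common pattern for automating this (a sketch, not verbatim from the demo): start the worker, block until its REST API responds, then PUT the connector config. The connector name and settings here are illustrative.

#!/usr/bin/env bash
# Start the Connect worker in the background (this launch script is the one in the Confluent images)
/etc/confluent/docker/run &

# Wait until the REST API (assumed to be on localhost:8083) responds
echo "Waiting for Kafka Connect to start ⏳"
while [ "$(curl -s -o /dev/null -w '%{http_code}' http://localhost:8083/connectors)" -ne 200 ]; do
  sleep 5
done

# PUT is idempotent: create the connector, or update it if it already exists
curl -s -X PUT http://localhost:8083/connectors/sink-elastic-orders/config \
     -H "Content-Type: application/json" \
     -d '{
           "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
           "topics": "orders",
           "connection.url": "http://elasticsearch:9200",
           "key.ignore": "true",
           "schema.ignore": "true"
         }'

# Keep the container alive
sleep infinity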
Troubleshooting Kafka Connect @rmoff
Troubleshooting Kafka Connect: Connector RUNNING, but Task FAILED
$ curl -s "http://localhost:8083/connectors/source-debezium-orders/status" | \
    jq '.connector.state'
"RUNNING"
$ curl -s "http://localhost:8083/connectors/source-debezium-orders/status" | \
    jq '.tasks[0].state'
"FAILED"
http://go.rmoff.net/connector-status
@rmoff
Troubleshooting Kafka Connect
curl -s "http://localhost:8083/connectors/source-debezium-orders-00/status" | jq '.tasks[0].trace'
"org.apache.kafka.connect.errors.ConnectException\n\tat io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)\n\tat io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:197)\n\tat io.debezium.connector.mysql.BinlogReader$ReaderThreadLifecycleListener.onCommunicationFailure(BinlogReader.java:1018)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:950)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:580)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:825)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.io.EOFException\n\tat com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:190)\n\tat com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readInteger(ByteArrayInputStream.java:46)\n\tat com.github.shyiko.mysql.binlog.event.deserialization.EventHeaderV4Deserializer.deserialize(EventHeaderV4Deserializer.java:35)\n\tat com.github.shyiko.mysql.binlog.event.deserialization.EventHeaderV4Deserializer.deserialize(EventHeaderV4Deserializer.java:27)\n\tat com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:212)\n\tat io.debezium.connector.mysql.BinlogReader$1.nextEvent(BinlogReader.java:224)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:922)\n... 3 more"
@rmoff
The log is the source of truth
$ confluent local log connect
$ docker-compose logs kafka-connect
$ cat /var/log/kafka/connect.log
@rmoff
Dynamic log levels (Added in Apache Kafka 2.4 / Confluent Platform 5.4)
curl -s http://localhost:8083/admin/loggers/ | jq
{
  "org.apache.kafka.connect.runtime.rest": { "level": "WARN" },
  "org.reflections": { "level": "ERROR" },
  "root": { "level": "INFO" }
}
curl -s -X PUT http://localhost:8083/admin/loggers/io.debezium \
     -H "Content-Type:application/json" -d '{"level": "TRACE"}'
https://rmoff.dev/kc-dynamic-log-level
@rmoff
Kafka Connect @rmoff
Kafka Connect: "Task is being killed and will not recover until manually restarted" is the symptom, not the cause; the actual error is further up the log @rmoff
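Once the actual cause (further up the log) is fixed, the failed task can be restarted in place via the REST API, using the connector name from the status call shown earlier:

# Restart just task 0; the connector itself keeps its config
curl -s -X POST http://localhost:8083/connectors/source-debezium-orders/tasks/0/restart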
Error Handling and Dead Letter Queues @rmoff
Mismatched converters org.apache.kafka.common.errors.SerializationException: Unknown magic byte! @rmoff
Mismatched converters
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Topic contents: JSON JSON JSON JSON JSON JSON; the messages are not Avro, but "value.converter": "AvroConverter"
ⓘ Use the correct Converter for the source data
@rmoff
Mixed serialisation methods
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Topic contents: Avro Avro JSON Avro JSON JSON; some messages are not Avro, but "value.converter": "AvroConverter"
ⓘ Use error handling to deal with bad messages
@rmoff
Error Handling and DLQ https://cnfl.io/connect-dlq @rmoff
Fail Fast: by default, a bad message in the source topic causes the task to fail, and messages stop flowing to the sink https://cnfl.io/connect-dlq @rmoff
YOLO ¯\_(ツ)_/¯: with errors.tolerance=all, bad messages in the source topic are silently dropped and the rest flow through to the sink https://cnfl.io/connect-dlq @rmoff
Dead Letter Queue: with errors.tolerance=all and errors.deadletterqueue.topic.name=my_dlq, bad messages from the source topic are routed to a dead letter queue topic while the rest flow through to the sink https://cnfl.io/connect-dlq @rmoff
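A sketch of the relevant sink connector settings (dead letter queues apply to sink connectors); these are the standard error-handling properties, with illustrative values:

"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "my_dlq",
"errors.deadletterqueue.topic.replication.factor": 1,
"errors.deadletterqueue.context.headers.enable": true,
"errors.log.enable": true,
"errors.log.include.messages": true

The context.headers.enable setting records the failure reason in each dead-lettered record's headers, and the errors.log.* settings write the details to the worker log as well.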
Re-processing the Dead Letter Queue: a second pipeline, e.g. Kafka Connect with a JSON sink, reads the dead letter queue and handles the messages that the Avro sink rejected https://cnfl.io/connect-dlq @rmoff
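A sketch of the re-processing side: a second sink connector that reads the dead letter queue as schemaless JSON (connector and topic names are illustrative):

"config": {
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "topics": "my_dlq",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"
}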
Metrics and Monitoring @rmoff
REST API: script it for monitoring, piping the output through tools such as jq, sort, and peco http://go.rmoff.net/connector-status @rmoff
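The trick behind that link boils down to something like this sketch (assuming the ?expand=status parameter, available in recent Kafka versions): one line per connector with its state and task states, sorted for easy scanning.

curl -s "http://localhost:8083/connectors?expand=status" | \
  jq -r 'to_entries[] | [.key, .value.status.connector.state, .value.status.tasks[].state] | @tsv' | \
  column -t | sort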
Confluent Control Center @rmoff
Consumer lag @rmoff
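For sink connectors, lag is plain Kafka consumer lag: each sink connector consumes with the group connect-<connector name>, so the standard tooling applies (connector name illustrative):

kafka-consumer-groups --bootstrap-server localhost:9092 \
  --describe --group connect-sink-elastic-orders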
JMX @rmoff
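Kafka Connect workers expose their metrics (source/sink record rates, task status, and so on) over JMX. A sketch of enabling remote JMX on a worker via the standard Kafka environment variable (port and flags illustrative, suitable for development only):

export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote \
  -Dcom.sun.management.jmxremote.port=9876 \
  -Dcom.sun.management.jmxremote.authenticate=false \
  -Dcom.sun.management.jmxremote.ssl=false"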
Want to learn more? CTAs, not CATs (sorry, not sorry) @rmoff
Free money! Promo code RMOFF200: $200 USD off your bill each calendar month for the first three months when you sign up at https://rmoff.dev/ccloud (an additional $200 towards your bill 😄) Fully Managed Kafka as a Service * T&C: https://www.confluent.io/confluent-cloud-promo-disclaimer
Learn Kafka. Start building with Apache Kafka at Confluent Developer. developer.confluent.io
Confluent Community Slack group cnfl.io/slack @rmoff
#EOF @rmoff rmoff.dev/talks youtube.com/rmoff
Integrating Apache Kafka with other systems in a reliable and scalable way is often a key part of a streaming platform. Fortunately, Apache Kafka includes the Connect API that enables streaming integration both in and out of Kafka. Like any technology, understanding its architecture and deployment patterns is key to successful use, as is knowing where to go looking when things aren’t working.
This talk will discuss the key design concepts within Kafka Connect and the pros and cons of standalone vs distributed deployment modes. We’ll do a live demo of building pipelines with Kafka Connect for streaming data in from databases, and out to targets including Elasticsearch. With some gremlins along the way, we’ll go hands-on in methodically diagnosing and resolving common issues encountered with Kafka Connect. The talk will finish off by discussing more advanced topics including Single Message Transforms, and deployment of Kafka Connect in containers.
The following resources were mentioned during the presentation or are useful additional information.