A presentation at the Berlin Kafka Meetup in Berlin, Germany by Robin Moffatt
From Zero to Hero with Kafka Connect, a.k.a. A practical guide to becoming l33t with Kafka Connect (@rmoff, #KafkaMeetup)
What is Kafka Connect?
Streaming Integration with Kafka Connect: sources (e.g. syslog) -> Kafka Connect -> Kafka brokers.
Streaming Integration with Kafka Connect: Kafka brokers -> Kafka Connect -> sinks (Amazon S3, Google BigQuery).
Streaming Integration with Kafka Connect: syslog in, Amazon S3 and Google BigQuery out, with Kafka Connect either side of the Kafka brokers.
Look Ma, No Code!

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:mysql://asgard:3306/demo",
  "table.whitelist": "sales,orders,customers"
}

https://docs.confluent.io/current/connect/
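A config like this is all it takes; it can be submitted to Kafka Connect's REST API (a sketch; the connector name and the localhost:8083 endpoint are assumptions for a local install):

curl -s -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "jdbc-source-demo",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://asgard:3306/demo",
    "table.whitelist": "sales,orders,customers"
  }
}'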
Streaming Pipelines: RDBMS -> Kafka Connect -> Kafka -> Kafka Connect -> Amazon S3 / HDFS.
Writing to data stores from Kafka: App -> Kafka -> Kafka Connect -> Data Store.
Evolve processing from old systems to new: the existing app keeps writing to the RDBMS, Kafka Connect streams that data into Kafka, and the new app consumes it from there.
Demo: http://rmoff.dev/kafka-connect-code
Configuring Kafka Connect: inside the API (connectors, transforms, converters).
Kafka Connect basics: Source -> Kafka Connect -> Kafka.
Connectors: the connector is the pluggable piece that reads from the source system.
Connectors

"config": {
  […]
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url": "jdbc:postgresql://postgres:5432/",
  "topics": "asgard.demo.orders",
}
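Fleshed out, a sink like this can be created through the REST API (a sketch; the connector name, credentials, and auto.create setting are illustrative additions):

curl -s -X PUT -H "Content-Type: application/json" \
  http://localhost:8083/connectors/sink-postgres-orders/config \
  -d '{
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://postgres:5432/",
    "connection.user": "postgres",
    "connection.password": "postgres",
    "topics": "asgard.demo.orders",
    "auto.create": "true"
  }'

PUT against /connectors/<name>/config is idempotent: it creates the connector if it doesn't exist and updates it if it does.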
Connectors: the connector turns native data from the source into Connect Records.
Converters: the converter then serialises the Connect Records to bytes[] for writing to Kafka.
Serialisation & Schemas: Avro -> Confluent Schema Registry; Protobuf; JSON; CSV. https://qconnewyork.com/system/files/presentation-slides/qcon_17_-_schemas_and_apis.pdf
The Confluent Schema Registry: the source side registers the Avro schema with the Schema Registry and writes Avro messages to Kafka; the target side fetches the schema to deserialise them.
Converters

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

Set as a global default per worker; optionally can be overridden per connector.
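To override the worker default for a single connector, put the converter settings in that connector's config (a sketch; whether to disable schemas depends on your data):

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"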
What about internal converters?

value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
key.internal.value.converter=org.apache.kafka.connect.json.JsonConverter
value.internal.value.converter=org.apache.kafka.connect.json.JsonConverter
key.internal.value.converter.bork.bork.bork=org.apache.kafka.connect.json.JsonConverter
key.internal.value.please.just.work.converter=org.apache.kafka.connect.json.JsonConverter

(Don't cargo-cult these: the internal.* converter settings are deprecated, and only value.converter does anything useful here.)
Single Message Transforms: Connector -> Transform(s) -> Converter, inside Kafka Connect.
Single Message Transforms

"config": {
  […]
  "transforms": "addDateToTopic,labelFooBar",
  "transforms.addDateToTopic.type": "org.apache.kafka.connect.transforms.TimestampRouter",
  "transforms.addDateToTopic.topic.format": "${topic}-${timestamp}",
  "transforms.addDateToTopic.timestamp.format": "YYYYMM",
  "transforms.labelFooBar.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.labelFooBar.renames": "delivery_address:shipping_address",
}

"transforms" names the transforms to apply, in order; each one then gets its own "transforms.<name>.*" config.
Extensible: connectors, transforms, and converters are all pluggable.
Confluent Hub: hub.confluent.io
Deploying Kafka Connect: Connectors, Tasks, and Workers.
Connectors and Tasks: a connector definition spawns tasks, e.g. a JDBC Source running JDBC Task #1 and JDBC Task #2, and an S3 Sink running S3 Task #1.
Tasks and Workers: tasks run inside a worker, the JVM process that hosts Kafka Connect.
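Parallelism is requested per connector with tasks.max (a sketch; how many tasks actually start is up to the connector, e.g. the JDBC source runs at most one task per table):

"config": {
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "tasks.max": "2",
  […]
}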
Kafka Connect Standalone Worker: a single worker runs all the tasks (S3 Task #1, JDBC Task #1, JDBC Task #2) and keeps its offsets in a local file.
"Scaling" the Standalone Worker: running more standalone workers gives each one its own offsets, with no coordination between them. Fault-tolerant? Nope.
Kafka Connect Distributed Worker: tasks run on a Kafka Connect cluster, with offsets, config, and status all stored in Kafka topics. Fault-tolerant? Yeah!
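A distributed worker is started from a properties file along these lines (a minimal sketch; the bootstrap server, group.id, and topic names are illustrative, and the three storage topics must be distinct per cluster):

bootstrap.servers=kafka:9092
group.id=connect-cluster-01
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter

$ ./bin/connect-distributed.sh worker.properties

Workers started with the same group.id join the same Connect cluster.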
Scaling the Distributed Worker: add more workers with the same group.id and the tasks are rebalanced across them.
Distributed Worker - fault tolerance: if a worker dies, its tasks are reassigned to the surviving workers in the cluster.
Multiple Distributed Clusters @rmoff #KafkaMeetup S3 Task #1 JDBC Task #1 Kafka Connect cluster #1 JDBC Task #2 Kafka Connect cluster #2 Offsets Offsets Config Config Status Status From Zero to Hero with Kafka Connect
Containers
Kafka Connect images on Docker Hub: confluentinc/cp-kafka-connect-base, and confluentinc/cp-kafka-connect which bundles connectors such as kafka-connect-elasticsearch, kafka-connect-jdbc, and kafka-connect-hdfs […]
Adding connectors to a container: install the connector JAR from Confluent Hub into confluentinc/cp-kafka-connect-base.
At runtime

kafka-connect:
  image: confluentinc/cp-kafka-connect:5.2.1
  environment:
    CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components'
  command:
    - bash
    - -c
    - |
      confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:1.0.0
      /etc/confluent/docker/run

http://rmoff.dev/ksln19-connect-docker
Build a new image

FROM confluentinc/cp-kafka-connect:5.2.1
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
RUN confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:1.0.0
Automating connector creation
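One way to script it (a sketch; the endpoint, connector name, and config are assumptions): block until the worker's REST API is up, then PUT the connector config.

#!/bin/bash
# Wait for the Kafka Connect REST API to become available
while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200 ]; do
  echo "Waiting for Kafka Connect to start..."
  sleep 5
done
# Create (or update) the connector
curl -s -X PUT -H "Content-Type: application/json" \
  http://localhost:8083/connectors/sink-postgres-orders/config \
  -d '{
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://postgres:5432/",
    "topics": "asgard.demo.orders"
  }'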
Troubleshooting Kafka Connect
Troubleshooting Kafka Connect: Task FAILED, Connector RUNNING.

$ curl -s "http://localhost:8083/connectors/source-debezium-orders/status" | \
    jq '.connector.state'
"RUNNING"

$ curl -s "http://localhost:8083/connectors/source-debezium-orders/status" | \
    jq '.tasks[0].state'
"FAILED"

http://go.rmoff.net/connector-status
Troubleshooting Kafka Connect: the task's trace shows the full stack trace of the failure.

$ curl -s "http://localhost:8083/connectors/source-debezium-orders-00/status" | jq '.tasks[0].trace'
"org.apache.kafka.connect.errors.ConnectException\n\tat io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)\n\tat io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:197)\n\tat io.debezium.connector.mysql.BinlogReader$ReaderThreadLifecycleListener.onCommunicationFailure(BinlogReader.java:1018)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:950)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:580)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:825)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.io.EOFException\n\tat com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:190)\n\tat com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readInteger(ByteArrayInputStream.java:46)\n\tat com.github.shyiko.mysql.binlog.event.deserialization.EventHeaderV4Deserializer.deserialize(EventHeaderV4Deserializer.java:35)\n\tat com.github.shyiko.mysql.binlog.event.deserialization.EventHeaderV4Deserializer.deserialize(EventHeaderV4Deserializer.java:27)\n\tat com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:212)\n\tat io.debezium.connector.mysql.BinlogReader$1.nextEvent(BinlogReader.java:224)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:922)\n\t... 3 more\n"
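Once the cause has been fixed, a failed task can be restarted in place through the REST API (connector name as in the status example above):

$ curl -s -X POST "http://localhost:8083/connectors/source-debezium-orders-00/tasks/0/restart"

Note that restarting the connector itself (POST .../restart) does not restart its failed tasks in this version of Kafka Connect; each task is restarted individually.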
The log is the source of truth:

$ confluent log connect
$ docker-compose logs kafka-connect
$ cat /var/log/kafka/connect.log
Error Handling and Dead Letter Queues
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Mismatched converters: the messages are not Avro, but "value.converter": "AvroConverter" is set. ⓘ Use the correct converter for the source data.
Mixed serialisation methods: only some messages are not Avro, yet "value.converter": "AvroConverter" is set. ⓘ Use error handling to deal with the bad messages.
Error Handling and DLQ (https://cnfl.io/connect-dlq)
Handled: Convert (read/write from Kafka, [de]serialisation); Transform.
Not handled: Start (connections to a data store); Poll / Put (read/write from/to the data store*).
* can be retried by Connect
Fail Fast: by default, a bad message kills the task, and nothing more flows from the source topic to the sink. (https://cnfl.io/connect-dlq)
YOLO ¯\_(ツ)_/¯: errors.tolerance=all keeps the pipeline running and silently drops the bad messages.
Dead Letter Queue: errors.tolerance=all with errors.deadletterqueue.topic.name=my_dlq routes the bad messages to a dead letter queue topic instead of dropping them. (https://cnfl.io/connect-dlq)
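In full, a sink's error-handling config might look like this (a sketch; the replication factor of 1 is only sensible for a single-broker dev setup):

"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "my_dlq",
"errors.deadletterqueue.topic.replication.factor": 1,
"errors.deadletterqueue.context.headers.enable": true

With context headers enabled, each DLQ record carries headers describing the failure, which helps when re-processing.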
Re-processing the Dead Letter Queue: a second connector, e.g. a JSON sink alongside the Avro sink, reads the dead letter queue and handles the messages the first one couldn't.
Metrics and Monitoring
REST API: http://go.rmoff.net/connector-status
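The state of every connector can be checked with a small loop over the REST API (a sketch assuming jq and a worker on localhost:8083):

$ curl -s http://localhost:8083/connectors | jq -r '.[]' | while read -r CONNECTOR; do
    echo -n "$CONNECTOR: "
    curl -s "http://localhost:8083/connectors/$CONNECTOR/status" | jq -r '.connector.state'
  done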
Confluent Control Center
Consumer lag
JMX
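Connect workers expose JMX metrics (MBeans under kafka.connect, e.g. kafka.connect:type=connector-task-metrics); a sketch of enabling them, with an arbitrary port number:

$ export JMX_PORT=9876
$ ./bin/connect-distributed.sh config/connect-distributed.properties
# then attach jconsole (or e.g. a Prometheus JMX exporter) to port 9876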
Fully Managed Kafka as a Service
Free Books! http://cnfl.io/book-bundle
#EOF
💬 Join the Confluent Community Slack group at http://cnfl.io/slack
https://talks.rmoff.net
Integrating Apache Kafka with other systems in a reliable and scalable way is often a key part of a streaming platform. Fortunately, Apache Kafka includes the Connect API that enables streaming integration both in and out of Kafka. Like any technology, understanding its architecture and deployment patterns is key to successful use, as is knowing where to go looking when things aren’t working.
This talk will discuss the key design concepts within Kafka Connect and the pros and cons of standalone vs distributed deployment modes. We’ll do a live demo of building pipelines with Kafka Connect for streaming data in from databases, and out to targets including Elasticsearch. With some gremlins along the way, we’ll go hands-on in methodically diagnosing and resolving common issues encountered with Kafka Connect. The talk will finish off by discussing more advanced topics including Single Message Transforms, and deployment of Kafka Connect in containers.
The following resources were mentioned during the presentation or are useful additional information.