From Zero to Hero with Kafka Connect

A presentation at JFuture in October 2020 by Robin Moffatt

Slide 1

From Zero to Hero with Kafka Connect
a.k.a. A practical guide to becoming l33t with Kafka Connect
@rmoff

Slide 2

$ whoami
• Robin Moffatt (@rmoff)
• Senior Developer Advocate at Confluent (Apache Kafka, not Wikis 😉)
• Working in data & analytics since 2001
• Oracle ACE Director (Alumnus)
http://rmoff.dev/talks · http://rmoff.dev/blog · http://rmoff.dev/youtube

Slide 3

What is Kafka Connect?

Slide 4

Streaming Integration with Kafka Connect
Diagram: Sources (syslog) → Kafka Connect → Kafka Brokers

Slide 5

Streaming Integration with Kafka Connect
Diagram: Kafka Brokers → Kafka Connect → Sinks (Amazon S3, Google BigQuery)

Slide 6

Streaming Integration with Kafka Connect
Diagram: Kafka Connect streams data from sources (syslog) into the Kafka Brokers, and from the brokers out to sinks (Amazon S3, Google BigQuery)

Slide 7

Look Ma, No Code!

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:mysql://asgard:3306/demo",
  "table.whitelist": "sales,orders,customers"
}

Slide 8

Streaming Pipelines
Diagram: RDBMS → Kafka Connect → Kafka Connect → Amazon S3, HDFS

Slide 9

Writing to data stores from Kafka
Diagram: App → Kafka Connect → Data Store

Slide 10

Evolve processing from old systems to new
Diagram labels: Existing App, New App <x>, Kafka Connect, RDBMS

Slide 11

Demo
http://rmoff.dev/kafka-connect-code

Slide 12

Configuring Kafka Connect
Inside the API: connectors, transforms, converters

Slide 13

Kafka Connect basics
Diagram: Source → Kafka Connect → Kafka

Slide 14

Connectors
Diagram: Source → Connector (inside Kafka Connect) → Kafka

Slide 15

Connectors

"config": {
  […]
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url": "jdbc:postgresql://postgres:5432/",
  "topics": "asgard.demo.orders"
}
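A minimal sketch of how a config like the one above could be wrapped into a REST call, assuming a worker listening on http://localhost:8083 (the address used elsewhere in this deck). The connector name `sink-postgres-orders` and the helper `build_connector_request` are hypothetical, and the settings elided with […] on the slide are not filled in, so this payload is illustrative rather than complete. The request is only built, not sent.

```python
import json
from urllib.request import Request

CONNECT_URL = "http://localhost:8083"  # assumed worker address


def build_connector_request(name, config, base_url=CONNECT_URL):
    """Build (but do not send) a PUT /connectors/<name>/config request.

    PUT on this path creates the connector if it does not exist and
    updates it otherwise, so the same call can be repeated safely.
    """
    body = json.dumps(config).encode("utf-8")
    return Request(
        url=f"{base_url}/connectors/{name}/config",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


# Only the keys visible on the slide; the elided settings are left out.
sink_config = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://postgres:5432/",
    "topics": "asgard.demo.orders",
}

request = build_connector_request("sink-postgres-orders", sink_config)
print(request.get_method(), request.full_url)
```

To submit it for real, pass `request` to `urllib.request.urlopen` once a worker is running.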

Slide 16

Connectors
Diagram: Source → Native data → Connector → Connect Record → Kafka

Slide 17

Converters
Diagram: Source → Native data → Connector → Connect Record → Converter → bytes[] → Kafka

Slide 18

Serialisation & Schemas
Avro 👍 · Protobuf 👍 · JSON Schema 👍 · JSON, CSV 😬
https://rmoff.dev/qcon-schemas

Slide 19

The Confluent Schema Registry
Diagram: Source → Kafka Connect → Avro Message → Avro Message → Kafka Connect → Target, with the Avro Schema held in the Schema Registry

Slide 20

Converters

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

Set as a global default per worker; can optionally be overridden per connector
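The override rule can be sketched as follows. This is an illustration of the precedence only, not Kafka Connect's own code: the helper `effective_converter` and the connector-level JSON settings are hypothetical. It assumes that when a connector sets `value.converter` itself, the converter is configured from the connector's `value.converter.*` properties alone, so worker-level sub-properties such as the Schema Registry URL are not inherited.

```python
WORKER_CONFIG = {
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
}


def effective_converter(prefix, worker_config, connector_config):
    """Return the converter settings in force for 'key' or 'value'.

    If the connector names its own converter, only the connector's
    '<prefix>.converter*' properties apply; otherwise the worker's do.
    """
    key = f"{prefix}.converter"
    source = connector_config if key in connector_config else worker_config
    return {
        name: setting
        for name, setting in source.items()
        if name == key or name.startswith(key + ".")
    }


# Hypothetical connector that overrides only the value converter.
connector_config = {
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
}

print(effective_converter("key", WORKER_CONFIG, connector_config))
print(effective_converter("value", WORKER_CONFIG, connector_config))
```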

Slide 21

What about internal converters?

value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
key.internal.value.converter=org.apache.kafka.connect.json.JsonConverter
value.internal.value.converter=org.apache.kafka.connect.json.JsonConverter
key.internal.value.converter.bork.bork.bork=org.apache.kafka.connect.json.JsonConverter
key.internal.value.please.just.work.converter=org.apache.kafka.connect.json.JsonConverter

Slide 22

Single Message Transforms
Diagram: Source → Connector → Transform(s) → Converter → Kafka

Slide 23

Single Message Transforms

"config": {
  […]
  "transforms": "addDateToTopic,labelFooBar",
  "transforms.addDateToTopic.type": "org.apache.kafka.connect.transforms.TimestampRouter",
  "transforms.addDateToTopic.topic.format": "${topic}-${timestamp}",
  "transforms.addDateToTopic.timestamp.format": "yyyyMM",
  "transforms.labelFooBar.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.labelFooBar.renames": "delivery_address:shipping_address"
}

Callouts: Do these transforms · Transforms config · Config per transform
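To make the effect of those two transforms concrete, here is a rough Python imitation of what they do to a record. It is a sketch, not the Kafka Connect implementation: `route_topic` and `rename_fields` are hypothetical helpers, the `%Y%m` strftime pattern stands in for a Java year-and-month pattern, and UTC is assumed for the timestamp.

```python
from datetime import datetime, timezone


def route_topic(topic, timestamp_ms,
                topic_format="${topic}-${timestamp}",
                strftime_format="%Y%m"):
    """Imitate TimestampRouter: derive a new topic name from the record timestamp."""
    stamp = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    formatted = stamp.strftime(strftime_format)
    return (topic_format
            .replace("${topic}", topic)
            .replace("${timestamp}", formatted))


def rename_fields(value, renames):
    """Imitate ReplaceField$Value renames: map old field names to new ones."""
    return {renames.get(name, name): field for name, field in value.items()}


# A record produced at 2020-10-01T00:00:00Z (epoch milliseconds).
new_topic = route_topic("orders", 1601510400000)
new_value = rename_fields(
    {"id": 1, "delivery_address": "1 Main St"},
    {"delivery_address": "shipping_address"},
)
print(new_topic)
print(new_value)
```

The transforms run in the order listed in `transforms`, each one receiving the record produced by the one before it.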

Slide 24

Extensible
Connector · Transform(s) · Converter

Slide 25

Confluent Hub
hub.confluent.io

Slide 26

Deploying Kafka Connect
Connectors, Tasks, and Workers

Slide 27

Connectors and Tasks
Diagram: JDBC Source (JDBC Task #1, JDBC Task #2) · S3 Sink (S3 Task #1)

Slide 28

Connectors and Tasks
Diagram: JDBC Source (JDBC Task #1, JDBC Task #2) · S3 Sink (S3 Task #1)

Slide 29

Connectors and Tasks
Diagram: JDBC Source (JDBC Task #1, JDBC Task #2) · S3 Sink (S3 Task #1)

Slide 30

Tasks and Workers
Diagram: JDBC Source and S3 Sink; one Worker runs S3 Task #1, JDBC Task #1, and JDBC Task #2

Slide 31

Slide 32

Kafka Connect Standalone Worker
Fault-tolerant? Nope.
Diagram: a single Worker runs S3 Task #1, JDBC Task #1, and JDBC Task #2, with its own Offsets

Slide 33

"Scaling" the Standalone Worker
Fault-tolerant? Nope.
Diagram: JDBC Task #1, S3 Task #1, and JDBC Task #2 split across two separate Workers, each with its own Offsets

Slide 34

Kafka Connect Distributed Worker
Fault-tolerant? Yeah!
Diagram: a Kafka Connect cluster with one Worker running S3 Task #1, JDBC Task #1, and JDBC Task #2; shared Offsets, Config, and Status

Slide 35

Scaling the Distributed Worker
Fault-tolerant? Yeah!
Diagram: a Kafka Connect cluster with two Workers sharing S3 Task #1, JDBC Task #1, and JDBC Task #2; shared Offsets, Config, and Status

Slide 36

Distributed Worker - fault tolerance
Diagram: a Kafka Connect cluster with two Workers; S3 Task #1 and JDBC Task #1 shown; shared Offsets, Config, and Status

Slide 37

Distributed Worker - fault tolerance
Diagram: a Kafka Connect cluster with one remaining Worker running S3 Task #1, JDBC Task #1, and JDBC Task #2; shared Offsets, Config, and Status
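The idea behind these fault-tolerance slides can be illustrated with a toy model: tasks are spread over the live workers, and when a worker disappears its tasks are handed to the survivors. This is only a sketch under simplifying assumptions. The round-robin helper `assign_tasks` and the worker names are hypothetical, and Kafka Connect's real rebalance protocol is more involved than this.

```python
def assign_tasks(tasks, workers):
    """Spread tasks over the live workers in round-robin order (toy model)."""
    if not workers:
        raise ValueError("no live workers to run tasks")
    assignment = {worker: [] for worker in workers}
    for index, task in enumerate(tasks):
        assignment[workers[index % len(workers)]].append(task)
    return assignment


tasks = ["S3 Task #1", "JDBC Task #1", "JDBC Task #2"]

# Two workers in the cluster: the tasks are shared between them.
before = assign_tasks(tasks, ["worker-1", "worker-2"])

# worker-2 fails: every task is reassigned to the surviving worker.
after = assign_tasks(tasks, ["worker-1"])

print(before)
print(after)
```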

Slide 38

Multiple Distributed Clusters
Diagram: Kafka Connect cluster #1 and Kafka Connect cluster #2 run S3 Task #1, JDBC Task #1, and JDBC Task #2 between them; each cluster has its own Offsets, Config, and Status

Slide 39

Containers

Slide 40

Kafka Connect images on Docker Hub
confluentinc/cp-kafka-connect-base

Slide 41

Adding connectors to a container
Diagram: Confluent Hub → JAR → confluentinc/cp-kafka-connect-base

Slide 42

At runtime

kafka-connect:
  image: confluentinc/cp-kafka-connect-base:6.0.0
  environment:
    CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components'
  command:
    - bash
    - -c
    - |
      confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:1.0.0
      /etc/confluent/docker/run

Diagram: JAR → confluentinc/cp-kafka-connect-base
http://rmoff.dev/ksln19-connect-docker

Slide 43

Build a new image

FROM confluentinc/cp-kafka-connect-base:6.0.0
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
RUN confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:1.0.0

Diagram: JAR → confluentinc/cp-kafka-connect-base

Slide 44

Automating connector creation
http://rmoff.dev/ksln19-connect-docker

(The script is cut off at the right-hand edge of the slide; truncated lines end with ….)

# Launch Kafka Connect
/etc/confluent/docker/run &
#
# Wait for Kafka Connect listener
while [ $$(curl -s -o /dev/null -w %{http_code} http://$$CONNECT…
  echo -e $$(date) " Kafka Connect listener HTTP state: " $$(cur…
  sleep 5
done
#
# Create JDBC Source connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: …
  "name": "jdbc_source_mysql_00",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.…
    "connection.url": "jdbc:mysql://mysql:3306/dem…
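The wait-then-create pattern in that script can also be sketched in Python. Everything here is an assumption layered on the truncated original: the helper names `probe_connectors` and `wait_for_listener` are hypothetical, the polled path is assumed to be `/connectors` on http://localhost:8083, and readiness is assumed to mean an HTTP 200. The probe is passed in as a function so the loop can be exercised without a running worker.

```python
import time
import urllib.error
import urllib.request


def probe_connectors(base_url="http://localhost:8083"):
    """Return the HTTP status of GET /connectors, or 0 if nothing is listening."""
    try:
        with urllib.request.urlopen(f"{base_url}/connectors", timeout=5) as resp:
            return resp.status
    except urllib.error.HTTPError as err:
        return err.code  # the listener answered, but not with a 2xx
    except OSError:
        return 0  # connection refused, timeout, DNS failure, ...


def wait_for_listener(get_status_code, interval_s=5.0, max_attempts=60,
                      sleep=time.sleep):
    """Poll until the probe reports 200; return how many attempts it took."""
    for attempt in range(1, max_attempts + 1):
        if get_status_code() == 200:
            return attempt
        sleep(interval_s)
    raise TimeoutError(f"listener not ready after {max_attempts} attempts")


# Dry run with a fake probe: refused, then not ready, then ready.
fake_states = iter([0, 404, 200])
attempts = wait_for_listener(lambda: next(fake_states), sleep=lambda _: None)
print(attempts)
```

Against a real worker the call would be `wait_for_listener(probe_connectors)`, followed by the connector creation request.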

Slide 45

Troubleshooting Kafka Connect

Slide 46

Troubleshooting Kafka Connect
Connector: RUNNING · Task: FAILED

$ curl -s "http://localhost:8083/connectors/source-debezium-orders/status" | \
    jq '.connector.state'
"RUNNING"

$ curl -s "http://localhost:8083/connectors/source-debezium-orders/status" | \
    jq '.tasks[0].state'
"FAILED"

http://go.rmoff.net/connector-status
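Because a connector can report RUNNING while one of its tasks is FAILED, a status check needs to look at every task. The sketch below does that over a status document shaped like the REST API's `/connectors/<name>/status` response. The sample document, the `worker_id` values, and the helpers `failed_tasks` and `restart_paths` are made up for illustration.

```python
def failed_tasks(status):
    """Return the ids of all tasks whose state is FAILED."""
    return [task["id"] for task in status.get("tasks", [])
            if task.get("state") == "FAILED"]


def restart_paths(status):
    """Return the REST paths to POST to in order to restart each failed task."""
    name = status["name"]
    return [f"/connectors/{name}/tasks/{task_id}/restart"
            for task_id in failed_tasks(status)]


# Illustrative status document: connector RUNNING, its only task FAILED.
sample_status = {
    "name": "source-debezium-orders",
    "connector": {"state": "RUNNING", "worker_id": "kafka-connect:8083"},
    "tasks": [
        {"id": 0, "state": "FAILED", "worker_id": "kafka-connect:8083",
         "trace": "org.apache.kafka.connect.errors.ConnectException"},
    ],
}

print(sample_status["connector"]["state"], failed_tasks(sample_status))
print(restart_paths(sample_status))
```

Restarting clears the symptom only; the task's `trace` field and the worker log are where the cause is found.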

Slide 47

Troubleshooting Kafka Connect

curl -s "http://localhost:8083/connectors/source-debezium-orders-00/status" | jq '.tasks[0].trace'

"org.apache.kafka.connect.errors.ConnectException\n\tat io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)\n\tat io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:197)\n\tat io.debezium.connector.mysql.BinlogReader$ReaderThreadLifecycleListener.onCommunicationFailure(BinlogReader.java:1018)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:950)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:580)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:825)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.io.EOFException\n\tat com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:190)\n\tat com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readInteger(ByteArrayInputStream.java:46)\n\tat com.github.shyiko.mysql.binlog.event.deserialization.EventHeaderV4Deserializer.deserialize(EventHeaderV4Deserializer.java:35)\n\tat com.github.shyiko.mysql.binlog.event.deserialization.EventHeaderV4Deserializer.deserialize(EventHeaderV4Deserializer.java:27)\n\tat com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:212)\n\tat io.debezium.connector.mysql.BinlogReader$1.nextEvent(BinlogReader.java:224)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:922)\n\t... 3 more\n"

Slide 48

The log is the source of truth

$ confluent local log connect
$ docker-compose logs kafka-connect
$ cat /var/log/kafka/connect.log

Slide 49

Dynamic log levels
(Added in Apache Kafka 2.4 / Confluent Platform 5.4)

curl -s http://localhost:8083/admin/loggers/ | jq
{
  "org.apache.kafka.connect.runtime.rest": { "level": "WARN" },
  "org.reflections": { "level": "ERROR" },
  "root": { "level": "INFO" }
}

curl -s -X PUT http://localhost:8083/admin/loggers/io.debezium \
     -H "Content-Type:application/json" -d '{"level": "TRACE"}'

https://rmoff.dev/kc-dynamic-log-level

Slide 50

Kafka Connect

Slide 51

Kafka Connect
"Task is being killed and will not recover until manually restarted"
Symptom, not cause

Slide 52

Kafka Connect

Slide 53

Error Handling and Dead Letter Queues

Slide 54

Mismatched converters
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Slide 55

Mismatched converters
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Topic messages: JSON JSON JSON JSON JSON JSON (messages are not Avro)
"value.converter": "AvroConverter"
ⓘ Use the correct Converter for the source data

Slide 56

Mixed serialisation methods
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Topic messages: Avro Avro JSON Avro JSON JSON (some messages are not Avro)
"value.converter": "AvroConverter"
ⓘ Use error handling to deal with bad messages
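The "magic byte" in that error refers to the framing that Confluent's Schema Registry serialisers put in front of each payload: a single zero byte, then a 4-byte big-endian schema ID, then the serialised data. The sketch below checks for that framing. `parse_confluent_header` is a hypothetical helper, and a plain `ValueError` stands in for the `SerializationException` that the real converter raises.

```python
import struct

MAGIC_BYTE = 0  # first byte of a Schema Registry framed message


def parse_confluent_header(payload: bytes):
    """Split a framed message into (schema_id, body), or raise if unframed."""
    if len(payload) < 5 or payload[0] != MAGIC_BYTE:
        raise ValueError("Unknown magic byte!")
    (schema_id,) = struct.unpack(">I", payload[1:5])
    return schema_id, payload[5:]


# A framed message: magic byte, schema id 42, then the (opaque) Avro body.
framed = bytes([MAGIC_BYTE]) + struct.pack(">I", 42) + b"avro-bytes"
print(parse_confluent_header(framed))

# A plain JSON message starts with '{' (0x7b), so the check fails.
try:
    parse_confluent_header(b'{"id": 1}')
except ValueError as err:
    print(err)
```

This is why a topic that mixes Avro and JSON messages fails only on some records when read with the Avro converter.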

Slide 57

Error Handling and DLQ
https://cnfl.io/connect-dlq

Slide 58

Fail Fast
Diagram: Source topic messages → Kafka Connect → Sink messages
https://cnfl.io/connect-dlq

Slide 59

YOLO ¯\_(ツ)_/¯
errors.tolerance=all
Diagram: Source topic messages → Kafka Connect → Sink messages
https://cnfl.io/connect-dlq

Slide 60

Dead Letter Queue
errors.tolerance=all
errors.deadletterqueue.topic.name=my_dlq
Diagram: Source topic messages → Kafka Connect → Sink messages, plus a Dead letter queue
https://cnfl.io/connect-dlq
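The three behaviours covered so far (fail fast, tolerate and drop, tolerate and route to a dead letter queue) can be compared with a small simulation. It is a toy model rather than Kafka Connect code: `run_sink` is a hypothetical function, JSON parsing stands in for the sink's converter, and a Python list stands in for the `my_dlq` topic.

```python
import json


def run_sink(messages, convert, tolerance="none", dlq=None):
    """Convert each message; on failure either stop, skip, or route to the DLQ."""
    delivered = []
    for message in messages:
        try:
            delivered.append(convert(message))
        except ValueError:
            if tolerance != "all":
                raise  # fail fast: the task stops at the first bad message
            if dlq is not None:
                dlq.append(message)  # keep the raw message for later handling
    return delivered


messages = ['{"id": 1}', "not-json", '{"id": 2}']

# errors.tolerance=all, no DLQ: the bad message is silently dropped.
yolo = run_sink(messages, json.loads, tolerance="all")

# errors.tolerance=all plus a DLQ: the bad message is kept aside.
my_dlq = []
with_dlq = run_sink(messages, json.loads, tolerance="all", dlq=my_dlq)

print(yolo, with_dlq, my_dlq)
```

With the default `tolerance="none"` the same input raises on the second message, which corresponds to the fail-fast case.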

Slide 61

Re-processing the Dead Letter Queue
Diagram: Source topic messages → Kafka Connect (Avro sink) → Sink messages; Dead letter queue → Kafka Connect (JSON sink) → Sink messages
https://cnfl.io/connect-dlq

Slide 62

Metrics and Monitoring

Slide 63

REST API
• sort
• peco
http://go.rmoff.net/connector-status

Slide 64

Confluent Control Center

Slide 65

Consumer lag

Slide 66

JMX

Slide 67

Want to learn more?
CTAs, not CATs (sorry, not sorry)

Slide 68

Fully Managed Kafka as a Service
$200 USD off your bill each calendar month for the first three months when you sign up
https://rmoff.dev/ccloud
Free money! (additional $60 towards your bill 😄)
DEVADV60
* T&C: https://www.confluent.io/confluent-cloud-promo-disclaimer

Slide 69

Learn Kafka.
Start building with Apache Kafka at Confluent Developer.
developer.confluent.io

Slide 70

Confluent Community Slack group
cnfl.io/slack

Slide 71

Further reading / watching
https://rmoff.dev/kafka-talks

Slide 72

#EOF
https://talks.rmoff.net