From Zero to Hero with Kafka Connect

A presentation at Confluent VUG in April 2020 by Robin Moffatt

Slides 1-2

@rmoff #ConfluentVUG From Zero to Hero with Kafka Connect a.k.a. A practical guide to becoming l33t with Kafka Connect

Slide 3

What is Kafka Connect?

Slide 4

Streaming Integration with Kafka Connect
Sources (syslog) -> Kafka Connect -> Kafka Brokers

Slide 5

Streaming Integration with Kafka Connect
Kafka Brokers -> Kafka Connect -> Sinks (Amazon S3, Google BigQuery)

Slide 6

Streaming Integration with Kafka Connect
syslog -> Kafka Connect -> Kafka Brokers -> Kafka Connect -> Amazon S3, Google BigQuery

Slide 7

Look Ma, No Code!
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:mysql://asgard:3306/demo",
  "table.whitelist": "sales,orders,customers"
}
https://docs.confluent.io/current/connect/
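That JSON is sent to a Kafka Connect worker's REST API. As a minimal sketch, assuming a worker listening on the default REST port (localhost:8083) and a hypothetical connector name jdbc_source_demo, this is how the same config could be wrapped in a POST /connectors request with only the Python standard library. The request is built but not sent, so it runs without a cluster.

```python
import json
import urllib.request

# Assumption: a Kafka Connect worker whose REST API is on the default port 8083.
CONNECT_URL = "http://localhost:8083"


def build_create_request(name: str, config: dict) -> urllib.request.Request:
    """Build (but do not send) a POST /connectors request that creates a connector."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        f"{CONNECT_URL}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# The config from the slide; "jdbc_source_demo" is a hypothetical connector name.
jdbc_source_config = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://asgard:3306/demo",
    "table.whitelist": "sales,orders,customers",
}
req = build_create_request("jdbc_source_demo", jdbc_source_config)

# Against a running worker you would then send it:
# with urllib.request.urlopen(req) as resp:
#     print(resp.status, resp.read().decode("utf-8"))
```

The name sits next to "config" in the body; the config itself is just string key/value pairs.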

Slide 8

Streaming Pipelines
RDBMS -> Kafka Connect -> Kafka -> Kafka Connect -> Amazon S3, HDFS

Slide 9

Writing to data stores from Kafka
App -> Kafka -> Kafka Connect -> Data Store

Slide 10

Evolve processing from old systems to new
Existing App -> RDBMS -> Kafka Connect -> New App <x>

Slide 11

Demo
http://rmoff.dev/kafka-connect-code

Slide 12

Configuring Kafka Connect
Inside the API - connectors, transforms, converters

Slide 13

Kafka Connect basics
Source -> Kafka Connect -> Kafka

Slide 14

Connectors
Source -> Kafka Connect (Connector) -> Kafka

Slide 15

Connectors
"config": {
  […]
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url": "jdbc:postgresql://postgres:5432/",
  "topics": "asgard.demo.orders"
}
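A POST to /connectors fails if a connector with that name already exists, so for repeatable deployments the REST API's PUT /connectors/<name>/config endpoint, which creates the connector or updates its config, can be handier. A sketch, again assuming a worker on localhost:8083 and a hypothetical connector name jdbc_sink_postgres. Note that the body is just the contents of "config", with no name wrapper.

```python
import json
import urllib.parse
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumption: default REST port


def build_upsert_request(name: str, config: dict) -> urllib.request.Request:
    """Build a PUT /connectors/<name>/config request (create, or update if it exists)."""
    url = f"{CONNECT_URL}/connectors/{urllib.parse.quote(name, safe='')}/config"
    return urllib.request.Request(
        url,
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


# Sink config from the slide; "jdbc_sink_postgres" is a hypothetical name.
jdbc_sink_config = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://postgres:5432/",
    "topics": "asgard.demo.orders",
}
req = build_upsert_request("jdbc_sink_postgres", jdbc_sink_config)
```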

Slide 16

Connectors
Source (native data) -> [Kafka Connect: Connector -> Connect Record] -> Kafka

Slide 17

Converters
Source (native data) -> [Kafka Connect: Connector -> Connect Record -> Converter -> bytes[]] -> Kafka
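The split of responsibilities (connector produces a record, converter turns it into bytes) can be mirrored in a few lines. This Python sketch only borrows the shape of Kafka Connect's Java Converter interface, where fromConnectData serialises a value to bytes and toConnectData goes the other way. The class is hypothetical, ignores Connect schemas entirely, and uses plain JSON.

```python
import json
from typing import Any, Optional


class PlainJsonConverter:
    """Hypothetical converter: Connect-side value <-> bytes[] stored in Kafka.

    Loosely mirrors the shape of org.apache.kafka.connect.storage.Converter, minus schemas.
    """

    def from_connect_data(self, topic: str, value: Any) -> Optional[bytes]:
        # Source side: Connector -> Converter -> bytes written to Kafka.
        if value is None:
            return None  # tombstones stay null
        return json.dumps(value, separators=(",", ":")).encode("utf-8")

    def to_connect_data(self, topic: str, data: Optional[bytes]) -> Any:
        # Sink side: bytes read from Kafka -> Converter -> Connector.
        if data is None:
            return None
        return json.loads(data.decode("utf-8"))


converter = PlainJsonConverter()
raw = converter.from_connect_data("orders", {"id": 1, "status": "NEW"})
```

Because the connector never touches bytes, swapping JSON for Avro is a converter change, not a connector change.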

Slide 18

Serialisation & Schemas
Avro (-> Confluent Schema Registry), Protobuf, JSON, CSV
https://qconnewyork.com/system/files/presentation-slides/qcon_17_-_schemas_and_apis.pdf

Slide 19

The Confluent Schema Registry
Source -> Kafka Connect -> Avro Message -> Kafka Connect -> Target
The Avro Schema is held in the Schema Registry

Slide 20

Converters
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
Set as a global default per worker; can optionally be overridden per connector.
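How a per-connector override interacts with the worker default is worth spelling out. As far as we understand the worker's behaviour, if a connector sets value.converter (or key.converter) itself, that converter is configured only from the connector's own value.converter.* properties; otherwise the worker-level ones apply. The function below is a simplified model of that rule, not Kafka Connect code, and the connector properties are hypothetical.

```python
SIDES = ("key", "value")


def effective_converters(worker: dict, connector: dict) -> dict:
    """Simplified model: per side, use the connector's converter settings if it
    names its own converter class, otherwise fall back to the worker's."""
    result = {}
    for side in SIDES:
        class_key = f"{side}.converter"
        source = connector if class_key in connector else worker
        result[side] = {
            "class": source.get(class_key),
            # Sub-properties such as schema.registry.url, with the prefix stripped.
            "config": {
                k[len(class_key) + 1:]: v
                for k, v in source.items()
                if k.startswith(class_key + ".")
            },
        }
    return result


worker_props = {
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
}
# Hypothetical connector that overrides only the value converter.
connector_props = {
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
}
effective = effective_converters(worker_props, connector_props)
```

The practical consequence: when overriding a converter per connector, repeat any sub-properties it needs (such as schema.registry.url) in the connector config too.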

Slide 21

What about internal converters?
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
key.internal.value.converter=org.apache.kafka.connect.json.JsonConverter
value.internal.value.converter=org.apache.kafka.connect.json.JsonConverter
key.internal.value.converter.bork.bork.bork=org.apache.kafka.connect.json.JsonConverter
key.internal.value.please.just.work.converter=org.apache.kafka.connect.json.JsonConverter

Slide 22

Single Message Transforms
Source -> [Kafka Connect: Connector -> Transform(s) -> Converter] -> Kafka

Slide 23

Single Message Transforms
"config": {
  […]
  "transforms": "addDateToTopic,labelFooBar",
  "transforms.addDateToTopic.type": "org.apache.kafka.connect.transforms.TimestampRouter",
  "transforms.addDateToTopic.topic.format": "${topic}-${timestamp}",
  "transforms.addDateToTopic.timestamp.format": "yyyyMM",
  "transforms.labelFooBar.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.labelFooBar.renames": "delivery_address:shipping_address"
}
"transforms" lists the transforms to apply, in order; each transform then has its own config under "transforms.<name>.*".
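To make the chain concrete, here is a Python emulation of what these two transforms do to one record, applied in the order given by "transforms". It is a sketch, not the Kafka code. The function names and record layout are hypothetical, strftime's "%Y%m" stands in for the Java pattern yyyyMM, and it formats in UTC, which is our understanding of TimestampRouter's behaviour.

```python
from datetime import datetime, timezone


def add_date_to_topic(record: dict, topic_format: str = "${topic}-${timestamp}",
                      strftime_format: str = "%Y%m") -> dict:
    """Emulates TimestampRouter: rewrite the topic using the record timestamp (UTC)."""
    ts = datetime.fromtimestamp(record["timestamp_ms"] / 1000, tz=timezone.utc)
    topic = (topic_format
             .replace("${topic}", record["topic"])
             .replace("${timestamp}", ts.strftime(strftime_format)))
    return {**record, "topic": topic}


def label_foo_bar(record: dict, renames: str = "delivery_address:shipping_address") -> dict:
    """Emulates ReplaceField$Value with a 'renames' list of old:new pairs."""
    mapping = dict(pair.split(":", 1) for pair in renames.split(","))
    value = {mapping.get(field, field): v for field, v in record["value"].items()}
    return {**record, "value": value}


# Same order as "transforms": "addDateToTopic,labelFooBar"
TRANSFORMS = [add_date_to_topic, label_foo_bar]


def apply_transforms(record: dict) -> dict:
    for transform in TRANSFORMS:
        record = transform(record)
    return record


record = {
    "topic": "orders",
    "timestamp_ms": 1586908800000,  # 2020-04-15T00:00:00Z
    "value": {"order_id": 42, "delivery_address": "1 Main St"},
}
out = apply_transforms(record)
```

Each transform returns a new record rather than mutating its input, which keeps the chain easy to reason about.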

Slide 24

Extensible
Connector, Transform(s), Converter

Slide 25

Confluent Hub
hub.confluent.io

Slide 26

Deploying Kafka Connect
Connectors, Tasks, and Workers

Slides 27-29

Connectors and Tasks
JDBC Source connector: JDBC Task #1, JDBC Task #2
S3 Sink connector: S3 Task #1
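How does one connector end up with two JDBC tasks? The connector, not the worker, decides how to split its work, capped by tasks.max. As we understand the JDBC source connector, it divides its tables into at most tasks.max contiguous, evenly sized groups (via Kafka Connect's ConnectorUtils.groupPartitions helper), one group per task. A Python restatement of that grouping logic, as a sketch:

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def group_partitions(elements: Sequence[T], num_groups: int) -> List[List[T]]:
    """Split elements into num_groups contiguous groups whose sizes differ by at most one."""
    if num_groups <= 0:
        raise ValueError("num_groups must be positive")
    per_group, leftover = divmod(len(elements), num_groups)
    groups, assigned = [], 0
    for group in range(num_groups):
        # The first `leftover` groups each take one extra element.
        size = per_group + 1 if group < leftover else per_group
        groups.append(list(elements[assigned:assigned + size]))
        assigned += size
    return groups


def task_table_assignments(tables: Sequence[str], tasks_max: int) -> List[List[str]]:
    """One list of tables per task; never more tasks than tables."""
    if not tables:
        return []
    return group_partitions(tables, min(len(tables), tasks_max))


assignments = task_table_assignments(["sales", "orders", "customers"], tasks_max=2)
```

With the three tables from the earlier JDBC source config and tasks.max=2, one task gets two tables and the other gets one.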

Slide 30

Tasks and Workers
JDBC Source and S3 Sink connectors: S3 Task #1, JDBC Task #1 and JDBC Task #2 run on a Worker

Slide 32

Kafka Connect Standalone Worker
One Worker runs S3 Task #1, JDBC Task #1 and JDBC Task #2, and holds the Offsets

Slide 33

"Scaling" the Standalone Worker
Two separate Workers, each with its own Offsets, running JDBC Task #1, S3 Task #1 and JDBC Task #2
Fault-tolerant? Nope.

Slide 34

Kafka Connect Distributed Worker
Kafka Connect cluster: a Worker running S3 Task #1, JDBC Task #1 and JDBC Task #2
Offsets, Config and Status are stored in Kafka
Fault-tolerant? Yeah!
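The difference between the two modes shows up directly in the worker configuration. A standalone worker keeps its offsets in a local file (offset.storage.file.filename). Distributed workers that share a group.id form one cluster, and they keep Offsets, Config and Status in three Kafka topics (offset.storage.topic, config.storage.topic, status.storage.topic). The check below is a hypothetical helper covering only those storage-related keys, not a full validation of either mode; the example values are made up.

```python
# Storage-related keys only; other worker settings are out of scope for this sketch.
REQUIRED_STORAGE_KEYS = {
    "standalone": {"offset.storage.file.filename"},
    "distributed": {
        "group.id",
        "offset.storage.topic",
        "config.storage.topic",
        "status.storage.topic",
    },
}


def missing_storage_keys(mode: str, props: dict) -> set:
    """Return the keys from REQUIRED_STORAGE_KEYS[mode] that props does not set."""
    if mode not in REQUIRED_STORAGE_KEYS:
        raise ValueError(f"unknown worker mode: {mode!r}")
    return REQUIRED_STORAGE_KEYS[mode] - set(props)


# Hypothetical distributed worker config that forgot the status topic.
distributed_props = {
    "bootstrap.servers": "kafka:9092",
    "group.id": "connect-cluster-01",
    "offset.storage.topic": "_connect-offsets",
    "config.storage.topic": "_connect-configs",
}
```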

Slide 35

Scaling the Distributed Worker
Kafka Connect cluster: two Workers sharing S3 Task #1, JDBC Task #1 and JDBC Task #2
Offsets, Config, Status
Fault-tolerant? Yeah!

Slide 36

Distributed Worker - fault tolerance
Kafka Connect cluster: two Workers, running S3 Task #1 and JDBC Task #1
Offsets, Config, Status

Slide 37

Distributed Worker - fault tolerance
Kafka Connect cluster: the remaining Worker runs S3 Task #1, JDBC Task #1 and JDBC Task #2
Offsets, Config, Status

Slide 38

Multiple Distributed Clusters
Kafka Connect cluster #1 and Kafka Connect cluster #2, each with its own Offsets, Config and Status
Tasks: S3 Task #1, JDBC Task #1, JDBC Task #2

Slide 39

Containers

Slide 40

Kafka Connect images on Docker Hub
confluentinc/cp-kafka-connect-base
confluentinc/cp-kafka-connect (with connectors such as kafka-connect-elasticsearch, kafka-connect-jdbc, kafka-connect-hdfs […])

Slide 41

Adding connectors to a container
Confluent Hub / JAR -> confluentinc/cp-kafka-connect-base

Slide 42

At runtime
kafka-connect:
  image: confluentinc/cp-kafka-connect:5.4.0
  environment:
    CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components'
  command:
    - bash
    - -c
    - |
      confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:1.0.0
      /etc/confluent/docker/run
http://rmoff.dev/ksln19-connect-docker

Slide 43

Build a new image
FROM confluentinc/cp-kafka-connect:5.4.0
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
RUN confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:1.0.0

Slide 44

Automating connector creation
# Launch Kafka Connect
/etc/confluent/docker/run &
#
# Wait for Kafka Connect listener
while [ $$(curl -s -o /dev/null -w %{http_code} http://$$CONNECT […]
  echo -e $$(date) " Kafka Connect listener HTTP state: " $$(cur […]
  sleep 5
done
#
# Create JDBC Source connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: […]
  "name": "jdbc_source_mysql_00",
  "config": {
    "connector.class": "io.confluent.connect.jdbc. […]
    "connection.url": "jdbc:mysql://mysql:3306/dem […]
http://rmoff.dev/ksln19-connect-docker
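The same wait-then-create pattern, sketched in Python for when the automation lives outside the container's shell. Assumptions: the worker's REST API is at localhost:8083, and polling GET /connectors is an acceptable readiness check (the slide's exact URL is truncated). The status function is injected so the loop can be exercised without a running worker.

```python
import time
import urllib.error
import urllib.request
from typing import Callable

# Assumption: the worker's REST API is on localhost:8083.
CONNECT_URL = "http://localhost:8083"


def http_status(url: str) -> int:
    """Return the HTTP status of a GET on url, or 0 if nothing is listening yet."""
    try:
        with urllib.request.urlopen(url, timeout=5) as resp:
            return resp.status
    except urllib.error.HTTPError as err:
        return err.code  # listener is up but answered with an error status
    except OSError:
        return 0  # connection refused, DNS failure, timeout (URLError is an OSError)


def wait_for_listener(get_status: Callable[[], int], interval_s: float = 5.0,
                      max_attempts: int = 60,
                      sleep: Callable[[float], None] = time.sleep) -> bool:
    """Poll until get_status() returns 200, like the slide's while/curl/sleep loop."""
    for attempt in range(1, max_attempts + 1):
        status = get_status()
        if status == 200:
            return True
        print(f"Kafka Connect listener HTTP state: {status} (attempt {attempt})")
        sleep(interval_s)
    return False


# Usage against a real worker, followed by the connector-creation request:
# if wait_for_listener(lambda: http_status(f"{CONNECT_URL}/connectors")):
#     ...POST the connector config to /connectors...
```

Unlike the open-ended shell loop, this version gives up after max_attempts, so a misconfigured worker fails the deployment instead of hanging it.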

Slide 45

Troubleshooting Kafka Connect

Slide 46

Troubleshooting Kafka Connect
Connector RUNNING, Task FAILED
$ curl -s "http://localhost:8083/connectors/source-debezium-orders/status" | \
    jq '.connector.state'
"RUNNING"
$ curl -s "http://localhost:8083/connectors/source-debezium-orders/status" | \
    jq '.tasks[0].state'
"FAILED"
http://go.rmoff.net/connector-status
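Because the connector can be RUNNING while a task has FAILED, it is worth checking every task, not just the connector state. A sketch that pulls the failed tasks out of a GET /connectors/<name>/status response and builds the REST call that restarts a single task (POST /connectors/<name>/tasks/<id>/restart). The status document below is a trimmed, illustrative example in the shape that endpoint returns; its worker_id values are made up.

```python
import json
import urllib.request
from typing import List, Tuple

CONNECT_URL = "http://localhost:8083"  # assumption: default REST port


def failed_tasks(status: dict) -> List[Tuple[int, str]]:
    """List (task id, first line of stack trace) for every FAILED task."""
    failures = []
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            trace = task.get("trace") or ""
            failures.append((task["id"], trace.splitlines()[0] if trace else ""))
    return failures


def build_task_restart_request(connector: str, task_id: int) -> urllib.request.Request:
    """Build (but do not send) the request that restarts one task."""
    return urllib.request.Request(
        f"{CONNECT_URL}/connectors/{connector}/tasks/{task_id}/restart",
        method="POST",
    )


# Trimmed, illustrative status document.
status = json.loads("""
{
  "name": "source-debezium-orders",
  "connector": {"state": "RUNNING", "worker_id": "kafka-connect:8083"},
  "tasks": [
    {"id": 0, "state": "FAILED", "worker_id": "kafka-connect:8083",
     "trace": "org.apache.kafka.connect.errors.ConnectException\\n\\tat io.debezium..."}
  ],
  "type": "source"
}
""")
```

Restarting only helps if the underlying cause (here, per the trace on the next slide, a broken binlog connection) has gone away.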

Slide 47

Troubleshooting Kafka Connect
curl -s "http://localhost:8083/connectors/source-debezium-orders-00/status" | jq '.tasks[0].trace'
"org.apache.kafka.connect.errors.ConnectException\n\tat io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)\n\tat io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:197)\n\tat io.debezium.connector.mysql.BinlogReader$ReaderThreadLifecycleListener.onCommunicationFailure(BinlogReader.java:1018)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:950)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:580)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:825)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.io.EOFException\n\tat com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:190)\n\tat com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readInteger(ByteArrayInputStream.java:46)\n\tat com.github.shyiko.mysql.binlog.event.deserialization.EventHeaderV4Deserializer.deserialize(EventHeaderV4Deserializer.java:35)\n\tat com.github.shyiko.mysql.binlog.event.deserialization.EventHeaderV4Deserializer.deserialize(EventHeaderV4Deserializer.java:27)\n\tat com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:212)\n\tat io.debezium.connector.mysql.BinlogReader$1.nextEvent(BinlogReader.java:224)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:922)\n\t... 3 more\n"

Slide 48

The log is the source of truth
$ confluent log connect
$ docker-compose logs kafka-connect
$ cat /var/log/kafka/connect.log

Slide 49

Dynamic log levels (added in Apache Kafka 2.4 / Confluent Platform 5.4)
curl -s http://localhost:8083/admin/loggers/ | jq
{
  "org.apache.kafka.connect.runtime.rest": { "level": "WARN" },
  "org.reflections": { "level": "ERROR" },
  "root": { "level": "INFO" }
}
curl -s -X PUT http://localhost:8083/admin/loggers/io.debezium \
  -H "Content-Type:application/json" -d '{"level": "TRACE"}'
https://rmoff.dev/kc-dynamic-log-level
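The PUT call above is easy to script, for example to turn TRACE on for one logger while reproducing a problem. A sketch that builds the same request, assuming a worker on localhost:8083. The set of accepted levels is this sketch's own restriction, not something the endpoint documents.

```python
import json
import urllib.parse
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumption: default REST port
# Levels this sketch accepts; log4j also has ALL and OFF, left out here for simplicity.
LEVELS = {"TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL"}


def build_set_log_level_request(logger: str, level: str) -> urllib.request.Request:
    """Build PUT /admin/loggers/<logger> to change a logger's level at runtime."""
    level = level.upper()
    if level not in LEVELS:
        raise ValueError(f"unsupported log level: {level}")
    return urllib.request.Request(
        f"{CONNECT_URL}/admin/loggers/{urllib.parse.quote(logger)}",
        data=json.dumps({"level": level}).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


req = build_set_log_level_request("io.debezium", "trace")
```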

Slide 50

Error Handling and Dead Letter Queues

Slide 51

org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Slide 52

Mismatched converters
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Messages are not Avro, but "value.converter": "AvroConverter"
ⓘ Use the correct Converter for the source data
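Why "magic byte"? Messages written with Confluent's Avro serializer, and therefore by the AvroConverter, start with a five-byte header: a magic byte of 0, then the 4-byte big-endian Schema Registry schema ID, then the Avro-encoded payload. A JSON message starts with a character such as '{' (0x7B) instead, so the Avro deserializer rejects it. The helpers below are a hypothetical sketch of that framing check, not the Confluent deserializer itself.

```python
import struct

MAGIC_BYTE = 0
HEADER = struct.Struct(">bI")  # magic byte, then 4-byte big-endian schema ID


def frame(schema_id: int, avro_payload: bytes) -> bytes:
    """Prefix an already Avro-encoded payload with the 5-byte header."""
    return HEADER.pack(MAGIC_BYTE, schema_id) + avro_payload


def read_schema_id(message: bytes) -> int:
    """Return the schema ID, or fail the way a mismatched converter does."""
    if len(message) < HEADER.size or message[0] != MAGIC_BYTE:
        raise ValueError("Unknown magic byte!")
    _, schema_id = HEADER.unpack_from(message)
    return schema_id


framed = frame(42, b"\x02")
```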

Slide 53

Mixed serialisation methods
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Some messages are not Avro, but "value.converter": "AvroConverter"
ⓘ Use error handling to deal with bad messages

Slide 54

Error Handling and DLQ
Start: connections to a data store (not handled)
Convert: read/write from Kafka, [de]-serialisation (handled)
Transform (handled)
Poll / Put: read/write from/to data store* (not handled)
* can be retried by Connect
https://cnfl.io/connect-dlq

Slide 55

Fail Fast
Source topic messages -> Kafka Connect -> Sink messages
https://cnfl.io/connect-dlq

Slide 56

YOLO ¯\_(ツ)_/¯
errors.tolerance=all
Source topic messages -> Kafka Connect -> Sink messages
https://cnfl.io/connect-dlq

Slide 57

Dead Letter Queue
errors.tolerance=all
errors.deadletterqueue.topic.name=my_dlq
Source topic messages -> Kafka Connect -> Sink messages; messages that fail -> Dead letter queue
https://cnfl.io/connect-dlq
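The three behaviours can be modelled as a toy sink loop: fail fast (the default, errors.tolerance=none), skip bad messages (errors.tolerance=all), and skip plus route to a dead letter queue (errors.deadletterqueue.topic.name set). Everything here is hypothetical. The deserialise step is a stand-in for a failing converter, and "writing to the DLQ" just appends the original bytes to a list.

```python
from typing import List, Optional


class ConnectFailure(Exception):
    """Stands in for the task failing (fail-fast behaviour)."""


def deserialise(message: bytes) -> bytes:
    # Toy converter: anything without a leading zero byte is "not Avro".
    if not message or message[0] != 0:
        raise ValueError("Unknown magic byte!")
    return message[5:]  # drop the 5-byte header, keep the payload


def run_sink(messages: List[bytes], tolerance: str = "none",
             dlq: Optional[List[bytes]] = None) -> List[bytes]:
    """Return the payloads written to the sink under the given error tolerance."""
    written = []
    for message in messages:
        try:
            written.append(deserialise(message))
        except ValueError as err:
            if tolerance == "none":
                # errors.tolerance=none: the task stops at the first bad message.
                raise ConnectFailure(str(err)) from err
            if dlq is not None:
                # errors.tolerance=all plus a DLQ topic: keep the original bytes.
                dlq.append(message)
            # errors.tolerance=all without a DLQ: the message is silently skipped.
    return written


good = b"\x00\x00\x00\x00\x01" + b"avro-bytes"
bad = b'{"id": 1}'
```

The DLQ variant keeps the raw bytes rather than anything partially converted, which is what makes re-processing them later possible.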

Slide 58

Re-processing the Dead Letter Queue
Source topic messages -> Kafka Connect (Avro sink) -> Sink messages
Dead letter queue -> Kafka Connect (JSON sink) -> Sink messages
https://cnfl.io/connect-dlq
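Re-processing amounts to running a second sink connector against the DLQ topic, with a converter that matches what actually landed there. As a sketch, a hypothetical helper derives that second config from the original sink's config. It points topics at the DLQ, swaps the value converter, and drops the DLQ settings so the second connector cannot write failures back into its own input. The S3 sink config is trimmed and illustrative; whether the target can accept the DLQ data as JSON depends on the connector.

```python
def dlq_reprocessing_config(sink_config: dict, dlq_topic: str,
                            value_converter: str = "org.apache.kafka.connect.json.JsonConverter") -> dict:
    """Derive a sink config that reads the DLQ topic with a different value converter."""
    config = {
        k: v for k, v in sink_config.items()
        # Drop the old converter's sub-properties and the DLQ routing itself.
        if not k.startswith("value.converter") and not k.startswith("errors.deadletterqueue.")
    }
    config["topics"] = dlq_topic
    config["value.converter"] = value_converter
    return config


# Trimmed, illustrative Avro sink config.
avro_sink = {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "topics": "orders",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "my_dlq",
}
json_sink = dlq_reprocessing_config(avro_sink, "my_dlq")
```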

Slide 59

Metrics and Monitoring

Slide 60

REST API
http://go.rmoff.net/connector-status
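For a whole worker at once, newer versions of the REST API (Apache Kafka 2.3 and later, as we understand it) accept GET /connectors?expand=status, which returns every connector's status keyed by name. A sketch that condenses such a response into one line per connector, run here against a trimmed, made-up response rather than a live worker; the connector names are hypothetical.

```python
from typing import Dict, List


def summarise(expanded: Dict[str, dict]) -> List[str]:
    """One line per connector: connector state, task count, and failed-task count."""
    lines = []
    for name in sorted(expanded):
        status = expanded[name]["status"]
        task_states = [t["state"] for t in status.get("tasks", [])]
        failed = task_states.count("FAILED")
        lines.append(
            f"{name}: connector {status['connector']['state']}, "
            f"{len(task_states)} task(s), {failed} FAILED"
        )
    return lines


sample = {
    "source-debezium-orders": {"status": {
        "connector": {"state": "RUNNING"},
        "tasks": [{"id": 0, "state": "FAILED"}]}},
    "sink-s3-orders": {"status": {
        "connector": {"state": "RUNNING"},
        "tasks": [{"id": 0, "state": "RUNNING"}, {"id": 1, "state": "RUNNING"}]}},
}
report = summarise(sample)
```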

Slide 61

Confluent Control Center

Slide 62

Consumer lag
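Sink connectors consume from Kafka like any other consumer, by default in a consumer group named connect-<connector name>, so ordinary consumer-lag monitoring applies. Lag per partition is the log-end offset minus the group's committed offset. A sketch of that arithmetic with made-up offsets; fetching real offsets would need a Kafka admin client, which is left out here.

```python
from typing import Dict, Optional, Tuple

Partition = Tuple[str, int]  # (topic, partition)


def sink_group_id(connector_name: str) -> str:
    # Default consumer group name for a sink connector.
    return f"connect-{connector_name}"


def partition_lag(end_offsets: Dict[Partition, int],
                  committed: Dict[Partition, Optional[int]]) -> Dict[Partition, Optional[int]]:
    """Lag = log-end offset - committed offset; None where the group has no commit yet."""
    lag = {}
    for tp, end in end_offsets.items():
        done = committed.get(tp)
        lag[tp] = None if done is None else max(end - done, 0)
    return lag


# Made-up offsets for a two-partition topic.
end_offsets = {("asgard.demo.orders", 0): 1200, ("asgard.demo.orders", 1): 950}
committed = {("asgard.demo.orders", 0): 1180, ("asgard.demo.orders", 1): 950}
lags = partition_lag(end_offsets, committed)
```

Lag that keeps growing while tasks report RUNNING is often the first sign that a sink is stuck or too slow.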

Slide 63

JMX

Slide 64

Standby for resource links…

Slide 65

Free Books!
https://rmoff.dev/cvug-apr20

Slide 66

Learn Kafka. Start building with Apache Kafka at Confluent Developer. developer.confluent.io

Slide 67

Confluent Community Slack group
cnfl.io/slack

Slide 68

Fully Managed Kafka as a Service

Slide 69

#EOF
https://talks.rmoff.net