From Zero to Hero with Kafka Connect

A presentation at Berlin Buzzwords 2019 in June 2019 in Berlin, Germany by Robin Moffatt

Slide 1

@rmoff #bbuzz From Zero to Hero with Kafka Connect a.k.a. A practical guide to becoming l33t with Kafka Connect

Slide 2

What is Kafka Connect?

Slide 3

Streaming Integration with Kafka Connect. Diagram: sources (syslog) → Kafka Connect → Kafka brokers.

Slide 4

Streaming Integration with Kafka Connect. Diagram: Kafka brokers → Kafka Connect → sinks (Amazon S3, Google BigQuery).

Slide 5

Streaming Integration with Kafka Connect. Diagram: syslog → Kafka Connect → Kafka brokers → Kafka Connect → Amazon S3 and Google BigQuery.

Slide 6

Look Ma, No Code!
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:mysql://asgard:3306/demo",
  "table.whitelist": "sales,orders,customers"
}
https://docs.confluent.io/current/connect/
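The JSON config above is all a connector needs, and it is submitted to a Kafka Connect worker over its REST API. The following is a minimal Python sketch that assumes a worker at http://localhost:8083. It builds the same config and sends it with PUT /connectors/<name>/config, which creates the connector if it is missing and updates it otherwise. The helper names and the connector name `jdbc_source_demo` are hypothetical.

```python
from __future__ import annotations

import json
import urllib.request

# Assumed worker address; the deck uses localhost:8083 on later slides.
CONNECT_URL = "http://localhost:8083"


def jdbc_source_config(connection_url: str, tables: list[str]) -> dict:
    """Build the JDBC source connector config shown on the slide."""
    return {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": connection_url,
        # table.whitelist is a comma-separated list of tables to ingest.
        "table.whitelist": ",".join(tables),
    }


def put_connector_config(name: str, config: dict, base_url: str = CONNECT_URL) -> int:
    """Create or update a connector with PUT /connectors/<name>/config.

    Returns the HTTP status: 201 if the connector was created, 200 if updated.
    """
    request = urllib.request.Request(
        f"{base_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
    with urllib.request.urlopen(request) as response:
        return response.status


if __name__ == "__main__":
    config = jdbc_source_config(
        "jdbc:mysql://asgard:3306/demo", ["sales", "orders", "customers"]
    )
    print(json.dumps(config, indent=2))
    # Needs a running worker; "jdbc_source_demo" is a hypothetical name.
    # put_connector_config("jdbc_source_demo", config)
```

PUT is used here instead of POST /connectors because it is idempotent. Re-running a deployment script updates the connector, whereas POST for an existing name is rejected.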

Slide 7

Streaming Pipelines. Diagram: RDBMS → Kafka Connect → Kafka → Kafka Connect → Amazon S3 and HDFS.

Slide 8

Writing to data stores from Kafka. Diagram: App → Kafka → Kafka Connect → Data Store.

Slide 9

Evolve processing from old systems to new. Diagram: Existing App → RDBMS → Kafka Connect → New App <x>.

Slide 10

Demo #1 http://rmoff.dev/bbuzz19_demo-code

Slide 11

REST API - tricks http://go.rmoff.net/connector-status
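The linked trick lists every connector together with the state of the connector and its tasks. The following is a Python sketch of the same idea that assumes a worker at http://localhost:8083. GET /connectors returns the connector names, and GET /connectors/<name>/status returns the connector state plus one entry per task. The function names and the sample `worker_id` value are hypothetical.

```python
from __future__ import annotations

import json
import urllib.parse
import urllib.request


def summarise_status(status: dict) -> str:
    """Flatten one GET /connectors/<name>/status response into a single line:
    connector name, connector state, then the state of each task."""
    task_states = [task["state"] for task in status.get("tasks", [])]
    return " | ".join([status["name"], status["connector"]["state"], *task_states])


def fetch_statuses(base_url: str = "http://localhost:8083") -> list[dict]:
    """List every connector (GET /connectors), then fetch each one's status."""
    with urllib.request.urlopen(f"{base_url}/connectors") as response:
        names = json.load(response)
    statuses = []
    for name in names:
        url = f"{base_url}/connectors/{urllib.parse.quote(name, safe='')}/status"
        with urllib.request.urlopen(url) as response:
            statuses.append(json.load(response))
    return statuses


if __name__ == "__main__":
    # Sample response shaped like the status output on the troubleshooting slides.
    sample = {
        "name": "source-debezium-orders",
        "connector": {"state": "RUNNING", "worker_id": "kafka-connect:8083"},
        "tasks": [{"id": 0, "state": "FAILED", "worker_id": "kafka-connect:8083"}],
    }
    print(summarise_status(sample))  # source-debezium-orders | RUNNING | FAILED
    # Against a live worker: for s in fetch_statuses(): print(summarise_status(s))
```

Printing the task states next to the connector state matters because a connector can report RUNNING while one of its tasks has FAILED.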

Slide 12

REST API - tricks (h/t to @madewithtea) http://go.rmoff.net/selectively-delete-connectors
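Selectively deleting connectors involves two steps: filter the names returned by GET /connectors, then call DELETE /connectors/<name> for each match. The following Python sketch assumes a worker at http://localhost:8083. The function names, the regular-expression filter and the sample connector names are illustrative assumptions, not the linked script.

```python
from __future__ import annotations

import re
import urllib.parse
import urllib.request


def select_connectors(names: list[str], pattern: str) -> list[str]:
    """Keep only the connector names that fully match the regular expression."""
    regex = re.compile(pattern)
    return [name for name in names if regex.fullmatch(name)]


def delete_connector(name: str, base_url: str = "http://localhost:8083") -> int:
    """Delete one connector with DELETE /connectors/<name>; expect HTTP 204."""
    request = urllib.request.Request(
        f"{base_url}/connectors/{urllib.parse.quote(name, safe='')}",
        method="DELETE",
    )
    with urllib.request.urlopen(request) as response:
        return response.status


if __name__ == "__main__":
    # In practice the names come from GET /connectors; these are made up.
    names = ["source-debezium-orders-00", "sink-jdbc-postgres-00", "source-jdbc-mysql-00"]
    doomed = select_connectors(names, r"source-.*")
    print(doomed)  # ['source-debezium-orders-00', 'source-jdbc-mysql-00']
    # for name in doomed: delete_connector(name)
```

Keeping the selection separate from the deletion lets you print and check the list before anything is removed.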

Slide 13

Configuring Kafka Connect: Inside the API - connectors, transforms, converters

Slide 14

Kafka Connect basics. Diagram: Source → Kafka Connect → Kafka.

Slide 15

Connectors. Diagram: Source → Connector (inside Kafka Connect) → Kafka.

Slide 16

Connectors
"config": {
  […]
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url": "jdbc:postgresql://postgres:5432/",
  "topics": "asgard.demo.orders"
}

Slide 17

Connectors. Diagram: Source (native data) → Connector → Connect Record, inside Kafka Connect → Kafka.

Slide 18

Converters. Diagram: Source (native data) → Connector → Connect Record → Converter → bytes[] → Kafka. The connector and the converter both run inside Kafka Connect.

Slide 19

Serialisation & Schemas: Avro (→ Confluent Schema Registry), Protobuf, JSON, CSV
https://qconnewyork.com/system/files/presentation-slides/qcon_17_-_schemas_and_apis.pdf

Slide 20

The Confluent Schema Registry. Diagram: Source → Kafka Connect → Avro message → Kafka Connect → Target. Both Kafka Connect instances store and retrieve the Avro schema in the Schema Registry.

Slide 21

Converters
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
Set as a global default per worker; optionally can be overridden per connector.
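A connector config can carry its own key.converter or value.converter, and that setting takes precedence over the worker default. The following is a simplified Python sketch of that precedence, with both configs treated as plain dicts. It assumes, based on our understanding of the Connect worker, that the converter's sub-properties (for example schema.registry.url) are read from the same config that supplies the converter class and are not merged with the worker's. The function name and the sample JSON override are hypothetical.

```python
from __future__ import annotations


def effective_converter(prefix: str, worker: dict, connector: dict) -> dict:
    """Resolve 'key.converter' or 'value.converter' for one connector.

    If the connector config sets the converter class, the class and its
    '<prefix>.*' settings are read from the connector config; otherwise both
    come from the worker config. Settings are not merged across the two.
    """
    source = connector if prefix in connector else worker
    settings = {
        key[len(prefix) + 1:]: value
        for key, value in source.items()
        if key.startswith(prefix + ".")
    }
    return {"class": source.get(prefix), "settings": settings}


if __name__ == "__main__":
    worker = {
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081",
    }
    # Hypothetical connector that reads plain JSON values instead of Avro.
    connector = {
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
    }
    print(effective_converter("key.converter", worker, connector))
    print(effective_converter("value.converter", worker, connector))
```

A practical consequence, under the same assumption: if a connector overrides value.converter with AvroConverter, it must also repeat value.converter.schema.registry.url.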

Slide 22

What about internal converters?
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
key.internal.value.converter=org.apache.kafka.connect.json.JsonConverter
value.internal.value.converter=org.apache.kafka.connect.json.JsonConverter
key.internal.value.converter.bork.bork.bork=org.apache.kafka.connect.json.JsonConverter
key.internal.value.please.just.work.converter=org.apache.kafka.connect.json.JsonConverter

Slide 23

Single Message Transforms. Diagram: Source → Connector → Transform(s) → Converter → Kafka, all inside Kafka Connect.

Slide 24

Single Message Transforms
"config": {
  […]
  "transforms": "addDateToTopic,labelFooBar",
  "transforms.addDateToTopic.type": "org.apache.kafka.connect.transforms.TimestampRouter",
  "transforms.addDateToTopic.topic.format": "${topic}-${timestamp}",
  "transforms.addDateToTopic.timestamp.format": "yyyyMM",
  "transforms.labelFooBar.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.labelFooBar.renames": "delivery_address:shipping_address"
}
Callouts: "transforms" lists which transforms to do, in order (transforms config). Each named transform then takes its own "transforms.<name>.*" settings (config per transform). timestamp.format is a java.text.SimpleDateFormat pattern, so the calendar year is yyyy; YYYY would be the week-based year.
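To see what the two transforms in the config above do to one record, here is a pure-Python approximation. It is not Connect's implementation. It maps the SimpleDateFormat pattern yyyyMM to strftime's %Y%m, interprets the record timestamp in UTC, and handles only the "renames" option of ReplaceField. The function names and the sample address are hypothetical.

```python
from __future__ import annotations

from datetime import datetime, timezone


def timestamp_router(topic: str, timestamp_ms: int,
                     topic_format: str = "${topic}-${timestamp}",
                     date_format: str = "%Y%m") -> str:
    """Approximate TimestampRouter: derive a topic name from the record timestamp.

    Connect takes a SimpleDateFormat pattern (yyyyMM); %Y%m is the strftime
    equivalent. The timestamp is interpreted in UTC here.
    """
    when = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    return (topic_format
            .replace("${topic}", topic)
            .replace("${timestamp}", when.strftime(date_format)))


def replace_field_renames(value: dict, renames: str) -> dict:
    """Approximate ReplaceField$Value with a 'renames' spec like 'old:new,old2:new2'."""
    mapping = dict(pair.split(":", 1) for pair in renames.split(","))
    return {mapping.get(field, field): v for field, v in value.items()}


def apply_transforms(topic: str, timestamp_ms: int, value: dict) -> tuple[str, dict]:
    """Run addDateToTopic, then labelFooBar, in the order listed in "transforms"."""
    new_topic = timestamp_router(topic, timestamp_ms)
    new_value = replace_field_renames(value, "delivery_address:shipping_address")
    return new_topic, new_value


if __name__ == "__main__":
    # 1557409348000 ms is 2019-05-09T13:42:28Z, a CREATE_TS seen later in the deck.
    print(apply_transforms("orders", 1557409348000,
                           {"id": 7, "delivery_address": "1 Example St"}))
    # ('orders-201905', {'id': 7, 'shipping_address': '1 Example St'})
```

The order in "transforms" matters in general: each transform receives the record produced by the previous one.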

Slide 25

Extensible. Diagram: Connector, Transform(s) and Converter, each a pluggable component.

Slide 26

Confluent Hub: hub.confluent.io

Slide 27

Deploying Kafka Connect: Connectors, Tasks, and Workers

Slide 28

Connectors and Tasks. Diagram: a JDBC Source connector running JDBC Task #1 and JDBC Task #2, and an S3 Sink connector running S3 Task #1.

Slide 29

Connectors and Tasks. Diagram: a JDBC Source connector running JDBC Task #1 and JDBC Task #2, and an S3 Sink connector running S3 Task #1.

Slide 30

Connectors and Tasks. Diagram: a JDBC Source connector running JDBC Task #1 and JDBC Task #2, and an S3 Sink connector running S3 Task #1.

Slide 31

Tasks and Workers. Diagram: JDBC Task #1 and JDBC Task #2 (JDBC Source) and S3 Task #1 (S3 Sink) all run in a single Worker.

Slide 32

Kafka Connect Standalone Worker. Diagram: S3 Task #1, JDBC Task #1 and JDBC Task #2 run in one Worker, which keeps its own Offsets.

Slide 33

"Scaling" the Standalone Worker. Diagram: two independent Workers, one running JDBC Task #1 and S3 Task #1, the other running JDBC Task #2, each with its own Offsets. Fault-tolerant? Nope.

Slide 34

Kafka Connect Distributed Worker. Diagram: S3 Task #1, JDBC Task #1 and JDBC Task #2 run on a Worker in a Kafka Connect cluster, with Offsets, Config and Status stored in Kafka. Fault-tolerant? Yeah!

Slide 35

Scaling the Distributed Worker. Diagram: a second Worker joins the Kafka Connect cluster. One Worker runs S3 Task #1 and JDBC Task #1, the other runs JDBC Task #2, and both share the same Offsets, Config and Status. Fault-tolerant? Yeah!

Slide 36

Distributed Worker - fault tolerance. Diagram: one of the two Workers in the Kafka Connect cluster fails, and JDBC Task #2 stops with it. S3 Task #1 and JDBC Task #1 keep running, and the shared Offsets, Config and Status are intact.

Slide 37

Distributed Worker - fault tolerance. Diagram: the surviving Worker takes over JDBC Task #2, so S3 Task #1, JDBC Task #1 and JDBC Task #2 all run on it, using the shared Offsets, Config and Status.
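The scaling and fault-tolerance slides show tasks being spread across whichever workers are alive. The following Python sketch only illustrates that outcome with a plain round-robin assignment. Connect's real rebalance is coordinated through Kafka's group membership protocol, and its assignment algorithm differs between versions. The task and worker names are hypothetical.

```python
from __future__ import annotations


def assign_round_robin(tasks: list[str], workers: list[str]) -> dict[str, list[str]]:
    """Spread tasks over the live workers in round-robin order (illustration only)."""
    if not workers:
        raise ValueError("no live workers to run tasks on")
    assignment: dict[str, list[str]] = {worker: [] for worker in workers}
    for i, task in enumerate(sorted(tasks)):
        assignment[workers[i % len(workers)]].append(task)
    return assignment


if __name__ == "__main__":
    tasks = ["jdbc-source-0", "jdbc-source-1", "s3-sink-0"]
    # Two workers: each gets a share, as in "Scaling the Distributed Worker".
    print(assign_round_robin(tasks, ["worker-1", "worker-2"]))
    # worker-2 fails: the survivor takes every task, as in the fault-tolerance slides.
    print(assign_round_robin(tasks, ["worker-1"]))
```

Because offsets, config and status live in Kafka rather than on a worker, any surviving worker can pick up a task where the failed one left off.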

Slide 38

Multiple Distributed Clusters. Diagram: Kafka Connect cluster #1 runs S3 Task #1 and JDBC Task #1, and Kafka Connect cluster #2 runs JDBC Task #2. Each cluster has its own Offsets, Config and Status.
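In distributed mode, a worker's cluster membership is defined by its group.id, and each cluster keeps its config, offsets and status in its own Kafka topics. The following Python sketch renders those worker properties for two separate clusters. The property keys are the standard distributed-worker settings. The naming scheme for group IDs and topics is a hypothetical convention, and other required settings such as converters are omitted.

```python
from __future__ import annotations


def worker_config(cluster_id: str, bootstrap_servers: str) -> dict:
    """Distributed-mode settings that decide which Connect cluster a worker joins.

    Workers sharing a group.id form one cluster. Each cluster gets its own
    config, offset and status topics so that clusters do not mix state.
    """
    return {
        "bootstrap.servers": bootstrap_servers,
        "group.id": f"connect-cluster-{cluster_id}",
        "config.storage.topic": f"_connect-configs-{cluster_id}",
        "offset.storage.topic": f"_connect-offsets-{cluster_id}",
        "status.storage.topic": f"_connect-status-{cluster_id}",
    }


def to_properties(config: dict) -> str:
    """Render a config dict in the key=value format of a worker properties file."""
    return "\n".join(f"{key}={value}" for key, value in config.items())


if __name__ == "__main__":
    for cluster_id in ("1", "2"):
        print(to_properties(worker_config(cluster_id, "localhost:9092")))
        print()
```

Adding a worker to an existing cluster means starting it with that cluster's group.id and topic names. Starting it with new values creates a separate cluster instead.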

Slide 39

Troubleshooting Kafka Connect

Slide 40

Troubleshooting Kafka Connect: Connector RUNNING, Task FAILED
$ curl -s "http://localhost:8083/connectors/source-debezium-orders/status" | \
    jq '.connector.state'
"RUNNING"
$ curl -s "http://localhost:8083/connectors/source-debezium-orders/status" | \
    jq '.tasks[0].state'
"FAILED"
http://go.rmoff.net/connector-status

Slide 41

Troubleshooting Kafka Connect
$ curl -s "http://localhost:8083/connectors/source-debezium-orders-00/status" | jq '.tasks[0].trace'
"org.apache.kafka.connect.errors.ConnectException\n\tat io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)\n\tat io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:197)\n\tat io.debezium.connector.mysql.BinlogReader$ReaderThreadLifecycleListener.onCommunicationFailure(BinlogReader.java:1018)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:950)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:580)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:825)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.io.EOFException\n\tat com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:190)\n\tat com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readInteger(ByteArrayInputStream.java:46)\n\tat com.github.shyiko.mysql.binlog.event.deserialization.EventHeaderV4Deserializer.deserialize(EventHeaderV4Deserializer.java:35)\n\tat com.github.shyiko.mysql.binlog.event.deserialization.EventHeaderV4Deserializer.deserialize(EventHeaderV4Deserializer.java:27)\n\tat com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:212)\n\tat io.debezium.connector.mysql.BinlogReader$1.nextEvent(BinlogReader.java:224)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:922)\n\t... 3 more\n"
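The trace field comes back as a single JSON string, and the most useful line is often the last "Caused by:" entry (here java.io.EOFException). The following Python sketch pulls that line out of an already-decoded trace string. The function name is hypothetical, and it relies only on the standard layout of Java stack traces.

```python
from __future__ import annotations


def root_cause(trace: str) -> str:
    """Return the innermost exception line from a Java stack trace.

    Java prints nested causes as 'Caused by: ...' lines; the last one is the
    deepest cause. With no 'Caused by:' line, the first line is the exception.
    """
    lines = [line.strip() for line in trace.splitlines() if line.strip()]
    if not lines:
        return ""
    prefix = "Caused by: "
    causes = [line[len(prefix):] for line in lines if line.startswith(prefix)]
    return causes[-1] if causes else lines[0]


if __name__ == "__main__":
    # Abbreviated from the trace on the slide (after JSON decoding).
    trace = (
        "org.apache.kafka.connect.errors.ConnectException\n"
        "\tat io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)\n"
        "Caused by: java.io.EOFException\n"
        "\tat com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:190)\n"
        "\t... 3 more\n"
    )
    print(root_cause(trace))  # java.io.EOFException
```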

Slide 42

The log is the source of truth
$ confluent log connect
$ docker-compose logs kafka-connect
$ cat /var/log/kafka/connect.log

Slide 43

Kafka Connect

Slide 44

Kafka Connect
"Task is being killed and will not recover until manually restarted"
Symptom, not cause

Slide 45

Kafka Connect

Slide 46

Common errors

Slide 47

org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Slide 48

Mismatched converters
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Messages are not Avro, but "value.converter": "AvroConverter"
ⓘ Use the correct Converter for the source data
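"Unknown magic byte!" refers to the framing that Schema Registry serializers add to each message: one magic byte (0), a 4-byte big-endian schema ID, and then the Avro-encoded payload. The following Python sketch checks that framing, which shows why a plain JSON message fails: its first byte is "{", not 0. The function name is hypothetical, and the check covers only the header, not the Avro payload.

```python
from __future__ import annotations

import struct


def parse_confluent_header(message: bytes) -> int:
    """Return the schema ID from a Schema Registry-framed message.

    Framing: 1 magic byte (0), a 4-byte big-endian schema ID, then the payload.
    Raises ValueError when the first byte is not 0, the condition behind
    'Unknown magic byte!'.
    """
    if len(message) < 5 or message[0] != 0:
        raise ValueError("Unknown magic byte!")
    (schema_id,) = struct.unpack(">i", message[1:5])
    return schema_id


if __name__ == "__main__":
    print(parse_confluent_header(b"\x00\x00\x00\x00\x2a" + b"avro-bytes"))  # 42
    try:
        parse_confluent_header(b'{"id": 7, "order_id": 7}')
    except ValueError as err:
        print(err)  # Unknown magic byte!
```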

Slide 49

Mixed serialisation methods
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Some messages are not Avro, but "value.converter": "AvroConverter"
ⓘ Use error handling to deal with bad messages

Slide 50

Error Handling and DLQ
Handled:
  • Convert (read/write from Kafka, [de]-serialisation)
  • Transform
Not Handled:
  • Start (connections to a data store)
  • Poll / Put (read/write from/to data store), which can be retried by Connect
https://cnfl.io/connect-dlq

Slide 51

Fail Fast. Diagram: source topic messages → Kafka Connect → sink messages. By default the task fails on the first message it cannot process.
https://cnfl.io/connect-dlq

Slide 52

YOLO ¯\_(ツ)_/¯ Diagram: source topic messages → Kafka Connect (errors.tolerance=all) → sink messages. Messages that fail are skipped.
https://cnfl.io/connect-dlq

Slide 53

Dead Letter Queue. Diagram: source topic messages → Kafka Connect → sink messages, with messages that fail routed to a dead letter queue topic.
errors.tolerance=all
errors.deadletterqueue.topic.name=my_dlq
https://cnfl.io/connect-dlq
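The two settings above go into a sink connector's config (the dead letter queue applies to sink connectors). The following Python sketch adds them, plus two related standard settings, to an existing config dict without modifying the original. The helper name and the replication-factor default of 1 (suited to a single-broker development setup) are assumptions.

```python
from __future__ import annotations


def with_dead_letter_queue(config: dict, dlq_topic: str,
                           replication_factor: int = 1) -> dict:
    """Return a copy of a sink connector config with error tolerance and a DLQ."""
    updated = dict(config)
    updated.update({
        # Skip messages that fail conversion or transformation...
        "errors.tolerance": "all",
        # ...and write them to this topic instead of dropping them.
        "errors.deadletterqueue.topic.name": dlq_topic,
        # 1 suits a single-broker dev setup; production clusters need more.
        "errors.deadletterqueue.topic.replication.factor": str(replication_factor),
        # Record why each message failed in the DLQ message headers.
        "errors.deadletterqueue.context.headers.enable": "true",
    })
    return updated


if __name__ == "__main__":
    sink = {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "topics": "asgard.demo.orders",
    }
    for key, value in with_dead_letter_queue(sink, "my_dlq").items():
        print(f"{key}={value}")
```

With the context headers enabled, the reason for each failure travels with the message, which helps when deciding how to re-process the dead letter queue.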

Slide 54

Re-processing the Dead Letter Queue. Diagram: source topic messages → Kafka Connect (Avro sink) → sink messages, with messages that fail written to the dead letter queue. A second Kafka Connect (JSON sink) reads the dead letter queue and writes those messages to the sink.
https://cnfl.io/connect-dlq

Slide 55

No fields found using key and value schemas for table: foo-bar

Slide 56

No fields found using key and value schemas for table: foo-bar
JsonDeserializer with schemas.enable requires "schema" and "payload" fields and may not contain additional fields

Slide 57

Schema, where art thou?
No fields found using key and value schemas for table: foo-bar
JsonDeserializer with schemas.enable requires "schema" and "payload" fields and may not contain additional fields

Slide 58

Schema, where art thou?
Schema (from Schema Registry):
$ http localhost:8081/subjects[…] jq '.schema|fromjson'
{
  "type": "record",
  "name": "Value",
  "namespace": "asgard.demo.ORDERS",
  "fields": [
    { "name": "id", "type": "int" },
    { "name": "order_id", "type": [ "null", "int" ], "default": null },
    { "name": "customer_id", "type": [ "null", "int" ], "default": null },
    […]
Messages (Avro):
$ kafkacat -b localhost:9092 -C -t mysql…
QF@Land RoverDefender 90SheffieldSwift LL Parkway(2019-05-09T13:42:28Z(2019-05-09T1…

Slide 59

Schema, where art thou?
No fields found using key and value schemas for table: foo-bar
Messages (JSON): no schema!
{
  "id": 7,
  "order_id": 7,
  "customer_id": 849,
  "order_total_usd": 98062.21,
  "make": "Pontiac",
  "model": "Aztek",
  "delivery_city": "Leeds",
  "delivery_company": "Price-Zieme",
  "delivery_address": "754 Becker W…
  "CREATE_TS": "2019-05-09T13:42:28…
  "UPDATE_TS": "2019-05-09T13:42:28…
}

Slide 60

Schema, where art thou?
JsonDeserializer with schemas.enable requires "schema" and "payload" fields and may not contain additional fields
Messages (JSON), with the schema embedded in each message:
{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "int32", "optional": false, "field": "id" },
      […]
    ],
    "optional": true,
    "name": "asgard.demo.OR…
  },
  "payload": {
    "id": 611,
    "order_id": 111,
    "customer_id": 851,
    "order_total_usd": 1821…
    "make": "Kia",
    "model": "Sorento",
    "delivery_city": "Swind…
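The two JSON examples differ only in shape. With schemas.enable=true, each message must be an object with exactly the top-level keys "schema" and "payload". The following Python sketch tests a raw message for that envelope, which helps decide whether a topic holds schema-embedded JSON or plain JSON. The function name is hypothetical, and the check does not validate the schema's contents.

```python
from __future__ import annotations

import json


def is_schema_envelope(raw: str) -> bool:
    """True if the JSON message has exactly 'schema' and 'payload' at the top
    level, the shape JsonConverter expects when schemas.enable=true."""
    message = json.loads(raw)
    return isinstance(message, dict) and set(message) == {"schema", "payload"}


if __name__ == "__main__":
    enveloped = '{"schema": {"type": "struct", "fields": []}, "payload": {"id": 611}}'
    plain = '{"id": 7, "order_id": 7, "customer_id": 849}'
    print(is_schema_envelope(enveloped))  # True
    print(is_schema_envelope(plain))      # False
```

A False result for data read with schemas.enable=true produces the JsonDeserializer error quoted above. A sink that needs a schema, such as a JDBC sink, then needs the schema supplied another way.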

Slide 61

Using KSQL to apply schema to your data. Diagram: JSON topic → KSQL → Avro topic → Kafka Connect.
CREATE STREAM ORDERS_JSON (id INT, order_total_usd DOUBLE, delivery_city VARCHAR)
  WITH (KAFKA_TOPIC='orders-json', VALUE_FORMAT='JSON');
CREATE STREAM ORDERS_AVRO WITH (
    VALUE_FORMAT='AVRO',
    KAFKA_TOPIC='orders-avro') AS
  SELECT * FROM ORDERS_JSON;

Slide 62

Using KSQL to apply schema to your data. Diagram: JSON topic → KSQL → Avro topic → Kafka Connect.
CREATE STREAM ORDERS_JSON (id INT, order_total_usd DOUBLE, delivery_city VARCHAR)
  WITH (KAFKA_TOPIC='orders-json', VALUE_FORMAT='JSON');
CREATE STREAM ORDERS_AVRO WITH (
    VALUE_FORMAT='AVRO',
    KAFKA_TOPIC='orders-avro') AS
  SELECT * FROM ORDERS_JSON;

Slide 63

Containers

Slide 64

Kafka Connect images on Docker Hub: confluentinc/cp-kafka-connect-base, and confluentinc/cp-kafka-connect, which adds kafka-connect-elasticsearch, kafka-connect-jdbc, kafka-connect-hdfs, […]

Slide 65

Adding connectors to a container. Diagram: connector JARs from Confluent Hub are added to confluentinc/cp-kafka-connect-base.

Slide 66

At runtime
kafka-connect:
  image: confluentinc/cp-kafka-connect:5.2.1
  environment:
    CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components'
  command:
    - bash
    - -c
    - |
      confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:1.0.0
      /etc/confluent/docker/run
http://rmoff.dev/ksln19-connect-docker

Slide 67

Build a new image
FROM confluentinc/cp-kafka-connect:5.2.1
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
RUN confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:1.0.0

Slide 68

Automating connector creation

# Download JDBC drivers
cd /usr/share/java/kafka-connect-jdbc/
curl https://cdn.mysql.com/Downloads/Connector-J/mysql-connector-java-8.0.13.tar.gz | tar xz
#
# Now launch Kafka Connect
/etc/confluent/docker/run &
#
# Wait for Kafka Connect listener
while [ $$(curl -s -o /dev/null -w %{http_code} http://$$CONNECT_REST_ADVERTISED_HOST_NAME:$…
  echo -e $$(date) " Kafka Connect listener HTTP state: " $$(curl -s -o /dev/null -w %{http_…
  sleep 5
done
#
# Create JDBC Source connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "jdbc_source_mysql_00",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://mysql:3306/demo",
    "connection.user": "connect_user",
    "connection.password": "asgard",
    "topic.prefix": "mysql-00-",
    "table.whitelist": "demo.customers"
  }
}'
# Don't let the container die
sleep infinity
http://rmoff.dev/ksln19-connect-docker
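The script above waits for the REST listener to answer before it creates the connector. The following is the same sequence as a Python sketch. `wait_for_listener` accepts any probe function, so it can be exercised without a running worker. `http_status` and `create_connector` are hypothetical helpers, and the worker address in the commented usage is an assumption (the original script derives it from $CONNECT_REST_ADVERTISED_HOST_NAME).

```python
from __future__ import annotations

import json
import time
import urllib.error
import urllib.request
from typing import Callable


def wait_for_listener(probe: Callable[[], int], timeout_s: float = 300,
                      interval_s: float = 5) -> bool:
    """Call probe() until it reports HTTP 200; give up after timeout_s seconds."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if probe() == 200:
            return True
        time.sleep(interval_s)
    return False


def http_status(url: str) -> int:
    """HTTP status of a GET to url, or 0 while nothing is listening yet."""
    try:
        with urllib.request.urlopen(url, timeout=5) as response:
            return response.status
    except urllib.error.HTTPError as err:  # listener is up but returned an error
        return err.code
    except OSError:  # connection refused, DNS failure, timeout
        return 0


def create_connector(base_url: str, name: str, config: dict) -> int:
    """POST /connectors with a name and config, as the script's curl call does."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    request = urllib.request.Request(
        f"{base_url}/connectors", data=body,
        headers={"Content-Type": "application/json"}, method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.status


if __name__ == "__main__":
    # Stand-in probe for demonstration: "not up", "not up", then 200.
    replies = iter([0, 0, 200])
    print(wait_for_listener(lambda: next(replies), timeout_s=10, interval_s=0))  # True
    # Against a real worker (address assumed):
    # if wait_for_listener(lambda: http_status("http://localhost:8083/")):
    #     create_connector("http://localhost:8083", "jdbc_source_mysql_00", {...})
```

Unlike the shell loop, this version stops waiting after a timeout, so a worker that never starts surfaces as an error instead of an endless loop.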

Slide 69

#EOF http://rmoff.dev/bbuzz19-kafka-connect
