From Zero to Hero with Kafka Connect

A presentation at Kafka Summit London, May 2019, by Robin Moffatt

Slide 1

@rmoff #kafkasummit From Zero to Hero with Kafka Connect a.k.a. A practical guide to becoming l33t with Kafka Connect

Slide 2

@rmoff #kafkasummit What is Kafka Connect? From Zero to Hero with Kafka Connect Kafka Connect enables you to build streaming integrations between systems up- and down-stream of Kafka. It requires only simple configuration files to use. Whilst it has a Java API that you can build on, as a data engineer you usually don't need to write any code to use it. Kafka Connect is part of Apache Kafka®. If you're using Apache Kafka, you've already got Kafka Connect.

Slide 3

@rmoff #kafkasummit Streaming Integration with Kafka Connect: Sources (e.g. syslog) -> Kafka Connect -> Kafka Brokers. From Zero to Hero with Kafka Connect

Slide 4

@rmoff #kafkasummit Streaming Integration with Kafka Connect: Kafka Brokers -> Kafka Connect -> Sinks (e.g. Amazon S3, Google BigQuery). From Zero to Hero with Kafka Connect

Slide 5

@rmoff #kafkasummit Streaming Integration with Kafka Connect: Sources (e.g. syslog) -> Kafka Connect -> Kafka Brokers -> Kafka Connect -> Sinks (e.g. Amazon S3, Google BigQuery). From Zero to Hero with Kafka Connect

Slide 6

Look Ma, No Code! @rmoff #kafkasummit
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:mysql://asgard:3306/demo",
  "table.whitelist": "sales,orders,customers"
}
https://docs.confluent.io/current/connect/
From Zero to Hero with Kafka Connect

Slide 7

@rmoff #kafkasummit Streaming Pipelines: RDBMS -> Kafka Connect -> Kafka -> Kafka Connect -> Amazon S3 / HDFS. From Zero to Hero with Kafka Connect

Slide 8

Writing to data stores from Kafka @rmoff #kafkasummit: App -> Kafka -> Kafka Connect -> Data Store. From Zero to Hero with Kafka Connect

Slide 9

@rmoff #kafkasummit Evolve processing from old systems to new: the Existing App continues to write to the RDBMS, Kafka Connect streams the data into Kafka, and the New App consumes it from there. From Zero to Hero with Kafka Connect

Slide 10

@rmoff #kafkasummit Demo #1 http://rmoff.dev/ksldn19_demo-code From Zero to Hero with Kafka Connect A MySQL database stores orders placed by an application. The Kafka Connect Debezium MySQL connector streams them into a Kafka topic, and the Kafka Connect Elasticsearch connector streams the orders from that topic to Elasticsearch.
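The slide links to the demo code rather than showing the connector configs; a minimal sketch of the two connectors such a demo uses, submitted through the Kafka Connect REST API (hostnames, credentials and topic names below are illustrative assumptions, not the demo's actual values):

# Source: stream the orders table from MySQL into Kafka with Debezium
curl -X PUT http://localhost:8083/connectors/source-debezium-orders/config \
  -H "Content-Type: application/json" -d '{
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "42",
    "database.server.name": "asgard",
    "table.whitelist": "demo.orders",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.demo"
  }'

# Sink: stream the resulting topic on to Elasticsearch
curl -X PUT http://localhost:8083/connectors/sink-elastic-orders/config \
  -H "Content-Type: application/json" -d '{
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200",
    "topics": "asgard.demo.orders",
    "type.name": "_doc",
    "key.ignore": "true",
    "schema.ignore": "true"
  }'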

Slide 11

REST API - tricks @rmoff #kafkasummit http://go.rmoff.net/connector-status https://docs.confluent.io/current/connect/references/restapi.html From Zero to Hero with Kafka Connect
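The actual one-liner sits behind the go.rmoff.net short link; a sketch of the same kind of trick, assuming a worker on localhost:8083 and jq installed:

# list every connector together with its connector state and task state(s)
curl -s http://localhost:8083/connectors | jq -r '.[]' | while read -r c; do
  curl -s "http://localhost:8083/connectors/$c/status" | \
    jq -r '[.name, .connector.state, (.tasks[].state)] | join(" | ")'
done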

Slide 12

REST API - tricks @rmoff #kafkasummit (h/t to @madewithtea) http://go.rmoff.net/selectively-delete-connectors https://docs.confluent.io/current/connect/references/restapi.html From Zero to Hero with Kafka Connect
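Again, the exact command is behind the short link; a hedged sketch of selectively deleting connectors whose names match a pattern (the "sink-elastic" pattern here is just an example):

# delete every connector whose name contains the pattern -- use with care!
curl -s http://localhost:8083/connectors | \
  jq -r '.[] | select(contains("sink-elastic"))' | \
  xargs -I{} curl -s -X DELETE "http://localhost:8083/connectors/{}"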

Slide 13

@rmoff #kafkasummit Configuring Kafka Connect Inside the API - connectors, transforms, converters From Zero to Hero with Kafka Connect

Slide 14

Kafka Connect basics Source Kafka Connect @rmoff #kafkasummit Kafka From Zero to Hero with Kafka Connect That’s the big picture. But let’s GO BACK TO BASICS and take a few minutes to talk about some of the core concepts in Kafka Connect.

Slide 15

@rmoff #kafkasummit Connectors Connector Source Kafka Connect Kafka From Zero to Hero with Kafka Connect At the center of Kafka Connect are the CONNECTORS, which are reusable components that you can download, install, and use without writing code. Connectors in Kafka Connect define where data should be copied to and from. A SOURCE connector knows how to talk to a specific SOURCE system and generate records that Kafka Connect then writes into Kafka. On the downstream side, the connector specifies the topics to be consumed and Kafka Connect reads those topics and sends them to the SINK connector that knows how to send those records to a specific SINK system. So the connectors know how to work with the records and talk to the external system, but the Kafka Connect workers act as the conductor and take care of the rest. https://docs.confluent.io/current/connect/concepts.html

Slide 16

Connectors @rmoff #kafkasummit
"config": {
  […]
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url": "jdbc:postgresql://postgres:5432/",
  "topics": "asgard.demo.orders",
}
From Zero to Hero with Kafka Connect
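Wrapped in the full REST call, creating a sink like this could look as follows (the connector name, credentials and auto.create setting are illustrative assumptions):

curl -X PUT http://localhost:8083/connectors/sink-jdbc-postgres-orders/config \
  -H "Content-Type: application/json" -d '{
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://postgres:5432/",
    "connection.user": "postgres",
    "connection.password": "postgres",
    "topics": "asgard.demo.orders",
    "auto.create": "true"
  }'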

Slide 17

@rmoff #kafkasummit Connectors Connector Native data Connect Record Source Kafka Connect Kafka From Zero to Hero with Kafka Connect At the center of Kafka Connect are the CONNECTORS, which are reusable components that you can download, install, and use without writing code. Connectors in Kafka Connect define where data should be copied to and from. A SOURCE connector knows how to talk to a specific SOURCE system and generate records that Kafka Connect then writes into Kafka. On the downstream side, the connector specifies the topics to be consumed and Kafka Connect reads those topics and sends them to the SINK connector that knows how to send those records to a specific SINK system. So the connectors know how to work with the records and talk to the external system, but the Kafka Connect workers act as the conductor and take care of the rest. https://docs.confluent.io/current/connect/concepts.html

Slide 18

@rmoff #kafkasummit Converters Converter Connector Native data Connect bytes[] Record Source Kafka Connect Kafka From Zero to Hero with Kafka Connect One of the things that connectors don't have to worry about is how to CONVERT between the source and sink records and the BINARY FORMAT used to persist those records in Kafka. Kafka doesn't care what is in the messages, so it's up to us to decide how we're going to serialize and deserialize them. Kafka Connect CONVERTERS are the components that do this translation. Kafka Connect ships with a converter that uses JSON SERIALIZATION, and Confluent provides an open source AVRO converter that uses AVRO and a separate SCHEMA REGISTRY. But there are others that you can use, or you can even write your own. Now, source connectors still have control over the structure of the records they produce, and sink connectors are written to expect records they consume to have a specific structure. What happens when those structures don't line up? https://docs.confluent.io/current/connect/concepts.html#converters https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained

Slide 19

@rmoff #kafkasummit Serialisation & Schemas Avro -> Confluent Schema Registry Protobuf JSON CSV https://qconnewyork.com/system/files/presentation-slides/qcon_17_-_schemas_and_apis.pdf From Zero to Hero with Kafka Connect Chucking data over the fence into a Kafka topic is not enough Integration should be done in a standard way to enable “plug and play” of building data pipelines in Kafka * Schema handling * Serialisation formats Avro & Confluent Schema Registry is a good way to do this :-) CSV is…not.

Slide 20

The Confluent Schema Registry Avro Schema @rmoff #kafkasummit Schema Registry Target Source Kafka Connect Avro Message Avro Message Kafka Connect From Zero to Hero with Kafka Connect

Slide 21

Converters @rmoff #kafkasummit
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
Set as a global default per-worker; optionally can be overridden per-connector
From Zero to Hero with Kafka Connect
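A per-connector override goes in the connector's own configuration rather than the worker properties; a sketch (connector name, topic and connection details are illustrative) overriding just the value converter for one connector:

curl -X PUT http://localhost:8083/connectors/sink-jdbc-postgres-orders/config \
  -H "Content-Type: application/json" -d '{
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://postgres:5432/",
    "topics": "orders-json",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true"
  }'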

Slide 22

What about internal converters? @rmoff #kafkasummit
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
key.internal.value.converter=org.apache.kafka.connect.json.JsonConverter
value.internal.value.converter=org.apache.kafka.connect.json.JsonConverter
key.internal.value.converter.bork.bork.bork=org.apache.kafka.connect.json.JsonConverter
key.internal.value.please.just.work.converter=org.apache.kafka.connect.json.JsonConverter
From Zero to Hero with Kafka Connect
Internal converters were deprecated as of Apache Kafka 2.0
https://cwiki.apache.org/confluence/display/KAFKA/KIP-174+-+Deprecate+and+remove+internal+converter+configs+in+WorkerConfig / https://issues.apache.org/jira/browse/KAFKA-5540

Slide 23

@rmoff #kafkasummit Single Message Transforms Connector Source Transform(s) Converter Kafka Connect Kafka From Zero to Hero with Kafka Connect https://docs.confluent.io/current/connect/transforms/index.html

Slide 24

@rmoff #kafkasummit Single Message Transforms
"config": {
  […]
  "transforms": "addDateToTopic,labelFooBar",
  "transforms.addDateToTopic.type": "org.apache.kafka.connect.transforms.TimestampRouter",
  "transforms.addDateToTopic.topic.format": "${topic}-${timestamp}",
  "transforms.addDateToTopic.timestamp.format": "YYYYMM",
  "transforms.labelFooBar.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.labelFooBar.renames": "delivery_address:shipping_address",
}
Transforms config: the "transforms" list says which transforms to do; the "transforms.<alias>.*" keys hold the config per transform.
From Zero to Hero with Kafka Connect

Slide 25

Extensible @rmoff #kafkasummit: Connector, Transform(s), Converter. From Zero to Hero with Kafka Connect

Slide 26

@rmoff #kafkasummit Confluent Hub hub.confluent.io From Zero to Hero with Kafka Connect

Slide 27

@rmoff #kafkasummit Deploying Kafka Connect Connectors, Tasks, and Workers From Zero to Hero with Kafka Connect

Slide 28

@rmoff #kafkasummit Connectors and Tasks JDBC Source S3 Sink S3 Task #1 JDBC Task #1 JDBC Task #2 From Zero to Hero with Kafka Connect A connector has one, or many, tasks

Slide 29

@rmoff #kafkasummit Connectors and Tasks JDBC Source S3 Sink S3 Task #1 JDBC Task #1 JDBC Task #2 From Zero to Hero with Kafka Connect JDBC Source is an example of where Kafka Connect can, if allowed, scale out the ingest. If you’re pulling data from more than one table, Kafka Connect can spawn additional tasks.

Slide 30

@rmoff #kafkasummit Connectors and Tasks JDBC Source S3 Sink S3 Task #1 JDBC Task #1 JDBC Task #2 From Zero to Hero with Kafka Connect Tasks are the unit of parallelism & scale. A connector may or may not be able to run multiple tasks. An example of where the work can be split across tasks is ingesting data with the JDBC Source connector pulling data from multiple tables in a single database. https://docs.confluent.io/current/connect/concepts.html
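The slide doesn't show the setting itself, but the ceiling on how many tasks a connector may spawn is set with tasks.max in the connector config; a sketch (connection details and table names are illustrative):

curl -X PUT http://localhost:8083/connectors/source-jdbc-mysql/config \
  -H "Content-Type: application/json" -d '{
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://asgard:3306/demo",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "table.whitelist": "sales,orders,customers",
    "tasks.max": "3"
  }'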

Slide 31

@rmoff #kafkasummit Tasks and Workers JDBC Source S3 Sink S3 Task #1 JDBC Task #1 JDBC Task #2 Worker From Zero to Hero with Kafka Connect

Slide 32

Kafka Connect Standalone Worker @rmoff #kafkasummit S3 Task #1 JDBC Task #1 JDBC Task #2 Worker Offsets From Zero to Hero with Kafka Connect For development or environments that lend themselves to single agents (e.g. sending logs from webservers to Kafka), standalone mode is well suited. https://docs.confluent.io/current/connect/userguide.html#standalone-vs-distributed
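A standalone worker isn't configured over REST; it is started directly with properties files. A minimal sketch, assuming Apache Kafka's bundled scripts (the connector property file names are illustrative):

# one worker properties file plus one properties file per connector
bin/connect-standalone.sh config/connect-standalone.properties \
    config/jdbc-source.properties config/s3-sink.properties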

Slide 33

@rmoff #kafkasummit “Scaling” the Standalone Worker JDBC Task #1 S3 Task #1 JDBC Task #2 Worker Offsets Worker Offsets Fault-tolerant? Nope. From Zero to Hero with Kafka Connect You can “scale” Kafka Connect in standalone mode by deploying each connector on its own worker. That way each worker has to do less work. However, once you reach the capacity of one worker node for the workload of a single connector, you’ve nowhere to go. You also have zero fault-tolerance.

Slide 34

Kafka Connect Distributed Worker @rmoff #kafkasummit S3 Task #1 JDBC Task #1 JDBC Task #2 Kafka Connect cluster Worker Offsets Config Status Fault-tolerant? Yeah! From Zero to Hero with Kafka Connect We recommend distributed mode for production deployments for ease of management and scalability. In use cases where a single source or sink may require heavy data volumes (e.g. sending data from Kafka to HDFS), distributed mode is more flexible in terms of scalability and offers the added advantage of a highly available service to minimize downtime. https://docs.confluent.io/current/connect/userguide.html#standalone-vs-distributed
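Workers that share the same group.id and the same internal topics form one distributed Connect cluster; a minimal sketch of starting such a worker (group id, topic names and converters below are illustrative assumptions, not a complete set of settings):

# write a minimal distributed-worker config and start the worker
cat > connect-distributed.properties <<'EOF'
bootstrap.servers=kafka:9092
group.id=kafka-connect-cluster-01
config.storage.topic=_connect-configs
offset.storage.topic=_connect-offsets
status.storage.topic=_connect-status
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
EOF
bin/connect-distributed.sh connect-distributed.properties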

Slide 35

Scaling the Distributed Worker @rmoff #kafkasummit S3 Task #1 JDBC Task #1 Kafka Connect cluster JDBC Task #2 Worker Worker Offsets Config Status Fault-tolerant? Yeah! From Zero to Hero with Kafka Connect

Slide 36

Distributed Worker - fault tolerance @rmoff #kafkasummit S3 Task #1 JDBC Task #1 Kafka Connect cluster Worker Worker Offsets Config Status From Zero to Hero with Kafka Connect

Slide 37

Distributed Worker - fault tolerance @rmoff #kafkasummit S3 Task #1 JDBC Task #1 JDBC Task #2 Kafka Connect cluster Worker Offsets Config Status From Zero to Hero with Kafka Connect

Slide 38

Multiple Distributed Clusters @rmoff #kafkasummit S3 Task #1 JDBC Task #1 Kafka Connect cluster #1 JDBC Task #2 Kafka Connect cluster #2 Offsets Offsets Status Status Config Config From Zero to Hero with Kafka Connect

Slide 39

@rmoff #kafkasummit Troubleshooting Kafka Connect From Zero to Hero with Kafka Connect

Slide 40

Troubleshooting Kafka Connect @rmoff #kafkasummit
Connector RUNNING, Task FAILED:
$ curl -s "http://localhost:8083/connectors/source-debezium-orders/status" | \
    jq '.connector.state'
"RUNNING"
$ curl -s "http://localhost:8083/connectors/source-debezium-orders/status" | \
    jq '.tasks[0].state'
"FAILED"
http://go.rmoff.net/connector-status
From Zero to Hero with Kafka Connect
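Once the underlying problem is fixed, the failed task can be restarted through the REST API without restarting the whole worker, for example:

# restart just the failed task (task 0) of the connector...
curl -s -X POST http://localhost:8083/connectors/source-debezium-orders/tasks/0/restart
# ...or restart the connector itself
curl -s -X POST http://localhost:8083/connectors/source-debezium-orders/restart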

Slide 41

Troubleshooting Kafka Connect @rmoff #kafkasummit
$ curl -s "http://localhost:8083/connectors/source-debezium-orders-00/status" | \
    jq '.tasks[0].trace'
"org.apache.kafka.connect.errors.ConnectException\n\tat io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)\n\tat io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:197)\n\tat io.debezium.connector.mysql.BinlogReader$ReaderThreadLifecycleListener.onCommunicationFailure(BinlogReader.java:1018)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:950)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:580)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:825)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.io.EOFException\n\tat com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:190)\n\tat com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readInteger(ByteArrayInputStream.java:46)\n\tat com.github.shyiko.mysql.binlog.event.deserialization.EventHeaderV4Deserializer.deserialize(EventHeaderV4Deserializer.java:35)\n\tat com.github.shyiko.mysql.binlog.event.deserialization.EventHeaderV4Deserializer.deserialize(EventHeaderV4Deserializer.java:27)\n\tat com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:212)\n\tat io.debezium.connector.mysql.BinlogReader$1.nextEvent(BinlogReader.java:224)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:922)\n\t... 3 more\n"
From Zero to Hero with Kafka Connect

Slide 42

The log is the source of truth @rmoff #kafkasummit $ confluent log connect $ docker-compose logs kafka-connect $ cat /var/log/kafka/connect.log From Zero to Hero with Kafka Connect

Slide 43

Kafka Connect @rmoff #kafkasummit From Zero to Hero with Kafka Connect Improved logging KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-449%3A+Add+connector+contexts+to+Connect+worker+logs

Slide 44

Kafka Connect @rmoff #kafkasummit “Task is being killed and will not recover until manually restarted” Symptom not Cause From Zero to Hero with Kafka Connect Improved logging KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-449%3A+Add+connector+contexts+to+Connect+worker+logs

Slide 45

Kafka Connect @rmoff #kafkasummit From Zero to Hero with Kafka Connect Improved logging KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-449%3A+Add+connector+contexts+to+Connect+worker+logs

Slide 46

@rmoff #kafkasummit Common errors From Zero to Hero with Kafka Connect

Slide 47

@rmoff #kafkasummit org.apache.kafka.common.errors.SerializationException: Unknown magic byte! From Zero to Hero with Kafka Connect You’ll get this error if using the Avro deserialiser and reading data from a topic that’s not Avro, or has not been serialised to Avro using the Confluent Schema Registry serialiser (https://docs.confluent.io/current/schema-registry/serializer-formatter.html#serializer). It might be that you’re simply using the wrong converter, and maybe have JSON messages and should use the JsonConverter

Slide 48

Mismatched converters @rmoff #kafkasummit org.apache.kafka.common.errors.SerializationException: Unknown magic byte! Messages are not Avro, but "value.converter": "AvroConverter" ⓘ Use the correct Converter for the source data. From Zero to Hero with Kafka Connect You'll get this error if using the Avro deserialiser and reading data from a topic that's not Avro, or has not been serialised to Avro using the Confluent Schema Registry serialiser (https://docs.confluent.io/current/schema-registry/serializer-formatter.html#serializer). It might be that you're simply using the wrong converter, and maybe have JSON messages and should use the JsonConverter
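The fix is to give the connector a converter that matches what is actually on the topic; a sketch for a sink reading plain JSON with no embedded schema (connector name, topic and target system are illustrative assumptions):

curl -X PUT http://localhost:8083/connectors/sink-elastic-orders-json/config \
  -H "Content-Type: application/json" -d '{
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200",
    "topics": "orders-json",
    "type.name": "_doc",
    "key.ignore": "true",
    "schema.ignore": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }'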

Slide 49

Mixed serialisation methods @rmoff #kafkasummit org.apache.kafka.common.errors.SerializationException: Unknown magic byte! Some messages are not Avro, but "value.converter": "AvroConverter" ⓘ Use error handling to deal with bad messages. From Zero to Hero with Kafka Connect You'll get this error if using the Avro deserialiser and reading data from a topic that's not Avro, or has not been serialised to Avro using the Confluent Schema Registry serialiser (https://docs.confluent.io/current/schema-registry/serializer-formatter.html#serializer). It might be that you've got Avro data, but interspersed amongst it non-Avro data. Often happens in test environments.

Slide 50

@rmoff #kafkasummit Error Handling and DLQ
Handled: Convert (read/write from Kafka, [de]-serialisation); Transform
Not handled: Start (connections to a data store); Poll / Put (read/write from/to data store)*
* can be retried by Connect
https://cnfl.io/connect-dlq
From Zero to Hero with Kafka Connect

Slide 51

@rmoff #kafkasummit Fail Fast Source topic messages Kafka Connect https://cnfl.io/connect-dlq Sink messages From Zero to Hero with Kafka Connect

Slide 52

@rmoff #kafkasummit YOLO ¯\_(ツ)_/¯ Source topic messages errors.tolerance=all Kafka Connect https://cnfl.io/connect-dlq Sink messages From Zero to Hero with Kafka Connect Only ‘convert’ and ‘transform’ stages are handled. That means that failures in writing to a target (‘put’) are not handled.

Slide 53

@rmoff #kafkasummit Dead Letter Queue: Source topic messages -> Kafka Connect -> Sink messages, with failed messages routed to a dead letter queue. errors.tolerance=all errors.deadletterqueue.topic.name=my_dlq https://cnfl.io/connect-dlq From Zero to Hero with Kafka Connect Only ‘convert’ and ‘transform’ stages are handled. That means that failures in writing to a target (‘put’) are not handled.
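These settings live in the sink connector's configuration; a sketch (connector, topic and DLQ names are illustrative) that also records the reason for each failure in the DLQ record headers:

curl -X PUT http://localhost:8083/connectors/sink-elastic-orders/config \
  -H "Content-Type: application/json" -d '{
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200",
    "topics": "orders",
    "type.name": "_doc",
    "key.ignore": "true",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "my_dlq",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "errors.deadletterqueue.context.headers.enable": "true",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true"
  }'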

Slide 54

@rmoff #kafkasummit Re-processing the Dead Letter Queue Source topic messages Dead letter queue Kafka Connect (Avro sink) Kafka Connect (JSON sink) https://cnfl.io/connect-dlq Sink messages From Zero to Hero with Kafka Connect Only ‘convert’ and ‘transform’ stages are handled. That means that failures in writing to a target (‘put’) are not handled.

Slide 55

@rmoff #kafkasummit No fields found using key and value schemas for table: foo-bar From Zero to Hero with Kafka Connect

Slide 56

@rmoff #kafkasummit No fields found using key and value schemas for table: foo-bar JsonDeserializer with schemas.enable requires “schema” and “payload” fields and may not contain additional fields From Zero to Hero with Kafka Connect

Slide 57

Schema, where art thou? @rmoff #kafkasummit No fields found using key and value schemas for table: foo-bar JsonDeserializer with schemas.enable requires “schema” and “payload” fields and may not contain additional fields From Zero to Hero with Kafka Connect

Slide 58

@rmoff #kafkasummit Schema, where art thou?
Schema (from Schema Registry):
$ http localhost:8081/subjects[…] | jq '.schema|fromjson'
{
  "type": "record",
  "name": "Value",
  "namespace": "asgard.demo.ORDERS",
  "fields": [
    { "name": "id", "type": "int" },
    { "name": "order_id", "type": [ "null", "int" ], "default": null },
    { "name": "customer_id", "type": [ "null", "int" ], "default": null },
Messages (Avro):
$ kafkacat -b localhost:9092 -C -t mysql-debezium-asgard.demo.ORDERS
QF@Land RoverDefender 90SheffieldSwift LLC,54258 Michigan Parkway(2019-05-09T13:42:28Z(2019-05-09T13:42:28Z
From Zero to Hero with Kafka Connect

Slide 59

@rmoff #kafkasummit Schema, where art thou?
No fields found using key and value schemas for table: foo-bar
Messages (JSON), no schema!
{
  "id": 7,
  "order_id": 7,
  "customer_id": 849,
  "order_total_usd": 98062.21,
  "make": "Pontiac",
  "model": "Aztek",
  "delivery_city": "Leeds",
  "delivery_company": "Price-Zieme",
  "delivery_address": "754 Becker Way",
  "CREATE_TS": "2019-05-09T13:42:28Z",
  "UPDATE_TS": "2019-05-09T13:42:28Z"
}
From Zero to Hero with Kafka Connect
This is what you get if you use JsonConverter with schemas.enable=false (or another converter with no Schema)

Slide 60

Schema, where art thou? @rmoff #kafkasummit
No fields found using key and value schemas for table: foo-bar
-> You need a schema!
-> Use Avro, or use JSON with schemas.enable=true
Either way, you need to re-configure the producer of the data.
Or, use KSQL to impose a schema on the JSON/CSV data and reserialise it to Avro!
https://www.confluent.io/blog/data-wrangling-apache-kafka-ksql
From Zero to Hero with Kafka Connect

Slide 61

@rmoff #kafkasummit Schema, where art thou?
JsonDeserializer with schemas.enable requires "schema" and "payload" fields and may not contain additional fields
Schema (embedded per JSON message) and Messages (JSON):
{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "int32", "optional": false, "field": "id" },
      […]
    ],
    "optional": true,
    "name": "asgard.demo.ORDERS.Value"
  },
  "payload": {
    "id": 611,
    "order_id": 111,
    "customer_id": 851,
    "order_total_usd": 182190.93,
    "make": "Kia",
    "model": "Sorento",
    "delivery_city": "Swindon",
    "delivery_company": "Lehner, Kuvalis and Schaefer",
From Zero to Hero with Kafka Connect
This is what you get if you use JsonConverter with schemas.enable=true but not a schema/payload structure

Slide 62

@rmoff #kafkasummit Schema, where art thou?
JsonDeserializer with schemas.enable requires "schema" and "payload" fields and may not contain additional fields
-> Your JSON must be structured as expected.
{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "int32", "optional": false, "field": "id" },
      […]
    ],
    "optional": true,
    "name": "asgard.demo.ORDERS.Value"
  },
  "payload": {
    "id": 611,
    "order_id": 111,
    "customer_id": 851,
    "order_total_usd": 182190.93,
    "make": "Kia",
From Zero to Hero with Kafka Connect

Slide 63

@rmoff #kafkasummit Using KSQL to apply schema to your data: JSON topic -> KSQL -> Avro topic -> Kafka Connect
CREATE STREAM ORDERS_JSON (id INT, order_total_usd DOUBLE, delivery_city VARCHAR)
  WITH (KAFKA_TOPIC='orders-json', VALUE_FORMAT='JSON');
CREATE STREAM ORDERS_AVRO
  WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='orders-avro') AS
  SELECT * FROM ORDERS_JSON;
From Zero to Hero with Kafka Connect

Slide 64

@rmoff #kafkasummit Using KSQL to apply schema to your data: JSON topic -> KSQL -> Avro topic -> Kafka Connect
CREATE STREAM ORDERS_JSON (id INT, order_total_usd DOUBLE, delivery_city VARCHAR)
  WITH (KAFKA_TOPIC='orders-json', VALUE_FORMAT='JSON');
CREATE STREAM ORDERS_AVRO
  WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='orders-avro') AS
  SELECT * FROM ORDERS_JSON;
From Zero to Hero with Kafka Connect
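With the data re-serialised to Avro, a sink connector can then consume the new topic using the AvroConverter; a sketch (connector name and connection details are illustrative assumptions):

curl -X PUT http://localhost:8083/connectors/sink-jdbc-postgres-orders-avro/config \
  -H "Content-Type: application/json" -d '{
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://postgres:5432/",
    "topics": "orders-avro",
    "auto.create": "true",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }'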

Slide 65

@rmoff #kafkasummit Containers From Zero to Hero with Kafka Connect

Slide 66

@rmoff #kafkasummit Kafka Connect images on Docker Hub: confluentinc/cp-kafka-connect-base and confluentinc/cp-kafka-connect (which ships with kafka-connect-elasticsearch, kafka-connect-jdbc, kafka-connect-hdfs […]). From Zero to Hero with Kafka Connect

Slide 67

Adding connectors to a container @rmoff #kafkasummit Confluent Hub JAR confluentinc/cp-kafka-connect-base From Zero to Hero with Kafka Connect

Slide 68

@rmoff #kafkasummit At runtime
kafka-connect:
  image: confluentinc/cp-kafka-connect:5.2.1
  environment:
    CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components'
  command:
    - bash
    - -c
    - |
      confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:1.0.0
      /etc/confluent/docker/run
JAR from Confluent Hub + confluentinc/cp-kafka-connect-base
http://rmoff.dev/ksln19-connect-docker
From Zero to Hero with Kafka Connect

Slide 69

Build a new image @rmoff #kafkasummit
FROM confluentinc/cp-kafka-connect:5.2.1
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
RUN confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:1.0.0
JAR from Confluent Hub + confluentinc/cp-kafka-connect-base
From Zero to Hero with Kafka Connect
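Building and running the custom image is then a normal Docker workflow; a sketch (the image tag and CONNECT_* values are illustrative, and not an exhaustive set of worker settings):

# build the image from the Dockerfile above, then run a worker from it,
# passing the usual CONNECT_* worker settings as environment variables
docker build -t my-kafka-connect-neo4j:0.1 .
docker run -d --name kafka-connect \
  -e CONNECT_BOOTSTRAP_SERVERS=kafka:9092 \
  -e CONNECT_REST_ADVERTISED_HOST_NAME=kafka-connect \
  -e CONNECT_GROUP_ID=kafka-connect-01 \
  -e CONNECT_CONFIG_STORAGE_TOPIC=_connect-configs \
  -e CONNECT_OFFSET_STORAGE_TOPIC=_connect-offsets \
  -e CONNECT_STATUS_STORAGE_TOPIC=_connect-status \
  -e CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
  -e CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
  my-kafka-connect-neo4j:0.1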

Slide 70

Automating connector creation @rmoff #kafkasummit

# Download JDBC drivers
cd /usr/share/java/kafka-connect-jdbc/
curl https://cdn.mysql.com/Downloads/Connector-J/mysql-connector-java-8.0.13.tar.gz | tar xz
#
# Now launch Kafka Connect
/etc/confluent/docker/run &
#
# Wait for Kafka Connect listener
while [ $$(curl -s -o /dev/null -w %{http_code} http://$$CONNECT_REST_ADVERTISED_HOST_NAME:$…
  echo -e $$(date) " Kafka Connect listener HTTP state: " $$(curl -s -o /dev/null -w %{http_…
  sleep 5
done
#
# Create JDBC Source connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "jdbc_source_mysql_00",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://mysql:3306/demo",
    "connection.user": "connect_user",
    "connection.password": "asgard",
    "topic.prefix": "mysql-00-",
    "table.whitelist": "demo.customers"
  }
}'
# Don't let the container die
sleep infinity
http://rmoff.dev/ksln19-connect-docker
From Zero to Hero with Kafka Connect

Slide 71

#EOF

Slide 72

Confluent Community Components @rmoff #kafkasummit Apache Kafka with a bunch of cool stuff! For free! [Confluent Platform diagram: Apache Kafka core (Connect API, Streams API); development and connectivity (Clients, Connectors, REST Proxy, CLI); SQL stream processing (KSQL); data compatibility (Schema Registry); monitoring and administration (Confluent Control Center, Security); operations (Replicator, Auto Data Balancing); integrating database changes, log events, IoT data and web events with Hadoop, database, data warehouse, custom apps, CRM, monitoring and analytics; customer self-managed in the datacenter or public cloud, or Confluent fully-managed with Confluent Cloud] From Zero to Hero with Kafka Connect

Slide 73

@rmoff #kafkasummit http://cnfl.io/book-bundle From Zero to Hero with Kafka Connect

Slide 74

@rmoff #kafkasummit Please rate my session in the Kafka Summit app :-) EOF http://cnfl.io/slack From Zero to Hero with Kafka Connect