Introduction to ksqlDB Robin Moffatt #ljcjug @rmoff

Apache Kafka
Producer, Consumer, The Log, Connectors, Streaming Engine

{
  "reading_ts": "2020-02-14T12:19:27Z",
  "sensor_id": "aa-101",
  "production_line": "w01",
  "widget_type": "acme94",
  "temp_celcius": 23,
  "widget_weight_g": 100
}
Photo by Franck V. on Unsplash

Streams of events
[Diagram: events along a Time axis]

Stream Processing
[Diagram: Stream: widgets -> Stream: widgets_red]

Stream Processing with Kafka Streams
[Diagram: Stream: widgets -> Stream: widgets_red]
    final StreamsBuilder builder = new StreamsBuilder();
    builder.stream("widgets", Consumed.with(stringSerde, widgetsSerde))
           .filter( (key, widget) -> widget.getColour().equals("RED") )
           .to("widgets_red", Produced.with(stringSerde, widgetsSerde));

Stream Processing with ksqlDB
[Diagram: Stream: widgets -> ksqlDB -> Stream: widgets_red]
    CREATE STREAM widgets_red AS
      SELECT * FROM widgets WHERE colour='RED';

    {
      "reading_ts": "2020-02-14T12:19:27Z",
      "sensor_id": "aa-101",
      "production_line": "w01",
      "widget_type": "acme94",
      "temp_celcius": 23,
      "widget_weight_g": 100
    }
    SELECT * FROM WIDGETS WHERE WEIGHT_G > 120
    SELECT COUNT(*) FROM WIDGETS GROUP BY PRODUCTION_LINE
    SELECT AVG(TEMP_CELCIUS) AS TEMP FROM WIDGETS GROUP BY SENSOR_ID HAVING TEMP>20
    CREATE SINK CONNECTOR dw WITH (
      'connector.class' = 'S3Connector',
      'topics'          = 'widgets'
      …);
Sink targets: object store, data warehouse, RDBMS
Photo by Franck V. on Unsplash

ksqlDB
The event streaming database purpose-built for stream processing applications.

Stream Processing with ksqlDB
[Diagram: a source stream feeds Analytics and Applications / Microservices]
    …SUM(TXN_AMT) GROUP BY AC_ID
[Diagram labels: AC_ID=42; AC_ID=42, BALANCE=94.00]

DEMO
https://rmoff.dev/ksqldb-demo
Photo by Raoul Droog on Unsplash

Interacting with ksqlDB
Photo by Tim Mossholder on Unsplash

ksqlDB - Confluent Control Center

ksqlDB - REST API

ksqlDB - Native client (coming soon)

What else can ksqlDB do?
Photo by Sereja Ris on Unsplash

Lookups and Joins with ksqlDB
ORDERS:
    {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5}
ITEMS:
    {"id": "Item_9", "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9}
ksqlDB:
    CREATE STREAM ORDERS_ENRICHED AS
      SELECT O.*, I.*,
             O.ORDERUNITS * I.UNIT_COST AS TOTAL_ORDER_VALUE
        FROM ORDERS O
             INNER JOIN ITEMS I
             ON O.ITEMID = I.ID;
ORDERS_ENRICHED:
    {
      "ordertime": 1560070133853,
      "orderid": 67,
      "itemid": "Item_9",
      "orderunits": 5,
      "make": "Boyle-McDermott",
      "model": "Apiaceae",
      "unit_cost": 19.9,
      "total_order_value": 99.5
    }
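
To make the lookup concrete, here is a minimal plain-Java sketch of what the join does for a single order: find the item by id, then multiply units by unit cost. It is an illustration only, not how ksqlDB is implemented. The class name, the enrich method and the in-memory map standing in for ITEMS are all hypothetical.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class OrderEnrichmentSketch {

    // Enrich one order with the unit cost looked up by item id.
    // Returns null when the item is unknown, mirroring INNER JOIN behaviour
    // (orders without a matching item produce no output row).
    // The items map is a hypothetical in-memory stand-in for the ITEMS side of the join.
    public static Map<String, Object> enrich(Map<String, Double> items,
                                             int orderId, String itemId, int orderUnits) {
        Double unitCost = items.get(itemId);
        if (unitCost == null) {
            return null;
        }
        Map<String, Object> enriched = new LinkedHashMap<>();
        enriched.put("orderid", orderId);
        enriched.put("itemid", itemId);
        enriched.put("orderunits", orderUnits);
        enriched.put("unit_cost", unitCost);
        enriched.put("total_order_value", orderUnits * unitCost);
        return enriched;
    }

    public static void main(String[] args) {
        Map<String, Double> items = new HashMap<>();
        items.put("Item_9", 19.9);

        // Matching item: an enriched row is produced.
        System.out.println(enrich(items, 67, "Item_9", 5));
        // No matching item: nothing is produced for this order.
        System.out.println(enrich(items, 68, "Item_unknown", 1));
    }
}
```

With the sample order above (5 units of Item_9 at 19.9), total_order_value comes out at 99.5, matching the ORDERS_ENRICHED record on the slide.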

Connecting ksqlDB to other systems
Photo by Mak on Unsplash
Source (diagram label: syslog):
    CREATE SOURCE CONNECTOR SOURCE_MYSQL_01 WITH (
      'connector.class'   = 'MySqlConnector',
      'database.hostname' = 'mysql',
      'table.whitelist'   = 'demo.customers');
Sink (diagram labels: Google BigQuery, Amazon S3):
    CREATE SINK CONNECTOR SINK_ELASTIC_01 WITH (
      'connector.class' = 'ElasticsearchSinkConnector',
      'connection.url'  = 'http://elasticsearch:9200',
      'topics'          = 'orders');

Streams & Tables

Streams and Tables
Kafka topic (k/v bytes):
    {"event_ts": "2020-02-17T15:22:00Z", "person": "robin", "location": "Leeds"}
    {"event_ts": "2020-02-17T17:23:00Z", "person": "robin", "location": "London"}
    {"event_ts": "2020-02-17T22:23:00Z", "person": "robin", "location": "Wakefield"}
    {"event_ts": "2020-02-18T09:00:00Z", "person": "robin", "location": "Leeds"}
ksqlDB Stream (Stream: Topic + Schema):
    +--------------------+-------+----------+
    |EVENT_TS            |PERSON |LOCATION  |
    +--------------------+-------+----------+
    |2020-02-17 15:22:00 |robin  |Leeds     |
    |2020-02-17 17:23:00 |robin  |London    |
    |2020-02-17 22:23:00 |robin  |Wakefield |
    |2020-02-18 09:00:00 |robin  |Leeds     |
ksqlDB Table (Table: state for given key, Topic + Schema):
    +-------+----------+
    |PERSON |LOCATION  |
    +-------+----------+
    |robin  |Leeds     |
The table row for robin is overwritten as each event arrives: Leeds, London, Wakefield, Leeds.
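
The stream/table distinction can be sketched in a few lines of plain Java: the stream keeps every event, while the table keeps only the latest value per key. This is a rough illustration under simplifying assumptions (events held in a list, key and value as plain strings). The class and method names are hypothetical, not ksqlDB API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StreamTableSketch {

    // Fold a stream of {person, location} events into a table
    // holding the latest location for each person.
    public static Map<String, String> toTable(List<String[]> events) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] event : events) {
            table.put(event[0], event[1]); // a later event overwrites the earlier value for the key
        }
        return table;
    }

    public static void main(String[] args) {
        // The stream: every event is retained, in order.
        List<String[]> stream = new ArrayList<>();
        stream.add(new String[] {"robin", "Leeds"});
        stream.add(new String[] {"robin", "London"});
        stream.add(new String[] {"robin", "Wakefield"});
        stream.add(new String[] {"robin", "Leeds"});

        System.out.println("stream rows: " + stream.size());
        // The table: one row per key, holding the most recent location.
        System.out.println("table: " + toTable(stream));
    }
}
```

Four events go in, and the table ends up with a single row for robin holding Leeds.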

Stateful aggregations in ksqlDB
Kafka topic:
    {"event_ts": "2020-02-17T15:22:00Z", "person": "robin", "location": "Leeds"}
    {"event_ts": "2020-02-17T17:23:00Z", "person": "robin", "location": "London"}
    {"event_ts": "2020-02-17T22:23:00Z", "person": "robin", "location": "Wakefield"}
    {"event_ts": "2020-02-18T09:00:00Z", "person": "robin", "location": "Leeds"}
    SELECT PERSON, COUNT(*) FROM MOVEMENTS GROUP BY PERSON;
    +-------+------------------+
    |PERSON | LOCATION_CHANGES |
    +-------+------------------+
    |robin  | 4                |
    (the value updates 1, 2, 3, 4 as each event arrives)
    SELECT PERSON, COUNT_DISTINCT(LOCATION) FROM MOVEMENTS GROUP BY PERSON;
    +-------+------------------+
    |PERSON | UNIQUE_LOCATIONS |
    +-------+------------------+
    |robin  | 3                |
    (the value updates 1, 2, 3 as new locations arrive)
Aggregations can be across the entire input, or windowed (TUMBLING, HOPPING, SESSION)

Stateful aggregations in ksqlDB
    CREATE TABLE PERSON_MOVEMENTS AS
      SELECT PERSON,
             COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS,
             COUNT(*) AS LOCATION_CHANGES
        FROM MOVEMENTS
       GROUP BY PERSON;
[Diagram: Kafka topic -> PERSON_MOVEMENTS, held in the internal ksqlDB state store]
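
As a rough mental model of the per-key state behind COUNT(*) and COUNT_DISTINCT(LOCATION), the plain-Java sketch below keeps a running counter and a set of seen locations for each person. It assumes everything fits in in-memory maps and is not a description of ksqlDB's actual state store. All names are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class MovementAggregationSketch {

    // Per-key state: number of events seen, and the distinct locations seen.
    private final Map<String, Long> locationChanges = new HashMap<>();
    private final Map<String, Set<String>> distinctLocations = new HashMap<>();

    // Apply one event and return {LOCATION_CHANGES, UNIQUE_LOCATIONS} for that person.
    public long[] apply(String person, String location) {
        long changes = locationChanges.merge(person, 1L, Long::sum);
        Set<String> seen = distinctLocations.computeIfAbsent(person, k -> new HashSet<>());
        seen.add(location);
        return new long[] {changes, seen.size()};
    }

    public static void main(String[] args) {
        MovementAggregationSketch agg = new MovementAggregationSketch();
        String[] locations = {"Leeds", "London", "Wakefield", "Leeds"};
        for (String location : locations) {
            long[] row = agg.apply("robin", location);
            System.out.println("robin | " + row[0] + " | " + row[1]);
        }
    }
}
```

After the fourth event the state for robin is 4 location changes and 3 unique locations, because the second Leeds event increments the count but adds nothing to the set.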

Pull and Push queries in ksqlDB
    CREATE TABLE PERSON_MOVEMENTS AS
      SELECT PERSON,
             COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS,
             COUNT(*) AS LOCATION_CHANGES
        FROM MOVEMENTS
       GROUP BY PERSON;
[Diagram: Kafka topic -> PERSON_MOVEMENTS, held in the internal ksqlDB state store]
Pull query:
    ksql> SELECT LOCATION_CHANGES, UNIQUE_LOCATIONS
            FROM PERSON_MOVEMENTS
           WHERE ROWKEY='robin';
    +-----------------+-----------------+
    |LOCATION_CHANGES |UNIQUE_LOCATIONS |
    +-----------------+-----------------+
    |3                |3                |
    Query terminated
    ksql>
Push query:
    ksql> SELECT LOCATION_CHANGES, UNIQUE_LOCATIONS
            FROM PERSON_MOVEMENTS
           WHERE ROWKEY='robin'
            EMIT CHANGES;
    +-----------------+-----------------+
    |LOCATION_CHANGES |UNIQUE_LOCATIONS |
    +-----------------+-----------------+
    |1                |1                |
    |2                |2                |
    |3                |3                |
    |4                |3                |
    Press CTRL-C to interrupt

ksqlDB REST API
[Diagram: Kafka topic -> PERSON_MOVEMENTS, held in the internal ksqlDB state store]
    curl -s -X "POST" "http://localhost:8088/query" \
         -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
         -d '{"ksql":"SELECT UNIQUE_LOCATIONS FROM PERSON_MOVEMENTS WHERE ROWKEY='\''robin'\'';"}'
    {"value":"3"}

Pull and Push queries in ksqlDB
                Pull query             Push query
    Tells you:  Point in time value    All value changes
    Exits:      Immediately            Never
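
One way to picture the difference is a tiny in-memory table that supports both access styles: pull reads the current value and returns, while push registers a listener that is called on every later change. The plain-Java sketch below is an analogy only. The class and its pull, push and onEvent methods are hypothetical and are not the ksqlDB client API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

public class PullPushSketch {

    // Materialised state: number of events seen per key.
    private final Map<String, Long> table = new HashMap<>();
    private final List<BiConsumer<String, Long>> subscribers = new ArrayList<>();

    // Pull: return the point-in-time value for a key, then finish.
    public Long pull(String key) {
        return table.get(key);
    }

    // Push: register a listener that receives every subsequent change.
    public void push(BiConsumer<String, Long> listener) {
        subscribers.add(listener);
    }

    // A new event updates the state and notifies all push subscribers.
    public void onEvent(String key) {
        Long updated = table.merge(key, 1L, Long::sum);
        for (BiConsumer<String, Long> subscriber : subscribers) {
            subscriber.accept(key, updated);
        }
    }

    public static void main(String[] args) {
        PullPushSketch movements = new PullPushSketch();
        movements.push((key, value) -> System.out.println("push: " + key + " = " + value));
        for (int i = 0; i < 4; i++) {
            movements.onEvent("robin");
        }
        System.out.println("pull: robin = " + movements.pull("robin"));
    }
}
```

The push listener sees each intermediate value, whereas the pull call only ever sees the value at the moment it is made.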

Under the covers of ksqlDB
Photo by Vinicius de Moraes on Unsplash

[Diagram: ksqlDB consumes from and produces to the Kafka cluster]

[Diagram: ksqlDB consumes from and produces to the Kafka cluster; inside the ksqlDB JVM: Kafka Streams, RocksDB]

KSQL & Kafka
Fully Managed as a Service

Running ksqlDB - self-managed
ksqlDB Server (JVM process)
DEB, RPM, ZIP, TAR downloads: http://confluent.io/download
Docker images: confluentinc/ksqldb-server
…and many more…

Scaling ksqlDB
[Diagram: one ksqlDB server attached to the Kafka cluster]
[Diagram: several ksqlDB servers form a ksqlDB cluster attached to the Kafka cluster; work split by partition]

Think Applications, not database instances
[Diagram: one Kafka cluster with separate ksqlDB clusters for Inventory, Fraud and Orders]

Kafka Clusters
[Diagram: Kafka cluster A, Kafka cluster B, Replicator, and ksqlDB clusters]

ksqlDB or Kafka Streams?
Photo by Ramiz Dedaković on Unsplash

ksqlDB Builds on Streams
[Diagram: layers: ksqlDB; Kafka Streams; Consumer, Producer]

ksqlDB supports UDF, UDAF, UDTF

ksqlDB code lifecycle
Development and testing: Web UI and CLI, single ksqlDB node
    "Hmm, let me try out this idea…"
Production: REST API to deploy code, ksqlDB clustered for scale and availability
    Desired ksqlDB queries have been identified
    curl -s -X "POST" "http://localhost:8088/ksql" \
         -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
         -d '{
      "ksql":"CREATE STREAM LONDON AS SELECT * FROM MOVEMENTS WHERE LOCATION='\''london'\'';",
      "streamsProperties": {
        "ksql.streams.auto.offset.reset": "earliest"
      }
    }'

Monitoring ksqlDB
Confluent Control Center
JMX
https://www.confluent.io/blog/troubleshooting-ksql-part-2

Want to learn more?
CTAs, not CATs (sorry, not sorry)
Photo by Tucker Good on Unsplash

Learn Kafka. Start building with Apache Kafka at Confluent Developer.
developer.confluent.io

Confluent Community Slack group
cnfl.io/slack

Free Books!
https://rmoff.dev/ljcjug

More ksqlDB examples
Photo by Tengyart on Unsplash

Filtering with ksqlDB
[Diagram: ORDERS -> ksqlDB -> ORDERS_NY]
    CREATE STREAM ORDERS_NY AS
      SELECT * FROM ORDERS
       WHERE ADDRESS->STATE='New York';

Transform data with ksqlDB - merge streams
[Diagram: ORDERS (US) and ORDERS_UK (UK) -> ORDERS_COMBINED (US and UK)]
    INSERT INTO ORDERS_COMBINED
      SELECT 'US' AS SOURCE, ORDERTIME, ITEMID, ORDERUNITS, ADDRESS
        FROM ORDERS;
    INSERT INTO ORDERS_COMBINED
      SELECT 'UK' AS SOURCE, ORDERTIME, ITEMID, ORDERUNITS, ADDRESS
        FROM ORDERS_UK;

Transform data with ksqlDB - split streams
[Diagram: ORDERS_COMBINED (US and UK) -> ORDERS_US (US) and ORDERS_UK (UK)]
    CREATE STREAM ORDERS_US AS
      SELECT * FROM ORDERS_COMBINED WHERE SOURCE ='US';
    CREATE STREAM ORDERS_UK AS
      SELECT * FROM ORDERS_COMBINED WHERE SOURCE ='UK';

Message transformation with ksqlDB
ORDERS:
    {
      "ordertime": 1560070133853,
      "orderid": 67,
      "itemid": "Item_9",
      "orderunits": 5,
      "address": {
        "street": "243 Utah Way",
        "city": "Orange",
        "state": "California"
      }
    }
Annotations: Convert this timestamp to human readable format. Drop these address fields.
Step 1, drop the address fields:
    CREATE STREAM ORDERS_NO_ADDRESS_DATA AS
      SELECT ORDERTIME, ORDERID, ITEMID, ORDERUNITS
        FROM ORDERS;
Step 2, also convert the timestamp:
    CREATE STREAM ORDERS_NO_ADDRESS_DATA AS
      SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS ORDER_TIMESTAMP,
             ORDERID, ITEMID, ORDERUNITS
        FROM ORDERS;
ORDERS_NO_ADDRESS_DATA:
    {
      "order_timestamp": "2020-02-14 15:10:58",
      "orderid": 67,
      "itemid": "Item_9",
      "orderunits": 5
    }
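
For readers more at home in Java, the same kind of conversion can be sketched with java.time: take epoch milliseconds and render them with the pattern used on the slide. Two assumptions are made here. The sketch formats in UTC, and it uses the ordertime value from the sample record as input, whereas the slide's statement formats ROWTIME. The class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class TimestampToStringSketch {

    // Render epoch milliseconds as text using the given pattern.
    // UTC is an assumption made for this sketch.
    public static String timestampToString(long epochMillis, String pattern) {
        return DateTimeFormatter.ofPattern(pattern)
                .withZone(ZoneOffset.UTC)
                .format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        // ordertime from the sample ORDERS record
        System.out.println(timestampToString(1560070133853L, "yyyy-MM-dd HH:mm:ss"));
        // prints 2019-06-09 08:48:53
    }
}
```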

Schema manipulation - flatten records
ORDERS (nested fields under "address"):
    {
      "ordertime": 1560070133853,
      "orderid": 67,
      "itemid": "Item_9",
      "orderunits": 5,
      "address": {
        "street": "243 Utah Way",
        "city": "Orange",
        "state": "California"
      }
    }

Schema manipulation with ksqlDB
    CREATE STREAM ORDERS_FLAT AS
      SELECT […]
             ADDRESS->STREET AS ADDRESS_STREET,
             ADDRESS->CITY   AS ADDRESS_CITY,
             ADDRESS->STATE  AS ADDRESS_STATE
        FROM ORDERS;
ORDERS_FLAT:
    {
      "ordertime": 1560070133853,
      "orderid": 67,
      "itemid": "Item_9",
      "orderunits": 5,
      "address_street": "243 Utah Way",
      "address_city": "Orange",
      "address_state": "California"
    }
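
The flattening step can be illustrated in plain Java with nested maps: each field of a nested object is lifted to the top level under a parent_child name. Note that the ksqlDB statement names each column explicitly, while this hypothetical sketch generalises to any one-level nesting. The underscore naming follows the aliases in the statement, and the class and method names are invented for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FlattenSketch {

    // Flatten one level of nesting:
    // {"address": {"street": ...}} becomes {"address_street": ...}.
    @SuppressWarnings("unchecked")
    public static Map<String, Object> flatten(Map<String, Object> record) {
        Map<String, Object> flat = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : record.entrySet()) {
            if (field.getValue() instanceof Map) {
                Map<String, Object> nested = (Map<String, Object>) field.getValue();
                for (Map.Entry<String, Object> inner : nested.entrySet()) {
                    flat.put(field.getKey() + "_" + inner.getKey(), inner.getValue());
                }
            } else {
                flat.put(field.getKey(), field.getValue());
            }
        }
        return flat;
    }

    public static void main(String[] args) {
        Map<String, Object> address = new LinkedHashMap<>();
        address.put("street", "243 Utah Way");
        address.put("city", "Orange");
        address.put("state", "California");

        Map<String, Object> order = new LinkedHashMap<>();
        order.put("orderid", 67);
        order.put("itemid", "Item_9");
        order.put("address", address);

        System.out.println(flatten(order));
    }
}
```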

Reserialising data with ksqlDB
[Diagram: ORDERS (Avro) -> ksqlDB -> ORDERS_CSV (CSV)]
    {
      "ordertime": 1560070133853,
      "orderid": 67,
      "itemid": "Item_9",
      "orderunits": 5,
      "address_street": "243 Utah Way",
      "address_city": "Orange",
      "address_state": "California"
    }
    CREATE STREAM ORDERS_CSV WITH (VALUE_FORMAT='DELIMITED') AS
      SELECT * FROM ORDERS_FLAT;
ORDERS_CSV:
    1560045914101,24644,Item_0,1,43078 De
    1560047305664,24643,Item_29,3,209 Mon
    1560057079799,24642,Item_38,18,3 Autu
    1560088652051,24647,Item_6,6,82893 Ar
    1560105559145,24648,Item_0,12,45896 W
    1560108336441,24646,Item_33,4,272 Hef
    1560123862235,24641,Item_15,16,0 Dort
    1560124799053,24645,Item_12,1,71 Knut