Introduction to ksqlDB

A presentation at LJC Virtual Meetup in May 2020 by Robin Moffatt

Slide 1

Introduction to ksqlDB Robin Moffatt #ljcjug @rmoff

Slide 2

Apache Kafka

Producer, Consumer, The Log, Connectors, Streaming Engine

Slide 3

    {
      "reading_ts": "2020-02-14T12:19:27Z",
      "sensor_id": "aa-101",
      "production_line": "w01",
      "widget_type": "acme94",
      "temp_celcius": 23,
      "widget_weight_g": 100
    }

Photo by Franck V. on Unsplash

Slide 4

Streams of events

[Diagram: events along a Time axis]

Slide 5

Stream Processing

Stream: widgets -> Stream: widgets_red

Slide 6

Stream Processing with Kafka Streams

Stream: widgets

    // to() returns void, so the topology is built on the builder rather than assigned from the chain
    final StreamsBuilder builder = new StreamsBuilder();
    builder.stream("widgets", Consumed.with(stringSerde, widgetsSerde))
           .filter((key, widget) -> widget.getColour().equals("RED"))
           .to("widgets_red", Produced.with(stringSerde, widgetsSerde));

Stream: widgets_red

Slide 7

Stream Processing with ksqlDB

Stream: widgets

    CREATE STREAM widgets_red AS
      SELECT * FROM widgets WHERE colour='RED';

Stream: widgets_red
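
As a rough illustration of what the filter on this slide does, the sketch below applies a predicate to events one at a time and passes matches downstream. It is plain Python with no Kafka involved; the sample events and the `filter_stream` helper are assumptions made up for the example.

```python
# Hypothetical in-memory events standing in for the "widgets" stream.
widgets = [
    {"id": 1, "colour": "RED"},
    {"id": 2, "colour": "BLUE"},
    {"id": 3, "colour": "RED"},
]


def filter_stream(events, predicate):
    """Yield each event that satisfies the predicate, one at a time."""
    for event in events:
        if predicate(event):
            yield event


# Equivalent in spirit to: SELECT * FROM widgets WHERE colour='RED'
widgets_red = list(filter_stream(widgets, lambda w: w["colour"] == "RED"))
print(widgets_red)
```

A real stream never ends, which is why the helper is a generator rather than a function that returns a finished list.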

Slide 8

    {
      "reading_ts": "2020-02-14T12:19:27Z",
      "sensor_id": "aa-101",
      "production_line": "w01",
      "widget_type": "acme94",
      "temp_celcius": 23,
      "widget_weight_g": 100
    }

Photo by Franck V. on Unsplash

Slide 9

    SELECT * FROM WIDGETS WHERE WEIGHT_G > 120

    SELECT COUNT(*) FROM WIDGETS GROUP BY PRODUCTION_LINE

    SELECT AVG(TEMP_CELCIUS) AS TEMP FROM WIDGETS GROUP BY SENSOR_ID HAVING TEMP>20

    {
      "reading_ts": "2020-02-14T12:19:27Z",
      "sensor_id": "aa-101",
      "production_line": "w01",
      "widget_type": "acme94",
      "temp_celcius": 23,
      "widget_weight_g": 100
    }

    CREATE SINK CONNECTOR dw WITH (
      'connector.class' = 'S3Connector',
      'topics' = 'widgets'
      …);

Object store, data warehouse, RDBMS

Photo by Franck V. on Unsplash

Slide 10

ksqlDB

The event streaming database purpose-built for stream processing applications.

Slide 11

Stream Processing with ksqlDB

Source stream

Slide 12

Stream Processing with ksqlDB

Source stream

Slide 13

Stream Processing with ksqlDB

Source stream

Slide 14

Stream Processing with ksqlDB

Source stream, Analytics

Slide 15

Stream Processing with ksqlDB

Source stream, Applications / Microservices

Slide 16

Stream Processing with ksqlDB

Source stream, Applications / Microservices

    …SUM(TXN_AMT) GROUP BY AC_ID

AC_ID=42
AC_ID=42, BALANCE=94.00
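
The idea on this slide, an application asking for the current balance of one account while the sum is maintained from the stream, might be sketched as follows. This is a plain-Python stand-in, not ksqlDB code; the transaction amounts and the `apply_transaction` and `lookup_balance` helpers are made up for illustration, with field names taken from the slide.

```python
from collections import defaultdict

# Hypothetical transactions; AC_ID and TXN_AMT follow the slide, the values are invented.
transactions = [
    {"AC_ID": 42, "TXN_AMT": 100.00},
    {"AC_ID": 7, "TXN_AMT": 20.00},
    {"AC_ID": 42, "TXN_AMT": -6.00},
]

# Running state: one balance per account, updated as each event arrives.
balances = defaultdict(float)


def apply_transaction(txn):
    """Fold a single transaction into the running SUM(TXN_AMT) for its account."""
    balances[txn["AC_ID"]] += txn["TXN_AMT"]


def lookup_balance(ac_id):
    """Key lookup against the current state; returns None for an unknown account."""
    return balances.get(ac_id)


for txn in transactions:
    apply_transaction(txn)

print(lookup_balance(42))
```

The application only ever asks for one key, so it does not need to scan or re-aggregate the transaction history itself.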

Slide 17

DEMO

https://rmoff.dev/ksqldb-demo

Photo by Raoul Droog on Unsplash

Slide 18

Interacting with ksqlDB Photo by Tim Mossholder on Unsplash

Slide 19

ksqlDB - Confluent Control Center

Slide 20

ksqlDB - REST API

Slide 21

ksqlDB - Native client (coming soon)

Slide 22

What else can ksqlDB do?

Photo by Sereja Ris on Unsplash

Slide 23

Lookups and Joins with ksqlDB

ORDERS

    {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5}

Slide 24

Lookups and Joins with ksqlDB

ITEMS

    {"id": "Item_9", "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9}

ORDERS

    {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5}

Slide 25

Lookups and Joins with ksqlDB

ITEMS

    {"id": "Item_9", "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9}

ORDERS

    {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5}

ksqlDB

    CREATE STREAM ORDERS_ENRICHED AS
      SELECT O.*, I.*,
             O.ORDERUNITS * I.UNIT_COST AS TOTAL_ORDER_VALUE
        FROM ORDERS O
             INNER JOIN ITEMS I
             ON O.ITEMID = I.ID;

Slide 26

Lookups and Joins with ksqlDB

ITEMS

    {"id": "Item_9", "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9}

ORDERS

    {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5}

ksqlDB

    CREATE STREAM ORDERS_ENRICHED AS
      SELECT O.*, I.*,
             O.ORDERUNITS * I.UNIT_COST AS TOTAL_ORDER_VALUE
        FROM ORDERS O
             INNER JOIN ITEMS I
             ON O.ITEMID = I.ID;

ORDERS_ENRICHED

    {
      "ordertime": 1560070133853,
      "orderid": 67,
      "itemid": "Item_9",
      "orderunits": 5,
      "make": "Boyle-McDermott",
      "model": "Apiaceae",
      "unit_cost": 19.9,
      "total_order_value": 99.5
    }
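
The enrichment shown here amounts to looking up each order's item in a keyed table and merging the two records. A minimal Python sketch of that idea follows, using the slide's sample data; the `enrich` helper and the choice to key the item table by `id` in a dict are assumptions for illustration, not how ksqlDB implements the join.

```python
# ITEMS as a lookup table keyed by item id (sample values from the slide).
items = {
    "Item_9": {"make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9},
}

# ORDERS as a sequence of events.
orders = [
    {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5},
    {"ordertime": 1560070133999, "orderid": 68, "itemid": "Item_unknown", "orderunits": 1},
]


def enrich(order, items_table):
    """Join one order to its item; return None when there is no match (inner join)."""
    item = items_table.get(order["itemid"])
    if item is None:
        return None
    enriched = {**order, **item}
    # Rounded to 2 decimal places to keep floating-point noise out of the result.
    enriched["total_order_value"] = round(order["orderunits"] * item["unit_cost"], 2)
    return enriched


orders_enriched = [e for e in (enrich(o, items) for o in orders) if e is not None]
print(orders_enriched)
```

The second order (a made-up one with no matching item) is dropped, which is the inner-join behaviour the slide's SQL asks for.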

Slide 27

Connecting ksqlDB to other systems

Photo by Mak on Unsplash

Slide 28

Connecting ksqlDB to other systems

syslog, Google BigQuery, Amazon S3

Slide 29

Connecting ksqlDB to other systems

syslog

    CREATE SOURCE CONNECTOR SOURCE_MYSQL_01 WITH (
      'connector.class' = 'MySqlConnector',
      'database.hostname' = 'mysql',
      'table.whitelist' = 'demo.customers');

Slide 30

Connecting ksqlDB to other systems

    CREATE SINK CONNECTOR SINK_ELASTIC_01 WITH (
      'connector.class' = 'ElasticsearchSinkConnector',
      'connection.url' = 'http://elasticsearch:9200',
      'topics' = 'orders');

Google BigQuery, Amazon S3

Slide 31

Streams & Tables

Slide 32

Streams and Tables

Kafka topic (k/v bytes)

    {"event_ts": "2020-02-17T15:22:00Z", "person": "robin", "location": "Leeds"}
    {"event_ts": "2020-02-17T17:23:00Z", "person": "robin", "location": "London"}
    {"event_ts": "2020-02-17T22:23:00Z", "person": "robin", "location": "Wakefield"}
    {"event_ts": "2020-02-18T09:00:00Z", "person": "robin", "location": "Leeds"}

ksqlDB Stream

    +--------------------+-------+----------+
    |EVENT_TS            |PERSON |LOCATION  |
    +--------------------+-------+----------+
    |2020-02-17 15:22:00 |robin  |Leeds     |
    |2020-02-17 17:23:00 |robin  |London    |
    |2020-02-17 22:23:00 |robin  |Wakefield |
    |2020-02-18 09:00:00 |robin  |Leeds     |

Stream: Topic + Schema

ksqlDB Table

    +-------+----------+
    |PERSON |LOCATION  |
    +-------+----------+
    |robin  |Leeds     |

(The single row for robin is overwritten in turn: Leeds, London, Wakefield, Leeds.)

Table: state for given key
Topic + Schema
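
One way to picture the difference: the stream keeps every event, while the table keeps only the latest value per key. The sketch below derives a table from the slide's four events in plain Python; the `materialise` helper is a name invented for this example.

```python
# The four movement events from the slide: (event_ts, person, location).
movements = [
    ("2020-02-17T15:22:00Z", "robin", "Leeds"),
    ("2020-02-17T17:23:00Z", "robin", "London"),
    ("2020-02-17T22:23:00Z", "robin", "Wakefield"),
    ("2020-02-18T09:00:00Z", "robin", "Leeds"),
]


def materialise(stream):
    """Replay a stream into a table: each new event overwrites the row for its key."""
    table = {}
    for _event_ts, person, location in stream:
        table[person] = location
    return table


# The stream still has all four rows; the table has one row per person.
print(len(movements))
print(materialise(movements))
```

Replaying only the first two events gives a table in which robin is in London, which is the intermediate state the slide steps through.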

Slide 33

Stateful aggregations in ksqlDB

Kafka topic

    {"event_ts": "2020-02-17T15:22:00Z", "person": "robin", "location": "Leeds"}
    {"event_ts": "2020-02-17T17:23:00Z", "person": "robin", "location": "London"}
    {"event_ts": "2020-02-17T22:23:00Z", "person": "robin", "location": "Wakefield"}
    {"event_ts": "2020-02-18T09:00:00Z", "person": "robin", "location": "Leeds"}

    SELECT PERSON, COUNT(*)
      FROM MOVEMENTS GROUP BY PERSON;

    +-------+------------------+
    |PERSON | LOCATION_CHANGES |
    +-------+------------------+
    |robin  | 4                |

(The count updates as each event arrives: 1, 2, 3, 4.)

    SELECT PERSON, COUNT_DISTINCT(LOCATION)
      FROM MOVEMENTS GROUP BY PERSON;

    +-------+------------------+
    |PERSON | UNIQUE_LOCATIONS |
    +-------+------------------+
    |robin  | 3                |

(The distinct count updates as new locations appear: 1, 2, 3.)

Aggregations can be across the entire input, or windowed (TUMBLING, HOPPING, SESSION)
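
To make the two aggregations and the idea of a window concrete, here is a small Python sketch over the slide's four events. It is a batch approximation only: the helper names are invented, and the one-day tumbling window keyed on the UTC date is an assumed window size chosen for the example, not something stated on the slide.

```python
from collections import Counter

# The four movement events from the slide: (event_ts, person, location).
movements = [
    ("2020-02-17T15:22:00Z", "robin", "Leeds"),
    ("2020-02-17T17:23:00Z", "robin", "London"),
    ("2020-02-17T22:23:00Z", "robin", "Wakefield"),
    ("2020-02-18T09:00:00Z", "robin", "Leeds"),
]


def count_per_person(events):
    """COUNT(*) ... GROUP BY PERSON over the entire input."""
    return Counter(person for _ts, person, _loc in events)


def distinct_locations_per_person(events):
    """COUNT_DISTINCT(LOCATION) ... GROUP BY PERSON over the entire input."""
    seen = {}
    for _ts, person, location in events:
        seen.setdefault(person, set()).add(location)
    return {person: len(locations) for person, locations in seen.items()}


def count_per_person_per_day(events):
    """Tumbling one-day windows: each event falls into exactly one (person, date) bucket."""
    return Counter((person, ts[:10]) for ts, person, _loc in events)


print(count_per_person(movements))
print(distinct_locations_per_person(movements))
print(count_per_person_per_day(movements))
```

Hopping and session windows differ in how events are assigned to buckets (overlapping buckets, or buckets bounded by gaps in activity), but the aggregation inside each bucket works the same way.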

Slide 34

Stateful aggregations in ksqlDB

Kafka topic

    {"event_ts": "2020-02-17T15:22:00Z", "person": "robin", "location": "Leeds"}
    {"event_ts": "2020-02-17T17:23:00Z", "person": "robin", "location": "London"}
    {"event_ts": "2020-02-17T22:23:00Z", "person": "robin", "location": "Wakefield"}
    {"event_ts": "2020-02-18T09:00:00Z", "person": "robin", "location": "Leeds"}

    CREATE TABLE PERSON_MOVEMENTS AS
      SELECT PERSON,
             COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS,
             COUNT(*) AS LOCATION_CHANGES
        FROM MOVEMENTS
       GROUP BY PERSON;

PERSON_MOVEMENTS
Internal ksqlDB state store

Slide 35

Pull and Push queries in ksqlDB

Kafka topic

    {"event_ts": "2020-02-17T15:22:00Z", "person": "robin", "location": "Leeds"}
    {"event_ts": "2020-02-17T17:23:00Z", "person": "robin", "location": "London"}
    {"event_ts": "2020-02-17T22:23:00Z", "person": "robin", "location": "Wakefield"}
    {"event_ts": "2020-02-18T09:00:00Z", "person": "robin", "location": "Leeds"}

    CREATE TABLE PERSON_MOVEMENTS AS
      SELECT PERSON,
             COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS,
             COUNT(*) AS LOCATION_CHANGES
        FROM MOVEMENTS
       GROUP BY PERSON;

PERSON_MOVEMENTS
Internal ksqlDB state store

Pull query

    ksql> SELECT LOCATION_CHANGES, UNIQUE_LOCATIONS
            FROM PERSON_MOVEMENTS
           WHERE ROWKEY='robin';
    +-----------------+-----------------+
    |LOCATION_CHANGES |UNIQUE_LOCATIONS |
    +-----------------+-----------------+
    |3                |3                |
    Query terminated
    ksql>

Push query

    ksql> SELECT LOCATION_CHANGES, UNIQUE_LOCATIONS
            FROM PERSON_MOVEMENTS
           WHERE ROWKEY='robin'
            EMIT CHANGES;
    +-----------------+-----------------+
    |LOCATION_CHANGES |UNIQUE_LOCATIONS |
    +-----------------+-----------------+
    |1                |1                |
    |2                |2                |
    |3                |3                |
    |4                |3                |
    Press CTRL-C to interrupt

Slide 36

ksqlDB REST API

Kafka topic

    {"event_ts": "2020-02-17T15:22:00Z", "person": "robin", "location": "Leeds"}
    {"event_ts": "2020-02-17T17:23:00Z", "person": "robin", "location": "London"}
    {"event_ts": "2020-02-17T22:23:00Z", "person": "robin", "location": "Wakefield"}
    {"event_ts": "2020-02-18T09:00:00Z", "person": "robin", "location": "Leeds"}

    CREATE TABLE PERSON_MOVEMENTS AS
      SELECT PERSON,
             COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS,
             COUNT(*) AS LOCATION_CHANGES
        FROM MOVEMENTS
       GROUP BY PERSON;

PERSON_MOVEMENTS
Internal ksqlDB state store

    curl -s -X "POST" "http://localhost:8088/query" \
         -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
         -d '{"ksql":"SELECT UNIQUE_LOCATIONS FROM PERSON_MOVEMENTS WHERE ROWKEY='\''robin'\'';"}'

    {"value":"3"}
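
For applications that are not shelling out to curl, the same request could be assembled with Python's standard library. The sketch below only builds the request object; the endpoint path, content type, and statement are copied from the slide, while the `build_pull_query_request` helper is a name invented here. Sending it needs a ksqlDB server listening on localhost:8088, which is assumed rather than provided.

```python
import json
import urllib.request


def build_pull_query_request(base_url, statement):
    """Build (but do not send) a POST to the /query endpoint shown on the slide."""
    body = json.dumps({"ksql": statement}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/query",
        data=body,
        headers={"Content-Type": "application/vnd.ksql.v1+json; charset=utf-8"},
        method="POST",
    )


request = build_pull_query_request(
    "http://localhost:8088",
    "SELECT UNIQUE_LOCATIONS FROM PERSON_MOVEMENTS WHERE ROWKEY='robin';",
)
print(request.get_method(), request.full_url)

# To actually run the query against a live server (not executed here):
#   with urllib.request.urlopen(request) as response:
#       print(response.read().decode("utf-8"))
```

Letting `json.dumps` produce the body avoids the nested shell quoting that the curl version needs around 'robin'.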

Slide 37

Pull and Push queries in ksqlDB

                 Pull query             Push query
    Tells you:   Point in time value    All value changes
    Exits:       Immediately            Never
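
The contrast can be sketched in plain Python: a pull query is a one-off lookup against current state, and a push query is a subscription that receives every change. The `PersonMovements` class and its method names are invented for this illustration and say nothing about how ksqlDB is implemented internally.

```python
class PersonMovements:
    """Toy stand-in for a materialised table with pull and push access."""

    def __init__(self):
        self._rows = {}         # person -> (location_changes, unique_locations)
        self._locations = {}    # person -> set of locations seen so far
        self._subscribers = []  # callbacks registered by push queries

    def subscribe(self, callback):
        """Push query: register to be told about every subsequent change."""
        self._subscribers.append(callback)

    def pull(self, person):
        """Pull query: return the current row for a key and finish."""
        return self._rows.get(person)

    def on_event(self, person, location):
        """Update state for one incoming event and notify subscribers."""
        seen = self._locations.setdefault(person, set())
        seen.add(location)
        changes = self._rows.get(person, (0, 0))[0] + 1
        row = (changes, len(seen))
        self._rows[person] = row
        for callback in self._subscribers:
            callback(person, row)


table = PersonMovements()
pushed = []
table.subscribe(lambda person, row: pushed.append(row))

for location in ["Leeds", "London", "Wakefield", "Leeds"]:
    table.on_event("robin", location)

print(table.pull("robin"))  # point-in-time value
print(pushed)               # every change, in order
```

The pushed rows reproduce the sequence in the push query output on the earlier slide, while the pull returns only the latest row.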

Slide 38

Under the covers of ksqlDB

Photo by Vinicius de Moraes on Unsplash

Slide 39

Kafka cluster -- consume / produce -- ksqlDB

Slide 40

Kafka cluster -- consume / produce -- ksqlDB

JVM: ksqlDB, Kafka Streams, RocksDB

Slide 41

Slide 42

Kafka & KSQL, Fully Managed as a Service

Slide 43

Running ksqlDB - self-managed

ksqlDB Server (JVM process)

DEB, RPM, ZIP, TAR downloads: http://confluent.io/download
Docker images: confluentinc/ksqldb-server

…and many more…

Slide 44

Scaling ksqlDB

Kafka cluster, ksqlDB

Slide 45

Scaling ksqlDB

Kafka cluster
ksqlDB cluster: ksqlDB, ksqlDB

Work split by partition
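
"Work split by partition" can be pictured as dividing a topic's partitions among the nodes in the cluster. The sketch below uses a simple round-robin split purely as an illustration; the real assignment is handled by Kafka's consumer group machinery, and the node names, partition count, and `assign_partitions` helper are all assumptions made up for this example.

```python
def assign_partitions(partitions, nodes):
    """Illustrative round-robin split of partitions across nodes (not Kafka's actual assignor)."""
    assignment = {node: [] for node in nodes}
    for index, partition in enumerate(partitions):
        node = nodes[index % len(nodes)]
        assignment[node].append(partition)
    return assignment


# A hypothetical topic with six partitions.
partitions = list(range(6))

one_node = assign_partitions(partitions, ["ksqldb-1"])
two_nodes = assign_partitions(partitions, ["ksqldb-1", "ksqldb-2"])

print(one_node)
print(two_nodes)
```

One consequence worth noting: with this kind of split, a node beyond the number of partitions would receive no work, so the partition count bounds how far the work can be spread.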

Slide 46

Slide 47

Think Applications, not database instances

Kafka cluster

ksqlDB cluster: Inventory
ksqlDB cluster: Fraud
ksqlDB cluster: Orders

Slide 48

Kafka Clusters

Kafka cluster A, Kafka cluster B, Replicator

ksqlDB cluster, ksqlDB cluster, ksqlDB cluster

Slide 49

ksqlDB or Kafka Streams?

Photo by Ramiz Dedaković on Unsplash

Slide 50

ksqlDB Builds on Streams

ksqlDB
Kafka Streams
Consumer, Producer

Slide 51

ksqlDB supports UDF, UDAF, UDTF

Slide 52

ksqlDB code lifecycle

“Hmm, let me try out this idea…”
Web UI and CLI for development and testing
Single ksqlDB node

Desired ksqlDB queries have been identified
REST API to deploy code for Production
ksqlDB clustered for scale and availability

    curl -s -X "POST" "http://localhost:8088/ksql" \
         -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
         -d '{
           "ksql":"CREATE STREAM LONDON AS SELECT * FROM MOVEMENTS WHERE LOCATION='\''london'\'';",
           "streamsProperties": {
             "ksql.streams.auto.offset.reset": "earliest"
           }
         }'

Slide 53

Monitoring ksqlDB

Confluent Control Center
JMX

https://www.confluent.io/blog/troubleshooting-ksql-part-2

Slide 54

Want to learn more?

CTAs, not CATs (sorry, not sorry)

Photo by Tucker Good on Unsplash

Slide 55

Learn Kafka. Start building with Apache Kafka at Confluent Developer. developer.confluent.io

Slide 56

Confluent Community Slack group

cnfl.io/slack

Slide 57

Free Books!

https://rmoff.dev/ljcjug

Slide 58

More ksqlDB examples Photo by Tengyart on Unsplash

Slide 59

Filtering with ksqlDB

ORDERS

Slide 60

Filtering with ksqlDB

ORDERS

ksqlDB

    CREATE STREAM ORDERS_NY AS
      SELECT *
        FROM ORDERS
       WHERE ADDRESS->STATE='New York';

Slide 61

Filtering with ksqlDB

ORDERS

ksqlDB

    CREATE STREAM ORDERS_NY AS
      SELECT *
        FROM ORDERS
       WHERE ADDRESS->STATE='New York';

ORDERS_NY

Slide 62

Transform data with ksqlDB - merge streams

ORDERS (US)
ORDERS_UK (UK)

Slide 63

Transform data with ksqlDB - merge streams

ORDERS (US)

    INSERT INTO ORDERS_COMBINED
      SELECT 'US' AS SOURCE, ORDERTIME, ITEMID, ORDERUNITS, ADDRESS
        FROM ORDERS;

ORDERS_UK (UK)

    INSERT INTO ORDERS_COMBINED
      SELECT 'UK' AS SOURCE, ORDERTIME, ITEMID, ORDERUNITS, ADDRESS
        FROM ORDERS_UK;

Slide 64

Transform data with ksqlDB - merge streams

ORDERS (US)

    INSERT INTO ORDERS_COMBINED
      SELECT 'US' AS SOURCE, ORDERTIME, ITEMID, ORDERUNITS, ADDRESS
        FROM ORDERS;

ORDERS_UK (UK)

    INSERT INTO ORDERS_COMBINED
      SELECT 'UK' AS SOURCE, ORDERTIME, ITEMID, ORDERUNITS, ADDRESS
        FROM ORDERS_UK;

ORDERS_COMBINED (US, UK)

Slide 65

Transform data with ksqlDB - split streams

ORDERS_COMBINED (US, UK)

Slide 66

Transform data with ksqlDB - split streams

ORDERS_COMBINED (US, UK)

    CREATE STREAM ORDERS_US AS
      SELECT * FROM ORDERS_COMBINED
       WHERE SOURCE='US';

    CREATE STREAM ORDERS_UK AS
      SELECT * FROM ORDERS_COMBINED
       WHERE SOURCE='UK';

Slide 67

Transform data with ksqlDB - split streams

ORDERS_COMBINED (US, UK)

    CREATE STREAM ORDERS_US AS
      SELECT * FROM ORDERS_COMBINED
       WHERE SOURCE='US';

ORDERS_US (US)

    CREATE STREAM ORDERS_UK AS
      SELECT * FROM ORDERS_COMBINED
       WHERE SOURCE='UK';

ORDERS_UK (UK)
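
Splitting a combined stream by a field value is essentially routing each event to an output chosen by that field. A short Python sketch of the idea, with invented sample orders and a hypothetical `split_by` helper:

```python
# Hypothetical combined orders; SOURCE follows the slide, other values are invented.
orders_combined = [
    {"SOURCE": "US", "ITEMID": "Item_9", "ORDERUNITS": 5},
    {"SOURCE": "UK", "ITEMID": "Item_2", "ORDERUNITS": 1},
    {"SOURCE": "UK", "ITEMID": "Item_7", "ORDERUNITS": 3},
    {"SOURCE": "US", "ITEMID": "Item_4", "ORDERUNITS": 2},
]


def split_by(events, field):
    """Route each event to an output list chosen by the value of one field."""
    outputs = {}
    for event in events:
        outputs.setdefault(event[field], []).append(event)
    return outputs


streams = split_by(orders_combined, "SOURCE")
orders_us = streams["US"]
orders_uk = streams["UK"]
print(len(orders_us), len(orders_uk))
```

In ksqlDB each output is its own CREATE STREAM statement with a WHERE clause, so adding another destination means adding another statement rather than changing a routing function.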

Slide 68

Message transformation with ksqlDB

ORDERS

    {
      "ordertime": 1560070133853,
      "orderid": 67,
      "itemid": "Item_9",
      "orderunits": 5,
      "address": {
        "street": "243 Utah Way",
        "city": "Orange",
        "state": "California"
      }
    }

Convert this timestamp to human readable format
Drop these address fields

Slide 69

Message transformation with ksqlDB

ORDERS

    {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5,
     "address": {"street": "243 Utah Way", "city": "Orange", "state": "California"}}

ksqlDB

    CREATE STREAM ORDERS_NO_ADDRESS_DATA AS
      SELECT ORDERTIME, ORDERID, ITEMID, ORDERUNITS
        FROM ORDERS;

Slide 70

Message transformation with ksqlDB

ORDERS

    {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5,
     "address": {"street": "243 Utah Way", "city": "Orange", "state": "California"}}

ksqlDB

    CREATE STREAM ORDERS_NO_ADDRESS_DATA AS
      SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS ORDER_TIMESTAMP,
             ORDERID, ITEMID, ORDERUNITS
        FROM ORDERS;

ORDERS_NO_ADDRESS_DATA

    {"order_ts": "2020-02-14 15:10:58", "orderid": 67, "itemid": "Item_9", "orderunits": 5}
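
The two changes made by this transformation, formatting an epoch-millisecond timestamp and dropping the address, can be sketched in Python as below. Note one assumption: the slide formats ROWTIME (the Kafka record timestamp), whereas this sketch formats the `ordertime` field from the payload because that is the only timestamp available in the sample record, so the resulting value differs from the one on the slide. The `transform` helper is a made-up name.

```python
from datetime import datetime, timezone

order = {
    "ordertime": 1560070133853,
    "orderid": 67,
    "itemid": "Item_9",
    "orderunits": 5,
    "address": {"street": "243 Utah Way", "city": "Orange", "state": "California"},
}


def transform(record):
    """Format the epoch-millisecond timestamp (in UTC) and keep only the non-address fields."""
    ts = datetime.fromtimestamp(record["ordertime"] // 1000, tz=timezone.utc)
    return {
        "order_ts": ts.strftime("%Y-%m-%d %H:%M:%S"),
        "orderid": record["orderid"],
        "itemid": record["itemid"],
        "orderunits": record["orderunits"],
    }


print(transform(order))
```

The strftime pattern `%Y-%m-%d %H:%M:%S` plays the role of the `yyyy-MM-dd HH:mm:ss` format string passed to TIMESTAMPTOSTRING.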

Slide 71

Schema manipulation - flatten records

ORDERS

    {
      "ordertime": 1560070133853,
      "orderid": 67,
      "itemid": "Item_9",
      "orderunits": 5,
      "address": {
        "street": "243 Utah Way",
        "city": "Orange",
        "state": "California"
      }
    }

Nested fields

Slide 72

Schema manipulation with ksqlDB

ORDERS

    {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5,
     "address": {"street": "243 Utah Way", "city": "Orange", "state": "California"}}

ksqlDB

    CREATE STREAM ORDERS_FLAT AS
      SELECT […]
             ADDRESS->STREET AS ADDRESS_STREET,
             ADDRESS->CITY AS ADDRESS_CITY,
             ADDRESS->STATE AS ADDRESS_STATE
        FROM ORDERS;

Slide 73

Schema manipulation with ksqlDB

ORDERS

    {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5,
     "address": {"street": "243 Utah Way", "city": "Orange", "state": "California"}}

ksqlDB

    CREATE STREAM ORDERS_FLAT AS
      SELECT […]
             ADDRESS->STREET AS ADDRESS_STREET,
             ADDRESS->CITY AS ADDRESS_CITY,
             ADDRESS->STATE AS ADDRESS_STATE
        FROM ORDERS;

ORDERS_FLAT

    {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5,
     "address-street": "243 Utah Way", "address-city": "Orange", "address-state": "California"}
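
Flattening a nested record means promoting each nested field to a top-level field with a prefixed name. A generic Python sketch follows; the `flatten` helper and the underscore separator are choices made for this example (the slide's output shows hyphenated keys, while its SQL aliases use underscores).

```python
order = {
    "ordertime": 1560070133853,
    "orderid": 67,
    "itemid": "Item_9",
    "orderunits": 5,
    "address": {"street": "243 Utah Way", "city": "Orange", "state": "California"},
}


def flatten(record, parent="", sep="_"):
    """Recursively promote nested dict fields to top-level keys like address_street."""
    flat = {}
    for key, value in record.items():
        name = f"{parent}{sep}{key}" if parent else key
        if isinstance(value, dict):
            flat.update(flatten(value, name, sep))
        else:
            flat[name] = value
    return flat


print(flatten(order))
```

The ksqlDB version names each nested field explicitly with the `->` operator, which gives control over which fields are kept and what they are called.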

Slide 74

Reserialising data with ksqlDB

ORDERS (Avro)

    {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5,
     "address-street": "243 Utah Way", "address-city": "Orange", "address-state": "California"}

Slide 75

Reserialising data with ksqlDB

ORDERS (Avro)

    {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5,
     "address-street": "243 Utah Way", "address-city": "Orange", "address-state": "California"}

ksqlDB

    CREATE STREAM ORDERS_CSV
      WITH (VALUE_FORMAT='DELIMITED') AS
      SELECT * FROM ORDERS_FLAT;

Slide 76

Reserialising data with ksqlDB

ORDERS (Avro)

    {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5,
     "address-street": "243 Utah Way", "address-city": "Orange", "address-state": "California"}

ksqlDB

    CREATE STREAM ORDERS_CSV
      WITH (VALUE_FORMAT='DELIMITED') AS
      SELECT * FROM ORDERS_FLAT;

ORDERS_CSV (CSV)

    1560045914101,24644,Item_0,1,43078 De
    1560047305664,24643,Item_29,3,209 Mon
    1560057079799,24642,Item_38,18,3 Autu
    1560088652051,24647,Item_6,6,82893 Ar
    1560105559145,24648,Item_0,12,45896 W
    1560108336441,24646,Item_33,4,272 Hef
    1560123862235,24641,Item_15,16,0 Dort
    1560124799053,24645,Item_12,1,71 Knut
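
Reserialising changes only how each record is encoded, not what it contains. As a loose analogy in Python, the sketch below writes a flattened record out as a delimited line using the standard `csv` module; the field order and the `to_delimited` helper are assumptions for this example, and no Avro decoding is involved.

```python
import csv
import io

orders_flat = [
    {
        "ordertime": 1560070133853,
        "orderid": 67,
        "itemid": "Item_9",
        "orderunits": 5,
        "address-street": "243 Utah Way",
        "address-city": "Orange",
        "address-state": "California",
    },
]

FIELDS = [
    "ordertime", "orderid", "itemid", "orderunits",
    "address-street", "address-city", "address-state",
]


def to_delimited(records, fields):
    """Encode each record as one comma-delimited line, with values in a fixed field order."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    for record in records:
        writer.writerow([record[field] for field in fields])
    return buffer.getvalue()


print(to_delimited(orders_flat, FIELDS), end="")
```

Because a delimited line carries no field names, consumers depend on the agreed column order, which is part of what is given up when moving from a schema-carrying format to CSV.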