Kafka as a Platform: the Ecosystem from the Ground Up

A presentation at GOTOpia in September 2020 by Robin Moffatt

Slide 1

Kafka as a Platform: the Ecosystem from the Ground Up Robin Moffatt | #GOTOpia | @rmoff

Slide 2

$ whoami • Robin Moffatt (@rmoff) • Senior Developer Advocate at Confluent (Apache Kafka, not Wikis!) • Working in data & analytics since 2001 • Oracle ACE Director (Alumnus) http://rmoff.dev/talks · http://rmoff.dev/blog · http://rmoff.dev/youtube @rmoff | #GOTOpia | @confluentinc

Slide 3

EVENTS @rmoff | #GOTOpia | @confluentinc

Slide 4

EVENTS @rmoff | #GOTOpia | @confluentinc

Slide 5

EVENTS • Something happened • What happened

Slide 6

Human generated events A Sale A Stock movement @rmoff | #GOTOpia | @confluentinc

Slide 7

Machine generated events IoT Networking Applications @rmoff | #GOTOpia | @confluentinc

Slide 8

EVENTS are EVERYWHERE @rmoff | #GOTOpia | @confluentinc

Slide 9

EVENTS are very POWERFUL @rmoff | #GOTOpia | @confluentinc

Slide 10

Slide 11

Slide 12

K V

Slide 13

LOG @rmoff | #GOTOpia | @confluentinc

Slide 14

K V

Slide 15

K V

Slide 16

K V

Slide 17

K V

Slide 18

K V

Slide 19

K V

Slide 20

K V

Slide 21

Immutable Event Log Old New Events are added at the end of the log @rmoff | #GOTOpia | @confluentinc

Slide 22

TOPICS @rmoff | #GOTOpia | @confluentinc

Slide 23

Topics Clicks Orders Customers Topics are similar in concept to tables in a database @rmoff | #GOTOpia | @confluentinc
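To make the analogy concrete, here is a minimal sketch (not from the slides) of creating a topic programmatically with the Java AdminClient; the topic name, partition count, and replication factor are illustrative values for a local single-broker setup.

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (AdminClient admin = AdminClient.create(props)) {
    // "orders" topic with 3 partitions, replication factor 1 (illustrative values)
    admin.createTopics(Collections.singletonList(new NewTopic("orders", 3, (short) 1)))
         .all().get();
}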

Slide 24

PARTITIONS @rmoff | #GOTOpia | @confluentinc

Slide 25

Partitions Clicks P0 P1 P2 Messages are guaranteed to be strictly ordered within a partition @rmoff | #GOTOpia | @confluentinc

Slide 26

PUB / SUB @rmoff | #GOTOpia | @confluentinc

Slide 27

PUB / SUB @rmoff | #GOTOpia | @confluentinc

Slide 28

Producing data Old New Messages are added at the end of the log @rmoff | #GOTOpia | @confluentinc

Slide 29

partition 0 … partition 1 producer … partition 2 … Partitioned Topic

Slide 30

try (KafkaProducer<String, Payment> producer = new KafkaProducer<String, Payment>(props)) {
    for (long i = 0; i < 10; i++) {
        final String orderId = "id" + Long.toString(i);
        final Payment payment = new Payment(orderId, 1000.00d);
        final ProducerRecord<String, Payment> record =
            new ProducerRecord<String, Payment>("transactions", payment.getId().toString(), payment);
        producer.send(record);
    }
} catch (final InterruptedException e) {
    e.printStackTrace();
}

Slide 31

package main

import (
    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
    topic := "test_topic"
    p, _ := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092"})
    defer p.Close()

    p.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: 0},
        Value:          []byte("Hello world")}, nil)
}

Slide 32

Producing to Kafka - No Key Time Partition 1 Partition 2 Partition 3 Messages will be produced in a round robin fashion Partition 4 @rmoff | #GOTOpia | @confluentinc

Slide 33

Producing to Kafka - With Key Time Partition 1 A Partition 2 B hash(key) % numPartitions = N Partition 3 C Partition 4 D @rmoff | #GOTOpia | @confluentinc
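As a rough sketch of the idea on this slide (the real default partitioner hashes the serialised key with murmur2 rather than String.hashCode(), so treat this as an illustration of the formula only):

// Same key -> same partition, so all events for one key stay strictly ordered.
static int partitionForKey(String key, int numPartitions) {
    return (key.hashCode() & 0x7fffffff) % numPartitions;   // hash(key) % numPartitions
}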

Slide 34

Producers partition 0 … partition 1 producer … partition 2 … Partitioned Topic • A client application • Puts messages into topics • Handles partitioning, network protocol • Java, Go, .NET, C/C++, Python • Also every other language, plus REST proxy if not

Slide 35

PUB / SUB @rmoff | #GOTOpia | @confluentinc

Slide 36

Consuming data - access is only sequential Read to offset & scan Old New @rmoff | #GOTOpia | @confluentinc

Slide 37

Consumers have a position of their own Old New Sally is here (scan) @rmoff | #GOTOpia | @confluentinc

Slide 38

Consumers have a position of their own Old New Fred is here (scan) Sally is here (scan) @rmoff | #GOTOpia | @confluentinc

Slide 39

Consumers have a position of their own Old New Rick is here (scan) Fred is here (scan) Sally is here (scan) @rmoff | #GOTOpia | @confluentinc
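A minimal sketch of what "a position of their own" means in the Consumer API; it assumes a KafkaConsumer built as on the following slides, and the topic name is illustrative:

TopicPartition tp = new TopicPartition("orders", 0);
consumer.assign(Collections.singletonList(tp));

consumer.seekToBeginning(Collections.singletonList(tp)); // Sally can rewind to the oldest event...
consumer.seek(tp, 42L);                                  // ...or jump to an arbitrary offset,
long here = consumer.position(tp);                       // without affecting Fred's or Rick's position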

Slide 40

try (final KafkaConsumer<String, Payment> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList(TOPIC));
    while (true) {
        ConsumerRecords<String, Payment> records = consumer.poll(100);
        for (ConsumerRecord<String, Payment> record : records) {
            String key = record.key();
            Payment value = record.value();
            System.out.printf("key = %s, value = %s%n", key, value);
        }
    }
}

Slide 41

c, _ := kafka.NewConsumer(&cm)
defer c.Close()
c.Subscribe(topic, nil)

for {
    select {
    case ev := <-c.Events():
        switch ev.(type) {
        case *kafka.Message:
            km := ev.(*kafka.Message)
            fmt.Printf("✅ Message '%v' received from topic '%v'\n",
                string(km.Value), string(*km.TopicPartition.Topic))
        }
    }
}

Slide 42

Consuming From Kafka - Single Consumer Partition 1 Partition 2 C Partition 3 Partition 4 @rmoff | #GOTOpia | @confluentinc

Slide 43

Consuming From Kafka - Multiple Consumers Partition 1 C1 Partition 2 Partition 3 C2 Partition 4 @rmoff | #GOTOpia | @confluentinc

Slide 44

Consuming From Kafka - Multiple Consumers C1 Partition 1 Partition 2 Partition 3 C2 Partition 4 @rmoff | #GOTOpia | @confluentinc

Slide 45

Consuming From Kafka - Grouped Consumers C1 C1 Partition 1 Partition 2 Partition 3 C2 Partition 4 @rmoff | #GOTOpia | @confluentinc

Slide 46

CONSUMERS CONSUMER GROUP COORDINATOR CONSUMER GROUP

Slide 47

Consuming From Kafka - Grouped Consumers Partition 1 Partition 2 Partition 3 C1 C2 C3 C4 Partition 4 @rmoff | #GOTOpia | @confluentinc

Slide 48

Consuming From Kafka - Grouped Consumers Partition 1 Partition 2 Partition 3 C1 C2 C3 Partition 4 @rmoff | #GOTOpia | @confluentinc

Slide 49

Consuming From Kafka - Grouped Consumers Partition 1 C1 Partition 2 Partition 3 C2 C3 Partition 4 @rmoff | #GOTOpia | @confluentinc

Slide 50

Consumers partition 0 … partition 1 … consumer A consumer A consumer A partition 2 … Partitioned Topic consumer B • A client application • Reads messages from topics • Horizontally, elastically scalable (if stateless) • Java, Go, .NET, C/C++, Python, everything else, plus REST proxy if not
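A sketch of what makes consumers "grouped": nothing more than sharing a group.id. Values here are illustrative; run several copies of this and the group coordinator spreads the topic's partitions across them, rebalancing as instances come and go.

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "orders-service");           // same value in every instance
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("orders"));               // partitions are shared across the group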

Slide 51

BROKERS and REPLICATION @rmoff | #GOTOpia | @confluentinc

Slide 52

Partition Leadership and Replication (Leader / Follower) Partition 1 Partition 2 Partition 3 Partition 4 Broker 1 Broker 2 Broker 3 @rmoff | #GOTOpia | @confluentinc

Slide 53

Partition Leadership and Replication (Leader / Follower) Partition 1 Partition 1 Partition 1 Partition 2 Partition 2 Partition 2 Partition 3 Partition 3 Partition 3 Partition 4 Partition 4 Partition 4 Broker 1 Broker 2 Broker 3 @rmoff | #GOTOpia | @confluentinc

Slide 54

Partition Leadership and Replication (Leader / Follower) Partition 1 Partition 1 Partition 1 Partition 2 Partition 2 Partition 2 Partition 3 Partition 3 Partition 3 Partition 4 Partition 4 Partition 4 Broker 1 Broker 2 Broker 3 @rmoff | #GOTOpia | @confluentinc
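To see leadership and replication from a client's point of view, a hedged sketch using the Java AdminClient (topic name assumed):

try (AdminClient admin = AdminClient.create(props)) {
    TopicDescription desc = admin.describeTopics(Collections.singletonList("orders"))
                                 .all().get().get("orders");
    for (TopicPartitionInfo p : desc.partitions()) {
        // Each partition has one leader broker plus follower replicas (the ISR is the in-sync set)
        System.out.printf("partition %d: leader=%s replicas=%s isr=%s%n",
                p.partition(), p.leader(), p.replicas(), p.isr());
    }
}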

Slide 55

Photo by Raoul Droog on Unsplash DEMO @rmoff | #GOTOpia | @confluentinc

Slide 56

So far, this is Pretty good @rmoff | #GOTOpia | @confluentinc

Slide 57

So far, this is Pretty good but I’ve not finished yet… @rmoff | #GOTOpia | @confluentinc

Slide 58

Streaming Pipelines Amazon S3 RDBMS HDFS @rmoff | #GOTOpia | @confluentinc

Slide 59

Evolve processing from old systems to new Existing New App <x> App RDBMS @rmoff | #GOTOpia | @confluentinc

Slide 60

Slide 61

Streaming Integration with Kafka Connect Sources (syslog) → Kafka Connect → Kafka Brokers @rmoff | #GOTOpia | @confluentinc

Slide 62

Streaming Integration with Kafka Connect Kafka Brokers → Kafka Connect → Sinks (Amazon, Google) @rmoff | #GOTOpia | @confluentinc

Slide 63

Streaming Integration with Kafka Connect Sources (syslog) and Sinks (Amazon, Google) ↔ Kafka Connect ↔ Kafka Brokers @rmoff | #GOTOpia | @confluentinc

Slide 64

Look Ma, No Code!

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:mysql://asgard:3306/demo",
  "table.whitelist": "sales,orders,customers"
}

@rmoff | #GOTOpia | @confluentinc
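One way that JSON gets deployed, sketched here under the assumption of a Connect distributed worker listening on localhost:8083 and a connector named jdbc-source-demo (both illustrative), is a PUT to the Connect REST API:

HttpClient http = HttpClient.newHttpClient();
String config = "{"
    + "\"connector.class\": \"io.confluent.connect.jdbc.JdbcSourceConnector\","
    + "\"connection.url\": \"jdbc:mysql://asgard:3306/demo\","
    + "\"table.whitelist\": \"sales,orders,customers\""
    + "}";

HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8083/connectors/jdbc-source-demo/config"))
        .header("Content-Type", "application/json")
        .PUT(HttpRequest.BodyPublishers.ofString(config))
        .build();

HttpResponse<String> response = http.send(request, HttpResponse.BodyHandlers.ofString());
System.out.println(response.statusCode() + " " + response.body());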

Slide 65

Extensible: Connector → Transform(s) → Converter @rmoff | #GOTOpia | @confluentinc

Slide 66

hub.confluent.io @rmoff | #GOTOpia | @confluentinc

Slide 67

Fault-tolerant? Nope. Kafka Connect Standalone Worker S3 Task #1 JDBC Task #1 JDBC Task #2 Offsets Worker @rmoff | #GOTOpia | @confluentinc

Slide 68

Fault-tolerant? Yeah! Kafka Connect Distributed Worker S3 Task #1 JDBC Task #1 JDBC Task #2 Kafka Connect cluster Worker Offsets Config Status @rmoff | #GOTOpia | @confluentinc

Slide 69

Fault-tolerant? Yeah! Scaling the Distributed Worker S3 Task #1 JDBC Task #1 Kafka Connect cluster JDBC Task #2 Worker Worker Offsets Config Status @rmoff | #GOTOpia | @confluentinc

Slide 70

Schema Registry @rmoff | #GOTOpia | @confluentinc

Slide 71

K V

Slide 72

K V

Slide 73

K V

Slide 74

K V

Slide 75

K V Wait… what's this?

Slide 76

How do you serialise your data? JSON Avro Protobuf JSON Schema CSV @rmoff | #GOTOpia | @confluentinc

Slide 77

APIs are contracts between services {user_id: 53, address: "2 Elm st."} Profile service Quote service {user_id: 53, quote: 580} @rmoff | #GOTOpia | @confluentinc

Slide 78

But not all services {user_id: 53, address: "2 Elm st."} Profile service Quote service {user_id: 53, quote: 580} @rmoff | #GOTOpia | @confluentinc

Slide 79

And naturally… {user_id: 53, address: "2 Elm st."} Profile service Quote service Profile database Stream processing @rmoff | #GOTOpia | @confluentinc

Slide 80

Schemas are about how teams work together {user_id: 53, timestamp: 1497842472} new Date(timestamp) Profile service Quote service Profile database create table (user_id number, timestamp number) Stream processing @rmoff | #GOTOpia | @confluentinc

Slide 81

Things change… {user_id: 53, timestamp: "June 28, 2017 4:00pm"} Profile service Quote service Profile database Stream processing @rmoff | #GOTOpia | @confluentinc

Slide 82

Moving fast and breaking things {user_id: 53, timestamp: "June 28, 2017 4:00pm"} Profile service Quote service Profile database Stream processing @rmoff | #GOTOpia | @confluentinc

Slide 83

Lack of schemas – Coupling teams and services 2001 2001 Citrus Heights-Sunrise Blvd Citrus_Hghts 60670001 3400293 34 SAC Sacramento SV Sacramento Valley SAC Sacramento County APCD SMA8 Sacramento Metropolitan Area CA 6920 Sacramento 28 6920 13588 7400 Sunrise Blvd 95610 38 41 56 38.6988889 121 16 15.98999977 -121.271111 10 4284781 650345 52 @rmoff | #GOTOpia | @confluentinc

Slide 84

Serialisation & Schemas JSON Avro Protobuf JSON Schema CSV @rmoff | #GOTOpia | @confluentinc

Slide 85

Serialisation & Schemas JSON Avro Protobuf JSON Schema CSV

https://rmoff.dev/qcon-schemas @rmoff | #GOTOpia | @confluentinc

Slide 86

It isn’t just about the services Software Teams Engineering & Culture Data & Metadata @rmoff | #GOTOpia | @confluentinc

Slide 87

Schemas Schema Registry Topic producer … consumer
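On the producer side the Schema Registry typically plugs in via the serializer config: the Avro serializer registers or looks up the schema and writes just a schema ID plus the Avro-encoded payload. A hedged sketch (URLs and topic name are illustrative; Payment is the class from the earlier producer slide):

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081");   // where the serializer finds the registry

try (KafkaProducer<String, Payment> producer = new KafkaProducer<>(props)) {
    producer.send(new ProducerRecord<>("transactions", "id0", new Payment("id0", 1000.00d)));
}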

Slide 88

partition 0 consumer A … consumer A partition 1 … consumer A partition 2 … consumer B Partitioned Topic @rmoff | #GOTOpia | @confluentinc

Slide 89

consumer A consumer A consumer A @rmoff | #GOTOpia | @confluentinc

Slide 90

{
  "reading_ts": "2020-02-14T12:19:27Z",
  "sensor_id": "aa-101",
  "production_line": "w01",
  "widget_type": "acme94",
  "temp_celcius": 23,
  "widget_weight_g": 100
}

Photo by Franck V. on Unsplash
@rmoff | #GOTOpia | @confluentinc

Slide 91

Streams of events Time @rmoff | #GOTOpia | @confluentinc

Slide 92

Stream Processing Stream: widgets Stream: widgets_red @rmoff | #GOTOpia | @confluentinc

Slide 93

Stream Processing with Kafka Streams

Stream: widgets

final StreamsBuilder builder = new StreamsBuilder();
builder.stream("widgets", Consumed.with(stringSerde, widgetsSerde))
       .filter((key, widget) -> widget.getColour().equals("RED"))
       .to("widgets_red", Produced.with(stringSerde, widgetsSerde));

Stream: widgets_red

@rmoff | #GOTOpia | @confluentinc

Slide 94

consumer A consumer A consumer A @rmoff | #GOTOpia | @confluentinc

Slide 95

Streams Application Streams Application Streams Application @rmoff | #GOTOpia | @confluentinc

Slide 96

Properties streamsConfiguration = getProperties(SCHEMA_REGISTRY_URL);
final Map<String, String> serdeConfig = Collections.singletonMap(
    AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);

final SpecificAvroSerde<Movie> movieSerde = getMovieAvroSerde(serdeConfig);
final SpecificAvroSerde<Rating> ratingSerde = getRatingAvroSerde(serdeConfig);
final SpecificAvroSerde<RatedMovie> ratedMovieSerde = new SpecificAvroSerde<>();
ratedMovieSerde.configure(serdeConfig, false);

StreamsBuilder builder = new StreamsBuilder();
KTable<Long, Double> ratingAverage = getRatingAverageTable(builder);
getRatedMoviesTable(builder, ratingAverage, movieSerde);

Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.start();
}

private static SpecificAvroSerde<Rating> getRatingAvroSerde(Map<String, String> serdeConfig) {
    final SpecificAvroSerde<Rating> ratingSerde = new SpecificAvroSerde<>();
    ratingSerde.configure(serdeConfig, false);
    return ratingSerde;

Slide 97

final SpecificAvroSerde<Movie> movieSerde = new SpecificAvroSerde<>();
    movieSerde.configure(serdeConfig, false);
    return movieSerde;
}

public static KTable<Long, String> getRatedMoviesTable(StreamsBuilder builder,
                                                       KTable<Long, Double> ratingAverage,
                                                       SpecificAvroSerde<Movie> movieSerde) {

    builder.stream("raw-movies", Consumed.with(Serdes.Long(), Serdes.String()))
           .mapValues(Parser::parseMovie)
           .map((key, movie) -> new KeyValue<>(movie.getMovieId(), movie))
           .to("movies", Produced.with(Serdes.Long(), movieSerde));

    KTable<Long, Movie> movies = builder.table("movies", Materialized
        .<Long, Movie, KeyValueStore<Bytes, byte[]>>as("movies-store")
            .withValueSerde(movieSerde)
            .withKeySerde(Serdes.Long())
    );

    KTable<Long, String> ratedMovies = ratingAverage
        .join(movies, (avg, movie) -> movie.getTitle() + "=" + avg);

    ratedMovies.toStream().to("rated-movies", Produced.with(Serdes.Long(), Serdes.String()));
    return ratedMovies;
}

Slide 98

    .join(movies, (avg, movie) -> movie.getTitle() + "=" + avg);
    ratedMovies.toStream().to("rated-movies", Produced.with(Serdes.Long(), Serdes.String()));
    return ratedMovies;
}

public static KTable<Long, Double> getRatingAverageTable(StreamsBuilder builder) {
    KStream<Long, String> rawRatings = builder.stream("raw-ratings",
        Consumed.with(Serdes.Long(), Serdes.String()));
    KStream<Long, Rating> ratings = rawRatings.mapValues(Parser::parseRating)
        .map((key, rating) -> new KeyValue<>(rating.getMovieId(), rating));
    KStream<Long, Double> numericRatings = ratings.mapValues(Rating::getRating);

    KGroupedStream<Long, Double> ratingsById = numericRatings.groupByKey();
    KTable<Long, Long> ratingCounts = ratingsById.count();
    KTable<Long, Double> ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);

    KTable<Long, Double> ratingAverage = ratingSums.join(ratingCounts,
        (sum, count) -> sum / count.doubleValue(),
        Materialized.as("average-ratings"));

    ratingAverage.toStream()
        /*.peek((key, value) -> { // debug only
            System.out.println("key = " + key + ", value = " + value);
        })*/
        .to("average-ratings");

    return ratingAverage;

Slide 99

KTable<Long, Movie> movies = builder.table("movies", Materialized
    .<Long, Movie, KeyValueStore<Bytes, byte[]>>as("movies-store")
    .withValueSerde(movieSerde)
    .withKeySerde(Serdes.Long())
);

• Java API
• Filter, join, aggregate, etc.
• Co-locates stream processing with your application
• Scales like a Consumer Group (but better!)

Slide 100

Stream Processing with ksqlDB

Stream: widgets

ksqlDB

CREATE STREAM widgets_red AS
  SELECT * FROM widgets WHERE colour='RED';

Stream: widgets_red

@rmoff | #GOTOpia | @confluentinc

Slide 101

{
  "reading_ts": "2020-02-14T12:19:27Z",
  "sensor_id": "aa-101",
  "production_line": "w01",
  "widget_type": "acme94",
  "temp_celcius": 23,
  "widget_weight_g": 100
}

Photo by Franck V. on Unsplash
@rmoff | #GOTOpia | @confluentinc

Slide 102

SELECT * FROM WIDGETS WHERE WEIGHT_G > 120

SELECT COUNT(*) FROM WIDGETS GROUP BY PRODUCTION_LINE

SELECT AVG(TEMP_CELCIUS) AS TEMP FROM WIDGETS GROUP BY SENSOR_ID HAVING TEMP > 20

CREATE SINK CONNECTOR dw WITH (
  'connector.class' = 'S3Connector',
  'topics' = 'widgets'
  …);
(Object store, data warehouse, RDBMS)

{
  "reading_ts": "2020-02-14T12:19:27Z",
  "sensor_id": "aa-101",
  "production_line": "w01",
  "widget_type": "acme94",
  "temp_celcius": 23,
  "widget_weight_g": 100
}

Photo by Franck V. on Unsplash
@rmoff | #GOTOpia | @confluentinc

Slide 103

ksqlDB The event streaming database purpose-built for stream processing applications. @rmoff | #GOTOpia | @confluentinc

Slide 104

Stream Processing with ksqlDB Source stream @rmoff | #GOTOpia | @confluentinc

Slide 105

Stream Processing with ksqlDB Source stream @rmoff | #GOTOpia | @confluentinc

Slide 106

Stream Processing with ksqlDB Source stream @rmoff | #GOTOpia | @confluentinc

Slide 107

Stream Processing with ksqlDB Source stream Analytics @rmoff | #GOTOpia | @confluentinc

Slide 108

Stream Processing with ksqlDB Source stream Applications / Microservices @rmoff | #GOTOpia | @confluentinc

Slide 109

Stream Processing with ksqlDB …SUM(TXN_AMT) GROUP BY AC_ID AC_ID=42 BALANCE=94.00 AC_ID=42 Source stream Applications / Microservices @rmoff | #GOTOpia | @confluentinc

Slide 110

Lookups and Joins with ksqlDB ORDERS {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5} @rmoff | #GOTOpia | @confluentinc

Slide 111

Lookups and Joins with ksqlDB

ITEMS  { "id": "Item_9", "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9 }
ORDERS { "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5 }

@rmoff | #GOTOpia | @confluentinc

Slide 112

Lookups and Joins with ksqlDB

ITEMS  { "id": "Item_9", "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9 }
ORDERS { "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5 }

ksqlDB

CREATE STREAM ORDERS_ENRICHED AS
  SELECT O.*, I.*,
         O.ORDERUNITS * I.UNIT_COST AS TOTAL_ORDER_VALUE
  FROM ORDERS O
       INNER JOIN ITEMS I
       ON O.ITEMID = I.ID;

@rmoff | #GOTOpia | @confluentinc

Slide 113

Lookups and Joins with ksqlDB

ITEMS  { "id": "Item_9", "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9 }
ORDERS { "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5 }

ksqlDB

CREATE STREAM ORDERS_ENRICHED AS
  SELECT O.*, I.*,
         O.ORDERUNITS * I.UNIT_COST AS TOTAL_ORDER_VALUE
  FROM ORDERS O
       INNER JOIN ITEMS I
       ON O.ITEMID = I.ID;

ORDERS_ENRICHED { "ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5,
                  "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9,
                  "total_order_value": 99.5 }

@rmoff | #GOTOpia | @confluentinc

Slide 114

Streams & Tables @rmoff | #GOTOpia | @confluentinc

Slide 115

Streams and Tables

Kafka topic (k/v bytes):
{ "event_ts": "2020-02-17T15:22:00Z", "person": "robin", "location": "Leeds" }
{ "event_ts": "2020-02-17T17:23:00Z", "person": "robin", "location": "London" }
{ "event_ts": "2020-02-17T22:23:00Z", "person": "robin", "location": "Wakefield" }
{ "event_ts": "2020-02-18T09:00:00Z", "person": "robin", "location": "Leeds" }

ksqlDB Stream (Stream: Topic + Schema)
+---------------------+--------+----------+
|EVENT_TS             |PERSON  |LOCATION  |
+---------------------+--------+----------+
|2020-02-17 15:22:00  |robin   |Leeds     |
|2020-02-17 17:23:00  |robin   |London    |
|2020-02-17 22:23:00  |robin   |Wakefield |
|2020-02-18 09:00:00  |robin   |Leeds     |

ksqlDB Table (Table: state for given key, Topic + Schema)
+--------+----------+
|PERSON  |LOCATION  |
+--------+----------+
|robin   |Leeds     |

@rmoff | #GOTOpia | @confluentinc
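The same stream/table duality shows up in Kafka Streams; a sketch (topic name assumed) where one topic is read both as a stream of every event and as a table of the latest value per key:

StreamsBuilder builder = new StreamsBuilder();

// Stream: every location change for every person, in arrival order
KStream<String, String> movements =
        builder.stream("movements", Consumed.with(Serdes.String(), Serdes.String()));

// Table: only the current (latest) location per person
KTable<String, String> currentLocation =
        builder.table("movements", Consumed.with(Serdes.String(), Serdes.String()));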

Slide 116

Stateful aggregations in ksqlDB

Kafka topic:
{ "event_ts": "2020-02-17T15:22:00Z", "person": "robin", "location": "Leeds" }
{ "event_ts": "2020-02-17T17:23:00Z", "person": "robin", "location": "London" }
{ "event_ts": "2020-02-17T22:23:00Z", "person": "robin", "location": "Wakefield" }
{ "event_ts": "2020-02-18T09:00:00Z", "person": "robin", "location": "Leeds" }

SELECT PERSON, COUNT(*) FROM MOVEMENTS GROUP BY PERSON;
+--------+------------------+
|PERSON  |LOCATION_CHANGES  |
+--------+------------------+
|robin   |4                 |

SELECT PERSON, COUNT_DISTINCT(LOCATION) FROM MOVEMENTS GROUP BY PERSON;
+--------+------------------+
|PERSON  |UNIQUE_LOCATIONS  |
+--------+------------------+
|robin   |3                 |

Aggregations can be across the entire input, or windowed (TUMBLING, HOPPING, SESSION)

@rmoff | #GOTOpia | @confluentinc
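As a counterpart to the windowing note above, a sketch of a windowed aggregation in Kafka Streams; the topic name and one-hour tumbling window are illustrative:

KTable<Windowed<String>, Long> changesPerHour =
        builder.stream("movements", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey()
               .windowedBy(TimeWindows.of(Duration.ofHours(1)))   // tumbling, 1-hour windows
               .count();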

Slide 117

Stateful aggregations in ksqlDB

Kafka topic:
{ "event_ts": "2020-02-17T15:22:00Z", "person": "robin", "location": "Leeds" }
{ "event_ts": "2020-02-17T17:23:00Z", "person": "robin", "location": "London" }
{ "event_ts": "2020-02-17T22:23:00Z", "person": "robin", "location": "Wakefield" }
{ "event_ts": "2020-02-18T09:00:00Z", "person": "robin", "location": "Leeds" }

CREATE TABLE PERSON_MOVEMENTS AS
  SELECT PERSON,
         COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS,
         COUNT(*) AS LOCATION_CHANGES
  FROM MOVEMENTS
  GROUP BY PERSON;

PERSON_MOVEMENTS (internal ksqlDB state store)

@rmoff | #GOTOpia | @confluentinc

Slide 118

Pull and Push queries in ksqlDB

Kafka topic:
{ "event_ts": "2020-02-17T15:22:00Z", "person": "robin", "location": "Leeds" }
{ "event_ts": "2020-02-17T17:23:00Z", "person": "robin", "location": "London" }
{ "event_ts": "2020-02-17T22:23:00Z", "person": "robin", "location": "Wakefield" }
{ "event_ts": "2020-02-18T09:00:00Z", "person": "robin", "location": "Leeds" }

CREATE TABLE PERSON_MOVEMENTS AS
  SELECT PERSON,
         COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS,
         COUNT(*) AS LOCATION_CHANGES
  FROM MOVEMENTS
  GROUP BY PERSON;

PERSON_MOVEMENTS (internal ksqlDB state store)

Pull query:
ksql> SELECT LOCATION_CHANGES, UNIQUE_LOCATIONS FROM PERSON_MOVEMENTS WHERE ROWKEY='robin';
+------------------+------------------+
|LOCATION_CHANGES  |UNIQUE_LOCATIONS  |
+------------------+------------------+
|3                 |3                 |
Query terminated
ksql>

Push query:
ksql> SELECT LOCATION_CHANGES, UNIQUE_LOCATIONS FROM PERSON_MOVEMENTS WHERE ROWKEY='robin' EMIT CHANGES;
+------------------+------------------+
|LOCATION_CHANGES  |UNIQUE_LOCATIONS  |
+------------------+------------------+
|1                 |1                 |
|2                 |2                 |
|3                 |3                 |
|4                 |3                 |
Press CTRL-C to interrupt

@rmoff | #GOTOpia | @confluentinc

Slide 119

ksqlDB REST API

Kafka topic:
{ "event_ts": "2020-02-17T15:22:00Z", "person": "robin", "location": "Leeds" }
{ "event_ts": "2020-02-17T17:23:00Z", "person": "robin", "location": "London" }
{ "event_ts": "2020-02-17T22:23:00Z", "person": "robin", "location": "Wakefield" }
{ "event_ts": "2020-02-18T09:00:00Z", "person": "robin", "location": "Leeds" }

CREATE TABLE PERSON_MOVEMENTS AS
  SELECT PERSON,
         COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS,
         COUNT(*) AS LOCATION_CHANGES
  FROM MOVEMENTS
  GROUP BY PERSON;

PERSON_MOVEMENTS (internal ksqlDB state store)

curl -s -X "POST" "http://localhost:8088/query" \
     -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
     -d '{"ksql":"SELECT UNIQUE_LOCATIONS FROM PERSON_MOVEMENTS WHERE ROWKEY='\''robin'\'';"}'

{"value":"3"}

@rmoff | #GOTOpia | @confluentinc

Slide 120

Pull and Push queries in ksqlDB

              Pull query               Push query
Tells you:    Point-in-time value      All value changes
Exits:        Immediately              Never

@rmoff | #GOTOpia | @confluentinc
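The same distinction is visible from the ksqlDB Java client (io.confluent.ksql:ksqldb-api-client); a hedged sketch assuming ksqlDB is reachable on localhost:8088: executeQuery() runs a pull query and completes, streamQuery() runs a push query that keeps emitting rows.

ClientOptions options = ClientOptions.create().setHost("localhost").setPort(8088);
Client client = Client.create(options);

// Pull query: the current value, then done
List<Row> rows = client.executeQuery(
    "SELECT LOCATION_CHANGES, UNIQUE_LOCATIONS FROM PERSON_MOVEMENTS WHERE ROWKEY='robin';").get();

// Push query: block on poll() and receive every subsequent change
StreamedQueryResult push = client.streamQuery(
    "SELECT LOCATION_CHANGES, UNIQUE_LOCATIONS FROM PERSON_MOVEMENTS WHERE ROWKEY='robin' EMIT CHANGES;").get();
Row change = push.poll();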

Slide 121

ksqlDB or Kafka Streams? @rmoff | #GOTOpia | @confluentinc Photo by Ramiz Dedaković on Unsplash

Slide 122

Standing on the Shoulders of Streaming Giants: ksqlDB (plus ksqlDB UDFs) is powered by Kafka Streams, which is powered by the Producer and Consumer APIs. Ease of use increases up the stack; flexibility increases down it. @rmoff | #GOTOpia | @confluentinc

Slide 123

Photo by Raoul Droog on Unsplash DEMO @rmoff | #GOTOpia | @confluentinc

Slide 124

Summary @rmoff | #GOTOpia | @confluentinc

Slide 125

@rmoff | #GOTOpia | @confluentinc

Slide 126

K V @rmoff | #GOTOpia | @confluentinc

Slide 127

K V @rmoff | #GOTOpia | @confluentinc

Slide 128

The Log @rmoff | #GOTOpia | @confluentinc

Slide 129

Producer Consumer The Log @rmoff | #GOTOpia | @confluentinc

Slide 130

Producer Consumer Connectors The Log @rmoff | #GOTOpia | @confluentinc

Slide 131

Producer Consumer Connectors The Log Streaming Engine @rmoff | #GOTOpia | @confluentinc

Slide 132

Apache Kafka Producer Consumer Connectors The Log Streaming Engine @rmoff | #GOTOpia | @confluentinc

Slide 133

Confluent Platform ksqlDB Producer Consumer Connectors The Log Schema Registry Streaming Engine @rmoff | #GOTOpia | @confluentinc

Slide 134

EVENTS are EVERYWHERE @rmoff | #GOTOpia | @confluentinc

Slide 135

EVENTS are very POWERFUL @rmoff | #GOTOpia | @confluentinc

Slide 136

Standby for resource links… @rmoff | #GOTOpia | @confluentinc

Slide 137

Free Books! https://rmoff.dev/gotopia @rmoff | #GOTOpia | @confluentinc

Slide 138

60 DE VA DV: $50 USD off your bill each calendar month for the first three months when you sign up https://rmoff.dev/ccloud Free money! (additional $60 towards your bill) Fully Managed Kafka as a Service * T&C: https://www.confluent.io/confluent-cloud-promo-disclaimer

Slide 139

Learn Kafka. Start building with Apache Kafka at Confluent Developer. developer.confluent.io

Slide 140

Confluent Community Slack group cnfl.io/slack @rmoff | #GOTOpia | @confluentinc

Slide 141

#EOF @rmoff rmoff.dev/talks youtube.com/rmoff