Kafka as a Platform: the Ecosystem from the Ground Up Robin Moffatt | #GOTOpia | @rmoff

$ whoami • Robin Moffatt (@rmoff) • Senior Developer Advocate at Confluent (Apache Kafka, not Wikis!) • Working in data & analytics since 2001 • Oracle ACE Director (Alumnus) http://rmoff.dev/talks · http://rmoff.dev/blog · http://rmoff.dev/youtube @rmoff | #GOTOpia | @confluentinc

EVENTS @rmoff | #GOTOpia | @confluentinc

EVENTS • Something happened • What happened

Human generated events A Sale A Stock movement @rmoff | #GOTOpia | @confluentinc

Machine generated events IoT Networking Applications @rmoff | #GOTOpia | @confluentinc

EVENTS are EVERYWHERE @rmoff | #GOTOpia | @confluentinc

EVENTS are very POWERFUL @rmoff | #GOTOpia | @confluentinc

K V

LOG @rmoff | #GOTOpia | @confluentinc

K V


Immutable Event Log Old New Events are added at the end of the log @rmoff | #GOTOpia | @confluentinc

TOPICS @rmoff | #GOTOpia | @confluentinc

Topics Clicks Orders Customers Topics are similar in concept to tables in a database @rmoff | #GOTOpia | @confluentinc

PARTITIONS @rmoff | #GOTOpia | @confluentinc

Partitions Clicks P0 P1 P2 Messages are guaranteed to be strictly ordered within a partition @rmoff | #GOTOpia | @confluentinc

PUB / SUB @rmoff | #GOTOpia | @confluentinc

Producing data Old New Messages are added at the end of the log @rmoff | #GOTOpia | @confluentinc

partition 0 … partition 1 producer … partition 2 … Partitioned Topic

try (KafkaProducer<String, Payment> producer = new KafkaProducer<String, Payment>(props)) {
  for (long i = 0; i < 10; i++) {
    final String orderId = "id" + Long.toString(i);
    final Payment payment = new Payment(orderId, 1000.00d);
    final ProducerRecord<String, Payment> record =
        new ProducerRecord<String, Payment>("transactions", payment.getId().toString(), payment);
    producer.send(record).get();
  }
} catch (final InterruptedException | ExecutionException e) {
  e.printStackTrace();
}

package main

import (
	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
	topic := "test_topic"

	p, _ := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092"})
	defer p.Close()

	p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: 0},
		Value:          []byte("Hello world")}, nil)
}

Producing to Kafka - No Key Time Partition 1 Partition 2 Partition 3 Messages will be produced in a round robin fashion Partition 4 @rmoff | #GOTOpia | @confluentinc

Producing to Kafka - With Key Time Partition 1 A Partition 2 B hash(key) % numPartitions = N Partition 3 C Partition 4 D @rmoff | #GOTOpia | @confluentinc
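In other words, the producer derives the partition from a hash of the key, so the same key always lands on the same partition and is therefore strictly ordered relative to itself. A minimal illustrative sketch of that idea; Kafka's actual default partitioner hashes the serialized key bytes with murmur2 rather than Java's hashCode(), but the principle is the same:

// Illustrative only: same key -> same hash -> same partition, which is what gives per-key ordering
int numPartitions = 4;
for (String key : new String[]{"A", "B", "A", "C", "A"}) {
  int partition = Math.abs(key.hashCode() % numPartitions);
  System.out.printf("key %s -> partition %d%n", key, partition);
}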

Producers partition 0 … partition 1 producer … partition 2 … Partitioned Topic • A client application • Puts messages into topics • Handles partitioning, network protocol • Java, Go, .NET, C/C++, Python • Also every other language, plus the REST Proxy if there's no native client

PUB / SUB @rmoff | #GOTOpia | @confluentinc

Consuming data - access is only sequential Read to offset & scan Old New @rmoff | #GOTOpia | @confluentinc

Consumers have a position of their own Old New Sally is here Scan @rmoff | #GOTOpia | @confluentinc

Consumers have a position of their own Old New Fred is here Scan Sally is here Scan @rmoff | #GOTOpia | @confluentinc

Consumers have a position of their own Old New Rick is here Scan Fred is here Scan Sally is here Scan @rmoff | #GOTOpia | @confluentinc
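Each consumer keeps its own position (offset) per partition, independent of every other consumer, and can commit that position or rewind it. A minimal sketch, assuming a KafkaConsumer that has already subscribed to a topic (as in the Java snippet on the next slide):

// Read, commit, or rewind this consumer's position in the log
consumer.poll(Duration.ofMillis(500));

for (TopicPartition tp : consumer.assignment()) {
  System.out.printf("Currently at offset %d of %s%n", consumer.position(tp), tp);
}

consumer.commitSync();                            // persist this group's position
consumer.seekToBeginning(consumer.assignment());  // ...or rewind and scan the log again from the start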

try (final KafkaConsumer<String, Payment> consumer = new KafkaConsumer<>(props)) {
  consumer.subscribe(Collections.singletonList(TOPIC));
  while (true) {
    ConsumerRecords<String, Payment> records = consumer.poll(100);
    for (ConsumerRecord<String, Payment> record : records) {
      String key = record.key();
      Payment value = record.value();
      System.out.printf("key = %s, value = %s%n", key, value);
    }
  }
}

c, _ := kafka.NewConsumer(&cm)
defer c.Close()
c.Subscribe(topic, nil)

for {
	select {
	case ev := <-c.Events():
		switch ev.(type) {
		case *kafka.Message:
			km := ev.(*kafka.Message)
			fmt.Printf("✅ Message '%v' received from topic '%v'\n",
				string(km.Value), string(*km.TopicPartition.Topic))
		}
	}
}

Consuming From Kafka - Single Consumer Partition 1 Partition 2 C Partition 3 Partition 4 @rmoff | #GOTOpia | @confluentinc

Consuming From Kafka - Multiple Consumers Partition 1 C1 Partition 2 Partition 3 C2 Partition 4 @rmoff | #GOTOpia | @confluentinc

Consuming From Kafka - Multiple Consumers C1 Partition 1 Partition 2 Partition 3 C2 Partition 4 @rmoff | #GOTOpia | @confluentinc

Consuming From Kafka - Grouped Consumers C1 C1 Partition 1 Partition 2 Partition 3 C2 Partition 4 @rmoff | #GOTOpia | @confluentinc

CONSUMERS CONSUMER GROUP COORDINATOR CONSUMER GROUP

Consuming From Kafka - Grouped Consumers Partition 1 Partition 2 Partition 3 C1 C2 C3 C4 Partition 4 @rmoff | #GOTOpia | @confluentinc

Consuming From Kafka - Grouped Consumers Partition 1 Partition 2 Partition 3 C1 C2 C3 Partition 4 @rmoff | #GOTOpia | @confluentinc

Consuming From Kafka - Grouped Consumers Partition 1 C1 Partition 2 Partition 3 C2 C3 Partition 4 @rmoff | #GOTOpia | @confluentinc

Consumers partition 0 … partition 1 … consumer A consumer A consumer A partition 2 … Partitioned Topic consumer B • A client application • Reads messages from topics • Horizontally, elastically scalable (if stateless) • Java, Go, .NET, C/C++, Python, everything else, plus the REST Proxy if there's no native client
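The earlier consumer snippet references a props object without showing it; a minimal sketch of what it might contain. The bootstrap server, group id, and topic name are illustrative; the key point is that consumers sharing the same group.id divide the topic's partitions between them:

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "payments-service");      // instances with the same group.id share the partitions
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");     // where to start when the group has no committed offset

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
  consumer.subscribe(Collections.singletonList("transactions"));
  // poll() as in the earlier snippet; start more instances with the same group.id to scale out
}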

BROKERS and REPLICATION @rmoff | #GOTOpia | @confluentinc

Partition Leadership and Replication (Leader / Follower) Partition 1 Partition 2 Partition 3 Partition 4 Broker 1 Broker 2 Broker 3 @rmoff | #GOTOpia | @confluentinc

Partition Leadership and Replication (Leader / Follower) Partition 1 Partition 1 Partition 1 Partition 2 Partition 2 Partition 2 Partition 3 Partition 3 Partition 3 Partition 4 Partition 4 Partition 4 Broker 1 Broker 2 Broker 3 @rmoff | #GOTOpia | @confluentinc

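Replication is configured per topic. A minimal sketch of creating a topic like the one pictured, with four partitions each replicated to three brokers, using the Java AdminClient (broker address and topic name are illustrative):

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (AdminClient admin = AdminClient.create(props)) {
  // 4 partitions, replication factor 3: one leader replica per partition, two followers
  NewTopic orders = new NewTopic("orders", 4, (short) 3);
  admin.createTopics(Collections.singletonList(orders)).all().get();
} catch (InterruptedException | ExecutionException e) {
  e.printStackTrace();
}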

Photo by Raoul Droog on Unsplash DEMO @rmoff | #GOTOpia | @confluentinc

So far, this is Pretty good @rmoff | #GOTOpia | @confluentinc

So far, this is Pretty good but I’ve not finished yet… @rmoff | #GOTOpia | @confluentinc

Streaming Pipelines Amazon S3 RDBMS HDFS @rmoff | #GOTOpia | @confluentinc

Evolve processing from old systems to new Existing New App <x> App RDBMS @rmoff | #GOTOpia | @confluentinc

Streaming Integration with Kafka Connect syslog Sources Kafka Connect Kafka Brokers @rmoff | #GOTOpia | @confluentinc

Streaming Integration with Kafka Connect Amazon Sinks Google Kafka Connect Kafka Brokers @rmoff | #GOTOpia | @confluentinc

Streaming Integration with Kafka Connect Amazon syslog Google Kafka Connect Kafka Brokers @rmoff | #GOTOpia | @confluentinc

Look Ma, No Code!

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:mysql://asgard:3306/demo",
  "table.whitelist": "sales,orders,customers"
}

@rmoff | #GOTOpia | @confluentinc
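That JSON reaches Kafka Connect through the worker's REST API, so there really is no code to write. A sketch of submitting it from Java with the JDK's HttpClient, assuming a Connect worker listening on localhost:8083 and a connector name chosen purely for illustration (call this from a method that declares throws IOException, InterruptedException):

// PUT the config to /connectors/<name>/config on a Connect worker to create or update the connector
String config = """
    {
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "connection.url": "jdbc:mysql://asgard:3306/demo",
      "table.whitelist": "sales,orders,customers"
    }""";

HttpClient http = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
    .uri(URI.create("http://localhost:8083/connectors/jdbc-source-demo/config"))
    .header("Content-Type", "application/json")
    .PUT(HttpRequest.BodyPublishers.ofString(config))
    .build();

HttpResponse<String> response = http.send(request, HttpResponse.BodyHandlers.ofString());
System.out.println(response.statusCode() + " " + response.body());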

Extensible Connector Transform(s) Converter @rmoff | #GOTOpia | @confluentinc

hub.confluent.io @rmoff | #GOTOpia | @confluentinc

Fault-tolerant? Nope. Kafka Connect Standalone Worker S3 Task #1 JDBC Task #1 JDBC Task #2 Offsets Worker @rmoff | #GOTOpia | @confluentinc

Fault-tolerant? Yeah! Kafka Connect Distributed Worker S3 Task #1 JDBC Task #1 JDBC Task #2 Kafka Connect cluster Worker Offsets Config Status @rmoff | #GOTOpia | @confluentinc

Fault-tolerant? Yeah! Scaling the Distributed Worker S3 Task #1 JDBC Task #1 Kafka Connect cluster JDBC Task #2 Worker Worker Offsets Config Status @rmoff | #GOTOpia | @confluentinc

Schema Registry @rmoff | #GOTOpia | @confluentinc

K V


K V Wait… what's this?

How do you serialise your data? JSON · Avro · Protobuf · JSON Schema · CSV @rmoff | #GOTOpia | @confluentinc

APIs are contracts between services {user_id: 53, address: "2 Elm st."} Profile service Quote service {user_id: 53, quote: 580} @rmoff | #GOTOpia | @confluentinc

But not all services {user_id: 53, address: "2 Elm st."} Profile service Quote service {user_id: 53, quote: 580} @rmoff | #GOTOpia | @confluentinc

And naturally… {user_id: 53, address: "2 Elm st."} Profile service Quote service Profile database Stream processing @rmoff | #GOTOpia | @confluentinc

Schemas are about how teams work together {user_id: 53, timestamp: 1497842472} new Date(timestamp) Profile service Quote service Profile database create table ( user_id number, timestamp number) Stream processing @rmoff | #GOTOpia | @confluentinc

Things change… {user_id: 53, timestamp: "June 28, 2017 4:00pm"} Profile service Quote service Profile database Stream processing @rmoff | #GOTOpia | @confluentinc

Moving fast and breaking things {user_id: 53, timestamp: "June 28, 2017 4:00pm"} Profile service Quote service Profile database Stream processing @rmoff | #GOTOpia | @confluentinc

Lack of schemas – Coupling teams and services 2001 2001 Citrus Heights-Sunrise Blvd Citrus_Hghts 60670001 3400293 34 SAC Sacramento SV Sacramento Valley SAC Sacramento County APCD SMA8 Sacramento Metropolitan Area CA 6920 Sacramento 28 6920 13588 7400 Sunrise Blvd 95610 38 41 56 38.6988889 121 16 15.98999977 -121.271111 10 4284781 650345 52 @rmoff | #GOTOpia | @confluentinc

Serialisation & Schemas JSON · Avro · Protobuf · JSON Schema · CSV @rmoff | #GOTOpia | @confluentinc

$ https://rmoff.dev/qcon-schemas @rmoff | #GOTOpia | @confluentinc

It isn’t just about the services Software Teams Engineering & Culture Data & Metadata @rmoff | #GOTOpia | @confluentinc

Schemas Schema Registry Topic producer … consumer
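Wiring a client to the Schema Registry is a serializer configuration. A minimal sketch for a producer writing Avro, assuming the Payment class from the earlier snippet is an Avro-generated type; the broker and registry URLs are illustrative:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Serialize values as Avro; the schema is registered with, and checked against, the Schema Registry
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081");

try (KafkaProducer<String, Payment> producer = new KafkaProducer<>(props)) {
  producer.send(new ProducerRecord<>("transactions", "id0", new Payment("id0", 1000.00d)));
}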

partition 0 consumer A … consumer A partition 1 … consumer A partition 2 … consumer B Partitioned Topic @rmoff | #GOTOpia | @confluentinc

consumer A consumer A consumer A @rmoff | #GOTOpia | @confluentinc

{ "reading_ts": "2020-02-14T12:19:27Z", "sensor_id": "aa-101", "production_line": "w01", "widget_type": "acme94", "temp_celcius": 23, "widget_weight_g": 100 }

Photo by Franck V. on Unsplash
@rmoff | #GOTOpia | @confluentinc

Streams of events Time @rmoff | #GOTOpia | @confluentinc

Stream Processing Stream: widgets Stream: widgets_red @rmoff | #GOTOpia | @confluentinc

Stream Processing with Kafka Streams

Stream: widgets

final StreamsBuilder builder = new StreamsBuilder();
builder.stream("widgets", Consumed.with(stringSerde, widgetsSerde))
       .filter((key, widget) -> widget.getColour().equals("RED"))
       .to("widgets_red", Produced.with(stringSerde, widgetsSerde));

Stream: widgets_red

@rmoff | #GOTOpia | @confluentinc

consumer A consumer A consumer A @rmoff | #GOTOpia | @confluentinc

Streams Application Streams Application Streams Application @rmoff | #GOTOpia | @confluentinc

Properties streamsConfiguration = getProperties(SCHEMA_REGISTRY_URL);
final Map<String, String> serdeConfig =
    Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
final SpecificAvroSerde<Movie> movieSerde = getMovieAvroSerde(serdeConfig);
final SpecificAvroSerde<Rating> ratingSerde = getRatingAvroSerde(serdeConfig);
final SpecificAvroSerde<RatedMovie> ratedMovieSerde = new SpecificAvroSerde<>();
ratedMovieSerde.configure(serdeConfig, false);

StreamsBuilder builder = new StreamsBuilder();
KTable<Long, Double> ratingAverage = getRatingAverageTable(builder);
getRatedMoviesTable(builder, ratingAverage, movieSerde);
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.start();
}

private static SpecificAvroSerde<Rating> getRatingAvroSerde(Map<String, String> serdeConfig) {
  final SpecificAvroSerde<Rating> ratingSerde = new SpecificAvroSerde<>();
  ratingSerde.configure(serdeConfig, false);
  return ratingSerde;

  final SpecificAvroSerde<Movie> movieSerde = new SpecificAvroSerde<>();
  movieSerde.configure(serdeConfig, false);
  return movieSerde;
}

public static KTable<Long, String> getRatedMoviesTable(StreamsBuilder builder,
                                                       KTable<Long, Double> ratingAverage,
                                                       SpecificAvroSerde<Movie> movieSerde) {
  builder.stream("raw-movies", Consumed.with(Serdes.Long(), Serdes.String()))
         .mapValues(Parser::parseMovie)
         .map((key, movie) -> new KeyValue<>(movie.getMovieId(), movie))
         .to("movies", Produced.with(Serdes.Long(), movieSerde));

  KTable<Long, Movie> movies = builder.table("movies",
      Materialized.<Long, Movie, KeyValueStore<Bytes, byte[]>>as("movies-store")
          .withValueSerde(movieSerde)
          .withKeySerde(Serdes.Long()));

  KTable<Long, String> ratedMovies = ratingAverage
      .join(movies, (avg, movie) -> movie.getTitle() + "=" + avg);

  ratedMovies.toStream().to("rated-movies", Produced.with(Serdes.Long(), Serdes.String()));
  return ratedMovies;
}

      .join(movies, (avg, movie) -> movie.getTitle() + "=" + avg);
  ratedMovies.toStream().to("rated-movies", Produced.with(Serdes.Long(), Serdes.String()));
  return ratedMovies;
}

public static KTable<Long, Double> getRatingAverageTable(StreamsBuilder builder) {
  KStream<Long, String> rawRatings = builder.stream("raw-ratings", Consumed.with(Serdes.Long(), Serdes.String()));
  KStream<Long, Rating> ratings = rawRatings.mapValues(Parser::parseRating)
      .map((key, rating) -> new KeyValue<>(rating.getMovieId(), rating));
  KStream<Long, Double> numericRatings = ratings.mapValues(Rating::getRating);
  KGroupedStream<Long, Double> ratingsById = numericRatings.groupByKey();
  KTable<Long, Long> ratingCounts = ratingsById.count();
  KTable<Long, Double> ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);
  KTable<Long, Double> ratingAverage = ratingSums.join(ratingCounts,
      (sum, count) -> sum / count.doubleValue(),
      Materialized.as("average-ratings"));
  ratingAverage.toStream()
      /*.peek((key, value) -> {  // debug only
        System.out.println("key = " + key + ", value = " + value);
      })*/
      .to("average-ratings");
  return ratingAverage;

KTable<Long, Movie> movies = builder.table("movies",
    Materialized.<Long, Movie, KeyValueStore<Bytes, byte[]>>as("movies-store")
        .withValueSerde(movieSerde)
        .withKeySerde(Serdes.Long()));

• Java API
• Filter, join, aggregate, etc.
• Locates stream processing with your application
• Scales like a Consumer Group (but better!)
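The long example above calls getProperties() without showing it; a minimal sketch of the configuration a Kafka Streams application needs. The application.id plays the role of a consumer group id: start more instances with the same value and the work (and state) is shared between them. Names and addresses here are illustrative:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "movie-ratings-app");  // like group.id: instances sharing it split the partitions
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();  // scale out by starting more instances; state stores are sharded and migrated automatically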

Stream Processing with ksqlDB

Stream: widgets

ksqlDB:
CREATE STREAM widgets_red AS
  SELECT * FROM widgets WHERE colour='RED';

Stream: widgets_red

@rmoff | #GOTOpia | @confluentinc

{ "reading_ts": "2020-02-14T12:19:27Z", "sensor_id": "aa-101", "production_line": "w01", "widget_type": "acme94", "temp_celcius": 23, "widget_weight_g": 100 }

Photo by Franck V. on Unsplash
@rmoff | #GOTOpia | @confluentinc

SELECT * FROM WIDGETS WHERE WEIGHT_G > 120

SELECT COUNT(*) FROM WIDGETS GROUP BY PRODUCTION_LINE

SELECT AVG(TEMP_CELCIUS) AS TEMP FROM WIDGETS GROUP BY SENSOR_ID HAVING TEMP > 20

CREATE SINK CONNECTOR dw WITH (
  'connector.class' = 'S3Connector',
  'topics'          = 'widgets'
  …);
(Object store, data warehouse, RDBMS)

{ "reading_ts": "2020-02-14T12:19:27Z", "sensor_id": "aa-101", "production_line": "w01", "widget_type": "acme94", "temp_celcius": 23, "widget_weight_g": 100 }

Photo by Franck V. on Unsplash
@rmoff | #GOTOpia | @confluentinc

ksqlDB The event streaming database purpose-built for stream processing applications. @rmoff | #GOTOpia | @confluentinc

Stream Processing with ksqlDB Source stream @rmoff | #GOTOpia | @confluentinc

Stream Processing with ksqlDB Source stream Analytics @rmoff | #GOTOpia | @confluentinc

Stream Processing with ksqlDB Source stream Applications / Microservices @rmoff | #GOTOpia | @confluentinc

Stream Processing with ksqlDB …SUM(TXN_AMT) GROUP BY AC_ID Source stream AC_ID=42 -> BALANCE=94.00 Applications / Microservices @rmoff | #GOTOpia | @confluentinc

Lookups and Joins with ksqlDB ORDERS {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5} @rmoff | #GOTOpia | @confluentinc

Lookups and Joins with ksqlDB ITEMS {"id": "Item_9", "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9} ORDERS {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5} @rmoff | #GOTOpia | @confluentinc

Lookups and Joins with ksqlDB

ITEMS  {"id": "Item_9", "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9}
ORDERS {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5}

ksqlDB:
CREATE STREAM ORDERS_ENRICHED AS
  SELECT O.*, I.*,
         O.ORDERUNITS * I.UNIT_COST AS TOTAL_ORDER_VALUE
    FROM ORDERS O
         INNER JOIN ITEMS I
         ON O.ITEMID = I.ID;

@rmoff | #GOTOpia | @confluentinc

Lookups and Joins with ksqlDB

ITEMS  {"id": "Item_9", "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9}
ORDERS {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5}

ksqlDB:
CREATE STREAM ORDERS_ENRICHED AS
  SELECT O.*, I.*,
         O.ORDERUNITS * I.UNIT_COST AS TOTAL_ORDER_VALUE
    FROM ORDERS O
         INNER JOIN ITEMS I
         ON O.ITEMID = I.ID;

ORDERS_ENRICHED {"ordertime": 1560070133853, "orderid": 67, "itemid": "Item_9", "orderunits": 5, "make": "Boyle-McDermott", "model": "Apiaceae", "unit_cost": 19.9, "total_order_value": 99.5}

@rmoff | #GOTOpia | @confluentinc

Streams & Tables @rmoff | #GOTOpia | @confluentinc

Streams and Tables

Kafka topic (k/v bytes):
{"event_ts": "2020-02-17T15:22:00Z", "person": "robin", "location": "Leeds"}
{"event_ts": "2020-02-17T17:23:00Z", "person": "robin", "location": "London"}
{"event_ts": "2020-02-17T22:23:00Z", "person": "robin", "location": "Wakefield"}
{"event_ts": "2020-02-18T09:00:00Z", "person": "robin", "location": "Leeds"}

ksqlDB Stream (Stream: Topic + Schema):
+--------------------+--------+-----------+
|EVENT_TS            |PERSON  |LOCATION   |
+--------------------+--------+-----------+
|2020-02-17 15:22:00 |robin   |Leeds      |
|2020-02-17 17:23:00 |robin   |London     |
|2020-02-17 22:23:00 |robin   |Wakefield  |
|2020-02-18 09:00:00 |robin   |Leeds      |

ksqlDB Table (Table: state for a given key; Topic + Schema):
+--------+-----------+
|PERSON  |LOCATION   |
+--------+-----------+
|robin   |Leeds      |
(the single row updates as each event arrives: Leeds → London → Wakefield → Leeds)

@rmoff | #GOTOpia | @confluentinc

Stateful aggregations in ksqlDB

Kafka topic:
{"event_ts": "2020-02-17T15:22:00Z", "person": "robin", "location": "Leeds"}
{"event_ts": "2020-02-17T17:23:00Z", "person": "robin", "location": "London"}
{"event_ts": "2020-02-17T22:23:00Z", "person": "robin", "location": "Wakefield"}
{"event_ts": "2020-02-18T09:00:00Z", "person": "robin", "location": "Leeds"}

SELECT PERSON, COUNT(*) FROM MOVEMENTS GROUP BY PERSON;
+--------+------------------+
|PERSON  |LOCATION_CHANGES  |
+--------+------------------+
|robin   |4                 |  (counts up 1 → 2 → 3 → 4 as each event arrives)

SELECT PERSON, COUNT_DISTINCT(LOCATION) FROM MOVEMENTS GROUP BY PERSON;
+--------+------------------+
|PERSON  |UNIQUE_LOCATIONS  |
+--------+------------------+
|robin   |3                 |  (1 → 2 → 3; the repeat visit to Leeds adds nothing)

Aggregations can be across the entire input, or windowed (TUMBLING, HOPPING, SESSION)

@rmoff | #GOTOpia | @confluentinc
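The same kind of windowed aggregation can also be written with the Kafka Streams DSL. For comparison, a minimal sketch of a one-hour tumbling-window count per person; the topic name, serdes, and window size are illustrative (in ksqlDB this would be a WINDOW TUMBLING (SIZE 1 HOURS) clause on the GROUP BY query):

StreamsBuilder builder = new StreamsBuilder();

// Count events per key (person) in one-hour tumbling windows, materialized in a local state store
KTable<Windowed<String>, Long> movementsPerHour =
    builder.stream("movements", Consumed.with(Serdes.String(), Serdes.String()))
           .groupByKey()
           .windowedBy(TimeWindows.of(Duration.ofHours(1)))
           .count(Materialized.as("movements-per-person-hourly"));

movementsPerHour.toStream().foreach((window, count) ->
    System.out.println(window.key() + " @ " + window.window().startTime() + " = " + count));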

Stateful aggregations in ksqlDB

Kafka topic:
{"event_ts": "2020-02-17T15:22:00Z", "person": "robin", "location": "Leeds"}
{"event_ts": "2020-02-17T17:23:00Z", "person": "robin", "location": "London"}
{"event_ts": "2020-02-17T22:23:00Z", "person": "robin", "location": "Wakefield"}
{"event_ts": "2020-02-18T09:00:00Z", "person": "robin", "location": "Leeds"}

CREATE TABLE PERSON_MOVEMENTS AS
  SELECT PERSON,
         COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS,
         COUNT(*) AS LOCATION_CHANGES
    FROM MOVEMENTS
   GROUP BY PERSON;

PERSON_MOVEMENTS (internal ksqlDB state store)

@rmoff | #GOTOpia | @confluentinc

Pull and Push queries in ksqlDB

Kafka topic:
{"event_ts": "2020-02-17T15:22:00Z", "person": "robin", "location": "Leeds"}
{"event_ts": "2020-02-17T17:23:00Z", "person": "robin", "location": "London"}
{"event_ts": "2020-02-17T22:23:00Z", "person": "robin", "location": "Wakefield"}
{"event_ts": "2020-02-18T09:00:00Z", "person": "robin", "location": "Leeds"}

CREATE TABLE PERSON_MOVEMENTS AS
  SELECT PERSON,
         COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS,
         COUNT(*) AS LOCATION_CHANGES
    FROM MOVEMENTS
   GROUP BY PERSON;

PERSON_MOVEMENTS (internal ksqlDB state store)

Pull query:
ksql> SELECT LOCATION_CHANGES, UNIQUE_LOCATIONS FROM PERSON_MOVEMENTS WHERE ROWKEY='robin';
+------------------+------------------+
|LOCATION_CHANGES  |UNIQUE_LOCATIONS  |
+------------------+------------------+
|3                 |3                 |
Query terminated

Push query:
ksql> SELECT LOCATION_CHANGES, UNIQUE_LOCATIONS FROM PERSON_MOVEMENTS WHERE ROWKEY='robin' EMIT CHANGES;
+------------------+------------------+
|LOCATION_CHANGES  |UNIQUE_LOCATIONS  |
+------------------+------------------+
|1                 |1                 |
|2                 |2                 |
|3                 |3                 |
|4                 |3                 |
Press CTRL-C to interrupt

@rmoff | #GOTOpia | @confluentinc

ksqlDB REST API

Kafka topic:
{"event_ts": "2020-02-17T15:22:00Z", "person": "robin", "location": "Leeds"}
{"event_ts": "2020-02-17T17:23:00Z", "person": "robin", "location": "London"}
{"event_ts": "2020-02-17T22:23:00Z", "person": "robin", "location": "Wakefield"}
{"event_ts": "2020-02-18T09:00:00Z", "person": "robin", "location": "Leeds"}

CREATE TABLE PERSON_MOVEMENTS AS
  SELECT PERSON,
         COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS,
         COUNT(*) AS LOCATION_CHANGES
    FROM MOVEMENTS
   GROUP BY PERSON;

PERSON_MOVEMENTS (internal ksqlDB state store)

curl -s -X "POST" "http://localhost:8088/query" \
     -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
     -d '{"ksql":"SELECT UNIQUE_LOCATIONS FROM PERSON_MOVEMENTS WHERE ROWKEY='\''robin'\'';"}'

{"value":"3"}

@rmoff | #GOTOpia | @confluentinc

Pull and Push queries in ksqlDB

              Pull query             Push query
Tells you:    Point-in-time value    All value changes
Exits:        Immediately            Never

@rmoff | #GOTOpia | @confluentinc

ksqlDB or Kafka Streams? @rmoff | #GOTOpia | @confluentinc Photo by Ramiz Dedaković on Unsplash

Standing on the Shoulders of Streaming Giants: ksqlDB (+ UDFs) is powered by Kafka Streams; Kafka Streams is powered by the Producer and Consumer APIs. Moving up the stack buys ease of use; moving down buys flexibility. @rmoff | #GOTOpia | @confluentinc

Photo by Raoul Droog on Unsplash DEMO @rmoff | #GOTOpia | @confluentinc

Summary @rmoff | #GOTOpia | @confluentinc

@rmoff | #GOTOpia | @confluentinc

K V @rmoff | #GOTOpia | @confluentinc

The Log @rmoff | #GOTOpia | @confluentinc

Producer Consumer The Log @rmoff | #GOTOpia | @confluentinc

Producer Consumer Connectors The Log @rmoff | #GOTOpia | @confluentinc

Producer Consumer Connectors The Log Streaming Engine @rmoff | #GOTOpia | @confluentinc

Apache Kafka Producer Consumer Connectors The Log Streaming Engine @rmoff | #GOTOpia | @confluentinc

Confluent Platform ksqlDB Producer Consumer Connectors The Log Schema Registry Streaming Engine @rmoff | #GOTOpia | @confluentinc

EVENTS are EVERYWHERE @rmoff | #GOTOpia | @confluentinc

EVENTS y r e v ^ are POWERFUL @rmoff | #GOTOpia | @confluentinc

Stand by for resource links… @rmoff | #GOTOpia | @confluentinc

Free Books! https://rmoff.dev/gotopia @rmoff | #GOTOpia | @confluentinc

Fully Managed Kafka as a Service. Free money! $50 USD off your bill each calendar month for the first three months when you sign up: https://rmoff.dev/ccloud. Use promo code 60DEVADV for an additional $60 towards your bill. * T&C: https://www.confluent.io/confluent-cloud-promo-disclaimer

Learn Kafka. Start building with Apache Kafka at Confluent Developer. developer.confluent.io

Confluent Community Slack group cnfl.io/slack @rmoff | #GOTOpia | @confluentinc

#EOF @rmoff rmoff.dev/talks youtube.com/rmoff