🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

A presentation at Apache Kafka Meetup in January 2021 in by Robin Moffatt

Slide 1

Slide 1

🚂 On Track with Apache Kafka®: Building a Streaming ETL solution with Rail Data @rmoff @confluentinc

Slide 2

Slide 2

Slide 3

Slide 3

h/t @kennybastani / @gamussa

Slide 4

Slide 4

Slide 5

Slide 5

@rmoff | @confluentinc

Slide 6

Slide 6

https://wiki.openraildata.com @rmoff | @confluentinc

Slide 7

Slide 7

@rmoff | @confluentinc

Slide 8

Slide 8

@rmoff | @confluentinc

Slide 9

Slide 9

ig nf Co Co nf ig SQL @rmoff | @confluentinc

Slide 10

Slide 10

Real-time visualisation of rail data @rmoff | @confluentinc

Slide 11

Slide 11

Real-time visualisation of rail data O M E D @rmoff | @confluentinc

Slide 12

Slide 12

Code! http://rmoff.dev/kafka-trains-code-01 @rmoff | @confluentinc

Slide 13

Slide 13

Graph relationships @rmoff | @confluentinc

Slide 14

Slide 14

Realtime Alerts @rmoff | @confluentinc

Slide 15

Slide 15

Getting the data in 📥 @rmoff | @confluentinc

Slide 16

Slide 16

Data Sources Train Movements Continuous feed Schedules Updated Daily Reference data Static @rmoff | @confluentinc

Slide 17

Slide 17

Streaming Integration with Kafka Connect Amazon syslog Google Kafka Connect Kafka Brokers @rmoff | @confluentinc

Slide 18

Slide 18

kafkacat • Kafka CLI producer & consumer • Also metadata inspection • Integrates beautifully with unix pipes! curl -s “https://my.api.endpoint” | \ kafkacat -b localhost:9092 -t topic -P • Get it! • https://github.com/edenhill/kafkacat/ • apt-get install kafkacat • brew install kafkacat • https://hub.docker.com/r/confluentinc/cp-kafkacat/ https://rmoff.net/2018/05/10/quick-n-easy-population-of-realistic-test-data-into-kafka/ @rmoff | @confluentinc

Slide 19

Slide 19

https://wiki.openraildata.com/index.php?title=Train_Movements Movement data A train’s movements Arrived / departed Where What time Variation from timetable A train was cancelled Where When Why A train was ‘activated’ Links a train to a schedule @rmoff | @confluentinc

Slide 20

Slide 20

Ingest ActiveMQ with Kafka Connect “connector.class” : “activemq.url” : “jms.destination.type”: “jms.destination.name”: “kafka.topic” : “io.confluent.connect.activemq.ActiveMQSourceConnector”, “tcp://datafeeds.networkrail.co.uk:61619”, “topic”, “TRAIN_MVT_EA_TOC”, “networkrail_TRAIN_MVT”, Kafka Movement events Kafka Connect Northern Trains GNER* TransPennine networkrail_TRAIN_MVT

  • Remember them? @rmoff | @confluentinc

Slide 21

Slide 21

Envelope Payload @rmoff | @confluentinc

Slide 22

Slide 22

Message transformation Extract element Convert JSON Explode Array kafkacat -b localhost:9092 -G tm_explode networkrail_TRAIN_MVT -u | \ jq -c ‘.text|fromjson[]’ | \ kafkacat -b localhost:9092 -t networkrail_TRAIN_MVT_X -T -P @rmoff | @confluentinc

Slide 23

Slide 23

https://wiki.openraildata.com/index.php?title=SCHEDULE Schedule data Train routes Type of train (diesel/electric) Classes, sleeping accomodation, reservations, etc @rmoff | @confluentinc

Slide 24

Slide 24

Load JSON from S3 into Kafka curl -s -L -u “$USER:$PW” “$FEED_URL” | \ gunzip | \ kafkacat -b localhost -P -t CIF_FULL_DAILY @rmoff | @confluentinc

Slide 25

Slide 25

https://wiki.openraildata.com/index.php?title=SCHEDULE Reference data Train company names Location codes Train type codes Location geocodes @rmoff | @confluentinc

Slide 26

Slide 26

Geocoding @rmoff | @confluentinc

Slide 27

Slide 27

Static reference data @rmoff | @confluentinc

Slide 28

Slide 28

AA:{“canx_reason_code”:”AA”,”canx_reason”:”Waiting acceptance into off Network Terminal or Yard”} AC:{“canx_reason_code”:”AC”,”canx_reason”:”Waiting train preparation or completion of TOPS list/RT3973”} AE:{“canx_reason_code”:”AE”,”canx_reason”:”Congestion in off Network Terminal or Yard”} AG:{“canx_reason_code”:”AG”,”canx_reason”:”Wagon load incident including adjusting loads or open door”} AJ:{“canx_reason_code”:”AJ”,”canx_reason”:”Waiting Customer’s traffic including documentation”} DA:{“canx_reason_code”:”DA”,”canx_reason”:”Non Technical Fleet Holding Code”} DB:{“canx_reason_code”:”DB”,”canx_reason”:”Train Operations Holding Code”} DC:{“canx_reason_code”:”DC”,”canx_reason”:”Train Crew Causes Holding Code”} DD:{“canx_reason_code”:”DD”,”canx_reason”:”Technical Fleet Holding Code”} DE:{“canx_reason_code”:”DE”,”canx_reason”:”Station Causes Holding Code”} DF:{“canx_reason_code”:”DF”,”canx_reason”:”External Causes Holding Code”} DG:{“canx_reason_code”:”DG”,”canx_reason”:”Freight Terminal and or Yard Holding Code”} DH:{“canx_reason_code”:”DH”,”canx_reason”:”Adhesion and or Autumn Holding Code”} FA:{“canx_reason_code”:”FA”,”canx_reason”:”Dangerous goods incident”} FC:{“canx_reason_code”:”FC”,”canx_reason”:”Freight train driver”} kafkacat -l canx_reason_code.dat -b localhost:9092 -t canx_reason_code -P -K: @rmoff | @confluentinc

Slide 29

Slide 29

@rmoff | @confluentinc

Slide 30

Slide 30

Transforming the data @rmoff | @confluentinc

Slide 31

Slide 31

Movement Activation Schedule Location @rmoff | @confluentinc

Slide 32

Slide 32

Movement Activation Schedule Location @rmoff | @confluentinc

Slide 33

Slide 33

Movement Activation Schedule Location @rmoff | @confluentinc

Slide 34

Slide 34

Movement Activation Schedule Location @rmoff | @confluentinc

Slide 35

Slide 35

Streams of events Time @rmoff | @confluentinc

Slide 36

Slide 36

Stream Processing with ksqlDB Stream: widgets Stream: widgets_red @rmoff | @confluentinc

Slide 37

Slide 37

Stream Processing with ksqlDB Stream: widgets CREATE STREAM widgets_red AS SELECT * FROM widgets WHERE colour=’RED’; Stream: widgets_red @rmoff | @confluentinc

Slide 38

Slide 38

ksqlDB @rmoff | @confluentinc

Slide 39

Slide 39

Event data ActiveMQ Kafka Connect NETWORKRAIL_TRAIN_MVT_X @rmoff | @confluentinc

Slide 40

Slide 40

Message routing with ksqlDB TRAIN_ACTIVATIONS Time NETWORKRAIL_TRAIN_MVT_X ’ 1 0 00 ’ = E YP T _ G MS TRAIN_CANCELLATIONS Time MSG_TYPE=’0002’ Time MSG _TY PE= ‘00 03’ TRAIN_MOVEMENTS Time @rmoff | @confluentinc

Slide 41

Slide 41

Split a stream of data (message routing) CREATE STREAM TRAIN_MOVEMENTS_00 AS SELECT * FROM NETWORKRAIL_TRAIN_MVT_X s t n e v e t n e m e ’ v 3 o 0 0 0 M ’ PE= Y T _ SG M WHERE header->msg_type = ‘0003’; NETWORKRAIL_TRAIN_MVT_X Source topic CREATE STREAM TRAIN_CANCELLATIONS_00 AS SELECT * Can cella t MSG _TY FROM NETWORKRAIL_TRAIN_MVT_X ion e v e PE= n t s ‘00 0 2’ WHERE header->msg_type = ‘0002’; @rmoff | @confluentinc

Slide 42

Slide 42

Joining events to lookup data (stream-table joins) TS 10:00:01 LOC_ID 42 TS 10:00:02 LOC_ID 43 TS 10:00:01 LOC_ID 42 LOC_DESC Menston TS 10:00:02 LOC_ID 43 LOC_DESC Ilkley TRAIN_MOVEMENTS (Stream) LOCATION (Table) ID DESCRIPTION 42 MENSTON 43 Ilkley CREATE STREAM TRAIN_MOVEMENTS_ENRICHED AS SELECT TM.LOC_ID AS LOC_ID, TM.TS AS TS, L.DESCRIPTION AS LOC_DESC, […] FROM TRAIN_MOVEMENTS TM LEFT JOIN LOCATION L TRAIN_MOVEMENTS_ENRICHED ON TM.LOC_ID = L.ID; (Stream) @rmoff | @confluentinc

Slide 43

Slide 43

Decode values, and generate composite key +———————-+————————+————-+——————————-+————————+————————————+ | CIF_train_uid | sched_start_dt | stp_ind | SCHEDULE_KEY | CIF_power_type | POWER_TYPE | +———————-+————————+————-+——————————-+————————+————————————+ | Y82535 | 2019-05-20 | P | Y82535/2019-05-20/P | E | Electric | | Y82537 | 2019-05-20 | P | Y82537/2019-05-20/P | DMU | Other | | Y82542 | 2019-05-20 | P | Y82542/2019-05-20/P | D | Diesel | | Y82535 | 2019-05-24 | P | Y82535/2019-05-24/P | EMU | Electric Multiple Unit | | Y82542 | 2019-05-24 | P | Y82542/2019-05-24/P | HST | High Speed Train | SELECT JsonScheduleV1->CIF_train_uid + ‘/’ + JsonScheduleV1->schedule_start_date + ‘/’ + JsonScheduleV1->CIF_stp_indicator AS SCHEDULE_KEY, CASE WHEN JsonScheduleV1->schedule_segment->CIF_power_type = ‘D’ THEN ‘Diesel’ WHEN JsonScheduleV1->schedule_segment->CIF_power_type = ‘E’ THEN ‘Electric’ WHEN JsonScheduleV1->schedule_segment->CIF_power_type = ‘ED’ THEN ‘Electro-Diesel’ END AS POWER_TYPE @rmoff | @confluentinc

Slide 44

Slide 44

Schemas CREATE STREAM SCHEDULE_RAW ( TiplocV1 STRUCT<transaction_type tiploc_code NALCO STANOX crs_code description tps_description WITH (KAFKA_TOPIC=’CIF_FULL_DAILY’, VALUE_FORMAT=’JSON’); @rmoff | VARCHAR, VARCHAR, VARCHAR, VARCHAR, VARCHAR, VARCHAR, VARCHAR>) @confluentinc

Slide 45

Slide 45

Handling JSON data CREATE STREAM NETWORKRAIL_TRAIN_MVT_X ( header STRUCT< msg_type VARCHAR, […] >, body VARCHAR) WITH (KAFKA_TOPIC=’networkrail_TRAIN_MVT_X’, VALUE_FORMAT=’JSON’); CREATE STREAM TRAIN_CANCELLATIONS_00 AS SELECT HEADER, EXTRACTJSONFIELD(body,’$.canx_timestamp’) AS canx_timestamp, EXTRACTJSONFIELD(body,’$.canx_reason_code’) AS canx_reason_code, […] FROM networkrail_TRAIN_MVT_X WHERE header->msg_type = ‘0002’; @rmoff | @confluentinc

Slide 46

Slide 46

The Confluent Schema Registry Avro Schema Schema Registry Target Source ksqlDB Avro Message Avro Message Kafka Connect @rmoff | @confluentinc

Slide 47

Slide 47

Reserialise data CREATE STREAM TRAIN_CANCELLATIONS_00 WITH (VALUE_FORMAT=’AVRO’) AS SELECT * FROM networkrail_TRAIN_MVT_X WHERE header->msg_type = ‘0002’; @rmoff | @confluentinc

Slide 48

Slide 48

Flatten and re-key a stream { CREATE STREAM TIPLOC_FLAT_KEYED SELECT TiplocV1->TRANSACTION_TYPE TiplocV1->TIPLOC_CODE TiplocV1->NALCO TiplocV1->STANOX TiplocV1->CRS_CODE TiplocV1->DESCRIPTION TiplocV1->TPS_DESCRIPTION FROM SCHEDULE_RAW PARTITION BY TIPLOC_CODE; y e e g k n g a n h i C n ksql> DESCRIBE TIPLOC_FLAT_KEYED; io t i art “TiplocV1”: { “transaction_type”: “Create”, “tiploc_code”: “ABWD”, “nalco”: “513100”, “stanox”: “88601”, “crs_code”: “ABW”, “description”: “ABBEY WOOD”, “tps_description”: “ABBEY WOOD” } } p Access nested element AS AS AS AS AS AS AS TRANSACTION_TYPE , TIPLOC_CODE , NALCO , STANOX , CRS_CODE , DESCRIPTION , TPS_DESCRIPTION Name : TIPLOC_FLAT_KEYED Field | Type ——————————————————————-TRANSACTION_TYPE | VARCHAR(STRING) TIPLOC_CODE | VARCHAR(STRING) NALCO | VARCHAR(STRING) STANOX | VARCHAR(STRING) CRS_CODE | VARCHAR(STRING) DESCRIPTION | VARCHAR(STRING) TPS_DESCRIPTION | VARCHAR(STRING) @rmoff | @confluentinc ———————————————————————

Slide 49

Slide 49

@rmoff | @confluentinc

Slide 50

Slide 50

Using the data @rmoff | @confluentinc

Slide 51

Slide 51

ksqlDB @rmoff | @confluentinc

Slide 52

Slide 52

ksqlDB @rmoff | @confluentinc

Slide 53

Slide 53

Streaming Integration with Kafka Connect Amazon syslog Google Kafka Connect Kafka Brokers @rmoff | @confluentinc

Slide 54

Slide 54

Kafka -> Elasticsearch { “connector.class”: “io.confluent.connect.elasticsearch.ElasticsearchSinkConnector”, “topics”: “TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00”, “connection.url”: “http://elasticsearch:9200”, “type.name”: “type.name=kafkaconnect”, “key.ignore”: “false”, “schema.ignore”: “true”, “key.converter”: “org.apache.kafka.connect.storage.StringConverter” } @rmoff | @confluentinc

Slide 55

Slide 55

Kibana for real-time visualisation of rail data @rmoff | @confluentinc

Slide 56

Slide 56

Kibana for real-time analysis of rail data @rmoff | @confluentinc

Slide 57

Slide 57

Kafka -> Postgres { “connector.class”: “io.confluent.connect.jdbc.JdbcSinkConnector”, “key.converter”: “org.apache.kafka.connect.storage.StringConverter”, “connection.url”: “jdbc:postgresql://postgres:5432/”, “connection.user”: “postgres”, “connection.password”: “postgres”, “auto.create”: true, “auto.evolve”: true, “insert.mode”: “upsert”, “pk.mode”: “record_key”, “pk.fields”: “MESSAGE_KEY”, “topics”: “TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00”, “transforms”: “dropArrays”, “transforms.dropArrays.type”: “org.apache.kafka.connect.transforms.ReplaceField$Value”, “transforms.dropArrays.blacklist”: “SCHEDULE_SEGMENT_LOCATION, HEADER” } @rmoff | @confluentinc

Slide 58

Slide 58

Kafka -> Postgres @rmoff | @confluentinc

Slide 59

Slide 59

But do you even need a database? TRAIN_MOVEMENTS Time Time SELECT TIMESTAMPTOSTRING(WINDOWSTART(),’yyyy-MM-dd’) AS WINDOW_START_TS, VARIATION_STATUS, SUM(CASE WHEN TOC = ‘Arriva Trains Northern’ THEN 1 ELSE 0 END) AS Arriva_CT, SUM(CASE WHEN TOC = ‘East Midlands Trains’ THEN 1 ELSE 0 END) AS EastMidlands_CT, SUM(CASE WHEN TOC = ‘London North Eastern Railway’ THEN 1 ELSE 0 END) AS LNER_CT, SUM(CASE WHEN TOC = ‘TransPennine Express’ THEN 1 ELSE 0 END) AS TransPennineExpress_CT FROM TRAIN_MOVEMENTS WINDOW TUMBLING (SIZE 1 DAY) GROUP BY VARIATION_STATUS; +——————+——————-+————+—————+———+———-+ | Date | Variation | Arriva | East Mid | LNER | TPE | +——————+——————-+————+—————+———+———-+ | 2019-07-02 | OFF ROUTE | 46 | 78 | 20 | 167 | | 2019-07-02 | ON TIME | 19083 | 3568 | 1509 | 2916 | TOC_STATS | 2019-07-02 | LATE | 30850 | 7953 | 5420 | 9042 | | 2019-07-02 | EARLY | 11478 | 3518 | 1349 | 2096 | Aggregate | 2019-07-03 | OFF ROUTE | 79 | 25 | 41 | 213 | Aggregate | 2019-07-03 | ON TIME | 19512 | 4247 | 1915 | 2936 | | 2019-07-03 | LATE | 37357 | 8258 | 5342 | 11016 | | 2019-07-03 | EARLY | 11825 | 4574 | 1888 | 2094 | @rmoff | @confluentinc

Slide 60

Slide 60

{ Kafka -> S3 “connector.class”: “io.confluent.connect.s3.S3SinkConnector”, “key.converter”:”org.apache.kafka.connect.storage.StringConverter”, “tasks.max”: “1”, “topics”: “TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00”, “s3.region”: “us-west-2”, “s3.bucket.name”: “rmoff-rail-streaming-demo”, “flush.size”: “65536”, “storage.class”: “io.confluent.connect.s3.storage.S3Storage”, “format.class”: “io.confluent.connect.s3.format.avro.AvroFormat”, “schema.generator.class”: “io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator”, “schema.compatibility”: “NONE”, “partitioner.class”: “io.confluent.connect.storage.partitioner.TimeBasedPartitioner”, “path.format”:“‘year’=YYYY/’month’=MM/’day’=dd”, “timestamp.extractor”:”Record”, “partition.duration.ms”: 1800000, “locale”:”en”, “timezone”: “UTC” } @rmoff | @confluentinc

Slide 61

Slide 61

Kafka -> S3 $ aws s3 ls s3://rmoff-rail-streaming-demo/topics/TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00/year=2019/month=07/day=07/ 2019-07-07 10:05:41 200995548 TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00+0+0004963416.avro 2019-07-07 19:47:11 87631494 TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00+0+0005007151.avro @rmoff | @confluentinc

Slide 62

Slide 62

Analysis with AWS Athena @rmoff | @confluentinc

Slide 63

Slide 63

Graph analysis { “connector.class”: “streams.kafka.connect.sink.Neo4jSinkConnector”, “topics”: “TRAIN_CANCELLATIONS_02”, “key.converter”:”org.apache.kafka.connect.storage.StringConverter”, “errors.tolerance”: “all”, “errors.deadletterqueue.topic.name”: “sink-neo4j-train-00_dlq”, “errors.deadletterqueue.topic.replication.factor”: 1, “errors.deadletterqueue.context.headers.enable”:true, “neo4j.server.uri”: “bolt://neo4j:7687”, “neo4j.authentication.basic.username”: “neo4j”, “neo4j.authentication.basic.password”: “connect”, “neo4j.topic.cypher.TRAIN_CANCELLATIONS_02”: “MERGE (train:train{id: event.TRAIN_ID}) MERGE (toc:toc{toc: event.TOC}) MERGE (canx_reason:canx_reason{reason: event.CANX_REASON}) MERGE (canx_loc:canx_loc{location: coalesce(event.CANCELLATION_LOCATION,’<unknown>’)}) MERGE (train)[:OPERATED_BY]->(toc) MERGE (canx_loc)<-[:CANCELLED_AT{reason:event.CANX_REASON, time:event.CANX_TIMESTAMP}]-(train)-[:CANCELLED_BECAUSE]->(canx_reason)” } @rmoff | @confluentinc

Slide 64

Slide 64

Graph relationships @rmoff | @confluentinc

Slide 65

Slide 65

Tell me when trains are delayed at my station @rmoff | @confluentinc

Slide 66

Slide 66

Event-driven Alerting - logic TRAIN_MOVEMENTS Time TRAIN_DELAYED_ALERT ALERT_CONFIG Key/Value state Time CREATE STREAM TRAINS_DELAYED_ALERT AS SELECT […] FROM TRAIN_MOVEMENTS T INNER JOIN ALERT_CONFIG A ON T.LOC_NLCDESC = A.STATION WHERE TIMETABLE_VARIATION > A.ALERT_OVER_MINS; @rmoff | @confluentinc

Slide 67

Slide 67

Sending messages to Telegram curl -X POST -H ‘Content-Type: application/json’ -d ‘{“chat_id”: “-364377679”, “text”: “This is a test from curl”}’ https://api.telegram.org/botxxxxx/sendMessage { “connector.class”: “io.confluent.connect.http.HttpSinkConnector”, “request.method”: “post”, “http.api.url”: “https://api.telegram.org/[…]/sendMessage”, “headers”: “Content-Type: application/json”, “topics”: “TRAINS_DELAYED_ALERT_TG”, “tasks.max”: “1”, “batch.prefix”: “{“chat_id”:”-364377679”,”parse_mode”: “markdown”,”, “batch.suffix”: “}”, “batch.max.size”: “1”, “regex.patterns”:”.\{MESSAGE=(.)\}.*”, “regex.replacements”: “”text”:”$1”“, “regex.separator”: “~”, “key.converter”: “org.apache.kafka.connect.storage.StringConverter” } Kafka Kafka Connect HTTP Sink Telegram @rmoff | @confluentinc

Slide 68

Slide 68

Running it & monitoring & maintenance @rmoff | @confluentinc

Slide 69

Slide 69

System health & Throughput @rmoff | @confluentinc

Slide 70

Slide 70

Configuring and monitoring Kafka Connect @rmoff | @confluentinc

Slide 71

Slide 71

Data Flow @rmoff | @confluentinc

Slide 72

Slide 72

Consumer lag @rmoff | @confluentinc

Slide 73

Slide 73

@rmoff @confluentinc Restart failing connectors [email protected] > crontab -l “*/5 * * * * /home/rmoff/restart_failed_connector_tasks.sh [email protected] > cat /home/rmoff/restart_failed_connector_tasks.sh “#!/usr/bin/env bash # @rmoff / June 6, 2019

Restart any connector tasks that are FAILED curl -s “http:”//localhost:8083/connectors” | \ jq ‘.[]’ | \ xargs -I{connector_name} curl -s “http:”//localhost:8083/connectors/”{connector_name}”/status” | \ jq -c -M ‘[select(.tasks[].state”==”FAILED”) | .name,”§±§”,.tasks[].id]’ | \ grep -v “[]”| \ sed -e ‘s/^[“”//g’| sed -e ‘s/”,”§±§”,//tasks”//g’|sed -e ‘s/]$”//g’| \ xargs -I{connector_and_task} curl -X POST “http:”//localhost:8083/connectors/”{connector_and_task}”/ restart”

🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 74

Slide 74

Is everything running? @rmoff @confluentinc http://rmoff.dev/kafka-trains-code-01 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 75

Slide 75

@rmoff @confluentinc check_latest_timestamp.sh Start at latest Just read one message message kafkacat -b localhost:9092 -t networkrail_TRAIN_MVT_X -o-1 -c1 -C | \ jq ‘.header.msg_queue_timestamp’ | \ sed -e ‘s/”“//g’ | \ sed -e ‘s/000$”//g’ | \ Convert from epoch to humanxargs -Ifoo date “—[email protected] readable timestamp format $ ./check_latest_timestamp.sh Mon 25 Jan 2021 13:39:40 GMT 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data

Slide 76

Slide 76

Kafka Connect Kafka Kafka Connect ksqlDB ✅ Stream raw data ✅ Stream enriched data ✅ Historic data & data replay @rmoff | @confluentinc

Slide 77

Slide 77

ig nf Co Co nf ig SQL @rmoff | @confluentinc

Slide 78

Slide 78

RM OF F2 00 $200 USD off your bill each calendar month for the first three months when you sign up https://rmoff.dev/ccloud Free money! (additional $200 towards your bill 😄 ) Fully Managed Kafka as a Service * T&C: https://www.confluent.io/confluent-cloud-promo-disclaimer

Slide 79

Slide 79

Learn Kafka. Start building with Apache Kafka at Confluent Developer. developer.confluent.io

Slide 80

Slide 80

#EOF