🚂 On Track with Apache Kafka®: Building a Streaming ETL solution with Rail Data @rmoff @oredev
A presentation at Øredev 2019 in November 2019 in Malmö, Sweden by Robin Moffatt
🚂 On Track with Apache Kafka®: Building a Streaming ETL solution with Rail Data @rmoff @oredev
h/t @kennybastani / @gamussa
@rmoff @oredev 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev https:!//wiki.openraildata.com 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Real-time visualisation of rail data @rmoff @oredev 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Real-time visualisation of rail data @rmoff @oredev 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Why do trains get cancelled? @rmoff @oredev 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev Do cancellation reasons vary by Train Operator? 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev Tell me when trains are delayed at my station 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
ig nf Co Co nf ig @rmoff @oredev SQL 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev Code! http://rmoff.dev/kafka-trains-code-01 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev Getting the data in 📥 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev Data Sources Train Movements Continuous feed Schedules Updated Daily Reference data Static 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Streaming Integration with Kafka Connect @rmoff @oredev Amazon S3 syslog Google BigQuery Kafka Connect Kafka Brokers 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
kafkacat @rmoff @oredev • Kafka CLI producer & consumer • Also metadata inspection • Integrates beautifully with unix pipes! curl -s “https:”//my.api.endpoint” | \ kafkacat -b localhost:9092 -t topic -P • Get it! • https:!//github.com/edenhill/kafkacat/ • apt-get install kafkacat • brew install kafkacat • https:!//hub.docker.com/r/confluentinc/cp-kafkacat/ https://rmoff.net/2018/05/10/quick-n-easy-population-of-realistic-test-data-into-kafka/ 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev https://wiki.openraildata.com/index.php?title=Train_Movements Movement data A train’s movements Arrived / departed Where What time Variation from timetable A train was cancelled Where When Why A train was ‘activated’ Links a train to a schedule 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Ingest ActiveMQ with Kafka Connect “connector.class” : “activemq.url” : “jms.destination.type”: “jms.destination.name”: “kafka.topic” : @rmoff @oredev “io.confluent.connect.activemq.ActiveMQSourceConnector”, “tcp:”//datafeeds.networkrail.co.uk:61619”, “topic”, “TRAIN_MVT_EA_TOC”, “networkrail_TRAIN_MVT”, Kafka Movement events Kafka Connect Northern Trains GNER* TransPennine networkrail_TRAIN_MVT
@rmoff @oredev Envelope Payload 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Message transformation Extract element Convert JSON @rmoff @oredev Explode Array kafkacat -b localhost:9092 -G tm_explode networkrail_TRAIN_MVT -u | \ jq -c ‘.text|fromjson[]’ | \ kafkacat -b localhost:9092 -t networkrail_TRAIN_MVT_X -T -P 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
https://wiki.openraildata.com/index.php?title=SCHEDULE Schedule data @rmoff @oredev Train routes Type of train (diesel/electric) Classes, sleeping accomodation, reservations, etc 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Load JSON from S3 into Kafka @rmoff @oredev curl -s -L -u “$USER:$PW” “$FEED_URL” | \ gunzip | \ kafkacat -b localhost -P -t CIF_FULL_DAILY 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev https://wiki.openraildata.com/index.php?title=SCHEDULE Reference data Train company names Location codes Train type codes Location geocodes 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Geocoding @rmoff @oredev 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Static reference data @rmoff @oredev 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev AA:{“canx_reason_code”:”AA”,”canx_reason”:”Waiting acceptance into off Network Terminal or Yard”} AC:{“canx_reason_code”:”AC”,”canx_reason”:”Waiting train preparation or completion of TOPS list/RT3973”} AE:{“canx_reason_code”:”AE”,”canx_reason”:”Congestion in off Network Terminal or Yard”} AG:{“canx_reason_code”:”AG”,”canx_reason”:”Wagon load incident including adjusting loads or open door”} AJ:{“canx_reason_code”:”AJ”,”canx_reason”:”Waiting Customer’s traffic including documentation”} DA:{“canx_reason_code”:”DA”,”canx_reason”:”Non Technical Fleet Holding Code”} DB:{“canx_reason_code”:”DB”,”canx_reason”:”Train Operations Holding Code”} DC:{“canx_reason_code”:”DC”,”canx_reason”:”Train Crew Causes Holding Code”} DD:{“canx_reason_code”:”DD”,”canx_reason”:”Technical Fleet Holding Code”} DE:{“canx_reason_code”:”DE”,”canx_reason”:”Station Causes Holding Code”} DF:{“canx_reason_code”:”DF”,”canx_reason”:”External Causes Holding Code”} DG:{“canx_reason_code”:”DG”,”canx_reason”:”Freight Terminal and or Yard Holding Code”} DH:{“canx_reason_code”:”DH”,”canx_reason”:”Adhesion and or Autumn Holding Code”} FA:{“canx_reason_code”:”FA”,”canx_reason”:”Dangerous goods incident”} FC:{“canx_reason_code”:”FC”,”canx_reason”:”Freight train driver”} kafkacat -l canx_reason_code.dat -b localhost:9092 -t canx_reason_code -P -K: 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev Transforming the data 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev Movement Activation Schedule Location 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev Streams of events Time 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Stream Processing with KSQL @rmoff @oredev Stream: widgets Stream: widgets_red 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Stream Processing with KSQL @rmoff @oredev Stream: widgets CREATE STREAM widgets_red AS SELECT * FROM widgets WHERE colour=’RED’; Stream: widgets_red 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Event data @rmoff @oredev ActiveMQ Kafka Connect NETWORKRAIL_TRAIN_MVT_X 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev Message routing with KSQL TRAIN_ACTIVATIONS Time NETWORKRAIL_TRAIN_MVT_X ’ 1 0 00 ’ = E YP T _ G MS TRAIN_CANCELLATIONS Time MSG _TY Time MSG_TYPE=’0002’ PE= ‘00 03’ TRAIN_MOVEMENTS Time 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev Split a stream of data (message routing) CREATE STREAM TRAIN_MOVEMENTS_00 AS SELECT * FROM NETWORKRAIL_TRAIN_MVT_X s t n e v e t n e m e ’ v 3 o 0 0 0 M ’ = E P Y T _ G MS WHERE header!->msg_type = ‘0003’; NETWORKRAIL_TRAIN_MVT_X Source topic CREATE STREAM TRAIN_CANCELLATIONS_00 AS SELECT * FROM NETWORKRAIL_TRAIN_MVT_X WHERE header!->msg_type = ‘0002’; Can cella t MSG _TY ion e v e PE= n t ‘00 s 0 2’ 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev Joining events to lookup data (stream-table joins) TS 10:00:01 LOC_ID 42 TS 10:00:02 TRAIN_MOVEMENTS (Stream) LOCATION (Table) ID DESCRIPTION 42 MENSTON 43 Ilkley LOC_ID 43 TS 10:00:01 LOC_ID 42 LOC_DESC Menston TS 10:00:02 LOC_ID 43 CREATE STREAM TRAIN_MOVEMENTS_ENRICHED AS SELECT TM.LOC_ID AS LOC_ID, TM.TS AS TS, L.DESCRIPTION AS LOC_DESC, […] FROM TRAIN_MOVEMENTS TM TRAIN_MOVEMENTS_ENRICHED LEFT JOIN LOCATION L (Stream) ON TM.LOC_ID = L.ID; LOC_DESC Ilkley 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Decode values, and generate composite key @rmoff @oredev +———————-+————————+————-+——————————-+————————+————————————+ | CIF_train_uid | sched_start_dt | stp_ind | SCHEDULE_KEY | CIF_power_type | POWER_TYPE | +———————-+————————+————-+——————————-+————————+————————————+ | Y82535 | 2019-05-20 | P | Y82535/2019-05-20/P | E | Electric | | Y82537 | 2019-05-20 | P | Y82537/2019-05-20/P | DMU | Other | | Y82542 | 2019-05-20 | P | Y82542/2019-05-20/P | D | Diesel | | Y82535 | 2019-05-24 | P | Y82535/2019-05-24/P | EMU | Electric Multiple Unit | | Y82542 | 2019-05-24 | P | Y82542/2019-05-24/P | HST | High Speed Train | CASE SELECT JsonScheduleV1”->CIF_train_uid + ‘/’ WHEN JsonScheduleV1”->schedule_segment”->CIF_power_type = ‘D’
Schemas @rmoff @oredev CREATE STREAM SCHEDULE_RAW ( TiplocV1 STRUCT<transaction_type tiploc_code NALCO STANOX crs_code description tps_description WITH (KAFKA_TOPIC=’CIF_FULL_DAILY’, VALUE_FORMAT=’JSON’); VARCHAR, VARCHAR, VARCHAR, VARCHAR, VARCHAR, VARCHAR, VARCHAR>) 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev Handling JSON data CREATE STREAM NETWORKRAIL_TRAIN_MVT_X ( header STRUCT< msg_type VARCHAR, […] >, body VARCHAR) WITH (KAFKA_TOPIC=’networkrail_TRAIN_MVT_X’, VALUE_FORMAT=’JSON’); CREATE STREAM TRAIN_CANCELLATIONS_00 AS SELECT HEADER, EXTRACTJSONFIELD(body,’$.canx_timestamp’) AS canx_timestamp, EXTRACTJSONFIELD(body,’$.canx_reason_code’) AS canx_reason_code, […] FROM networkrail_TRAIN_MVT_X WHERE header”->msg_type = ‘0002’; 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev Serialisation & Schemas Avro -> Confluent Schema Registry Protobuf JSON CSV https://qconnewyork.com/system/files/presentation-slides/qcon_17_-_schemas_and_apis.pdf 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev The Confluent Schema Registry Avro Schema Schema Registry Target Source KSQL Avro Message Avro Message Kafka Connect 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev Reserialise data CREATE STREAM TRAIN_CANCELLATIONS_00 WITH (VALUE_FORMAT=’AVRO’) AS SELECT * FROM networkrail_TRAIN_MVT_X WHERE header”->msg_type = ‘0002’; 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev Flatten and re-key a stream { } “TiplocV1”: { “transaction_type”: “Create”, “tiploc_code”: “ABWD”, “nalco”: “513100”, “stanox”: “88601”, “crs_code”: “ABW”, “description”: “ABBEY WOOD”, “tps_description”: “ABBEY WOOD” } Access nested CREATE STREAM TIPLOC_FLAT_KEYED SELECT TiplocV1”->TRANSACTION_TYPE TiplocV1”->TIPLOC_CODE TiplocV1”->NALCO TiplocV1”->STANOX TiplocV1”->CRS_CODE TiplocV1”->DESCRIPTION TiplocV1”->TPS_DESCRIPTION FROM SCHEDULE_RAW PARTITION BY TIPLOC_CODE; element AS AS AS AS AS AS AS TRANSACTION_TYPE , TIPLOC_CODE , NALCO , STANOX , CRS_CODE , DESCRIPTION , TPS_DESCRIPTION y e e g k n g a n h i C n ksql> DESCRIBE TIPLOC_FLAT_KEYED; io t i t r a p Name : TIPLOC_FLAT_KEYED Field | Type ——————————————————————-TRANSACTION_TYPE | VARCHAR(STRING) TIPLOC_CODE | VARCHAR(STRING) NALCO | VARCHAR(STRING) STANOX | VARCHAR(STRING) CRS_CODE | VARCHAR(STRING) DESCRIPTION | VARCHAR(STRING) TPS_DESCRIPTION | VARCHAR(STRING) ——————————————————————— 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev Using the data 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Streaming Integration with Kafka Connect @rmoff @oredev Amazon S3 syslog Google BigQuery Kafka Connect Kafka Brokers 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev Kafka -> Elasticsearch { “connector.class”: “io.confluent.connect.elasticsearch.ElasticsearchSinkConnector”, “topics”: “TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00”, “connection.url”: “http:”//elasticsearch:9200”, “type.name”: “type.name=kafkaconnect”, “key.ignore”: “false”, “schema.ignore”: “true”, “key.converter”: “org.apache.kafka.connect.storage.StringConverter” } 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev Kibana for real-time visualisation of rail data 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Kibana for real-time analysis of rail data @rmoff @oredev 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev Kafka -> Postgres { “connector.class”: “io.confluent.connect.jdbc.JdbcSinkConnector”, “key.converter”: “org.apache.kafka.connect.storage.StringConverter”, “connection.url”: “jdbc:postgresql:”//postgres:5432/”, “connection.user”: “postgres”, “connection.password”: “postgres”, “auto.create”: true, “auto.evolve”: true, “insert.mode”: “upsert”, “pk.mode”: “record_key”, “pk.fields”: “MESSAGE_KEY”, “topics”: “TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00”, “transforms”: “dropArrays”, “transforms.dropArrays.type”: “org.apache.kafka.connect.transforms.ReplaceField$Value”, “transforms.dropArrays.blacklist”: “SCHEDULE_SEGMENT_LOCATION, HEADER” } 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Kafka -> Postgres @rmoff @oredev 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
But do you even need a database? @rmoff @oredev TRAIN_MOVEMENTS Time SELECT TIMESTAMPTOSTRING(WINDOWSTART(),’yyyy-MM-dd’) AS WINDOW_START_TS, SUM(CASE WHEN TOC = ‘Arriva Trains Northern’ THEN 1 ELSE 0 SUM(CASE WHEN TOC = ‘East Midlands Trains’ THEN 1 ELSE 0 SUM(CASE WHEN TOC = ‘London North Eastern Railway’ THEN 1 ELSE 0 SUM(CASE WHEN TOC = ‘TransPennine Express’ THEN 1 ELSE 0 FROM TRAIN_MOVEMENTS WINDOW TUMBLING (SIZE 1 DAY) GROUP BY VARIATION_STATUS; TOC_STATS Aggregate Time Aggregate VARIATION_STATUS, END) AS Arriva_CT, END) AS EastMidlands_CT, END) AS LNER_CT, END) AS TransPennineExpress_CT +——————+——————-+————+—————+———+———-+ | Date | Variation | Arriva | East Mid | LNER | TPE | +——————+——————-+————+—————+———+———-+ | 2019-07-02 | OFF ROUTE | 46 | 78 | 20 | 167 | | 2019-07-02 | ON TIME | 19083 | 3568 | 1509 | 2916 | | 2019-07-02 | LATE | 30850 | 7953 | 5420 | 9042 | | 2019-07-02 | EARLY | 11478 | 3518 | 1349 | 2096 | | 2019-07-03 | OFF ROUTE | 79 | 25 | 41 | 213 | | 2019-07-03 | ON TIME | 19512 | 4247 | 1915 | 2936 | | 2019-07-03 | LATE | 37357 | 8258 | 5342 | 11016 | | 2019-07-03 | EARLY | 11825 | 4574 | 1888 | 2094 | 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
{ @rmoff @oredev Kafka -> S3 “connector.class”: “io.confluent.connect.s3.S3SinkConnector”, “key.converter”:”org.apache.kafka.connect.storage.StringConverter”, “tasks.max”: “1”, “topics”: “TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00”, “s3.region”: “us-west-2”, “s3.bucket.name”: “rmoff-rail-streaming-demo”, “flush.size”: “65536”, “storage.class”: “io.confluent.connect.s3.storage.S3Storage”, “format.class”: “io.confluent.connect.s3.format.avro.AvroFormat”, “schema.generator.class”: “io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator”, “schema.compatibility”: “NONE”, “partitioner.class”: “io.confluent.connect.storage.partitioner.TimeBasedPartitioner”, “path.format”:“‘year’=YYYY/’month’=MM/’day’=dd”, “timestamp.extractor”:”Record”, “partition.duration.ms”: 1800000, “locale”:”en”, “timezone”: “UTC” } 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev Kafka -> S3 $ aws s3 ls s3:!//rmoff-rail-streaming-demo/topics/TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00/year=2019/month=07/day=07/ 2019-07-07 10:05:41 2019-07-07 19:47:11 200995548 TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00+0+0004963416.avro 87631494 TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00+0+0005007151.avro 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev Infering table config from S3 files with AWS Glue 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev Inferring table config from S3 files with AWS Glue 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Analysis with AWS Athena @rmoff @oredev 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev Graph analysis { “connector.class”: “streams.kafka.connect.sink.Neo4jSinkConnector”, “topics”: “TRAIN_CANCELLATIONS_02”, “key.converter”:”org.apache.kafka.connect.storage.StringConverter”, “errors.tolerance”: “all”, “errors.deadletterqueue.topic.name”: “sink-neo4j-train-00_dlq”, “errors.deadletterqueue.topic.replication.factor”: 1, “errors.deadletterqueue.context.headers.enable”:true, “neo4j.server.uri”: “bolt:”//neo4j:7687”, “neo4j.authentication.basic.username”: “neo4j”, “neo4j.authentication.basic.password”: “connect”, “neo4j.topic.cypher.TRAIN_CANCELLATIONS_02”: “MERGE (train:train{id: event.TRAIN_ID}) MERGE (toc:toc{toc: event.TOC}) MERGE (canx_reason:canx_reason{reason: event.CANX_REASON}) MERGE (canx_loc:canx_loc{location: coalesce(event.CANCELLATION_LOCATION,’<unknown>’)}) MERGE (train)[:OPERATED_BY]!->(toc) MERGE (canx_loc)!<-[:CANCELLED_AT{reason:event.CANX_REASON, time:event.CANX_TIMESTAMP}]-(train)-[:CANCELLED_BECAUSE]!->(canx_reason)” } 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Graph relationships @rmoff @oredev 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev Tell me when trains are delayed at my station 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev Event-driven Alerting - logic TRAIN_MOVEMENTS Time ALERT_CONFIG Key/Value state CREATE STREAM TRAINS_DELAYED_ALERT AS SELECT […] FROM TRAIN_MOVEMENTS T TRAIN_DELAYED_ALERT Time INNER JOIN ALERT_CONFIG A ON T.LOC_NLCDESC = A.STATION WHERE TIMETABLE_VARIATION > A.ALERT_OVER_MINS; 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev Sending messages to Telegram curl -X POST -H ‘Content-Type: application/json’ -d ‘{“chat_id”: “-364377679”, “text”: “This is a test from curl”}’ https:”//api.telegram.org/botxxxxx/sendMessage { } “connector.class”: “io.confluent.connect.http.HttpSinkConnector”, “request.method”: “post”, “http.api.url”: “https:!//api.telegram.org/[…]/sendMessage”, “headers”: “Content-Type: application/json”, “topics”: “TRAINS_DELAYED_ALERT_TG”, “tasks.max”: “1”, “batch.prefix”: “{“chat_id”:”-364377679”,”parse_mode”: “markdown”,”, “batch.suffix”: “}”, “batch.max.size”: “1”, “regex.patterns”:”.\{MESSAGE=(.)\}.*”, “regex.replacements”: “”text”:”$1”“, “regex.separator”: “~”, “key.converter”: “org.apache.kafka.connect.storage.StringConverter” Kafka Kafka Connect HTTP Sink Telegram 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Event-driven Alerting - config @rmoff @oredev Compacted topic $ kafkacat -b localhost:9092 -t alert_config -P -K: “<<EOF LEEDS:{“STATION”:”LEEDS”,”ALERT_OVER_MINS”:”45”} EOF CREATE TABLE ALERT_CONFIG (STATION VARCHAR, ALERT_OVER_MINS INT) WITH (KAFKA_TOPIC=’alert_config’, VALUE_FORMAT=’JSON’, KEY=’STATION’); CREATE STREAM ALERT_CONFIG_STREAM (STATION VARCHAR, ALERT_OVER_MINS INT) WITH (KAFKA_TOPIC=’alert_config’, VALUE_FORMAT=’JSON’); ksql> SELECT STATION, ALERT_OVER_MINS FROM ALERT_CONFIG WHERE STATION=’LEEDS’; ksql> SELECT STATION, ALERT_OVER_MINS FROM ALERT_CONFIG_STREAM WHERE STATION=’LEEDS’; LEEDS | 45 LEEDS | 45 $ kafkacat -b localhost:9092 -t alert_config -P -K: “<<EOF LEEDS:{“STATION”:”LEEDS”,”ALERT_OVER_MINS”:”20”} EOF ksql> SELECT STATION, ALERT_OVER_MINS FROM ALERT_CONFIG WHERE STATION=’LEEDS’; LEEDS | 20 ksql> SELECT STATION, ALERT_OVER_MINS FROM ALERT_CONFIG_STREAM WHERE STATION=’LEEDS’; LEEDS | 45 LEEDS | 20 *
@rmoff @oredev Running it & monitoring & maintenance 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Consumer lag @rmoff @oredev 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
System health & Throughput @rmoff @oredev 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev Restart failing connectors rmoff@proxmox01 > crontab -l “*/5 * * * * /home/rmoff/restart_failed_connector_tasks.sh rmoff@proxmox01 > cat /home/rmoff/restart_failed_connector_tasks.sh “#!/usr/bin/env bash # @rmoff / June 6, 2019
🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Is everything running? @rmoff @oredev http://rmoff.dev/kafka-trains-code-01 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev check_latest_timestamp.sh Start at latest Just read one message message kafkacat -b localhost:9092 -t networkrail_TRAIN_MVT_X -o-1 -c1 -C | \ jq ‘.header.msg_queue_timestamp’ | \ sed -e ‘s/”“//g’ | \ sed -e ‘s/000$”//g’ | \ Convert from epoch to humanxargs -Ifoo date “—date=@foo readable timestamp format $ ./check_latest_timestamp.sh Fri Jul 5 16:28:09 BST 2019 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Kafka Connect Kafka Kafka Connect @rmoff @oredev KSQL ✅ Stream raw data ✅ Stream enriched data ✅ Historic data & data replay 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
ig nf Co Co nf ig @rmoff @oredev SQL 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Fully Managed Kafka as a Service
Free Books! @rmoff @oredev http://cnfl.io/book-bundle 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff @oredev #EOF 💬 Join the Confluent Community Slack group at http://cnfl.io/slack https://talks.rmoff.net