A presentation at the Stuttgart Data Engineers Meetup in Stuttgart, Germany, by Robin Moffatt
On Track with Apache Kafka®: Building a Streaming ETL solution with Rail Data 🚂 @rmoff @confluentinc #KafkaMeetup
h/t @kennybastani / @gamussa
@rmoff #KafkaMeetup 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff #KafkaMeetup https://wiki.openraildata.com 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Real-time visualisation of rail data @rmoff #KafkaMeetup 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Why do trains get cancelled? @rmoff #KafkaMeetup 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff #KafkaMeetup Do cancellation reasons vary by Train Operator? 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Graph relationships @rmoff #KafkaMeetup 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff #KafkaMeetup Tell me when trains are delayed at my station 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff #KafkaMeetup Config · SQL · Config 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Code! @rmoff #KafkaMeetup http://rmoff.dev/kafka-trains-code-01 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff #KafkaMeetup Getting the data in 📥 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff #KafkaMeetup Data Sources Train Movements Continuous feed Schedules Updated Daily Reference data Static 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff #KafkaMeetup Streaming Integration with Kafka Connect Amazon S3 syslog Google BigQuery Kafka Connect Kafka Brokers 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
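Connectors are defined as JSON and managed through the Connect worker's REST API. As a minimal sketch, assuming a Connect worker listening on localhost:8083 (the connector name here is illustrative, and the Network Rail feed also requires ActiveMQ credentials not shown):

# Create or update a connector by PUTting its config to the Connect REST API
curl -s -X PUT -H "Content-Type: application/json" \
     http://localhost:8083/connectors/source-activemq-networkrail/config \
     -d '{
       "connector.class": "io.confluent.connect.activemq.ActiveMQSourceConnector",
       "activemq.url": "tcp://datafeeds.networkrail.co.uk:61619",
       "jms.destination.type": "topic",
       "jms.destination.name": "TRAIN_MVT_EA_TOC",
       "kafka.topic": "networkrail_TRAIN_MVT"
     }'

# Check connector and task state
curl -s http://localhost:8083/connectors/source-activemq-networkrail/status | jq '.'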
@rmoff #KafkaMeetup kafkacat • Kafka CLI producer & consumer • Also metadata inspection • Integrates beautifully with unix pipes! • Get it! https://github.com/edenhill/kafkacat/ • apt-get install kafkacat • brew install kafkacat • https://hub.docker.com/r/confluentinc/cp-kafkacat/
curl -s "https://api.mockaroo.com/api/d5a195e0?count=2&key=ff7856d0" | \
  kafkacat -b localhost:9092 -t purchases -P
https://rmoff.net/2018/05/10/quick-n-easy-population-of-realistic-test-data-into-kafka/ 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
https://wiki.openraildata.com/index.php?title=Train_Movements Movement data @rmoff #KafkaMeetup A train’s movements Arrived / departed Where What time Variation from timetable A train was cancelled Where When Why A train was ‘activated’ Links a train to a schedule 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Ingest ActiveMQ with Kafka Connect @rmoff #KafkaMeetup
"connector.class": "io.confluent.connect.activemq.ActiveMQSourceConnector",
"activemq.url": "tcp://datafeeds.networkrail.co.uk:61619",
"jms.destination.type": "topic",
"jms.destination.name": "TRAIN_MVT_EA_TOC",
"kafka.topic": "networkrail_TRAIN_MVT",
🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Message transformation (convert JSON, extract element, explode array) @rmoff #KafkaMeetup
kafkacat -b localhost:9092 -G tm_explode networkrail_TRAIN_MVT -u | \
  jq -c '.text|fromjson[]' | \
  kafkacat -b localhost:9092 -t networkrail_TRAIN_MVT_X -T -P
🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
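With the exploded topic in place, a quick sanity check is to tally messages by type. A sketch, assuming kafkacat and jq on the path (the msg_type values are documented on the Open Rail Data wiki):

# Read the whole topic once and count messages per msg_type
kafkacat -b localhost:9092 -t networkrail_TRAIN_MVT_X -C -o beginning -e -q | \
  jq -r '.header.msg_type' | \
  sort | uniq -c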
https://wiki.openraildata.com/index.php?title=SCHEDULE Schedule data @rmoff #KafkaMeetup Train routes Type of train (diesel/electric) Classes, sleeping accommodation, reservations, etc 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Load JSON from S3 into Kafka @rmoff #KafkaMeetup
curl -s -L -u "$USER:$PW" "$FEED_URL" | \
  gunzip | \
  kafkacat -b localhost -P -t CIF_FULL_DAILY
🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
https://wiki.openraildata.com/index.php?title=SCHEDULE Reference data @rmoff #KafkaMeetup Train company names Location codes Train type codes 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Static reference data @rmoff #KafkaMeetup 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff #KafkaMeetup
AA:{"canx_reason_code":"AA","canx_reason":"Waiting acceptance into off Network Terminal or Yard"}
AC:{"canx_reason_code":"AC","canx_reason":"Waiting train preparation or completion of TOPS list/RT3973"}
AE:{"canx_reason_code":"AE","canx_reason":"Congestion in off Network Terminal or Yard"}
AG:{"canx_reason_code":"AG","canx_reason":"Wagon load incident including adjusting loads or open door"}
AJ:{"canx_reason_code":"AJ","canx_reason":"Waiting Customer's traffic including documentation"}
DA:{"canx_reason_code":"DA","canx_reason":"Non Technical Fleet Holding Code"}
DB:{"canx_reason_code":"DB","canx_reason":"Train Operations Holding Code"}
DC:{"canx_reason_code":"DC","canx_reason":"Train Crew Causes Holding Code"}
DD:{"canx_reason_code":"DD","canx_reason":"Technical Fleet Holding Code"}
DE:{"canx_reason_code":"DE","canx_reason":"Station Causes Holding Code"}
DF:{"canx_reason_code":"DF","canx_reason":"External Causes Holding Code"}
DG:{"canx_reason_code":"DG","canx_reason":"Freight Terminal and or Yard Holding Code"}
DH:{"canx_reason_code":"DH","canx_reason":"Adhesion and or Autumn Holding Code"}
FA:{"canx_reason_code":"FA","canx_reason":"Dangerous goods incident"}
FC:{"canx_reason_code":"FC","canx_reason":"Freight train driver"}
kafkacat -l canx_reason_code.dat -b localhost:9092 -t canx_reason_code -P -K:
🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff #KafkaMeetup 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff #KafkaMeetup Transforming the data 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff #KafkaMeetup Location reference Train movements source Schedule Activations Train movement Cancellations 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff #KafkaMeetup Streams of events Time 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Stream Processing with KSQL @rmoff #KafkaMeetup Stream: widgets Stream: widgets_red 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Stream Processing with KSQL @rmoff #KafkaMeetup Stream: widgets → Stream: widgets_red
CREATE STREAM widgets_red AS SELECT * FROM widgets WHERE colour='RED';
🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff #KafkaMeetup 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Event data @rmoff #KafkaMeetup NETWORKRAIL_TRAIN_MVT_X 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff #KafkaMeetup Message routing with KSQL
NETWORKRAIL_TRAIN_MVT_X:
  MSG_TYPE='0001' → TRAIN_ACTIVATIONS
  MSG_TYPE='0002' → TRAIN_CANCELLATIONS
  MSG_TYPE='0003' → TRAIN_MOVEMENTS
🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Split a stream of data (message routing) @rmoff #KafkaMeetup
CREATE STREAM TRAIN_MOVEMENTS_00 AS
  SELECT * FROM NETWORKRAIL_TRAIN_MVT_X WHERE header->msg_type = '0003';
CREATE STREAM TRAIN_CANCELLATIONS_00 AS
  SELECT * FROM NETWORKRAIL_TRAIN_MVT_X WHERE header->msg_type = '0002';
🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff #KafkaMeetup Joining events to lookup data (stream-table joins) TRAIN_MOVEMENTS (Stream) + LOCATION (Table) → TRAIN_MOVEMENTS_ENRICHED (Stream)
CREATE STREAM TRAIN_MOVEMENTS_ENRICHED AS
  SELECT TM.LOC_ID AS LOC_ID,
         TM.TS AS TS,
         L.DESCRIPTION AS LOC_DESC,
         [...]
  FROM TRAIN_MOVEMENTS TM
       LEFT JOIN LOCATION L
       ON TM.LOC_ID = L.ID;
🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff #KafkaMeetup Decode values, and generate composite key
Concatenate columns:
CREATE STREAM SCHEDULE_00 AS
  SELECT JsonScheduleV1->CIF_train_uid + '/' + [...]
@rmoff #KafkaMeetup Decode values, and generate composite key
SELECT JsonScheduleV1->CIF_train_uid + '/' + [...] AS SCHEDULE_KEY,
       CASE WHEN JsonScheduleV1->schedule_segment->CIF_power_type = 'D' [...]
+---------------+----------------+---------+---------------------+----------------+------------------------+
| CIF_train_uid | sched_start_dt | stp_ind | SCHEDULE_KEY        | CIF_power_type | POWER_TYPE             |
+---------------+----------------+---------+---------------------+----------------+------------------------+
| Y82535        | 2019-05-20     | P       | Y82535/2019-05-20/P | E              | Electric               |
| Y82537        | 2019-05-20     | P       | Y82537/2019-05-20/P | DMU            | Other                  |
| Y82542        | 2019-05-20     | P       | Y82542/2019-05-20/P | D              | Diesel                 |
| Y82535        | 2019-05-24     | P       | Y82535/2019-05-24/P | EMU            | Electric Multiple Unit |
| Y82542        | 2019-05-24     | P       | Y82542/2019-05-24/P | HST            | High Speed Train       |
+---------------+----------------+---------+---------------------+----------------+------------------------+
Enrich columns @rmoff #KafkaMeetup
CREATE STREAM TRAIN_MOVEMENTS_01 AS
  SELECT TM.EVENT_TYPE AS EVENT_TYPE,
         CAST(TM.TIMETABLE_VARIATION AS INT) AS TIMETABLE_VARIATION,
         CASE WHEN LEN(TM.PLATFORM) > 0 THEN 'Platform' + TM.PLATFORM
              ELSE '' END AS PLATFORM,
         CASE WHEN TM.VARIATION_STATUS = 'ON TIME' THEN 'ON TIME'
              WHEN TM.VARIATION_STATUS = 'LATE' THEN TM.TIMETABLE_VARIATION + ' MINS LATE'
              WHEN TM.VARIATION_STATUS = 'EARLY' THEN TM.TIMETABLE_VARIATION + [...]
🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Schemas @rmoff #KafkaMeetup NETWORKRAIL_TRAIN_MVT_X 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Handling JSON data & mixed schemas @rmoff #KafkaMeetup
CREATE STREAM NETWORKRAIL_TRAIN_MVT_X (
    header STRUCT< msg_type VARCHAR, [...] >,
    body VARCHAR)
  WITH (KAFKA_TOPIC='networkrail_TRAIN_MVT_X', VALUE_FORMAT='JSON');
CREATE STREAM TRAIN_CANCELLATIONS_00 AS
  SELECT HEADER,
         EXTRACTJSONFIELD(body,'$.canx_timestamp') AS canx_timestamp,
         EXTRACTJSONFIELD(body,'$.canx_reason_code') AS canx_reason_code,
         [...]
  FROM networkrail_TRAIN_MVT_X
  WHERE header->msg_type = '0002';
🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff #KafkaMeetup Serialisation & Schemas Avro -> Confluent Schema Registry Protobuf JSON CSV https://qconnewyork.com/system/files/presentation-slides/qcon_17_-_schemas_and_apis.pdf 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff #KafkaMeetup The Confluent Schema Registry Avro Schema Schema Registry Target Source KSQL Avro Message Avro Message Kafka Connect 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
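The Schema Registry is itself just an HTTP service, so registered schemas can be inspected directly. A sketch, assuming the registry on localhost:8081 and the default <topic>-value subject naming:

# List all subjects, then fetch the latest schema for one of them
curl -s http://localhost:8081/subjects | jq '.'
curl -s http://localhost:8081/subjects/TRAIN_CANCELLATIONS_00-value/versions/latest | \
  jq '.schema | fromjson'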
@rmoff #KafkaMeetup Reserialise data
CREATE STREAM TRAIN_CANCELLATIONS_00 WITH (VALUE_FORMAT='AVRO') AS
  SELECT EXTRACTJSONFIELD(body,'$.canx_timestamp') AS canx_timestamp,
         EXTRACTJSONFIELD(body,'$.canx_reason_code') AS canx_reason_code,
         [...]
  FROM networkrail_TRAIN_MVT_X
  WHERE header->msg_type = '0002';
🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff #KafkaMeetup Flatten and re-key a stream
{ "TiplocV1": { "transaction_type": "Create", "tiploc_code": "ABWD", "nalco": "513100", "stanox": "88601", "crs_code": "ABW", "description": "ABBEY WOOD", "tps_description": "ABBEY WOOD" } }
Access nested elements, changing the partitioning key:
CREATE STREAM TIPLOC_FLAT_KEYED AS
  SELECT TiplocV1->TRANSACTION_TYPE AS TRANSACTION_TYPE,
         TiplocV1->TIPLOC_CODE      AS TIPLOC_CODE,
         TiplocV1->NALCO            AS NALCO,
         TiplocV1->STANOX           AS STANOX,
         TiplocV1->CRS_CODE         AS CRS_CODE,
         TiplocV1->DESCRIPTION      AS DESCRIPTION,
         TiplocV1->TPS_DESCRIPTION  AS TPS_DESCRIPTION
  FROM SCHEDULE_RAW
  PARTITION BY TIPLOC_CODE;
ksql> DESCRIBE TIPLOC_FLAT_KEYED;
Name : TIPLOC_FLAT_KEYED
 Field            | Type
------------------------------------
 TRANSACTION_TYPE | VARCHAR(STRING)
 TIPLOC_CODE      | VARCHAR(STRING)
 NALCO            | VARCHAR(STRING)
 STANOX           | VARCHAR(STRING)
 CRS_CODE         | VARCHAR(STRING)
 DESCRIPTION      | VARCHAR(STRING)
 TPS_DESCRIPTION  | VARCHAR(STRING)
------------------------------------
🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff #KafkaMeetup Using the data 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff #KafkaMeetup Streaming Integration with Kafka Connect Amazon S3 syslog Google BigQuery Kafka Connect Kafka Brokers 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff #KafkaMeetup Kafka -> Elasticsearch
{
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "topics": "TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00",
  "connection.url": "http://elasticsearch:9200",
  "type.name": "kafkaconnect",
  "key.ignore": "false",
  "schema.ignore": "true",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
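A quick check that documents are flowing; a sketch, assuming Elasticsearch on localhost:9200 and that the sink writes to an index named after the topic, lowercased (an assumption about the index naming):

# Count documents in the index fed by the sink connector
curl -s "http://localhost:9200/train_movements_activations_schedule_00/_count" | jq '.count'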
@rmoff #KafkaMeetup Kibana for real-time visualisation of rail data 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff #KafkaMeetup Kafka -> Postgres
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "connection.url": "jdbc:postgresql://postgres:5432/",
  "connection.user": "postgres",
  "connection.password": "postgres",
  "auto.create": true,
  "auto.evolve": true,
  "insert.mode": "upsert",
  "pk.mode": "record_key",
  "pk.fields": "MESSAGE_KEY",
  "topics": "TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00",
  "transforms": "dropArrays",
  "transforms.dropArrays.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.dropArrays.blacklist": "SCHEDULE_SEGMENT_LOCATION, HEADER"
}
🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Kafka -> Postgres @rmoff #KafkaMeetup 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
But do you even need a database? @rmoff #KafkaMeetup
TRAIN_MOVEMENTS (stream) → aggregate → TOC_STATS (table)
SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd') AS WINDOW_START_TS,
       VARIATION_STATUS,
       SUM(CASE WHEN TOC = 'Arriva Trains Northern'       THEN 1 ELSE 0 END) AS Arriva_CT,
       SUM(CASE WHEN TOC = 'East Midlands Trains'         THEN 1 ELSE 0 END) AS EastMidlands_CT,
       SUM(CASE WHEN TOC = 'London North Eastern Railway' THEN 1 ELSE 0 END) AS LNER_CT,
       SUM(CASE WHEN TOC = 'TransPennine Express'         THEN 1 ELSE 0 END) AS TransPennineExpress_CT
FROM TRAIN_MOVEMENTS
     WINDOW TUMBLING (SIZE 1 DAY)
GROUP BY VARIATION_STATUS;
+------------+-----------+--------+----------+-------+-------+
| Date       | Variation | Arriva | East Mid | LNER  | TPE   |
+------------+-----------+--------+----------+-------+-------+
| 2019-07-02 | OFF ROUTE |     46 |       78 |    20 |   167 |
| 2019-07-02 | ON TIME   |  19083 |     3568 |  1509 |  2916 |
| 2019-07-02 | LATE      |  30850 |     7953 |  5420 |  9042 |
| 2019-07-02 | EARLY     |  11478 |     3518 |  1349 |  2096 |
| 2019-07-03 | OFF ROUTE |     79 |       25 |    41 |   213 |
| 2019-07-03 | ON TIME   |  19512 |     4247 |  1915 |  2936 |
| 2019-07-03 | LATE      |  37357 |     8258 |  5342 | 11016 |
| 2019-07-03 | EARLY     |  11825 |     4574 |  1888 |  2094 |
+------------+-----------+--------+----------+-------+-------+
🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff #KafkaMeetup Kafka -> S3
{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "tasks.max": "1",
  "topics": "TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00",
  "s3.region": "us-west-2",
  "s3.bucket.name": "rmoff-rail-streaming-demo",
  "flush.size": "65536",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
  "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
  "schema.compatibility": "NONE",
  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "path.format": "'year'=YYYY/'month'=MM/'day'=dd",
  "timestamp.extractor": "Record",
  "partition.duration.ms": 1800000,
  "locale": "en",
  "timezone": "UTC"
}
🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff #KafkaMeetup Kafka -> S3
$ aws s3 ls s3://rmoff-rail-streaming-demo/topics/TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00/year=2019/month=07/day=07/
2019-07-07 10:05:41  200995548 TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00+0+0004963416.avro
2019-07-07 19:47:11   87631494 TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00+0+0005007151.avro
🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff #KafkaMeetup Inferring table config from S3 files with AWS Glue 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Analysis with AWS Athena @rmoff #KafkaMeetup 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
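The same queries can be driven from the AWS CLI; a sketch in which the Glue database name and the results location are illustrative:

# Submit an Athena query over the Glue-crawled table of Avro files
aws athena start-query-execution \
  --query-string "SELECT COUNT(*) FROM train_movements_activations_schedule_00" \
  --query-execution-context Database=rail \
  --result-configuration OutputLocation=s3://rmoff-rail-streaming-demo/athena-results/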
@rmoff #KafkaMeetup Graph analysis
{
  "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
  "topics": "TRAIN_CANCELLATIONS_02",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "sink-neo4j-train-00_dlq",
  "errors.deadletterqueue.topic.replication.factor": 1,
  "errors.deadletterqueue.context.headers.enable": true,
  "neo4j.server.uri": "bolt://neo4j:7687",
  "neo4j.authentication.basic.username": "neo4j",
  "neo4j.authentication.basic.password": "connect",
  "neo4j.topic.cypher.TRAIN_CANCELLATIONS_02": "MERGE (train:train{id: event.TRAIN_ID}) MERGE (toc:toc{toc: event.TOC}) MERGE (canx_reason:canx_reason{reason: event.CANX_REASON}) MERGE (canx_loc:canx_loc{location: coalesce(event.CANCELLATION_LOCATION,'<unknown>')}) MERGE (train)-[:OPERATED_BY]->(toc) MERGE (canx_loc)<-[:CANCELLED_AT{reason:event.CANX_REASON, time:event.CANX_TIMESTAMP}]-(train)-[:CANCELLED_BECAUSE]->(canx_reason)"
}
🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Graph relationships @rmoff #KafkaMeetup 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
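With the sink running, the graph can be queried for patterns such as which operators cancel trains for which reasons. A sketch using cypher-shell, with the credentials from the connector config above (Neo4j being reachable on localhost is an assumption):

# Top ten operator/reason combinations by number of cancelled trains
cypher-shell -a bolt://localhost:7687 -u neo4j -p connect \
  'MATCH (t:train)-[:OPERATED_BY]->(o:toc),
         (t)-[:CANCELLED_BECAUSE]->(r:canx_reason)
   RETURN o.toc, r.reason, count(t) AS cancellations
   ORDER BY cancellations DESC LIMIT 10;'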
Event-driven Alerting - config @rmoff #KafkaMeetup
Compacted topic:
$ kafkacat -b localhost:9092 -t alert_config -P -K: <<EOF
LEEDS:{"STATION":"LEEDS","ALERT_OVER_MINS":"45"}
EOF
CREATE TABLE ALERT_CONFIG (STATION VARCHAR, ALERT_OVER_MINS INT)
  WITH (KAFKA_TOPIC='alert_config', VALUE_FORMAT='JSON', KEY='STATION');
CREATE STREAM ALERT_CONFIG_STREAM (STATION VARCHAR, ALERT_OVER_MINS INT)
  WITH (KAFKA_TOPIC='alert_config', VALUE_FORMAT='JSON');
ksql> SELECT STATION, ALERT_OVER_MINS FROM ALERT_CONFIG WHERE STATION='LEEDS';
LEEDS | 45
ksql> SELECT STATION, ALERT_OVER_MINS FROM ALERT_CONFIG_STREAM WHERE STATION='LEEDS';
LEEDS | 45
$ kafkacat -b localhost:9092 -t alert_config -P -K: <<EOF
LEEDS:{"STATION":"LEEDS","ALERT_OVER_MINS":"20"}
EOF
ksql> SELECT STATION, ALERT_OVER_MINS FROM ALERT_CONFIG WHERE STATION='LEEDS';
LEEDS | 20
ksql> SELECT STATION, ALERT_OVER_MINS FROM ALERT_CONFIG_STREAM WHERE STATION='LEEDS';
LEEDS | 45
LEEDS | 20
🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Event-driven Alerting - logic @rmoff #KafkaMeetup
TRAIN_MOVEMENTS (stream) + ALERT_CONFIG (key/value state) → TRAINS_DELAYED_ALERT (stream)
CREATE STREAM TRAINS_DELAYED_ALERT AS
  SELECT [...]
  FROM TRAIN_MOVEMENTS T
       INNER JOIN ALERT_CONFIG A
       ON T.LOC_NLCDESC = A.STATION
  WHERE TIMETABLE_VARIATION > A.ALERT_OVER_MINS;
🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Sending messages to Telegram @rmoff #KafkaMeetup
curl -X POST -H 'Content-Type: application/json' \
     -d '{"chat_id": "-364377679", "text": "This is a test from curl"}' \
     https://api.telegram.org/botxxxxx/sendMessage
{
  "connector.class": "io.confluent.connect.http.HttpSinkConnector",
  "request.method": "post",
  "http.api.url": "https://api.telegram.org/[…]/sendMessage",
  "headers": "Content-Type: application/json",
  "topics": "TRAINS_DELAYED_ALERT_TG",
  "tasks.max": "1",
  "batch.prefix": "{\"chat_id\":\"-364377679\",\"parse_mode\": \"markdown\",",
  "batch.suffix": "}",
  "batch.max.size": "1",
  "regex.patterns": ".*\{MESSAGE=(.*)\}.*",
  "regex.replacements": "\"text\":\"$1\"",
  "regex.separator": "~",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff #KafkaMeetup Running it & monitoring & maintenance 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Consumer lag @rmoff #KafkaMeetup 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
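Lag can also be pulled from the command line with the standard tooling; a sketch (the group name is illustrative; Connect sinks register consumer groups named connect-<connector name>):

# List consumer groups, then describe one to see per-partition lag
kafka-consumer-groups --bootstrap-server localhost:9092 --list
kafka-consumer-groups --bootstrap-server localhost:9092 \
  --describe --group connect-sink-elastic-rail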
System health & Throughput @rmoff #KafkaMeetup 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff #KafkaMeetup Restart failing connectors
rmoff@proxmox01 > crontab -l
*/5 * * * * /home/rmoff/restart_failed_connector_tasks.sh
rmoff@proxmox01 > cat /home/rmoff/restart_failed_connector_tasks.sh
#!/usr/bin/env bash
# @rmoff / June 6, 2019
🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
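The script body is cut off on the slide; a minimal sketch of the same idea against the Kafka Connect REST API (assuming a worker on localhost:8083 and jq installed; the actual script may differ):

#!/usr/bin/env bash
# Find any FAILED tasks across all connectors and restart them
for connector in $(curl -s localhost:8083/connectors | jq -r '.[]'); do
  for task in $(curl -s localhost:8083/connectors/"$connector"/status | \
                jq -r '.tasks[] | select(.state=="FAILED") | .id'); do
    echo "Restarting task $task of connector $connector"
    curl -s -X POST localhost:8083/connectors/"$connector"/tasks/"$task"/restart
  done
done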
Is everything running? @rmoff #KafkaMeetup http://rmoff.dev/kafka-trains-code-01 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff #KafkaMeetup check_latest_timestamp.sh
# Start at the latest message, read just one message, and convert its
# timestamp from epoch to a human-readable format:
kafkacat -b localhost:9092 -t networkrail_TRAIN_MVT_X -o-1 -c1 -C | \
  jq '.header.msg_queue_timestamp' | \
  sed -e 's/"//g' | \
  sed -e 's/000$//g' | \
  xargs -Ifoo date --date=@foo
$ ./check_latest_timestamp.sh
Fri Jul 5 16:28:09 BST 2019
🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
$ cat TODO.txt @rmoff #KafkaMeetup • Automate ingest & monitoring • currently cron, replace with Apache Airflow? • Ad-hoc visual analysis • Superset? Google Data Studio? AWS Quicksight? • Handle schedule nested array to be able to show for each train movement its planned route • Proper partitioning of topics (current partitions=1 🙄) 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
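On the last point, partition counts can be raised in place with the standard tooling; a sketch (topic name from the ingest pipeline; note that adding partitions changes the key-to-partition mapping for newly produced messages):

# Grow the topic from 1 to 6 partitions
kafka-topics --bootstrap-server localhost:9092 \
  --alter --topic networkrail_TRAIN_MVT_X --partitions 6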
Kafka Connect @rmoff #KafkaMeetup Kafka Kafka Connect KSQL ✅ Stream raw data ✅ Stream enriched data ✅ Historic data & data replay 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff #KafkaMeetup Config · SQL · Config 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
CONFLUENT COMMUNITY DISCOUNT CODE: KS19Meetup. 25% OFF* (*Standard Priced Conference pass)
Free Books! @rmoff #KafkaMeetup http://cnfl.io/book-bundle 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
Code! @rmoff #KafkaMeetup http://rmoff.dev/kafka-trains-code-01 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
@rmoff #KafkaMeetup #EOF 💬 Join the Confluent Community Slack group at http://cnfl.io/slack https://talks.rmoff.net
As data engineers, we frequently need to build scalable systems working with data from a variety of sources and with various ingest rates, sizes, and formats. This talk takes an in-depth look at how Apache Kafka can be used to provide a common platform on which to build data infrastructure driving both real-time analytics as well as event-driven applications. Using a public feed of railway data it will show how to ingest data from message queues such as ActiveMQ with Kafka Connect, as well as from static sources such as S3 and REST endpoints. We’ll then see how to wrangle the data into a form useful for streaming to analytics in tools such as Elasticsearch and Neo4j. If you’re wondering how to build your next scalable data platform, how to reconcile the impedance mismatch between stream and batch, and how to wrangle streams of data—this talk is for you!
The following resources were mentioned during the presentation or are useful additional information.