A presentation at Øredev 2019 in Malmö, Sweden by Robin Moffatt
🚂 On Track with Apache Kafka®: Building a Streaming ETL solution with Rail Data @rmoff @oredev
h/t @kennybastani / @gamussa
https://wiki.openraildata.com
Real-time visualisation of rail data
Why do trains get cancelled?
Do cancellation reasons vary by Train Operator?
Tell me when trains are delayed at my station
Config, Config, SQL
Code! http://rmoff.dev/kafka-trains-code-01
Getting the data in 📥

Data Sources
• Train Movements: continuous feed
• Schedules: updated daily
• Reference data: static

Streaming Integration with Kafka Connect
(Diagram labels: Amazon S3, syslog, Google BigQuery, Kafka Connect, Kafka Brokers)

kafkacat
• Kafka CLI producer & consumer
• Also metadata inspection
• Integrates beautifully with unix pipes!
    curl -s "https://my.api.endpoint" | \
      kafkacat -b localhost:9092 -t topic -P
• Get it!
  • https://github.com/edenhill/kafkacat/
  • apt-get install kafkacat
  • brew install kafkacat
  • https://hub.docker.com/r/confluentinc/cp-kafkacat/
https://rmoff.net/2018/05/10/quick-n-easy-population-of-realistic-test-data-into-kafka/

Movement data
https://wiki.openraildata.com/index.php?title=Train_Movements
• A train's movements: arrived / departed, where, what time, variation from timetable
• A train was cancelled: where, when, why
• A train was 'activated': links a train to a schedule

Ingest ActiveMQ with Kafka Connect
    "connector.class"      : "io.confluent.connect.activemq.ActiveMQSourceConnector",
    "activemq.url"         : "tcp://datafeeds.networkrail.co.uk:61619",
    "jms.destination.type" : "topic",
    "jms.destination.name" : "TRAIN_MVT_EA_TOC",
    "kafka.topic"          : "networkrail_TRAIN_MVT",
(Diagram labels: Movement events from Northern Trains, GNER*, TransPennine; Kafka Connect; Kafka topic networkrail_TRAIN_MVT)

Envelope, Payload

Message transformation
• Extract element
• Convert JSON
• Explode Array
    kafkacat -b localhost:9092 -G tm_explode networkrail_TRAIN_MVT -u | \
      jq -c '.text|fromjson[]' | \
      kafkacat -b localhost:9092 -t networkrail_TRAIN_MVT_X -T -P
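
As a rough illustration of what the jq filter '.text|fromjson[]' does, here is a plain-Python sketch. It assumes, as the filter implies, that each envelope is a JSON object whose text field holds a JSON-encoded array of messages. The function name and the sample message fields are made up for the example.

```python
import json

def explode_envelope(envelope_json):
    """Return one compact JSON string per message held in the envelope's `text` field."""
    envelope = json.loads(envelope_json)      # outer envelope, as read from the source topic
    messages = json.loads(envelope["text"])   # `text` is itself a JSON-encoded array
    return [json.dumps(m, separators=(",", ":")) for m in messages]

# Hypothetical sample envelope carrying two messages
sample = json.dumps({
    "text": json.dumps([
        {"header": {"msg_type": "0003"}, "body": {"train_id": "A"}},
        {"header": {"msg_type": "0002"}, "body": {"train_id": "B"}},
    ])
})

for line in explode_envelope(sample):
    print(line)  # one line per message, ready to be produced to the target topic
```

Each printed line corresponds to one record on networkrail_TRAIN_MVT_X in the pipeline above.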

Schedule data
https://wiki.openraildata.com/index.php?title=SCHEDULE
• Train routes
• Type of train (diesel/electric)
• Classes, sleeping accommodation, reservations, etc

Load JSON from S3 into Kafka
    curl -s -L -u "$USER:$PW" "$FEED_URL" | \
      gunzip | \
      kafkacat -b localhost -P -t CIF_FULL_DAILY

Reference data
https://wiki.openraildata.com/index.php?title=SCHEDULE
• Train company names
• Location codes
• Train type codes
• Location geocodes

Geocoding

Static reference data

    AA:{"canx_reason_code":"AA","canx_reason":"Waiting acceptance into off Network Terminal or Yard"}
    AC:{"canx_reason_code":"AC","canx_reason":"Waiting train preparation or completion of TOPS list/RT3973"}
    AE:{"canx_reason_code":"AE","canx_reason":"Congestion in off Network Terminal or Yard"}
    AG:{"canx_reason_code":"AG","canx_reason":"Wagon load incident including adjusting loads or open door"}
    AJ:{"canx_reason_code":"AJ","canx_reason":"Waiting Customer's traffic including documentation"}
    DA:{"canx_reason_code":"DA","canx_reason":"Non Technical Fleet Holding Code"}
    DB:{"canx_reason_code":"DB","canx_reason":"Train Operations Holding Code"}
    DC:{"canx_reason_code":"DC","canx_reason":"Train Crew Causes Holding Code"}
    DD:{"canx_reason_code":"DD","canx_reason":"Technical Fleet Holding Code"}
    DE:{"canx_reason_code":"DE","canx_reason":"Station Causes Holding Code"}
    DF:{"canx_reason_code":"DF","canx_reason":"External Causes Holding Code"}
    DG:{"canx_reason_code":"DG","canx_reason":"Freight Terminal and or Yard Holding Code"}
    DH:{"canx_reason_code":"DH","canx_reason":"Adhesion and or Autumn Holding Code"}
    FA:{"canx_reason_code":"FA","canx_reason":"Dangerous goods incident"}
    FC:{"canx_reason_code":"FC","canx_reason":"Freight train driver"}

    kafkacat -l canx_reason_code.dat -b localhost:9092 -t canx_reason_code -P -K:
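
The -K: option tells kafkacat to treat the text before the delimiter as the message key. A small Python sketch of that parsing is below. It assumes the split happens at the first colon only, which matters here because the JSON value contains colons too. The helper name is hypothetical.

```python
import json

def split_keyed_line(line, delim=":"):
    """Split a 'key<delim>value' line at the first delimiter only."""
    key, value = line.rstrip("\n").split(delim, 1)
    return key, value

line = 'AA:{"canx_reason_code":"AA","canx_reason":"Waiting acceptance into off Network Terminal or Yard"}'
key, value = split_keyed_line(line)

# The key becomes the Kafka message key; the value stays valid JSON.
print(key, "->", json.loads(value)["canx_reason"])
```

Keying the reference data by reason code is what later allows it to be treated as a lookup table.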

Transforming the data

Movement, Activation, Schedule, Location

Streams of events (over time)

Stream Processing with KSQL
Stream: widgets
    CREATE STREAM widgets_red AS
      SELECT * FROM widgets WHERE colour='RED';
Stream: widgets_red

Event data
(Diagram labels: ActiveMQ, Kafka Connect, NETWORKRAIL_TRAIN_MVT_X)

Message routing with KSQL
NETWORKRAIL_TRAIN_MVT_X is routed by message type:
• MSG_TYPE='0001': TRAIN_ACTIVATIONS
• MSG_TYPE='0002': TRAIN_CANCELLATIONS
• MSG_TYPE='0003': TRAIN_MOVEMENTS

Split a stream of data (message routing)
Source topic: NETWORKRAIL_TRAIN_MVT_X

Movement events (MSG_TYPE='0003'):
    CREATE STREAM TRAIN_MOVEMENTS_00 AS
      SELECT * FROM NETWORKRAIL_TRAIN_MVT_X
      WHERE header->msg_type = '0003';

Cancellation events (MSG_TYPE='0002'):
    CREATE STREAM TRAIN_CANCELLATIONS_00 AS
      SELECT * FROM NETWORKRAIL_TRAIN_MVT_X
      WHERE header->msg_type = '0002';
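
To make the routing idea concrete, here is a plain-Python sketch of the same filter-and-split logic. It is not KSQL and not how KSQL runs internally. The target names follow the routing diagram, and the sample messages are invented.

```python
# msg_type -> target stream, as in the routing diagram
ROUTES = {
    "0001": "TRAIN_ACTIVATIONS",
    "0002": "TRAIN_CANCELLATIONS",
    "0003": "TRAIN_MOVEMENTS",
}

def route(messages):
    """Group messages by target stream; unknown message types are skipped."""
    out = {name: [] for name in ROUTES.values()}
    for msg in messages:
        target = ROUTES.get(msg["header"]["msg_type"])
        if target is not None:
            out[target].append(msg)
    return out

# Hypothetical sample messages
sample = [
    {"header": {"msg_type": "0003"}, "body": "{}"},
    {"header": {"msg_type": "0002"}, "body": "{}"},
    {"header": {"msg_type": "0003"}, "body": "{}"},
    {"header": {"msg_type": "0005"}, "body": "{}"},
]

routed = route(sample)
print({name: len(msgs) for name, msgs in routed.items()})
```

In KSQL each CREATE STREAM ... WHERE statement is its own continuous query, so every target stream reads the source topic independently rather than sharing one loop as this sketch does.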

Joining events to lookup data (stream-table joins)

TRAIN_MOVEMENTS (Stream)
    TS       | LOC_ID
    10:00:01 | 42
    10:00:02 | 43

LOCATION (Table)
    ID | DESCRIPTION
    42 | MENSTON
    43 | Ilkley

    CREATE STREAM TRAIN_MOVEMENTS_ENRICHED AS
      SELECT TM.LOC_ID AS LOC_ID,
             TM.TS AS TS,
             L.DESCRIPTION AS LOC_DESC,
             […]
        FROM TRAIN_MOVEMENTS TM
             LEFT JOIN LOCATION L
             ON TM.LOC_ID = L.ID;

TRAIN_MOVEMENTS_ENRICHED (Stream)
    TS       | LOC_ID | LOC_DESC
    10:00:01 | 42     | Menston
    10:00:02 | 43     | Ilkley
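
The stream-table join can be pictured as a lookup performed once per event. The sketch below is ordinary Python, not KSQL. The first two events and the location rows mirror the slide; the third event, with an unknown location, is a made-up addition to show the LEFT JOIN behaviour.

```python
# LOCATION table: current description per location ID
location = {42: "Menston", 43: "Ilkley"}

# TRAIN_MOVEMENTS stream: an ordered sequence of events
movements = [
    {"TS": "10:00:01", "LOC_ID": 42},
    {"TS": "10:00:02", "LOC_ID": 43},
    {"TS": "10:00:03", "LOC_ID": 99},  # hypothetical: no matching table row
]

def enrich(event, table):
    """LEFT JOIN semantics: keep the event even when the lookup finds nothing."""
    return {**event, "LOC_DESC": table.get(event["LOC_ID"])}

enriched = [enrich(e, location) for e in movements]
for row in enriched:
    print(row)
```

An INNER JOIN would instead drop the third event, which is the behaviour the alerting query later in the talk relies on.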

Decode values, and generate composite key

    | CIF_train_uid | sched_start_dt | stp_ind | SCHEDULE_KEY        | CIF_power_type | POWER_TYPE             |
    |---------------|----------------|---------|---------------------|----------------|------------------------|
    | Y82535        | 2019-05-20     | P       | Y82535/2019-05-20/P | E              | Electric               |
    | Y82537        | 2019-05-20     | P       | Y82537/2019-05-20/P | DMU            | Other                  |
    | Y82542        | 2019-05-20     | P       | Y82542/2019-05-20/P | D              | Diesel                 |
    | Y82535        | 2019-05-24     | P       | Y82535/2019-05-24/P | EMU            | Electric Multiple Unit |
    | Y82542        | 2019-05-24     | P       | Y82542/2019-05-24/P | HST            | High Speed Train       |

Query fragments (truncated):
    SELECT JsonScheduleV1->CIF_train_uid + '/'
    CASE WHEN JsonScheduleV1->schedule_segment->CIF_power_type = 'D'
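
The full KSQL expression is not shown, so the following Python sketch only reproduces the sample output above. The code-to-label mapping is read off the table, and treating every unlisted code as 'Other' is an assumption that happens to match the DMU row. Both function names are hypothetical.

```python
# Mapping inferred from the sample rows; codes not listed fall back to "Other" (assumption)
POWER_TYPES = {
    "D": "Diesel",
    "E": "Electric",
    "EMU": "Electric Multiple Unit",
    "HST": "High Speed Train",
}

def schedule_key(train_uid, start_date, stp_ind):
    """Composite key: the three identifying fields joined with '/'."""
    return f"{train_uid}/{start_date}/{stp_ind}"

def power_type(code):
    """Decode a power type code into a readable label."""
    return POWER_TYPES.get(code, "Other")

print(schedule_key("Y82535", "2019-05-20", "P"), power_type("E"))
print(schedule_key("Y82537", "2019-05-20", "P"), power_type("DMU"))
```

A composite key like this gives each schedule a single string that other streams can join on.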

Schemas
    CREATE STREAM SCHEDULE_RAW (
        TiplocV1 STRUCT<transaction_type VARCHAR,
                        tiploc_code      VARCHAR,
                        NALCO            VARCHAR,
                        STANOX           VARCHAR,
                        crs_code         VARCHAR,
                        description      VARCHAR,
                        tps_description  VARCHAR>)
      WITH (KAFKA_TOPIC='CIF_FULL_DAILY', VALUE_FORMAT='JSON');

Handling JSON data
    CREATE STREAM NETWORKRAIL_TRAIN_MVT_X (
        header STRUCT< msg_type VARCHAR, […] >,
        body   VARCHAR)
      WITH (KAFKA_TOPIC='networkrail_TRAIN_MVT_X', VALUE_FORMAT='JSON');

    CREATE STREAM TRAIN_CANCELLATIONS_00 AS
      SELECT HEADER,
             EXTRACTJSONFIELD(body,'$.canx_timestamp') AS canx_timestamp,
             EXTRACTJSONFIELD(body,'$.canx_reason_code') AS canx_reason_code,
             […]
        FROM networkrail_TRAIN_MVT_X
       WHERE header->msg_type = '0002';

Serialisation & Schemas
• Avro -> Confluent Schema Registry
• Protobuf
• JSON
• CSV
https://qconnewyork.com/system/files/presentation-slides/qcon_17_-_schemas_and_apis.pdf

The Confluent Schema Registry
(Diagram labels: Source, Kafka Connect, Avro Message, Avro Schema, Schema Registry, KSQL, Target)

Reserialise data
    CREATE STREAM TRAIN_CANCELLATIONS_00
      WITH (VALUE_FORMAT='AVRO') AS
      SELECT * FROM networkrail_TRAIN_MVT_X
      WHERE header->msg_type = '0002';

Flatten and re-key a stream
    {
      "TiplocV1": {
        "transaction_type": "Create",
        "tiploc_code": "ABWD",
        "nalco": "513100",
        "stanox": "88601",
        "crs_code": "ABW",
        "description": "ABBEY WOOD",
        "tps_description": "ABBEY WOOD"
      }
    }

Access nested element, and change the partitioning key:
    CREATE STREAM TIPLOC_FLAT_KEYED AS
      SELECT TiplocV1->TRANSACTION_TYPE AS TRANSACTION_TYPE,
             TiplocV1->TIPLOC_CODE      AS TIPLOC_CODE,
             TiplocV1->NALCO            AS NALCO,
             TiplocV1->STANOX           AS STANOX,
             TiplocV1->CRS_CODE         AS CRS_CODE,
             TiplocV1->DESCRIPTION      AS DESCRIPTION,
             TiplocV1->TPS_DESCRIPTION  AS TPS_DESCRIPTION
        FROM SCHEDULE_RAW
        PARTITION BY TIPLOC_CODE;

    ksql> DESCRIBE TIPLOC_FLAT_KEYED;
    Name : TIPLOC_FLAT_KEYED
     Field            | Type
    ------------------------------------
     TRANSACTION_TYPE | VARCHAR(STRING)
     TIPLOC_CODE      | VARCHAR(STRING)
     NALCO            | VARCHAR(STRING)
     STANOX           | VARCHAR(STRING)
     CRS_CODE         | VARCHAR(STRING)
     DESCRIPTION      | VARCHAR(STRING)
     TPS_DESCRIPTION  | VARCHAR(STRING)
    ------------------------------------

Using the data

Streaming Integration with Kafka Connect
(Diagram labels: Amazon S3, syslog, Google BigQuery, Kafka Connect, Kafka Brokers)

Kafka -> Elasticsearch
    {
      "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
      "topics": "TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00",
      "connection.url": "http://elasticsearch:9200",
      "type.name": "type.name=kafkaconnect",
      "key.ignore": "false",
      "schema.ignore": "true",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter"
    }

Kibana for real-time visualisation of rail data

Kibana for real-time analysis of rail data

Kafka -> Postgres
    {
      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "connection.url": "jdbc:postgresql://postgres:5432/",
      "connection.user": "postgres",
      "connection.password": "postgres",
      "auto.create": true,
      "auto.evolve": true,
      "insert.mode": "upsert",
      "pk.mode": "record_key",
      "pk.fields": "MESSAGE_KEY",
      "topics": "TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00",
      "transforms": "dropArrays",
      "transforms.dropArrays.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
      "transforms.dropArrays.blacklist": "SCHEDULE_SEGMENT_LOCATION, HEADER"
    }

But do you even need a database?
(Diagram labels: TRAIN_MOVEMENTS, Aggregate, TOC_STATS)

    SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd') AS WINDOW_START_TS,
           VARIATION_STATUS,
           SUM(CASE WHEN TOC = 'Arriva Trains Northern'       THEN 1 ELSE 0 END) AS Arriva_CT,
           SUM(CASE WHEN TOC = 'East Midlands Trains'         THEN 1 ELSE 0 END) AS EastMidlands_CT,
           SUM(CASE WHEN TOC = 'London North Eastern Railway' THEN 1 ELSE 0 END) AS LNER_CT,
           SUM(CASE WHEN TOC = 'TransPennine Express'         THEN 1 ELSE 0 END) AS TransPennineExpress_CT
      FROM TRAIN_MOVEMENTS WINDOW TUMBLING (SIZE 1 DAY)
     GROUP BY VARIATION_STATUS;

    | Date       | Variation | Arriva | East Mid | LNER | TPE   |
    |------------|-----------|--------|----------|------|-------|
    | 2019-07-02 | OFF ROUTE |     46 |       78 |   20 |   167 |
    | 2019-07-02 | ON TIME   |  19083 |     3568 | 1509 |  2916 |
    | 2019-07-02 | LATE      |  30850 |     7953 | 5420 |  9042 |
    | 2019-07-02 | EARLY     |  11478 |     3518 | 1349 |  2096 |
    | 2019-07-03 | OFF ROUTE |     79 |       25 |   41 |   213 |
    | 2019-07-03 | ON TIME   |  19512 |     4247 | 1915 |  2936 |
    | 2019-07-03 | LATE      |  37357 |     8258 | 5342 | 11016 |
    | 2019-07-03 | EARLY     |  11825 |     4574 | 1888 |  2094 |
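
The aggregation above buckets events into one-day tumbling windows and counts them per variation status and operator. The Python sketch below shows the same idea on a handful of invented events. It assumes windows aligned to the Unix epoch in UTC, and the field names ts_ms, variation_status and toc are placeholders rather than the real schema.

```python
from collections import Counter, defaultdict
from datetime import datetime, timezone

DAY_MS = 24 * 60 * 60 * 1000

def window_start(ts_ms, size_ms=DAY_MS):
    """Start of the tumbling window containing ts_ms (windows aligned to the epoch)."""
    return ts_ms - (ts_ms % size_ms)

def daily_counts(events):
    """Count events per (window day, variation status), broken down by operator."""
    counts = defaultdict(Counter)
    for e in events:
        start = datetime.fromtimestamp(window_start(e["ts_ms"]) / 1000, tz=timezone.utc)
        counts[(start.strftime("%Y-%m-%d"), e["variation_status"])][e["toc"]] += 1
    return counts

def ms(y, mo, d, h):
    """Helper for building sample timestamps in epoch milliseconds (UTC)."""
    return int(datetime(y, mo, d, h, tzinfo=timezone.utc).timestamp() * 1000)

# Hypothetical events
events = [
    {"ts_ms": ms(2019, 7, 2, 8),  "variation_status": "LATE",    "toc": "TransPennine Express"},
    {"ts_ms": ms(2019, 7, 2, 23), "variation_status": "LATE",    "toc": "TransPennine Express"},
    {"ts_ms": ms(2019, 7, 3, 1),  "variation_status": "LATE",    "toc": "TransPennine Express"},
    {"ts_ms": ms(2019, 7, 3, 9),  "variation_status": "ON TIME", "toc": "East Midlands Trains"},
]

result = daily_counts(events)
for (day, status), per_toc in sorted(result.items()):
    print(day, status, dict(per_toc))
```

The SUM(CASE ...) columns in the KSQL pivot each operator into its own column; the nested counter here holds the same numbers in a different shape.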

Kafka -> S3
    {
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "tasks.max": "1",
      "topics": "TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00",
      "s3.region": "us-west-2",
      "s3.bucket.name": "rmoff-rail-streaming-demo",
      "flush.size": "65536",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
      "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
      "schema.compatibility": "NONE",
      "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
      "path.format": "'year'=YYYY/'month'=MM/'day'=dd",
      "timestamp.extractor": "Record",
      "partition.duration.ms": 1800000,
      "locale": "en",
      "timezone": "UTC"
    }

Kafka -> S3
    $ aws s3 ls s3://rmoff-rail-streaming-demo/topics/TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00/year=2019/month=07/day=07/
    2019-07-07 10:05:41  200995548 TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00+0+0004963416.avro
    2019-07-07 19:47:11   87631494 TRAIN_MOVEMENTS_ACTIVATIONS_SCHEDULE_00+0+0005007151.avro

Inferring table config from S3 files with AWS Glue

Analysis with AWS Athena

Graph analysis
    {
      "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
      "topics": "TRAIN_CANCELLATIONS_02",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "errors.tolerance": "all",
      "errors.deadletterqueue.topic.name": "sink-neo4j-train-00_dlq",
      "errors.deadletterqueue.topic.replication.factor": 1,
      "errors.deadletterqueue.context.headers.enable": true,
      "neo4j.server.uri": "bolt://neo4j:7687",
      "neo4j.authentication.basic.username": "neo4j",
      "neo4j.authentication.basic.password": "connect",
      "neo4j.topic.cypher.TRAIN_CANCELLATIONS_02": "MERGE (train:train{id: event.TRAIN_ID}) MERGE (toc:toc{toc: event.TOC}) MERGE (canx_reason:canx_reason{reason: event.CANX_REASON}) MERGE (canx_loc:canx_loc{location: coalesce(event.CANCELLATION_LOCATION,'<unknown>')}) MERGE (train)-[:OPERATED_BY]->(toc) MERGE (canx_loc)<-[:CANCELLED_AT{reason:event.CANX_REASON, time:event.CANX_TIMESTAMP}]-(train)-[:CANCELLED_BECAUSE]->(canx_reason)"
    }

Graph relationships

Tell me when trains are delayed at my station

Event-driven Alerting - logic
(Diagram labels: TRAIN_MOVEMENTS stream, ALERT_CONFIG key/value state, TRAIN_DELAYED_ALERT stream)
    CREATE STREAM TRAINS_DELAYED_ALERT AS
      SELECT […]
        FROM TRAIN_MOVEMENTS T
             INNER JOIN ALERT_CONFIG A
             ON T.LOC_NLCDESC = A.STATION
       WHERE TIMETABLE_VARIATION > A.ALERT_OVER_MINS;

Sending messages to Telegram
(Diagram labels: Kafka, Kafka Connect HTTP Sink, Telegram)
    curl -X POST -H 'Content-Type: application/json' \
         -d '{"chat_id": "-364377679", "text": "This is a test from curl"}' \
         https://api.telegram.org/botxxxxx/sendMessage

    {
      "connector.class": "io.confluent.connect.http.HttpSinkConnector",
      "request.method": "post",
      "http.api.url": "https://api.telegram.org/[…]/sendMessage",
      "headers": "Content-Type: application/json",
      "topics": "TRAINS_DELAYED_ALERT_TG",
      "tasks.max": "1",
      "batch.prefix": "{\"chat_id\":\"-364377679\",\"parse_mode\": \"markdown\",",
      "batch.suffix": "}",
      "batch.max.size": "1",
      "regex.patterns": ".*\{MESSAGE=(.*)\}.*",
      "regex.replacements": "\"text\":\"$1\"",
      "regex.separator": "~",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter"
    }

Event-driven Alerting - config
Compacted topic:
    $ kafkacat -b localhost:9092 -t alert_config -P -K: <<EOF
    LEEDS:{"STATION":"LEEDS","ALERT_OVER_MINS":"45"}
    EOF

    CREATE TABLE ALERT_CONFIG (STATION VARCHAR, ALERT_OVER_MINS INT)
      WITH (KAFKA_TOPIC='alert_config', VALUE_FORMAT='JSON', KEY='STATION');

    CREATE STREAM ALERT_CONFIG_STREAM (STATION VARCHAR, ALERT_OVER_MINS INT)
      WITH (KAFKA_TOPIC='alert_config', VALUE_FORMAT='JSON');

    ksql> SELECT STATION, ALERT_OVER_MINS FROM ALERT_CONFIG WHERE STATION='LEEDS';
    LEEDS | 45

    ksql> SELECT STATION, ALERT_OVER_MINS FROM ALERT_CONFIG_STREAM WHERE STATION='LEEDS';
    LEEDS | 45

Update the threshold for the same key:
    $ kafkacat -b localhost:9092 -t alert_config -P -K: <<EOF
    LEEDS:{"STATION":"LEEDS","ALERT_OVER_MINS":"20"}
    EOF

    ksql> SELECT STATION, ALERT_OVER_MINS FROM ALERT_CONFIG WHERE STATION='LEEDS';
    LEEDS | 20

    ksql> SELECT STATION, ALERT_OVER_MINS FROM ALERT_CONFIG_STREAM WHERE STATION='LEEDS';
    LEEDS | 45
    LEEDS | 20
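
The difference between the two query results comes down to how the same topic is interpreted. A small Python sketch of that distinction follows. It is an analogy rather than KSQL internals: a stream is every event in order, while a table keeps only the latest value per key. The helper names are hypothetical.

```python
# The two messages written to the alert_config topic, in order: (key, value)
events = [
    ("LEEDS", {"STATION": "LEEDS", "ALERT_OVER_MINS": "45"}),
    ("LEEDS", {"STATION": "LEEDS", "ALERT_OVER_MINS": "20"}),
]

def as_stream(events):
    """Stream view: every event, in arrival order."""
    return [value for _, value in events]

def as_table(events):
    """Table view: the most recent value for each key."""
    state = {}
    for key, value in events:
        state[key] = value  # a later event for the same key overwrites the earlier one
    return state

print([v["ALERT_OVER_MINS"] for v in as_stream(events)])  # both thresholds
print(as_table(events)["LEEDS"]["ALERT_OVER_MINS"])       # only the latest one
```

This is why the alerting join uses the table: each movement event is compared against the threshold currently in force, not against every threshold ever set.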

Running it & monitoring & maintenance

Consumer lag

System health & Throughput

Restart failing connectors
    rmoff@proxmox01 > crontab -l
    */5 * * * * /home/rmoff/restart_failed_connector_tasks.sh
    rmoff@proxmox01 > cat /home/rmoff/restart_failed_connector_tasks.sh
    #!/usr/bin/env bash
    # @rmoff / June 6, 2019
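
The body of the script is cut off in the transcript, so the following is not the author's script. It is a hedged Python sketch of the kind of logic such a job might contain, built on two Kafka Connect REST endpoints: GET /connectors/{name}/status, which reports a state for each task, and POST /connectors/{name}/tasks/{id}/restart. The function only works out which restart paths to call, and the connector name in the sample is invented.

```python
def failed_task_restart_paths(status):
    """Given a connector status document, return the REST paths that restart its FAILED tasks."""
    name = status["name"]
    return [
        f"/connectors/{name}/tasks/{task['id']}/restart"
        for task in status.get("tasks", [])
        if task.get("state") == "FAILED"
    ]

# Hypothetical response from GET /connectors/sink-elastic-trains/status
sample_status = {
    "name": "sink-elastic-trains",
    "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "connect:8083"},
        {"id": 1, "state": "FAILED", "worker_id": "connect:8083", "trace": "..."},
    ],
}

for path in failed_task_restart_paths(sample_status):
    print("POST", path)  # a real job would send this request to the Connect worker
```

Keeping the decision separate from the HTTP call makes the logic easy to check without a running Connect cluster.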

Is everything running?
http://rmoff.dev/kafka-trains-code-01

check_latest_timestamp.sh
• Start at latest message (-o-1)
• Just read one message (-c1)
• Convert from epoch to human-readable timestamp format
    kafkacat -b localhost:9092 -t networkrail_TRAIN_MVT_X -o-1 -c1 -C | \
      jq '.header.msg_queue_timestamp' | \
      sed -e 's/"//g' | \
      sed -e 's/000$//g' | \
      xargs -Ifoo date --date=@foo

    $ ./check_latest_timestamp.sh
    Fri Jul 5 16:28:09 BST 2019
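
The two sed steps strip the quotes and the trailing 000 so that a millisecond epoch string becomes seconds for date. Here is the same conversion sketched in Python. It assumes msg_queue_timestamp is a string of epoch milliseconds, as the pipeline implies, and the function name is hypothetical.

```python
from datetime import datetime, timezone

def queue_ts_to_datetime(ms_string):
    """Convert an epoch-milliseconds string to a timezone-aware UTC datetime."""
    seconds = int(ms_string) // 1000  # integer division also copes with non-zero milliseconds
    return datetime.fromtimestamp(seconds, tz=timezone.utc)

# Build a sample value from a known instant rather than hard-coding the digits
known = datetime(2019, 7, 5, 15, 28, 9, tzinfo=timezone.utc)
sample = str(int(known.timestamp() * 1000))

print(queue_ts_to_datetime(sample).isoformat())
```

Comparing the result with the current time gives a quick check that fresh messages are still arriving on the topic.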

(Diagram labels: Kafka Connect, Kafka, KSQL, Kafka Connect)
✅ Stream raw data
✅ Stream enriched data
✅ Historic data & data replay

Config, Config, SQL
Fully Managed Kafka as a Service
Free Books! http://cnfl.io/book-bundle
#EOF
💬 Join the Confluent Community Slack group at http://cnfl.io/slack
https://talks.rmoff.net
As data engineers, we frequently need to build scalable systems working with data from a variety of sources and with various ingest rates, sizes, and formats. This talk takes an in-depth look at how Apache Kafka can be used to provide a common platform on which to build data infrastructure driving both real-time analytics and event-driven applications. Using a public feed of railway data, it shows how to ingest data from message queues such as ActiveMQ with Kafka Connect, as well as from static sources such as S3 and REST endpoints. We'll then see how to wrangle the data into a form useful for streaming to analytics tools such as Elasticsearch and Neo4j. If you're wondering how to build your next scalable data platform, how to reconcile the impedance mismatch between stream and batch, and how to wrangle streams of data, this talk is for you!
The following resources were mentioned during the presentation or are useful additional information.
Provides managed Apache Kafka, Schema Registry, KSQL, etc