Streaming data from Oracle into Kafka

Published by in Oracle, Cdc, Debezium, Goldengate, Xstream, Logminer, Ksqldb at https://rmoff.net/2018/12/12/streaming-data-from-oracle-into-kafka/

This is a short summary discussing what the options are for integrating Oracle RDBMS into Kafka, as of December 2018 (refreshed June 2020). For a more detailed background to why and how at a broader level for all databases (not just Oracle) see this blog and this talk.

What techniques & tools are there? ๐Ÿ”—

Franck Pachot has written up an excellent analysis of the options available here.

As of June 2020, this is what the line-up looks like:

  • Query-based CDC
    • The JDBC Connector for Kafka Connect, polls the database for new or changed data based on an incrementing ID column and/or update timestamp
  • Log-based CDC
    • Oracle GoldenGate for Big Data (license $20k per CPU). Supports three “handlers”:

    • Oracle XStream (requires Oracle GoldenGate license $17.5k per CPU).

      • Built on top of LogMiner. Oracle’s API for third-party applications wanting to stream events from the database.
      • Currently beta implementation by Debezium (0.9) with Kafka Connect
    • Oracle Log Miner No special license required (even available in Oracle XE).

      • Currently beta implementation by Debezium (1.3.0 beta2) with Kafka Connect (details)
      • Implemented by community connector here
      • Available commercially from Attunity, SQData, HVR, StreamSets, Striim etc
      • DBVisit Replicate is no longer developed.
  • Triggers to capture changes made to a table, write details of those changes to another database table, ingest that table into Kafka (e.g. with JDBC connector).
  • Flashback to show all changes to a given table between two points in time. Implemented as a PoC by Stewart Bryson and Bjรถrn Rost.
  • Other commercial tools including Qlik Replicate (nee Attunity Replicateโ€Ž), Precisely/SyncSort/SQData, Striim, IBM IIDR, etc

What do they look like in action? ๐Ÿ”—

I did a talk at UK Oracle User Group TECH18 conference, presenting my talk “No More Silos: Integrating Databases and Apache Kafka”. As part of this I did a live demo showing the difference between using the JDBC Connector (query-based CDC) and the Debezium/XStream option (log-based CDC). Here I’ll try and replicate the discussion and examples. You can also see previous articles that I’ve written showing GoldenGate in action.

You can find all of the code on the demo-scene repository, runnable through Docker and Docker Compose. Simply clone the repo, and then run

cd no-more-silos-oracle
./setup.sh

The setup script does all of the rest, including bringing up Confluent Platform, and configuring the connectors. You do have to build the Oracle database docker image first.

Setup ๐Ÿ”—

Some notes on setup of each option:

  • JDBC connector
    • The main thing you need here is the Oracle JDBC driver in the correct folder for the Kafka Connect JDBC connector. The JDBC driver can be downloaded directly from Maven and this is done as part of the container’s start up.
      • Check out this video to learn more about how to install JDBC driver for Kafka Connect.
    • You also need to make sure that the source table has an incrementing ID column and/or update timestamp column that can be used to identify changed rows. Without that you can only do a bulk load of the data each time.
  • Debezium connector
    • Requires a bunch of libraries (instant client and others), installed at runtime in the container (now that Instant Client can be downloaded without a click-through ๐Ÿ™Œ).
    • This requires config work on the database, covered by the Debezium docs and done by the Docker script here
    • Each table needs to be configured (script here)
    • I hit problems with the Capture stopping with permission errors so automated its restart (hacky, I know)

The actual config of the two connectors is done in separate calls to Kafka Connect’s REST API (JDBC / Debezium).

You can validate that each connector is running by querying the REST API:

curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
      jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
      column -s : -t| sed 's/\"//g'| sort

source  |  ora-source-debezium-xstream  |  RUNNING  |  RUNNING  |  io.debezium.connector.oracle.OracleConnector
source  |  ora-source-jdbc              |  RUNNING  |  RUNNING  |  io.confluent.connect.jdbc.JdbcSourceConnector

Initial data load ๐Ÿ”—

In Oracle, check the source data:

COL FIRST_NAME FOR A15
COL LAST_NAME FOR A15
COL ID FOR 999
COL CLUB_STATUS FOR A12
COL UPDATE_TS FOR A29
SET LINESIZE 200
SELECT ID, FIRST_NAME, LAST_NAME, CLUB_STATUS, UPDATE_TS FROM CUSTOMERS;

  ID FIRST_NAME      LAST_NAME       CLUB_STATUS  UPDATE_TS
---- --------------- --------------- ------------ -----------------------------
  1 Rica            Blaisdell       bronze       11-DEC-18 05.16.00.000000 PM
  2 Ruthie          Brockherst      platinum     11-DEC-18 05.16.00.000000 PM
  3 Mariejeanne     Cocci           bronze       11-DEC-18 05.16.00.000000 PM
  4 Hashim          Rumke           platinum     11-DEC-18 05.16.00.000000 PM
  5 Hansiain        Coda            platinum     11-DEC-18 05.16.00.000000 PM

Now let’s see what’s in Kafka. I’m using ksqlDB here to inspect the data; you could use other Kafka console consumers if you’d rather.

Launch ksqlDB:

docker exec -it ksqldb ksql http://ksqldb:8088

Inspect the topics on the Kafka cluster:

ksql> LIST TOPICS;

Kafka Topic               | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
-------------------------------------------------------------------------------------------------------
asgard.DEBEZIUM.CUSTOMERS | false      | 1          | 1                  | 0         | 0
ora-CUSTOMERS-jdbc        | false      | 1          | 1                  | 0         | 0
โ€ฆ

The two topics listed are for the same table (CUSTOMERS) from the Debezium and JDBC connectors respectively.

Dump the contents:

  • Debezium/XStreams:

      ksql> PRINT 'asgard.DEBEZIUM.CUSTOMERS' FROM BEGINNING;                                                                                                                                  
      Key format: AVRO or KAFKA_STRING                                                                                                                                                         
      Value format: AVRO                                                                                                                                                                       
      rowtime: 2020/06/12 11:28:50.877 Z, key: {"ID": 1}, value: {"before": null, "after": {"ID": 1, "FIRST_NAME": "Rica", "LAST_NAME": "Blaisdell", "EMAIL": "rblaisdell0@rambler.ru", "GENDER": "Female", "CLUB_STATUS": "bronze", "COMMENTS": "Universal optimal hierarchy", "CREATE_TS": 1591960461011878, "UPDATE_TS": 1591960461000000}, "source": {"version": "1.2.0-SNAPSHOT", "connector": "oracle", "name": "asgard", "ts_ms": 1591961330293, "snapshot": "true", "db": "ORCLPDB1", "schema": "DEBEZIUM", "table": "CUSTOMERS", "txId": null, "scn": 1483831, "lcr_position": null}, "op": "r", "ts_ms": 1591961330303, "transaction": null, "messagetopic": "asgard.DEBEZIUM.CUSTOMERS", "messagesource": "Debezium CDC from Oracle on asgard"}
      โ€ฆ
    
  • JDBC connector

      ksql> PRINT 'ora-CUSTOMERS-jdbc' FROM BEGINNING;
      Key format: ยฏ\_(ใƒ„)_/ยฏ - no data processed
      Value format: AVRO
      rowtime: 2020/06/12 11:20:01.770 Z, key: <null>, value: {"ID": 1, "FIRST_NAME": "Rica", "LAST_NAME": "Blaisdell", "EMAIL": "rblaisdell0@rambler.ru", "GENDER": "Female", "CLUB_STATUS": "bronze", "COMMENTS": "Universal optimal hierarchy", "CREATE_TS": 1591960461011, "UPDATE_TS": 1591960461000, "messagetopic": "ora-CUSTOMERS-jdbc", "messagesource": "JDBC Source Connector from Oracle on asgard"}
      โ€ฆ
    

Each has the full contents of the source table (5 records, only first is shown above). We can actually use ksqlDB to easily query the topic directly if we want. First we declare each topic as the source for a stream:

SET 'auto.offset.reset' = 'earliest';
CREATE STREAM CUSTOMERS_STREAM_DBZ_SRC WITH (KAFKA_TOPIC='asgard.DEBEZIUM.CUSTOMERS', VALUE_FORMAT='AVRO');
CREATE STREAM CUSTOMERS_STREAM_JDBC_SRC WITH (KAFKA_TOPIC='ora-CUSTOMERS-jdbc', VALUE_FORMAT='AVRO');

and then query the JDBC-sourced Kafka topic:

ksql> SELECT ID, FIRST_NAME, LAST_NAME, CLUB_STATUS FROM CUSTOMERS_STREAM_JDBC_SRC EMIT CHANGES LIMIT 5;

+---------------------+---------------------+---------------------+---------------------+
|ID                   |FIRST_NAME           |LAST_NAME            |CLUB_STATUS          |
+---------------------+---------------------+---------------------+---------------------+
|1                    |Rica                 |Blaisdell            |bronze               |
|2                    |Ruthie               |Brockherst           |platinum             |
|5                    |Hansiain             |Coda                 |platinum             |
|4                    |Hashim               |Rumke                |platinum             |
|3                    |Mariejeanne          |Cocci                |bronze               |

and the one from Debezium:

ksql> SELECT AFTER->ID AS ID, AFTER->FIRST_NAME AS FIRST_NAME, AFTER->LAST_NAME AS LAST_NAME, AFTER->CLUB_STATUS AS CLUB_STATUS FROM CUSTOMERS_STREAM_DBZ_SRC EMIT CHANGES;

+---------------------+---------------------+---------------------+---------------------+
|ID                   |FIRST_NAME           |LAST_NAME            |CLUB_STATUS          |
+---------------------+---------------------+---------------------+---------------------+
|1                    |Rica                 |Blaisdell            |bronze               |
|2                    |Ruthie               |Brockherst           |platinum             |
|3                    |Mariejeanne          |Cocci                |bronze               |
|4                    |Hashim               |Rumke                |platinum             |
|5                    |Hansiain             |Coda                 |platinum             |

Note that I’m accessing nested attributes of the AFTER object here using the -> operator.

The schema for both topics come from the Schema Registry, in which Kafka Connect automatically stores the schema for the data coming from Oracle and serialises the data into Avro (or Protobuf, or JSON Schema). The great thing about this is in a consuming application, such as ksqlDB, the schema is already available and doesn’t have to be manually entered.

INSERT ๐Ÿ”—

Insert a row in the Oracle database:

SQL> SET AUTOCOMMIT ON;
SQL>
SQL> INSERT INTO CUSTOMERS (FIRST_NAME,LAST_NAME,CLUB_STATUS) VALUES ('Rick','Astley','Bronze');

1 row created.

Commit complete.

Straight away in the Kafka topics you’ll see a new row (in fact, if you have left the above SELECT running you won’t need to rerun this, it’ll show the new row already):

  • JDBC

      ksql> SELECT ID, FIRST_NAME, LAST_NAME, CLUB_STATUS FROM CUSTOMERS_STREAM_JDBC_SRC EMIT CHANGES;
      +---------------------+---------------------+---------------------+---------------------+
      |ID                   |FIRST_NAME           |LAST_NAME            |CLUB_STATUS          |
      +---------------------+---------------------+---------------------+---------------------+
      |1                    |Rica                 |Blaisdell            |bronze               |
      |2                    |Ruthie               |Brockherst           |platinum             |
      |5                    |Hansiain             |Coda                 |platinum             |
      |4                    |Hashim               |Rumke                |platinum             |
      |3                    |Mariejeanne          |Cocci                |bronze               |
      |42                   |Rick                 |Astley               |Bronze               |
    
  • Debezium/XStream

      ksql> SELECT AFTER->ID AS ID, AFTER->FIRST_NAME AS FIRST_NAME, AFTER->LAST_NAME AS LAST_NAME, AFTER->CLUB_STATUS AS CLUB_STATUS FROM CUSTOMERS_STREAM_DBZ_SRC EMIT CHANGES;
      +---------------------+---------------------+---------------------+---------------------+
      |ID                   |FIRST_NAME           |LAST_NAME            |CLUB_STATUS          |
      +---------------------+---------------------+---------------------+---------------------+
      |1                    |Rica                 |Blaisdell            |bronze               |
      |2                    |Ruthie               |Brockherst           |platinum             |
      |3                    |Mariejeanne          |Cocci                |bronze               |
      |4                    |Hashim               |Rumke                |platinum             |
      |5                    |Hansiain             |Coda                 |platinum             |
      |42                   |Rick                 |Astley               |Bronze               |
    

So far, so same. Each captures an insert. Debezium from XStream and the database’s redo log, JDBC by polling the database for any rows with a newer UPDATE_TS or higher ID than the previous request.

UPDATE ๐Ÿ”—

This is where things get interesting. Let’s update the row in Oracle that we just created:

SQL> UPDATE CUSTOMERS SET CLUB_STATUS = 'Platinum' where ID=42;

1 row updated.

Commit complete.
SQL>

Now check out the data in Kafka.

  • JDBC is as before; the changed data row is available to us:

      ksql> SELECT ID, FIRST_NAME, LAST_NAME, CLUB_STATUS FROM CUSTOMERS_STREAM_JDBC_SRC EMIT CHANGES;
      +---------------------+---------------------+---------------------+---------------------+
      |ID                   |FIRST_NAME           |LAST_NAME            |CLUB_STATUS          |
      +---------------------+---------------------+---------------------+---------------------+
      |1                    |Rica                 |Blaisdell            |bronze               |
      |2                    |Ruthie               |Brockherst           |platinum             |
      |5                    |Hansiain             |Coda                 |platinum             |
      |4                    |Hashim               |Rumke                |platinum             |
      |3                    |Mariejeanne          |Cocci                |bronze               |
      |42                   |Rick                 |Astley               |Bronze               |
      |42                   |Rick                 |Astley               |Platinum             |
    
  • Debezium/XStream now comes into its own. As well as the new row of data, we can see what it was previously, through the BEFORE nested object:

      ksql> SELECT OP, AFTER->ID, BEFORE->CLUB_STATUS AS STATUS_BEFORE, AFTER->CLUB_STATUS AS STATUS_AFTER FROM CUSTOMERS_STREAM_DBZ_SRC EMIT CHANGES;
      +---------------------+---------------------+---------------------+---------------------+
      |OP                   |ID                   |STATUS_BEFORE        |STATUS_AFTER         |
      +---------------------+---------------------+---------------------+---------------------+
      |r                    |1                    |null                 |bronze               |
      |r                    |2                    |null                 |platinum             |
      |r                    |3                    |null                 |bronze               |
      |r                    |4                    |null                 |platinum             |
      |r                    |5                    |null                 |platinum             |
      |c                    |42                   |null                 |Bronze               |
      |u                    |42                   |Bronze               |Platinum             |
    
    I'm just showing the before/after `CLUB_STATUS` but all the other fields are also available. There's also metadata about the change, including the type of operation in the `OP` field (`r`=read, i.e the initial snapshot, `c`=create, `u`=update)
    
    Let's look at the full payload of each message sent to Kafka: 
    
      {
        "before": {
          "ID": 42,
          "FIRST_NAME": "Rick",
          "LAST_NAME": "Astley",
          "EMAIL": null,
          "GENDER": null,
          "CLUB_STATUS": "Bronze",
          "COMMENTS": null,
          "CREATE_TS": 1544000706681769,
          "UPDATE_TS": 1544000706000000
        },
        "after": {
          "ID": 42,
          "FIRST_NAME": "Rick",
          "LAST_NAME": "Astley",
          "EMAIL": null,
          "GENDER": null,
          "CLUB_STATUS": "Platinum",
          "COMMENTS": null,
          "CREATE_TS": 1544000706681769,
          "UPDATE_TS": 1544000742000000
        },
        "source": {
          "version": "1.2.0-SNAPSHOT",
          "connector": "oracle",
          "name": "asgard",
          "ts_ms": 1544000742000,
          "txId": "6.26.734",
          "scn": 2796831,
          "snapshot": false
        },
        "op": "u",
        "ts_ms": 1544000745823,
        "messagetopic": "asgard.DEBEZIUM.CUSTOMERS",
        "messagesource": "Debezium CDC from Oracle on asgard"
      }
    
    So each time a change is made in the database, you get a full before/after snapshot of the record, plus a bunch of other metadata. This is great for applications processing inbound changes that need to know not just that something changed (_here's the new record_) but also exactly _what_ changed (before/after payloads) as well as _how_ (insert/update/etc.)
    

DELETE ๐Ÿ”—

Delete a record from the source system

SQL> DELETE FROM CUSTOMERS WHERE ID=42;

1 row deleted.

Commit complete.

Now check out the data in Kafka.

JDBC is unchanged; it’s not captured any change to the source table. If you think about it, this is perfectly reasonable. How you query a database for a row that doesn’t exist? Debezium/XStream, on the other hand, reports the data change precisely:

ksql> SELECT OP, AFTER->ID, BEFORE->CLUB_STATUS AS STATUS_BEFORE, AFTER->CLUB_STATUS AS STATUS_AFTER FROM CUSTOMERS_STREAM_DBZ_SRC EMIT CHANGES;
+---------------------+---------------------+---------------------+---------------------+
|OP                   |ID                   |STATUS_BEFORE        |STATUS_AFTER         |
+---------------------+---------------------+---------------------+---------------------+
|r                    |1                    |null                 |bronze               |
|r                    |2                    |null                 |platinum             |
|r                    |3                    |null                 |bronze               |
|r                    |4                    |null                 |platinum             |
|r                    |5                    |null                 |platinum             |
|c                    |42                   |null                 |Bronze               |
|u                    |42                   |Bronze               |Platinum             |
|d                    |null                 |Platinum             |null                 |

Note the d record on the last row. This has captured the DELETE operation perfectly. The null in the right-most column is the current value for AFTER->CLUB_STATUS, and since the record is deleted, it has no value. We can see this even more clearly if we look at the raw payload for the whole record:

{
  "before": {
    "ID": 42,
    "FIRST_NAME": "Rick",
    "LAST_NAME": "Astley",
    "EMAIL": null,
    "GENDER": null,
    "CLUB_STATUS": "Platinum",
    "COMMENTS": null,
    "CREATE_TS": 1544562543660463,
    "UPDATE_TS": 1544562791000000
  },
  "after": null,
  "source": {
    "version": "1.2.0-SNAPSHOT",
    "connector": "oracle",
    "name": "asgard",
    "ts_ms": 1544563479000,
    "txId": "9.32.712",
    "scn": 3042804,
    "snapshot": true
  },
  "op": "d",
  "ts_ms": 1544563482682,
  "messagetopic": "asgard.DEBEZIUM.CUSTOMERS",
  "messagesource": "Debezium CDC from Oracle on asgard"
}

The full record that has been deleted is present in the BEFORE object, but AFTER is nullโ€”it’s been deleted, it no longer exists. It is an ex-record.


Bonus ksqlDB :

๐Ÿ‘‰ Introduction to ksqlDB

We’re working with data in a Kafka topic. As it happens, ksqlDB is kinda useful for interogating that data, but at the end of the day it’s still just a Kafka topic. We can use ksqlDB to also help monitor the lag between the event in the source system (source->ms_ms as provided by Debezium) and the time recorded on the Kafka broker (the Kafka message timestamp, exposed in ROWTIME):

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss Z'), 
             OP, 
             ROWTIME - SOURCE->TS_MS AS LAG_MS 
        FROM CUSTOMERS_STREAM_DBZ_SRC EMIT CHANGES;
+----------------------------+----------------------------+----------------------------+
|KSQL_COL_0                  |OP                          |LAG_MS                      |
+----------------------------+----------------------------+----------------------------+
|2020-06-12 11:28:50 +0000   |r                           |584                         |
|2020-06-12 11:28:50 +0000   |r                           |570                         |
|2020-06-12 11:28:50 +0000   |r                           |570                         |
|2020-06-12 11:28:50 +0000   |r                           |571                         |
|2020-06-12 11:28:50 +0000   |r                           |570                         |
|2020-06-12 12:34:53 +0000   |c                           |4720                        |
|2020-06-12 12:35:18 +0000   |u                           |18686                       |
|2020-06-12 13:11:36 +0000   |d                           |62802                       |

Some of these lag times are pretty high; DBZ-1018 Oracle connector is laggy is a JIRA currently tracking it.

You can get the same data out of the JDBC connector, based on the UPDATE_TS of the record itself:

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss Z'), 
             ROWTIME - UPDATE_TS AS LAG_MS 
        FROM CUSTOMERS_STREAM_JDBC_SRC  EMIT CHANGES;
+--------------------------------------------+--------------------------------------------+
|KSQL_COL_0                                  |LAG_MS                                      |
+--------------------------------------------+--------------------------------------------+
|2020-06-12 11:20:01 +0000                   |340770                                      |
|2020-06-12 11:20:01 +0000                   |340774                                      |
|2020-06-12 11:20:01 +0000                   |340775                                      |
|2020-06-12 11:20:01 +0000                   |340776                                      |
|2020-06-12 11:20:01 +0000                   |340777                                      |
|2020-06-12 12:34:51 +0000                   |3195                                        |
|2020-06-12 13:08:01 +0000                   |1046                                        |

You’ll note here no available OP information, and no row for the corresponding DELETE action in the source database.


Ecosystem ๐Ÿ”—

When you’re bringing data into Kafka, you need to remember the bigger picture. Dumping it into a topic alone is not enough. Well, it is, but your wider community of developers won’t thank you.

You want to ensure that the schema of the source data is preserved, and that you’re using a serialisation method for the data that is suitable. Doing this means that developers can use the data without being tightly coupled to the producer of the data to understand how to use it.

However you do this, it should be in a way that integrates with the broader Kafka and Confluent Platform ecosystem. One option is the Schema Registry and Avro (or Protobuf, or JSON Schema). If you’re using Kafka Connect then this is available by default, since you just select the Avro converter when you set up Kafka Connect.


Overview of the Pros and Cons of each technique ๐Ÿ”—

Some of these are objective, others subjective. Others may indeed be plain false ;-) Discussion, comments, and corrections via Twitter or LinkedIn welcomed!

  • Query-based CDC
    • Pros:
      • Easy to set up
      • Minimal privileges required
    • Cons:
      • No capture of DELETEs

      • No capture of before-state in an UPDATE

      • No guarantee that all events are captured; only the state at the time of polling

      • Increased load on the source DB due to polling (and/or unacceptable latency in capturing the events if polling interval too high)

  • Log-based CDC

  • Triggers

    • Pros:

      • No licence cost
      • Entirely customisable
    • Cons:

      • Completely bespoke code to develop and maintain.
      • Tightly coupled to source application

      A contribution from another reader, Janne Keskitalo, who has implemented a CDC solution using triggers and is very happy with it:

      Change data capture from Oracle to Kafka with triggers


      A contribution from reader ynux about triggers:

      Thanks for your article which I found very helpful. Many people are interested, and many underestimate the complexity of this.

      I’d like to add some “cons” to the self-built trigger solution because I was part of a team that built one (originally to feed changes into Elasticsearch).

      • Can only be done per table
      • Creates load on the Oracle database, because there are more writes and, more importantly, more locks. We had to put the trigger on the central table that had a very high change rate, and the locks it produced were small but summing up, especially as more applications asking for changes were added.
      • People have to be very cautious if they want to retain the order of events. I don’t remember the details, but we put a materialized view on top of the table filled by the trigger, and I think the reasoning was: If you have the source table -> change table -> application, consider this case. There’s a commit to the source table, and change 1, say, “insert item A”, asks for a timestamp t1 and updates the change table at time t1’. Change 2, “update item A”, gets timestamp t2 and updates the change table at time t2’, but overtakes change 1: t1 < t2, but t1’ > t2’ . (Can that happen? Does Oracle guarantee that it won’t?). As Murphy rules, between t2’ and t1’ the application fetches the changes. And is understandably confused since it has to update an item that doesn’t exist. So I assume the materialized view between the change table and the application was added to fix this. Note that the source table had both a date and an incrementing ID per item.

      My takeaway from this experience was:

      • Streams and Tables may be dual, but reconstructing a stream from a table is expensive
      • When you think of ordering and consistency, think hard
  • Flashback

    • Pro + Con:

      • Requires EE licenceโ€”but this is something users are more likely to have already than OGG/OGGBD
    • Unknown:

      • What granularity of data can be retrieved?
      • Impact on the DB from polling?
      • Unclear how much bespoke coding this would require per integration?

References ๐Ÿ”—

Try it out! ๐Ÿ”—

You can find all of the code used in this article on github here.

Feedback? ๐Ÿ”—

Some of these are objective, others subjective. Others may indeed be plain false ;-) Discussion, comments, and corrections in the comment function below welcomed!

For help in getting this working, the best place to head is the Confluent Community:

There’s also a good Debezium community:

Updates & Comments ๐Ÿ”—

TABLE OF CONTENTS