rmoff's random ramblings

🎄 Twelve Days of SMT 🎄 - Day 3: Flatten

Published Dec 10, 2020 by Robin Moffatt in Kafka Connect, Single Message Transform, TwelveDaysOfSMT at https://rmoff.net/2020/12/10/twelve-days-of-smt-day-3-flatten/

The Flatten Single Message Transform (SMT) is useful when you need to collapse a nested message down to a flat structure.
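To make "collapse a nested message" concrete, here is a minimal Python sketch of the idea. It is an illustration only, not the Flatten implementation: the `flatten` function is a hypothetical helper written for this example, and it models a message as a plain dict. Its `delimiter` argument plays the same role as the transform's delimiter option, which defaults to `.`.

```python
def flatten(record, delimiter=".", prefix=""):
    """Collapse nested dicts into one level, joining field names with the delimiter."""
    flat = {}
    for name, value in record.items():
        full_name = f"{prefix}{delimiter}{name}" if prefix else name
        if isinstance(value, dict):
            # Recurse into the nested structure, carrying the parent name along
            flat.update(flatten(value, delimiter, full_name))
        else:
            flat[full_name] = value
    return flat


message = {
    "FULL_NAME": "Opossum, american virginia",
    "ADDRESS": {"STREET": "20 Acker Terrace", "CITY": "Lynchburg"},
}

print(flatten(message))                 # nested names joined with "."
print(flatten(message, delimiter="_"))  # nested names joined with "_"
```

With the default delimiter the nested street becomes `ADDRESS.STREET`; with `_` it becomes `ADDRESS_STREET`, which is usually the friendlier choice for database column names.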

To use the Single Message Transform you only need to reference it; there’s no additional configuration required:

"transforms"                    : "flatten",
"transforms.flatten.type"       : "org.apache.kafka.connect.transforms.Flatten$Value"

You can optionally override the default delimiter (.) that’s used:

"transforms.flatten.delimiter"  : "_"

👾 Demo code

Example - JDBC Sink connector

See also 🎥 Kafka Connect in Action : JDBC Sink (👾 demo code) and 🎥 ksqlDB & Kafka Connect JDBC Sink in action (👾 demo code)

Given a source message that looks like this:

{
  "FULL_NAME": "Opossum, american virginia",
  "ADDRESS": {
    "STREET": "20 Acker Terrace",
    "CITY": "Lynchburg",
    "COUNTY_OR_STATE": "Virginia",
    "ZIP_OR_POSTCODE": "24515"
  }
}

We can’t load it directly into a database table, because a table row is a flat set of columns and the nested ADDRESS field has no equivalent column type. If we try to load it as it is, the JDBC Sink connector will fail and throw an error:

…(STRUCT) type doesn't have a mapping to the SQL database column type
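The reason for the failure can be sketched in a few lines of Python. This is a simplified illustration, not the connector's code: the `SQL_TYPES` table and `column_type` function are hypothetical, and the only mapping included is the string-to-text one that shows up in the MySQL output in this post. A nested field simply has no entry to map to.

```python
# Illustrative mapping only; the real mapping depends on the connector and database
SQL_TYPES = {str: "text"}


def column_type(field_name, value):
    """Return the column type for a field, or fail if the value has no mapping."""
    sql_type = SQL_TYPES.get(type(value))
    if sql_type is None:
        raise ValueError(
            f"{field_name}: ({type(value).__name__}) type doesn't have a mapping "
            "to a SQL column type"
        )
    return sql_type


record = {
    "FULL_NAME": "Opossum, american virginia",
    "ADDRESS": {"CITY": "Lynchburg"},
}

for name, value in record.items():
    try:
        print(name, "->", column_type(name, value))
    except ValueError as err:
        # The nested ADDRESS field ends up here
        print("cannot map", err)
```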

So we use the Single Message Transform to flatten the source payload:

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day3-customers-00/config \
    -d '{
          "connector.class"               : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                : "jdbc:mysql://mysql:3306/demo",
          "connection.user"               : "mysqluser",
          "connection.password"           : "mysqlpw",
          "topics"                        : "day3-customers",
          "tasks.max"                     : "4",
          "auto.create"                   : "true",
          "auto.evolve"                   : "true",
          "transforms"                    : "flatten",
          "transforms.flatten.type"       : "org.apache.kafka.connect.transforms.Flatten$Value",
          "transforms.flatten.delimiter"  : "_"
        }'
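The three transforms properties in the request above follow a naming convention: `transforms` lists one or more aliases, and each alias needs its own `transforms.<alias>.type` property. A typo in the alias is an easy mistake to make, so here is a small Python sketch that checks a config for it before you send it. The `missing_transform_types` helper is hypothetical and is not part of Kafka Connect.

```python
def missing_transform_types(config):
    """Return aliases listed in "transforms" that lack a "transforms.<alias>.type" key."""
    aliases = [a.strip() for a in config.get("transforms", "").split(",") if a.strip()]
    return [a for a in aliases if f"transforms.{a}.type" not in config]


config = {
    "transforms": "flatten",
    "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
    "transforms.flatten.delimiter": "_",
}

# An empty list means every alias has a matching type property
print(missing_transform_types(config))
```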

This will work, and you can now see the data in MySQL:

mysql> describe `day3-customers`;
+-------------------------+------+------+-----+---------+-------+
| Field                   | Type | Null | Key | Default | Extra |
+-------------------------+------+------+-----+---------+-------+
| FULL_NAME               | text | YES  |     | NULL    |       |
| ADDRESS_STREET          | text | YES  |     | NULL    |       |
| ADDRESS_CITY            | text | YES  |     | NULL    |       |
| ADDRESS_COUNTY_OR_STATE | text | YES  |     | NULL    |       |
| ADDRESS_ZIP_OR_POSTCODE | text | YES  |     | NULL    |       |
+-------------------------+------+------+-----+---------+-------+
5 rows in set (0.00 sec)

mysql> select * from `day3-customers`;
+----------------------------+-----------------------------+--------------+-------------------------+-------------------------+
| FULL_NAME                  | ADDRESS_STREET              | ADDRESS_CITY | ADDRESS_COUNTY_OR_STATE | ADDRESS_ZIP_OR_POSTCODE |
+----------------------------+-----------------------------+--------------+-------------------------+-------------------------+
| Opossum, american virginia | 20 Acker Terrace            | Lynchburg    | Virginia                | 24515                   |
| Red deer                   | 53 Basil Terrace            | Lexington    | Kentucky                | 40515                   |
| Laughing kookaburra        | 84 Monument Alley           | San Jose     | California              | 95113                   |
| American bighorn sheep     | 326 Sauthoff Crossing       | San Antonio  | Texas                   | 78296                   |
| Skua, long-tailed          | 7 Laurel Terrace            | Manassas     | Virginia                | 22111                   |
| Fox, bat-eared             | 2946 Daystar Drive          | Jamaica      | New York                | 11431                   |
| Greater rhea               | 97 Morning Way              | Charleston   | West Virginia           | 25331                   |
| Vervet monkey              | 7615 Brown Park             | Chicago      | Illinois                | 60681                   |
| White spoonbill            | 7 Fulton Parkway            | Asheville    | North Carolina          | 28805                   |
| Sun gazer                  | 61 Lakewood Gardens Parkway | Pensacola    | Florida                 | 32590                   |
+----------------------------+-----------------------------+--------------+-------------------------+-------------------------+
10 rows in set (0.00 sec)

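To also write the Kafka message key to the target table, set pk.mode to record_key and use pk.fields to name the column that will hold it. In this example the key is a long, so it is read with the LongConverter and stored in a column called id: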

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day3-customers-02/config \
    -d '{
          "connector.class"               : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                : "jdbc:mysql://mysql:3306/demo",
          "connection.user"               : "mysqluser",
          "connection.password"           : "mysqlpw",
          "topics"                        : "day3-customers2",
          "tasks.max"                     : "4",
          "auto.create"                   : "true",
          "auto.evolve"                   : "true",
          "transforms"                    : "flatten",
          "transforms.flatten.type"       : "org.apache.kafka.connect.transforms.Flatten$Value",
          "transforms.flatten.delimiter"  : "_",
          "pk.mode"                       : "record_key",
          "pk.fields"                     : "id",
          "key.converter"                 : "org.apache.kafka.connect.converters.LongConverter"
        }'
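Conceptually, each row in the target table is now the message key plus the flattened value. The Python sketch below illustrates that combination under some assumptions: `to_row` is a hypothetical helper, messages are modelled as plain dicts, and the key value 42 is made up for the example. Note that Flatten$Value only touches the value; the key is handled separately by the sink connector.

```python
def to_row(key, value, pk_field="id", delimiter="_"):
    """Build a flat row from a primitive message key and a nested message value."""
    row = {pk_field: key}  # the key lands in the column named by pk.fields

    def walk(fields, prefix=""):
        for name, field in fields.items():
            full_name = f"{prefix}{delimiter}{name}" if prefix else name
            if isinstance(field, dict):
                walk(field, full_name)  # descend into the nested structure
            else:
                row[full_name] = field

    walk(value)
    return row


value = {
    "FULL_NAME": "Opossum, american virginia",
    "ADDRESS": {"STREET": "20 Acker Terrace", "CITY": "Lynchburg"},
}

# 42 is a made-up key for illustration
print(to_row(42, value))
```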

Try it out!

You can find the full code for trying this out, including a Docker Compose file so you can spin it up on your local machine, 👾 here


Robin Moffatt

Robin Moffatt is a Principal DevEx Engineer at LakeFS. He likes writing about himself in the third person, eating good breakfasts, and drinking good beer.
