
🎄 Twelve Days of SMT 🎄 - Day 6: InsertField II

Published Dec 15, 2020 by Robin Moffatt in Kafka Connect, Single Message Transform, TwelveDaysOfSMT at https://rmoff.net/2020/12/15/twelve-days-of-smt-day-6-insertfield-ii/

We kicked off this series on day 1 by seeing how to use InsertField to add the timestamp to a message passing through a Kafka Connect sink connector. Today we’ll see how to use the same Single Message Transform to add a static field value, as well as the name of the Kafka topic, the partition, and the offset from which the message has been read.

"transforms"                                : "insertStaticField1",
"transforms.insertStaticField1.type"        : "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertStaticField1.static.field": "sourceSystem",
"transforms.insertStaticField1.static.value": "NeverGonna"

👾 Demo code

Adding fields to messages at ingest in the Kafka Connect source connector

When ingesting data from a source it can be useful to add fields to store information such as the server from which it was read.

Here’s an example source connector, which adds two static fields (sourceSystem and ingestAgent) to each message:

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-voluble-datagen-day6-00/config \
    -d '{
        "connector.class"                           : "io.mdrogalis.voluble.VolubleSourceConnector",
        "genkp.day6-transactions.with"              : "#{Internet.uuid}",
        "genv.day6-transactions.cost.with"          : "#{Commerce.price}",
        "genv.day6-transactions.card_type.with"     : "#{Business.creditCardType}",
        "genv.day6-transactions.item.with"          : "#{Beer.name}",
        "topic.day6-transactions.throttle.ms"       : 5000,
        "transforms"                                : "insertStaticField1,insertStaticField2",
        "transforms.insertStaticField1.type"        : "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.insertStaticField1.static.field": "sourceSystem",
        "transforms.insertStaticField1.static.value": "NeverGonna",
        "transforms.insertStaticField2.type"        : "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.insertStaticField2.static.field": "ingestAgent",
        "transforms.insertStaticField2.static.value": "GiveYouUp"
    }'

The resulting message that’s written to Kafka includes the data from the source system (a data generator, in this case, writing fields cost, card_type, and item), plus the static fields configured (sourceSystem, ingestAgent):

docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t day6-transactions -C -c1 -o end -u -q -J | jq  '.payload'
{
  "cost": {
    "string": "25.79"
  },
  "card_type": {
    "string": "visa"
  },
  "item": {
    "string": "Westmalle Trappist Tripel"
  },
  "sourceSystem": {
    "string": "NeverGonna"
  },
  "ingestAgent": {
    "string": "GiveYouUp"
  }
}
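The static fields appear wrapped in a union ({"string": …}) because InsertField makes the fields it adds optional by default. If you would rather have a field be required, you can suffix the field name with ! (or with ? to spell out that it is optional). Here’s a minimal sketch of that, reusing the first transform from above. It was not run as part of this demo, so no output is shown:

```json
"transforms"                                : "insertStaticField1",
"transforms.insertStaticField1.type"        : "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertStaticField1.static.field": "sourceSystem!",
"transforms.insertStaticField1.static.value": "NeverGonna"
```

The suffix is only a marker and is not part of the field name, so the field is still called sourceSystem.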

Adding details about the Kafka message at egress with a Kafka Connect sink connector

👉 https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc

See also 🎥 Kafka Connect in Action : JDBC Sink (👾 demo code) and 🎥 ksqlDB & Kafka Connect JDBC Sink in action (👾 demo code)

It can often be useful to add information about the Kafka message (topic, partition, offset) when the data is sent to a target system. Here’s an example using the above topic, whose messages already carry the static fields added by the source connector. You can also add the timestamp of the Kafka message, as shown on day 1.

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day6-00/config \
    -d '{
          "connector.class"                           : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                            : "jdbc:mysql://mysql:3306/demo",
          "connection.user"                           : "mysqluser",
          "connection.password"                       : "mysqlpw",
          "topics"                                    : "day6-transactions",
          "tasks.max"                                 : "4",
          "auto.create"                               : "true",
          "auto.evolve"                               : "true",
          "transforms"                                : "insertPartition,insertOffset,insertTopic",
          "transforms.insertPartition.type"           : "org.apache.kafka.connect.transforms.InsertField$Value",
          "transforms.insertPartition.partition.field": "kafkaPartition",
          "transforms.insertOffset.type"              : "org.apache.kafka.connect.transforms.InsertField$Value",
          "transforms.insertOffset.offset.field"      : "kafkaOffset",
          "transforms.insertTopic.type"               : "org.apache.kafka.connect.transforms.InsertField$Value",
          "transforms.insertTopic.topic.field"        : "kafkaTopic"
        }'

Here’s the data as it appears in the target system:

mysql> SELECT * FROM `day6-transactions` LIMIT 5;
+-------+-----------+---------------------+--------------+-------------+----------------+-------------+-------------------+
| cost  | card_type | item                | sourceSystem | ingestAgent | kafkaPartition | kafkaOffset | kafkaTopic        |
+-------+-----------+---------------------+--------------+-------------+----------------+-------------+-------------------+
| 11.78 | visa      | Schneider Aventinus | NeverGonna   | GiveYouUp   |              0 |           0 | day6-transactions |
| 17.65 | discover  | Two Hearted Ale     | NeverGonna   | GiveYouUp   |              0 |           1 | day6-transactions |
| 11.63 | jcb       | Stone IPA           | NeverGonna   | GiveYouUp   |              0 |           2 | day6-transactions |
| 67.51 | switch    | Shakespeare Oatmeal | NeverGonna   | GiveYouUp   |              0 |           3 | day6-transactions |
| 22.25 | discover  | Hop Rod Rye         | NeverGonna   | GiveYouUp   |              0 |           4 | day6-transactions |
+-------+-----------+---------------------+--------------+-------------+----------------+-------------+-------------------+
5 rows in set (0.00 sec)
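
The three transforms above each set a single InsertField option, but one InsertField instance can set several of them at once. As a sketch (not part of the original demo), here is the same metadata plus the message timestamp in a single transform. The transform name insertKafkaMetadata and the field name kafkaTimestamp are made up for illustration:

```json
"transforms"                                    : "insertKafkaMetadata",
"transforms.insertKafkaMetadata.type"           : "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertKafkaMetadata.topic.field"    : "kafkaTopic",
"transforms.insertKafkaMetadata.partition.field": "kafkaPartition",
"transforms.insertKafkaMetadata.offset.field"   : "kafkaOffset",
"transforms.insertKafkaMetadata.timestamp.field": "kafkaTimestamp"
```

These entries would take the place of the transforms entries in the sink connector config. Note that offset.field only applies to sink connectors. Static fields still need one transform each, since a transform takes only one static.field and static.value pair.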

Try it out!

You can find the full code for trying this out, including a Docker Compose file so you can spin it up on your local machine, 👾 here


Robin Moffatt

Robin Moffatt is a Principal DevEx Engineer at LakeFS. He likes writing about himself in the third person, eating good breakfasts, and drinking good beer.
