rmoff's random ramblings

Skipping bad records with the Kafka Connect JDBC sink connector

Published Oct 15, 2019 by Robin Moffatt in Kafka Connect, JDBC Sink, Consumer Group, Kcat (Kafkacat) at https://rmoff.net/2019/10/15/skipping-bad-records-with-the-kafka-connect-jdbc-sink-connector/

The Kafka Connect framework provides generic error handling and dead-letter queue capabilities, which are available for problems with [de]serialisation and Single Message Transforms. When it comes to errors that a connector may encounter doing the actual pull or put of data from the source/target system, it’s down to the connector itself to implement logic around that. For example, the Elasticsearch sink connector provides configuration (behavior.on.malformed.documents) that can be set so that a single bad record won’t halt the pipeline. Others, such as the JDBC Sink connector, don’t provide this yet. That means that if you hit this problem, you need to unblock it yourself. One way is to manually move the offset of the connector’s consumer on past the bad message.
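To make the distinction concrete, here is a minimal sketch of the framework-level error handling properties. They only help with converter and Single Message Transform failures, so they would not have rescued the JDBC sink in the scenario below. The dead-letter queue topic name and the replication factor of 1 are assumptions for a single-broker sandbox, not values from this article.

```shell
# Connector name used to build the (hypothetical) dead-letter queue topic name.
CONNECTOR_NAME="${CONNECTOR_NAME:-sink_postgres_foo_00}"

# Print the error-handling properties handled by the Connect framework itself.
# The lines are a fragment to merge into the connector's JSON config.
# They cover [de]serialisation and Single Message Transform errors only.
error_handling_config() {
  cat <<EOF
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "dlq_${CONNECTOR_NAME}",
  "errors.deadletterqueue.topic.replication.factor": "1",
  "errors.deadletterqueue.context.headers.enable": "true",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true"
EOF
}

# Usage: error_handling_config
```

Errors raised while the connector writes to the target database bypass all of this, which is why the rest of this article exists.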

TL;DR: You can use kafka-consumer-groups --reset-offsets --to-offset <x> --execute to manually move the connector past a bad message.

Try it out!

Create the connector

curl -X PUT http://localhost:8083/connectors/sink_postgres_foo_00/config -H "Content-Type: application/json" -d '{
      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url": "jdbc:postgresql://postgres:5432/",
      "connection.user": "postgres",
      "connection.password": "postgres",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "true",
      "tasks.max": "1",
      "topics": "foo",
      "auto.create": "true",
      "auto.evolve": "true",
      "pk.mode": "none"
    }'

Send a message to the topic

kafkacat -b localhost:9092 -t foo -P <<EOF
{ "schema": { "type": "struct", "fields": [{ "type": "int32", "optional": false, "field": "c1" }, { "type": "string", "optional": false, "field": "c2" }, { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "create_ts" }, { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "update_ts" }], "optional": false, "name": "foobar" }, "payload": { "c1": 10000, "c2": "bar", "create_ts": 1501834166000, "update_ts": 1501834166000 } }
EOF

Confirm that the data’s on the topic:

$ kafkacat -b localhost:9092 -t foo -C -f 'Topic: %t\nPartition: %p\nOffset: %o\nKey: %k\nPayload: %S bytes: %s\n--\n'

Topic: foo
Partition: 0
Offset: 0
Key:
Payload: 543 bytes: { "schema": { "type": "struct", "fields": [{ "type": "int32", "optional": false, "field": "c1" }, { "type": "string", "optional": false, "field": "c2" }, { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "create_ts" }, { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "update_ts" }], "optional": false, "name": "foobar" }, "payload": { "c1": 10000, "c2": "bar", "create_ts": 1501834166000, "update_ts": 1501834166000 } }
--

Check the connector status:

$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
             jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
             column -s : -t| sed 's/\"//g'| sort
sink    |  sink_postgres_foo_00         |  RUNNING  |  RUNNING  |  io.confluent.connect.jdbc.JdbcSinkConnector

Confirm there’s data in the target DB:

postgres=# \dt
        List of relations
 Schema | Name | Type  |  Owner
--------+------+-------+----------
 public | foo  | table | postgres
(1 row)

postgres=# \d foo
                            Table "public.foo"
  Column   |            Type             | Collation | Nullable | Default
-----------+-----------------------------+-----------+----------+---------
 update_ts | timestamp without time zone |           | not null |
 create_ts | timestamp without time zone |           | not null |
 c1        | integer                     |           | not null |
 c2        | text                        |           | not null |

postgres=# select * from "foo";
      update_ts      |      create_ts      |  c1   | c2
---------------------+---------------------+-------+-----
 2017-08-04 08:09:26 | 2017-08-04 08:09:26 | 10000 | bar
(1 row)

Here comes the problem

Let’s send another message to the topic, but omit one of the fields (c2):

kafkacat -b localhost:9092 -t foo -P <<EOF
{ "schema": { "type": "struct", "fields": [{ "type": "int32", "optional": false, "field": "c1" }, { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "create_ts" }, { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "update_ts" }], "optional": false, "name": "foobar" }, "payload": { "c1": 10000,  "create_ts": 1501834166000, "update_ts": 1501834166000 } }
EOF

If we look at the status of the connector we can see that the task has FAILED:

$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
             jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
             column -s : -t| sed 's/\"//g'| sort
sink  |  sink_postgres_foo_00  |  RUNNING  |  FAILED  |  io.confluent.connect.jdbc.JdbcSinkConnector

And the Kafka Connect worker log shows a problem:

[2019-10-15 08:30:34,412] ERROR [sink_postgres_foo_00|task-0] WorkerSinkTask{id=sink_postgres_foo_00-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
   at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
   at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "foo"("c1","create_ts","update_ts") VALUES(10000,'2017-08-04 08:09:26+00','2017-08-04 08:09:26+00') was aborted: ERROR: null value in column "c2" violates not-null constraint
  Detail: Failing row contains (2017-08-04 08:09:26, 2017-08-04 08:09:26, 10000, null).  Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: null value in column "c2" violates not-null constraint
  Detail: Failing row contains (2017-08-04 08:09:26, 2017-08-04 08:09:26, 10000, null).
org.postgresql.util.PSQLException: ERROR: null value in column "c2" violates not-null constraint
  Detail: Failing row contains (2017-08-04 08:09:26, 2017-08-04 08:09:26, 10000, null).
   at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:87)
   at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
   ... 10 more
[2019-10-15 08:30:34,413] ERROR [sink_postgres_foo_00|task-0] WorkerSinkTask{id=sink_postgres_foo_00-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)

Distilling this down gives us:

Batch entry 0 INSERT INTO "foo"("c1","create_ts","update_ts") VALUES(10000,'2017-08-04 08:09:26+00','2017-08-04 08:09:26+00') was aborted: 
ERROR: null value in column "c2" violates not-null constraint

Because we omitted field c2 from our payload, and it’s NOT NULL in the target schema, the message cannot be written, and the Connect sink task aborts.
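If you don't have the worker log to hand, the same stack trace is exposed in the trace field of the connector's status on the Connect REST API. The following is a small sketch, assuming jq is installed, that prints the first line of the trace for each failed task. The function name is made up for illustration.

```shell
# Read a connector status document (JSON) on stdin and print one line
# per FAILED task: the task id and the first line of its stack trace.
failed_task_summary() {
  jq -r '.tasks[]
         | select(.state == "FAILED")
         | "task \(.id): \(.trace | split("\n")[0])"'
}

# Usage (same Connect worker and connector as in this article):
#   curl -s http://localhost:8083/connectors/sink_postgres_foo_00/status | failed_task_summary
```

The first line is usually only the wrapping ConnectException, so for the root cause you will still want the full trace: drop the split(...) part and look for the Caused by line.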

What about if we send another, valid, message to the topic:

kafkacat -b localhost:9092 -t foo -P <<EOF
{ "schema": { "type": "struct", "fields": [{ "type": "int32", "optional": false, "field": "c1" }, { "type": "string", "optional": false, "field": "c2" }, { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "create_ts" }, { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "update_ts" }], "optional": false, "name": "foobar" }, "payload": { "c1": 10001, "c2": "bar2", "create_ts": 1501834166000, "update_ts": 1501834166000 } }
EOF

Restart the connector’s failed task:

curl -X POST http://localhost:8083/connectors/sink_postgres_foo_00/tasks/0/restart

It’s up…

$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
             jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
             column -s : -t| sed 's/\"//g'| sort
sink  |  sink_postgres_foo_00  |  RUNNING  |  RUNNING  |  io.confluent.connect.jdbc.JdbcSinkConnector

but soon…it’s down

$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
             jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
             column -s : -t| sed 's/\"//g'| sort
sink  |  sink_postgres_foo_00  |  RUNNING  |  FAILED  |  io.confluent.connect.jdbc.JdbcSinkConnector

The worker log shows the same error as before: ERROR: null value in column "c2" violates not-null constraint.

One of the three messages on the topic is a 'poison pill', and it has broken our pipeline 😿. Each time we restart the connector it resumes from where it got to last time, and so falls over again, regardless of how many 'good' messages come after the bad one. The connector only considers a message read once it has successfully written it to the target. That makes sense from a data integrity point of view, but it does land us with this problem.

What to do?

There are a few options:

  1. If we were using Avro then it would be harder to break things, because schema compatibility can be enforced and bad messages would be rejected when being produced on to the topic.

  2. We could write a stream processing job to take the source topic foo and write all valid messages from it to a new topic (e.g. foo_good) and hook our JDBC sink up to that instead.

  3. Use the consumer group mechanism to just skip the bad message for the connector.

Which you use depends on how the problem arose. For example, one-off problems could be addressed by option #3, but it’s very manual and could be error-prone if you’re not careful. Option #2 is appropriate if you’re dealing with third-parties and you have on-going data quality issues. #1, using Avro, is always a good idea, regardless!
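To give a feel for option #2, here is a rough sketch that uses kafkacat and jq in place of a real stream processing job. It is a one-off batch copy, not a continuously running application, and the validity rule (the payload must carry a non-null c2) is an assumption that matches this article's example only. The function name keep_valid is made up for illustration.

```shell
# Read one JSON message per line on stdin and emit only those whose
# payload carries a non-null c2 field. Lines that are not valid JSON,
# or that lack the field, are silently dropped.
keep_valid() {
  jq -c -R 'try (fromjson | select(.payload.c2 != null))'
}

# Usage: copy the valid messages from foo to foo_good, then point the
# JDBC sink at foo_good instead.
#   kafkacat -b localhost:9092 -t foo -C -e -q | keep_valid | \
#     kafkacat -b localhost:9092 -t foo_good -P
```

Note that jq re-serialises each message in compact form, so the bytes written to foo_good differ in whitespace from the originals, and message keys are not carried across.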

Manually skipping bad messages

Each sink connector in Kafka Connect has its own consumer group, with the offset persisted in Kafka itself (pretty clever, right?). This is also why, if you delete a connector and recreate it with the same name, you’ll find it starts from where the previous instance got to.

You can view consumer groups using the kafka-consumer-groups command:

$ kafka-consumer-groups \
    --bootstrap-server kafka:29092 \
    --list
connect-sink_postgres_00
_confluent-ksql-confluent_rmoff_01query_CSAS_JDBC_POSTGRES_TRANSACTIONS_GBP_2
_confluent-ksql-confluent_rmoff_01query_CSAS_JDBC_POSTGRES_TRANSACTIONS_NO_CUSTOMERID_1
connect-sink_postgres_foo_00
connect-SINK_ES_04
_confluent-ksql-confluent_rmoff_01transient_2925897355317205962_1571058964212
_confluent-controlcenter-5-4-0-1
connect-SINK_ES_03
_confluent-controlcenter-5-4-0-1-command
connect-SINK_ES_02
connect-SINK_ES_01

There are various ones there, but we’re interested in the one with a connect- prefix that matches our connector name, connect-sink_postgres_foo_00:

$ kafka-consumer-groups \
    --bootstrap-server kafka:29092 \
    --describe \
    --group connect-sink_postgres_foo_00

Consumer group 'connect-sink_postgres_foo_00' has no active members.

GROUP                        TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
connect-sink_postgres_foo_00 foo             0          1               3               2               -               -               -

You can see from this that the current offset is 1, and there are two more messages to be read (one of which is the 'poison-pill').
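The lag is simply the log end offset minus the current offset (3 - 1 = 2 here). If you want that per partition without reading the wide table by eye, a small awk filter will do. This is a sketch that assumes the column order shown in the output above, and the function name is made up for illustration.

```shell
# Summarise `kafka-consumer-groups --describe` output read from stdin.
# Prints one line per partition: topic partition current-offset log-end-offset lag
group_lag() {
  awk '
    # Ignore everything up to and including the header row.
    $1 == "GROUP" { seen_header = 1; next }
    # Data rows: recompute the lag as LOG-END-OFFSET minus CURRENT-OFFSET.
    seen_header && NF >= 5 { print $2, $3, $4, $5, $5 - $4 }
  '
}

# Usage:
#   kafka-consumer-groups --bootstrap-server kafka:29092 --describe \
#     --group connect-sink_postgres_foo_00 | group_lag
```

A partition with no committed offset shows - in the CURRENT-OFFSET column, which awk treats as 0, so read the computed lag with care in that case.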

kafkacat is a fantastic tool for this kind of debugging, because we can directly relate offsets with the messages themselves:

$ kafkacat -b localhost:9092 -t foo -C -f 'Offset: %o\nPayload: %s\n--\n'
Offset: 0
Payload: { "schema": { "type": "struct", "fields": [{ "type": "int32", "optional": false, "field": "c1" }, { "type": "string", "optional": false, "field": "c2" }, { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "create_ts" }, { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "update_ts" }], "optional": false, "name": "foobar" }, "payload": { "c1": 10000, "c2": "bar", "create_ts": 1501834166000, "update_ts": 1501834166000 } }
--
Offset: 1
Payload: { "schema": { "type": "struct", "fields": [{ "type": "int32", "optional": false, "field": "c1" }, { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "create_ts" }, { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "update_ts" }], "optional": false, "name": "foobar" }, "payload": { "c1": 10000,  "create_ts": 1501834166000, "update_ts": 1501834166000 } }
--
Offset: 2
Payload: { "schema": { "type": "struct", "fields": [{ "type": "int32", "optional": false, "field": "c1" }, { "type": "string", "optional": false, "field": "c2" }, { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "create_ts" }, { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "update_ts" }], "optional": false, "name": "foobar" }, "payload": { "c1": 10001, "c2": "bar2", "create_ts": 1501834166000, "update_ts": 1501834166000 } }
--
% Reached end of topic foo [0] at offset 3
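On a topic with more than a handful of messages you won't want to dump everything. A sketch for looking at just the message a connector is stuck on, using kafkacat's partition (-p), offset (-o) and count (-c) options, follows. The function name and the environment variable defaults are assumptions for illustration.

```shell
KAFKACAT="${KAFKACAT:-kafkacat}"     # newer releases ship the binary as kcat
BROKER="${BROKER:-localhost:9092}"

# peek_message TOPIC PARTITION OFFSET
# Read exactly one message (-c 1) starting at OFFSET on PARTITION,
# then exit (-e) instead of waiting for more data.
peek_message() {
  "$KAFKACAT" -b "$BROKER" -C -e -t "$1" -p "$2" -o "$3" -c 1 \
    -f 'Partition: %p\nOffset: %o\nPayload: %s\n'
}

# Usage: show the message at the consumer group's current offset.
#   peek_message foo 0 1
```

Pair this with the CURRENT-OFFSET from kafka-consumer-groups --describe: that offset is the next message the connector will try to read, so it is the first candidate for the poison pill.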

So at offset 0 is the good message, which Connect read successfully, and thus the current offset is 1. When the connector restarts after its failure it resumes at offset 1, which is the 'bad' message. The end of the topic is currently offset 3, i.e. the position after the third message, which is at offset 2 (offsets are zero-based).

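What we want to do is tell Kafka Connect to resume from the next good message, which we can see from kafkacat above is at offset 2. Offsets can only be reset while the consumer group has no active members, which is the case here because the connector’s only task has failed.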

$ kafka-consumer-groups \
    --bootstrap-server kafka:29092 \
    --group connect-sink_postgres_foo_00 \
    --reset-offsets \
    --topic foo \
    --to-offset 2 \
    --execute
GROUP                          TOPIC                          PARTITION  NEW-OFFSET
connect-sink_postgres_foo_00   foo                            0          2
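Since this is a manual and potentially destructive step, it's worth previewing the change first. kafka-consumer-groups accepts --dry-run in place of --execute, and --topic takes a topic:partition form so that only one partition is touched. Here is a sketch of a wrapper that defaults to the dry run. The function name and the environment variable defaults are assumptions for illustration.

```shell
KCG="${KCG:-kafka-consumer-groups}"
BOOTSTRAP="${BOOTSTRAP:-kafka:29092}"

# reset_offset GROUP TOPIC PARTITION OFFSET [--dry-run|--execute]
# Move GROUP's committed offset for one partition. Defaults to --dry-run,
# which only prints the planned new offset without committing it.
reset_offset() {
  mode="${5:-}"
  [ -n "$mode" ] || mode="--dry-run"
  "$KCG" --bootstrap-server "$BOOTSTRAP" \
    --group "$1" \
    --reset-offsets \
    --topic "$2:$3" \
    --to-offset "$4" \
    "$mode"
}

# Usage: preview first, then apply.
#   reset_offset connect-sink_postgres_foo_00 foo 0 2
#   reset_offset connect-sink_postgres_foo_00 foo 0 2 --execute
```

On a multi-partition topic this matters: a bare --topic foo with --to-offset would move every partition to the same offset, which is rarely what you want.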

Now we can restart the failed task:

curl -X POST http://localhost:8083/connectors/sink_postgres_foo_00/tasks/0/restart

and this time the connector stays running:

$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
             jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
             column -s : -t| sed 's/\"//g'| sort
sink  |  sink_postgres_foo_00  |  RUNNING  |  RUNNING  |  io.confluent.connect.jdbc.JdbcSinkConnector

and in Postgres we get the new rows of data (except for the bad one, which is lost to us):

postgres=# select * from "foo";
      update_ts      |      create_ts      |  c1   |  c2
---------------------+---------------------+-------+------
 2017-08-04 08:09:26 | 2017-08-04 08:09:26 | 10000 | bar
 2017-08-04 08:09:26 | 2017-08-04 08:09:26 | 10001 | bar2
(2 rows)

Robin Moffatt

Robin Moffatt works on the DevRel team at Confluent. He likes writing about himself in the third person, eating good breakfasts, and drinking good beer.
