rmoff's random ramblings
about talks

Automatically restarting failed Kafka Connect tasks

Published Jun 6, 2019 by in Kafka Connect at https://rmoff.net/2019/06/06/automatically-restarting-failed-kafka-connect-tasks/

Here’s a hacky way to automatically restart Kafka Connect connectors if they fail. Restarting automatically only makes sense if it’s a transient failure; if there’s a problem with your pipeline (e.g. bad records or a mis-configured server) then you don’t gain anything from this. You might want to check out Kafka Connect’s error handling and dead letter queues too.

Let’s say you’ve got a couple of connectors set up, pulling data from MySQL with Debezium and streaming it to Elasticsearch. The MySQL source is a bit flaky and goes offline periodically. You can view the status of all your connectors and tasks:

curl -s "http://localhost:8083/connectors"| \
  jq '.[]'| \
  xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status"| \
  jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| \
  column -s : -t| sed 's/\"//g'| sort

sink-elastic-orders-00     |  RUNNING  |  RUNNING
source-debezium-orders-00  |  RUNNING  |  FAILED

If a connector’s task(s) are failed you can restart them using the REST API:

curl -X POST http://localhost:8083/connectors/source-debezium-orders-00/tasks/0/restart

after which it comes back

curl -s "http://localhost:8083/connectors"| \
  jq '.[]'| \
  xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status"| \
  jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| \
  column -s : -t| sed 's/\"//g'| sort

sink-elastic-orders-00     |  RUNNING  |  RUNNING
source-debezium-orders-00  |  RUNNING  |  RUNNING

But, manually watching and restarting tasks isn’t fun, so let’s automate it. Here’s a bit of bash that will restart any failed tasks. It’s the same pattern as above for iterating through the connectors on Kafka Connect’s REST API, coupled with jq 's ability to filter data (select(.tasks[].state=="FAILED"))

#!/usr/bin/env bash
# @rmoff / June 6, 2019

echo '----'
# Set the path so cron can find jq, necessary for cron depending on your default PATH
export PATH=$PATH:/usr/local/bin/

# What time is it Mr Wolf? 
date 

# List current connectors and status
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
           jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
           column -s : -t| sed 's/\"//g'| sort

# Restart any connector tasks that are FAILED
# Works for Apache Kafka >= 2.3.0 
# Thanks to @jocelyndrean for this enhanced code snippet that also supports 
#  multiple tasks in a connector
curl -s "http://localhost:8083/connectors?expand=status" | \
  jq -c -M 'map({name: .status.name } +  {tasks: .status.tasks}) | .[] | {task: ((.tasks[]) + {name: .name})}  | select(.task.state=="FAILED") | {name: .task.name, task_id: .task.id|tostring} | ("/connectors/"+ .name + "/tasks/" + .task_id + "/restart")' | \
  xargs -I{connector_and_task} curl -v -X POST "http://localhost:8083"\{connector_and_task\}

Which as any hacky admin will know can be scheduled to run with a crontab such as this:

*/5 * * * * /u01/connectors/restart_failed_connector_tasks.sh 2>&1 >> /u01/connectors/connect_monitor.log

Now every five minutes the script will look for any FAILED tasks and send a REST call to restart them.


If you’re using Apache Kafka < 2.3.0 then the enhanced REST API for connectors is not available so you will need something like this:

# List current connectors and status
curl -s "http://localhost:8083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/\"//g'| sort

# Restart any connector tasks that are FAILED
curl -s "http://localhost:8083/connectors" | \
  jq '.[]' | \
  xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status" | \
  jq -c -M '[select(.tasks[].state=="FAILED") | .name,"§±§",.tasks[].id]' | \
  grep -v "\[\]"| \
  sed -e 's/^\[\"//g'| sed -e 's/\",\"§±§\",/\/tasks\//g'|sed -e 's/\]$//g'| \
  xargs -I{connector_and_task} curl -v -X POST "http://localhost:8083/connectors/"{connector_and_task}"/restart"

Robin Moffatt

Robin Moffatt works on the DevRel team at Confluent. He likes writing about himself in the third person, eating good breakfasts, and drinking good beer.

Story logo

© 2025