Here’s a hacky way to automatically restart Kafka Connect connectors if they fail. Restarting automatically only makes sense if it’s a transient failure; if there’s a problem with your pipeline (e.g. bad records or a mis-configured server) then you don’t gain anything from this. You might want to check out Kafka Connect’s error handling and dead letter queues too.
Let’s say you’ve got a couple of connectors set up, pulling data from MySQL with Debezium and streaming it to Elasticsearch. The MySQL source is a bit flaky and goes offline periodically. You can view the status of all your connectors and tasks:
curl -s "http://localhost:8083/connectors"| \
jq '.[]'| \
xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status"| \
jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| \
column -s : -t| sed 's/\"//g'| sort
sink-elastic-orders-00 | RUNNING | RUNNING
source-debezium-orders-00 | RUNNING | FAILED
If a connector’s task(s) are failed you can restart them using the REST API:
curl -X POST http://localhost:8083/connectors/source-debezium-orders-00/tasks/0/restart
after which it comes back
curl -s "http://localhost:8083/connectors"| \
jq '.[]'| \
xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status"| \
jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| \
column -s : -t| sed 's/\"//g'| sort
sink-elastic-orders-00 | RUNNING | RUNNING
source-debezium-orders-00 | RUNNING | RUNNING
But, manually watching and restarting tasks isn’t fun, so let’s automate it. Here’s a bit of bash that will restart any failed tasks. It’s the same pattern as above for iterating through the connectors on Kafka Connect’s REST API, coupled with jq
's ability to filter data (select(.tasks[].state=="FAILED")
)
#!/usr/bin/env bash
# @rmoff / June 6, 2019
echo '----'
# Set the path so cron can find jq, necessary for cron depending on your default PATH
export PATH=$PATH:/usr/local/bin/
# What time is it Mr Wolf?
date
# List current connectors and status
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
# Restart any connector tasks that are FAILED
# Works for Apache Kafka >= 2.3.0
# Thanks to @jocelyndrean for this enhanced code snippet that also supports
# multiple tasks in a connector
curl -s "http://localhost:8083/connectors?expand=status" | \
jq -c -M 'map({name: .status.name } + {tasks: .status.tasks}) | .[] | {task: ((.tasks[]) + {name: .name})} | select(.task.state=="FAILED") | {name: .task.name, task_id: .task.id|tostring} | ("/connectors/"+ .name + "/tasks/" + .task_id + "/restart")' | \
xargs -I{connector_and_task} curl -v -X POST "http://localhost:8083"\{connector_and_task\}
Which as any hacky admin will know can be scheduled to run with a crontab such as this:
*/5 * * * * /u01/connectors/restart_failed_connector_tasks.sh 2>&1 >> /u01/connectors/connect_monitor.log
Now every five minutes the script will look for any FAILED
tasks and send a REST call to restart them.
If you’re using Apache Kafka < 2.3.0 then the enhanced REST API for connectors
is not available so you will need something like this:
# List current connectors and status
curl -s "http://localhost:8083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/\"//g'| sort
# Restart any connector tasks that are FAILED
curl -s "http://localhost:8083/connectors" | \
jq '.[]' | \
xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status" | \
jq -c -M '[select(.tasks[].state=="FAILED") | .name,"§±§",.tasks[].id]' | \
grep -v "\[\]"| \
sed -e 's/^\[\"//g'| sed -e 's/\",\"§±§\",/\/tasks\//g'|sed -e 's/\]$//g'| \
xargs -I{connector_and_task} curl -v -X POST "http://localhost:8083/connectors/"{connector_and_task}"/restart"