Kafka Connect and Oracle data types

The Kafka Connect JDBC Connector by default does not cope so well with:

  • NUMBER columns with no defined precision/scale. You may end up with apparent junk (bytes) in the output, or just errors.
  • TIMESTAMP WITH LOCAL TIME ZONE. Throws JDBC type -102 not currently supported warning in the log.

Read more about NUMBER data type in the Oracle docs.

tl;dr : How do I make it work?

There are several options:

New in Confluent Platform 4.1.1 : numeric.mapping

  • In the connector configuration, set "numeric.mapping":"best_fit"
  • New in Confluent Platform 4.1.1 (Doc)

Avoid the problem in the first place

  • Change the DDL of the source object. For example:
    • refine the NUMBER 's precision and scale
    • Use a TIMESTAMP type that is supported

CAST the datatypes in the query

  • Pull from the object directly, and use query in the JDBC connector (instead of table.whitelist)—and cast the columns appropriately:

    curl -i -X POST -H "Accept:application/json" \
      -H  "Content-Type:application/json" http://localhost:8083/connectors/ \
      -d '{
            "name": "jdbc_source_oracle_soe_logon_07",
            "config": {
                    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                    "connection.url": "jdbc:oracle:thin:soe/soe@localhost:1521/ORCLPDB1",
                    "mode": "incrementing",
                    "query": "SELECT CAST(LOGON_ID AS NUMERIC(8,0)) AS LOGON_ID, CAST(CUSTOMER_ID AS NUMERIC(18,0)) AS CUSTOMER_ID, LOGON_DATE FROM LOGON",
                    "poll.interval.ms": "1000",
                    "incrementing.column.name":"LOGON_ID",
                    "topic.prefix": "ora-soe-07-LOGON",
                    "validate.non.null":false
            }
    }'
    

    Data:

    Robin@asgard02 ~/cp> kafka-avro-console-consumer \
                            --bootstrap-server localhost:9092 \
                            --property schema.registry.url=http://localhost:8081 \
                            --topic ora-soe-07-LOGON --from-beginning --max-messages 1| jq '.'
    {
      "LOGON_ID": {
        "int": 2
      },
      "CUSTOMER_ID": {
        "long": 48645
      },
      "LOGON_DATE": {
        "long": 962854648000
      }
    }
    Processed a total of 1 messages
    

Use a View in the source database to cast the data types

  • Define a view in the source DB that casts the columns appropriately, and then use the connector against this instead (make sure to include "table.types":"VIEW")

    CREATE VIEW VW_LOGON AS SELECT CAST(LOGON_ID AS NUMERIC(8,0)) AS LOGON_ID, CAST(CUSTOMER_ID AS NUMERIC(18,0)) AS CUSTOMER_ID, LOGON_DATE FROM LOGON;
    
    
    curl -i -X POST -H "Accept:application/json" \
      -H  "Content-Type:application/json" http://localhost:8083/connectors/ \
      -d '{
            "name": "jdbc_source_oracle_soe_logon_05",
            "config": {
                    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                    "connection.url": "jdbc:oracle:thin:soe/soe@localhost:1521/ORCLPDB1",
                    "table.whitelist":"VW_LOGON",
                    "table.types":"VIEW",
                    "mode": "incrementing",
                    "poll.interval.ms": "1000",
                    "incrementing.column.name":"LOGON_ID",
                    "topic.prefix": "ora-soe-05-",
                    "validate.non.null":false
            }
    }'
    

    Happy data:

    Robin@asgard02 ~/cp> kafka-avro-console-consumer \
                            --bootstrap-server localhost:9092 \
                            --property schema.registry.url=http://localhost:8081 \
                            --topic ora-soe-05-VW_LOGON --from-beginning --max-messages 1| jq '.'
    {
      "LOGON_ID": {
        "int": 2
      },
      "CUSTOMER_ID": {
        "long": 48645
      },
      "LOGON_DATE": {
        "long": 962854648000
      }
    }
    Processed a total of 1 messages
    

What happens

SQL> DESCRIBE LOGON;
 Name                                      Null?    Type
 ----------------------------------------- -------- ----------------------------
 LOGON_ID                                  NOT NULL NUMBER
 CUSTOMER_ID                               NOT NULL NUMBER
 LOGON_DATE                                         DATE

Using the ID column doesn't work:

curl -i -X POST -H "Accept:application/json" \
  -H  "Content-Type:application/json" http://localhost:8083/connectors/ \
  -d '{
        "name": "jdbc_source_oracle_soe_logon_01",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                "connection.url": "jdbc:oracle:thin:soe/soe@localhost:1521/ORCLPDB1",
                "table.whitelist":"LOGON",
                "mode": "incrementing",
                "poll.interval.ms": "1000",
                "incrementing.column.name":"LOGON_ID",
                "topic.prefix": "ora-soe-"
        }
}'

The task fails with

org.apache.kafka.connect.errors.ConnectException: Scale of Decimal value for incrementing column must be 0

Using timestamp works but the data pulled through has the NUMBER columns as bytes, which is no use.

curl -i -X POST -H "Accept:application/json" \
  -H  "Content-Type:application/json" http://localhost:8083/connectors/ \
  -d '{
        "name": "jdbc_source_oracle_soe_logon_01",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                "connection.url": "jdbc:oracle:thin:soe/soe@localhost:1521/ORCLPDB1",
                "table.whitelist":"LOGON",
                "mode": "timestamp",
                "poll.interval.ms": "1000",
                "timestamp.column.name":"LOGON_DATE",
                "topic.prefix": "ora-soe-",
                "validate.non.null":false
        }
}'

Sample message:

{"LOGON_ID": {"bytes": "\u0000ÖݳpÌ\u0081ä\u008E8\u0005µì4påI\u008DÍO;ʶ÷SI1½éoUÙv\u0099\f\u0003ð5j|\u0080\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000"}, "CUSTOMER_ID": {"bytes": "\t±Ó\u001Cluº\u000B|8åÆM0jzÏXFioF.\u0084\u008B,\f%ïYÝ\u0011\u0082À*\fjÑ\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000"}, "LOGON_DATE": 1526388662000}

Using 4.1.1 and "numeric.mapping":"best_fit",, no joy

curl -i -X POST -H "Accept:application/json" \
  -H  "Content-Type:application/json" http://localhost:8083/connectors/ \
  -d '{
        "name": "jdbc_source_oracle_soe_logon_04",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                "connection.url": "jdbc:oracle:thin:soe/soe@localhost:1521/ORCLPDB1",
                "table.whitelist":"LOGON",
                "mode": "timestamp",
                "poll.interval.ms": "1000",
                "timestamp.column.name":"LOGON_DATE","numeric.mapping":"best_fit",
                "topic.prefix": "ora-soe-04-","validate.non.null":false,"numeric.mapping":"best_fit"
        }
}'

same bytes output:

Robin@asgard02 ~/cp> kafka-avro-console-consumer \
                        --bootstrap-server localhost:9092 \
                        --property schema.registry.url=http://localhost:8081 \
                        --topic ora-soe-04-LOGON --from-beginning --max-messages 1| jq '.'
{
  "LOGON_ID": "'ñK\u0001³èò~¯x6\"¤É^ãñ&Ý\u001cÐÀl)\u001f\u0019W¤¦ ­b»;ç\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000",
  "CUSTOMER_ID": "\u001e©/@sy/\tÍ`j;±èÂÃAâ#,ú1\u0003\u0017Ùg|ÙóNwEj\u001cH\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000",
  "LOGON_DATE": {
    "long": 946687534000
  }
}
Processed a total of 1 messages

Why does it happen?

The source data is defined as NUMERIC:

NUMBER means "store the value as given", and the JDBC metadata for the column returns a precision of 38 and scale of non-zero. The connector has to trust the metadata, so it maps that to smallest type it can: Decimal logical type (or java.math.BigDecimal).

SQL> DESCRIBE LOGON;
 Name                                      Null?    Type
 ----------------------------------------- -------- ----------------------------
 LOGON_ID                                  NOT NULL NUMBER
 CUSTOMER_ID                               NOT NULL NUMBER
 LOGON_DATE                                         DATE

Compare this to when a scale is given, e.g. :

SQL> DESCRIBE WAREHOUSES;
 Name                                      Null?    Type
 ----------------------------------------- -------- ----------------------------
 WAREHOUSE_ID                                       NUMBER(6)
 WAREHOUSE_NAME                                     VARCHAR2(35)
 LOCATION_ID                                        NUMBER(4)

This works fine:

Robin@asgard02 ~/cp> kafka-avro-console-consumer \
                        --bootstrap-server localhost:9092 \
                        --property schema.registry.url=http://localhost:8081 \
                        --topic ora-soe-03-WAREHOUSES --from-beginning --max-messages 1| jq '.'
{
  "WAREHOUSE_ID": {
    "int": 712
  },
  "WAREHOUSE_NAME": {
    "string": "bFLB2"
  },
  "LOCATION_ID": {
    "int": 1564
  }
}
Processed a total of 1 messages

Robin Moffatt

Read more posts by this author.

Yorkshire, UK