Using a custom converter with Kafka Connect?

The objective is to create a workflow for handling nginx logs: a Kafka connector uploads the logs to a topic, and an HDFS sink connector then stores them in HDFS, where the data is analyzed with Hive. Although it is feasible to reformat the nginx logs to meet the Hive metastore’s requirements, it may be possible to achieve this without altering the log format by employing a converter similar to the one in org.apache.kafka.connect.json. As the source stated, placing the converter in the connector’s folder resolved the issue.

Question:

I’m having trouble implementing a custom converter for Kafka Connect. I’m seeking assistance from someone who has experience with this in order to find a solution.

Initial situation

  • my custom converter’s class path is custom.CustomStringConverter.

  • to avoid any mistakes, my custom converter is currently just a copy/paste of the pre-existing StringConverter (of course, this will change once I get it to work); see the sketch after this list.
    https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java

  • I have a Kafka Connect cluster of 3 nodes. The nodes are running Confluent’s official Docker images (confluentinc/cp-kafka-connect:3.3.0).

  • Each node is configured to load a jar with my converter in it (using a Docker volume); see the worker configuration excerpt after this list.
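
For reference, a copy of StringConverter boils down to roughly the following minimal sketch (an approximation of the class described above, not the exact code in the jar):

package custom;

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;

// Minimal pass-through string converter, approximating a copy of
// org.apache.kafka.connect.storage.StringConverter.
public class CustomStringConverter implements Converter {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Nothing to configure in this sketch.
    }

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        // Serialize the Connect value as UTF-8 bytes.
        return value == null ? null : value.toString().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        // Deserialize the bytes back into an optional string value.
        String s = value == null ? null : new String(value, StandardCharsets.UTF_8);
        return new SchemaAndValue(Schema.OPTIONAL_STRING_SCHEMA, s);
    }
}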

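The worker configuration that goes with this setup looks roughly like the excerpt below (a sketch: the mount point matches the path that appears in the logs further down, while /usr/share/java is an assumption about the image’s default plugin directory; with the Confluent images this property is usually supplied via the CONNECT_PLUGIN_PATH environment variable):

# Worker property listing the directories scanned for isolated plugins,
# including the directory the custom converter jar is mounted into.
plugin.path=/usr/share/java,/opt/custom-connectors
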
What happens?

According to the logs, the Connect workers start, successfully load the jars, and identify the custom converter.

[2017-10-10 13:06:46,274] INFO Registered loader: PluginClassLoader{pluginLocation=file:/opt/custom-connectors/custom-converter-1.0-SNAPSHOT.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:199)
[2017-10-10 13:06:46,274] INFO Added plugin 'custom.CustomStringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[...]
[2017-10-10 13:07:43,454] INFO Added aliases 'CustomStringConverter' and 'CustomString' to plugin 'custom.CustomStringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)

Next, I create my connector by sending a JSON config to one of the Connect nodes via a POST request.

{
  "name": "hdfsSinkCustom",
  "config": {
    "topics": "yellow",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "custom.CustomStringConverter",
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "hdfs.url": "hdfs://hdfs-namenode:8020/hdfs-sink",
    "topics.dir": "yellow_storage",
    "flush.size": "1",
    "rotate.interval.ms": "1000"
  }
}
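
A request like this can be submitted with curl, for example (a sketch: the file name is hypothetical and 8083 is Connect’s default REST port):

curl -X POST -H "Content-Type: application/json" \
     --data @hdfsSinkCustom.json \
     http://localhost:8083/connectors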

And receive the following reply:

{
   "error_code": 400,
   "message": "Connector configuration is invalid and contains the following 1 error(s):\nInvalid value custom.CustomStringConverter for configuration value.converter: Class custom.CustomStringConverter could not be found.\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"
}

What am I missing?

The error message remains unchanged when attempting to run Kafka Connect in standalone mode.

Has this issue been encountered before? What could I be overlooking?


Solution:

With the help of Philip Schmitt on the Kafka Users mailing list, I was able to discover the solution.

The problem I am encountering is the same as the one mentioned in https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-6007.

To quote him:

I tested it by duplicating my SMT jar and modifying the plugin.path property to match the folder of the connector in use.

By placing the converter in the folder designated for the connector, I successfully eliminated the error.

I also tried an alternative approach: creating a custom connector and using it together with the custom converter, with both loaded as plugins. That worked as well.

In brief, the connector loads the converters. If the connector is a plugin, then the converter should also be a plugin. However, if the connector is not a plugin (i.e. it is bundled with your Kafka Connect distribution), then the converter should not be a plugin either.
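
For illustration, with the Confluent image the fix corresponds to a layout roughly like this (the exact directory and jar names are assumptions):

/usr/share/java/kafka-connect-hdfs/        <-- plugin folder of the HDFS sink connector
├── kafka-connect-hdfs-3.3.0.jar
├── ... (the connector's dependency jars)
└── custom-converter-1.0-SNAPSHOT.jar      <-- converter jar copied alongside the connector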
