Kafka系列之:自定义转换Transformation
如果可用的单一消息转换 (SMT) 均未提供必要的转换,可以创建自己的转换。首先要理解的一个重要概念是,通常,SMT 实现在抽象类中提供大部分逻辑。SMT 实现然后提供两个具体的子类,称为 Key 和 Value,它们指定是处理 Connect 记录的键还是值。使用转换时,用户指定 Key 或 Value 类的完全限定类名。以下是创建和使用自定义 SMT 所需的高级步骤。1.查看默认 Kafka
Kafka系列之:自定义转换Transformation
一、自定义转换
如果可用的单一消息转换 (SMT) 均未提供必要的转换,可以创建自己的转换。
首先要理解的一个重要概念是,通常,SMT 实现在抽象类中提供大部分逻辑。SMT 实现然后提供两个具体的子类,称为 Key 和 Value,它们指定是处理 Connect 记录的键还是值。使用转换时,用户指定 Key 或 Value 类的完全限定类名。
以下是创建和使用自定义 SMT 所需的高级步骤。
1.查看默认 Kafka Connect 转换中可用的不同 SMT 源 java 文件。使用其中之一作为创建新的自定义转换的基础。
以下是查看 java 源文件时需要注意的重要方法:
- 搜索apply(),看看这个方法是如何实现的。
- 搜索configure(),看看这个方法是如何实现的。
2.编写并编译源代码和单元测试。 SMT 的示例单元测试可以在 Apache Kafka GitHub 项目中找到。
3.创建您的 JAR 文件。
4.安装 JAR 文件。将自定义 SMT JAR 文件(以及转换所需的任何非 Kafka JAR 文件)复制到 Connect Worker 配置文件中的plugin.path 属性中列出的目录之一下的目录中,如下所示:
plugin.path=/usr/local/share/kafka/plugins
例如,在 /usr/local/share/kafka/plugins 下创建名为 my-custom-smt 的目录,并将 JAR 文件复制到 my-custom-smt 目录中。
确保在所有工作节点上执行此操作。
启动工作人员和连接器,然后尝试您的自定义转换。
Connect worker 记录它在 DEBUG 级别找到的每个转换类。启用 DEBUG 模式并验证是否已找到您的转换。如果没有,请检查 JAR 安装并确保它位于正确的位置。
二、Transformation示例
这段代码是一个名为HeaderToValue
的类,它实现了Kafka Connect的Transformation
接口,用来将Kafka消息中的Header信息转换成消息体的一部分。
- 首先定义了许多常量和枚举类型,包括要处理的Header名和字段名、要执行的操作类型(移动或复制)等。
- 然后定义了一些配置信息,包括
headers
、fields
和operation
三个字段,并且对它们进行了一些属性验证和说明。 - 在类的定义中,重载了
config
、configure
和apply
方法。其中,config
方法返回一个ConfigDef
对象,用于指定该类的配置信息;configure
方法读取和验证配置信息,并初始化一些内部状态;apply
方法是真正的转换逻辑,用于将Kafka消息中的Header信息转换成消息体的一部分,并将修改后的消息返回。 - 除此之外,还定义了一些辅助方法,包括根据Header信息和字段信息创建新的Schema,根据Header信息和字段信息创建新的消息体,以及一些用于日志输出和调试的辅助函数。
命名空间定义,声明该类接受Debezium框架的命名空间。
package io.debezium.transforms;
这段代码是引入需要用到的类、接口、枚举类型以及需要导入的静态方法和变量。
import static io.debezium.transforms.HeaderToValue.Operation.MOVE;
import static java.lang.String.format;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SchemaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.util.BoundedConcurrentHashMap;
这段代码定义了一个名为HeaderToValue
的类,它实现了Kafka Connect的Transformation
接口。
public class HeaderToValue<R extends ConnectRecord<R>> implements Transformation<R>
这段代码定义了一些静态常量、枚举类型以及一些field变量,用于存储和读取配置。
private static final Logger LOGGER = LoggerFactory.getLogger(HeaderToValue.class);
public static final String FIELDS_CONF = "fields";
public static final String HEADERS_CONF = "headers";
public static final String OPERATION_CONF = "operation";
private static final String MOVE_OPERATION = "move";
private static final String COPY_OPERATION = "copy";
private static final int CACHE_SIZE = 64;
public static final String NESTING_SEPARATOR = ".";
public static final String ROOT_FIELD_NAME = "payload";
enum Operation {
MOVE(MOVE_OPERATION),
COPY(COPY_OPERATION);
private final String name;
Operation(String name) {
this.name = name;
}
static Operation fromName(String name) {
switch (name) {
case MOVE_OPERATION:
return MOVE;
case COPY_OPERATION:
return COPY;
default:
throw new IllegalArgumentException();
}
}
public String toString() {
return name;
}
}
public static final Field HEADERS_FIELD = Field.create(HEADERS_CONF)
.withDisplayName("Header names list")
.withType(ConfigDef.Type.LIST)
.withImportance(ConfigDef.Importance.HIGH)
.withValidation(
Field::notContainSpaceInAnyElement,
Field::notContainEmptyElements)
.withDescription("Header names in the record whose values are to be copied or moved to record value.")
.required();
public static final Field FIELDS_FIELD = Field.create(FIELDS_CONF)
.withDisplayName("Field names list")
.withType(ConfigDef.Type.LIST)
.withImportance(ConfigDef.Importance.HIGH)
.withValidation(
Field::notContainSpaceInAnyElement,
Field::notContainEmptyElements)
.withDescription(
"Field names, in the same order as the header names listed in the headers configuration property. Supports Struct nesting using dot notation.")
.required();
public static final Field OPERATION_FIELD = Field.create(OPERATION_CONF)
.withDisplayName("Operation: mover or copy")
.withType(ConfigDef.Type.STRING)
.withEnum(Operation.class)
.withImportance(ConfigDef.Importance.HIGH)
.withDescription("Either <code>move</code> if the fields are to be moved to the value (removed from the headers), " +
"or <code>copy</code> if the fields are to be copied to the value (retained in the headers).")
.required();
private List<String> fields;
private List<String> headers;
private Operation operation;
private final BoundedConcurrentHashMap<Schema, Schema> schemaUpdateCache = new BoundedConcurrentHashMap<>(CACHE_SIZE);
private final BoundedConcurrentHashMap<Headers, Headers> headersUpdateCache = new BoundedConcurrentHashMap<>(CACHE_SIZE);
这段代码实现了Transformation
接口中的config
和configure
方法,用于处理该类的配置信息。其中,config
方法返回一个ConfigDef
对象,用于指定该类的配置信息;configure
方法读取和验证配置信息,并初始化一些内部状态。
@Override
public ConfigDef config() {
final ConfigDef config = new ConfigDef();
Field.group(config, null, HEADERS_FIELD, FIELDS_FIELD, OPERATION_FIELD);
return config;
}
@Override
public void configure(Map<String, ?> props) {
final Configuration config = Configuration.from(props);
SmtManager<R> smtManager = new SmtManager<>(config);
smtManager.validate(config, Field.setOf(FIELDS_FIELD, HEADERS_FIELD, OPERATION_FIELD));
fields = config.getList(FIELDS_FIELD);
headers = config.getList(HEADERS_FIELD);
validateConfiguration();
operation = Operation.fromName(config.getString(OPERATION_FIELD));
}
private void validateConfiguration() {
if (headers.size() != fields.size()) {
throw new ConfigException(format("'%s' config must have the same number of elements as '%s' config.",
FIELDS_FIELD, HEADERS_FIELD));
}
}
该部分代码实现了Transformation
接口中的apply
方法,用于转换输入记录并返回转换后的记录。
在apply
方法中,该代码会首先提取需要处理的头信息,并使用给定的配置对值对象进行修改或新记录生成。最后,该方法会返回转换后的记录。整个过程中,涉及到了一些辅助方法,如removeHeaders
、isContainedIn
、makeNewSchema
等。
@Override
public R apply(R record) {
final Struct value = requireStruct(record.value(), "Header field insertion");
LOGGER.trace("Processing record {}", value);
Map<String, Header> headerToProcess = StreamSupport.stream(record.headers().spliterator(), false)
.filter(header -> headers.contains(header.key()))
.collect(Collectors.toMap(Header::key, Function.identity()));
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Header to be processed: {}", headersToString(headerToProcess));
}
if (headerToProcess.isEmpty()) {
return record;
}
Schema updatedSchema = schemaUpdateCache.computeIfAbsent(value.schema(), valueSchema -> makeNewSchema(valueSchema, headerToProcess));
LOGGER.trace("Updated schema fields: {}", updatedSchema.fields());
Struct updatedValue = makeUpdatedValue(value, headerToProcess, updatedSchema);
LOGGER.trace("Updated value: {}", updatedValue);
Headers updatedHeaders = record.headers();
if (MOVE.equals(operation)) {
updatedHeaders = headersUpdateCache.computeIfAbsent(record.headers(), this::removeHeaders);
}
return record.newRecord(
record.topic(),
record.kafkaPartition(),
record.keySchema(),
record.key(),
updatedSchema,
updatedValue,
record.timestamp(),
updatedHeaders);
}
private Headers removeHeaders(Headers originalHeaders) {
Headers updatedHeaders = originalHeaders.duplicate();
headers.forEach(updatedHeaders::remove);
return updatedHeaders;
}
private Struct makeUpdatedValue(Struct originalValue, Map<String, Header> headerToProcess, Schema updatedSchema) {
List<String> nestedFields = fields.stream().filter(field -> field.contains(NESTING_SEPARATOR)).collect(Collectors.toList());
return buildUpdatedValue(ROOT_FIELD_NAME, originalValue, headerToProcess, updatedSchema, nestedFields, 0);
}
private Struct buildUpdatedValue(String fieldName, Struct originalValue, Map<String, Header> headerToProcess, Schema updatedSchema, List<String> nestedFields,
int level) {
Struct updatedValue = new Struct(updatedSchema);
for (org.apache.kafka.connect.data.Field field : originalValue.schema().fields()) {
if (originalValue.get(field) != null) {
if (isContainedIn(field.name(), nestedFields)) {
Struct nestedField = requireStruct(originalValue.get(field), "Nested field");
updatedValue.put(field.name(),
buildUpdatedValue(field.name(), nestedField, headerToProcess, updatedSchema.field(field.name()).schema(), nestedFields, ++level));
}
else {
updatedValue.put(field.name(), originalValue.get(field));
}
}
}
for (int i = 0; i < headers.size(); i++) {
Header currentHeader = headerToProcess.get(headers.get(i));
if (currentHeader != null) {
Optional<String> fieldNameToAdd = getFieldName(fields.get(i), fieldName, level);
fieldNameToAdd.ifPresent(s -> updatedValue.put(s, currentHeader.value()));
}
}
return updatedValue;
}
private boolean isContainedIn(String fieldName, List<String> nestedFields) {
return nestedFields.stream().anyMatch(s -> s.contains(fieldName));
}
private Schema makeNewSchema(Schema oldSchema, Map<String, Header> headerToProcess) {
List<String> nestedFields = fields.stream().filter(field -> field.contains(NESTING_SEPARATOR)).collect(Collectors.toList());
return buildNewSchema(ROOT_FIELD_NAME, oldSchema, headerToProcess, nestedFields, 0);
}
private Schema buildNewSchema(String fieldName, Schema oldSchema, Map<String, Header> headerToProcess, List<String> nestedFields, int level) {
if (oldSchema.type().isPrimitive()) {
return oldSchema;
}
// Get fields from original schema
SchemaBuilder newSchemabuilder = SchemaUtil.copySchemaBasics(oldSchema, SchemaBuilder.struct());
for (org.apache.kafka.connect.data.Field field : oldSchema.fields()) {
if (isContainedIn(field.name(), nestedFields)) {
newSchemabuilder.field(field.name(), buildNewSchema(field.name(), field.schema(), headerToProcess, nestedFields, ++level));
}
else {
newSchemabuilder.field(field.name(), field.schema());
}
}
LOGGER.debug("Fields copied from the old schema {}", newSchemabuilder.fields());
for (int i = 0; i < headers.size(); i++) {
Header currentHeader = headerToProcess.get(headers.get(i));
Optional<String> currentFieldName = getFieldName(fields.get(i), fieldName, level);
LOGGER.trace("CurrentHeader {} - currentFieldName {}", headers.get(i), currentFieldName);
if (currentFieldName.isPresent() && currentHeader != null) {
newSchemabuilder = newSchemabuilder.field(currentFieldName.get(), currentHeader.schema());
}
}
LOGGER.debug("Fields added from headers {}", newSchemabuilder.fields());
return newSchemabuilder.build();
}
private Optional<String> getFieldName(String destinationFieldName, String fieldName, int level) {
String[] nestedNames = destinationFieldName.split("\\.");
if (isRootField(fieldName, nestedNames)) {
return Optional.of(nestedNames[0]);
}
if (isChildrenOf(fieldName, level, nestedNames)) {
return Optional.of(nestedNames[level]);
}
return Optional.empty();
}
private static boolean isChildrenOf(String fieldName, int level, String[] nestedNames) {
int parentLevel = level == 0 ? 0 : level - 1;
return nestedNames[parentLevel].equals(fieldName);
}
private static boolean isRootField(String fieldName, String[] nestedNames) {
return nestedNames.length == 1 && fieldName.equals(ROOT_FIELD_NAME);
}
private String headersToString(Map<?, ?> map) {
return map.keySet().stream()
.map(key -> key + "=" + map.get(key))
.collect(Collectors.joining(", ", "{", "}"));
}
@Override
public void close() {
}
}
更多推荐
所有评论(0)