AtlasMapSMT.java
/*
* Copyright (C) 2017 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.atlasmap.kafka.smt;
import java.io.File;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.atlasmap.api.AtlasSession;
import io.atlasmap.core.DefaultAtlasContext;
import io.atlasmap.core.DefaultAtlasContextFactory;
public class AtlasMapSMT<R extends ConnectRecord<R>> implements Transformation<R> {
public static final String OVERVIEW_DOC = "Process AtlasMap data mapping with a Kafka Connect record";
private interface ConfigName {
String ADM_PATH = "adm.path";
String DOCID_SOURCE_KEY = "docid.source.key";
String DOCID_SOURCE_VALUE = "docid.source.value";
String DOCID_TARGET_KEY = "docid.target.key";
String DOCID_TARGET_VALUE = "docid.target.value";
}
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(ConfigName.ADM_PATH, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "Path for the ADM file")
.define(ConfigName.DOCID_SOURCE_KEY, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, "Document ID for the source key")
.define(ConfigName.DOCID_SOURCE_VALUE, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, "Document ID for the source value")
.define(ConfigName.DOCID_TARGET_KEY, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, "Document ID for the target key")
.define(ConfigName.DOCID_TARGET_VALUE, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, "Document ID for the target value");
private static final Logger LOG = LoggerFactory.getLogger(AtlasMapSMT.class);
private String admPath;
private String docIdSourceKey;
private String docIdSourceValue;
private String docIdTargetKey;
private String docIdTargetValue;
private DefaultAtlasContext atlasContext;
@Override
public void configure(Map<String, ?> props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
admPath = config.getString(ConfigName.ADM_PATH);
docIdSourceKey = config.getString(ConfigName.DOCID_SOURCE_KEY);
docIdSourceValue = config.getString(ConfigName.DOCID_SOURCE_VALUE);
docIdTargetKey = config.getString(ConfigName.DOCID_TARGET_KEY);
docIdTargetValue = config.getString(ConfigName.DOCID_TARGET_VALUE);
try {
atlasContext = DefaultAtlasContextFactory.getInstance().createContext(new File(admPath));
} catch (Exception e) {
LOG.error("Could not load ADM archive file: {}", e.getMessage());
if (LOG.isDebugEnabled()) {
LOG.error("", e);
}
}
}
@Override
public R apply(R record) {
try {
AtlasSession session = atlasContext.createSession();
if (docIdSourceKey != null && !docIdSourceKey.isEmpty()) {
session.setSourceDocument(docIdSourceKey, record.key());
}
if (docIdSourceValue != null && !docIdSourceValue.isEmpty()) {
session.setSourceDocument(docIdSourceValue, record.value());
} else {
session.setDefaultSourceDocument(record.value());
}
atlasContext.process(session);
Object outKey = null, outValue = null;
if (docIdTargetKey != null && !docIdTargetKey.isEmpty()) {
outKey = session.getTargetDocument(docIdTargetKey);
}
if (docIdTargetValue != null && !docIdTargetValue.isEmpty()) {
outValue = session.getTargetDocument(docIdTargetValue);
} else {
outValue = session.getDefaultTargetDocument();
}
return record.newRecord(record.topic(), record.kafkaPartition(),
null, outKey, null, outValue, record.timestamp());
} catch (Exception e) {
LOG.error("Could not process AtlasMap mapping: {}", e.getMessage());
if (LOG.isDebugEnabled()) {
LOG.error("", e);
}
return record;
}
}
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
@Override
public void close() {
}
}