DefaultAtlasPreviewContext.java
/*
* Copyright (C) 2017 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.atlasmap.core;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.atlasmap.api.AtlasConversionException;
import io.atlasmap.api.AtlasException;
import io.atlasmap.api.AtlasPreviewContext;
import io.atlasmap.spi.AtlasCollectionHelper;
import io.atlasmap.spi.AtlasInternalSession;
import io.atlasmap.spi.AtlasModule;
import io.atlasmap.spi.FieldDirection;
import io.atlasmap.v2.AtlasMapping;
import io.atlasmap.v2.AtlasModelFactory;
import io.atlasmap.v2.AuditStatus;
import io.atlasmap.v2.Audits;
import io.atlasmap.v2.Field;
import io.atlasmap.v2.FieldGroup;
import io.atlasmap.v2.Json;
import io.atlasmap.v2.Mapping;
import io.atlasmap.v2.MappingType;
import io.atlasmap.v2.SimpleField;
/**
* Limited version of AtlasMap context dedicated for preview processing.
* Since preview exchanges field values via {@code Field} object, It doesn't interact with
* actual {@code AtlasModule} which handles data format specific work, but read the values
* from {@code Field} object in the mapping directly.
*/
class DefaultAtlasPreviewContext extends DefaultAtlasContext implements AtlasPreviewContext {
private static final Logger LOG = LoggerFactory.getLogger(DefaultAtlasPreviewContext.class);
private Mapping originalMapping;
private ObjectMapper jsonMapper;
private PreviewModule previewModule = new PreviewModule();
private AtlasCollectionHelper collectionHelper;
DefaultAtlasPreviewContext(DefaultAtlasContextFactory factory) {
super(factory, new AtlasMapping());
this.jsonMapper = Json.withClassLoader(factory.getClassLoader());
this.collectionHelper = new DefaultAtlasCollectionHelper(factory.getFieldActionService());
}
/**
* Process single mapping entry in preview mode. Since modules don't participate
* in preview mode, any document format specific function won't be applied.
*
* @param mapping A @link{Mapping} entry to process
*/
@Override
public Audits processPreview(Mapping mapping) throws AtlasException {
DefaultAtlasSession session = new DefaultAtlasSession(this);
this.originalMapping = mapping;
Mapping cloned;
try {
byte[] serialized = jsonMapper.writeValueAsBytes(mapping);
cloned = jsonMapper.readValue(serialized, Mapping.class);
} catch (Exception e) {
throw new AtlasException(e);
}
session.head().setMapping(cloned);
MappingType mappingType = cloned.getMappingType();
String expression = cloned.getExpression();
FieldGroup sourceFieldGroup = cloned.getInputFieldGroup();
List<Field> sourceFields = cloned.getInputField();
List<Field> targetFields = cloned.getOutputField();
targetFields.forEach(tf -> tf.setValue(null));
if ((sourceFieldGroup == null && sourceFields.isEmpty()) || targetFields.isEmpty()) {
return session.getAudits();
}
if (sourceFieldGroup != null) {
sourceFields = sourceFieldGroup.getField();
}
for (Field sf : sourceFields) {
if (sf.getFieldType() == null || sf.getValue() == null) {
continue;
}
if (sf.getValue() instanceof String && ((String)sf.getValue()).isEmpty()) {
continue;
}
if (!restoreSourceFieldType(session, sf)) {
return session.getAudits();
}
}
processSourceFieldMapping(session);
if (session.hasErrors()) {
return session.getAudits();
}
Field sourceField = session.head().getSourceField();
Field targetField;
if (mappingType == null || mappingType == MappingType.MAP) {
sourceFieldGroup = sourceField instanceof FieldGroup ? (FieldGroup) sourceField : null;
for (int i=0; i<targetFields.size(); i++) {
targetField = targetFields.get(i);
session.head().setTargetField(targetField);
if (sourceFieldGroup != null) {
if (sourceFieldGroup.getField().size() == 0) {
AtlasUtil.addAudit(session, targetField, String.format(
"Skipping empty source group field '%s:%s'",
sourceField.getDocId(), sourceField.getPath()),
AuditStatus.INFO, null);
continue;
}
Integer index = targetField.getIndex();
AtlasPath targetPath = new AtlasPath(targetField.getPath());
if (targetPath.hasCollection() && !targetPath.isIndexedCollection()) {
if (targetFields.size() > 1) {
AtlasUtil.addAudit(session, targetField,
"It's not yet supported to have a collection field as a part of multiple target fields in a same mapping",
AuditStatus.ERROR, null);
session.getAudits().getAudit().addAll(session.head().getAudits());
return session.getAudits();
}
if (index != null) {
LOG.warn("Field index '{}' is detected on target field '{}:{}' while there's only one target field, ignoring",
index, targetField.getDocId(), targetField.getPath());
targetField.setIndex(null);
}
FieldGroup targetFieldGroup = targetField instanceof FieldGroup
? (FieldGroup)targetField
: AtlasModelFactory.createFieldGroupFrom(targetField, true);
targetFields.set(i, targetFieldGroup);
Field previousTargetField = null;
for (Field subSourceField : sourceFieldGroup.getField()) {
Field subTargetField = AtlasModelFactory.cloneFieldToSimpleField(targetFieldGroup);
targetFieldGroup.getField().add(subTargetField);
collectionHelper.copyCollectionIndexes(sourceFieldGroup, subSourceField, subTargetField, previousTargetField);
previousTargetField = subTargetField;
if (!convertSourceToTarget(session, subSourceField, subTargetField)) {
session.getAudits().getAudit().addAll(session.head().getAudits());
return session.getAudits();
};
Field processed = subTargetField;
if (expression == null || expression.isEmpty()) {
processed = applyFieldActions(session, subTargetField);
}
subTargetField.setValue(processed.getValue());
}
continue;
} else if (index == null) {
session.head().setSourceField(sourceFieldGroup.getField().get(sourceFieldGroup.getField().size()-1));
} else {
if (sourceFieldGroup.getField().size() > index) {
session.head().setSourceField(sourceFieldGroup.getField().get(index));
} else {
AtlasUtil.addAudit(session, targetField, String.format(
"The number of source fields '%s' is fewer than expected via target field index '%s'",
sourceFieldGroup.getField().size(), targetField.getIndex()),
AuditStatus.WARN, null);
continue;
}
}
}
if (session.hasErrors()) {
session.getAudits().getAudit().addAll(session.head().getAudits());
return session.getAudits();
}
if (!convertSourceToTarget(session, session.head().getSourceField(), targetField)) {
session.getAudits().getAudit().addAll(session.head().getAudits());
return session.getAudits();
}
Field processed = targetField;
if (expression == null || expression.isEmpty()) {
processed = applyFieldActions(session, targetField);
}
targetField.setValue(processed.getValue());
}
} else if (mappingType == MappingType.COMBINE) {
targetField = targetFields.get(0);
Field combined = processCombineField(session, cloned, sourceFields, targetField);
if (!convertSourceToTarget(session, combined, targetField)) {
session.getAudits().getAudit().addAll(session.head().getAudits());
return session.getAudits();
}
applyFieldActions(session, targetField);
} else if (mappingType == MappingType.SEPARATE) {
List<Field> separatedFields;
try {
separatedFields = processSeparateField(session, cloned, sourceField);
} catch (AtlasException e) {
AtlasUtil.addAudit(session, sourceField, String.format(
"Failed to separate field: %s", AtlasUtil.getChainedMessage(e)),
AuditStatus.ERROR, null);
if (LOG.isDebugEnabled()) {
LOG.error("", e);
}
session.getAudits().getAudit().addAll(session.head().getAudits());
return session.getAudits();
}
if (separatedFields == null) {
session.getAudits().getAudit().addAll(session.head().getAudits());
return session.getAudits();
}
for (Field f : targetFields) {
targetField = f;
if (targetField.getIndex() == null || targetField.getIndex() < 0) {
AtlasUtil.addAudit(session, targetField, String.format(
"Separate requires zero or positive Index value to be set on targetField targetField.path=%s",
targetField.getPath()), AuditStatus.WARN, null);
continue;
}
if (separatedFields.size() <= targetField.getIndex()) {
String errorMessage = String.format(
"Separate returned fewer segments count=%s when targetField.path=%s requested index=%s",
separatedFields.size(), targetField.getPath(), targetField.getIndex());
AtlasUtil.addAudit(session, targetField, errorMessage, AuditStatus.WARN, null);
break;
}
if (!convertSourceToTarget(session, separatedFields.get(targetField.getIndex()), targetField)) {
break;
}
applyFieldActions(session, targetField);
}
} else {
AtlasUtil.addAudit(session, (String)null, String.format(
"Unsupported mappingType=%s detected", cloned.getMappingType()),
AuditStatus.ERROR, null);
}
mapping.getOutputField().clear();
mapping.getOutputField().addAll(cloned.getOutputField());
session.getAudits().getAudit().addAll(session.head().getAudits());
return session.getAudits();
}
private boolean restoreSourceFieldType(DefaultAtlasSession session, Field sourceField) throws AtlasException {
try {
Object sourceValue = getContextFactory().getConversionService().convertType(
sourceField.getValue(), null, sourceField.getFieldType(), null);
sourceField.setValue(sourceValue);
} catch (AtlasConversionException e) {
AtlasUtil.addAudit(session, sourceField, String.format(
"Wrong format for source value : %s", AtlasUtil.getChainedMessage(e)),
AuditStatus.ERROR, null);
if (LOG.isDebugEnabled()) {
LOG.error("", e);
}
return false;
}
return true;
}
private boolean convertSourceToTarget(DefaultAtlasSession session, Field sourceField, Field targetField)
throws AtlasException {
Object targetValue = null;
if (sourceField.getFieldType() != null && sourceField.getFieldType().equals(targetField.getFieldType())) {
targetValue = sourceField.getValue();
} else if (sourceField.getValue() != null) {
try {
targetValue = getContextFactory().getConversionService().convertType(sourceField.getValue(), sourceField.getFormat(),
targetField.getFieldType(), targetField.getFormat());
} catch (AtlasConversionException e) {
AtlasUtil.addAudit(session, targetField, String.format(
"Failed to convert source value to target type: %s", AtlasUtil.getChainedMessage(e)),
AuditStatus.ERROR, null);
if (LOG.isDebugEnabled()) {
LOG.error("", e);
}
return false;
}
}
targetField.setValue(targetValue);
return true;
}
private class PreviewModule extends BaseAtlasModule {
@Override
public void readSourceValue(AtlasInternalSession session) throws AtlasException {
Field sourceField = session.head().getSourceField();
Mapping mapping = session.head().getMapping();
FieldGroup sourceFieldGroup = mapping.getInputFieldGroup();
if (sourceFieldGroup != null) {
if (matches(sourceField, sourceFieldGroup)) {
session.head().setSourceField(sourceFieldGroup);
return;
}
Field f = readFromGroup(sourceFieldGroup, sourceField);
session.head().setSourceField(f);
return;
}
for (Field f : mapping.getInputField()) {
if (matches(sourceField, f)) {
session.head().setSourceField(f);
return;
}
}
}
private boolean matches(Field f1, Field f2) {
if ((f1.getDocId() == null && f2.getDocId() != null)
|| (f1.getDocId() != null && f2.getDocId() == null)
|| (f1.getDocId() != null && !f1.getDocId().equals(f2.getDocId()))) {
return false;
}
if (f2.getPath() != null && f2.getPath().equals(f1.getPath())) {
return true;
}
return false;
}
private Field readFromGroup(FieldGroup group, Field field) {
if (group.getField() == null) {
return null;
}
for (Field f : group.getField()) {
if (matches(field, f)) {
return f;
}
if (f instanceof FieldGroup) {
Field deeper = readFromGroup((FieldGroup)f, field);
if (deeper != null) {
return deeper;
}
}
}
return null;
}
@Override
public Boolean isSupportedField(Field field) {
// The field type doesn't matter for preview
return true;
}
@Override
public void processPreValidation(AtlasInternalSession session) throws AtlasException {
}
@Override
public void processPreSourceExecution(AtlasInternalSession session) throws AtlasException {
}
@Override
public void processPreTargetExecution(AtlasInternalSession session) throws AtlasException {
}
@Override
public void writeTargetValue(AtlasInternalSession session) throws AtlasException {
}
@Override
public void processPostSourceExecution(AtlasInternalSession session) throws AtlasException {
}
@Override
public void processPostTargetExecution(AtlasInternalSession session) throws AtlasException {
}
@Override
public Field cloneField(Field field) throws AtlasException {
return null;
}
@Override
public String getDocName() {
return "Preview";
}
@Override
public String getDocId() {
return "Preview";
}
@Override
public SimpleField createField() {
return new SimpleField();
}
};
@Override
public Map<String, AtlasModule> getSourceModules() {
return new HashMap<String, AtlasModule>() {
private static final long serialVersionUID = 1L;
@Override
public AtlasModule get(Object key) {
return previewModule;
}
};
}
@Override
protected AtlasModule resolveModule(FieldDirection direction, Field field) {
return previewModule;
}
}