DefaultAtlasContext.java

/*
 * Copyright (C) 2017 Red Hat, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *         http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.atlasmap.core;

import java.io.InputStream;
import java.lang.management.ManagementFactory;
import java.net.URI;
import java.nio.file.Paths;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;
import java.util.stream.Collectors;

import javax.management.ObjectName;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.atlasmap.api.AtlasConstants;
import io.atlasmap.api.AtlasContext;
import io.atlasmap.api.AtlasContextFactory;
import io.atlasmap.api.AtlasConversionException;
import io.atlasmap.api.AtlasException;
import io.atlasmap.api.AtlasSession;
import io.atlasmap.mxbean.AtlasContextMXBean;
import io.atlasmap.spi.AtlasInternalSession;
import io.atlasmap.spi.AtlasModule;
import io.atlasmap.spi.AtlasModuleInfo;
import io.atlasmap.spi.AtlasModuleInfoRegistry;
import io.atlasmap.spi.AtlasModuleMode;
import io.atlasmap.spi.FieldDirection;
import io.atlasmap.spi.StringDelimiter;
import io.atlasmap.v2.AtlasMapping;
import io.atlasmap.v2.AtlasModelFactory;
import io.atlasmap.v2.AuditStatus;
import io.atlasmap.v2.Audits;
import io.atlasmap.v2.BaseMapping;
import io.atlasmap.v2.Collection;
import io.atlasmap.v2.ConstantField;
import io.atlasmap.v2.CopyTo;
import io.atlasmap.v2.CustomMapping;
import io.atlasmap.v2.DataSource;
import io.atlasmap.v2.DataSourceKey;
import io.atlasmap.v2.DataSourceMetadata;
import io.atlasmap.v2.DataSourceType;
import io.atlasmap.v2.Field;
import io.atlasmap.v2.FieldGroup;
import io.atlasmap.v2.FieldType;
import io.atlasmap.v2.LookupTable;
import io.atlasmap.v2.Mapping;
import io.atlasmap.v2.MappingType;
import io.atlasmap.v2.Mappings;
import io.atlasmap.v2.PropertyField;
import io.atlasmap.v2.SimpleField;
import io.atlasmap.v2.Validation;
import io.atlasmap.v2.Validations;

public class DefaultAtlasContext implements AtlasContext, AtlasContextMXBean {

    private static final Logger LOG = LoggerFactory.getLogger(DefaultAtlasContext.class);
    private ObjectName jmxObjectName;
    private final UUID uuid;
    private DefaultAtlasContextFactory factory;
    private URI atlasMappingUri;
    private ADMArchiveHandler admHandler;
    private Map<String, AtlasModule> sourceModules = new HashMap<>();
    private Map<String, AtlasModule> targetModules = new HashMap<>();
    private Map<String, LookupTable> lookupTables = new HashMap<>();
    private Map<DataSourceKey, DataSourceMetadata> dataSourceMetadataMap;
    private boolean initialized;

    public DefaultAtlasContext(URI atlasMappingUri) {
        this(DefaultAtlasContextFactory.getInstance(), atlasMappingUri);
    }

    public DefaultAtlasContext(DefaultAtlasContextFactory factory, URI atlasMappingUri) {
        this.factory = factory;
        this.uuid = UUID.randomUUID();
        this.atlasMappingUri = atlasMappingUri;
    }

    public DefaultAtlasContext(DefaultAtlasContextFactory factory, AtlasMapping mapping) {
        this.factory = factory;
        this.uuid = UUID.randomUUID();
        this.admHandler = new ADMArchiveHandler(factory.getClassLoader());
        this.admHandler.setIgnoreLibrary(true);
        this.admHandler.setMappingDefinition(mapping);
    }

    public DefaultAtlasContext(DefaultAtlasContextFactory factory,
            AtlasContextFactory.Format format, InputStream stream) throws AtlasException {
        this.factory = factory;
        this.uuid = UUID.randomUUID();
        this.admHandler = new ADMArchiveHandler(factory.getClassLoader());
        this.admHandler.setIgnoreLibrary(true);
        this.admHandler.load(format, stream);
        this.dataSourceMetadataMap = this.admHandler.getDataSourceMetadataMap();
    }

    /**
     * Compare the core version to the extract mapping version.  No mapping version
     * we'll accept as ok, otherwise compare major.minor.
     *
     * @param coreVersion - this runtime version
     * @param mappingVersion - the extracted mapping version
     *
     * @return true if valid, false otherwise
     */
    private boolean validateVersion(String coreVersion, String mappingVersion) {
        if (mappingVersion != null && mappingVersion.length() > 0) {
            String[] mappingVersionComps = mappingVersion.split("\\.");
            String[] coreVersionComps = coreVersion.split("\\.");
            if (coreVersionComps.length < 2 || mappingVersionComps.length < 2) {
                return false;
            }
            if (Integer.parseInt(coreVersionComps[0]) < Integer.parseInt(mappingVersionComps[0])) {
                return false;
            }
            if (Integer.parseInt(coreVersionComps[0]) == Integer.parseInt(mappingVersionComps[0]) &&
                Integer.parseInt(coreVersionComps[1]) < Integer.parseInt(mappingVersionComps[1])) {
                return false;
            }
        }
        return true;
    }

    /**
     * TODO: For dynamic re-load. This needs lock()
     *
     * @throws AtlasException failed to initialize
     */
    protected synchronized void init() throws AtlasException {

        if (this.initialized) {
            return;
        }

        registerJmx(this);

        if (this.atlasMappingUri != null) {
            this.admHandler = new ADMArchiveHandler(factory.getClassLoader());
            this.admHandler.setIgnoreLibrary(true);
            this.admHandler.load(Paths.get(this.atlasMappingUri));
            this.dataSourceMetadataMap = this.admHandler.getDataSourceMetadataMap();
        }
        if (this.admHandler == null || this.admHandler.getMappingDefinition() == null) {
            LOG.warn("AtlasMap context cannot initialize without mapping definition, ignoring:"
                    + " Mapping URI={}"
                    , this.atlasMappingUri);
            return;
        }
        AtlasMapping atlasMapping = this.admHandler.getMappingDefinition();
        String version = factory.getProperties().get(AtlasContextFactory.PROPERTY_ATLASMAP_CORE_VERSION);
        String mappingVersion = atlasMapping.getVersion();
        if (!validateVersion(version, mappingVersion)) {
            LOG.error("Mapping definition version {} detected. It may not work as expected with runtime version {}.",
                mappingVersion,
                version);
        }
        sourceModules.clear();
        ConstantModule constant = new ConstantModule();
        constant.setConversionService(factory.getConversionService());
        constant.setFieldActionService(factory.getFieldActionService());
        sourceModules.put(AtlasConstants.CONSTANTS_DOCUMENT_ID, constant);
        PropertyModule property = new PropertyModule(factory.getPropertyStrategy());
        property.setConversionService(factory.getConversionService());
        property.setFieldActionService(factory.getFieldActionService());
        property.setMode(AtlasModuleMode.SOURCE);
        sourceModules.put(AtlasConstants.PROPERTIES_SOURCE_DOCUMENT_ID, property);
        targetModules.clear();
        property = new PropertyModule(factory.getPropertyStrategy());
        property.setConversionService(factory.getConversionService());
        property.setFieldActionService(factory.getFieldActionService());
        property.setMode(AtlasModuleMode.TARGET);
        targetModules.put(AtlasConstants.PROPERTIES_TARGET_DOCUMENT_ID, property);

        lookupTables.clear();
        if (admHandler.getMappingDefinition().getLookupTables() != null
                && admHandler.getMappingDefinition().getLookupTables().getLookupTable() != null) {
            for (LookupTable table : admHandler.getMappingDefinition().getLookupTables().getLookupTable()) {
                lookupTables.put(table.getName(), table);
            }
        }

        AtlasModuleInfoRegistry moduleInfoRegistry = factory.getModuleInfoRegistry();
        for (DataSource ds : admHandler.getMappingDefinition().getDataSource()) {
            AtlasModuleInfo moduleInfo = moduleInfoRegistry.lookupByUri(ds.getUri());
            if (moduleInfo == null) {
                LOG.error("Cannot find module info for the DataSource uri '{}'", ds.getUri());
                continue;
            }
            if (ds.getDataSourceType() != DataSourceType.SOURCE && ds.getDataSourceType() != DataSourceType.TARGET) {
                LOG.error("Unsupported DataSource type '{}'", ds.getDataSourceType());
                continue;
            }

            String docId = ds.getId();
            if (docId == null || docId.isEmpty()) {
                docId = ds.getDataSourceType() == DataSourceType.SOURCE ? AtlasConstants.DEFAULT_SOURCE_DOCUMENT_ID
                        : AtlasConstants.DEFAULT_TARGET_DOCUMENT_ID;
            }
            if (ds.getDataSourceType() == DataSourceType.SOURCE && sourceModules.containsKey(docId)) {
                LOG.error("Duplicated {} DataSource ID '{}' was detected, ignoring...", ds.getDataSourceType(),
                        ds.getId());
                continue;
            }
            if (ds.getDataSourceType() == DataSourceType.TARGET && targetModules.containsKey(docId)) {
                LOG.error("Duplicated {} DataSource ID '{}' was detected, ignoring...", ds.getDataSourceType(), docId);
                continue;
            }

            try {
                AtlasModule module = moduleInfo.getModuleClass().getDeclaredConstructor().newInstance();
                module.setClassLoader(factory.getClassLoader());
                module.setConversionService(factory.getConversionService());
                module.setFieldActionService(factory.getFieldActionService());
                module.setUri(ds.getUri());
                if (ds.getDataSourceType() == DataSourceType.SOURCE) {
                    module.setMode(AtlasModuleMode.SOURCE);
                    getSourceModules().put(docId, module);
                } else if (ds.getDataSourceType() == DataSourceType.TARGET) {
                    module.setMode(AtlasModuleMode.TARGET);
                    getTargetModules().put(docId, module);
                }
                module.setDocId(docId);
                module.setDocName(ds.getName());
                if (this.dataSourceMetadataMap != null) {
                    DataSourceKey dskey = new DataSourceKey(ds.getDataSourceType() == DataSourceType.SOURCE, docId);
                    DataSourceMetadata meta = this.dataSourceMetadataMap.get(dskey);
                    if (meta != null) {
                        module.setDataSourceMetadata(meta);
                    }
                }
                module.init();
            } catch (Exception t) {
                LOG.error("Unable to initialize {} module: {}", ds.getDataSourceType(), moduleInfo);
                LOG.error(t.getMessage(), t);
                throw new AtlasException(String.format("Unable to initialize %s module: %s", ds.getDataSourceType(),
                        moduleInfo.toString()), t);
            }
        }
        initialized = true;
    }

    protected void registerJmx(DefaultAtlasContext context) {
        try {
            setJmxObjectName(new ObjectName(
                    getContextFactory().getJmxObjectName() + ",context=Contexts,uuid=" + uuid.toString()));
            if (ManagementFactory.getPlatformMBeanServer().isRegistered(getJmxObjectName())) {
                ManagementFactory.getPlatformMBeanServer().registerMBean(this, getJmxObjectName());
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Registered AtlasContext {} with JMX", context.getUuid());
                }
            }
        } catch (Exception t) {
            LOG.warn("Failed to register AtlasContext {} with JMX", context.getUuid());
            LOG.warn(t.getMessage(), t);
        }
    }

    /**
     * Process single mapping entry in preview mode. Since modules don't participate
     * in preview mode, any document format specific function won't be applied.
     * @deprecated Use {@code AtlasPreviewContext#processPreview(Mapping)}
     *
     * @param mapping A @link{Mapping} entry to process
     */
    @Override
    @Deprecated
    public Audits processPreview(Mapping mapping) throws AtlasException {
        return this.factory.createPreviewContext().processPreview(mapping);
    }

    protected Field applyFieldActions(DefaultAtlasSession session, Field field) {
        if (field.getActions() == null) {
            return field;
        }
        try {
            return factory.getFieldActionService().processActions(session, field);
        } catch (AtlasException e) {
            AtlasUtil.addAudit(session, field, String.format(
                    "Failed to apply field action: %s", AtlasUtil.getChainedMessage(e)),
                    AuditStatus.ERROR, null);
            if (LOG.isDebugEnabled()) {
                LOG.error("", e);
            }
            return field;
        }
    }

    /**
     * Process session lifecycle.
     *
     */
    @Override
    public void process(AtlasSession userSession) throws AtlasException {
        if (!(userSession instanceof DefaultAtlasSession)) {
            throw new AtlasException(String.format("Unsupported session class '%s'", userSession.getClass().getName()));
        }
        if (!this.equals(userSession.getAtlasContext())) {
            throw new AtlasException("Cannot execute AtlasSession created by the other AtlasContext");
        }

        DefaultAtlasSession session = (DefaultAtlasSession) userSession;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Begin process {}", (session == null ? null : session.toString()));
        }

        session.head().unset();
        session.getAudits().getAudit().clear();
        session.getValidations().getValidation().clear();

        // TODO https://github.com/atlasmap/atlasmap/issues/863 - Add an option to enable/disable runtime validation
        processValidation(session);
        for (Validation v : session.getValidations().getValidation()) {
            AtlasUtil.addAudit(session, v);
        }

        // Additional runtime only audit
        Mappings mappings = session.getMapping().getMappings();
        if (mappings != null && mappings.getMapping().isEmpty()) {
            AtlasUtil.addAudit(session, (String)null,
                String.format("Field mappings should not be empty"),
                AuditStatus.WARN, null);
        }

        session.getValidations().getValidation().clear();
        if (session.hasErrors()) {
            if (LOG.isDebugEnabled()) {
                LOG.error("Aborting due to {} errors in pre-validation", session.errorCount());
            }
            return;
        }

        for (AtlasModule module : getSourceModules().values()) {
            module.processPreSourceExecution(session);
        }
        for (AtlasModule module : getTargetModules().values()) {
            module.processPreTargetExecution(session);
        }

        if (session.hasErrors()) {
            if (LOG.isDebugEnabled()) {
                LOG.error("Aborting due to {} errors in pre-execution", session.errorCount());
            }
            return;
        }

        for (BaseMapping baseMapping : session.getMapping().getMappings().getMapping()) {
            for (BaseMapping innerMapping : unwrapCollectionMappings(session, baseMapping)) {
                if (innerMapping instanceof CustomMapping) {
                    DefaultAtlasCustomMappingProcessor.getInstance().process(
                            session, (CustomMapping)innerMapping);
                    continue;
                }

                Mapping mapping = (Mapping) innerMapping;
                session.head().setMapping(mapping).setLookupTable(lookupTables.get(mapping.getLookupTableName()));

                if (mapping.getOutputField() == null || mapping.getOutputField().isEmpty()) {
                    AtlasUtil.addAudit(session, (String)null,
                            String.format("Mapping does not contain at least one target field: alias=%s desc=%s",
                                    mapping.getAlias(), mapping.getDescription()),
                            AuditStatus.WARN, null);
                    continue;
                }

                processSourceFieldMapping(session);
                if (!session.head().hasError()) {
                    processTargetFieldMapping(session, mapping);
                }
                session.getAudits().getAudit().addAll(session.head().getAudits());
                session.head().unset();
            }
        }

        for (AtlasModule module : getSourceModules().values()) {
            module.processPostValidation(session);
        }
        for (AtlasModule module : getTargetModules().values()) {
            module.processPostValidation(session);
        }

        for (AtlasModule module : getSourceModules().values()) {
            module.processPostSourceExecution(session);
        }
        for (AtlasModule module : getTargetModules().values()) {
            module.processPostTargetExecution(session);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("End process {}", session == null ? null : session.toString());
        }
    }

    // just unwrap collection mappings to be compatible with older UI
    private List<BaseMapping> unwrapCollectionMappings(DefaultAtlasSession session, BaseMapping baseMapping) {
        if (baseMapping.getMappingType() == null || !baseMapping.getMappingType().equals(MappingType.COLLECTION)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Mapping is not a collection mapping, not cloning: {}", baseMapping);
            }
            return Arrays.asList((BaseMapping) baseMapping);
        }

        List<BaseMapping> mappings = new LinkedList<>();
        for(BaseMapping m : ((Collection) baseMapping).getMappings().getMapping()) {
            mappings.add(m);
        }
        return mappings;
    }

    protected void processSourceFieldMapping(DefaultAtlasSession session) {
        try {
            Mapping mapping = session.head().getMapping();
            if (mapping.getInputFieldGroup() != null) {
                if (mapping.getExpression() != null) {
                    session.head().setSourceField(mapping.getInputFieldGroup());
                    DefaultAtlasExpressionProcessor.processExpression(session, mapping.getExpression());
                } else {
                    processSourceFieldGroup(session, mapping.getInputFieldGroup());
                }
            } else if (mapping.getInputField() != null && !mapping.getInputField().isEmpty()) {
                if (mapping.getExpression() != null) {
                    FieldGroup sourceFieldGroup = new FieldGroup();
                    sourceFieldGroup.getField().addAll(mapping.getInputField());
                    session.head().setSourceField(sourceFieldGroup);
                    DefaultAtlasExpressionProcessor.processExpression(session, mapping.getExpression());
                } else {
                    List<Field> sourceFields = mapping.getInputField();
                    applyCopyToActions(sourceFields, mapping);
                    processSourceFields(session, sourceFields);
                }
            } else {
                session.head().addAudit(AuditStatus.WARN, null, String.format(
                    "Mapping does not contain expression or at least one source field: alias=%s desc=%s",
                    mapping.getAlias(), mapping.getDescription()));
            }
        } catch (Exception t) {
            Field sourceField = session.head().getSourceField();
            String docId = sourceField != null ? sourceField.getDocId() : null;
            String path =  sourceField != null ? sourceField.getPath() : null;
            session.head().addAudit(AuditStatus.ERROR, sourceField, String.format(
                    "Unexpected exception is thrown while reading source field: %s", t.getMessage()));
            if (LOG.isDebugEnabled()) {
                LOG.error("", t);
            }
        }
    }

    private void processSourceFieldGroup(DefaultAtlasSession session, FieldGroup sourceFieldGroup) throws AtlasException {
        processSourceFields(session, sourceFieldGroup.getField());
        session.head().setSourceField(sourceFieldGroup);
        Field processed = applyFieldActions(session, session.head().getSourceField());
        session.head().setSourceField(processed);
    }

    private void processSourceFields(DefaultAtlasSession session, List<Field> sourceFields)
            throws AtlasException {
        for (int i = 0; i < sourceFields.size(); i++) {
            Field sourceField = sourceFields.get(i);
            session.head().setSourceField(sourceField);
            if (sourceField instanceof FieldGroup) {
                processSourceFields(session, ((FieldGroup)sourceField).getField());
                Field processed = applyFieldActions(session, sourceField);
                session.head().setSourceField(processed);
                continue;
            }

            AtlasModule module = resolveModule(FieldDirection.SOURCE, sourceField);
            if (module == null) {
                AtlasUtil.addAudit(session, sourceField,
                        String.format("Module not found for docId '%s'", sourceField.getDocId()),
                        AuditStatus.ERROR, null);
                return;
            }
            if (!module.isSupportedField(sourceField)) {
                AtlasUtil.addAudit(session, sourceField,
                        String.format("Unsupported source field type '%s' for DataSource '%s'",
                                sourceField.getClass().getName(), module.getUri()),
                        AuditStatus.ERROR, null);
                return;
            }

            module.readSourceValue(session);
            Field processed = applyFieldActions(session, session.head().getSourceField());
            session.head().setSourceField(processed);
            sourceFields.set(i, processed);
        }
    }

    protected AtlasModule resolveModule(FieldDirection direction, Field field) {
        String docId = field.getDocId();
        if (docId == null || docId.isEmpty()) {
            docId = direction == FieldDirection.SOURCE ? AtlasConstants.DEFAULT_SOURCE_DOCUMENT_ID
                    : AtlasConstants.DEFAULT_TARGET_DOCUMENT_ID;
        }
        Map<String, AtlasModule> modules =
                direction == FieldDirection.SOURCE ? sourceModules : targetModules;
        if (direction == FieldDirection.SOURCE && field instanceof ConstantField) {
            AtlasModule answer = sourceModules.get(AtlasConstants.CONSTANTS_DOCUMENT_ID);
            if (!modules.containsKey(docId)) {
                modules.put(docId, answer);
            }
            return answer;
        }
        if (field instanceof PropertyField) {
            AtlasModule answer = modules.get(
                    direction == FieldDirection.SOURCE ? AtlasConstants.PROPERTIES_SOURCE_DOCUMENT_ID : AtlasConstants.PROPERTIES_TARGET_DOCUMENT_ID);
            if (!modules.containsKey(docId)) {
                modules.put(docId, answer);
            }
            return answer;
        }
        return modules.get(docId);
    }

    /**
     * Checks for CopyTo actions and correctly sets the path for targetField by setting the indexes specified in each action
     */
    private void applyCopyToActions(List<Field> sourceFields, Mapping mapping) {
        for (Field sourceField : sourceFields) {

            if (sourceField instanceof FieldGroup) {
                applyCopyToActions(((FieldGroup) sourceField).getField(), mapping);
                continue;
            }

            if (sourceField.getActions() == null) {
                continue;
            }

            List<CopyTo> copyTos = sourceField.getActions().stream().filter(a -> a instanceof CopyTo).map(a -> (CopyTo) a).collect(Collectors.toList());
            if (copyTos.size() == 0) {
                return;
            }

            if (copyTos.stream().flatMap(c -> c.getIndexes().stream().filter(i -> i < 0)).count() > 0) {
                throw new IllegalArgumentException("Indexes must be >= 0");
            }

            /*
             * For each index present in CopyTo, set the corresponding index in the path.
             * each index of copyTo is supposed to have a counterpart in the path.
             */
            for (CopyTo copyTo : copyTos) {
                for (Field field : mapping.getOutputField()) {
                    AtlasPath path = new AtlasPath(field.getPath());
                    List<AtlasPath.SegmentContext> segments = path.getCollectionSegments(true);
                    for (int i = 0; i < copyTo.getIndexes().size(); i++) {
                        if (i < segments.size()) { // In case there are too many indexes specified
                            path.setCollectionIndex(i + 1, copyTo.getIndexes().get(i));// +1 since 0 is the root segment
                        }
                    }
                    field.setPath(path.toString());
                }
                // The processor associated to this action is a fake. It shall not execute, so remove the action.
                sourceField.getActions().remove(copyTo);
            }
        }
    }

    private void processTargetFieldMapping(DefaultAtlasSession session, Mapping mapping) {
        MappingType mappingType = mapping.getMappingType();
        List<Field> sourceFields = mapping.getInputField();
        List<Field> targetFields = mapping.getOutputField();

        AtlasModule module = null;
        Field targetField = null;
        if (mappingType == null || mappingType == MappingType.LOOKUP || mappingType == MappingType.MAP) {
            Field sourceField = session.head().getSourceField();
            FieldGroup sourceFieldGroup = null;
            if (sourceField instanceof FieldGroup) {
                sourceFieldGroup = unwrapNestedGroup((FieldGroup)sourceField);
            }
            for (Field f : targetFields) {
                targetField = f;
                module = resolveModule(FieldDirection.TARGET, targetField);
                if (!auditTargetFieldType(session, module, targetField)) {
                    continue;
                }
                session.head().setTargetField(targetField);
                if (sourceFieldGroup != null) {
                    Integer index = targetField.getIndex();
                    AtlasPath targetPath = new AtlasPath(targetField.getPath());
                    if (targetPath.hasCollection() && !targetPath.isIndexedCollection()) {
                        if (targetFields.size() > 1) {
                            AtlasUtil.addAudit(session, targetField,
                                    "It's not yet supported to have a collection field as a part of multiple target fields in a same mapping",
                                    AuditStatus.ERROR, null);
                            return;
                        }
                        session.head().setSourceField(sourceFieldGroup);
                    } else if (index == null) {
                        if (sourceFieldGroup.getField().size() > 0) {
                            session.head().setSourceField(sourceFieldGroup.getField().get(sourceFieldGroup.getField().size()-1));
                        }
                    } else {
                        if (sourceFieldGroup.getField().size() > index) {
                            session.head().setSourceField(sourceFieldGroup.getField().get(index));
                        } else {
                            AtlasUtil.addAudit(session, targetField, String.format(
                                    "The number of source fields '%s' is fewer than expected via target field index '%s'",
                                    sourceFieldGroup.getField().size(), targetField.getIndex()),
                                    AuditStatus.WARN, null);
                            continue;
                        }
                    }
                }
                try {
                    module.populateTargetField(session);
                } catch (Exception e) {
                    AtlasUtil.addAudit(session, targetField,
                            "Failed to populate target field: " + e.getMessage(),
                            AuditStatus.ERROR, null);
                    if (LOG.isDebugEnabled()) {
                        LOG.error(String.format("populateTargetField() failed for %s:%s",
                                targetField.getDocId(), targetField.getPath()), e);
                    }
                    return;
                }
                Field processed = applyFieldActions(session, session.head().getTargetField());
                session.head().setTargetField(processed);
                try {
                    module.writeTargetValue(session);
                } catch (Exception e) {
                    AtlasUtil.addAudit(session, targetField,
                            "Failed to write field value into target document: " + e.getMessage(),
                            AuditStatus.ERROR, null);
                    if (LOG.isDebugEnabled()) {
                        LOG.error(String.format("writeTargetValue() failed for %s:%s",
                                targetField.getDocId(), targetField.getPath()), e);
                    }
                    return;
                }
            }
            return;

        } else if (mappingType == MappingType.COMBINE) {
            targetField = targetFields.get(0);
            module = resolveModule(FieldDirection.TARGET, targetField);
            if (!auditTargetFieldType(session, module, targetField)) {
                return;
            }
            Field sourceField = processCombineField(session, mapping, sourceFields, targetField);
            session.head().setSourceField(sourceField).setTargetField(targetField);
            try {
                module.populateTargetField(session);
            } catch (Exception e) {
                AtlasUtil.addAudit(session, targetField,
                        "Failed to populate target field: " + e.getMessage(),
                        AuditStatus.ERROR, null);
                return;
            }
            applyFieldActions(session, session.head().getTargetField());
            try {
                module.writeTargetValue(session);
            } catch (Exception e) {
                AtlasUtil.addAudit(session, targetField,
                        "Failed to write field value into target document: " + e.getMessage(),
                        AuditStatus.ERROR, null);
                return;
            }
            return;

        } else if (mappingType == MappingType.SEPARATE) {
            List<Field> separatedFields = null;
            try {
                separatedFields = processSeparateField(session, mapping, sourceFields.get(0));
            } catch (Exception e) {
                AtlasUtil.addAudit(session, targetField,
                        "Failed to process separate mode: " + e.getMessage(),
                        AuditStatus.ERROR, null);
                return;
            }
            if (separatedFields == null) {
                return;
            }

            for (Field f : targetFields) {
                targetField = f;
                module = resolveModule(FieldDirection.TARGET, targetField);
                if (!auditTargetFieldType(session, module, targetField)) {
                    continue;
                }
                if (targetField.getIndex() == null || targetField.getIndex() < 0) {
                    AtlasUtil.addAudit(session, targetField, String.format(
                            "Separate requires zero or positive Index value to be set on targetField targetField.path=%s",
                            targetField.getPath()), AuditStatus.WARN, null);
                    continue;
                }
                if (separatedFields.size() <= targetField.getIndex()) {
                    String errorMessage = String.format(
                            "Separate returned fewer segments count=%s when targetField.path=%s requested index=%s",
                            separatedFields.size(), targetField.getPath(), targetField.getIndex());
                    AtlasUtil.addAudit(session, targetField, errorMessage, AuditStatus.WARN, null);
                    break;
                }
                session.head().setSourceField(separatedFields.get(targetField.getIndex())).setTargetField(targetField);
                try {
                    module.populateTargetField(session);
                } catch (Exception e) {
                    AtlasUtil.addAudit(session, targetField,
                            "Failed to populate target field: " + e.getMessage(),
                            AuditStatus.ERROR, null);
                    return;
                }
                Field processed = applyFieldActions(session, session.head().getTargetField());
                session.head().setTargetField(processed);
                try {
                    module.writeTargetValue(session);
                } catch (Exception e) {
                    AtlasUtil.addAudit(session, targetField,
                            "Failed to write field value into target document: " + e.getMessage(),
                            AuditStatus.ERROR, null);
                    return;
                }
            }
            return;
        }

        AtlasUtil.addAudit(session, (String)null,
                String.format("Unsupported mappingType=%s detected", mapping.getMappingType()),
                AuditStatus.ERROR, null);
    }

    private FieldGroup unwrapNestedGroup(FieldGroup parent) {
        if (parent.getPath() == null && parent.getField().size() == 1
                && parent.getField().get(0) instanceof FieldGroup) {
            return (FieldGroup) parent.getField().get(0);
        }
        return parent;
    }

    private boolean auditTargetFieldType(DefaultAtlasSession session, AtlasModule module, Field field) {
        if (module == null) {
            AtlasUtil.addAudit(session, field, String
                    .format("Module not found for field type='%s', path='%s'", field.getFieldType(), field.getPath()),
                    AuditStatus.ERROR, null);
            return false;
        }
        if (!module.isSupportedField(field)) {
            AtlasUtil
                    .addAudit(session, field,
                            String.format("Unsupported target field type '%s' for DataSource '%s'",
                                    field.getClass().getName(), module.getUri()),
                            AuditStatus.ERROR, null);
            return false;
        }
        return true;
    }

    protected Field processCombineField(DefaultAtlasSession session, Mapping mapping, List<Field> sourceFields,
            Field targetField) {
        Map<Integer, String> combineValues = null;
        for (Field sourceField : sourceFields) {
            if (sourceField.getIndex() == null || sourceField.getIndex() < 0) {
                AtlasUtil.addAudit(session, targetField, String.format(
                        "Combine requires zero or positive Index value to be set on all sourceFields sourceField.path=%s",
                        sourceField.getPath()), AuditStatus.WARN, null);
                continue;
            }

            if (combineValues == null) {
                // We need to support a sorted map w/ null values
                combineValues = new HashMap<>();
            }

            if (sourceField.getValue() != null) {
                String sourceValue;
                try {
                    sourceValue = (String) factory.getConversionService().convertType(sourceField.getValue(),
                            sourceField.getFormat(), FieldType.STRING, null);
                } catch (AtlasConversionException e) {
                    AtlasUtil.addAudit(session, targetField,
                            String.format("Suitable converter for sourceField.path=%s hasn't been found",
                                    sourceField.getPath()),
                            AuditStatus.WARN, null);

                    sourceValue = sourceField.getValue() != null ? sourceField.getValue().toString() : null;
                }
                combineValues.put(sourceField.getIndex(), sourceValue);
            }
        }

        String combinedValue = null;
        StringDelimiter delimiter = StringDelimiter.fromName(mapping.getDelimiter());
        if (delimiter != null) {
            combinedValue = factory.getCombineStrategy().combineValues(combineValues, delimiter);
        } else if (mapping.getDelimiterString() != null && !mapping.getDelimiterString().isEmpty()) {
            combinedValue = factory.getCombineStrategy().combineValues(combineValues, mapping.getDelimiterString());
        } else {
            combinedValue = factory.getCombineStrategy().combineValues(combineValues);
        }

        Field answer = AtlasModelFactory.cloneFieldToSimpleField(sourceFields.get(0));
        if (combinedValue == null || combinedValue.trim().isEmpty()) {
            LOG.debug("Empty combined string for Combine mapping targetField.path={}",
                      targetField.getPath());
        } else {
            answer.setValue(combinedValue);
        }
        return answer;
    }

    protected List<Field> processSeparateField(DefaultAtlasSession session, Mapping mapping, Field sourceField)
            throws AtlasException {
        if (sourceField.getValue() == null) {
            AtlasUtil.addAudit(session, sourceField,
                    String.format("null value can't be separated for sourceField.path=%s",
                            sourceField.getPath()),
                    AuditStatus.WARN, null);
            return null;
        }
        if (!sourceField.getValue().getClass().isAssignableFrom(String.class)) {
            Object converted = factory.getConversionService().convertType(
                    sourceField.getValue(), sourceField.getFormat(), FieldType.STRING, null);
            sourceField.setValue(converted);
        }

        List<Field> answer = new ArrayList<>();

        String sourceValue;
        try {
            sourceValue = (String) factory.getConversionService().convertType(sourceField.getValue(),
                    sourceField.getFormat(), FieldType.STRING, null);
        } catch (AtlasConversionException e) {
            AtlasUtil.addAudit(session, sourceField, String
                    .format("Suitable converter for sourceField.path=%s hasn't been found", sourceField.getPath()),
                    AuditStatus.WARN, null);
            sourceValue = sourceField.getValue().toString();
        }
        List<String> separatedValues = null;
        StringDelimiter delimiter = StringDelimiter.fromName(mapping.getDelimiter());
        if (delimiter != null) {
            separatedValues = factory.getSeparateStrategy().separateValue(sourceValue, delimiter);
        } else {
            separatedValues = factory.getSeparateStrategy().separateValue(sourceValue);
        }

        if (separatedValues == null || separatedValues.isEmpty()) {
            LOG.debug("Empty string for Separate mapping sourceField.path={}", sourceField.getPath());
        } else {
            for (String separatedValue : separatedValues) {
                SimpleField simpleField = AtlasModelFactory.cloneFieldToSimpleField(sourceField);
                simpleField.setValue(separatedValue);
                simpleField.setFieldType(FieldType.STRING);
                answer.add(simpleField);
            }
        }
        return answer;
    }

    @Override
    public void processValidation(AtlasSession userSession) throws AtlasException {
        if (!(userSession instanceof DefaultAtlasSession)) {
            throw new AtlasException(String.format("Unsupported session class '%s'", userSession.getClass().getName()));
        }
        if (!this.equals(userSession.getAtlasContext())) {
            throw new AtlasException("Cannot execute AtlasSession created by the other AtlasContext");
        }

        DefaultAtlasSession session = (DefaultAtlasSession) userSession;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Begin processValidation {}", session);
        }

        AtlasMapping atlasMapping = this.admHandler.getMappingDefinition();
        String version = factory.getProperties().get(AtlasContextFactory.PROPERTY_ATLASMAP_CORE_VERSION);
        String mappingVersion = atlasMapping.getVersion();
        if (!validateVersion(version, mappingVersion)) {
            AtlasUtil.addAudit((AtlasInternalSession)userSession, (Field)null,
                String.format("Mapping definition version %s detected. It may not work as expected with runtime version %s.",
                    mappingVersion,
                    version),
                AuditStatus.WARN, null);
        }

        List<Validation> validations = getContextFactory().getValidationService().validateMapping(session.getMapping());
        if (validations != null && !validations.isEmpty()) {
            session.getValidations().getValidation().addAll(validations);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Detected {} core validation notices", validations == null ? 0 : validations.size());
        }

        for (AtlasModule module : getSourceModules().values()) {
            module.processPreValidation(session);
        }
        for (AtlasModule module : getTargetModules().values()) {
            module.processPreValidation(session);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("End processValidation {}", session);
        }
    }

    @Override
    public DefaultAtlasContextFactory getContextFactory() {
        return this.factory;
    }

    public AtlasMapping getMapping() {
        return admHandler != null ? admHandler.getMappingDefinition() : null;
    }

    @Override
    public AtlasSession createSession() throws AtlasException {
        init();
        return doCreateSession();
    }

    public AtlasSession createSession(AtlasMapping mappingDefinition) throws AtlasException {
        this.atlasMappingUri = null;
        this.admHandler = new ADMArchiveHandler(this.factory.getClassLoader());
        this.admHandler.setIgnoreLibrary(true);
        this.admHandler.setMappingDefinition(mappingDefinition);
        this.initialized = false;
        init();
        return doCreateSession();
    }

    private AtlasSession doCreateSession() throws AtlasException {
        AtlasSession session = new DefaultAtlasSession(this);
        session.setAtlasContext(this);
        session.setAudits(new Audits());
        session.setValidations(new Validations());
        setDefaultSessionProperties(session);
        return session;
    }

    protected void setDefaultSessionProperties(AtlasSession session) {
        Date date = new Date();
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ");
        df.setTimeZone(TimeZone.getDefault());
        session.getProperties().put("Atlas.CreatedDateTimeTZ", df.format(date));
    }

    public Map<String, AtlasModule> getSourceModules() {
        return sourceModules;
    }

    public void setSourceModules(Map<String, AtlasModule> sourceModules) {
        this.sourceModules = sourceModules;
    }

    public Map<String, AtlasModule> getTargetModules() {
        return targetModules;
    }

    public void setTargetModules(Map<String, AtlasModule> targetModules) {
        this.targetModules = targetModules;
    }

    public Map<String, LookupTable> getLookupTables() {
        return lookupTables;
    }

    public void setLookupTables(Map<String, LookupTable> lookupTables) {
        this.lookupTables = lookupTables;
    }

    protected void setJmxObjectName(ObjectName jmxObjectName) {
        this.jmxObjectName = jmxObjectName;
    }

    public ObjectName getJmxObjectName() {
        return this.jmxObjectName;
    }

    @Override
    public String getUuid() {
        return (this.uuid != null ? this.uuid.toString() : null);
    }

    @Override
    public String getVersion() {
        return this.getClass().getPackage().getImplementationVersion();
    }

    @Override
    public String getMappingName() {
        return (admHandler.getMappingDefinition() != null
                ? admHandler.getMappingDefinition().getName() : null);
    }

    protected void setMappingUri(URI atlasMappingUri) {
        this.atlasMappingUri = atlasMappingUri;
    }

    @Override
    public String getMappingUri() {
        return (atlasMappingUri != null ? atlasMappingUri.toString() : null);
    }

    @Override
    public String getClassName() {
        return this.getClass().getSimpleName();
    }

    @Override
    public String getThreadName() {
        return Thread.currentThread().getName();
    }

    @Override
    public String toString() {
        return "DefaultAtlasContext [jmxObjectName=" + jmxObjectName + ", uuid=" + uuid + ", factory=" + factory
                + ", mappingName=" + getMappingName() + ", mappingUri=" + getMappingUri() + ", sourceModules="
                + sourceModules + ", targetModules=" + targetModules + "]";
    }

    public ADMArchiveHandler getADMArchiveHandler() {
        return this.admHandler;
    }

}