JavaFieldReader.java

/*
 * Copyright (C) 2017 Red Hat, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *         http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.atlasmap.java.core;

import java.util.ArrayList;
import java.util.List;

import org.slf4j.LoggerFactory;

import io.atlasmap.api.AtlasException;
import io.atlasmap.core.AtlasPath;
import io.atlasmap.core.AtlasPath.SegmentContext;
import io.atlasmap.core.AtlasUtil;
import io.atlasmap.java.core.accessor.JavaChildAccessor;
import io.atlasmap.java.core.accessor.RootAccessor;
import io.atlasmap.java.v2.AtlasJavaModelFactory;
import io.atlasmap.java.v2.JavaEnumField;
import io.atlasmap.java.v2.JavaField;
import io.atlasmap.spi.AtlasConversionService;
import io.atlasmap.spi.AtlasFieldReader;
import io.atlasmap.spi.AtlasInternalSession;
import io.atlasmap.v2.AtlasModelFactory;
import io.atlasmap.v2.AuditStatus;
import io.atlasmap.v2.CollectionType;
import io.atlasmap.v2.Field;
import io.atlasmap.v2.FieldGroup;
import io.atlasmap.v2.FieldType;

public class JavaFieldReader implements AtlasFieldReader {
    private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(JavaFieldReader.class);

    private AtlasConversionService conversionService;
    private Object sourceDocument;

    @Override
    public Field read(AtlasInternalSession session) throws AtlasException {
        try {
            Field field = session.head().getSourceField();
            if (sourceDocument == null) {
                AtlasUtil.addAudit(session, field, String.format(
                    "Unable to read sourceField (path=%s),  document (docId=%s) is null",
                    field.getPath(), field.getDocId()),
                    AuditStatus.ERROR, null);
            }

            AtlasPath path = new AtlasPath(field.getPath());

            List<Field> fields = getFieldsForPath(session, sourceDocument, field, path, 0);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Processed input field sPath=" + field.getPath() + " sV=" + field.getValue()
                    + " sT=" + field.getFieldType() + " docId: " + field.getDocId());
            }

            if (path.hasCollection() && !path.isIndexedCollection()) {
                FieldGroup fieldGroup = AtlasModelFactory.createFieldGroupFrom(field, true);
                fieldGroup.getField().addAll(fields);
                session.head().setSourceField(fieldGroup);
                return fieldGroup;
            } else if (fields.size() == 1) {
                field.setValue(fields.get(0).getValue());
                return field;
            } else {
                return field;
            }
        } catch (Exception e) {
            throw new AtlasException(e);
        }
    }

    private List<Field> getFieldsForPath(AtlasInternalSession session, Object source, Field field, AtlasPath path, int depth) throws AtlasException {
        List<Field> fields = new ArrayList<>();
        List<SegmentContext> segments = path.getSegments(true);
        if (source == null) {
            return fields;
        }

        if (segments.size() < depth) {
            throw new AtlasException(String.format("depth '%s' exceeds segment size '%s'", depth, segments.size()));
        }
        if (segments.size() == depth) {
            Field newField;
            if (field instanceof FieldGroup && field.getFieldType() == FieldType.COMPLEX) {
                FieldGroup group = (FieldGroup) field;
                populateChildFields(source, group, path);
                newField = group;
            } else {
                newField = AtlasJavaModelFactory.cloneJavaField(field, true);
                if (source != null && (conversionService.isPrimitive(source.getClass())
                    || conversionService.isBoxedPrimitive(source.getClass()))) {
                    source = conversionService.copyPrimitive(source);
                }
                newField.setValue(source);
                newField.setIndex(null); //reset index for subfields
            }
            fields.add(newField);
            return fields;
        }

        // segments.size() > depth) {
        SegmentContext segmentContext = segments.get(depth);
        if (depth == 0 && segments.size() > 1 && segmentContext.getCollectionType() == CollectionType.NONE) {
            depth++;
            segmentContext = segments.get(depth);
        }

        JavaChildAccessor childAccessor = getAccessorForSegment(session, source, field, path, segmentContext);
        if (childAccessor == null || childAccessor.getRawValue() == null) {
            return fields;
        }

        if (segmentContext.getCollectionType() == CollectionType.NONE) {
            List<Field> childFields = getFieldsForPath(session, childAccessor.getValue(), field, path, depth + 1);
            fields.addAll(childFields);
            return fields;
        }

        // collection
        if (segmentContext.getCollectionIndex() != null) {
            Object indexItem = childAccessor.getValueAt(segmentContext.getCollectionIndex());
            List<Field> childFields = getFieldsForPath(session, indexItem, field, path, depth + 1);
            fields.addAll(childFields);
        } else {
            List<?> items = childAccessor.getCollectionValues();
            for (int i = 0; i < items.size(); i++) {
                //include the array index within the path
                Field itemField;
                if (field instanceof FieldGroup) {
                    itemField = AtlasJavaModelFactory.cloneFieldGroup((FieldGroup)field);
                    AtlasPath.setCollectionIndexRecursively((FieldGroup)itemField, depth, i);
                } else {
                    itemField = AtlasJavaModelFactory.cloneJavaField(field, false);
                    AtlasPath itemPath = new AtlasPath(field.getPath());
                    itemPath.setCollectionIndex(depth, i);
                    itemField.setPath(itemPath.toString());
                }
                List<Field> arrayFields = getFieldsForPath(
                    session, items.get(i), itemField, new AtlasPath(itemField.getPath()), depth + 1);
                fields.addAll(arrayFields);
            }
        }
        return fields;
    }

    private JavaChildAccessor getAccessorForSegment(AtlasInternalSession session, Object source, Field field, AtlasPath path, SegmentContext segmentContext) throws AtlasException {
        JavaChildAccessor accessor = null;
        if (segmentContext.isRoot()) {
            accessor = new RootAccessor(source);
        } else {
            accessor = ClassHelper.lookupAccessor(source, segmentContext.getName());
        }
        if (accessor == null) {
            AtlasUtil.addAudit(session, field, String.format(
                "Field '%s' not found on object '%s'", segmentContext.getName(), source),
                AuditStatus.ERROR, null);
            return null;
        }

        try {
            accessor.getValue();
        } catch (Exception e) {
            AtlasUtil.addAudit(session, field, String.format(
                "Cannot access field '%s' on object '%s': %s", segmentContext.getName(), source, e.getMessage()),
                AuditStatus.ERROR, null);
            if (LOG.isDebugEnabled()) {
                LOG.error("", e);
            }
            return null;
        }

        if (path.getLastSegment() == segmentContext && field.getFieldType() == null
            && (field instanceof JavaField || field instanceof JavaEnumField)) {
            detectFieldType(session, accessor, field);
        }
        return accessor;
    }

    private void detectFieldType(AtlasInternalSession session, JavaChildAccessor accessor, Field field) throws AtlasException {
        Class<?> returnType = null;
        try {
            returnType = accessor.getRawClass();
        } catch (Exception e) {
            AtlasUtil.addAudit(session, field, String.format(
                "Cannot access the type of field '%s' on object '%s': %s",
                    accessor.getName(), accessor.getRawValue(), e.getMessage()),
                AuditStatus.ERROR, null);
            if (LOG.isDebugEnabled()) {
                LOG.error("", e);
            }
        }

        if (returnType != null) {
            field.setFieldType(conversionService.fieldTypeFromClass(returnType));
            if (LOG.isTraceEnabled()) {
                LOG.trace("Auto-detected sourceField type p=" + field.getPath() + " t="
                    + field.getFieldType());
            }
        } else {
            AtlasUtil.addAudit(session, field, String.format(
                "Unable to auto-detect sourceField type path=%s docId=%s",
                field.getPath(), field.getDocId()),
                AuditStatus.WARN, null);
        }
    }

    private void populateChildFields(Object source, FieldGroup fieldGroup, AtlasPath path) throws AtlasException {
        List<Field> newChildren = new ArrayList<>();
        for (Field child : fieldGroup.getField()) {
            AtlasPath childPath = new AtlasPath(child.getPath());
            JavaChildAccessor accessor = ClassHelper.lookupAccessor(source, childPath.getLastSegment().getName());
            if (childPath.getLastSegment().getCollectionType() != CollectionType.NONE) {
                FieldGroup childGroup = populateCollectionItems(accessor, child);
                newChildren.add(childGroup);
            } else {
                if (child instanceof FieldGroup) {
                    populateChildFields(accessor.getValue(), (FieldGroup)child, childPath);
                } else {
                    child.setValue(accessor.getValue());
                }
                newChildren.add(child);
            }
        }
        fieldGroup.getField().clear();
        fieldGroup.getField().addAll(newChildren);
    }

    private FieldGroup populateCollectionItems(JavaChildAccessor accessor, Field field) throws AtlasException {
        if (accessor == null || accessor.getCollectionType() == CollectionType.NONE) {
            throw new AtlasException(String.format("Couldn't find a collection object for field %s:%s",
                field.getDocId(), field.getPath()));
        }
        FieldGroup group = field instanceof FieldGroup ?
         (FieldGroup)field : AtlasModelFactory.createFieldGroupFrom(field, true);
        for (int i=0; i<accessor.getCollectionValues().size(); i++) {
            AtlasPath itemPath = new AtlasPath(group.getPath());
            List<SegmentContext> segments = itemPath.getSegments(true);
            itemPath.setCollectionIndex(segments.size() - 1, i);
            if (field instanceof FieldGroup) {
                FieldGroup itemGroup = AtlasJavaModelFactory.cloneFieldGroup((FieldGroup)field);
                AtlasPath.setCollectionIndexRecursively(itemGroup, segments.size(), i);
                populateChildFields(accessor.getValueAt(i), itemGroup, itemPath);
                group.getField().add(itemGroup);
            } else {
                Field itemField = AtlasJavaModelFactory.cloneJavaField(field, false);
                itemField.setPath(itemPath.toString());
                itemField.setValue(accessor.getValueAt(i));
                group.getField().add(itemField);
            }
        }
        return group;
    }

    public void setDocument(Object sourceDocument) {
        this.sourceDocument = sourceDocument;
    }

    public void setConversionService(AtlasConversionService conversionService) {
        this.conversionService = conversionService;
    }
}