CsvModule.java
/*
* Copyright (C) 2017 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.atlasmap.csv.module;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.atlasmap.api.AtlasException;
import io.atlasmap.core.AtlasUtil;
import io.atlasmap.core.BaseAtlasModule;
import io.atlasmap.csv.core.CsvConfig;
import io.atlasmap.csv.core.CsvFieldReader;
import io.atlasmap.csv.core.CsvFieldWriter;
import io.atlasmap.csv.v2.CsvField;
import io.atlasmap.spi.AtlasInternalSession;
import io.atlasmap.spi.AtlasModuleDetail;
import io.atlasmap.v2.AuditStatus;
import io.atlasmap.v2.Field;
import io.atlasmap.v2.FieldGroup;
@AtlasModuleDetail(name = "CsvModule", uri = "atlas:csv", modes = { "SOURCE", "TARGET" }, dataFormats = {
"csv" }, configPackages = { "io.atlasmap.csv.v2" })
public class CsvModule extends BaseAtlasModule {
private static final Logger LOG = LoggerFactory.getLogger(CsvModule.class);
@Override
public void processPreValidation(AtlasInternalSession session) throws AtlasException {
}
@Override
public void processPreSourceExecution(AtlasInternalSession session) throws AtlasException {
Object sourceDocument = session.getSourceDocument(getDocId());
InputStream sourceInputStream = null;
if (sourceDocument == null || !((sourceDocument instanceof String) || (sourceDocument instanceof InputStream))) {
AtlasUtil.addAudit(session, getDocId(), String.format(
"Null, non-String or non-Stream source document: docId='%s'", getDocId()),
AuditStatus.WARN, null);
} else if (sourceDocument instanceof String){
String sourceDocumentString = String.class.cast(sourceDocument);
sourceInputStream = new ByteArrayInputStream(sourceDocumentString.getBytes());
} else {
sourceInputStream = (InputStream) sourceDocument;
}
CsvConfig csvConfig = CsvConfig.newConfig(getUriParameters());
CsvFieldReader reader = new CsvFieldReader(csvConfig);
reader.setDocument(sourceInputStream);
session.setFieldReader(getDocId(), reader);
if (LOG.isDebugEnabled()) {
LOG.debug("{}: processPreSourceExecution completed", getDocId());
}
}
@Override
public void processPreTargetExecution(AtlasInternalSession session) throws AtlasException {
CsvConfig csvConfig = CsvConfig.newConfig(getUriParameters());
CsvFieldWriter writer = new CsvFieldWriter(csvConfig);
session.setFieldWriter(getDocId(), writer);
if (LOG.isDebugEnabled()) {
LOG.debug("{}: processPreTargetExcution completed", getDocId());
}
}
@Override
public void readSourceValue(AtlasInternalSession session) throws AtlasException {
Field sourceField = session.head().getSourceField();
CsvFieldReader reader = session.getFieldReader(getDocId(), CsvFieldReader.class);
if (reader == null) {
AtlasUtil.addAudit(session, sourceField, String.format(
"Source document '%s' doesn't exist", getDocId()),
AuditStatus.ERROR, null);
return;
}
reader.read(session);
if (LOG.isDebugEnabled()) {
LOG.debug("{}: processSourceFieldMapping completed: SourceField:[docId={}, path={}, type={}, value={}]",
getDocId(), sourceField.getDocId(), sourceField.getPath(), sourceField.getFieldType(),
sourceField.getValue());
}
}
@Override
public void writeTargetValue(AtlasInternalSession session) throws AtlasException {
CsvFieldWriter writer = session.getFieldWriter(getDocId(), CsvFieldWriter.class);
writer.write(session);
}
@Override
public void processPostSourceExecution(AtlasInternalSession session) throws AtlasException {
session.removeFieldReader(getDocId());
if (LOG.isDebugEnabled()) {
LOG.debug("{}: processPostSourceExecution completed", getDocId());
}
}
@Override
public void processPostTargetExecution(AtlasInternalSession session) throws AtlasException {
CsvFieldWriter writer = session.getFieldWriter(getDocId(), CsvFieldWriter.class);
if (writer != null && writer.getDocument() != null) {
String targetDocumentString = writer.toCsv();
session.setTargetDocument(getDocId(), targetDocumentString);
} else {
AtlasUtil.addAudit(session, getDocId(), String
.format("No target document created for DataSource:[id=%s, uri=%s]", getDocId(), this.getUri()),
AuditStatus.WARN, null);
}
session.removeFieldWriter(getDocId());
if (LOG.isDebugEnabled()) {
LOG.debug("{}: processPostTargetExecution completed", getDocId());
}
}
@Override
public Field cloneField(Field field) throws AtlasException {
return CsvField.cloneOf((CsvField) field);
}
@Override
public Boolean isSupportedField(Field field) {
return field instanceof CsvField || field instanceof FieldGroup;
}
@Override
public CsvField createField() {
return new CsvField();
}
}