CsvFieldReader.java
/*
* Copyright (C) 2017 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.atlasmap.csv.core;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import io.atlasmap.api.AtlasException;
import io.atlasmap.core.AtlasPath;
import io.atlasmap.core.AtlasUtil;
import io.atlasmap.csv.v2.CsvComplexType;
import io.atlasmap.csv.v2.CsvField;
import io.atlasmap.csv.v2.CsvFields;
import io.atlasmap.spi.AtlasFieldReader;
import io.atlasmap.spi.AtlasInternalSession;
import io.atlasmap.v2.AtlasModelFactory;
import io.atlasmap.v2.AuditStatus;
import io.atlasmap.v2.CollectionType;
import io.atlasmap.v2.Document;
import io.atlasmap.v2.Field;
import io.atlasmap.v2.FieldGroup;
import io.atlasmap.v2.FieldType;
import io.atlasmap.v2.Fields;
/**
* It accepts InputStream as a document in order to process big files efficiently.
* It uses the mark operation of the InputStream to reset the stream and read consecutive fields.
* If InputStream does not support the mark operation it is wrapped in BufferedInputStream.
*/
public class CsvFieldReader implements AtlasFieldReader {
private final CsvConfig csvConfig;
private InputStream document;
public CsvFieldReader(CsvConfig csvConfig) {
this.csvConfig = csvConfig;
}
public void setDocument(InputStream inputStream) {
if (inputStream != null && !inputStream.markSupported()) {
this.document = new BufferedInputStream(inputStream);
} else {
this.document = inputStream;
}
}
@Override
public Field read(AtlasInternalSession session) throws AtlasException {
Field field = session.head().getSourceField();
if (document == null) {
AtlasUtil.addAudit(session, field,
String.format("Cannot read field '%s' of document '%s', document is null",
field.getPath(), field.getDocId()),
AuditStatus.ERROR, null);
return field;
}
if (field == null) {
throw new AtlasException(new IllegalArgumentException("Argument 'field' cannot be null"));
}
if (!(field instanceof CsvField) && !(field instanceof FieldGroup)) {
throw new AtlasException(String.format("Unsupported field type '%s'", field.getClass()));
}
if (field instanceof FieldGroup) {
//complex field
FieldGroup fieldGroup = (FieldGroup) field;
List<Field> fields = fieldGroup.getField();
FieldGroup readFieldGroup = AtlasModelFactory.copyFieldGroup(fieldGroup);
for (Field subField: fields) {
if (subField instanceof FieldGroup) {
//support only one level grouping
subField = ((FieldGroup) subField).getField().get(0);
}
if (subField instanceof CsvField) {
Field readSubField = readFields((CsvField) subField);
readFieldGroup.getField().add(readSubField);
}
}
session.head().setSourceField(readFieldGroup);
return readFieldGroup;
} else {
Field readField = readFields((CsvField) field);
session.head().setSourceField(readField);
return readField;
}
}
private Field readFields(CsvField field) throws AtlasException {
List<Field> fields = new ArrayList<>();
CsvField csvField = field;
CSVFormat csvFormat = csvConfig.newCsvFormat();
try {
document.mark(Integer.MAX_VALUE);
CSVParser parser = csvFormat.parse(new InputStreamReader(document));
AtlasPath atlasPath = new AtlasPath(csvField.getPath());
int i = 0;
Integer fieldIndex = atlasPath.getRootSegment().getCollectionIndex();
if (fieldIndex != null) {
for (CSVRecord record: parser) {
if (i == fieldIndex) {
CsvField newField = CsvField.cloneOf(csvField);
newField.setIndex(null); //do not copy over index if set
String value;
if (csvField.getColumn() != null) {
value = record.get(csvField.getColumn());
} else {
value = record.get(csvField.getName());
}
newField.setValue(value);
fields.add(newField);
break;
}
i++;
}
} else {
for (CSVRecord record: parser) {
CsvField collectionField = CsvField.cloneOf(csvField);
collectionField.setIndex(null); //do not copy over index if set
String value;
if (csvField.getColumn() != null) {
value = record.get(csvField.getColumn());
} else {
value = record.get(csvField.getName());
}
collectionField.setValue(value);
AtlasPath collectionFieldPath = new AtlasPath(collectionField.getPath());
collectionFieldPath.setCollectionIndex(0, i);
collectionField.setPath(collectionFieldPath.toString());
fields.add(collectionField);
i++;
}
}
document.reset();
} catch (IOException e) {
throw new AtlasException(e);
}
if (fields.size() == 1) {
return fields.get(0);
} else {
FieldGroup fieldGroup = AtlasModelFactory.createFieldGroupFrom(field, true);
fieldGroup.getField().addAll(fields);
return fieldGroup;
}
}
/**
* Reads only the first row of the document.
*
* If firstRecordAsHeader is set to true it uses column names for field names, otherwise it uses an index
* starting from 0.
*
* @return {@link Document} built from CSV
* @throws AtlasException if it fails
*/
public Document readSchema() throws AtlasException {
CSVFormat csvFormat = csvConfig.newCsvFormat();
CSVParser parser;
try {
document.mark(Integer.MAX_VALUE);
parser = csvFormat.parse(new InputStreamReader(document));
} catch (IOException e) {
throw new AtlasException(e);
}
List<CsvField> fields = new ArrayList<>();
if (csvConfig.isFirstRecordAsHeader()) {
for (String headerName : parser.getHeaderNames()) {
CsvField field = new CsvField();
field.setName(headerName);
field.setPath("/<>/" + headerName);
field.setFieldType(FieldType.STRING);
fields.add(field);
}
} else {
CSVRecord record = parser.iterator().next();
for (int i = 0; i < record.size(); i++) {
CsvField field = new CsvField();
if (parser.getHeaderNames() != null && parser.getHeaderNames().size() > i) {
field.setName(parser.getHeaderNames().get(i));
} else {
field.setColumn(i);
field.setName(String.valueOf(i));
}
field.setPath("/<>/" + field.getName());
field.setFieldType(FieldType.STRING);
fields.add(field);
}
}
try {
document.reset();
} catch (IOException e) {
throw new AtlasException(e);
}
CsvFields csvFields = new CsvFields();
csvFields.getCsvField().addAll(fields);
CsvComplexType csvComplexType = new CsvComplexType();
csvComplexType.setFieldType(FieldType.COMPLEX);
csvComplexType.setCollectionType(CollectionType.LIST);
csvComplexType.setPath("/<>");
csvComplexType.setName("");
csvComplexType.setCsvFields(csvFields);
Fields documentFields = new Fields();
documentFields.getField().add(csvComplexType);
Document document = new Document();
document.setFields(documentFields);
return document;
}
}