ADMArchiveHandler.java
/*
* Copyright (C) 2017 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.atlasmap.core;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.atlasmap.api.AtlasContextFactory;
import io.atlasmap.api.AtlasException;
import io.atlasmap.v2.ADMDigest;
import io.atlasmap.v2.AtlasMapping;
import io.atlasmap.v2.DataSourceKey;
import io.atlasmap.v2.DataSourceMetadata;
import io.atlasmap.v2.Json;
/**
* <div>
* The API for handling ADM archive. It encapsulates ADM archive structure
* and format, and isolates file/stream I/O from the other parts.
* ADM archive is a zipped archive file or its exploded directory which contains
* <ul>
* <li>Mapping Definition file (atlasmapping-UI.n.json)</li>
* <li>Gzipped digest file which contains all non-Java document metadata
* and mapping definition in a single JSON file (adm-catalog-files-n.gz)</li>
* <li>Java libraries (jar files in lib/ directory)</li>
* </ul>
* </div>
* {@link #load(Path)} {@link #export(OutputStream)}
*
* <div>
* This handler follows lazy loading strategy as much as
* possible, i.e. it defers serialization/deserialization until it is really required.
* Also note that at this moment Java library directory is not managed by this class.
* Only when it imports/exports ADM archive file, library jars are extracted/bundled
* if {@link #isIgnoreLibrary} is set to {@code false}.
* </div>
*
* <div>
* TODO <a href="https://github.com/atlasmap/atlasmap/issues/1476">
* https://github.com/atlasmap/atlasmap/issues/1476</a>
* The gzipped digest file has to be split into individual schemas and a catalog file.
* </div>
*/
public class ADMArchiveHandler {
    private static final Logger LOG = LoggerFactory.getLogger(ADMArchiveHandler.class);
    private static final String MAPPING_DEFINITION_FILTER = "atlasmapping";
    private static final String MAPPING_DEFINITION_TEMPLATE = "atlasmapping-UI.%s.json";
    private static final String GZIPPED_ADM_DIGEST_FILTER = "adm-catalog-files";
    private static final String GZIPPED_ADM_DIGEST_TEMPLATE = "adm-catalog-files-%s.gz";
    /** Archive directory entry under which bundled Java libraries are stored. */
    private static final String LIBRARY_DIRECTORY_ENTRY = "lib/";
    /** File name suffix that identifies a Java library entry in the archive. */
    private static final String JAR_SUFFIX = ".jar";

    /** Scratch buffer shared by every {@link #redirectStream} call on this instance. */
    private byte[] buffer = new byte[2048];
    /** Raw gzipped digest; {@link #dataSourceMetadata} is derived from it and is reset whenever it changes. */
    private byte[] gzippedAdmDigestBytes = null;
    /** Serialized mapping definition; {@link #mappingDefinition} is reset whenever it is replaced from input. */
    private byte[] mappingDefinitionBytes = null;
    private ObjectMapper jsonMapper;
    /** Copy of {@link #jsonMapper} with root-value unwrapping disabled, used to read the digest. */
    private ObjectMapper jsonMapperForDigest;
    private AtlasMapping mappingDefinition = null;
    private String mappingDefinitionId = "0";
    /** Lazily parsed view of {@link #gzippedAdmDigestBytes}; {@code null} means "not parsed yet". */
    private Map<DataSourceKey, DataSourceMetadata> dataSourceMetadata;
    private boolean ignoreLibrary = false;
    private Path persistDirectory;
    private Path libraryDirectory;

    /**
     * Creates a handler whose JSON mappers are built with this class's own class loader.
     */
    public ADMArchiveHandler() {
        this(ADMArchiveHandler.class.getClassLoader());
    }

    /**
     * Creates a handler whose JSON mappers are built with the given class loader.
     * @param loader class loader passed to {@code Json.withClassLoader}
     */
    public ADMArchiveHandler(ClassLoader loader) {
        this.jsonMapper = Json.withClassLoader(loader);
        this.jsonMapperForDigest = this.jsonMapper.copy();
        // NOTE(review): presumably the digest JSON has no root wrapper element, hence unwrapping is disabled.
        this.jsonMapperForDigest.configure(DeserializationFeature.UNWRAP_ROOT_VALUE, false);
    }

    /**
     * Load an ADM archive file, an exploded directory or mapping definition JSON file.
     * Any previously loaded content is cleared first.
     * @param path {@code java.nio.file.Path} of the ADM archive file, an exploded directory,
     *        or any other file which is read as a mapping definition
     * @throws AtlasException If it fails to load
     */
    public void load(Path path) throws AtlasException {
        clear();
        File file = path.toFile();
        if (!file.exists() || (!file.isFile() && !file.isDirectory())) {
            throw new AtlasException(
                    String.format("'%s' doesn't exist or is not a regular file/directory", path.toString()));
        }
        if (file.isDirectory()) {
            loadExploded(file);
        } else if (file.getName().toLowerCase().endsWith(".adm")) {
            loadADMFile(file);
        } else {
            try (FileInputStream fin = new FileInputStream(file)) {
                this.mappingDefinitionBytes = readIntoByteArray(fin);
            } catch (Exception e) {
                throw new AtlasException(
                        String.format("Invalid mapping definition file: '%s'", path.toString()), e);
            }
        }
    }

    /**
     * Load an ADM archive from stream. The stream is closed once the archive has been read.
     * @param in InputStream to read an ADM Archive
     * @throws AtlasException If it fails to load
     */
    public void load(InputStream in) throws AtlasException {
        load(AtlasContextFactory.Format.ADM, in);
    }

    /**
     * Load an ADM archive or mapping definition from stream.
     * Unlike {@link #load(Path)}, previously loaded content is not cleared; only the parts
     * found in the stream are replaced.
     * @param format {@code AtlasContextFactory.Format} to indicate stream format
     * @param in InputStream to read an ADM Archive; closed afterwards when {@code format} is ADM
     * @throws AtlasException If it fails to load
     */
    public void load(AtlasContextFactory.Format format, InputStream in) throws AtlasException {
        if (format == AtlasContextFactory.Format.ADM) {
            loadADMStream(in);
            return;
        }
        try {
            this.mappingDefinitionBytes = readIntoByteArray(in);
            // Drop the previously deserialized definition so getMappingDefinition() reflects the new bytes.
            this.mappingDefinition = null;
        } catch (Exception e) {
            throw new AtlasException("Invalid mapping definition from stream", e);
        }
    }

    /**
     * Export into an ADM archive. The given stream is closed when the archive is finished.
     * Regular files in the library directory are bundled under {@code lib/} unless
     * {@link #isIgnoreLibrary()} is {@code true}.
     * @param out OutputStream to write an ADM archive
     * @throws AtlasException If it fails to export
     */
    public void export(OutputStream out) throws AtlasException {
        LOG.debug("Creating ADM archive file for ID:'{}'", this.mappingDefinitionId);
        try (ZipOutputStream zipOut = new ZipOutputStream(out)) {
            byte[] mappingBytes = getMappingDefinitionBytes();
            if (mappingBytes != null) {
                String mappingFileName = getMappingDefinitionFileName();
                LOG.debug(" Creating mapping definition file '{}'", mappingFileName);
                writeZipEntry(zipOut, mappingFileName, mappingBytes);
            }
            byte[] digestBytes = getGzippedADMDigestBytes();
            if (digestBytes != null) {
                String digestFileName = getGzippedADMDigestFileName();
                LOG.debug(" Creating gzipped ADM digest file '{}'", digestFileName);
                writeZipEntry(zipOut, digestFileName, digestBytes);
                zipOut.putNextEntry(new ZipEntry(LIBRARY_DIRECTORY_ENTRY));
                zipOut.closeEntry();
            }
            if (!isIgnoreLibrary() && libraryDirectory != null && libraryDirectory.toFile().isDirectory()) {
                File[] libFiles = libraryDirectory.toFile().listFiles();
                // listFiles() returns null on I/O error; treat that as "no libraries".
                if (libFiles != null) {
                    for (File jarFile : libFiles) {
                        if (!jarFile.isFile()) {
                            continue;
                        }
                        String entryName = LIBRARY_DIRECTORY_ENTRY + jarFile.getName();
                        LOG.debug(" Creating jar file entry '{}'", entryName);
                        zipOut.putNextEntry(new ZipEntry(entryName));
                        try (InputStream jarIn = new FileInputStream(jarFile)) {
                            redirectStream(jarIn, zipOut);
                        }
                        zipOut.closeEntry();
                    }
                }
            }
        } catch (Exception e) {
            throw new AtlasException("Error exporting ADM archive file", e);
        }
    }

    /**
     * Persist ADM archive into a directory.
     * The mapping definition is re-serialized from its deserialized form; if the stored bytes
     * cannot be deserialized they are discarded and no mapping definition file is written.
     * Write failures are logged, not thrown.
     * @throws AtlasException If the persist directory is not set or the mapping definition
     *         cannot be serialized
     */
    public void persist() throws AtlasException {
        if (this.persistDirectory == null) {
            throw new AtlasException("Persist Directory must be set");
        }
        Path mdPath = this.persistDirectory.resolve(getMappingDefinitionFileName());
        byte[] mappingBytes = getMappingDefinitionBytes();
        if (mappingBytes != null) {
            try {
                this.mappingDefinition = jsonMapper.readValue(mappingBytes, AtlasMapping.class);
            } catch (Exception e) {
                LOG.warn("Invalid serialized mapping definition content detected, discarding");
                if (LOG.isDebugEnabled()) {
                    String str = String.format("Mapping Definition: [%s]: ",
                            new String(mappingBytes, StandardCharsets.UTF_8));
                    LOG.warn(str, e);
                }
                this.mappingDefinitionBytes = null;
                this.mappingDefinition = null;
            }
        }
        if (this.mappingDefinition != null) {
            try {
                jsonMapper.writeValue(mdPath.toFile(), this.mappingDefinition);
            } catch (Exception e) {
                LOG.warn("Failed to persist mapping definition", e);
            }
        }
        byte[] digestBytes = getGzippedADMDigestBytes();
        if (digestBytes != null) {
            Path digestPath = this.persistDirectory.resolve(getGzippedADMDigestFileName());
            try (FileOutputStream out = new FileOutputStream(digestPath.toFile())) {
                out.write(digestBytes);
            } catch (Exception e) {
                LOG.warn("Failed to persist gzipped ADM digest file", e);
            }
        }
    }

    /**
     * Returns the mapping definition, deserializing it from the stored bytes on first access.
     * If the bytes cannot be deserialized they are discarded and {@code null} is returned.
     * @return the mapping definition, or {@code null} if none is available
     */
    public AtlasMapping getMappingDefinition() {
        if (this.mappingDefinition == null && this.mappingDefinitionBytes != null) {
            try {
                this.mappingDefinition = jsonMapper.readValue(this.mappingDefinitionBytes,
                        AtlasMapping.class);
            } catch (Exception e) {
                LOG.warn("Invalid serialized mapping definition content detected, discarding");
                this.mappingDefinitionBytes = null;
                if (LOG.isDebugEnabled()) {
                    LOG.warn("", e);
                }
            }
        }
        return this.mappingDefinition;
    }

    /**
     * Replaces the mapping definition; serialized bytes are regenerated lazily.
     * @param mapping the new mapping definition, may be {@code null}
     */
    public void setMappingDefinition(AtlasMapping mapping) {
        this.mappingDefinitionBytes = null;
        this.mappingDefinition = mapping;
    }

    /**
     * Replaces the mapping definition with the serialized content read from the stream.
     * When debug logging is enabled the content is deserialized immediately to log it.
     * @param is stream containing a serialized mapping definition
     * @throws AtlasException If the stream cannot be read
     */
    public void setMappingDefinitionBytes(InputStream is) throws AtlasException {
        try {
            this.mappingDefinition = null;
            this.mappingDefinitionBytes = readIntoByteArray(is);
            if (LOG.isDebugEnabled()) {
                LOG.debug(this.jsonMapper.writeValueAsString(getMappingDefinition()));
            }
        } catch (Exception e) {
            throw new AtlasException(e);
        }
    }

    /**
     * Returns the serialized mapping definition, serializing the in-memory definition if needed.
     * @return serialized mapping definition, or {@code null} if none is available
     * @throws AtlasException If serialization fails
     */
    public byte[] getMappingDefinitionBytes() throws AtlasException {
        try {
            if (this.mappingDefinitionBytes == null && this.mappingDefinition != null) {
                this.mappingDefinitionBytes = jsonMapper.writeValueAsBytes(this.mappingDefinition);
            }
            return this.mappingDefinitionBytes;
        } catch (Exception e) {
            throw new AtlasException(e);
        }
    }

    /**
     * Replaces the gzipped ADM digest with the content read from the stream and invalidates
     * the cached data source metadata derived from the previous digest.
     * @param is stream containing a gzipped ADM digest
     * @throws AtlasException If the stream cannot be read
     */
    public void setGzippedADMDigest(InputStream is) throws AtlasException {
        try {
            this.gzippedAdmDigestBytes = readIntoByteArray(is);
            this.dataSourceMetadata = null;
        } catch (Exception e) {
            throw new AtlasException(e);
        }
    }

    /**
     * @return the raw gzipped ADM digest, or {@code null} if none is loaded
     */
    public byte[] getGzippedADMDigestBytes() {
        return this.gzippedAdmDigestBytes;
    }

    /**
     * Looks up data source metadata by direction and document ID.
     * @param isSource {@code true} for a source document, {@code false} for a target
     * @param documentId document ID
     * @return the metadata, or {@code null} if no digest is loaded or no entry matches
     * @throws AtlasException If the digest cannot be parsed
     */
    public DataSourceMetadata getDataSourceMetadata(boolean isSource, String documentId) throws AtlasException {
        return getDataSourceMetadata(new DataSourceKey(isSource, documentId));
    }

    /**
     * Looks up data source metadata by key.
     * @param key data source key
     * @return the metadata, or {@code null} if no digest is loaded or no entry matches
     * @throws AtlasException If the digest cannot be parsed
     */
    public DataSourceMetadata getDataSourceMetadata(DataSourceKey key) throws AtlasException {
        Map<DataSourceKey, DataSourceMetadata> metadataMap = getDataSourceMetadataMap();
        return metadataMap == null ? null : metadataMap.get(key);
    }

    /**
     * Returns all data source metadata, parsing the gzipped digest on first access.
     * @return an unmodifiable view of the metadata map, or {@code null} if no digest is loaded
     * @throws AtlasException If the digest cannot be parsed
     */
    public Map<DataSourceKey, DataSourceMetadata> getDataSourceMetadataMap() throws AtlasException {
        if (this.dataSourceMetadata == null) {
            if (this.gzippedAdmDigestBytes == null) {
                return null;
            }
            // Assign only after a fully successful parse so a failure never leaves a partial cache.
            this.dataSourceMetadata = parseDigest(this.gzippedAdmDigestBytes);
        }
        return Collections.unmodifiableMap(this.dataSourceMetadata);
    }

    /**
     * Returns a deep copy of the mapping definition made by a JSON round trip.
     * @return the copy, or {@code null} if no mapping definition is available
     * @throws AtlasException If the round trip fails
     */
    public AtlasMapping cloneMappingDefinition() throws AtlasException {
        AtlasMapping atlasMapping = getMappingDefinition();
        if (atlasMapping == null) {
            return null;
        }
        try {
            byte[] bytes = this.jsonMapper.writeValueAsBytes(atlasMapping);
            return this.jsonMapper.readValue(bytes, AtlasMapping.class);
        } catch (Exception e) {
            throw new AtlasException(e);
        }
    }

    /**
     * Discards the loaded mapping definition and digest, including cached derived forms.
     * Directory settings, the mapping definition ID and the ignore-library flag are kept.
     */
    public void clear() {
        this.mappingDefinitionBytes = null;
        this.mappingDefinition = null;
        this.gzippedAdmDigestBytes = null;
        this.dataSourceMetadata = null;
    }

    /**
     * @param ignoreLib {@code true} to skip extracting/bundling library jars on import/export
     */
    public void setIgnoreLibrary(boolean ignoreLib) {
        this.ignoreLibrary = ignoreLib;
    }

    /**
     * @return {@code true} if library jars are skipped on import/export
     */
    public boolean isIgnoreLibrary() {
        return this.ignoreLibrary;
    }

    /**
     * Sets the directory used by {@link #persist()}, creating it if necessary.
     * @param dir directory path
     * @throws AtlasException If {@code dir} is null, is not a directory, or cannot be created
     */
    public void setPersistDirectory(Path dir) throws AtlasException {
        ensureDirectory(dir);
        this.persistDirectory = dir;
    }

    /**
     * Sets the directory where library jars are extracted to and bundled from, creating it if necessary.
     * @param dir directory path
     * @throws AtlasException If {@code dir} is null, is not a directory, or cannot be created
     */
    public void setLibraryDirectory(Path dir) throws AtlasException {
        ensureDirectory(dir);
        this.libraryDirectory = dir;
    }

    /**
     * @param id mapping definition ID used to build the archive entry file names
     */
    public void setMappingDefinitionId(String id) {
        this.mappingDefinitionId = id;
    }

    /**
     * @return file name of the gzipped ADM digest for the current mapping definition ID
     */
    public String getGzippedADMDigestFileName() {
        return String.format(GZIPPED_ADM_DIGEST_TEMPLATE, this.mappingDefinitionId);
    }

    /**
     * @return file name of the mapping definition for the current mapping definition ID
     */
    public String getMappingDefinitionFileName() {
        return String.format(MAPPING_DEFINITION_TEMPLATE, this.mappingDefinitionId);
    }

    /**
     * Loads an exploded ADM directory. The directory becomes the persist directory and its
     * name becomes the mapping definition ID; missing files are silently skipped.
     */
    private void loadExploded(File dir) throws AtlasException {
        setPersistDirectory(dir.toPath());
        this.mappingDefinitionId = dir.getName();
        File mappingDefinitionFile = dir.toPath().resolve(getMappingDefinitionFileName()).toFile();
        if (mappingDefinitionFile.exists() && mappingDefinitionFile.isFile()) {
            try (InputStream mappingdefis = new FileInputStream(mappingDefinitionFile)) {
                this.mappingDefinitionBytes = readIntoByteArray(mappingdefis);
            } catch (Exception e) {
                throw new AtlasException("Failed to read mapping definition file", e);
            }
        }
        File digestFile = dir.toPath().resolve(getGzippedADMDigestFileName()).toFile();
        if (digestFile.exists() && digestFile.isFile()) {
            try (InputStream digestis = new FileInputStream(digestFile)) {
                this.gzippedAdmDigestBytes = readIntoByteArray(digestis);
            } catch (Exception e) {
                throw new AtlasException("Failed to read digest file", e);
            }
        }
    }

    /** Loads a zipped ADM archive file. */
    private void loadADMFile(File file) throws AtlasException {
        try (InputStream fin = new FileInputStream(file)) {
            loadADMStream(fin);
        } catch (AtlasException ae) {
            throw ae;
        } catch (Exception e) {
            throw new AtlasException(e);
        }
    }

    /**
     * Reads a zipped ADM archive from the stream and closes it. The digest, library jars and
     * the (single) mapping definition are picked up by entry name; other entries are ignored.
     */
    private void loadADMStream(InputStream in) throws AtlasException {
        try (ZipInputStream zipIn = new ZipInputStream(in)) {
            boolean mappingDefinitionFound = false;
            ZipEntry catEntry;
            while ((catEntry = zipIn.getNextEntry()) != null) {
                String catEntryName = catEntry.getName();
                LOG.debug(" Extracting ADM file entry '{}'", catEntryName);
                if (catEntry.isDirectory()) {
                    // Directory entries (e.g. "lib/") carry no content.
                    LOG.debug("Ignoring file '{}' in .adm archive", catEntryName);
                } else if (catEntryName.contains(GZIPPED_ADM_DIGEST_FILTER)) {
                    this.gzippedAdmDigestBytes = readIntoByteArray(zipIn);
                    this.dataSourceMetadata = null;
                } else if (catEntryName.endsWith(JAR_SUFFIX)) {
                    // Jars are classified before the mapping filter even when ignored, so a jar whose
                    // name contains "atlasmapping" is never mistaken for the mapping definition.
                    if (isIgnoreLibrary()) {
                        LOG.debug("Ignoring file '{}' in .adm archive", catEntryName);
                    } else {
                        extractLibrary(zipIn, catEntryName);
                    }
                } else if (catEntryName.contains(MAPPING_DEFINITION_FILTER)) {
                    if (mappingDefinitionFound) {
                        throw new AtlasException("Multiple mapping definition files are found in a same .adm archive");
                    }
                    this.mappingDefinitionBytes = readIntoByteArray(zipIn);
                    this.mappingDefinition = null;
                    mappingDefinitionFound = true;
                } else {
                    LOG.debug("Ignoring file '{}' in .adm archive", catEntryName);
                }
            }
        } catch (AtlasException ae) {
            throw ae;
        } catch (Exception e) {
            throw new AtlasException(e);
        }
    }

    /**
     * Writes the current zip entry's content into the library directory, keeping only the
     * entry's base name so that entry paths cannot escape that directory. Write failures are
     * logged and ignored.
     */
    private void extractLibrary(InputStream zipIn, String entryName) throws AtlasException {
        if (this.libraryDirectory == null) {
            throw new AtlasException("Library directory is not specified");
        }
        int separatorPos = entryName.replace('\\', '/').lastIndexOf('/');
        String name = separatorPos == -1 ? entryName : entryName.substring(separatorPos + 1);
        Path libPath = this.libraryDirectory.resolve(name);
        try (FileOutputStream fos = new FileOutputStream(libPath.toFile())) {
            redirectStream(zipIn, fos);
        } catch (Exception e) {
            LOG.warn(String.format("Failed to save a jar file '%s', ignoring...", name), e);
        }
    }

    /** Decompresses and parses a gzipped ADM digest into a fresh metadata map keyed by direction and ID. */
    private Map<DataSourceKey, DataSourceMetadata> parseDigest(byte[] gzipped) throws AtlasException {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(gzipped))) {
            ADMDigest digest = jsonMapperForDigest.readValue(in, ADMDigest.class);
            Map<DataSourceKey, DataSourceMetadata> parsed = new HashMap<>();
            DataSourceMetadata[] exportMeta = digest.getExportMeta();
            for (int i = 0; i < exportMeta.length; i++) {
                DataSourceMetadata meta = exportMeta[i];
                // Specifications are parallel to metadata entries by index.
                String spec = digest.getExportBlockData()[i].getValue();
                if (meta.getId() == null) {
                    meta.setId(meta.getName());
                }
                // NOTE(review): encodes with the platform charset; confirm consumers decode the same way.
                meta.setSpecification(spec != null ? spec.getBytes() : null);
                parsed.put(new DataSourceKey(meta.getIsSource(), meta.getId()), meta);
            }
            return parsed;
        } catch (Exception e) {
            throw new AtlasException(e);
        }
    }

    /** Writes {@code data} into {@code zipOut} as a single entry named {@code name}. */
    private void writeZipEntry(ZipOutputStream zipOut, String name, byte[] data) throws Exception {
        zipOut.putNextEntry(new ZipEntry(name));
        zipOut.write(data, 0, data.length);
        zipOut.closeEntry();
    }

    /** Copies {@code in} to {@code out} until end of stream; neither stream is closed. */
    private void redirectStream(InputStream in, OutputStream out) throws Exception {
        int len = 0;
        while ((len = in.read(buffer)) > 0) {
            out.write(buffer, 0, len);
        }
    }

    /** Reads the remainder of {@code in} into a byte array; {@code in} is not closed. */
    private byte[] readIntoByteArray(InputStream in) throws Exception {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
            redirectStream(in, baos);
            return baos.toByteArray();
        }
    }

    /**
     * Ensures {@code dir} is an existing directory, creating it (and parents) when missing.
     * @return always {@code true}
     * @throws AtlasException If {@code dir} is null, exists as a non-directory, or cannot be created
     */
    private boolean ensureDirectory(Path dir) throws AtlasException {
        if (dir == null) {
            throw new AtlasException("Directory must not be Null");
        }
        File dirf = dir.toFile();
        if (dirf.exists() && !dirf.isDirectory()) {
            throw new AtlasException(String.format("File '%s' is not a directory", dirf.getAbsolutePath()));
        } else if (!dirf.exists() && !dirf.mkdirs() && !dirf.isDirectory()) {
            // mkdirs() also returns false if another process created the directory concurrently,
            // hence the final isDirectory() re-check.
            throw new AtlasException(String.format("Failed to create directory '%s'", dirf.getAbsolutePath()));
        }
        return true;
    }
}