DefaultAtlasContextFactory.java
/*
* Copyright (C) 2017 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.atlasmap.core;
import java.io.File;
import java.io.InputStream;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Constructor;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.atlasmap.api.AtlasContextFactory;
import io.atlasmap.api.AtlasException;
import io.atlasmap.api.AtlasValidationService;
import io.atlasmap.mxbean.AtlasContextFactoryMXBean;
import io.atlasmap.spi.AtlasCombineStrategy;
import io.atlasmap.spi.AtlasModule;
import io.atlasmap.spi.AtlasModuleDetail;
import io.atlasmap.spi.AtlasModuleInfo;
import io.atlasmap.spi.AtlasModuleInfoRegistry;
import io.atlasmap.spi.AtlasPropertyStrategy;
import io.atlasmap.spi.AtlasSeparateStrategy;
import io.atlasmap.v2.AtlasMapping;
public class DefaultAtlasContextFactory implements AtlasContextFactory, AtlasContextFactoryMXBean {
private static final Logger LOG = LoggerFactory.getLogger(DefaultAtlasContextFactory.class);
private static DefaultAtlasContextFactory factory = null;
private boolean initialized = false;
private String uuid = null;
private String threadName = null;
private ObjectName objectName = null;
private DefaultAtlasConversionService atlasConversionService = null;
private DefaultAtlasFieldActionService atlasFieldActionService = null;
private AtlasCombineStrategy atlasCombineStrategy = null;
private AtlasPropertyStrategy atlasPropertyStrategy = null;
private AtlasSeparateStrategy atlasSeparateStrategy = null;
private AtlasValidationService atlasValidationService = null;
private AtlasModuleInfoRegistry moduleInfoRegistry;
private Map<String, String> properties = null;
private CompoundClassLoader classLoader = null;
private DefaultAtlasContextFactory() {
}
public static DefaultAtlasContextFactory getInstance() {
return getInstance(true);
}
/**
* Returns the default singleton, possibly creating it if necessary.
*
* @param init if {@code true} the newly created {@link DefaultAtlasContextFactory} will be initialized upon
* creation via {@link #init()}; otherwise, the newly created {@link DefaultAtlasContextFactory} will not
* be initialized upon creation via {@link #init()}
* @return the singleton
*/
public static DefaultAtlasContextFactory getInstance(boolean init) {
if (factory == null) {
factory = new DefaultAtlasContextFactory();
if (init) {
factory.init();
}
}
return factory;
}
@Override
public synchronized void init() {
CompoundClassLoader cl = new DefaultAtlasCompoundClassLoader();
cl.addAlternativeLoader(AtlasMapping.class.getClassLoader());
init(cl);
}
public synchronized void init(CompoundClassLoader cl) {
if (this.initialized) {
return;
}
this.uuid = UUID.randomUUID().toString();
this.threadName = Thread.currentThread().getName();
this.classLoader = cl;
try {
this.properties = new HashMap<>();
Properties props = new Properties();
props.load(this.getClass().getClassLoader().getResourceAsStream("atlasmap.properties"));
String version = props.getProperty(PROPERTY_ATLASMAP_CORE_VERSION);
this.properties.put(PROPERTY_ATLASMAP_CORE_VERSION, version);
} catch (Exception e) {
LOG.debug("Failed to read atlasmap.properties", e);
}
this.atlasConversionService = DefaultAtlasConversionService.getInstance();
this.atlasFieldActionService = DefaultAtlasFieldActionService.getInstance();
this.atlasFieldActionService.init(this.classLoader);
this.atlasCombineStrategy = new DefaultAtlasCombineStrategy();
this.atlasPropertyStrategy = new DefaultAtlasPropertyStrategy();
this.atlasSeparateStrategy = new DefaultAtlasSeparateStrategy();
this.atlasValidationService = new DefaultAtlasValidationService();
registerFactoryJmx(this);
this.moduleInfoRegistry = new DefaultAtlasModuleInfoRegistry(this);
loadModules("moduleClass", AtlasModule.class);
this.initialized = true;
}
@Override
public void setProperties(Map<String, String> properties) {
this.properties = properties;
}
@Override
public void setProperties(Properties properties) {
this.properties = new HashMap<>();
properties.forEach((key, value) -> this.properties.put(key.toString(), value.toString()));
}
@Override
public Map<String, String> getProperties() {
return this.properties;
}
@Override
public synchronized void destroy() {
if (!this.initialized) {
return;
}
unloadModules();
try {
if (ManagementFactory.getPlatformMBeanServer().isRegistered(getJmxObjectName())) {
ManagementFactory.getPlatformMBeanServer().unregisterMBean(getJmxObjectName());
if (LOG.isDebugEnabled()) {
LOG.debug("Unregistered AtlasContextFactory with JMX");
}
}
} catch (Exception e) {
LOG.warn("Unable to unregister with JMX", e);
}
this.uuid = null;
this.objectName = null;
this.properties = null;
this.atlasFieldActionService = null;
this.atlasConversionService = null;
this.atlasPropertyStrategy = null;
this.atlasCombineStrategy = null;
this.atlasSeparateStrategy = null;
this.atlasValidationService = null;
this.moduleInfoRegistry = null;
this.classLoader = null;
this.threadName = null;
this.initialized = false;
}
@Override
public DefaultAtlasContext createContext(File atlasMappingFile) throws AtlasException {
if (atlasMappingFile == null) {
throw new AtlasException("AtlasMappingFile must be specified");
}
return createContext(atlasMappingFile.toURI());
}
@Override
public DefaultAtlasContext createContext(URI atlasMappingUri) throws AtlasException {
if (atlasMappingUri == null) {
throw new AtlasException("AtlasMappingUri must be specified");
}
DefaultAtlasContext context = new DefaultAtlasContext(this, atlasMappingUri);
return context;
}
public DefaultAtlasContext createContext(AtlasMapping mapping) throws AtlasException {
DefaultAtlasContext context = new DefaultAtlasContext(this, mapping);
return context;
}
@Override
public DefaultAtlasContext createContext(Format format, InputStream stream) throws AtlasException {
DefaultAtlasContext context = new DefaultAtlasContext(this, format, stream);
return context;
}
@Override
public DefaultAtlasPreviewContext createPreviewContext() {
DefaultAtlasPreviewContext context = new DefaultAtlasPreviewContext(this);
return context;
}
@Override
public String getClassName() {
return this.getClass().getName();
}
@Override
public String getThreadName() {
return this.threadName;
}
public void setThreadName(String threadName) {
this.threadName = threadName;
}
@Override
public String getVersion() {
return this.getClass().getPackage().getImplementationVersion();
}
@Override
public String getUuid() {
return this.uuid;
}
public ObjectName getJmxObjectName() {
return this.objectName;
}
public AtlasModuleInfoRegistry getModuleInfoRegistry() {
return this.moduleInfoRegistry;
}
public void setModuleInfoRegistry(AtlasModuleInfoRegistry registry) {
this.moduleInfoRegistry = registry;
}
@Override
public DefaultAtlasConversionService getConversionService() {
return this.atlasConversionService;
}
@Override
public DefaultAtlasFieldActionService getFieldActionService() {
return this.atlasFieldActionService;
}
@Override
public AtlasCombineStrategy getCombineStrategy() {
return atlasCombineStrategy;
}
public void setCombineStrategy(AtlasCombineStrategy atlasCombineStrategy) {
this.atlasCombineStrategy = atlasCombineStrategy;
}
@Override
public AtlasPropertyStrategy getPropertyStrategy() {
return atlasPropertyStrategy;
}
@Override
public void setPropertyStrategy(AtlasPropertyStrategy atlasPropertyStrategy) {
this.atlasPropertyStrategy = atlasPropertyStrategy;
}
@Override
public AtlasSeparateStrategy getSeparateStrategy() {
return atlasSeparateStrategy;
}
public void setSeparateStrategy(AtlasSeparateStrategy atlasSeparateStrategy) {
this.atlasSeparateStrategy = atlasSeparateStrategy;
}
@Override
public AtlasValidationService getValidationService() {
return atlasValidationService;
}
public void setValidationService(AtlasValidationService atlasValidationService) {
this.atlasValidationService = atlasValidationService;
}
public CompoundClassLoader getClassLoader() {
return this.classLoader;
}
@Override
public void addClassLoader(ClassLoader cl) {
this.classLoader.addAlternativeLoader(cl);
}
public void setClassLoader(CompoundClassLoader cl) {
this.classLoader = cl;
}
protected void loadModules(String moduleClassProperty, Class<?> moduleInterface) {
Class<?> moduleClass = null;
String moduleClassName = null;
Set<String> serviceClasses = new HashSet<>();
try {
Enumeration<URL> urls = classLoader.getResources("META-INF/services/atlas/module/atlas.module");
while (urls.hasMoreElements()) {
URL tmp = urls.nextElement();
Properties prop = AtlasUtil.loadPropertiesFromURL(tmp);
String serviceClassPropertyValue = (String) prop.get(moduleClassProperty);
String[] splitted = serviceClassPropertyValue != null ? serviceClassPropertyValue.split(",") : new String[0];
for (String entry : splitted) {
if (!AtlasUtil.isEmpty(entry)) {
serviceClasses.add((entry));
}
}
}
} catch (Exception e) {
LOG.warn("Error loading module resources", e);
}
for (String clazz : serviceClasses) {
try {
moduleClass = classLoader.loadClass(clazz);
moduleClassName = moduleClass.getName();
if (isClassAtlasModule(moduleClass, moduleInterface)) {
@SuppressWarnings("unchecked")
Class<AtlasModule> atlasModuleClass = (Class<AtlasModule>) moduleClass;
Constructor<AtlasModule> constructor = atlasModuleClass.getDeclaredConstructor();
if (constructor != null) {
AtlasModuleInfo module = new DefaultAtlasModuleInfo(getModuleName(moduleClass),
getModuleUri(moduleClass), atlasModuleClass, constructor,
getSupportedDataFormats(moduleClass), getConfigPackages(moduleClass));
getModuleInfoRegistry().register(module);
} else {
LOG.warn("Invalid module class {}: constructor is not present", moduleClassName);
}
} else {
LOG.warn("Invalid module class {}: unsupported AtlasModule", moduleClassName);
}
} catch (NoSuchMethodException e) {
LOG.warn(String.format("Invalid module class %s: constructor is not present.", moduleClassName), e);
} catch (ClassNotFoundException e) {
LOG.warn(String.format("Invalid module class %s: not found in classLoader.", moduleClassName), e);
} catch (Exception e) {
LOG.warn(String.format("Invalid module class %s: unknown error.", moduleClassName), e);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Loaded: {} of {} detected modules", getModuleInfoRegistry().size(), serviceClasses.size());
}
}
protected void unloadModules() {
if (getModuleInfoRegistry() == null) {
return;
}
int moduleCount = getModuleInfoRegistry().size();
getModuleInfoRegistry().unregisterAll();
if (LOG.isDebugEnabled()) {
LOG.debug("Unloaded: {} modules", moduleCount);
}
}
protected boolean isClassAtlasModule(Class<?> clazz, Class<?> moduleInterface) {
if (clazz == null) {
return false;
}
if (isAtlasModuleInterface(clazz, moduleInterface) && clazz.isAnnotationPresent(AtlasModuleDetail.class)) {
if (LOG.isDebugEnabled()) {
LOG.debug("{} is a '{}' implementation", clazz.getCanonicalName(), moduleInterface.getSimpleName());
}
return true;
}
if (LOG.isDebugEnabled()) {
LOG.debug("{} is NOT a '{}' implementation", clazz.getCanonicalName(), moduleInterface.getSimpleName());
}
return false;
}
protected boolean isAtlasModuleInterface(Class<?> clazz, Class<?> moduleInterface) {
if (clazz == null) {
return false;
}
boolean isIface = false;
Class<?> superClazz = clazz.getSuperclass();
if (superClazz != null) {
isIface = isAtlasModuleInterface(superClazz, moduleInterface);
}
Class<?>[] interfaces = clazz.getInterfaces();
for (Class<?> iface : interfaces) {
if (iface.equals(moduleInterface)) {
isIface = true;
}
}
return isIface;
}
protected String getModuleUri(Class<?> clazz) {
AtlasModuleDetail detail = clazz.getAnnotation(AtlasModuleDetail.class);
if (detail != null) {
return detail.uri();
}
return "UNDEFINED";
}
protected String getModuleName(Class<?> clazz) {
AtlasModuleDetail detail = clazz.getAnnotation(AtlasModuleDetail.class);
if (detail != null) {
return detail.name();
}
return "UNDEFINED-" + UUID.randomUUID().toString();
}
protected List<String> getSupportedDataFormats(Class<?> clazz) {
List<String> dataFormats = null;
AtlasModuleDetail detail = clazz.getAnnotation(AtlasModuleDetail.class);
if (detail != null) {
dataFormats = new ArrayList<>();
String[] formats = detail.dataFormats();
for (String format : formats) {
dataFormats.add(format.trim());
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Module: {} supports data formats: {}", clazz.getCanonicalName(), dataFormats);
}
return dataFormats;
}
protected List<String> getConfigPackages(Class<?> clazz) {
List<String> configPackages = null;
AtlasModuleDetail detail = clazz.getAnnotation(AtlasModuleDetail.class);
if (detail != null) {
configPackages = new ArrayList<>();
String[] packages = detail.configPackages();
for (String pkg : packages) {
configPackages.add(pkg.trim());
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Module: {} config packages: {}", clazz.getCanonicalName(), configPackages);
}
return configPackages;
}
protected List<String> getAllModuleConfigPackages(AtlasModuleInfoRegistry registry) {
List<String> pkgs = new ArrayList<>();
for (AtlasModuleInfo moduleInfo : registry.getAll()) {
pkgs.addAll(Arrays.asList(moduleInfo.getPackageNames()));
}
return pkgs;
}
protected void registerFactoryJmx(DefaultAtlasContextFactory factory) {
if (factory == null) {
return;
}
try {
factory.setObjectName();
if (!ManagementFactory.getPlatformMBeanServer().isRegistered(factory.getJmxObjectName())) {
ManagementFactory.getPlatformMBeanServer().registerMBean(factory, factory.getJmxObjectName());
if (LOG.isDebugEnabled()) {
LOG.debug("Registered AtlasContextFactory with JMX");
}
}
} catch (Exception e) {
LOG.warn("Unable to resgister DefaultAtlasContextFactory with JMX", e);
}
}
protected void setObjectName() throws MalformedObjectNameException {
this.objectName = new ObjectName(String.format("io.atlasmap:type=AtlasServiceFactory,factoryUuid=%s", getUuid()));
}
}