ConnectionWorker.java
package com.github.mk23.jmxproxy.jmx;
import com.github.mk23.jmxproxy.core.Host;
import com.github.mk23.jmxproxy.core.MBean;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>JMX fetcher and tracker.</p>
*
* Creates a {@link ScheduledExecutorService} to periodically make to a {@link JMXConnector}
* to a JMX endpoint. At every execution, discovers and fetches all remote registered mbeans
* and populates the local {@link Host} object with assicated {@link MBean}s and their
* {@link com.github.mk23.jmxproxy.core.Attribute}. Also maintains the {@link Host} access
* time to allow reaping of
* unaccessed workers.
*
* @since 2015-05-11
* @author mk23
* @version 3.2.0
*/
public class ConnectionWorker {
private static final Logger LOG = LoggerFactory.getLogger(ConnectionWorker.class);
private Host host;
private ConnectionCredentials authCreds;
private boolean authValid;
private IOException connError;
private final Object fetchLock;
private final int historySize;
private final long cacheDuration;
private final JMXServiceURL url;
private long accessTime;
private ScheduledExecutorService pollerSvc;
/**
* <p>Default constructor.</p>
*
* Initializes the {@link JMXServiceURL} to the specified hostName
* and starts the {@link ScheduledExecutorService} for periodically
* connecting and fetching JMX objects.
*
* @param hostName host:port {@link String} JMX agent target.
* @param cacheDuration period in milliseconds for how often to connect to the JMX agent.
* @param historySize number of {@link com.github.mk23.jmxproxy.core.Attribute}s to keep
* per {@link MBean} {@link com.github.mk23.jmxproxy.util.History}.
*
* @throws MalformedURLException if the specified host is not a valid host:port {@link String}.
*/
public ConnectionWorker(
final String hostName,
final long cacheDuration,
final int historySize
) throws MalformedURLException {
this.historySize = historySize;
this.cacheDuration = cacheDuration;
fetchLock = new Object();
url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + hostName + "/jmxrmi");
}
/**
* <p>Getter for host.<p>
*
* Fetches the tracked {@link Host} object and resets the request access time.
*
* @param testCreds optional {@link ConnectionCredentials} for the provided JMX agent or null if none.
*
* @return {@link Host} as populated by the most recent fetch operation.
*
* @throws IOException if the specified host is unreachable.
* @throws SecurityException if the specified credentials to the host are incorrect.
*/
public final Host getHost(final ConnectionCredentials testCreds) throws IOException, SecurityException {
accessTime = System.currentTimeMillis();
if (host == null) {
final CountDownLatch ready = new CountDownLatch(1);
shutdown();
host = new Host();
authCreds = (testCreds != null) ? testCreds : new ConnectionCredentials();
pollerSvc = Executors.newSingleThreadScheduledExecutor();
pollerSvc.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
fetchJMXValues();
} catch (Exception e) {
e.printStackTrace();
}
ready.countDown();
}
}, 0, cacheDuration, TimeUnit.MILLISECONDS);
try {
ready.await();
} catch (InterruptedException e) {
LOG.error("unable to finish first run", e);
}
}
if (connError != null) {
throw connError;
} else if (!authCreds.equals(testCreds) && authValid) {
throw new SecurityException();
} else if (!authValid) {
throw new SecurityException();
} else {
synchronized (fetchLock) {
return host;
}
}
}
/**
* <p>Checks expiration of this worker object.</p>
*
* Checks the last access time against the provided duration limit to determine if this worker
* is expired and can be purged.
*
* @param accessDuration time in milliseconds of no access after which this worker is expired.
*
* @return true if this worker hasn't been accessed recently, false otherwise.
*/
public final boolean isExpired(final long accessDuration) {
return System.currentTimeMillis() - accessTime > accessDuration;
}
/**
* <p>Stops the scheduled fetcher.</p>
*
* Signals the {@link ScheduledExecutorService} to shutdown for process termination cleanup.
*/
public final void shutdown() {
if (pollerSvc != null && !pollerSvc.isShutdown()) {
pollerSvc.shutdown();
}
}
private void fetchJMXValues() {
JMXConnector connection = null;
MBeanServerConnection server = null;
Map<String, Object> environment = new HashMap<String, Object>();
if (authCreds != null) {
environment.put(JMXConnector.CREDENTIALS, new String[]{authCreds.getUsername(), authCreds.getPassword()});
}
try {
LOG.debug("connecting to mbean server " + url);
synchronized (fetchLock) {
connection = JMXConnectorFactory.connect(url, environment);
server = connection.getMBeanServerConnection();
authValid = true;
connError = null;
Set<String> freshMBeans = new HashSet<String>();
for (ObjectName mbeanName : server.queryNames(null, null)) {
LOG.debug("discovered mbean " + mbeanName);
freshMBeans.add(mbeanName.toString());
MBean mbean = host.addMBean(mbeanName.toString(), historySize);
try {
for (MBeanAttributeInfo attributeObject : server.getMBeanInfo(mbeanName).getAttributes()) {
if (attributeObject.isReadable()) {
try {
Object attribute = server.getAttribute(mbeanName, attributeObject.getName());
mbean.addAttribute(attributeObject.getName(), attribute);
} catch (java.lang.NullPointerException
| java.rmi.RemoteException
| javax.management.JMException
| javax.management.JMRuntimeException e) {
LOG.error("failed to add attribute " + attributeObject.toString() + ": " + e);
}
}
}
} catch (javax.management.JMException e) {
LOG.error("failed to get mbean info for " + mbeanName, e);
}
}
Set<String> staleMBeans = new HashSet<String>(host.getMBeans());
staleMBeans.removeAll(freshMBeans);
for (String mbeanName : staleMBeans) {
host.removeMBean(mbeanName);
LOG.debug("removed stale mbean " + mbeanName);
}
}
} catch (IOException e) {
host = null;
connError = e;
LOG.error("communication failure with " + url, e);
} catch (SecurityException e) {
host = null;
authValid = false;
LOG.error("invalid credentials for " + url, e);
} finally {
if (connection != null) {
try {
connection.close();
LOG.debug("disconnected from " + url);
} catch (IOException e) {
LOG.error("failed to disconnect from " + url, e);
} finally {
connection = null;
}
}
}
}
}