001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.plugin; 018 019import java.io.File; 020import java.io.FileInputStream; 021import java.io.FileOutputStream; 022import java.io.IOException; 023import java.io.ObjectInputStream; 024import java.io.ObjectOutputStream; 025import java.util.Collections; 026import java.util.HashSet; 027import java.util.Set; 028import java.util.concurrent.ConcurrentHashMap; 029import java.util.concurrent.ConcurrentMap; 030 031import javax.management.JMException; 032import javax.management.ObjectName; 033 034import org.apache.activemq.advisory.AdvisorySupport; 035import org.apache.activemq.broker.Broker; 036import org.apache.activemq.broker.BrokerFilter; 037import org.apache.activemq.broker.BrokerService; 038import org.apache.activemq.broker.ConnectionContext; 039import org.apache.activemq.broker.jmx.AnnotatedMBean; 040import org.apache.activemq.broker.jmx.BrokerMBeanSupport; 041import org.apache.activemq.broker.jmx.VirtualDestinationSelectorCacheView; 042import org.apache.activemq.broker.region.Subscription; 043import org.apache.activemq.command.ConsumerInfo; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047/** 048 * A plugin which allows the caching of the selector from a subscription queue. 049 * <p/> 050 * This stops the build-up of unwanted messages, especially when consumers may 051 * disconnect from time to time when using virtual destinations. 052 * <p/> 053 * This is influenced by code snippets developed by Maciej Rakowicz 054 * 055 * Refer to: 056 * https://issues.apache.org/activemq/browse/AMQ-3004 057 * http://mail-archives.apache.org/mod_mbox/activemq-users/201011.mbox/%3C8A013711-2613-450A-A487-379E784AF1D6@homeaway.co.uk%3E 058 */ 059public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnable { 060 private static final Logger LOG = LoggerFactory.getLogger(SubQueueSelectorCacheBroker.class); 061 public static final String MATCH_EVERYTHING = "TRUE"; 062 063 /** 064 * The subscription's selector cache. We cache compiled expressions keyed 065 * by the target destination. 066 */ 067 private ConcurrentMap<String, Set<String>> subSelectorCache = new ConcurrentHashMap<String, Set<String>>(); 068 069 private final File persistFile; 070 private boolean singleSelectorPerDestination = false; 071 private boolean ignoreWildcardSelectors = false; 072 private ObjectName objectName; 073 074 private boolean running = true; 075 private final Thread persistThread; 076 private long persistInterval = MAX_PERSIST_INTERVAL; 077 public static final long MAX_PERSIST_INTERVAL = 600000; 078 private static final String SELECTOR_CACHE_PERSIST_THREAD_NAME = "SelectorCachePersistThread"; 079 080 /** 081 * Constructor 082 */ 083 public SubQueueSelectorCacheBroker(Broker next, final File persistFile) { 084 super(next); 085 this.persistFile = persistFile; 086 LOG.info("Using persisted selector cache from[{}]", persistFile); 087 088 readCache(); 089 090 persistThread = new Thread(this, SELECTOR_CACHE_PERSIST_THREAD_NAME); 091 persistThread.start(); 092 enableJmx(); 093 } 094 095 private void enableJmx() { 096 BrokerService broker = getBrokerService(); 097 if (broker.isUseJmx()) { 098 VirtualDestinationSelectorCacheView view = new VirtualDestinationSelectorCacheView(this); 099 try { 100 objectName = BrokerMBeanSupport.createVirtualDestinationSelectorCacheName(broker.getBrokerObjectName(), "plugin", "virtualDestinationCache"); 101 LOG.trace("virtualDestinationCacheSelector mbean name; " + objectName.toString()); 102 AnnotatedMBean.registerMBean(broker.getManagementContext(), view, objectName); 103 } catch (Exception e) { 104 LOG.warn("JMX is enabled, but when installing the VirtualDestinationSelectorCache, couldn't install the JMX mbeans. Continuing without installing the mbeans."); 105 } 106 107 } 108 } 109 110 @Override 111 public void stop() throws Exception { 112 running = false; 113 if (persistThread != null) { 114 persistThread.interrupt(); 115 persistThread.join(); 116 } //if 117 unregisterMBeans(); 118 } 119 120 private void unregisterMBeans() { 121 BrokerService broker = getBrokerService(); 122 if (broker.isUseJmx() && this.objectName != null) { 123 try { 124 broker.getManagementContext().unregisterMBean(objectName); 125 } catch (JMException e) { 126 LOG.warn("Trying uninstall VirtualDestinationSelectorCache; couldn't uninstall mbeans, continuting..."); 127 } 128 } 129 } 130 131 @Override 132 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 133 // don't track selectors for advisory topics 134 if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) { 135 String destinationName = info.getDestination().getQualifiedName(); 136 LOG.debug("Caching consumer selector [{}] on '{}'", info.getSelector(), destinationName); 137 138 String selector = info.getSelector() == null ? MATCH_EVERYTHING : info.getSelector(); 139 140 if (!(ignoreWildcardSelectors && hasWildcards(selector))) { 141 142 Set<String> selectors = subSelectorCache.get(destinationName); 143 if (selectors == null) { 144 selectors = Collections.synchronizedSet(new HashSet<String>()); 145 } else if (singleSelectorPerDestination && !MATCH_EVERYTHING.equals(selector)) { 146 // in this case, we allow only ONE selector. But we don't count the catch-all "null/TRUE" selector 147 // here, we always allow that one. But only one true selector. 148 boolean containsMatchEverything = selectors.contains(MATCH_EVERYTHING); 149 selectors.clear(); 150 151 // put back the MATCH_EVERYTHING selector 152 if (containsMatchEverything) { 153 selectors.add(MATCH_EVERYTHING); 154 } 155 } 156 157 LOG.debug("adding new selector: into cache " + selector); 158 selectors.add(selector); 159 LOG.debug("current selectors in cache: " + selectors); 160 subSelectorCache.put(destinationName, selectors); 161 } 162 163 164 } 165 return super.addConsumer(context, info); 166 } 167 168 // trivial check for SQL92/selector wildcards 169 private boolean hasWildcards(String selector) { 170 return selector.contains("%") || selector.contains("_"); 171 } 172 173 @Override 174 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 175 if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) { 176 177 if (singleSelectorPerDestination) { 178 String destinationName = info.getDestination().getQualifiedName(); 179 Set<String> selectors = subSelectorCache.get(destinationName); 180 if (info.getSelector() == null && selectors.size() > 1) { 181 boolean removed = selectors.remove(MATCH_EVERYTHING); 182 LOG.debug("A non-selector consumer has dropped. Removing the catchall matching pattern 'TRUE'. Successful? " + removed); 183 } 184 } 185 186 } 187 super.removeConsumer(context, info); 188 } 189 190 private void readCache() { 191 if (persistFile != null && persistFile.exists()) { 192 try { 193 FileInputStream fis = new FileInputStream(persistFile); 194 try { 195 ObjectInputStream in = new ObjectInputStream(fis); 196 try { 197 subSelectorCache = (ConcurrentHashMap<String, Set<String>>) in.readObject(); 198 } catch (ClassNotFoundException ex) { 199 LOG.error("Invalid selector cache data found. Please remove file.", ex); 200 } finally { 201 in.close(); 202 } //try 203 } finally { 204 fis.close(); 205 } //try 206 } catch (IOException ex) { 207 LOG.error("Unable to read persisted selector cache...it will be ignored!", ex); 208 } //try 209 } //if 210 } 211 212 /** 213 * Persist the selector cache. 214 */ 215 private void persistCache() { 216 LOG.debug("Persisting selector cache...."); 217 try { 218 FileOutputStream fos = new FileOutputStream(persistFile); 219 try { 220 ObjectOutputStream out = new ObjectOutputStream(fos); 221 try { 222 out.writeObject(subSelectorCache); 223 } finally { 224 out.flush(); 225 out.close(); 226 } //try 227 } catch (IOException ex) { 228 LOG.error("Unable to persist selector cache", ex); 229 } finally { 230 fos.close(); 231 } //try 232 } catch (IOException ex) { 233 LOG.error("Unable to access file[{}]", persistFile, ex); 234 } //try 235 } 236 237 /** 238 * @return The JMS selector for the specified {@code destination} 239 */ 240 public Set<String> getSelector(final String destination) { 241 return subSelectorCache.get(destination); 242 } 243 244 /** 245 * Persist the selector cache every {@code MAX_PERSIST_INTERVAL}ms. 246 * 247 * @see java.lang.Runnable#run() 248 */ 249 @Override 250 public void run() { 251 while (running) { 252 try { 253 Thread.sleep(persistInterval); 254 } catch (InterruptedException ex) { 255 } //try 256 257 persistCache(); 258 } 259 } 260 261 public boolean isSingleSelectorPerDestination() { 262 return singleSelectorPerDestination; 263 } 264 265 public void setSingleSelectorPerDestination(boolean singleSelectorPerDestination) { 266 this.singleSelectorPerDestination = singleSelectorPerDestination; 267 } 268 269 public Set<String> getSelectorsForDestination(String destinationName) { 270 if (subSelectorCache.containsKey(destinationName)) { 271 return new HashSet<String>(subSelectorCache.get(destinationName)); 272 } 273 274 return Collections.EMPTY_SET; 275 } 276 277 public long getPersistInterval() { 278 return persistInterval; 279 } 280 281 public void setPersistInterval(long persistInterval) { 282 this.persistInterval = persistInterval; 283 } 284 285 public boolean deleteSelectorForDestination(String destinationName, String selector) { 286 if (subSelectorCache.containsKey(destinationName)) { 287 Set<String> cachedSelectors = subSelectorCache.get(destinationName); 288 return cachedSelectors.remove(selector); 289 } 290 291 return false; 292 } 293 294 public boolean deleteAllSelectorsForDestination(String destinationName) { 295 if (subSelectorCache.containsKey(destinationName)) { 296 Set<String> cachedSelectors = subSelectorCache.get(destinationName); 297 cachedSelectors.clear(); 298 } 299 return true; 300 } 301 302 public boolean isIgnoreWildcardSelectors() { 303 return ignoreWildcardSelectors; 304 } 305 306 public void setIgnoreWildcardSelectors(boolean ignoreWildcardSelectors) { 307 this.ignoreWildcardSelectors = ignoreWildcardSelectors; 308 } 309} 310