001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.policy;
018
019import java.util.Set;
020
021import org.apache.activemq.ActiveMQPrefetchPolicy;
022import org.apache.activemq.broker.Broker;
023import org.apache.activemq.broker.region.BaseDestination;
024import org.apache.activemq.broker.region.Destination;
025import org.apache.activemq.broker.region.DurableTopicSubscription;
026import org.apache.activemq.broker.region.Queue;
027import org.apache.activemq.broker.region.QueueBrowserSubscription;
028import org.apache.activemq.broker.region.QueueSubscription;
029import org.apache.activemq.broker.region.Subscription;
030import org.apache.activemq.broker.region.Topic;
031import org.apache.activemq.broker.region.TopicSubscription;
032import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
033import org.apache.activemq.broker.region.group.GroupFactoryFinder;
034import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
035import org.apache.activemq.filter.DestinationMapEntry;
036import org.apache.activemq.network.NetworkBridgeFilterFactory;
037import org.apache.activemq.usage.SystemUsage;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 * Represents an entry in a {@link PolicyMap} for assigning policies to a
043 * specific destination or a hierarchical wildcard area of destinations.
044 *
045 * @org.apache.xbean.XBean
046 *
047 */
048public class PolicyEntry extends DestinationMapEntry {
049
050    private static final Logger LOG = LoggerFactory.getLogger(PolicyEntry.class);
051    private DispatchPolicy dispatchPolicy;
052    private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
053    private boolean sendAdvisoryIfNoConsumers;
054    private DeadLetterStrategy deadLetterStrategy = Destination.DEFAULT_DEAD_LETTER_STRATEGY;
055    private PendingMessageLimitStrategy pendingMessageLimitStrategy;
056    private MessageEvictionStrategy messageEvictionStrategy;
057    private long memoryLimit;
058    private String messageGroupMapFactoryType = "cached";
059    private MessageGroupMapFactory messageGroupMapFactory;
060    private PendingQueueMessageStoragePolicy pendingQueuePolicy;
061    private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy;
062    private PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy;
063    private int maxProducersToAudit=BaseDestination.MAX_PRODUCERS_TO_AUDIT;
064    private int maxAuditDepth=BaseDestination.MAX_AUDIT_DEPTH;
065    private int maxQueueAuditDepth=BaseDestination.MAX_AUDIT_DEPTH;
066    private boolean enableAudit=true;
067    private boolean producerFlowControl = true;
068    private boolean alwaysRetroactive = false;
069    private long blockedProducerWarningInterval = Destination.DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
070    private boolean optimizedDispatch=false;
071    private int maxPageSize=BaseDestination.MAX_PAGE_SIZE;
072    private int maxBrowsePageSize=BaseDestination.MAX_BROWSE_PAGE_SIZE;
073    private boolean useCache=true;
074    private long minimumMessageSize=1024;
075    private boolean useConsumerPriority=true;
076    private boolean strictOrderDispatch=false;
077    private boolean lazyDispatch=false;
078    private int timeBeforeDispatchStarts = 0;
079    private int consumersBeforeDispatchStarts = 0;
080    private boolean advisoryForSlowConsumers;
081    private boolean advisoryForFastProducers;
082    private boolean advisoryForDiscardingMessages;
083    private boolean advisoryWhenFull;
084    private boolean advisoryForDelivery;
085    private boolean advisoryForConsumed;
086    private boolean includeBodyForAdvisory;
087    private long expireMessagesPeriod = BaseDestination.EXPIRE_MESSAGE_PERIOD;
088    private int maxExpirePageSize = BaseDestination.MAX_BROWSE_PAGE_SIZE;
089    private int queuePrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH;
090    private int queueBrowserPrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH;
091    private int topicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH;
092    private int durableTopicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH;
093    private boolean usePrefetchExtension = true;
094    private int cursorMemoryHighWaterMark = 70;
095    private int storeUsageHighWaterMark = 100;
096    private SlowConsumerStrategy slowConsumerStrategy;
097    private boolean prioritizedMessages;
098    private boolean allConsumersExclusiveByDefault;
099    private boolean gcInactiveDestinations;
100    private boolean gcWithNetworkConsumers;
101    private long inactiveTimeoutBeforeGC = BaseDestination.DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
102    private boolean reduceMemoryFootprint;
103    private NetworkBridgeFilterFactory networkBridgeFilterFactory;
104    private boolean doOptimzeMessageStorage = true;
105    private int maxDestinations = -1;
106
107    /*
108     * percentage of in-flight messages above which optimize message store is disabled
109     */
110    private int optimizeMessageStoreInFlightLimit = 10;
111    private boolean persistJMSRedelivered = false;
112
113
114    public void configure(Broker broker,Queue queue) {
115        baseConfiguration(broker,queue);
116        if (dispatchPolicy != null) {
117            queue.setDispatchPolicy(dispatchPolicy);
118        }
119        queue.setDeadLetterStrategy(getDeadLetterStrategy());
120        queue.setMessageGroupMapFactory(getMessageGroupMapFactory());
121        if (memoryLimit > 0) {
122            queue.getMemoryUsage().setLimit(memoryLimit);
123        }
124        if (pendingQueuePolicy != null) {
125            PendingMessageCursor messages = pendingQueuePolicy.getQueuePendingMessageCursor(broker,queue);
126            queue.setMessages(messages);
127        }
128
129        queue.setUseConsumerPriority(isUseConsumerPriority());
130        queue.setStrictOrderDispatch(isStrictOrderDispatch());
131        queue.setOptimizedDispatch(isOptimizedDispatch());
132        queue.setLazyDispatch(isLazyDispatch());
133        queue.setTimeBeforeDispatchStarts(getTimeBeforeDispatchStarts());
134        queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts());
135        queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault());
136        queue.setPersistJMSRedelivered(isPersistJMSRedelivered());
137    }
138
139    public void update(Queue queue) {
140        update(queue, null);
141    }
142
143    /**
144     * Update a queue with this policy.  Only apply properties that
145     * match the includedProperties list.  Not all properties are eligible
146     * to be updated.
147     *
148     * If includedProperties is null then all of the properties will be set as
149     * isUpdate will return true
150     * @param baseDestination
151     * @param includedProperties
152     */
153    public void update(Queue queue, Set<String> includedProperties) {
154        baseUpdate(queue, includedProperties);
155        if (isUpdate("memoryLimit", includedProperties) && memoryLimit > 0) {
156            queue.getMemoryUsage().setLimit(memoryLimit);
157        }
158        if (isUpdate("useConsumerPriority", includedProperties)) {
159            queue.setUseConsumerPriority(isUseConsumerPriority());
160        }
161        if (isUpdate("strictOrderDispatch", includedProperties)) {
162            queue.setStrictOrderDispatch(isStrictOrderDispatch());
163        }
164        if (isUpdate("optimizedDispatch", includedProperties)) {
165            queue.setOptimizedDispatch(isOptimizedDispatch());
166        }
167        if (isUpdate("lazyDispatch", includedProperties)) {
168            queue.setLazyDispatch(isLazyDispatch());
169        }
170        if (isUpdate("timeBeforeDispatchStarts", includedProperties)) {
171            queue.setTimeBeforeDispatchStarts(getTimeBeforeDispatchStarts());
172        }
173        if (isUpdate("consumersBeforeDispatchStarts", includedProperties)) {
174            queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts());
175        }
176        if (isUpdate("allConsumersExclusiveByDefault", includedProperties)) {
177            queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault());
178        }
179        if (isUpdate("persistJMSRedelivered", includedProperties)) {
180            queue.setPersistJMSRedelivered(isPersistJMSRedelivered());
181        }
182    }
183
184    public void configure(Broker broker,Topic topic) {
185        baseConfiguration(broker,topic);
186        if (dispatchPolicy != null) {
187            topic.setDispatchPolicy(dispatchPolicy);
188        }
189        topic.setDeadLetterStrategy(getDeadLetterStrategy());
190        if (subscriptionRecoveryPolicy != null) {
191            SubscriptionRecoveryPolicy srp = subscriptionRecoveryPolicy.copy();
192            srp.setBroker(broker);
193            topic.setSubscriptionRecoveryPolicy(srp);
194        }
195        if (memoryLimit > 0) {
196            topic.getMemoryUsage().setLimit(memoryLimit);
197        }
198        topic.setLazyDispatch(isLazyDispatch());
199    }
200
201    public void update(Topic topic) {
202        update(topic, null);
203    }
204
205    //If includedProperties is null then all of the properties will be set as
206    //isUpdate will return true
207    public void update(Topic topic, Set<String> includedProperties) {
208        baseUpdate(topic, includedProperties);
209        if (isUpdate("memoryLimit", includedProperties) && memoryLimit > 0) {
210            topic.getMemoryUsage().setLimit(memoryLimit);
211        }
212        if (isUpdate("lazyDispatch", includedProperties)) {
213            topic.setLazyDispatch(isLazyDispatch());
214        }
215    }
216
217    // attributes that can change on the fly
218    public void baseUpdate(BaseDestination destination) {
219        baseUpdate(destination, null);
220    }
221
222    // attributes that can change on the fly
223    //If includedProperties is null then all of the properties will be set as
224    //isUpdate will return true
225    public void baseUpdate(BaseDestination destination, Set<String> includedProperties) {
226        if (isUpdate("producerFlowControl", includedProperties)) {
227            destination.setProducerFlowControl(isProducerFlowControl());
228        }
229        if (isUpdate("alwaysRetroactive", includedProperties)) {
230            destination.setAlwaysRetroactive(isAlwaysRetroactive());
231        }
232        if (isUpdate("blockedProducerWarningInterval", includedProperties)) {
233            destination.setBlockedProducerWarningInterval(getBlockedProducerWarningInterval());
234        }
235        if (isUpdate("maxPageSize", includedProperties)) {
236            destination.setMaxPageSize(getMaxPageSize());
237        }
238        if (isUpdate("maxBrowsePageSize", includedProperties)) {
239            destination.setMaxBrowsePageSize(getMaxBrowsePageSize());
240        }
241
242        if (isUpdate("minimumMessageSize", includedProperties)) {
243            destination.setMinimumMessageSize((int) getMinimumMessageSize());
244        }
245        if (isUpdate("maxExpirePageSize", includedProperties)) {
246            destination.setMaxExpirePageSize(getMaxExpirePageSize());
247        }
248        if (isUpdate("cursorMemoryHighWaterMark", includedProperties)) {
249            destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
250        }
251        if (isUpdate("storeUsageHighWaterMark", includedProperties)) {
252            destination.setStoreUsageHighWaterMark(getStoreUsageHighWaterMark());
253        }
254        if (isUpdate("gcInactiveDestinations", includedProperties)) {
255            destination.setGcIfInactive(isGcInactiveDestinations());
256        }
257        if (isUpdate("gcWithNetworkConsumers", includedProperties)) {
258            destination.setGcWithNetworkConsumers(isGcWithNetworkConsumers());
259        }
260        if (isUpdate("inactiveTimeoutBeforeGc", includedProperties)) {
261            destination.setInactiveTimeoutBeforeGC(getInactiveTimeoutBeforeGC());
262        }
263        if (isUpdate("reduceMemoryFootprint", includedProperties)) {
264            destination.setReduceMemoryFootprint(isReduceMemoryFootprint());
265        }
266        if (isUpdate("doOptimizeMessageStore", includedProperties)) {
267            destination.setDoOptimzeMessageStorage(isDoOptimzeMessageStorage());
268        }
269        if (isUpdate("optimizeMessageStoreInFlightLimit", includedProperties)) {
270            destination.setOptimizeMessageStoreInFlightLimit(getOptimizeMessageStoreInFlightLimit());
271        }
272        if (isUpdate("advisoryForConsumed", includedProperties)) {
273            destination.setAdvisoryForConsumed(isAdvisoryForConsumed());
274        }
275        if (isUpdate("advisoryForDelivery", includedProperties)) {
276            destination.setAdvisoryForDelivery(isAdvisoryForDelivery());
277        }
278        if (isUpdate("advisoryForDiscardingMessages", includedProperties)) {
279            destination.setAdvisoryForDiscardingMessages(isAdvisoryForDiscardingMessages());
280        }
281        if (isUpdate("advisoryForSlowConsumers", includedProperties)) {
282            destination.setAdvisoryForSlowConsumers(isAdvisoryForSlowConsumers());
283        }
284        if (isUpdate("advisoryForFastProducers", includedProperties)) {
285            destination.setAdvisoryForFastProducers(isAdvisoryForFastProducers());
286        }
287        if (isUpdate("advisoryWhenFull", includedProperties)) {
288            destination.setAdvisoryWhenFull(isAdvisoryWhenFull());
289        }
290        if (isUpdate("includeBodyForAdvisory", includedProperties)) {
291            destination.setIncludeBodyForAdvisory(isIncludeBodyForAdvisory());
292        }
293        if (isUpdate("sendAdvisoryIfNoConsumers", includedProperties)) {
294            destination.setSendAdvisoryIfNoConsumers(isSendAdvisoryIfNoConsumers());
295        }
296    }
297
298    public void baseConfiguration(Broker broker, BaseDestination destination) {
299        baseUpdate(destination);
300        destination.setEnableAudit(isEnableAudit());
301        destination.setMaxAuditDepth(getMaxQueueAuditDepth());
302        destination.setMaxProducersToAudit(getMaxProducersToAudit());
303        destination.setUseCache(isUseCache());
304        destination.setExpireMessagesPeriod(getExpireMessagesPeriod());
305        SlowConsumerStrategy scs = getSlowConsumerStrategy();
306        if (scs != null) {
307            scs.setBrokerService(broker);
308            scs.addDestination(destination);
309        }
310        destination.setSlowConsumerStrategy(scs);
311        destination.setPrioritizedMessages(isPrioritizedMessages());
312    }
313
314    public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
315        configurePrefetch(subscription);
316        if (pendingMessageLimitStrategy != null) {
317            int value = pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription);
318            int consumerLimit = subscription.getInfo().getMaximumPendingMessageLimit();
319            if (consumerLimit > 0) {
320                if (value < 0 || consumerLimit < value) {
321                    value = consumerLimit;
322                }
323            }
324            if (value >= 0) {
325                LOG.debug("Setting the maximumPendingMessages size to: {} for consumer: {}", value, subscription.getInfo().getConsumerId());
326                subscription.setMaximumPendingMessages(value);
327            }
328        }
329        if (messageEvictionStrategy != null) {
330            subscription.setMessageEvictionStrategy(messageEvictionStrategy);
331        }
332        if (pendingSubscriberPolicy != null) {
333            String name = subscription.getContext().getClientId() + "_" + subscription.getConsumerInfo().getConsumerId();
334            int maxBatchSize = subscription.getConsumerInfo().getPrefetchSize();
335            subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(broker,name, maxBatchSize,subscription));
336        }
337        if (enableAudit) {
338            subscription.setEnableAudit(enableAudit);
339            subscription.setMaxProducersToAudit(maxProducersToAudit);
340            subscription.setMaxAuditDepth(maxAuditDepth);
341        }
342    }
343
344    public void configure(Broker broker, SystemUsage memoryManager, DurableTopicSubscription sub) {
345        String clientId = sub.getSubscriptionKey().getClientId();
346        String subName = sub.getSubscriptionKey().getSubscriptionName();
347        sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
348        configurePrefetch(sub);
349        if (pendingDurableSubscriberPolicy != null) {
350            PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(broker,clientId, subName,sub.getPrefetchSize(),sub);
351            cursor.setSystemUsage(memoryManager);
352            sub.setPending(cursor);
353        }
354        int auditDepth = getMaxAuditDepth();
355        if (auditDepth == BaseDestination.MAX_AUDIT_DEPTH && this.isPrioritizedMessages()) {
356            sub.setMaxAuditDepth(auditDepth * 10);
357        } else {
358            sub.setMaxAuditDepth(auditDepth);
359        }
360        sub.setMaxProducersToAudit(getMaxProducersToAudit());
361        sub.setUsePrefetchExtension(isUsePrefetchExtension());
362    }
363
364    public void configure(Broker broker, SystemUsage memoryManager, QueueBrowserSubscription sub) {
365        configurePrefetch(sub);
366        sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
367        sub.setUsePrefetchExtension(isUsePrefetchExtension());
368
369        // TODO
370        // We currently need an infinite audit because of the way that browser dispatch
371        // is done.  We should refactor the browsers to better handle message dispatch so
372        // we can remove this and perform a more efficient dispatch.
373        sub.setMaxProducersToAudit(Integer.MAX_VALUE);
374        sub.setMaxAuditDepth(Short.MAX_VALUE);
375
376        // part solution - dispatching to browsers needs to be restricted
377        sub.setMaxMessages(getMaxBrowsePageSize());
378    }
379
380    public void configure(Broker broker, SystemUsage memoryManager, QueueSubscription sub) {
381        configurePrefetch(sub);
382        sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
383        sub.setUsePrefetchExtension(isUsePrefetchExtension());
384        sub.setMaxProducersToAudit(getMaxProducersToAudit());
385    }
386
387    public void configurePrefetch(Subscription subscription) {
388
389        final int currentPrefetch = subscription.getConsumerInfo().getPrefetchSize();
390        if (subscription instanceof QueueBrowserSubscription) {
391            if (currentPrefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH) {
392                ((QueueBrowserSubscription) subscription).setPrefetchSize(getQueueBrowserPrefetch());
393            }
394        } else if (subscription instanceof QueueSubscription) {
395            if (currentPrefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH) {
396                ((QueueSubscription) subscription).setPrefetchSize(getQueuePrefetch());
397            }
398        } else if (subscription instanceof DurableTopicSubscription) {
399            if (currentPrefetch == ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH ||
400                    subscription.getConsumerInfo().getPrefetchSize() == ActiveMQPrefetchPolicy.DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH) {
401                ((DurableTopicSubscription)subscription).setPrefetchSize(getDurableTopicPrefetch());
402            }
403        } else if (subscription instanceof TopicSubscription) {
404            if (currentPrefetch == ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH) {
405                ((TopicSubscription) subscription).setPrefetchSize(getTopicPrefetch());
406            }
407        }
408        if (currentPrefetch != 0 && subscription.getPrefetchSize() == 0) {
409            // tell the sub so that it can issue a pull request
410            subscription.updateConsumerPrefetch(0);
411        }
412    }
413
414    private boolean isUpdate(String property, Set<String> includedProperties) {
415        return includedProperties == null || includedProperties.contains(property);
416    }
417    // Properties
418    // -------------------------------------------------------------------------
419    public DispatchPolicy getDispatchPolicy() {
420        return dispatchPolicy;
421    }
422
423    public void setDispatchPolicy(DispatchPolicy policy) {
424        this.dispatchPolicy = policy;
425    }
426
427    public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
428        return subscriptionRecoveryPolicy;
429    }
430
431    public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) {
432        this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
433    }
434
435    public boolean isSendAdvisoryIfNoConsumers() {
436        return sendAdvisoryIfNoConsumers;
437    }
438
439    /**
440     * Sends an advisory message if a non-persistent message is sent and there
441     * are no active consumers
442     */
443    public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
444        this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
445    }
446
447    public DeadLetterStrategy getDeadLetterStrategy() {
448        return deadLetterStrategy;
449    }
450
451    /**
452     * Sets the policy used to determine which dead letter queue destination
453     * should be used
454     */
455    public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
456        this.deadLetterStrategy = deadLetterStrategy;
457    }
458
459    public PendingMessageLimitStrategy getPendingMessageLimitStrategy() {
460        return pendingMessageLimitStrategy;
461    }
462
463    /**
464     * Sets the strategy to calculate the maximum number of messages that are
465     * allowed to be pending on consumers (in addition to their prefetch sizes).
466     * Once the limit is reached, non-durable topics can then start discarding
467     * old messages. This allows us to keep dispatching messages to slow
468     * consumers while not blocking fast consumers and discarding the messages
469     * oldest first.
470     */
471    public void setPendingMessageLimitStrategy(PendingMessageLimitStrategy pendingMessageLimitStrategy) {
472        this.pendingMessageLimitStrategy = pendingMessageLimitStrategy;
473    }
474
475    public MessageEvictionStrategy getMessageEvictionStrategy() {
476        return messageEvictionStrategy;
477    }
478
479    /**
480     * Sets the eviction strategy used to decide which message to evict when the
481     * slow consumer needs to discard messages
482     */
483    public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) {
484        this.messageEvictionStrategy = messageEvictionStrategy;
485    }
486
487    public long getMemoryLimit() {
488        return memoryLimit;
489    }
490
491    /**
492     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
493     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
494     */
495    public void setMemoryLimit(long memoryLimit) {
496        this.memoryLimit = memoryLimit;
497    }
498
499    public MessageGroupMapFactory getMessageGroupMapFactory() {
500        if (messageGroupMapFactory == null) {
501            try {
502            messageGroupMapFactory = GroupFactoryFinder.createMessageGroupMapFactory(getMessageGroupMapFactoryType());
503            }catch(Exception e){
504                LOG.error("Failed to create message group Factory ",e);
505            }
506        }
507        return messageGroupMapFactory;
508    }
509
510    /**
511     * Sets the factory used to create new instances of {MessageGroupMap} used
512     * to implement the <a
513     * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
514     * functionality.
515     */
516    public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) {
517        this.messageGroupMapFactory = messageGroupMapFactory;
518    }
519
520
521    public String getMessageGroupMapFactoryType() {
522        return messageGroupMapFactoryType;
523    }
524
525    public void setMessageGroupMapFactoryType(String messageGroupMapFactoryType) {
526        this.messageGroupMapFactoryType = messageGroupMapFactoryType;
527    }
528
529
530    /**
531     * @return the pendingDurableSubscriberPolicy
532     */
533    public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() {
534        return this.pendingDurableSubscriberPolicy;
535    }
536
537    /**
538     * @param pendingDurableSubscriberPolicy the pendingDurableSubscriberPolicy
539     *                to set
540     */
541    public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) {
542        this.pendingDurableSubscriberPolicy = pendingDurableSubscriberPolicy;
543    }
544
545    /**
546     * @return the pendingQueuePolicy
547     */
548    public PendingQueueMessageStoragePolicy getPendingQueuePolicy() {
549        return this.pendingQueuePolicy;
550    }
551
552    /**
553     * @param pendingQueuePolicy the pendingQueuePolicy to set
554     */
555    public void setPendingQueuePolicy(PendingQueueMessageStoragePolicy pendingQueuePolicy) {
556        this.pendingQueuePolicy = pendingQueuePolicy;
557    }
558
559    /**
560     * @return the pendingSubscriberPolicy
561     */
562    public PendingSubscriberMessageStoragePolicy getPendingSubscriberPolicy() {
563        return this.pendingSubscriberPolicy;
564    }
565
566    /**
567     * @param pendingSubscriberPolicy the pendingSubscriberPolicy to set
568     */
569    public void setPendingSubscriberPolicy(PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy) {
570        this.pendingSubscriberPolicy = pendingSubscriberPolicy;
571    }
572
573    /**
574     * @return true if producer flow control enabled
575     */
576    public boolean isProducerFlowControl() {
577        return producerFlowControl;
578    }
579
580    /**
581     * @param producerFlowControl
582     */
583    public void setProducerFlowControl(boolean producerFlowControl) {
584        this.producerFlowControl = producerFlowControl;
585    }
586
587    /**
588     * @return true if topic is always retroactive
589     */
590    public boolean isAlwaysRetroactive() {
591        return alwaysRetroactive;
592    }
593
594    /**
595     * @param alwaysRetroactive
596     */
597    public void setAlwaysRetroactive(boolean alwaysRetroactive) {
598        this.alwaysRetroactive = alwaysRetroactive;
599    }
600
601
602    /**
603     * Set's the interval at which warnings about producers being blocked by
604     * resource usage will be triggered. Values of 0 or less will disable
605     * warnings
606     *
607     * @param blockedProducerWarningInterval the interval at which warning about
608     *            blocked producers will be triggered.
609     */
610    public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
611        this.blockedProducerWarningInterval = blockedProducerWarningInterval;
612    }
613
614    /**
615     *
616     * @return the interval at which warning about blocked producers will be
617     *         triggered.
618     */
619    public long getBlockedProducerWarningInterval() {
620        return blockedProducerWarningInterval;
621    }
622
623    /**
624     * @return the maxProducersToAudit
625     */
626    public int getMaxProducersToAudit() {
627        return maxProducersToAudit;
628    }
629
630    /**
631     * @param maxProducersToAudit the maxProducersToAudit to set
632     */
633    public void setMaxProducersToAudit(int maxProducersToAudit) {
634        this.maxProducersToAudit = maxProducersToAudit;
635    }
636
637    /**
638     * @return the maxAuditDepth
639     */
640    public int getMaxAuditDepth() {
641        return maxAuditDepth;
642    }
643
644    /**
645     * @param maxAuditDepth the maxAuditDepth to set
646     */
647    public void setMaxAuditDepth(int maxAuditDepth) {
648        this.maxAuditDepth = maxAuditDepth;
649    }
650
651    /**
652     * @return the enableAudit
653     */
654    public boolean isEnableAudit() {
655        return enableAudit;
656    }
657
658    /**
659     * @param enableAudit the enableAudit to set
660     */
661    public void setEnableAudit(boolean enableAudit) {
662        this.enableAudit = enableAudit;
663    }
664
665    public int getMaxQueueAuditDepth() {
666        return maxQueueAuditDepth;
667    }
668
669    public void setMaxQueueAuditDepth(int maxQueueAuditDepth) {
670        this.maxQueueAuditDepth = maxQueueAuditDepth;
671    }
672
673    public boolean isOptimizedDispatch() {
674        return optimizedDispatch;
675    }
676
677    public void setOptimizedDispatch(boolean optimizedDispatch) {
678        this.optimizedDispatch = optimizedDispatch;
679    }
680
681    public int getMaxPageSize() {
682        return maxPageSize;
683    }
684
685    public void setMaxPageSize(int maxPageSize) {
686        this.maxPageSize = maxPageSize;
687    }
688
689    public int getMaxBrowsePageSize() {
690        return maxBrowsePageSize;
691    }
692
693    public void setMaxBrowsePageSize(int maxPageSize) {
694        this.maxBrowsePageSize = maxPageSize;
695    }
696
697    public boolean isUseCache() {
698        return useCache;
699    }
700
701    public void setUseCache(boolean useCache) {
702        this.useCache = useCache;
703    }
704
705    public long getMinimumMessageSize() {
706        return minimumMessageSize;
707    }
708
709    public void setMinimumMessageSize(long minimumMessageSize) {
710        this.minimumMessageSize = minimumMessageSize;
711    }
712
713    public boolean isUseConsumerPriority() {
714        return useConsumerPriority;
715    }
716
717    public void setUseConsumerPriority(boolean useConsumerPriority) {
718        this.useConsumerPriority = useConsumerPriority;
719    }
720
721    public boolean isStrictOrderDispatch() {
722        return strictOrderDispatch;
723    }
724
725    public void setStrictOrderDispatch(boolean strictOrderDispatch) {
726        this.strictOrderDispatch = strictOrderDispatch;
727    }
728
729    public boolean isLazyDispatch() {
730        return lazyDispatch;
731    }
732
733    public void setLazyDispatch(boolean lazyDispatch) {
734        this.lazyDispatch = lazyDispatch;
735    }
736
737    public int getTimeBeforeDispatchStarts() {
738        return timeBeforeDispatchStarts;
739    }
740
    /**
     * Stores the given value as {@code timeBeforeDispatchStarts}.
     *
     * @param timeBeforeDispatchStarts the new value (unit not stated here; TODO confirm
     *        against the code that reads it)
     */
    public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts) {
        this.timeBeforeDispatchStarts = timeBeforeDispatchStarts;
    }
744
745    public int getConsumersBeforeDispatchStarts() {
746        return consumersBeforeDispatchStarts;
747    }
748
    /**
     * Stores the given value as {@code consumersBeforeDispatchStarts}.
     *
     * @param consumersBeforeDispatchStarts the new {@code consumersBeforeDispatchStarts} value
     */
    public void setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts) {
        this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts;
    }
752
753    /**
754     * @return the advisoryForSlowConsumers
755     */
756    public boolean isAdvisoryForSlowConsumers() {
757        return advisoryForSlowConsumers;
758    }
759
    /**
     * Stores the given value in the {@code advisoryForSlowConsumers} flag.
     *
     * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set
     */
    public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) {
        this.advisoryForSlowConsumers = advisoryForSlowConsumers;
    }
766
767    /**
768     * @return the advisoryForDiscardingMessages
769     */
770    public boolean isAdvisoryForDiscardingMessages() {
771        return advisoryForDiscardingMessages;
772    }
773
    /**
     * Stores the given value in the {@code advisoryForDiscardingMessages} flag.
     *
     * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to set
     */
    public void setAdvisoryForDiscardingMessages(
            boolean advisoryForDiscardingMessages) {
        this.advisoryForDiscardingMessages = advisoryForDiscardingMessages;
    }
781
782    /**
783     * @return the advisoryWhenFull
784     */
785    public boolean isAdvisoryWhenFull() {
786        return advisoryWhenFull;
787    }
788
    /**
     * Stores the given value in the {@code advisoryWhenFull} flag.
     *
     * @param advisoryWhenFull the advisoryWhenFull to set
     */
    public void setAdvisoryWhenFull(boolean advisoryWhenFull) {
        this.advisoryWhenFull = advisoryWhenFull;
    }
795
796    /**
797     * @return the advisoryForDelivery
798     */
799    public boolean isAdvisoryForDelivery() {
800        return advisoryForDelivery;
801    }
802
    /**
     * Stores the given value in the {@code advisoryForDelivery} flag.
     *
     * @param advisoryForDelivery the advisoryForDelivery to set
     */
    public void setAdvisoryForDelivery(boolean advisoryForDelivery) {
        this.advisoryForDelivery = advisoryForDelivery;
    }
809
810    /**
811     * @return the advisoryForConsumed
812     */
813    public boolean isAdvisoryForConsumed() {
814        return advisoryForConsumed;
815    }
816
    /**
     * Stores the given value in the {@code advisoryForConsumed} flag.
     *
     * @param advisoryForConsumed the advisoryForConsumed to set
     */
    public void setAdvisoryForConsumed(boolean advisoryForConsumed) {
        this.advisoryForConsumed = advisoryForConsumed;
    }
823
824    /**
825     * @return the advisdoryForFastProducers
826     */
827    public boolean isAdvisoryForFastProducers() {
828        return advisoryForFastProducers;
829    }
830
    /**
     * Stores the given value in the {@code advisoryForFastProducers} flag.
     *
     * @param advisoryForFastProducers the advisoryForFastProducers to set
     */
    public void setAdvisoryForFastProducers(boolean advisoryForFastProducers) {
        this.advisoryForFastProducers = advisoryForFastProducers;
    }
837
838    /**
839     * Returns true if the original message body should be included when applicable
840     * for advisory messages
841     *
842     * @return
843     */
844    public boolean isIncludeBodyForAdvisory() {
845        return includeBodyForAdvisory;
846    }
847
    /**
     * Sets whether the original message body should be included, when applicable,
     * in advisory messages.
     *
     * @param includeBodyForAdvisory the new {@code includeBodyForAdvisory} flag value
     */
    public void setIncludeBodyForAdvisory(boolean includeBodyForAdvisory) {
        this.includeBodyForAdvisory = includeBodyForAdvisory;
    }
857
    /**
     * Stores the given value as {@code maxExpirePageSize}. No validation is performed here.
     *
     * @param maxExpirePageSize the new {@code maxExpirePageSize} value
     */
    public void setMaxExpirePageSize(int maxExpirePageSize) {
        this.maxExpirePageSize = maxExpirePageSize;
    }
861
862    public int getMaxExpirePageSize() {
863        return maxExpirePageSize;
864    }
865
    /**
     * Stores the given value as {@code expireMessagesPeriod}.
     *
     * @param expireMessagesPeriod the new value (unit not stated here; TODO confirm
     *        against the code that reads it)
     */
    public void setExpireMessagesPeriod(long expireMessagesPeriod) {
        this.expireMessagesPeriod = expireMessagesPeriod;
    }
869
870    public long getExpireMessagesPeriod() {
871        return expireMessagesPeriod;
872    }
873
874    /**
875     * Get the queuePrefetch
876     * @return the queuePrefetch
877     */
878    public int getQueuePrefetch() {
879        return this.queuePrefetch;
880    }
881
    /**
     * Stores the given value as {@code queuePrefetch}. No validation is performed here.
     *
     * @param queuePrefetch the queuePrefetch to set
     */
    public void setQueuePrefetch(int queuePrefetch) {
        this.queuePrefetch = queuePrefetch;
    }
889
890    /**
891     * Get the queueBrowserPrefetch
892     * @return the queueBrowserPrefetch
893     */
894    public int getQueueBrowserPrefetch() {
895        return this.queueBrowserPrefetch;
896    }
897
    /**
     * Stores the given value as {@code queueBrowserPrefetch}. No validation is performed here.
     *
     * @param queueBrowserPrefetch the queueBrowserPrefetch to set
     */
    public void setQueueBrowserPrefetch(int queueBrowserPrefetch) {
        this.queueBrowserPrefetch = queueBrowserPrefetch;
    }
905
906    /**
907     * Get the topicPrefetch
908     * @return the topicPrefetch
909     */
910    public int getTopicPrefetch() {
911        return this.topicPrefetch;
912    }
913
    /**
     * Stores the given value as {@code topicPrefetch}. No validation is performed here.
     *
     * @param topicPrefetch the topicPrefetch to set
     */
    public void setTopicPrefetch(int topicPrefetch) {
        this.topicPrefetch = topicPrefetch;
    }
921
922    /**
923     * Get the durableTopicPrefetch
924     * @return the durableTopicPrefetch
925     */
926    public int getDurableTopicPrefetch() {
927        return this.durableTopicPrefetch;
928    }
929
    /**
     * Stores the given value as {@code durableTopicPrefetch}. No validation is performed here.
     *
     * @param durableTopicPrefetch the durableTopicPrefetch to set
     */
    public void setDurableTopicPrefetch(int durableTopicPrefetch) {
        this.durableTopicPrefetch = durableTopicPrefetch;
    }
937
938    public boolean isUsePrefetchExtension() {
939        return this.usePrefetchExtension;
940    }
941
    /**
     * Stores the given value in the {@code usePrefetchExtension} flag.
     *
     * @param usePrefetchExtension the new flag value
     */
    public void setUsePrefetchExtension(boolean usePrefetchExtension) {
        this.usePrefetchExtension = usePrefetchExtension;
    }
945
946    public int getCursorMemoryHighWaterMark() {
947        return this.cursorMemoryHighWaterMark;
948    }
949
    /**
     * Stores the given value as {@code cursorMemoryHighWaterMark}. No range check is
     * performed here.
     *
     * @param cursorMemoryHighWaterMark the new value (presumably a percentage; TODO confirm
     *        against the code that reads it)
     */
    public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
        this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
    }
953
    /**
     * Stores the given value as {@code storeUsageHighWaterMark}. No range check is
     * performed here.
     *
     * @param storeUsageHighWaterMark the new value (presumably a percentage; TODO confirm
     *        against the code that reads it)
     */
    public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) {
        this.storeUsageHighWaterMark = storeUsageHighWaterMark;
    }
957
958    public int getStoreUsageHighWaterMark() {
959        return storeUsageHighWaterMark;
960    }
961
    /**
     * Stores the given {@link SlowConsumerStrategy} reference. The argument is not
     * null-checked here.
     *
     * @param slowConsumerStrategy the strategy instance to store
     */
    public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) {
        this.slowConsumerStrategy = slowConsumerStrategy;
    }
965
966    public SlowConsumerStrategy getSlowConsumerStrategy() {
967        return this.slowConsumerStrategy;
968    }
969
970
971    public boolean isPrioritizedMessages() {
972        return this.prioritizedMessages;
973    }
974
    /**
     * Stores the given value in the {@code prioritizedMessages} flag.
     *
     * @param prioritizedMessages the new flag value
     */
    public void setPrioritizedMessages(boolean prioritizedMessages) {
        this.prioritizedMessages = prioritizedMessages;
    }
978
    /**
     * Stores the given value in the {@code allConsumersExclusiveByDefault} flag.
     *
     * @param allConsumersExclusiveByDefault the new flag value
     */
    public void setAllConsumersExclusiveByDefault(boolean allConsumersExclusiveByDefault) {
        this.allConsumersExclusiveByDefault = allConsumersExclusiveByDefault;
    }
982
983    public boolean isAllConsumersExclusiveByDefault() {
984        return allConsumersExclusiveByDefault;
985    }
986
987    public boolean isGcInactiveDestinations() {
988        return this.gcInactiveDestinations;
989    }
990
    /**
     * Stores the given value in the {@code gcInactiveDestinations} flag.
     *
     * @param gcInactiveDestinations the new flag value
     */
    public void setGcInactiveDestinations(boolean gcInactiveDestinations) {
        this.gcInactiveDestinations = gcInactiveDestinations;
    }
994
995    /**
996     * @return the amount of time spent inactive before GC of the destination kicks in.
997     *
998     * @deprecated use getInactiveTimeoutBeforeGC instead.
999     */
1000    @Deprecated
1001    public long getInactiveTimoutBeforeGC() {
1002        return getInactiveTimeoutBeforeGC();
1003    }
1004
1005    /**
1006     * Sets the amount of time a destination is inactive before it is marked for GC
1007     *
1008     * @param inactiveTimoutBeforeGC
1009     *        time in milliseconds to configure as the inactive timeout.
1010     *
1011     * @deprecated use getInactiveTimeoutBeforeGC instead.
1012     */
1013    @Deprecated
1014    public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC) {
1015        setInactiveTimeoutBeforeGC(inactiveTimoutBeforeGC);
1016    }
1017
1018    /**
1019     * @return the amount of time spent inactive before GC of the destination kicks in.
1020     */
1021    public long getInactiveTimeoutBeforeGC() {
1022        return this.inactiveTimeoutBeforeGC;
1023    }
1024
    /**
     * Sets the amount of time a destination is inactive before it is marked for GC.
     * The value is stored as given; no validation is performed here.
     *
     * @param inactiveTimeoutBeforeGC
     *        time in milliseconds to configure as the inactive timeout.
     */
    public void setInactiveTimeoutBeforeGC(long inactiveTimeoutBeforeGC) {
        this.inactiveTimeoutBeforeGC = inactiveTimeoutBeforeGC;
    }
1034
    /**
     * Stores the given value in the {@code gcWithNetworkConsumers} flag.
     *
     * @param gcWithNetworkConsumers the new flag value
     */
    public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) {
        this.gcWithNetworkConsumers = gcWithNetworkConsumers;
    }
1038
1039    public boolean isGcWithNetworkConsumers() {
1040        return gcWithNetworkConsumers;
1041    }
1042
1043    public boolean isReduceMemoryFootprint() {
1044        return reduceMemoryFootprint;
1045    }
1046
    /**
     * Stores the given value in the {@code reduceMemoryFootprint} flag.
     *
     * @param reduceMemoryFootprint the new flag value
     */
    public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) {
        this.reduceMemoryFootprint = reduceMemoryFootprint;
    }
1050
    /**
     * Stores the given {@link NetworkBridgeFilterFactory} reference. The argument is not
     * null-checked here.
     *
     * @param networkBridgeFilterFactory the factory instance to store
     */
    public void setNetworkBridgeFilterFactory(NetworkBridgeFilterFactory networkBridgeFilterFactory) {
        this.networkBridgeFilterFactory = networkBridgeFilterFactory;
    }
1054
1055    public NetworkBridgeFilterFactory getNetworkBridgeFilterFactory() {
1056        return networkBridgeFilterFactory;
1057    }
1058
1059    public boolean isDoOptimzeMessageStorage() {
1060        return doOptimzeMessageStorage;
1061    }
1062
    /**
     * Stores the given value in the {@code doOptimzeMessageStorage} flag.
     *
     * @param doOptimzeMessageStorage the new flag value
     */
    public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
        this.doOptimzeMessageStorage = doOptimzeMessageStorage;
    }
1066
1067    public int getOptimizeMessageStoreInFlightLimit() {
1068        return optimizeMessageStoreInFlightLimit;
1069    }
1070
    /**
     * Stores the given value as {@code optimizeMessageStoreInFlightLimit}. No validation
     * is performed here.
     *
     * @param optimizeMessageStoreInFlightLimit the new limit value
     */
    public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFlightLimit) {
        this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit;
    }
1074
    /**
     * Stores the given value in the {@code persistJMSRedelivered} flag.
     *
     * @param val the new {@code persistJMSRedelivered} flag value
     */
    public void setPersistJMSRedelivered(boolean val) {
        this.persistJMSRedelivered = val;
    }
1078
1079    public boolean isPersistJMSRedelivered() {
1080        return persistJMSRedelivered;
1081    }
1082
1083    public int getMaxDestinations() {
1084        return maxDestinations;
1085    }
1086
    /**
     * Sets the maximum number of destinations that can be created. The value is
     * stored as given; this setter itself performs no validation.
     *
     * @param maxDestinations
     *            maximum number of destinations
     */
    public void setMaxDestinations(int maxDestinations) {
        this.maxDestinations = maxDestinations;
    }
1096
1097    @Override
1098    public String toString() {
1099        return "PolicyEntry [" + destination + "]";
1100    }
1101}