001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.List; 021 022import javax.jms.ResourceAllocationException; 023 024import org.apache.activemq.advisory.AdvisorySupport; 025import org.apache.activemq.broker.Broker; 026import org.apache.activemq.broker.BrokerService; 027import org.apache.activemq.broker.ConnectionContext; 028import org.apache.activemq.broker.ProducerBrokerExchange; 029import org.apache.activemq.broker.region.policy.DeadLetterStrategy; 030import org.apache.activemq.broker.region.policy.SlowConsumerStrategy; 031import org.apache.activemq.command.ActiveMQDestination; 032import org.apache.activemq.command.ActiveMQTopic; 033import org.apache.activemq.command.Message; 034import org.apache.activemq.command.MessageAck; 035import org.apache.activemq.command.MessageDispatchNotification; 036import org.apache.activemq.command.ProducerInfo; 037import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 038import org.apache.activemq.security.SecurityContext; 039import org.apache.activemq.state.ProducerState; 040import org.apache.activemq.store.MessageStore; 041import org.apache.activemq.thread.Scheduler; 042import org.apache.activemq.usage.MemoryUsage; 043import org.apache.activemq.usage.SystemUsage; 044import org.apache.activemq.usage.Usage; 045import org.slf4j.Logger; 046 047/** 048 * 049 */ 050public abstract class BaseDestination implements Destination { 051 /** 052 * The maximum number of messages to page in to the destination from 053 * persistent storage 054 */ 055 public static final int MAX_PAGE_SIZE = 200; 056 public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2; 057 public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000; 058 public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000; 059 public static final int MAX_PRODUCERS_TO_AUDIT = 64; 060 public static final int MAX_AUDIT_DEPTH = 10000; 061 062 protected final ActiveMQDestination destination; 063 protected final Broker broker; 064 protected final MessageStore store; 065 protected SystemUsage systemUsage; 066 protected MemoryUsage memoryUsage; 067 private boolean producerFlowControl = true; 068 private boolean alwaysRetroactive = false; 069 protected boolean warnOnProducerFlowControl = true; 070 protected long blockedProducerWarningInterval = DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL; 071 072 private int maxProducersToAudit = 1024; 073 private int maxAuditDepth = 2048; 074 private boolean enableAudit = true; 075 private int maxPageSize = MAX_PAGE_SIZE; 076 private int maxBrowsePageSize = MAX_BROWSE_PAGE_SIZE; 077 private boolean useCache = true; 078 private int minimumMessageSize = 1024; 079 private boolean lazyDispatch = false; 080 private boolean advisoryForSlowConsumers; 081 private boolean advisoryForFastProducers; 082 private boolean advisoryForDiscardingMessages; 083 private boolean advisoryWhenFull; 084 private boolean advisoryForDelivery; 085 private boolean advisoryForConsumed; 086 private boolean sendAdvisoryIfNoConsumers; 087 private boolean includeBodyForAdvisory; 088 protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); 089 protected final BrokerService brokerService; 090 protected final Broker regionBroker; 091 protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY; 092 protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD; 093 private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE; 094 protected int cursorMemoryHighWaterMark = 70; 095 protected int storeUsageHighWaterMark = 100; 096 private SlowConsumerStrategy slowConsumerStrategy; 097 private boolean prioritizedMessages; 098 private long inactiveTimeoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC; 099 private boolean gcIfInactive; 100 private boolean gcWithNetworkConsumers; 101 private long lastActiveTime=0l; 102 private boolean reduceMemoryFootprint = false; 103 protected final Scheduler scheduler; 104 private boolean disposed = false; 105 private boolean doOptimzeMessageStorage = true; 106 /* 107 * percentage of in-flight messages above which optimize message store is disabled 108 */ 109 private int optimizeMessageStoreInFlightLimit = 10; 110 private boolean persistJMSRedelivered; 111 112 /** 113 * @param brokerService 114 * @param store 115 * @param destination 116 * @param parentStats 117 * @throws Exception 118 */ 119 public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception { 120 this.brokerService = brokerService; 121 this.broker = brokerService.getBroker(); 122 this.store = store; 123 this.destination = destination; 124 // let's copy the enabled property from the parent DestinationStatistics 125 this.destinationStatistics.setEnabled(parentStats.isEnabled()); 126 this.destinationStatistics.setParent(parentStats); 127 this.systemUsage = new SystemUsage(brokerService.getProducerSystemUsage(), destination.toString()); 128 this.memoryUsage = this.systemUsage.getMemoryUsage(); 129 this.memoryUsage.setUsagePortion(1.0f); 130 this.regionBroker = brokerService.getRegionBroker(); 131 this.scheduler = brokerService.getBroker().getScheduler(); 132 } 133 134 /** 135 * initialize the destination 136 * 137 * @throws Exception 138 */ 139 public void initialize() throws Exception { 140 // Let the store know what usage manager we are using so that he can 141 // flush messages to disk when usage gets high. 142 if (store != null) { 143 store.setMemoryUsage(this.memoryUsage); 144 } 145 } 146 147 /** 148 * @return the producerFlowControl 149 */ 150 @Override 151 public boolean isProducerFlowControl() { 152 return producerFlowControl; 153 } 154 155 /** 156 * @param producerFlowControl the producerFlowControl to set 157 */ 158 @Override 159 public void setProducerFlowControl(boolean producerFlowControl) { 160 this.producerFlowControl = producerFlowControl; 161 } 162 163 @Override 164 public boolean isAlwaysRetroactive() { 165 return alwaysRetroactive; 166 } 167 168 @Override 169 public void setAlwaysRetroactive(boolean alwaysRetroactive) { 170 this.alwaysRetroactive = alwaysRetroactive; 171 } 172 173 /** 174 * Set's the interval at which warnings about producers being blocked by 175 * resource usage will be triggered. Values of 0 or less will disable 176 * warnings 177 * 178 * @param blockedProducerWarningInterval the interval at which warning about 179 * blocked producers will be triggered. 180 */ 181 @Override 182 public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) { 183 this.blockedProducerWarningInterval = blockedProducerWarningInterval; 184 } 185 186 /** 187 * 188 * @return the interval at which warning about blocked producers will be 189 * triggered. 190 */ 191 @Override 192 public long getBlockedProducerWarningInterval() { 193 return blockedProducerWarningInterval; 194 } 195 196 /** 197 * @return the maxProducersToAudit 198 */ 199 @Override 200 public int getMaxProducersToAudit() { 201 return maxProducersToAudit; 202 } 203 204 /** 205 * @param maxProducersToAudit the maxProducersToAudit to set 206 */ 207 @Override 208 public void setMaxProducersToAudit(int maxProducersToAudit) { 209 this.maxProducersToAudit = maxProducersToAudit; 210 } 211 212 /** 213 * @return the maxAuditDepth 214 */ 215 @Override 216 public int getMaxAuditDepth() { 217 return maxAuditDepth; 218 } 219 220 /** 221 * @param maxAuditDepth the maxAuditDepth to set 222 */ 223 @Override 224 public void setMaxAuditDepth(int maxAuditDepth) { 225 this.maxAuditDepth = maxAuditDepth; 226 } 227 228 /** 229 * @return the enableAudit 230 */ 231 @Override 232 public boolean isEnableAudit() { 233 return enableAudit; 234 } 235 236 /** 237 * @param enableAudit the enableAudit to set 238 */ 239 @Override 240 public void setEnableAudit(boolean enableAudit) { 241 this.enableAudit = enableAudit; 242 } 243 244 @Override 245 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 246 destinationStatistics.getProducers().increment(); 247 this.lastActiveTime=0l; 248 } 249 250 @Override 251 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 252 destinationStatistics.getProducers().decrement(); 253 } 254 255 @Override 256 public void addSubscription(ConnectionContext context, Subscription sub) throws Exception{ 257 destinationStatistics.getConsumers().increment(); 258 this.lastActiveTime=0l; 259 } 260 261 @Override 262 public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception{ 263 destinationStatistics.getConsumers().decrement(); 264 } 265 266 267 @Override 268 public final MemoryUsage getMemoryUsage() { 269 return memoryUsage; 270 } 271 272 @Override 273 public void setMemoryUsage(MemoryUsage memoryUsage) { 274 this.memoryUsage = memoryUsage; 275 } 276 277 @Override 278 public DestinationStatistics getDestinationStatistics() { 279 return destinationStatistics; 280 } 281 282 @Override 283 public ActiveMQDestination getActiveMQDestination() { 284 return destination; 285 } 286 287 @Override 288 public final String getName() { 289 return getActiveMQDestination().getPhysicalName(); 290 } 291 292 @Override 293 public final MessageStore getMessageStore() { 294 return store; 295 } 296 297 @Override 298 public boolean isActive() { 299 boolean isActive = destinationStatistics.getConsumers().getCount() != 0 || 300 destinationStatistics.getProducers().getCount() != 0; 301 if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount() != 0) { 302 isActive = hasRegularConsumers(getConsumers()); 303 } 304 return isActive; 305 } 306 307 @Override 308 public int getMaxPageSize() { 309 return maxPageSize; 310 } 311 312 @Override 313 public void setMaxPageSize(int maxPageSize) { 314 this.maxPageSize = maxPageSize; 315 } 316 317 @Override 318 public int getMaxBrowsePageSize() { 319 return this.maxBrowsePageSize > 0 ? this.maxBrowsePageSize : getMaxPageSize(); 320 } 321 322 @Override 323 public void setMaxBrowsePageSize(int maxPageSize) { 324 this.maxBrowsePageSize = maxPageSize; 325 } 326 327 public int getMaxExpirePageSize() { 328 return this.maxExpirePageSize; 329 } 330 331 public void setMaxExpirePageSize(int maxPageSize) { 332 this.maxExpirePageSize = maxPageSize; 333 } 334 335 public void setExpireMessagesPeriod(long expireMessagesPeriod) { 336 this.expireMessagesPeriod = expireMessagesPeriod; 337 } 338 339 public long getExpireMessagesPeriod() { 340 return expireMessagesPeriod; 341 } 342 343 @Override 344 public boolean isUseCache() { 345 return useCache; 346 } 347 348 @Override 349 public void setUseCache(boolean useCache) { 350 this.useCache = useCache; 351 } 352 353 @Override 354 public int getMinimumMessageSize() { 355 return minimumMessageSize; 356 } 357 358 @Override 359 public void setMinimumMessageSize(int minimumMessageSize) { 360 this.minimumMessageSize = minimumMessageSize; 361 } 362 363 @Override 364 public boolean isLazyDispatch() { 365 return lazyDispatch; 366 } 367 368 @Override 369 public void setLazyDispatch(boolean lazyDispatch) { 370 this.lazyDispatch = lazyDispatch; 371 } 372 373 protected long getDestinationSequenceId() { 374 return regionBroker.getBrokerSequenceId(); 375 } 376 377 /** 378 * @return the advisoryForSlowConsumers 379 */ 380 public boolean isAdvisoryForSlowConsumers() { 381 return advisoryForSlowConsumers; 382 } 383 384 /** 385 * @param advisoryForSlowConsumers the advisoryForSlowConsumers to set 386 */ 387 public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) { 388 this.advisoryForSlowConsumers = advisoryForSlowConsumers; 389 } 390 391 /** 392 * @return the advisoryForDiscardingMessages 393 */ 394 public boolean isAdvisoryForDiscardingMessages() { 395 return advisoryForDiscardingMessages; 396 } 397 398 /** 399 * @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to 400 * set 401 */ 402 public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages) { 403 this.advisoryForDiscardingMessages = advisoryForDiscardingMessages; 404 } 405 406 /** 407 * @return the advisoryWhenFull 408 */ 409 public boolean isAdvisoryWhenFull() { 410 return advisoryWhenFull; 411 } 412 413 /** 414 * @param advisoryWhenFull the advisoryWhenFull to set 415 */ 416 public void setAdvisoryWhenFull(boolean advisoryWhenFull) { 417 this.advisoryWhenFull = advisoryWhenFull; 418 } 419 420 /** 421 * @return the advisoryForDelivery 422 */ 423 public boolean isAdvisoryForDelivery() { 424 return advisoryForDelivery; 425 } 426 427 /** 428 * @param advisoryForDelivery the advisoryForDelivery to set 429 */ 430 public void setAdvisoryForDelivery(boolean advisoryForDelivery) { 431 this.advisoryForDelivery = advisoryForDelivery; 432 } 433 434 /** 435 * @return the advisoryForConsumed 436 */ 437 public boolean isAdvisoryForConsumed() { 438 return advisoryForConsumed; 439 } 440 441 /** 442 * @param advisoryForConsumed the advisoryForConsumed to set 443 */ 444 public void setAdvisoryForConsumed(boolean advisoryForConsumed) { 445 this.advisoryForConsumed = advisoryForConsumed; 446 } 447 448 /** 449 * @return the advisdoryForFastProducers 450 */ 451 public boolean isAdvisoryForFastProducers() { 452 return advisoryForFastProducers; 453 } 454 455 /** 456 * @param advisoryForFastProducers the advisdoryForFastProducers to set 457 */ 458 public void setAdvisoryForFastProducers(boolean advisoryForFastProducers) { 459 this.advisoryForFastProducers = advisoryForFastProducers; 460 } 461 462 public boolean isSendAdvisoryIfNoConsumers() { 463 return sendAdvisoryIfNoConsumers; 464 } 465 466 public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) { 467 this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers; 468 } 469 470 public boolean isIncludeBodyForAdvisory() { 471 return includeBodyForAdvisory; 472 } 473 474 public void setIncludeBodyForAdvisory(boolean includeBodyForAdvisory) { 475 this.includeBodyForAdvisory = includeBodyForAdvisory; 476 } 477 478 /** 479 * @return the dead letter strategy 480 */ 481 @Override 482 public DeadLetterStrategy getDeadLetterStrategy() { 483 return deadLetterStrategy; 484 } 485 486 /** 487 * set the dead letter strategy 488 * 489 * @param deadLetterStrategy 490 */ 491 public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) { 492 this.deadLetterStrategy = deadLetterStrategy; 493 } 494 495 @Override 496 public int getCursorMemoryHighWaterMark() { 497 return this.cursorMemoryHighWaterMark; 498 } 499 500 @Override 501 public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { 502 this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark; 503 } 504 505 /** 506 * called when message is consumed 507 * 508 * @param context 509 * @param messageReference 510 */ 511 @Override 512 public void messageConsumed(ConnectionContext context, MessageReference messageReference) { 513 if (advisoryForConsumed) { 514 broker.messageConsumed(context, messageReference); 515 } 516 } 517 518 /** 519 * Called when message is delivered to the broker 520 * 521 * @param context 522 * @param messageReference 523 */ 524 @Override 525 public void messageDelivered(ConnectionContext context, MessageReference messageReference) { 526 this.lastActiveTime = 0L; 527 if (advisoryForDelivery) { 528 broker.messageDelivered(context, messageReference); 529 } 530 } 531 532 /** 533 * Called when a message is discarded - e.g. running low on memory This will 534 * happen only if the policy is enabled - e.g. non durable topics 535 * 536 * @param context 537 * @param messageReference 538 */ 539 @Override 540 public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { 541 if (advisoryForDiscardingMessages) { 542 broker.messageDiscarded(context, sub, messageReference); 543 } 544 } 545 546 /** 547 * Called when there is a slow consumer 548 * 549 * @param context 550 * @param subs 551 */ 552 @Override 553 public void slowConsumer(ConnectionContext context, Subscription subs) { 554 if (advisoryForSlowConsumers) { 555 broker.slowConsumer(context, this, subs); 556 } 557 if (slowConsumerStrategy != null) { 558 slowConsumerStrategy.slowConsumer(context, subs); 559 } 560 } 561 562 /** 563 * Called to notify a producer is too fast 564 * 565 * @param context 566 * @param producerInfo 567 */ 568 @Override 569 public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) { 570 if (advisoryForFastProducers) { 571 broker.fastProducer(context, producerInfo, getActiveMQDestination()); 572 } 573 } 574 575 /** 576 * Called when a Usage reaches a limit 577 * 578 * @param context 579 * @param usage 580 */ 581 @Override 582 public void isFull(ConnectionContext context, Usage<?> usage) { 583 if (advisoryWhenFull) { 584 broker.isFull(context, this, usage); 585 } 586 } 587 588 @Override 589 public void dispose(ConnectionContext context) throws IOException { 590 if (this.store != null) { 591 this.store.removeAllMessages(context); 592 this.store.dispose(context); 593 } 594 this.destinationStatistics.setParent(null); 595 this.memoryUsage.stop(); 596 this.disposed = true; 597 } 598 599 @Override 600 public boolean isDisposed() { 601 return this.disposed; 602 } 603 604 /** 605 * Provides a hook to allow messages with no consumer to be processed in 606 * some way - such as to send to a dead letter queue or something.. 607 */ 608 protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception { 609 if (!msg.isPersistent()) { 610 if (isSendAdvisoryIfNoConsumers()) { 611 // allow messages with no consumers to be dispatched to a dead 612 // letter queue 613 if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) { 614 615 Message message = msg.copy(); 616 // The original destination and transaction id do not get 617 // filled when the message is first sent, 618 // it is only populated if the message is routed to another 619 // destination like the DLQ 620 if (message.getOriginalDestination() != null) { 621 message.setOriginalDestination(message.getDestination()); 622 } 623 if (message.getOriginalTransactionId() != null) { 624 message.setOriginalTransactionId(message.getTransactionId()); 625 } 626 627 ActiveMQTopic advisoryTopic; 628 if (destination.isQueue()) { 629 advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(destination); 630 } else { 631 advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination); 632 } 633 message.setDestination(advisoryTopic); 634 message.setTransactionId(null); 635 636 // Disable flow control for this since since we don't want 637 // to block. 638 boolean originalFlowControl = context.isProducerFlowControl(); 639 try { 640 context.setProducerFlowControl(false); 641 ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); 642 producerExchange.setMutable(false); 643 producerExchange.setConnectionContext(context); 644 producerExchange.setProducerState(new ProducerState(new ProducerInfo())); 645 context.getBroker().send(producerExchange, message); 646 } finally { 647 context.setProducerFlowControl(originalFlowControl); 648 } 649 650 } 651 } 652 } 653 } 654 655 @Override 656 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 657 } 658 659 public final int getStoreUsageHighWaterMark() { 660 return this.storeUsageHighWaterMark; 661 } 662 663 public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) { 664 this.storeUsageHighWaterMark = storeUsageHighWaterMark; 665 } 666 667 protected final void waitForSpace(ConnectionContext context,ProducerBrokerExchange producerBrokerExchange, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException { 668 waitForSpace(context, producerBrokerExchange, usage, 100, warning); 669 } 670 671 protected final void waitForSpace(ConnectionContext context, ProducerBrokerExchange producerBrokerExchange, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException { 672 if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { 673 getLog().debug("sendFailIfNoSpace, forcing exception on send, usage: {}: {}", usage, warning); 674 throw new ResourceAllocationException(warning); 675 } 676 if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) { 677 if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) { 678 getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send, usage: {}: {}", usage, warning); 679 throw new ResourceAllocationException(warning); 680 } 681 } else { 682 long start = System.currentTimeMillis(); 683 long nextWarn = start; 684 producerBrokerExchange.blockingOnFlowControl(true); 685 destinationStatistics.getBlockedSends().increment(); 686 while (!usage.waitForSpace(1000, highWaterMark)) { 687 if (context.getStopping().get()) { 688 throw new IOException("Connection closed, send aborted."); 689 } 690 691 long now = System.currentTimeMillis(); 692 if (now >= nextWarn) { 693 getLog().info("{}: {} (blocking for: {}s)", new Object[]{ usage, warning, new Long(((now - start) / 1000))}); 694 nextWarn = now + blockedProducerWarningInterval; 695 } 696 } 697 long finish = System.currentTimeMillis(); 698 long totalTimeBlocked = finish - start; 699 destinationStatistics.getBlockedTime().addTime(totalTimeBlocked); 700 producerBrokerExchange.incrementTimeBlocked(this,totalTimeBlocked); 701 producerBrokerExchange.blockingOnFlowControl(false); 702 } 703 } 704 705 protected abstract Logger getLog(); 706 707 public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) { 708 this.slowConsumerStrategy = slowConsumerStrategy; 709 } 710 711 @Override 712 public SlowConsumerStrategy getSlowConsumerStrategy() { 713 return this.slowConsumerStrategy; 714 } 715 716 717 @Override 718 public boolean isPrioritizedMessages() { 719 return this.prioritizedMessages; 720 } 721 722 public void setPrioritizedMessages(boolean prioritizedMessages) { 723 this.prioritizedMessages = prioritizedMessages; 724 if (store != null) { 725 store.setPrioritizedMessages(prioritizedMessages); 726 } 727 } 728 729 /** 730 * @return the inactiveTimeoutBeforeGC 731 */ 732 @Override 733 public long getInactiveTimeoutBeforeGC() { 734 return this.inactiveTimeoutBeforeGC; 735 } 736 737 /** 738 * @param inactiveTimeoutBeforeGC the inactiveTimeoutBeforeGC to set 739 */ 740 public void setInactiveTimeoutBeforeGC(long inactiveTimeoutBeforeGC) { 741 this.inactiveTimeoutBeforeGC = inactiveTimeoutBeforeGC; 742 } 743 744 /** 745 * @return the gcIfInactive 746 */ 747 public boolean isGcIfInactive() { 748 return this.gcIfInactive; 749 } 750 751 /** 752 * @param gcIfInactive the gcIfInactive to set 753 */ 754 public void setGcIfInactive(boolean gcIfInactive) { 755 this.gcIfInactive = gcIfInactive; 756 } 757 758 /** 759 * Indicate if it is ok to gc destinations that have only network consumers 760 * @param gcWithNetworkConsumers 761 */ 762 public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) { 763 this.gcWithNetworkConsumers = gcWithNetworkConsumers; 764 } 765 766 public boolean isGcWithNetworkConsumers() { 767 return gcWithNetworkConsumers; 768 } 769 770 @Override 771 public void markForGC(long timeStamp) { 772 if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false 773 && destinationStatistics.messages.getCount() == 0 && getInactiveTimeoutBeforeGC() > 0l) { 774 this.lastActiveTime = timeStamp; 775 } 776 } 777 778 @Override 779 public boolean canGC() { 780 boolean result = false; 781 if (isGcIfInactive() && this.lastActiveTime != 0l && destinationStatistics.messages.getCount() == 0L ) { 782 if ((System.currentTimeMillis() - this.lastActiveTime) >= getInactiveTimeoutBeforeGC()) { 783 result = true; 784 } 785 } 786 return result; 787 } 788 789 public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) { 790 this.reduceMemoryFootprint = reduceMemoryFootprint; 791 } 792 793 public boolean isReduceMemoryFootprint() { 794 return this.reduceMemoryFootprint; 795 } 796 797 @Override 798 public boolean isDoOptimzeMessageStorage() { 799 return doOptimzeMessageStorage; 800 } 801 802 @Override 803 public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) { 804 this.doOptimzeMessageStorage = doOptimzeMessageStorage; 805 } 806 807 public int getOptimizeMessageStoreInFlightLimit() { 808 return optimizeMessageStoreInFlightLimit; 809 } 810 811 public void setOptimizeMessageStoreInFlightLimit(int optimizeMessageStoreInFlightLimit) { 812 this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit; 813 } 814 815 816 @Override 817 public abstract List<Subscription> getConsumers(); 818 819 protected boolean hasRegularConsumers(List<Subscription> consumers) { 820 boolean hasRegularConsumers = false; 821 for (Subscription subscription: consumers) { 822 if (!subscription.getConsumerInfo().isNetworkSubscription()) { 823 hasRegularConsumers = true; 824 break; 825 } 826 } 827 return hasRegularConsumers; 828 } 829 830 public ConnectionContext createConnectionContext() { 831 ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext()); 832 answer.setBroker(this.broker); 833 answer.getMessageEvaluationContext().setDestination(getActiveMQDestination()); 834 answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); 835 return answer; 836 } 837 838 protected MessageAck convertToNonRangedAck(MessageAck ack, MessageReference node) { 839 // the original ack may be a ranged ack, but we are trying to delete 840 // a specific 841 // message store here so we need to convert to a non ranged ack. 842 if (ack.getMessageCount() > 0) { 843 // Dup the ack 844 MessageAck a = new MessageAck(); 845 ack.copy(a); 846 ack = a; 847 // Convert to non-ranged. 848 ack.setMessageCount(1); 849 } 850 // always use node messageId so we can access entry/data Location 851 ack.setFirstMessageId(node.getMessageId()); 852 ack.setLastMessageId(node.getMessageId()); 853 return ack; 854 } 855 856 protected boolean isDLQ() { 857 return destination.isDLQ(); 858 } 859 860 @Override 861 public void duplicateFromStore(Message message, Subscription durableSub) { 862 ConnectionContext connectionContext = createConnectionContext(); 863 getLog().warn("duplicate message from store {}, redirecting for dlq processing", message.getMessageId()); 864 Throwable cause = new Throwable("duplicate from store for " + destination); 865 message.setRegionDestination(this); 866 broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause); 867 MessageAck messageAck = new MessageAck(message, MessageAck.POSION_ACK_TYPE, 1); 868 messageAck.setPoisonCause(cause); 869 try { 870 acknowledge(connectionContext, durableSub, messageAck, message); 871 } catch (IOException e) { 872 getLog().error("Failed to acknowledge duplicate message {} from {} with {}", message.getMessageId(), destination, messageAck); 873 } 874 } 875 876 public void setPersistJMSRedelivered(boolean persistJMSRedelivered) { 877 this.persistJMSRedelivered = persistJMSRedelivered; 878 } 879 880 public boolean isPersistJMSRedelivered() { 881 return persistJMSRedelivered; 882 } 883}