001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.Iterator; 022import java.util.List; 023import java.util.concurrent.CountDownLatch; 024import java.util.concurrent.TimeUnit; 025import java.util.concurrent.atomic.AtomicInteger; 026 027import javax.jms.JMSException; 028 029import org.apache.activemq.broker.Broker; 030import org.apache.activemq.broker.ConnectionContext; 031import org.apache.activemq.broker.region.cursors.PendingMessageCursor; 032import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; 033import org.apache.activemq.command.ConsumerControl; 034import org.apache.activemq.command.ConsumerInfo; 035import org.apache.activemq.command.Message; 036import org.apache.activemq.command.MessageAck; 037import org.apache.activemq.command.MessageDispatch; 038import org.apache.activemq.command.MessageDispatchNotification; 039import org.apache.activemq.command.MessageId; 040import org.apache.activemq.command.MessagePull; 041import org.apache.activemq.command.Response; 042import org.apache.activemq.thread.Scheduler; 043import org.apache.activemq.transaction.Synchronization; 044import org.apache.activemq.transport.TransmitCallback; 045import org.apache.activemq.usage.SystemUsage; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049/** 050 * A subscription that honors the pre-fetch option of the ConsumerInfo. 051 */ 052public abstract class PrefetchSubscription extends AbstractSubscription { 053 054 private static final Logger LOG = LoggerFactory.getLogger(PrefetchSubscription.class); 055 protected final Scheduler scheduler; 056 057 protected PendingMessageCursor pending; 058 protected final List<MessageReference> dispatched = new ArrayList<MessageReference>(); 059 protected final AtomicInteger prefetchExtension = new AtomicInteger(); 060 protected boolean usePrefetchExtension = true; 061 private int maxProducersToAudit=32; 062 private int maxAuditDepth=2048; 063 protected final SystemUsage usageManager; 064 protected final Object pendingLock = new Object(); 065 protected final Object dispatchLock = new Object(); 066 private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1); 067 068 public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws JMSException { 069 super(broker,context, info); 070 this.usageManager=usageManager; 071 pending = cursor; 072 try { 073 pending.start(); 074 } catch (Exception e) { 075 throw new JMSException(e.getMessage()); 076 } 077 this.scheduler = broker.getScheduler(); 078 } 079 080 public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws JMSException { 081 this(broker,usageManager,context, info, new VMPendingMessageCursor(false)); 082 } 083 084 /** 085 * Allows a message to be pulled on demand by a client 086 */ 087 @Override 088 public Response pullMessage(ConnectionContext context, final MessagePull pull) throws Exception { 089 // The slave should not deliver pull messages. 090 // TODO: when the slave becomes a master, He should send a NULL message to all the 091 // consumers to 'wake them up' in case they were waiting for a message. 092 if (getPrefetchSize() == 0) { 093 prefetchExtension.set(pull.getQuantity()); 094 final long dispatchCounterBeforePull = getSubscriptionStatistics().getDispatched().getCount(); 095 096 // Have the destination push us some messages. 097 for (Destination dest : destinations) { 098 dest.iterate(); 099 } 100 dispatchPending(); 101 102 synchronized(this) { 103 // If there was nothing dispatched.. we may need to setup a timeout. 104 if (dispatchCounterBeforePull == getSubscriptionStatistics().getDispatched().getCount() || pull.isAlwaysSignalDone()) { 105 // immediate timeout used by receiveNoWait() 106 if (pull.getTimeout() == -1) { 107 // Null message indicates the pull is done or did not have pending. 108 prefetchExtension.set(1); 109 add(QueueMessageReference.NULL_MESSAGE); 110 dispatchPending(); 111 } 112 if (pull.getTimeout() > 0) { 113 scheduler.executeAfterDelay(new Runnable() { 114 @Override 115 public void run() { 116 pullTimeout(dispatchCounterBeforePull, pull.isAlwaysSignalDone()); 117 } 118 }, pull.getTimeout()); 119 } 120 } 121 } 122 } 123 return null; 124 } 125 126 /** 127 * Occurs when a pull times out. If nothing has been dispatched since the 128 * timeout was setup, then send the NULL message. 129 */ 130 final void pullTimeout(long dispatchCounterBeforePull, boolean alwaysSignalDone) { 131 synchronized (pendingLock) { 132 if (dispatchCounterBeforePull == getSubscriptionStatistics().getDispatched().getCount() || alwaysSignalDone) { 133 try { 134 prefetchExtension.set(1); 135 add(QueueMessageReference.NULL_MESSAGE); 136 dispatchPending(); 137 } catch (Exception e) { 138 context.getConnection().serviceException(e); 139 } finally { 140 prefetchExtension.set(0); 141 } 142 } 143 } 144 } 145 146 @Override 147 public void add(MessageReference node) throws Exception { 148 synchronized (pendingLock) { 149 // The destination may have just been removed... 150 if (!destinations.contains(node.getRegionDestination()) && node != QueueMessageReference.NULL_MESSAGE) { 151 // perhaps we should inform the caller that we are no longer valid to dispatch to? 152 return; 153 } 154 155 // Don't increment for the pullTimeout control message. 156 if (!node.equals(QueueMessageReference.NULL_MESSAGE)) { 157 getSubscriptionStatistics().getEnqueues().increment(); 158 } 159 pending.addMessageLast(node); 160 } 161 dispatchPending(); 162 } 163 164 @Override 165 public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception { 166 synchronized(pendingLock) { 167 try { 168 pending.reset(); 169 while (pending.hasNext()) { 170 MessageReference node = pending.next(); 171 node.decrementReferenceCount(); 172 if (node.getMessageId().equals(mdn.getMessageId())) { 173 // Synchronize between dispatched list and removal of messages from pending list 174 // related to remove subscription action 175 synchronized(dispatchLock) { 176 pending.remove(); 177 createMessageDispatch(node, node.getMessage()); 178 dispatched.add(node); 179 getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); 180 onDispatch(node, node.getMessage()); 181 } 182 return; 183 } 184 } 185 } finally { 186 pending.release(); 187 } 188 } 189 throw new JMSException( 190 "Slave broker out of sync with master: Dispatched message (" 191 + mdn.getMessageId() + ") was not in the pending list for " 192 + mdn.getConsumerId() + " on " + mdn.getDestination().getPhysicalName()); 193 } 194 195 @Override 196 public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception { 197 // Handle the standard acknowledgment case. 198 boolean callDispatchMatched = false; 199 Destination destination = null; 200 201 if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) { 202 // suppress unexpected ack exception in this expected case 203 LOG.warn("Ignoring ack received before dispatch; result of failover with an outstanding ack. Acked messages will be replayed if present on this broker. Ignored ack: {}", ack); 204 return; 205 } 206 207 LOG.trace("ack: {}", ack); 208 209 synchronized(dispatchLock) { 210 if (ack.isStandardAck()) { 211 // First check if the ack matches the dispatched. When using failover this might 212 // not be the case. We don't ever want to ack the wrong messages. 213 assertAckMatchesDispatched(ack); 214 215 // Acknowledge all dispatched messages up till the message id of 216 // the acknowledgment. 217 boolean inAckRange = false; 218 List<MessageReference> removeList = new ArrayList<MessageReference>(); 219 for (final MessageReference node : dispatched) { 220 MessageId messageId = node.getMessageId(); 221 if (ack.getFirstMessageId() == null 222 || ack.getFirstMessageId().equals(messageId)) { 223 inAckRange = true; 224 } 225 if (inAckRange) { 226 // Don't remove the nodes until we are committed. 227 if (!context.isInTransaction()) { 228 getSubscriptionStatistics().getDequeues().increment(); 229 ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement(); 230 removeList.add(node); 231 } else { 232 registerRemoveSync(context, node); 233 } 234 acknowledge(context, ack, node); 235 if (ack.getLastMessageId().equals(messageId)) { 236 destination = (Destination) node.getRegionDestination(); 237 callDispatchMatched = true; 238 break; 239 } 240 } 241 } 242 for (final MessageReference node : removeList) { 243 dispatched.remove(node); 244 getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); 245 } 246 // this only happens after a reconnect - get an ack which is not 247 // valid 248 if (!callDispatchMatched) { 249 LOG.warn("Could not correlate acknowledgment with dispatched message: {}", ack); 250 } 251 } else if (ack.isIndividualAck()) { 252 // Message was delivered and acknowledge - but only delete the 253 // individual message 254 for (final MessageReference node : dispatched) { 255 MessageId messageId = node.getMessageId(); 256 if (ack.getLastMessageId().equals(messageId)) { 257 // Don't remove the nodes until we are committed - immediateAck option 258 if (!context.isInTransaction()) { 259 getSubscriptionStatistics().getDequeues().increment(); 260 ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement(); 261 dispatched.remove(node); 262 getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); 263 } else { 264 registerRemoveSync(context, node); 265 } 266 267 if (usePrefetchExtension && getPrefetchSize() != 0 && ack.isInTransaction()) { 268 // allow transaction batch to exceed prefetch 269 while (true) { 270 int currentExtension = prefetchExtension.get(); 271 int newExtension = Math.max(currentExtension, currentExtension + 1); 272 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 273 break; 274 } 275 } 276 } 277 278 acknowledge(context, ack, node); 279 destination = (Destination) node.getRegionDestination(); 280 callDispatchMatched = true; 281 break; 282 } 283 } 284 }else if (ack.isDeliveredAck()) { 285 // Message was delivered but not acknowledged: update pre-fetch 286 // counters. 287 int index = 0; 288 for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) { 289 final MessageReference node = iter.next(); 290 Destination nodeDest = (Destination) node.getRegionDestination(); 291 if (ack.getLastMessageId().equals(node.getMessageId())) { 292 if (usePrefetchExtension && getPrefetchSize() != 0) { 293 // allow batch to exceed prefetch 294 while (true) { 295 int currentExtension = prefetchExtension.get(); 296 int newExtension = Math.max(currentExtension, index + 1); 297 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 298 break; 299 } 300 } 301 } 302 destination = nodeDest; 303 callDispatchMatched = true; 304 break; 305 } 306 } 307 if (!callDispatchMatched) { 308 throw new JMSException( 309 "Could not correlate acknowledgment with dispatched message: " 310 + ack); 311 } 312 } else if (ack.isExpiredAck()) { 313 // Message was expired 314 int index = 0; 315 boolean inAckRange = false; 316 for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) { 317 final MessageReference node = iter.next(); 318 Destination nodeDest = (Destination) node.getRegionDestination(); 319 MessageId messageId = node.getMessageId(); 320 if (ack.getFirstMessageId() == null 321 || ack.getFirstMessageId().equals(messageId)) { 322 inAckRange = true; 323 } 324 if (inAckRange) { 325 if (node.isExpired()) { 326 if (broker.isExpired(node)) { 327 Destination regionDestination = nodeDest; 328 regionDestination.messageExpired(context, this, node); 329 } 330 iter.remove(); 331 nodeDest.getDestinationStatistics().getInflight().decrement(); 332 } 333 if (ack.getLastMessageId().equals(messageId)) { 334 if (usePrefetchExtension && getPrefetchSize() != 0) { 335 // allow batch to exceed prefetch 336 while (true) { 337 int currentExtension = prefetchExtension.get(); 338 int newExtension = Math.max(currentExtension, index + 1); 339 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 340 break; 341 } 342 } 343 } 344 345 destination = (Destination) node.getRegionDestination(); 346 callDispatchMatched = true; 347 break; 348 } 349 } 350 } 351 if (!callDispatchMatched) { 352 throw new JMSException( 353 "Could not correlate expiration acknowledgment with dispatched message: " 354 + ack); 355 } 356 } else if (ack.isRedeliveredAck()) { 357 // Message was re-delivered but it was not yet considered to be 358 // a DLQ message. 359 boolean inAckRange = false; 360 for (final MessageReference node : dispatched) { 361 MessageId messageId = node.getMessageId(); 362 if (ack.getFirstMessageId() == null 363 || ack.getFirstMessageId().equals(messageId)) { 364 inAckRange = true; 365 } 366 if (inAckRange) { 367 if (ack.getLastMessageId().equals(messageId)) { 368 destination = (Destination) node.getRegionDestination(); 369 callDispatchMatched = true; 370 break; 371 } 372 } 373 } 374 if (!callDispatchMatched) { 375 throw new JMSException( 376 "Could not correlate acknowledgment with dispatched message: " 377 + ack); 378 } 379 } else if (ack.isPoisonAck()) { 380 // TODO: what if the message is already in a DLQ??? 381 // Handle the poison ACK case: we need to send the message to a 382 // DLQ 383 if (ack.isInTransaction()) { 384 throw new JMSException("Poison ack cannot be transacted: " 385 + ack); 386 } 387 int index = 0; 388 boolean inAckRange = false; 389 List<MessageReference> removeList = new ArrayList<MessageReference>(); 390 for (final MessageReference node : dispatched) { 391 MessageId messageId = node.getMessageId(); 392 if (ack.getFirstMessageId() == null 393 || ack.getFirstMessageId().equals(messageId)) { 394 inAckRange = true; 395 } 396 if (inAckRange) { 397 sendToDLQ(context, node, ack.getPoisonCause()); 398 Destination nodeDest = (Destination) node.getRegionDestination(); 399 nodeDest.getDestinationStatistics() 400 .getInflight().decrement(); 401 removeList.add(node); 402 getSubscriptionStatistics().getDequeues().increment(); 403 index++; 404 acknowledge(context, ack, node); 405 if (ack.getLastMessageId().equals(messageId)) { 406 while (true) { 407 int currentExtension = prefetchExtension.get(); 408 int newExtension = Math.max(0, currentExtension - (index + 1)); 409 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 410 break; 411 } 412 } 413 destination = nodeDest; 414 callDispatchMatched = true; 415 break; 416 } 417 } 418 } 419 for (final MessageReference node : removeList) { 420 dispatched.remove(node); 421 getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); 422 } 423 if (!callDispatchMatched) { 424 throw new JMSException( 425 "Could not correlate acknowledgment with dispatched message: " 426 + ack); 427 } 428 } 429 } 430 if (callDispatchMatched && destination != null) { 431 destination.wakeup(); 432 dispatchPending(); 433 434 if (pending.isEmpty()) { 435 for (Destination dest : destinations) { 436 dest.wakeup(); 437 } 438 } 439 } else { 440 LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): {}", ack); 441 } 442 } 443 444 private void registerRemoveSync(ConnectionContext context, final MessageReference node) { 445 // setup a Synchronization to remove nodes from the 446 // dispatched list. 447 context.getTransaction().addSynchronization( 448 new Synchronization() { 449 450 @Override 451 public void beforeEnd() { 452 if (usePrefetchExtension && getPrefetchSize() != 0) { 453 while (true) { 454 int currentExtension = prefetchExtension.get(); 455 int newExtension = Math.max(0, currentExtension - 1); 456 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 457 break; 458 } 459 } 460 } 461 } 462 463 @Override 464 public void afterCommit() 465 throws Exception { 466 Destination nodeDest = (Destination) node.getRegionDestination(); 467 synchronized(dispatchLock) { 468 getSubscriptionStatistics().getDequeues().increment(); 469 dispatched.remove(node); 470 getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize()); 471 nodeDest.getDestinationStatistics().getInflight().decrement(); 472 } 473 nodeDest.wakeup(); 474 dispatchPending(); 475 } 476 477 @Override 478 public void afterRollback() throws Exception { 479 synchronized(dispatchLock) { 480 // poisionAck will decrement - otherwise still inflight on client 481 } 482 } 483 }); 484 } 485 486 /** 487 * Checks an ack versus the contents of the dispatched list. 488 * called with dispatchLock held 489 * @param ack 490 * @throws JMSException if it does not match 491 */ 492 protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException { 493 MessageId firstAckedMsg = ack.getFirstMessageId(); 494 MessageId lastAckedMsg = ack.getLastMessageId(); 495 int checkCount = 0; 496 boolean checkFoundStart = false; 497 boolean checkFoundEnd = false; 498 for (MessageReference node : dispatched) { 499 500 if (firstAckedMsg == null) { 501 checkFoundStart = true; 502 } else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) { 503 checkFoundStart = true; 504 } 505 506 if (checkFoundStart) { 507 checkCount++; 508 } 509 510 if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) { 511 checkFoundEnd = true; 512 break; 513 } 514 } 515 if (!checkFoundStart && firstAckedMsg != null) 516 throw new JMSException("Unmatched acknowledge: " + ack 517 + "; Could not find Message-ID " + firstAckedMsg 518 + " in dispatched-list (start of ack)"); 519 if (!checkFoundEnd && lastAckedMsg != null) 520 throw new JMSException("Unmatched acknowledge: " + ack 521 + "; Could not find Message-ID " + lastAckedMsg 522 + " in dispatched-list (end of ack)"); 523 if (ack.getMessageCount() != checkCount && !ack.isInTransaction()) { 524 throw new JMSException("Unmatched acknowledge: " + ack 525 + "; Expected message count (" + ack.getMessageCount() 526 + ") differs from count in dispatched-list (" + checkCount 527 + ")"); 528 } 529 } 530 531 /** 532 * 533 * @param context 534 * @param node 535 * @param poisonCause 536 * @throws IOException 537 * @throws Exception 538 */ 539 protected void sendToDLQ(final ConnectionContext context, final MessageReference node, Throwable poisonCause) throws IOException, Exception { 540 broker.getRoot().sendToDeadLetterQueue(context, node, this, poisonCause); 541 } 542 543 @Override 544 public int getInFlightSize() { 545 return dispatched.size(); 546 } 547 548 /** 549 * Used to determine if the broker can dispatch to the consumer. 550 * 551 * @return true if the subscription is full 552 */ 553 @Override 554 public boolean isFull() { 555 return getPrefetchSize() == 0 ? prefetchExtension.get() == 0 : dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize(); 556 } 557 558 /** 559 * @return true when 60% or more room is left for dispatching messages 560 */ 561 @Override 562 public boolean isLowWaterMark() { 563 return (dispatched.size() - prefetchExtension.get()) <= (info.getPrefetchSize() * .4); 564 } 565 566 /** 567 * @return true when 10% or less room is left for dispatching messages 568 */ 569 @Override 570 public boolean isHighWaterMark() { 571 return (dispatched.size() - prefetchExtension.get()) >= (info.getPrefetchSize() * .9); 572 } 573 574 @Override 575 public int countBeforeFull() { 576 return getPrefetchSize() == 0 ? prefetchExtension.get() : info.getPrefetchSize() + prefetchExtension.get() - dispatched.size(); 577 } 578 579 @Override 580 public int getPendingQueueSize() { 581 return pending.size(); 582 } 583 584 @Override 585 public long getPendingMessageSize() { 586 synchronized (pendingLock) { 587 return pending.messageSize(); 588 } 589 } 590 591 @Override 592 public int getDispatchedQueueSize() { 593 return dispatched.size(); 594 } 595 596 @Override 597 public long getDequeueCounter() { 598 return getSubscriptionStatistics().getDequeues().getCount(); 599 } 600 601 @Override 602 public long getDispatchedCounter() { 603 return getSubscriptionStatistics().getDispatched().getCount(); 604 } 605 606 @Override 607 public long getEnqueueCounter() { 608 return getSubscriptionStatistics().getEnqueues().getCount(); 609 } 610 611 @Override 612 public boolean isRecoveryRequired() { 613 return pending.isRecoveryRequired(); 614 } 615 616 public PendingMessageCursor getPending() { 617 return this.pending; 618 } 619 620 public void setPending(PendingMessageCursor pending) { 621 this.pending = pending; 622 if (this.pending!=null) { 623 this.pending.setSystemUsage(usageManager); 624 this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); 625 } 626 } 627 628 @Override 629 public void add(ConnectionContext context, Destination destination) throws Exception { 630 synchronized(pendingLock) { 631 super.add(context, destination); 632 pending.add(context, destination); 633 } 634 } 635 636 @Override 637 public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception { 638 return remove(context, destination, dispatched); 639 } 640 641 public List<MessageReference> remove(ConnectionContext context, Destination destination, List<MessageReference> dispatched) throws Exception { 642 List<MessageReference> rc = new ArrayList<MessageReference>(); 643 synchronized(pendingLock) { 644 super.remove(context, destination); 645 // Here is a potential problem concerning Inflight stat: 646 // Messages not already committed or rolled back may not be removed from dispatched list at the moment 647 // Except if each commit or rollback callback action comes before remove of subscriber. 648 rc.addAll(pending.remove(context, destination)); 649 650 if (dispatched == null) { 651 return rc; 652 } 653 654 // Synchronized to DispatchLock if necessary 655 if (dispatched == this.dispatched) { 656 synchronized(dispatchLock) { 657 updateDestinationStats(rc, destination, dispatched); 658 } 659 } else { 660 updateDestinationStats(rc, destination, dispatched); 661 } 662 } 663 return rc; 664 } 665 666 private void updateDestinationStats(List<MessageReference> rc, Destination destination, List<MessageReference> dispatched) { 667 ArrayList<MessageReference> references = new ArrayList<MessageReference>(); 668 for (MessageReference r : dispatched) { 669 if (r.getRegionDestination() == destination) { 670 references.add(r); 671 getSubscriptionStatistics().getInflightMessageSize().addSize(-r.getSize()); 672 } 673 } 674 rc.addAll(references); 675 destination.getDestinationStatistics().getInflight().subtract(references.size()); 676 dispatched.removeAll(references); 677 } 678 679 // made public so it can be used in MQTTProtocolConverter 680 public void dispatchPending() throws IOException { 681 synchronized(pendingLock) { 682 try { 683 int numberToDispatch = countBeforeFull(); 684 if (numberToDispatch > 0) { 685 setSlowConsumer(false); 686 setPendingBatchSize(pending, numberToDispatch); 687 int count = 0; 688 pending.reset(); 689 while (pending.hasNext() && !isFull() && count < numberToDispatch) { 690 MessageReference node = pending.next(); 691 if (node == null) { 692 break; 693 } 694 695 // Synchronize between dispatched list and remove of message from pending list 696 // related to remove subscription action 697 synchronized(dispatchLock) { 698 pending.remove(); 699 if (!isDropped(node) && canDispatch(node)) { 700 701 // Message may have been sitting in the pending 702 // list a while waiting for the consumer to ak the message. 703 if (node != QueueMessageReference.NULL_MESSAGE && node.isExpired()) { 704 //increment number to dispatch 705 numberToDispatch++; 706 if (broker.isExpired(node)) { 707 ((Destination)node.getRegionDestination()).messageExpired(context, this, node); 708 } 709 710 if (!isBrowser()) { 711 node.decrementReferenceCount(); 712 continue; 713 } 714 } 715 dispatch(node); 716 count++; 717 } 718 } 719 // decrement after dispatch has taken ownership to avoid usage jitter 720 node.decrementReferenceCount(); 721 } 722 } else if (!isSlowConsumer()) { 723 setSlowConsumer(true); 724 for (Destination dest :destinations) { 725 dest.slowConsumer(context, this); 726 } 727 } 728 } finally { 729 pending.release(); 730 } 731 } 732 } 733 734 protected void setPendingBatchSize(PendingMessageCursor pending, int numberToDispatch) { 735 pending.setMaxBatchSize(numberToDispatch); 736 } 737 738 // called with dispatchLock held 739 protected boolean dispatch(final MessageReference node) throws IOException { 740 final Message message = node.getMessage(); 741 if (message == null) { 742 return false; 743 } 744 745 okForAckAsDispatchDone.countDown(); 746 747 MessageDispatch md = createMessageDispatch(node, message); 748 if (node != QueueMessageReference.NULL_MESSAGE) { 749 getSubscriptionStatistics().getDispatched().increment(); 750 dispatched.add(node); 751 getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize()); 752 } 753 if (getPrefetchSize() == 0) { 754 while (true) { 755 int currentExtension = prefetchExtension.get(); 756 int newExtension = Math.max(0, currentExtension - 1); 757 if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { 758 break; 759 } 760 } 761 } 762 if (info.isDispatchAsync()) { 763 md.setTransmitCallback(new TransmitCallback() { 764 765 @Override 766 public void onSuccess() { 767 // Since the message gets queued up in async dispatch, we don't want to 768 // decrease the reference count until it gets put on the wire. 769 onDispatch(node, message); 770 } 771 772 @Override 773 public void onFailure() { 774 Destination nodeDest = (Destination) node.getRegionDestination(); 775 if (nodeDest != null) { 776 if (node != QueueMessageReference.NULL_MESSAGE) { 777 nodeDest.getDestinationStatistics().getDispatched().increment(); 778 nodeDest.getDestinationStatistics().getInflight().increment(); 779 LOG.trace("{} failed to dispatch: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), getSubscriptionStatistics().getDispatched().getCount(), dispatched.size() }); 780 } 781 } 782 if (node instanceof QueueMessageReference) { 783 ((QueueMessageReference) node).unlock(); 784 } 785 } 786 }); 787 context.getConnection().dispatchAsync(md); 788 } else { 789 context.getConnection().dispatchSync(md); 790 onDispatch(node, message); 791 } 792 return true; 793 } 794 795 protected void onDispatch(final MessageReference node, final Message message) { 796 Destination nodeDest = (Destination) node.getRegionDestination(); 797 if (nodeDest != null) { 798 if (node != QueueMessageReference.NULL_MESSAGE) { 799 nodeDest.getDestinationStatistics().getDispatched().increment(); 800 nodeDest.getDestinationStatistics().getInflight().increment(); 801 LOG.trace("{} dispatched: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), getSubscriptionStatistics().getDispatched().getCount(), dispatched.size() }); 802 } 803 } 804 805 if (info.isDispatchAsync()) { 806 try { 807 dispatchPending(); 808 } catch (IOException e) { 809 context.getConnection().serviceExceptionAsync(e); 810 } 811 } 812 } 813 814 /** 815 * inform the MessageConsumer on the client to change it's prefetch 816 * 817 * @param newPrefetch 818 */ 819 @Override 820 public void updateConsumerPrefetch(int newPrefetch) { 821 if (context != null && context.getConnection() != null && context.getConnection().isManageable()) { 822 ConsumerControl cc = new ConsumerControl(); 823 cc.setConsumerId(info.getConsumerId()); 824 cc.setPrefetch(newPrefetch); 825 context.getConnection().dispatchAsync(cc); 826 } 827 } 828 829 /** 830 * @param node 831 * @param message 832 * @return MessageDispatch 833 */ 834 protected MessageDispatch createMessageDispatch(MessageReference node, Message message) { 835 MessageDispatch md = new MessageDispatch(); 836 md.setConsumerId(info.getConsumerId()); 837 838 if (node == QueueMessageReference.NULL_MESSAGE) { 839 md.setMessage(null); 840 md.setDestination(null); 841 } else { 842 Destination regionDestination = (Destination) node.getRegionDestination(); 843 md.setDestination(regionDestination.getActiveMQDestination()); 844 md.setMessage(message); 845 md.setRedeliveryCounter(node.getRedeliveryCounter()); 846 } 847 848 return md; 849 } 850 851 /** 852 * Use when a matched message is about to be dispatched to the client. 853 * 854 * @param node 855 * @return false if the message should not be dispatched to the client 856 * (another sub may have already dispatched it for example). 857 * @throws IOException 858 */ 859 protected abstract boolean canDispatch(MessageReference node) throws IOException; 860 861 protected abstract boolean isDropped(MessageReference node); 862 863 /** 864 * Used during acknowledgment to remove the message. 865 * 866 * @throws IOException 867 */ 868 protected abstract void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException; 869 870 871 public int getMaxProducersToAudit() { 872 return maxProducersToAudit; 873 } 874 875 public void setMaxProducersToAudit(int maxProducersToAudit) { 876 this.maxProducersToAudit = maxProducersToAudit; 877 if (this.pending != null) { 878 this.pending.setMaxProducersToAudit(maxProducersToAudit); 879 } 880 } 881 882 public int getMaxAuditDepth() { 883 return maxAuditDepth; 884 } 885 886 public void setMaxAuditDepth(int maxAuditDepth) { 887 this.maxAuditDepth = maxAuditDepth; 888 if (this.pending != null) { 889 this.pending.setMaxAuditDepth(maxAuditDepth); 890 } 891 } 892 893 public boolean isUsePrefetchExtension() { 894 return usePrefetchExtension; 895 } 896 897 public void setUsePrefetchExtension(boolean usePrefetchExtension) { 898 this.usePrefetchExtension = usePrefetchExtension; 899 } 900 901 protected int getPrefetchExtension() { 902 return this.prefetchExtension.get(); 903 } 904 905 @Override 906 public void setPrefetchSize(int prefetchSize) { 907 this.info.setPrefetchSize(prefetchSize); 908 try { 909 this.dispatchPending(); 910 } catch (Exception e) { 911 LOG.trace("Caught exception during dispatch after prefetch change.", e); 912 } 913 } 914}