001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.io.File; 020import java.io.InputStream; 021import java.io.Serializable; 022import java.net.URL; 023import java.util.Collections; 024import java.util.Iterator; 025import java.util.List; 026import java.util.concurrent.CopyOnWriteArrayList; 027import java.util.concurrent.ThreadPoolExecutor; 028import java.util.concurrent.atomic.AtomicBoolean; 029import java.util.concurrent.atomic.AtomicInteger; 030 031import javax.jms.BytesMessage; 032import javax.jms.Destination; 033import javax.jms.IllegalStateException; 034import javax.jms.InvalidDestinationException; 035import javax.jms.InvalidSelectorException; 036import javax.jms.JMSException; 037import javax.jms.MapMessage; 038import javax.jms.Message; 039import javax.jms.MessageConsumer; 040import javax.jms.MessageListener; 041import javax.jms.MessageProducer; 042import javax.jms.ObjectMessage; 043import javax.jms.Queue; 044import javax.jms.QueueBrowser; 045import javax.jms.QueueReceiver; 046import javax.jms.QueueSender; 047import javax.jms.QueueSession; 048import javax.jms.Session; 049import javax.jms.StreamMessage; 050import javax.jms.TemporaryQueue; 051import javax.jms.TemporaryTopic; 052import javax.jms.TextMessage; 053import javax.jms.Topic; 054import javax.jms.TopicPublisher; 055import javax.jms.TopicSession; 056import javax.jms.TopicSubscriber; 057import javax.jms.TransactionRolledBackException; 058 059import org.apache.activemq.blob.BlobDownloader; 060import org.apache.activemq.blob.BlobTransferPolicy; 061import org.apache.activemq.blob.BlobUploader; 062import org.apache.activemq.command.ActiveMQBlobMessage; 063import org.apache.activemq.command.ActiveMQBytesMessage; 064import org.apache.activemq.command.ActiveMQDestination; 065import org.apache.activemq.command.ActiveMQMapMessage; 066import org.apache.activemq.command.ActiveMQMessage; 067import org.apache.activemq.command.ActiveMQObjectMessage; 068import org.apache.activemq.command.ActiveMQQueue; 069import org.apache.activemq.command.ActiveMQStreamMessage; 070import org.apache.activemq.command.ActiveMQTempDestination; 071import org.apache.activemq.command.ActiveMQTempQueue; 072import org.apache.activemq.command.ActiveMQTempTopic; 073import org.apache.activemq.command.ActiveMQTextMessage; 074import org.apache.activemq.command.ActiveMQTopic; 075import org.apache.activemq.command.Command; 076import org.apache.activemq.command.ConsumerId; 077import org.apache.activemq.command.MessageAck; 078import org.apache.activemq.command.MessageDispatch; 079import org.apache.activemq.command.MessageId; 080import org.apache.activemq.command.ProducerId; 081import org.apache.activemq.command.RemoveInfo; 082import org.apache.activemq.command.Response; 083import org.apache.activemq.command.SessionId; 084import org.apache.activemq.command.SessionInfo; 085import org.apache.activemq.command.TransactionId; 086import org.apache.activemq.management.JMSSessionStatsImpl; 087import org.apache.activemq.management.StatsCapable; 088import org.apache.activemq.management.StatsImpl; 089import org.apache.activemq.thread.Scheduler; 090import org.apache.activemq.transaction.Synchronization; 091import org.apache.activemq.usage.MemoryUsage; 092import org.apache.activemq.util.Callback; 093import org.apache.activemq.util.LongSequenceGenerator; 094import org.slf4j.Logger; 095import org.slf4j.LoggerFactory; 096 097/** 098 * <P> 099 * A <CODE>Session</CODE> object is a single-threaded context for producing 100 * and consuming messages. Although it may allocate provider resources outside 101 * the Java virtual machine (JVM), it is considered a lightweight JMS object. 102 * <P> 103 * A session serves several purposes: 104 * <UL> 105 * <LI>It is a factory for its message producers and consumers. 106 * <LI>It supplies provider-optimized message factories. 107 * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and 108 * <CODE>TemporaryQueues</CODE>. 109 * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE> 110 * objects for those clients that need to dynamically manipulate 111 * provider-specific destination names. 112 * <LI>It supports a single series of transactions that combine work spanning 113 * its producers and consumers into atomic units. 114 * <LI>It defines a serial order for the messages it consumes and the messages 115 * it produces. 116 * <LI>It retains messages it consumes until they have been acknowledged. 117 * <LI>It serializes execution of message listeners registered with its message 118 * consumers. 119 * <LI>It is a factory for <CODE>QueueBrowsers</CODE>. 120 * </UL> 121 * <P> 122 * A session can create and service multiple message producers and consumers. 123 * <P> 124 * One typical use is to have a thread block on a synchronous 125 * <CODE>MessageConsumer</CODE> until a message arrives. The thread may then 126 * use one or more of the <CODE>Session</CODE>'s<CODE>MessageProducer</CODE>s. 127 * <P> 128 * If a client desires to have one thread produce messages while others consume 129 * them, the client should use a separate session for its producing thread. 130 * <P> 131 * Once a connection has been started, any session with one or more registered 132 * message listeners is dedicated to the thread of control that delivers 133 * messages to it. It is erroneous for client code to use this session or any of 134 * its constituent objects from another thread of control. The only exception to 135 * this rule is the use of the session or connection <CODE>close</CODE> 136 * method. 137 * <P> 138 * It should be easy for most clients to partition their work naturally into 139 * sessions. This model allows clients to start simply and incrementally add 140 * message processing complexity as their need for concurrency grows. 141 * <P> 142 * The <CODE>close</CODE> method is the only session method that can be called 143 * while some other session method is being executed in another thread. 144 * <P> 145 * A session may be specified as transacted. Each transacted session supports a 146 * single series of transactions. Each transaction groups a set of message sends 147 * and a set of message receives into an atomic unit of work. In effect, 148 * transactions organize a session's input message stream and output message 149 * stream into series of atomic units. When a transaction commits, its atomic 150 * unit of input is acknowledged and its associated atomic unit of output is 151 * sent. If a transaction rollback is done, the transaction's sent messages are 152 * destroyed and the session's input is automatically recovered. 153 * <P> 154 * The content of a transaction's input and output units is simply those 155 * messages that have been produced and consumed within the session's current 156 * transaction. 157 * <P> 158 * A transaction is completed using either its session's <CODE>commit</CODE> 159 * method or its session's <CODE>rollback </CODE> method. The completion of a 160 * session's current transaction automatically begins the next. The result is 161 * that a transacted session always has a current transaction within which its 162 * work is done. 163 * <P> 164 * The Java Transaction Service (JTS) or some other transaction monitor may be 165 * used to combine a session's transaction with transactions on other resources 166 * (databases, other JMS sessions, etc.). Since Java distributed transactions 167 * are controlled via the Java Transaction API (JTA), use of the session's 168 * <CODE>commit</CODE> and <CODE>rollback</CODE> methods in this context is 169 * prohibited. 170 * <P> 171 * The JMS API does not require support for JTA; however, it does define how a 172 * provider supplies this support. 173 * <P> 174 * Although it is also possible for a JMS client to handle distributed 175 * transactions directly, it is unlikely that many JMS clients will do this. 176 * Support for JTA in the JMS API is targeted at systems vendors who will be 177 * integrating the JMS API into their application server products. 178 * 179 * 180 * @see javax.jms.Session 181 * @see javax.jms.QueueSession 182 * @see javax.jms.TopicSession 183 * @see javax.jms.XASession 184 */ 185public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher { 186 187 /** 188 * Only acknowledge an individual message - using message.acknowledge() 189 * as opposed to CLIENT_ACKNOWLEDGE which 190 * acknowledges all messages consumed by a session at when acknowledge() 191 * is called 192 */ 193 public static final int INDIVIDUAL_ACKNOWLEDGE = 4; 194 public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE; 195 196 public static interface DeliveryListener { 197 void beforeDelivery(ActiveMQSession session, Message msg); 198 199 void afterDelivery(ActiveMQSession session, Message msg); 200 } 201 202 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class); 203 private final ThreadPoolExecutor connectionExecutor; 204 205 protected int acknowledgementMode; 206 protected final ActiveMQConnection connection; 207 protected final SessionInfo info; 208 protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 209 protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator(); 210 protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator(); 211 protected final ActiveMQSessionExecutor executor; 212 protected final AtomicBoolean started = new AtomicBoolean(false); 213 214 protected final CopyOnWriteArrayList<ActiveMQMessageConsumer> consumers = new CopyOnWriteArrayList<ActiveMQMessageConsumer>(); 215 protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>(); 216 217 protected boolean closed; 218 private volatile boolean synchronizationRegistered; 219 protected boolean asyncDispatch; 220 protected boolean sessionAsyncDispatch; 221 protected final boolean debug; 222 protected final Object sendMutex = new Object(); 223 protected final Object redeliveryGuard = new Object(); 224 225 private final AtomicBoolean clearInProgress = new AtomicBoolean(); 226 227 private MessageListener messageListener; 228 private final JMSSessionStatsImpl stats; 229 private TransactionContext transactionContext; 230 private DeliveryListener deliveryListener; 231 private MessageTransformer transformer; 232 private BlobTransferPolicy blobTransferPolicy; 233 private long lastDeliveredSequenceId = -2; 234 235 /** 236 * Construct the Session 237 * 238 * @param connection 239 * @param sessionId 240 * @param acknowledgeMode n.b if transacted - the acknowledgeMode == 241 * Session.SESSION_TRANSACTED 242 * @param asyncDispatch 243 * @param sessionAsyncDispatch 244 * @throws JMSException on internal error 245 */ 246 protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException { 247 this.debug = LOG.isDebugEnabled(); 248 this.connection = connection; 249 this.acknowledgementMode = acknowledgeMode; 250 this.asyncDispatch = asyncDispatch; 251 this.sessionAsyncDispatch = sessionAsyncDispatch; 252 this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue()); 253 setTransactionContext(new TransactionContext(connection)); 254 stats = new JMSSessionStatsImpl(producers, consumers); 255 this.connection.asyncSendPacket(info); 256 setTransformer(connection.getTransformer()); 257 setBlobTransferPolicy(connection.getBlobTransferPolicy()); 258 this.connectionExecutor=connection.getExecutor(); 259 this.executor = new ActiveMQSessionExecutor(this); 260 connection.addSession(this); 261 if (connection.isStarted()) { 262 start(); 263 } 264 265 } 266 267 protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch) throws JMSException { 268 this(connection, sessionId, acknowledgeMode, asyncDispatch, true); 269 } 270 271 /** 272 * Sets the transaction context of the session. 273 * 274 * @param transactionContext - provides the means to control a JMS 275 * transaction. 276 */ 277 public void setTransactionContext(TransactionContext transactionContext) { 278 this.transactionContext = transactionContext; 279 } 280 281 /** 282 * Returns the transaction context of the session. 283 * 284 * @return transactionContext - session's transaction context. 285 */ 286 public TransactionContext getTransactionContext() { 287 return transactionContext; 288 } 289 290 /* 291 * (non-Javadoc) 292 * 293 * @see org.apache.activemq.management.StatsCapable#getStats() 294 */ 295 @Override 296 public StatsImpl getStats() { 297 return stats; 298 } 299 300 /** 301 * Returns the session's statistics. 302 * 303 * @return stats - session's statistics. 304 */ 305 public JMSSessionStatsImpl getSessionStats() { 306 return stats; 307 } 308 309 /** 310 * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE> 311 * object is used to send a message containing a stream of uninterpreted 312 * bytes. 313 * 314 * @return the an ActiveMQBytesMessage 315 * @throws JMSException if the JMS provider fails to create this message due 316 * to some internal error. 317 */ 318 @Override 319 public BytesMessage createBytesMessage() throws JMSException { 320 ActiveMQBytesMessage message = new ActiveMQBytesMessage(); 321 configureMessage(message); 322 return message; 323 } 324 325 /** 326 * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE> 327 * object is used to send a self-defining set of name-value pairs, where 328 * names are <CODE>String</CODE> objects and values are primitive values 329 * in the Java programming language. 330 * 331 * @return an ActiveMQMapMessage 332 * @throws JMSException if the JMS provider fails to create this message due 333 * to some internal error. 334 */ 335 @Override 336 public MapMessage createMapMessage() throws JMSException { 337 ActiveMQMapMessage message = new ActiveMQMapMessage(); 338 configureMessage(message); 339 return message; 340 } 341 342 /** 343 * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE> 344 * interface is the root interface of all JMS messages. A 345 * <CODE>Message</CODE> object holds all the standard message header 346 * information. It can be sent when a message containing only header 347 * information is sufficient. 348 * 349 * @return an ActiveMQMessage 350 * @throws JMSException if the JMS provider fails to create this message due 351 * to some internal error. 352 */ 353 @Override 354 public Message createMessage() throws JMSException { 355 ActiveMQMessage message = new ActiveMQMessage(); 356 configureMessage(message); 357 return message; 358 } 359 360 /** 361 * Creates an <CODE>ObjectMessage</CODE> object. An 362 * <CODE>ObjectMessage</CODE> object is used to send a message that 363 * contains a serializable Java object. 364 * 365 * @return an ActiveMQObjectMessage 366 * @throws JMSException if the JMS provider fails to create this message due 367 * to some internal error. 368 */ 369 @Override 370 public ObjectMessage createObjectMessage() throws JMSException { 371 ActiveMQObjectMessage message = new ActiveMQObjectMessage(); 372 configureMessage(message); 373 return message; 374 } 375 376 /** 377 * Creates an initialized <CODE>ObjectMessage</CODE> object. An 378 * <CODE>ObjectMessage</CODE> object is used to send a message that 379 * contains a serializable Java object. 380 * 381 * @param object the object to use to initialize this message 382 * @return an ActiveMQObjectMessage 383 * @throws JMSException if the JMS provider fails to create this message due 384 * to some internal error. 385 */ 386 @Override 387 public ObjectMessage createObjectMessage(Serializable object) throws JMSException { 388 ActiveMQObjectMessage message = new ActiveMQObjectMessage(); 389 configureMessage(message); 390 message.setObject(object); 391 return message; 392 } 393 394 /** 395 * Creates a <CODE>StreamMessage</CODE> object. A 396 * <CODE>StreamMessage</CODE> object is used to send a self-defining 397 * stream of primitive values in the Java programming language. 398 * 399 * @return an ActiveMQStreamMessage 400 * @throws JMSException if the JMS provider fails to create this message due 401 * to some internal error. 402 */ 403 @Override 404 public StreamMessage createStreamMessage() throws JMSException { 405 ActiveMQStreamMessage message = new ActiveMQStreamMessage(); 406 configureMessage(message); 407 return message; 408 } 409 410 /** 411 * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE> 412 * object is used to send a message containing a <CODE>String</CODE> 413 * object. 414 * 415 * @return an ActiveMQTextMessage 416 * @throws JMSException if the JMS provider fails to create this message due 417 * to some internal error. 418 */ 419 @Override 420 public TextMessage createTextMessage() throws JMSException { 421 ActiveMQTextMessage message = new ActiveMQTextMessage(); 422 configureMessage(message); 423 return message; 424 } 425 426 /** 427 * Creates an initialized <CODE>TextMessage</CODE> object. A 428 * <CODE>TextMessage</CODE> object is used to send a message containing a 429 * <CODE>String</CODE>. 430 * 431 * @param text the string used to initialize this message 432 * @return an ActiveMQTextMessage 433 * @throws JMSException if the JMS provider fails to create this message due 434 * to some internal error. 435 */ 436 @Override 437 public TextMessage createTextMessage(String text) throws JMSException { 438 ActiveMQTextMessage message = new ActiveMQTextMessage(); 439 message.setText(text); 440 configureMessage(message); 441 return message; 442 } 443 444 /** 445 * Creates an initialized <CODE>BlobMessage</CODE> object. A 446 * <CODE>BlobMessage</CODE> object is used to send a message containing a 447 * <CODE>URL</CODE> which points to some network addressible BLOB. 448 * 449 * @param url the network addressable URL used to pass directly to the 450 * consumer 451 * @return a BlobMessage 452 * @throws JMSException if the JMS provider fails to create this message due 453 * to some internal error. 454 */ 455 public BlobMessage createBlobMessage(URL url) throws JMSException { 456 return createBlobMessage(url, false); 457 } 458 459 /** 460 * Creates an initialized <CODE>BlobMessage</CODE> object. A 461 * <CODE>BlobMessage</CODE> object is used to send a message containing a 462 * <CODE>URL</CODE> which points to some network addressible BLOB. 463 * 464 * @param url the network addressable URL used to pass directly to the 465 * consumer 466 * @param deletedByBroker indicates whether or not the resource is deleted 467 * by the broker when the message is acknowledged 468 * @return a BlobMessage 469 * @throws JMSException if the JMS provider fails to create this message due 470 * to some internal error. 471 */ 472 public BlobMessage createBlobMessage(URL url, boolean deletedByBroker) throws JMSException { 473 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 474 configureMessage(message); 475 message.setURL(url); 476 message.setDeletedByBroker(deletedByBroker); 477 message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy())); 478 return message; 479 } 480 481 /** 482 * Creates an initialized <CODE>BlobMessage</CODE> object. A 483 * <CODE>BlobMessage</CODE> object is used to send a message containing 484 * the <CODE>File</CODE> content. Before the message is sent the file 485 * conent will be uploaded to the broker or some other remote repository 486 * depending on the {@link #getBlobTransferPolicy()}. 487 * 488 * @param file the file to be uploaded to some remote repo (or the broker) 489 * depending on the strategy 490 * @return a BlobMessage 491 * @throws JMSException if the JMS provider fails to create this message due 492 * to some internal error. 493 */ 494 public BlobMessage createBlobMessage(File file) throws JMSException { 495 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 496 configureMessage(message); 497 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file)); 498 message.setBlobDownloader(new BlobDownloader((getBlobTransferPolicy()))); 499 message.setDeletedByBroker(true); 500 message.setName(file.getName()); 501 return message; 502 } 503 504 /** 505 * Creates an initialized <CODE>BlobMessage</CODE> object. A 506 * <CODE>BlobMessage</CODE> object is used to send a message containing 507 * the <CODE>File</CODE> content. Before the message is sent the file 508 * conent will be uploaded to the broker or some other remote repository 509 * depending on the {@link #getBlobTransferPolicy()}. <br/> 510 * <p> 511 * The caller of this method is responsible for closing the 512 * input stream that is used, however the stream can not be closed 513 * until <b>after</b> the message has been sent. To have this class 514 * manage the stream and close it automatically, use the method 515 * {@link ActiveMQSession#createBlobMessage(File)} 516 * 517 * @param in the stream to be uploaded to some remote repo (or the broker) 518 * depending on the strategy 519 * @return a BlobMessage 520 * @throws JMSException if the JMS provider fails to create this message due 521 * to some internal error. 522 */ 523 public BlobMessage createBlobMessage(InputStream in) throws JMSException { 524 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 525 configureMessage(message); 526 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in)); 527 message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy())); 528 message.setDeletedByBroker(true); 529 return message; 530 } 531 532 /** 533 * Indicates whether the session is in transacted mode. 534 * 535 * @return true if the session is in transacted mode 536 * @throws JMSException if there is some internal error. 537 */ 538 @Override 539 public boolean getTransacted() throws JMSException { 540 checkClosed(); 541 return isTransacted(); 542 } 543 544 /** 545 * Returns the acknowledgement mode of the session. The acknowledgement mode 546 * is set at the time that the session is created. If the session is 547 * transacted, the acknowledgement mode is ignored. 548 * 549 * @return If the session is not transacted, returns the current 550 * acknowledgement mode for the session. If the session is 551 * transacted, returns SESSION_TRANSACTED. 552 * @throws JMSException 553 * @see javax.jms.Connection#createSession(boolean,int) 554 * @since 1.1 exception JMSException if there is some internal error. 555 */ 556 @Override 557 public int getAcknowledgeMode() throws JMSException { 558 checkClosed(); 559 return this.acknowledgementMode; 560 } 561 562 /** 563 * Commits all messages done in this transaction and releases any locks 564 * currently held. 565 * 566 * @throws JMSException if the JMS provider fails to commit the transaction 567 * due to some internal error. 568 * @throws TransactionRolledBackException if the transaction is rolled back 569 * due to some internal error during commit. 570 * @throws javax.jms.IllegalStateException if the method is not called by a 571 * transacted session. 572 */ 573 @Override 574 public void commit() throws JMSException { 575 checkClosed(); 576 if (!getTransacted()) { 577 throw new javax.jms.IllegalStateException("Not a transacted session"); 578 } 579 if (LOG.isDebugEnabled()) { 580 LOG.debug(getSessionId() + " Transaction Commit :" + transactionContext.getTransactionId()); 581 } 582 transactionContext.commit(); 583 } 584 585 /** 586 * Rolls back any messages done in this transaction and releases any locks 587 * currently held. 588 * 589 * @throws JMSException if the JMS provider fails to roll back the 590 * transaction due to some internal error. 591 * @throws javax.jms.IllegalStateException if the method is not called by a 592 * transacted session. 593 */ 594 @Override 595 public void rollback() throws JMSException { 596 checkClosed(); 597 if (!getTransacted()) { 598 throw new javax.jms.IllegalStateException("Not a transacted session"); 599 } 600 if (LOG.isDebugEnabled()) { 601 LOG.debug(getSessionId() + " Transaction Rollback, txid:" + transactionContext.getTransactionId()); 602 } 603 transactionContext.rollback(); 604 } 605 606 /** 607 * Closes the session. 608 * <P> 609 * Since a provider may allocate some resources on behalf of a session 610 * outside the JVM, clients should close the resources when they are not 611 * needed. Relying on garbage collection to eventually reclaim these 612 * resources may not be timely enough. 613 * <P> 614 * There is no need to close the producers and consumers of a closed 615 * session. 616 * <P> 617 * This call will block until a <CODE>receive</CODE> call or message 618 * listener in progress has completed. A blocked message consumer 619 * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session 620 * is closed. 621 * <P> 622 * Closing a transacted session must roll back the transaction in progress. 623 * <P> 624 * This method is the only <CODE>Session</CODE> method that can be called 625 * concurrently. 626 * <P> 627 * Invoking any other <CODE>Session</CODE> method on a closed session must 628 * throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a 629 * closed session must <I>not </I> throw an exception. 630 * 631 * @throws JMSException if the JMS provider fails to close the session due 632 * to some internal error. 633 */ 634 @Override 635 public void close() throws JMSException { 636 if (!closed) { 637 if (getTransactionContext().isInXATransaction()) { 638 if (!synchronizationRegistered) { 639 synchronizationRegistered = true; 640 getTransactionContext().addSynchronization(new Synchronization() { 641 642 @Override 643 public void afterCommit() throws Exception { 644 doClose(); 645 synchronizationRegistered = false; 646 } 647 648 @Override 649 public void afterRollback() throws Exception { 650 doClose(); 651 synchronizationRegistered = false; 652 } 653 }); 654 } 655 656 } else { 657 doClose(); 658 } 659 } 660 } 661 662 private void doClose() throws JMSException { 663 dispose(); 664 RemoveInfo removeCommand = info.createRemoveCommand(); 665 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); 666 connection.asyncSendPacket(removeCommand); 667 } 668 669 final AtomicInteger clearRequestsCounter = new AtomicInteger(0); 670 void clearMessagesInProgress(AtomicInteger transportInterruptionProcessingComplete) { 671 clearRequestsCounter.incrementAndGet(); 672 executor.clearMessagesInProgress(); 673 // we are called from inside the transport reconnection logic which involves us 674 // clearing all the connections' consumers dispatch and delivered lists. So rather 675 // than trying to grab a mutex (which could be already owned by the message listener 676 // calling the send or an ack) we allow it to complete in a separate thread via the 677 // scheduler and notify us via connection.transportInterruptionProcessingComplete() 678 // 679 // We must be careful though not to allow multiple calls to this method from a 680 // connection that is having issue becoming fully established from causing a large 681 // build up of scheduled tasks to clear the same consumers over and over. 682 if (consumers.isEmpty()) { 683 return; 684 } 685 686 if (clearInProgress.compareAndSet(false, true)) { 687 for (final ActiveMQMessageConsumer consumer : consumers) { 688 consumer.inProgressClearRequired(); 689 transportInterruptionProcessingComplete.incrementAndGet(); 690 try { 691 connection.getScheduler().executeAfterDelay(new Runnable() { 692 @Override 693 public void run() { 694 consumer.clearMessagesInProgress(); 695 }}, 0l); 696 } catch (JMSException e) { 697 connection.onClientInternalException(e); 698 } 699 } 700 701 try { 702 connection.getScheduler().executeAfterDelay(new Runnable() { 703 @Override 704 public void run() { 705 clearInProgress.set(false); 706 }}, 0l); 707 } catch (JMSException e) { 708 connection.onClientInternalException(e); 709 } 710 } 711 } 712 713 void deliverAcks() { 714 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 715 ActiveMQMessageConsumer consumer = iter.next(); 716 consumer.deliverAcks(); 717 } 718 } 719 720 public synchronized void dispose() throws JMSException { 721 if (!closed) { 722 723 try { 724 executor.close(); 725 726 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 727 ActiveMQMessageConsumer consumer = iter.next(); 728 consumer.setFailureError(connection.getFirstFailureError()); 729 consumer.dispose(); 730 lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId()); 731 } 732 consumers.clear(); 733 734 for (Iterator<ActiveMQMessageProducer> iter = producers.iterator(); iter.hasNext();) { 735 ActiveMQMessageProducer producer = iter.next(); 736 producer.dispose(); 737 } 738 producers.clear(); 739 740 try { 741 if (getTransactionContext().isInLocalTransaction()) { 742 rollback(); 743 } 744 } catch (JMSException e) { 745 } 746 747 } finally { 748 connection.removeSession(this); 749 this.transactionContext = null; 750 closed = true; 751 } 752 } 753 } 754 755 /** 756 * Checks that the session is not closed then configures the message 757 */ 758 protected void configureMessage(ActiveMQMessage message) throws IllegalStateException { 759 checkClosed(); 760 message.setConnection(connection); 761 } 762 763 /** 764 * Check if the session is closed. It is used for ensuring that the session 765 * is open before performing various operations. 766 * 767 * @throws IllegalStateException if the Session is closed 768 */ 769 protected void checkClosed() throws IllegalStateException { 770 if (closed) { 771 throw new IllegalStateException("The Session is closed"); 772 } 773 } 774 775 /** 776 * Checks if the session is closed. 777 * 778 * @return true if the session is closed, false otherwise. 779 */ 780 public boolean isClosed() { 781 return closed; 782 } 783 784 /** 785 * Stops message delivery in this session, and restarts message delivery 786 * with the oldest unacknowledged message. 787 * <P> 788 * All consumers deliver messages in a serial order. Acknowledging a 789 * received message automatically acknowledges all messages that have been 790 * delivered to the client. 791 * <P> 792 * Restarting a session causes it to take the following actions: 793 * <UL> 794 * <LI>Stop message delivery 795 * <LI>Mark all messages that might have been delivered but not 796 * acknowledged as "redelivered" 797 * <LI>Restart the delivery sequence including all unacknowledged messages 798 * that had been previously delivered. Redelivered messages do not have to 799 * be delivered in exactly their original delivery order. 800 * </UL> 801 * 802 * @throws JMSException if the JMS provider fails to stop and restart 803 * message delivery due to some internal error. 804 * @throws IllegalStateException if the method is called by a transacted 805 * session. 806 */ 807 @Override 808 public void recover() throws JMSException { 809 810 checkClosed(); 811 if (getTransacted()) { 812 throw new IllegalStateException("This session is transacted"); 813 } 814 815 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 816 ActiveMQMessageConsumer c = iter.next(); 817 c.rollback(); 818 } 819 820 } 821 822 /** 823 * Returns the session's distinguished message listener (optional). 824 * 825 * @return the message listener associated with this session 826 * @throws JMSException if the JMS provider fails to get the message 827 * listener due to an internal error. 828 * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener) 829 * @see javax.jms.ServerSessionPool 830 * @see javax.jms.ServerSession 831 */ 832 @Override 833 public MessageListener getMessageListener() throws JMSException { 834 checkClosed(); 835 return this.messageListener; 836 } 837 838 /** 839 * Sets the session's distinguished message listener (optional). 840 * <P> 841 * When the distinguished message listener is set, no other form of message 842 * receipt in the session can be used; however, all forms of sending 843 * messages are still supported. 844 * <P> 845 * If this session has been closed, then an {@link IllegalStateException} is 846 * thrown, if trying to set a new listener. However setting the listener 847 * to <tt>null</tt> is allowed, to clear the listener, even if this session 848 * has been closed prior. 849 * <P> 850 * This is an expert facility not used by regular JMS clients. 851 * 852 * @param listener the message listener to associate with this session 853 * @throws JMSException if the JMS provider fails to set the message 854 * listener due to an internal error. 855 * @see javax.jms.Session#getMessageListener() 856 * @see javax.jms.ServerSessionPool 857 * @see javax.jms.ServerSession 858 */ 859 @Override 860 public void setMessageListener(MessageListener listener) throws JMSException { 861 // only check for closed if we set a new listener, as we allow to clear 862 // the listener, such as when an application is shutting down, and is 863 // no longer using a message listener on this session 864 if (listener != null) { 865 checkClosed(); 866 } 867 this.messageListener = listener; 868 869 if (listener != null) { 870 executor.setDispatchedBySessionPool(true); 871 } 872 } 873 874 /** 875 * Optional operation, intended to be used only by Application Servers, not 876 * by ordinary JMS clients. 877 * 878 * @see javax.jms.ServerSession 879 */ 880 @Override 881 public void run() { 882 MessageDispatch messageDispatch; 883 while ((messageDispatch = executor.dequeueNoWait()) != null) { 884 final MessageDispatch md = messageDispatch; 885 final ActiveMQMessage message = (ActiveMQMessage)md.getMessage(); 886 887 MessageAck earlyAck = null; 888 if (message.isExpired()) { 889 earlyAck = new MessageAck(md, MessageAck.EXPIRED_ACK_TYPE, 1); 890 earlyAck.setFirstMessageId(message.getMessageId()); 891 } else if (connection.isDuplicate(ActiveMQSession.this, message)) { 892 LOG.debug("{} got duplicate: {}", this, message.getMessageId()); 893 earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); 894 earlyAck.setFirstMessageId(md.getMessage().getMessageId()); 895 earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this)); 896 } 897 if (earlyAck != null) { 898 try { 899 asyncSendPacket(earlyAck); 900 } catch (Throwable t) { 901 LOG.error("error dispatching ack: {} ", earlyAck, t); 902 connection.onClientInternalException(t); 903 } finally { 904 continue; 905 } 906 } 907 908 if (isClientAcknowledge()||isIndividualAcknowledge()) { 909 message.setAcknowledgeCallback(new Callback() { 910 @Override 911 public void execute() throws Exception { 912 } 913 }); 914 } 915 916 if (deliveryListener != null) { 917 deliveryListener.beforeDelivery(this, message); 918 } 919 920 md.setDeliverySequenceId(getNextDeliveryId()); 921 lastDeliveredSequenceId = message.getMessageId().getBrokerSequenceId(); 922 923 final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); 924 925 final AtomicBoolean afterDeliveryError = new AtomicBoolean(false); 926 /* 927 * The redelivery guard is to allow the endpoint lifecycle to complete before the messsage is dispatched. 928 * We dont want the after deliver being called after the redeliver as it may cause some weird stuff. 929 * */ 930 synchronized (redeliveryGuard) { 931 try { 932 ack.setFirstMessageId(md.getMessage().getMessageId()); 933 doStartTransaction(); 934 ack.setTransactionId(getTransactionContext().getTransactionId()); 935 if (ack.getTransactionId() != null) { 936 getTransactionContext().addSynchronization(new Synchronization() { 937 938 final int clearRequestCount = (clearRequestsCounter.get() == Integer.MAX_VALUE ? clearRequestsCounter.incrementAndGet() : clearRequestsCounter.get()); 939 940 @Override 941 public void beforeEnd() throws Exception { 942 // validate our consumer so we don't push stale acks that get ignored 943 if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) { 944 LOG.debug("forcing rollback - {} consumer no longer active on {}", ack, connection); 945 throw new TransactionRolledBackException("consumer " + ack.getConsumerId() + " no longer active on " + connection); 946 } 947 LOG.trace("beforeEnd ack {}", ack); 948 sendAck(ack); 949 } 950 951 @Override 952 public void afterRollback() throws Exception { 953 LOG.trace("rollback {}", ack, new Throwable("here")); 954 // ensure we don't filter this as a duplicate 955 connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage()); 956 957 // don't redeliver if we have been interrupted b/c the broker will redeliver on reconnect 958 if (clearRequestsCounter.get() > clearRequestCount) { 959 LOG.debug("No redelivery of {} on rollback of {} due to failover of {}", md, ack.getTransactionId(), connection.getTransport()); 960 return; 961 } 962 963 // validate our consumer so we don't push stale acks that get ignored or redeliver what will be redispatched 964 if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) { 965 LOG.debug("No local redelivery of {} on rollback of {} because consumer is no longer active on {}", md, ack.getTransactionId(), connection.getTransport()); 966 return; 967 } 968 969 RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy(); 970 int redeliveryCounter = md.getMessage().getRedeliveryCounter(); 971 if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES 972 && redeliveryCounter >= redeliveryPolicy.getMaximumRedeliveries()) { 973 // We need to NACK the messages so that they get 974 // sent to the 975 // DLQ. 976 // Acknowledge the last message. 977 MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); 978 ack.setFirstMessageId(md.getMessage().getMessageId()); 979 ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy)); 980 asyncSendPacket(ack); 981 982 } else { 983 984 MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1); 985 ack.setFirstMessageId(md.getMessage().getMessageId()); 986 asyncSendPacket(ack); 987 988 // Figure out how long we should wait to resend 989 // this message. 990 long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay(); 991 for (int i = 0; i < redeliveryCounter; i++) { 992 redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay); 993 } 994 995 /* 996 * If we are a non blocking delivery then we need to stop the executor to avoid more 997 * messages being delivered, once the message is redelivered we can restart it. 998 * */ 999 if (!connection.isNonBlockingRedelivery()) { 1000 LOG.debug("Blocking session until re-delivery..."); 1001 executor.stop(); 1002 } 1003 1004 connection.getScheduler().executeAfterDelay(new Runnable() { 1005 1006 @Override 1007 public void run() { 1008 /* 1009 * wait for the first delivery to be complete, i.e. after delivery has been called. 1010 * */ 1011 synchronized (redeliveryGuard) { 1012 /* 1013 * If its non blocking then we can just dispatch in a new session. 1014 * */ 1015 if (connection.isNonBlockingRedelivery()) { 1016 ((ActiveMQDispatcher) md.getConsumer()).dispatch(md); 1017 } else { 1018 /* 1019 * If there has been an error thrown during afterDelivery then the 1020 * endpoint will be marked as dead so redelivery will fail (and eventually 1021 * the session marked as stale), in this case we can only call dispatch 1022 * which will create a new session with a new endpoint. 1023 * */ 1024 if (afterDeliveryError.get()) { 1025 ((ActiveMQDispatcher) md.getConsumer()).dispatch(md); 1026 } else { 1027 executor.executeFirst(md); 1028 executor.start(); 1029 } 1030 } 1031 } 1032 } 1033 }, redeliveryDelay); 1034 } 1035 md.getMessage().onMessageRolledBack(); 1036 } 1037 }); 1038 } 1039 1040 LOG.trace("{} onMessage({})", this, message.getMessageId()); 1041 messageListener.onMessage(message); 1042 1043 } catch (Throwable e) { 1044 LOG.error("error dispatching message: ", e); 1045 1046 // A problem while invoking the MessageListener does not 1047 // in general indicate a problem with the connection to the broker, i.e. 1048 // it will usually be sufficient to let the afterDelivery() method either 1049 // commit or roll back in order to deal with the exception. 1050 // However, we notify any registered client internal exception listener 1051 // of the problem. 1052 connection.onClientInternalException(e); 1053 } finally { 1054 if (ack.getTransactionId() == null) { 1055 try { 1056 asyncSendPacket(ack); 1057 } catch (Throwable e) { 1058 connection.onClientInternalException(e); 1059 } 1060 } 1061 } 1062 1063 if (deliveryListener != null) { 1064 try { 1065 deliveryListener.afterDelivery(this, message); 1066 } catch (Throwable t) { 1067 LOG.debug("Unable to call after delivery", t); 1068 afterDeliveryError.set(true); 1069 throw new RuntimeException(t); 1070 } 1071 } 1072 } 1073 /* 1074 * this can be outside the try/catch as if an exception is thrown then this session will be marked as stale anyway. 1075 * It also needs to be outside the redelivery guard. 1076 * */ 1077 try { 1078 executor.waitForQueueRestart(); 1079 } catch (InterruptedException ex) { 1080 connection.onClientInternalException(ex); 1081 } 1082 } 1083 } 1084 1085 /** 1086 * Creates a <CODE>MessageProducer</CODE> to send messages to the 1087 * specified destination. 1088 * <P> 1089 * A client uses a <CODE>MessageProducer</CODE> object to send messages to 1090 * a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both 1091 * inherit from <CODE>Destination</CODE>, they can be used in the 1092 * destination parameter to create a <CODE>MessageProducer</CODE> object. 1093 * 1094 * @param destination the <CODE>Destination</CODE> to send to, or null if 1095 * this is a producer which does not have a specified 1096 * destination. 1097 * @return the MessageProducer 1098 * @throws JMSException if the session fails to create a MessageProducer due 1099 * to some internal error. 1100 * @throws InvalidDestinationException if an invalid destination is 1101 * specified. 1102 * @since 1.1 1103 */ 1104 @Override 1105 public MessageProducer createProducer(Destination destination) throws JMSException { 1106 checkClosed(); 1107 if (destination instanceof CustomDestination) { 1108 CustomDestination customDestination = (CustomDestination)destination; 1109 return customDestination.createProducer(this); 1110 } 1111 int timeSendOut = connection.getSendTimeout(); 1112 return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut); 1113 } 1114 1115 /** 1116 * Creates a <CODE>MessageConsumer</CODE> for the specified destination. 1117 * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from 1118 * <CODE>Destination</CODE>, they can be used in the destination 1119 * parameter to create a <CODE>MessageConsumer</CODE>. 1120 * 1121 * @param destination the <CODE>Destination</CODE> to access. 1122 * @return the MessageConsumer 1123 * @throws JMSException if the session fails to create a consumer due to 1124 * some internal error. 1125 * @throws InvalidDestinationException if an invalid destination is 1126 * specified. 1127 * @since 1.1 1128 */ 1129 @Override 1130 public MessageConsumer createConsumer(Destination destination) throws JMSException { 1131 return createConsumer(destination, (String) null); 1132 } 1133 1134 /** 1135 * Creates a <CODE>MessageConsumer</CODE> for the specified destination, 1136 * using a message selector. Since <CODE> Queue</CODE> and 1137 * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they 1138 * can be used in the destination parameter to create a 1139 * <CODE>MessageConsumer</CODE>. 1140 * <P> 1141 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1142 * that have been sent to a destination. 1143 * 1144 * @param destination the <CODE>Destination</CODE> to access 1145 * @param messageSelector only messages with properties matching the message 1146 * selector expression are delivered. A value of null or an 1147 * empty string indicates that there is no message selector 1148 * for the message consumer. 1149 * @return the MessageConsumer 1150 * @throws JMSException if the session fails to create a MessageConsumer due 1151 * to some internal error. 1152 * @throws InvalidDestinationException if an invalid destination is 1153 * specified. 1154 * @throws InvalidSelectorException if the message selector is invalid. 1155 * @since 1.1 1156 */ 1157 @Override 1158 public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { 1159 return createConsumer(destination, messageSelector, false); 1160 } 1161 1162 /** 1163 * Creates a <CODE>MessageConsumer</CODE> for the specified destination. 1164 * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from 1165 * <CODE>Destination</CODE>, they can be used in the destination 1166 * parameter to create a <CODE>MessageConsumer</CODE>. 1167 * 1168 * @param destination the <CODE>Destination</CODE> to access. 1169 * @param messageListener the listener to use for async consumption of messages 1170 * @return the MessageConsumer 1171 * @throws JMSException if the session fails to create a consumer due to 1172 * some internal error. 1173 * @throws InvalidDestinationException if an invalid destination is 1174 * specified. 1175 * @since 1.1 1176 */ 1177 public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException { 1178 return createConsumer(destination, null, messageListener); 1179 } 1180 1181 /** 1182 * Creates a <CODE>MessageConsumer</CODE> for the specified destination, 1183 * using a message selector. Since <CODE> Queue</CODE> and 1184 * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they 1185 * can be used in the destination parameter to create a 1186 * <CODE>MessageConsumer</CODE>. 1187 * <P> 1188 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1189 * that have been sent to a destination. 1190 * 1191 * @param destination the <CODE>Destination</CODE> to access 1192 * @param messageSelector only messages with properties matching the message 1193 * selector expression are delivered. A value of null or an 1194 * empty string indicates that there is no message selector 1195 * for the message consumer. 1196 * @param messageListener the listener to use for async consumption of messages 1197 * @return the MessageConsumer 1198 * @throws JMSException if the session fails to create a MessageConsumer due 1199 * to some internal error. 1200 * @throws InvalidDestinationException if an invalid destination is 1201 * specified. 1202 * @throws InvalidSelectorException if the message selector is invalid. 1203 * @since 1.1 1204 */ 1205 public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException { 1206 return createConsumer(destination, messageSelector, false, messageListener); 1207 } 1208 1209 /** 1210 * Creates <CODE>MessageConsumer</CODE> for the specified destination, 1211 * using a message selector. This method can specify whether messages 1212 * published by its own connection should be delivered to it, if the 1213 * destination is a topic. 1214 * <P> 1215 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from 1216 * <CODE>Destination</CODE>, they can be used in the destination 1217 * parameter to create a <CODE>MessageConsumer</CODE>. 1218 * <P> 1219 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1220 * that have been published to a destination. 1221 * <P> 1222 * In some cases, a connection may both publish and subscribe to a topic. 1223 * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to 1224 * inhibit the delivery of messages published by its own connection. The 1225 * default value for this attribute is False. The <CODE>noLocal</CODE> 1226 * value must be supported by destinations that are topics. 1227 * 1228 * @param destination the <CODE>Destination</CODE> to access 1229 * @param messageSelector only messages with properties matching the message 1230 * selector expression are delivered. A value of null or an 1231 * empty string indicates that there is no message selector 1232 * for the message consumer. 1233 * @param noLocal - if true, and the destination is a topic, inhibits the 1234 * delivery of messages published by its own connection. The 1235 * behavior for <CODE>NoLocal</CODE> is not specified if 1236 * the destination is a queue. 1237 * @return the MessageConsumer 1238 * @throws JMSException if the session fails to create a MessageConsumer due 1239 * to some internal error. 1240 * @throws InvalidDestinationException if an invalid destination is 1241 * specified. 1242 * @throws InvalidSelectorException if the message selector is invalid. 1243 * @since 1.1 1244 */ 1245 @Override 1246 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { 1247 return createConsumer(destination, messageSelector, noLocal, null); 1248 } 1249 1250 /** 1251 * Creates <CODE>MessageConsumer</CODE> for the specified destination, 1252 * using a message selector. This method can specify whether messages 1253 * published by its own connection should be delivered to it, if the 1254 * destination is a topic. 1255 * <P> 1256 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from 1257 * <CODE>Destination</CODE>, they can be used in the destination 1258 * parameter to create a <CODE>MessageConsumer</CODE>. 1259 * <P> 1260 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1261 * that have been published to a destination. 1262 * <P> 1263 * In some cases, a connection may both publish and subscribe to a topic. 1264 * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to 1265 * inhibit the delivery of messages published by its own connection. The 1266 * default value for this attribute is False. The <CODE>noLocal</CODE> 1267 * value must be supported by destinations that are topics. 1268 * 1269 * @param destination the <CODE>Destination</CODE> to access 1270 * @param messageSelector only messages with properties matching the message 1271 * selector expression are delivered. A value of null or an 1272 * empty string indicates that there is no message selector 1273 * for the message consumer. 1274 * @param noLocal - if true, and the destination is a topic, inhibits the 1275 * delivery of messages published by its own connection. The 1276 * behavior for <CODE>NoLocal</CODE> is not specified if 1277 * the destination is a queue. 1278 * @param messageListener the listener to use for async consumption of messages 1279 * @return the MessageConsumer 1280 * @throws JMSException if the session fails to create a MessageConsumer due 1281 * to some internal error. 1282 * @throws InvalidDestinationException if an invalid destination is 1283 * specified. 1284 * @throws InvalidSelectorException if the message selector is invalid. 1285 * @since 1.1 1286 */ 1287 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException { 1288 checkClosed(); 1289 1290 if (destination instanceof CustomDestination) { 1291 CustomDestination customDestination = (CustomDestination)destination; 1292 return customDestination.createConsumer(this, messageSelector, noLocal); 1293 } 1294 1295 ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy(); 1296 int prefetch = 0; 1297 if (destination instanceof Topic) { 1298 prefetch = prefetchPolicy.getTopicPrefetch(); 1299 } else { 1300 prefetch = prefetchPolicy.getQueuePrefetch(); 1301 } 1302 ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination); 1303 return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector, 1304 prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener); 1305 } 1306 1307 /** 1308 * Creates a queue identity given a <CODE>Queue</CODE> name. 1309 * <P> 1310 * This facility is provided for the rare cases where clients need to 1311 * dynamically manipulate queue identity. It allows the creation of a queue 1312 * identity with a provider-specific name. Clients that depend on this 1313 * ability are not portable. 1314 * <P> 1315 * Note that this method is not for creating the physical queue. The 1316 * physical creation of queues is an administrative task and is not to be 1317 * initiated by the JMS API. The one exception is the creation of temporary 1318 * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE> 1319 * method. 1320 * 1321 * @param queueName the name of this <CODE>Queue</CODE> 1322 * @return a <CODE>Queue</CODE> with the given name 1323 * @throws JMSException if the session fails to create a queue due to some 1324 * internal error. 1325 * @since 1.1 1326 */ 1327 @Override 1328 public Queue createQueue(String queueName) throws JMSException { 1329 checkClosed(); 1330 if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) { 1331 return new ActiveMQTempQueue(queueName); 1332 } 1333 return new ActiveMQQueue(queueName); 1334 } 1335 1336 /** 1337 * Creates a topic identity given a <CODE>Topic</CODE> name. 1338 * <P> 1339 * This facility is provided for the rare cases where clients need to 1340 * dynamically manipulate topic identity. This allows the creation of a 1341 * topic identity with a provider-specific name. Clients that depend on this 1342 * ability are not portable. 1343 * <P> 1344 * Note that this method is not for creating the physical topic. The 1345 * physical creation of topics is an administrative task and is not to be 1346 * initiated by the JMS API. The one exception is the creation of temporary 1347 * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE> 1348 * method. 1349 * 1350 * @param topicName the name of this <CODE>Topic</CODE> 1351 * @return a <CODE>Topic</CODE> with the given name 1352 * @throws JMSException if the session fails to create a topic due to some 1353 * internal error. 1354 * @since 1.1 1355 */ 1356 @Override 1357 public Topic createTopic(String topicName) throws JMSException { 1358 checkClosed(); 1359 if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) { 1360 return new ActiveMQTempTopic(topicName); 1361 } 1362 return new ActiveMQTopic(topicName); 1363 } 1364 1365 /** 1366 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on 1367 * the specified queue. 1368 * 1369 * @param queue the <CODE>queue</CODE> to access 1370 * @exception InvalidDestinationException if an invalid destination is 1371 * specified 1372 * @since 1.1 1373 */ 1374 /** 1375 * Creates a durable subscriber to the specified topic. 1376 * <P> 1377 * If a client needs to receive all the messages published on a topic, 1378 * including the ones published while the subscriber is inactive, it uses a 1379 * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a 1380 * record of this durable subscription and insures that all messages from 1381 * the topic's publishers are retained until they are acknowledged by this 1382 * durable subscriber or they have expired. 1383 * <P> 1384 * Sessions with durable subscribers must always provide the same client 1385 * identifier. In addition, each client must specify a name that uniquely 1386 * identifies (within client identifier) each durable subscription it 1387 * creates. Only one session at a time can have a 1388 * <CODE>TopicSubscriber</CODE> for a particular durable subscription. 1389 * <P> 1390 * A client can change an existing durable subscription by creating a 1391 * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic 1392 * and/or message selector. Changing a durable subscriber is equivalent to 1393 * unsubscribing (deleting) the old one and creating a new one. 1394 * <P> 1395 * In some cases, a connection may both publish and subscribe to a topic. 1396 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to 1397 * inhibit the delivery of messages published by its own connection. The 1398 * default value for this attribute is false. 1399 * 1400 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to 1401 * @param name the name used to identify this subscription 1402 * @return the TopicSubscriber 1403 * @throws JMSException if the session fails to create a subscriber due to 1404 * some internal error. 1405 * @throws InvalidDestinationException if an invalid topic is specified. 1406 * @since 1.1 1407 */ 1408 @Override 1409 public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { 1410 checkClosed(); 1411 return createDurableSubscriber(topic, name, null, false); 1412 } 1413 1414 /** 1415 * Creates a durable subscriber to the specified topic, using a message 1416 * selector and specifying whether messages published by its own connection 1417 * should be delivered to it. 1418 * <P> 1419 * If a client needs to receive all the messages published on a topic, 1420 * including the ones published while the subscriber is inactive, it uses a 1421 * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a 1422 * record of this durable subscription and insures that all messages from 1423 * the topic's publishers are retained until they are acknowledged by this 1424 * durable subscriber or they have expired. 1425 * <P> 1426 * Sessions with durable subscribers must always provide the same client 1427 * identifier. In addition, each client must specify a name which uniquely 1428 * identifies (within client identifier) each durable subscription it 1429 * creates. Only one session at a time can have a 1430 * <CODE>TopicSubscriber</CODE> for a particular durable subscription. An 1431 * inactive durable subscriber is one that exists but does not currently 1432 * have a message consumer associated with it. 1433 * <P> 1434 * A client can change an existing durable subscription by creating a 1435 * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic 1436 * and/or message selector. Changing a durable subscriber is equivalent to 1437 * unsubscribing (deleting) the old one and creating a new one. 1438 * 1439 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to 1440 * @param name the name used to identify this subscription 1441 * @param messageSelector only messages with properties matching the message 1442 * selector expression are delivered. A value of null or an 1443 * empty string indicates that there is no message selector 1444 * for the message consumer. 1445 * @param noLocal if set, inhibits the delivery of messages published by its 1446 * own connection 1447 * @return the Queue Browser 1448 * @throws JMSException if the session fails to create a subscriber due to 1449 * some internal error. 1450 * @throws InvalidDestinationException if an invalid topic is specified. 1451 * @throws InvalidSelectorException if the message selector is invalid. 1452 * @since 1.1 1453 */ 1454 @Override 1455 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException { 1456 checkClosed(); 1457 1458 if (topic == null) { 1459 throw new InvalidDestinationException("Topic cannot be null"); 1460 } 1461 1462 if (topic instanceof CustomDestination) { 1463 CustomDestination customDestination = (CustomDestination)topic; 1464 return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal); 1465 } 1466 1467 connection.checkClientIDWasManuallySpecified(); 1468 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1469 int prefetch = isAutoAcknowledge() && connection.isOptimizedMessageDispatch() ? prefetchPolicy.getOptimizeDurableTopicPrefetch() : prefetchPolicy.getDurableTopicPrefetch(); 1470 int maxPrendingLimit = prefetchPolicy.getMaximumPendingMessageLimit(); 1471 return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), name, messageSelector, prefetch, maxPrendingLimit, 1472 noLocal, false, asyncDispatch); 1473 } 1474 1475 /** 1476 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on 1477 * the specified queue. 1478 * 1479 * @param queue the <CODE>queue</CODE> to access 1480 * @return the Queue Browser 1481 * @throws JMSException if the session fails to create a browser due to some 1482 * internal error. 1483 * @throws InvalidDestinationException if an invalid destination is 1484 * specified 1485 * @since 1.1 1486 */ 1487 @Override 1488 public QueueBrowser createBrowser(Queue queue) throws JMSException { 1489 checkClosed(); 1490 return createBrowser(queue, null); 1491 } 1492 1493 /** 1494 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on 1495 * the specified queue using a message selector. 1496 * 1497 * @param queue the <CODE>queue</CODE> to access 1498 * @param messageSelector only messages with properties matching the message 1499 * selector expression are delivered. A value of null or an 1500 * empty string indicates that there is no message selector 1501 * for the message consumer. 1502 * @return the Queue Browser 1503 * @throws JMSException if the session fails to create a browser due to some 1504 * internal error. 1505 * @throws InvalidDestinationException if an invalid destination is 1506 * specified 1507 * @throws InvalidSelectorException if the message selector is invalid. 1508 * @since 1.1 1509 */ 1510 @Override 1511 public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { 1512 checkClosed(); 1513 return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch); 1514 } 1515 1516 /** 1517 * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that 1518 * of the <CODE>Connection</CODE> unless it is deleted earlier. 1519 * 1520 * @return a temporary queue identity 1521 * @throws JMSException if the session fails to create a temporary queue due 1522 * to some internal error. 1523 * @since 1.1 1524 */ 1525 @Override 1526 public TemporaryQueue createTemporaryQueue() throws JMSException { 1527 checkClosed(); 1528 return (TemporaryQueue)connection.createTempDestination(false); 1529 } 1530 1531 /** 1532 * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that 1533 * of the <CODE>Connection</CODE> unless it is deleted earlier. 1534 * 1535 * @return a temporary topic identity 1536 * @throws JMSException if the session fails to create a temporary topic due 1537 * to some internal error. 1538 * @since 1.1 1539 */ 1540 @Override 1541 public TemporaryTopic createTemporaryTopic() throws JMSException { 1542 checkClosed(); 1543 return (TemporaryTopic)connection.createTempDestination(true); 1544 } 1545 1546 /** 1547 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from 1548 * the specified queue. 1549 * 1550 * @param queue the <CODE>Queue</CODE> to access 1551 * @return a new QueueBrowser instance. 1552 * @throws JMSException if the session fails to create a receiver due to 1553 * some internal error. 1554 * @throws JMSException 1555 * @throws InvalidDestinationException if an invalid queue is specified. 1556 */ 1557 @Override 1558 public QueueReceiver createReceiver(Queue queue) throws JMSException { 1559 checkClosed(); 1560 return createReceiver(queue, null); 1561 } 1562 1563 /** 1564 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from 1565 * the specified queue using a message selector. 1566 * 1567 * @param queue the <CODE>Queue</CODE> to access 1568 * @param messageSelector only messages with properties matching the message 1569 * selector expression are delivered. A value of null or an 1570 * empty string indicates that there is no message selector 1571 * for the message consumer. 1572 * @return QueueReceiver 1573 * @throws JMSException if the session fails to create a receiver due to 1574 * some internal error. 1575 * @throws InvalidDestinationException if an invalid queue is specified. 1576 * @throws InvalidSelectorException if the message selector is invalid. 1577 */ 1578 @Override 1579 public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { 1580 checkClosed(); 1581 1582 if (queue instanceof CustomDestination) { 1583 CustomDestination customDestination = (CustomDestination)queue; 1584 return customDestination.createReceiver(this, messageSelector); 1585 } 1586 1587 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1588 return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(), 1589 prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch); 1590 } 1591 1592 /** 1593 * Creates a <CODE>QueueSender</CODE> object to send messages to the 1594 * specified queue. 1595 * 1596 * @param queue the <CODE>Queue</CODE> to access, or null if this is an 1597 * unidentified producer 1598 * @return QueueSender 1599 * @throws JMSException if the session fails to create a sender due to some 1600 * internal error. 1601 * @throws InvalidDestinationException if an invalid queue is specified. 1602 */ 1603 @Override 1604 public QueueSender createSender(Queue queue) throws JMSException { 1605 checkClosed(); 1606 if (queue instanceof CustomDestination) { 1607 CustomDestination customDestination = (CustomDestination)queue; 1608 return customDestination.createSender(this); 1609 } 1610 int timeSendOut = connection.getSendTimeout(); 1611 return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut); 1612 } 1613 1614 /** 1615 * Creates a nondurable subscriber to the specified topic. <p/> 1616 * <P> 1617 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages 1618 * that have been published to a topic. <p/> 1619 * <P> 1620 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They 1621 * receive only messages that are published while they are active. <p/> 1622 * <P> 1623 * In some cases, a connection may both publish and subscribe to a topic. 1624 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to 1625 * inhibit the delivery of messages published by its own connection. The 1626 * default value for this attribute is false. 1627 * 1628 * @param topic the <CODE>Topic</CODE> to subscribe to 1629 * @return TopicSubscriber 1630 * @throws JMSException if the session fails to create a subscriber due to 1631 * some internal error. 1632 * @throws InvalidDestinationException if an invalid topic is specified. 1633 */ 1634 @Override 1635 public TopicSubscriber createSubscriber(Topic topic) throws JMSException { 1636 checkClosed(); 1637 return createSubscriber(topic, null, false); 1638 } 1639 1640 /** 1641 * Creates a nondurable subscriber to the specified topic, using a message 1642 * selector or specifying whether messages published by its own connection 1643 * should be delivered to it. <p/> 1644 * <P> 1645 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages 1646 * that have been published to a topic. <p/> 1647 * <P> 1648 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They 1649 * receive only messages that are published while they are active. <p/> 1650 * <P> 1651 * Messages filtered out by a subscriber's message selector will never be 1652 * delivered to the subscriber. From the subscriber's perspective, they do 1653 * not exist. <p/> 1654 * <P> 1655 * In some cases, a connection may both publish and subscribe to a topic. 1656 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to 1657 * inhibit the delivery of messages published by its own connection. The 1658 * default value for this attribute is false. 1659 * 1660 * @param topic the <CODE>Topic</CODE> to subscribe to 1661 * @param messageSelector only messages with properties matching the message 1662 * selector expression are delivered. A value of null or an 1663 * empty string indicates that there is no message selector 1664 * for the message consumer. 1665 * @param noLocal if set, inhibits the delivery of messages published by its 1666 * own connection 1667 * @return TopicSubscriber 1668 * @throws JMSException if the session fails to create a subscriber due to 1669 * some internal error. 1670 * @throws InvalidDestinationException if an invalid topic is specified. 1671 * @throws InvalidSelectorException if the message selector is invalid. 1672 */ 1673 @Override 1674 public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { 1675 checkClosed(); 1676 1677 if (topic instanceof CustomDestination) { 1678 CustomDestination customDestination = (CustomDestination)topic; 1679 return customDestination.createSubscriber(this, messageSelector, noLocal); 1680 } 1681 1682 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1683 return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), null, messageSelector, prefetchPolicy 1684 .getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch); 1685 } 1686 1687 /** 1688 * Creates a publisher for the specified topic. <p/> 1689 * <P> 1690 * A client uses a <CODE>TopicPublisher</CODE> object to publish messages 1691 * on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on 1692 * a topic, it defines a new sequence of messages that have no ordering 1693 * relationship with the messages it has previously sent. 1694 * 1695 * @param topic the <CODE>Topic</CODE> to publish to, or null if this is 1696 * an unidentified producer 1697 * @return TopicPublisher 1698 * @throws JMSException if the session fails to create a publisher due to 1699 * some internal error. 1700 * @throws InvalidDestinationException if an invalid topic is specified. 1701 */ 1702 @Override 1703 public TopicPublisher createPublisher(Topic topic) throws JMSException { 1704 checkClosed(); 1705 1706 if (topic instanceof CustomDestination) { 1707 CustomDestination customDestination = (CustomDestination)topic; 1708 return customDestination.createPublisher(this); 1709 } 1710 int timeSendOut = connection.getSendTimeout(); 1711 return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut); 1712 } 1713 1714 /** 1715 * Unsubscribes a durable subscription that has been created by a client. 1716 * <P> 1717 * This method deletes the state being maintained on behalf of the 1718 * subscriber by its provider. 1719 * <P> 1720 * It is erroneous for a client to delete a durable subscription while there 1721 * is an active <CODE>MessageConsumer </CODE> or 1722 * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed 1723 * message is part of a pending transaction or has not been acknowledged in 1724 * the session. 1725 * 1726 * @param name the name used to identify this subscription 1727 * @throws JMSException if the session fails to unsubscribe to the durable 1728 * subscription due to some internal error. 1729 * @throws InvalidDestinationException if an invalid subscription name is 1730 * specified. 1731 * @since 1.1 1732 */ 1733 @Override 1734 public void unsubscribe(String name) throws JMSException { 1735 checkClosed(); 1736 connection.unsubscribe(name); 1737 } 1738 1739 @Override 1740 public void dispatch(MessageDispatch messageDispatch) { 1741 try { 1742 executor.execute(messageDispatch); 1743 } catch (InterruptedException e) { 1744 Thread.currentThread().interrupt(); 1745 connection.onClientInternalException(e); 1746 } 1747 } 1748 1749 /** 1750 * Acknowledges all consumed messages of the session of this consumed 1751 * message. 1752 * <P> 1753 * All consumed JMS messages support the <CODE>acknowledge</CODE> method 1754 * for use when a client has specified that its JMS session's consumed 1755 * messages are to be explicitly acknowledged. By invoking 1756 * <CODE>acknowledge</CODE> on a consumed message, a client acknowledges 1757 * all messages consumed by the session that the message was delivered to. 1758 * <P> 1759 * Calls to <CODE>acknowledge</CODE> are ignored for both transacted 1760 * sessions and sessions specified to use implicit acknowledgement modes. 1761 * <P> 1762 * A client may individually acknowledge each message as it is consumed, or 1763 * it may choose to acknowledge messages as an application-defined group 1764 * (which is done by calling acknowledge on the last received message of the 1765 * group, thereby acknowledging all messages consumed by the session.) 1766 * <P> 1767 * Messages that have been received but not acknowledged may be redelivered. 1768 * 1769 * @throws JMSException if the JMS provider fails to acknowledge the 1770 * messages due to some internal error. 1771 * @throws javax.jms.IllegalStateException if this method is called on a 1772 * closed session. 1773 * @see javax.jms.Session#CLIENT_ACKNOWLEDGE 1774 */ 1775 public void acknowledge() throws JMSException { 1776 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1777 ActiveMQMessageConsumer c = iter.next(); 1778 c.acknowledge(); 1779 } 1780 } 1781 1782 /** 1783 * Add a message consumer. 1784 * 1785 * @param consumer - message consumer. 1786 * @throws JMSException 1787 */ 1788 protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException { 1789 this.consumers.add(consumer); 1790 if (consumer.isDurableSubscriber()) { 1791 stats.onCreateDurableSubscriber(); 1792 } 1793 this.connection.addDispatcher(consumer.getConsumerId(), this); 1794 } 1795 1796 /** 1797 * Remove the message consumer. 1798 * 1799 * @param consumer - consumer to be removed. 1800 * @throws JMSException 1801 */ 1802 protected void removeConsumer(ActiveMQMessageConsumer consumer) { 1803 this.connection.removeDispatcher(consumer.getConsumerId()); 1804 if (consumer.isDurableSubscriber()) { 1805 stats.onRemoveDurableSubscriber(); 1806 } 1807 this.consumers.remove(consumer); 1808 this.connection.removeDispatcher(consumer); 1809 } 1810 1811 /** 1812 * Adds a message producer. 1813 * 1814 * @param producer - message producer to be added. 1815 * @throws JMSException 1816 */ 1817 protected void addProducer(ActiveMQMessageProducer producer) throws JMSException { 1818 this.producers.add(producer); 1819 this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer); 1820 } 1821 1822 /** 1823 * Removes a message producer. 1824 * 1825 * @param producer - message producer to be removed. 1826 * @throws JMSException 1827 */ 1828 protected void removeProducer(ActiveMQMessageProducer producer) { 1829 this.connection.removeProducer(producer.getProducerInfo().getProducerId()); 1830 this.producers.remove(producer); 1831 } 1832 1833 /** 1834 * Start this Session. 1835 * 1836 * @throws JMSException 1837 */ 1838 protected void start() throws JMSException { 1839 started.set(true); 1840 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1841 ActiveMQMessageConsumer c = iter.next(); 1842 c.start(); 1843 } 1844 executor.start(); 1845 } 1846 1847 /** 1848 * Stops this session. 1849 * 1850 * @throws JMSException 1851 */ 1852 protected void stop() throws JMSException { 1853 1854 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1855 ActiveMQMessageConsumer c = iter.next(); 1856 c.stop(); 1857 } 1858 1859 started.set(false); 1860 executor.stop(); 1861 } 1862 1863 /** 1864 * Returns the session id. 1865 * 1866 * @return value - session id. 1867 */ 1868 protected SessionId getSessionId() { 1869 return info.getSessionId(); 1870 } 1871 1872 /** 1873 * @return a unique ConsumerId instance. 1874 */ 1875 protected ConsumerId getNextConsumerId() { 1876 return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId()); 1877 } 1878 1879 /** 1880 * @return a unique ProducerId instance. 1881 */ 1882 protected ProducerId getNextProducerId() { 1883 return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId()); 1884 } 1885 1886 /** 1887 * Sends the message for dispatch by the broker. 1888 * 1889 * @param producer - message producer. 1890 * @param destination - message destination. 1891 * @param message - message to be sent. 1892 * @param deliveryMode - JMS message delivery mode. 1893 * @param priority - message priority. 1894 * @param timeToLive - message expiration. 1895 * @param producerWindow 1896 * @param onComplete 1897 * @throws JMSException 1898 */ 1899 protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, 1900 MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException { 1901 1902 checkClosed(); 1903 if (destination.isTemporary() && connection.isDeleted(destination)) { 1904 throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination); 1905 } 1906 synchronized (sendMutex) { 1907 // tell the Broker we are about to start a new transaction 1908 doStartTransaction(); 1909 TransactionId txid = transactionContext.getTransactionId(); 1910 long sequenceNumber = producer.getMessageSequence(); 1911 1912 //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11 1913 message.setJMSDeliveryMode(deliveryMode); 1914 long expiration = 0L; 1915 if (!producer.getDisableMessageTimestamp()) { 1916 long timeStamp = System.currentTimeMillis(); 1917 message.setJMSTimestamp(timeStamp); 1918 if (timeToLive > 0) { 1919 expiration = timeToLive + timeStamp; 1920 } 1921 } 1922 message.setJMSExpiration(expiration); 1923 message.setJMSPriority(priority); 1924 message.setJMSRedelivered(false); 1925 1926 // transform to our own message format here 1927 ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection); 1928 msg.setDestination(destination); 1929 msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber)); 1930 1931 // Set the message id. 1932 if (msg != message) { 1933 message.setJMSMessageID(msg.getMessageId().toString()); 1934 // Make sure the JMS destination is set on the foreign messages too. 1935 message.setJMSDestination(destination); 1936 } 1937 //clear the brokerPath in case we are re-sending this message 1938 msg.setBrokerPath(null); 1939 1940 msg.setTransactionId(txid); 1941 if (connection.isCopyMessageOnSend()) { 1942 msg = (ActiveMQMessage)msg.copy(); 1943 } 1944 msg.setConnection(connection); 1945 msg.onSend(); 1946 msg.setProducerId(msg.getMessageId().getProducerId()); 1947 if (LOG.isTraceEnabled()) { 1948 LOG.trace(getSessionId() + " sending message: " + msg); 1949 } 1950 if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) { 1951 this.connection.asyncSendPacket(msg); 1952 if (producerWindow != null) { 1953 // Since we defer lots of the marshaling till we hit the 1954 // wire, this might not 1955 // provide and accurate size. We may change over to doing 1956 // more aggressive marshaling, 1957 // to get more accurate sizes.. this is more important once 1958 // users start using producer window 1959 // flow control. 1960 int size = msg.getSize(); 1961 producerWindow.increaseUsage(size); 1962 } 1963 } else { 1964 if (sendTimeout > 0 && onComplete==null) { 1965 this.connection.syncSendPacket(msg,sendTimeout); 1966 }else { 1967 this.connection.syncSendPacket(msg, onComplete); 1968 } 1969 } 1970 1971 } 1972 } 1973 1974 /** 1975 * Send TransactionInfo to indicate transaction has started 1976 * 1977 * @throws JMSException if some internal error occurs 1978 */ 1979 protected void doStartTransaction() throws JMSException { 1980 if (getTransacted() && !transactionContext.isInXATransaction()) { 1981 transactionContext.begin(); 1982 } 1983 } 1984 1985 /** 1986 * Checks whether the session has unconsumed messages. 1987 * 1988 * @return true - if there are unconsumed messages. 1989 */ 1990 public boolean hasUncomsumedMessages() { 1991 return executor.hasUncomsumedMessages(); 1992 } 1993 1994 /** 1995 * Checks whether the session uses transactions. 1996 * 1997 * @return true - if the session uses transactions. 1998 */ 1999 public boolean isTransacted() { 2000 return this.acknowledgementMode == Session.SESSION_TRANSACTED || (transactionContext.isInXATransaction()); 2001 } 2002 2003 /** 2004 * Checks whether the session used client acknowledgment. 2005 * 2006 * @return true - if the session uses client acknowledgment. 2007 */ 2008 protected boolean isClientAcknowledge() { 2009 return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE; 2010 } 2011 2012 /** 2013 * Checks whether the session used auto acknowledgment. 2014 * 2015 * @return true - if the session uses client acknowledgment. 2016 */ 2017 public boolean isAutoAcknowledge() { 2018 return acknowledgementMode == Session.AUTO_ACKNOWLEDGE; 2019 } 2020 2021 /** 2022 * Checks whether the session used dup ok acknowledgment. 2023 * 2024 * @return true - if the session uses client acknowledgment. 2025 */ 2026 public boolean isDupsOkAcknowledge() { 2027 return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE; 2028 } 2029 2030 public boolean isIndividualAcknowledge(){ 2031 return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE; 2032 } 2033 2034 /** 2035 * Returns the message delivery listener. 2036 * 2037 * @return deliveryListener - message delivery listener. 2038 */ 2039 public DeliveryListener getDeliveryListener() { 2040 return deliveryListener; 2041 } 2042 2043 /** 2044 * Sets the message delivery listener. 2045 * 2046 * @param deliveryListener - message delivery listener. 2047 */ 2048 public void setDeliveryListener(DeliveryListener deliveryListener) { 2049 this.deliveryListener = deliveryListener; 2050 } 2051 2052 /** 2053 * Returns the SessionInfo bean. 2054 * 2055 * @return info - SessionInfo bean. 2056 * @throws JMSException 2057 */ 2058 protected SessionInfo getSessionInfo() throws JMSException { 2059 SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue()); 2060 return info; 2061 } 2062 2063 /** 2064 * Send the asynchronous command. 2065 * 2066 * @param command - command to be executed. 2067 * @throws JMSException 2068 */ 2069 public void asyncSendPacket(Command command) throws JMSException { 2070 connection.asyncSendPacket(command); 2071 } 2072 2073 /** 2074 * Send the synchronous command. 2075 * 2076 * @param command - command to be executed. 2077 * @return Response 2078 * @throws JMSException 2079 */ 2080 public Response syncSendPacket(Command command) throws JMSException { 2081 return connection.syncSendPacket(command); 2082 } 2083 2084 public long getNextDeliveryId() { 2085 return deliveryIdGenerator.getNextSequenceId(); 2086 } 2087 2088 public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages) throws JMSException { 2089 2090 List<MessageDispatch> c = unconsumedMessages.removeAll(); 2091 for (MessageDispatch md : c) { 2092 this.connection.rollbackDuplicate(dispatcher, md.getMessage()); 2093 } 2094 Collections.reverse(c); 2095 2096 for (Iterator<MessageDispatch> iter = c.iterator(); iter.hasNext();) { 2097 MessageDispatch md = iter.next(); 2098 executor.executeFirst(md); 2099 } 2100 2101 } 2102 2103 public boolean isRunning() { 2104 return started.get(); 2105 } 2106 2107 public boolean isAsyncDispatch() { 2108 return asyncDispatch; 2109 } 2110 2111 public void setAsyncDispatch(boolean asyncDispatch) { 2112 this.asyncDispatch = asyncDispatch; 2113 } 2114 2115 /** 2116 * @return Returns the sessionAsyncDispatch. 2117 */ 2118 public boolean isSessionAsyncDispatch() { 2119 return sessionAsyncDispatch; 2120 } 2121 2122 /** 2123 * @param sessionAsyncDispatch The sessionAsyncDispatch to set. 2124 */ 2125 public void setSessionAsyncDispatch(boolean sessionAsyncDispatch) { 2126 this.sessionAsyncDispatch = sessionAsyncDispatch; 2127 } 2128 2129 public MessageTransformer getTransformer() { 2130 return transformer; 2131 } 2132 2133 public ActiveMQConnection getConnection() { 2134 return connection; 2135 } 2136 2137 /** 2138 * Sets the transformer used to transform messages before they are sent on 2139 * to the JMS bus or when they are received from the bus but before they are 2140 * delivered to the JMS client 2141 */ 2142 public void setTransformer(MessageTransformer transformer) { 2143 this.transformer = transformer; 2144 } 2145 2146 public BlobTransferPolicy getBlobTransferPolicy() { 2147 return blobTransferPolicy; 2148 } 2149 2150 /** 2151 * Sets the policy used to describe how out-of-band BLOBs (Binary Large 2152 * OBjects) are transferred from producers to brokers to consumers 2153 */ 2154 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) { 2155 this.blobTransferPolicy = blobTransferPolicy; 2156 } 2157 2158 public List<MessageDispatch> getUnconsumedMessages() { 2159 return executor.getUnconsumedMessages(); 2160 } 2161 2162 @Override 2163 public String toString() { 2164 return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "} " + sendMutex; 2165 } 2166 2167 public void checkMessageListener() throws JMSException { 2168 if (messageListener != null) { 2169 throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); 2170 } 2171 for (Iterator<ActiveMQMessageConsumer> i = consumers.iterator(); i.hasNext();) { 2172 ActiveMQMessageConsumer consumer = i.next(); 2173 if (consumer.hasMessageListener()) { 2174 throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); 2175 } 2176 } 2177 } 2178 2179 protected void setOptimizeAcknowledge(boolean value) { 2180 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 2181 ActiveMQMessageConsumer c = iter.next(); 2182 c.setOptimizeAcknowledge(value); 2183 } 2184 } 2185 2186 protected void setPrefetchSize(ConsumerId id, int prefetch) { 2187 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 2188 ActiveMQMessageConsumer c = iter.next(); 2189 if (c.getConsumerId().equals(id)) { 2190 c.setPrefetchSize(prefetch); 2191 break; 2192 } 2193 } 2194 } 2195 2196 protected void close(ConsumerId id) { 2197 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 2198 ActiveMQMessageConsumer c = iter.next(); 2199 if (c.getConsumerId().equals(id)) { 2200 try { 2201 c.close(); 2202 } catch (JMSException e) { 2203 LOG.warn("Exception closing consumer", e); 2204 } 2205 LOG.warn("Closed consumer on Command, " + id); 2206 break; 2207 } 2208 } 2209 } 2210 2211 public boolean isInUse(ActiveMQTempDestination destination) { 2212 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 2213 ActiveMQMessageConsumer c = iter.next(); 2214 if (c.isInUse(destination)) { 2215 return true; 2216 } 2217 } 2218 return false; 2219 } 2220 2221 /** 2222 * highest sequence id of the last message delivered by this session. 2223 * Passed to the broker in the close command, maintained by dispose() 2224 * @return lastDeliveredSequenceId 2225 */ 2226 public long getLastDeliveredSequenceId() { 2227 return lastDeliveredSequenceId; 2228 } 2229 2230 protected void sendAck(MessageAck ack) throws JMSException { 2231 sendAck(ack,false); 2232 } 2233 2234 protected void sendAck(MessageAck ack, boolean lazy) throws JMSException { 2235 if (lazy || connection.isSendAcksAsync() || getTransacted()) { 2236 asyncSendPacket(ack); 2237 } else { 2238 syncSendPacket(ack); 2239 } 2240 } 2241 2242 protected Scheduler getScheduler() throws JMSException { 2243 return this.connection.getScheduler(); 2244 } 2245 2246 protected ThreadPoolExecutor getConnectionExecutor() { 2247 return this.connectionExecutor; 2248 } 2249}