001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.cursors; 018 019import java.io.IOException; 020import java.util.Iterator; 021import java.util.LinkedList; 022import java.util.concurrent.atomic.AtomicBoolean; 023import java.util.concurrent.atomic.AtomicLong; 024import org.apache.activemq.broker.Broker; 025import org.apache.activemq.broker.ConnectionContext; 026import org.apache.activemq.broker.region.Destination; 027import org.apache.activemq.broker.region.IndirectMessageReference; 028import org.apache.activemq.broker.region.MessageReference; 029import org.apache.activemq.broker.region.QueueMessageReference; 030import org.apache.activemq.command.Message; 031import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 032import org.apache.activemq.openwire.OpenWireFormat; 033import org.apache.activemq.store.PList; 034import org.apache.activemq.store.PListStore; 035import org.apache.activemq.store.PListEntry; 036import org.apache.activemq.usage.SystemUsage; 037import org.apache.activemq.usage.Usage; 038import org.apache.activemq.usage.UsageListener; 039import org.apache.activemq.wireformat.WireFormat; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042import org.apache.activemq.util.ByteSequence; 043 044/** 045 * persist pending messages pending message (messages awaiting dispatch to a 046 * consumer) cursor 047 * 048 * 049 */ 050public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener { 051 static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursor.class); 052 private static final AtomicLong NAME_COUNT = new AtomicLong(); 053 protected Broker broker; 054 private final PListStore store; 055 private final String name; 056 private PendingList memoryList; 057 private PList diskList; 058 private Iterator<MessageReference> iter; 059 private Destination regionDestination; 060 private boolean iterating; 061 private boolean flushRequired; 062 private final AtomicBoolean started = new AtomicBoolean(); 063 private final WireFormat wireFormat = new OpenWireFormat(); 064 /** 065 * @param broker 066 * @param name 067 * @param prioritizedMessages 068 */ 069 public FilePendingMessageCursor(Broker broker, String name, boolean prioritizedMessages) { 070 super(prioritizedMessages); 071 if (this.prioritizedMessages) { 072 this.memoryList = new PrioritizedPendingList(); 073 } else { 074 this.memoryList = new OrderedPendingList(); 075 } 076 this.broker = broker; 077 // the store can be null if the BrokerService has persistence 078 // turned off 079 this.store = broker.getTempDataStore(); 080 this.name = NAME_COUNT.incrementAndGet() + "_" + name; 081 } 082 083 @Override 084 public void start() throws Exception { 085 if (started.compareAndSet(false, true)) { 086 if( this.broker != null) { 087 wireFormat.setVersion(this.broker.getBrokerService().getStoreOpenWireVersion()); 088 } 089 super.start(); 090 if (systemUsage != null) { 091 systemUsage.getMemoryUsage().addUsageListener(this); 092 } 093 } 094 } 095 096 @Override 097 public void stop() throws Exception { 098 if (started.compareAndSet(true, false)) { 099 super.stop(); 100 if (systemUsage != null) { 101 systemUsage.getMemoryUsage().removeUsageListener(this); 102 } 103 } 104 } 105 106 /** 107 * @return true if there are no pending messages 108 */ 109 @Override 110 public synchronized boolean isEmpty() { 111 if (memoryList.isEmpty() && isDiskListEmpty()) { 112 return true; 113 } 114 for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) { 115 MessageReference node = iterator.next(); 116 if (node == QueueMessageReference.NULL_MESSAGE) { 117 continue; 118 } 119 if (!node.isDropped()) { 120 return false; 121 } 122 // We can remove dropped references. 123 iterator.remove(); 124 } 125 return isDiskListEmpty(); 126 } 127 128 /** 129 * reset the cursor 130 */ 131 @Override 132 public synchronized void reset() { 133 iterating = true; 134 last = null; 135 if (isDiskListEmpty()) { 136 this.iter = this.memoryList.iterator(); 137 } else { 138 this.iter = new DiskIterator(); 139 } 140 } 141 142 @Override 143 public synchronized void release() { 144 iterating = false; 145 if (iter instanceof DiskIterator) { 146 ((DiskIterator)iter).release(); 147 }; 148 if (flushRequired) { 149 flushRequired = false; 150 if (!hasSpace()) { 151 flushToDisk(); 152 } 153 } 154 // ensure any memory ref is released 155 iter = null; 156 } 157 158 @Override 159 public synchronized void destroy() throws Exception { 160 stop(); 161 for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) { 162 MessageReference node = i.next(); 163 node.decrementReferenceCount(); 164 } 165 memoryList.clear(); 166 destroyDiskList(); 167 } 168 169 private void destroyDiskList() throws Exception { 170 if (diskList != null) { 171 store.removePList(name); 172 diskList = null; 173 } 174 } 175 176 @Override 177 public synchronized LinkedList<MessageReference> pageInList(int maxItems) { 178 LinkedList<MessageReference> result = new LinkedList<MessageReference>(); 179 int count = 0; 180 for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() && count < maxItems;) { 181 MessageReference ref = i.next(); 182 ref.incrementReferenceCount(); 183 result.add(ref); 184 count++; 185 } 186 if (count < maxItems && !isDiskListEmpty()) { 187 for (Iterator<MessageReference> i = new DiskIterator(); i.hasNext() && count < maxItems;) { 188 Message message = (Message) i.next(); 189 message.setRegionDestination(regionDestination); 190 message.setMemoryUsage(this.getSystemUsage().getMemoryUsage()); 191 message.incrementReferenceCount(); 192 result.add(message); 193 count++; 194 } 195 } 196 return result; 197 } 198 199 /** 200 * add message to await dispatch 201 * 202 * @param node 203 * @throws Exception 204 */ 205 @Override 206 public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception { 207 if (!node.isExpired()) { 208 try { 209 regionDestination = (Destination) node.getMessage().getRegionDestination(); 210 if (isDiskListEmpty()) { 211 if (hasSpace() || this.store == null) { 212 memoryList.addMessageLast(node); 213 node.incrementReferenceCount(); 214 setCacheEnabled(true); 215 return true; 216 } 217 } 218 if (!hasSpace()) { 219 if (isDiskListEmpty()) { 220 expireOldMessages(); 221 if (hasSpace()) { 222 memoryList.addMessageLast(node); 223 node.incrementReferenceCount(); 224 return true; 225 } else { 226 flushToDisk(); 227 } 228 } 229 } 230 if (systemUsage.getTempUsage().waitForSpace(maxWaitTime)) { 231 ByteSequence bs = getByteSequence(node.getMessage()); 232 getDiskList().addLast(node.getMessageId().toString(), bs); 233 return true; 234 } 235 return false; 236 237 } catch (Exception e) { 238 LOG.error("Caught an Exception adding a message: {} first to FilePendingMessageCursor ", node, e); 239 throw new RuntimeException(e); 240 } 241 } else { 242 discardExpiredMessage(node); 243 } 244 //message expired 245 return true; 246 } 247 248 /** 249 * add message to await dispatch 250 * 251 * @param node 252 */ 253 @Override 254 public synchronized void addMessageFirst(MessageReference node) { 255 if (!node.isExpired()) { 256 try { 257 regionDestination = (Destination) node.getMessage().getRegionDestination(); 258 if (isDiskListEmpty()) { 259 if (hasSpace()) { 260 memoryList.addMessageFirst(node); 261 node.incrementReferenceCount(); 262 setCacheEnabled(true); 263 return; 264 } 265 } 266 if (!hasSpace()) { 267 if (isDiskListEmpty()) { 268 expireOldMessages(); 269 if (hasSpace()) { 270 memoryList.addMessageFirst(node); 271 node.incrementReferenceCount(); 272 return; 273 } else { 274 flushToDisk(); 275 } 276 } 277 } 278 systemUsage.getTempUsage().waitForSpace(); 279 node.decrementReferenceCount(); 280 ByteSequence bs = getByteSequence(node.getMessage()); 281 Object locator = getDiskList().addFirst(node.getMessageId().toString(), bs); 282 node.getMessageId().setPlistLocator(locator); 283 284 } catch (Exception e) { 285 LOG.error("Caught an Exception adding a message: {} first to FilePendingMessageCursor ", node, e); 286 throw new RuntimeException(e); 287 } 288 } else { 289 discardExpiredMessage(node); 290 } 291 } 292 293 /** 294 * @return true if there pending messages to dispatch 295 */ 296 @Override 297 public synchronized boolean hasNext() { 298 return iter.hasNext(); 299 } 300 301 /** 302 * @return the next pending message 303 */ 304 @Override 305 public synchronized MessageReference next() { 306 MessageReference reference = iter.next(); 307 last = reference; 308 if (!isDiskListEmpty()) { 309 // got from disk 310 reference.getMessage().setRegionDestination(regionDestination); 311 reference.getMessage().setMemoryUsage(this.getSystemUsage().getMemoryUsage()); 312 } 313 reference.incrementReferenceCount(); 314 return reference; 315 } 316 317 /** 318 * remove the message at the cursor position 319 */ 320 @Override 321 public synchronized void remove() { 322 iter.remove(); 323 if (last != null) { 324 last.decrementReferenceCount(); 325 } 326 } 327 328 /** 329 * @param node 330 * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference) 331 */ 332 @Override 333 public synchronized void remove(MessageReference node) { 334 if (memoryList.remove(node) != null) { 335 node.decrementReferenceCount(); 336 } 337 if (!isDiskListEmpty()) { 338 try { 339 getDiskList().remove(node.getMessageId().getPlistLocator()); 340 } catch (IOException e) { 341 throw new RuntimeException(e); 342 } 343 } 344 } 345 346 /** 347 * @return the number of pending messages 348 */ 349 @Override 350 public synchronized int size() { 351 return memoryList.size() + (isDiskListEmpty() ? 0 : (int)getDiskList().size()); 352 } 353 354 @Override 355 public synchronized long messageSize() { 356 return memoryList.messageSize() + (isDiskListEmpty() ? 0 : (int)getDiskList().messageSize()); 357 } 358 359 /** 360 * clear all pending messages 361 */ 362 @Override 363 public synchronized void clear() { 364 memoryList.clear(); 365 if (!isDiskListEmpty()) { 366 try { 367 getDiskList().destroy(); 368 } catch (IOException e) { 369 throw new RuntimeException(e); 370 } 371 } 372 last = null; 373 } 374 375 @Override 376 public synchronized boolean isFull() { 377 378 return super.isFull() || (!isDiskListEmpty() && systemUsage != null && systemUsage.getTempUsage().isFull()); 379 380 } 381 382 @Override 383 public boolean hasMessagesBufferedToDeliver() { 384 return !isEmpty(); 385 } 386 387 @Override 388 public void setSystemUsage(SystemUsage usageManager) { 389 super.setSystemUsage(usageManager); 390 } 391 392 @Override 393 public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) { 394 if (newPercentUsage >= getMemoryUsageHighWaterMark()) { 395 synchronized (this) { 396 if (!flushRequired && size() != 0) { 397 flushRequired =true; 398 if (!iterating) { 399 expireOldMessages(); 400 if (!hasSpace()) { 401 flushToDisk(); 402 flushRequired = false; 403 } 404 } 405 } 406 } 407 } 408 } 409 410 @Override 411 public boolean isTransient() { 412 return true; 413 } 414 415 protected synchronized void expireOldMessages() { 416 if (!memoryList.isEmpty()) { 417 for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) { 418 MessageReference node = iterator.next(); 419 if (node.isExpired()) { 420 node.decrementReferenceCount(); 421 discardExpiredMessage(node); 422 iterator.remove(); 423 } 424 } 425 } 426 } 427 428 protected synchronized void flushToDisk() { 429 if (!memoryList.isEmpty() && store != null) { 430 long start = 0; 431 if (LOG.isTraceEnabled()) { 432 start = System.currentTimeMillis(); 433 LOG.trace("{}, flushToDisk() mem list size: {} {}", new Object[]{ name, memoryList.size(), (systemUsage != null ? systemUsage.getMemoryUsage() : "") }); 434 } 435 for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) { 436 MessageReference node = iterator.next(); 437 node.decrementReferenceCount(); 438 ByteSequence bs; 439 try { 440 bs = getByteSequence(node.getMessage()); 441 getDiskList().addLast(node.getMessageId().toString(), bs); 442 } catch (IOException e) { 443 LOG.error("Failed to write to disk list", e); 444 throw new RuntimeException(e); 445 } 446 447 } 448 memoryList.clear(); 449 setCacheEnabled(false); 450 LOG.trace("{}, flushToDisk() done - {} ms {}", new Object[]{ name, (System.currentTimeMillis() - start), (systemUsage != null ? systemUsage.getMemoryUsage() : "") }); 451 } 452 } 453 454 protected boolean isDiskListEmpty() { 455 return diskList == null || diskList.isEmpty(); 456 } 457 458 public PList getDiskList() { 459 if (diskList == null) { 460 try { 461 diskList = store.getPList(name); 462 } catch (Exception e) { 463 LOG.error("Caught an IO Exception getting the DiskList {}", name, e); 464 throw new RuntimeException(e); 465 } 466 } 467 return diskList; 468 } 469 470 private void discardExpiredMessage(MessageReference reference) { 471 LOG.debug("Discarding expired message {}", reference); 472 if (broker.isExpired(reference)) { 473 ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext()); 474 context.setBroker(broker); 475 ((Destination)reference.getRegionDestination()).messageExpired(context, null, new IndirectMessageReference(reference.getMessage())); 476 } 477 } 478 479 protected ByteSequence getByteSequence(Message message) throws IOException { 480 org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message); 481 return new ByteSequence(packet.data, packet.offset, packet.length); 482 } 483 484 protected Message getMessage(ByteSequence bs) throws IOException { 485 org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(bs.getData(), bs 486 .getOffset(), bs.getLength()); 487 return (Message) this.wireFormat.unmarshal(packet); 488 489 } 490 491 final class DiskIterator implements Iterator<MessageReference> { 492 private final PList.PListIterator iterator; 493 DiskIterator() { 494 try { 495 iterator = getDiskList().iterator(); 496 } catch (Exception e) { 497 throw new RuntimeException(e); 498 } 499 } 500 501 @Override 502 public boolean hasNext() { 503 return iterator.hasNext(); 504 } 505 506 @Override 507 public MessageReference next() { 508 try { 509 PListEntry entry = iterator.next(); 510 Message message = getMessage(entry.getByteSequence()); 511 message.getMessageId().setPlistLocator(entry.getLocator()); 512 return message; 513 } catch (IOException e) { 514 LOG.error("I/O error", e); 515 throw new RuntimeException(e); 516 } 517 } 518 519 @Override 520 public void remove() { 521 iterator.remove(); 522 } 523 524 public void release() { 525 iterator.release(); 526 } 527 } 528}