001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import java.io.ByteArrayInputStream; 020import java.io.ByteArrayOutputStream; 021import java.io.DataInput; 022import java.io.DataOutput; 023import java.io.EOFException; 024import java.io.File; 025import java.io.IOException; 026import java.io.InputStream; 027import java.io.InterruptedIOException; 028import java.io.ObjectInputStream; 029import java.io.ObjectOutputStream; 030import java.io.OutputStream; 031import java.util.ArrayList; 032import java.util.Arrays; 033import java.util.Collection; 034import java.util.Collections; 035import java.util.Date; 036import java.util.HashMap; 037import java.util.HashSet; 038import java.util.Iterator; 039import java.util.LinkedHashMap; 040import java.util.LinkedHashSet; 041import java.util.LinkedList; 042import java.util.List; 043import java.util.Map; 044import java.util.Map.Entry; 045import java.util.Set; 046import java.util.SortedSet; 047import java.util.TreeMap; 048import java.util.TreeSet; 049import java.util.concurrent.ConcurrentHashMap; 050import java.util.concurrent.ConcurrentMap; 051import java.util.concurrent.atomic.AtomicBoolean; 052import 
java.util.concurrent.atomic.AtomicLong; 053import java.util.concurrent.locks.ReentrantReadWriteLock; 054 055import org.apache.activemq.ActiveMQMessageAuditNoSync; 056import org.apache.activemq.broker.BrokerService; 057import org.apache.activemq.broker.BrokerServiceAware; 058import org.apache.activemq.broker.region.Destination; 059import org.apache.activemq.broker.region.Queue; 060import org.apache.activemq.broker.region.Topic; 061import org.apache.activemq.command.MessageAck; 062import org.apache.activemq.command.TransactionId; 063import org.apache.activemq.openwire.OpenWireFormat; 064import org.apache.activemq.protobuf.Buffer; 065import org.apache.activemq.store.MessageStore; 066import org.apache.activemq.store.MessageStoreStatistics; 067import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand; 068import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; 069import org.apache.activemq.store.kahadb.data.KahaCommitCommand; 070import org.apache.activemq.store.kahadb.data.KahaDestination; 071import org.apache.activemq.store.kahadb.data.KahaEntryType; 072import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; 073import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand; 074import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; 075import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; 076import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; 077import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; 078import org.apache.activemq.store.kahadb.data.KahaTraceCommand; 079import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; 080import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand; 081import org.apache.activemq.store.kahadb.disk.index.BTreeIndex; 082import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor; 083import org.apache.activemq.store.kahadb.disk.index.ListIndex; 084import org.apache.activemq.store.kahadb.disk.journal.DataFile; 
085import org.apache.activemq.store.kahadb.disk.journal.Journal; 086import org.apache.activemq.store.kahadb.disk.journal.Location; 087import org.apache.activemq.store.kahadb.disk.page.Page; 088import org.apache.activemq.store.kahadb.disk.page.PageFile; 089import org.apache.activemq.store.kahadb.disk.page.Transaction; 090import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller; 091import org.apache.activemq.store.kahadb.disk.util.LongMarshaller; 092import org.apache.activemq.store.kahadb.disk.util.Marshaller; 093import org.apache.activemq.store.kahadb.disk.util.Sequence; 094import org.apache.activemq.store.kahadb.disk.util.SequenceSet; 095import org.apache.activemq.store.kahadb.disk.util.StringMarshaller; 096import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; 097import org.apache.activemq.util.ByteSequence; 098import org.apache.activemq.util.DataByteArrayInputStream; 099import org.apache.activemq.util.DataByteArrayOutputStream; 100import org.apache.activemq.util.IOHelper; 101import org.apache.activemq.util.ServiceStopper; 102import org.apache.activemq.util.ServiceSupport; 103import org.slf4j.Logger; 104import org.slf4j.LoggerFactory; 105 106public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware { 107 108 protected BrokerService brokerService; 109 110 public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME"; 111 public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0); 112 public static final File DEFAULT_DIRECTORY = new File("KahaDB"); 113 protected static final Buffer UNMATCHED; 114 static { 115 UNMATCHED = new Buffer(new byte[]{}); 116 } 117 private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class); 118 119 static final int CLOSED_STATE = 1; 120 static final int OPEN_STATE = 2; 121 static final long NOT_ACKED = -1; 122 123 static final int VERSION = 6; 124 125 protected class 
Metadata { 126 protected Page<Metadata> page; 127 protected int state; 128 protected BTreeIndex<String, StoredDestination> destinations; 129 protected Location lastUpdate; 130 protected Location firstInProgressTransactionLocation; 131 protected Location producerSequenceIdTrackerLocation = null; 132 protected Location ackMessageFileMapLocation = null; 133 protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync(); 134 protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>(); 135 protected int version = VERSION; 136 protected int openwireVersion = OpenWireFormat.DEFAULT_STORE_VERSION; 137 138 public void read(DataInput is) throws IOException { 139 state = is.readInt(); 140 destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong()); 141 if (is.readBoolean()) { 142 lastUpdate = LocationMarshaller.INSTANCE.readPayload(is); 143 } else { 144 lastUpdate = null; 145 } 146 if (is.readBoolean()) { 147 firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is); 148 } else { 149 firstInProgressTransactionLocation = null; 150 } 151 try { 152 if (is.readBoolean()) { 153 producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is); 154 } else { 155 producerSequenceIdTrackerLocation = null; 156 } 157 } catch (EOFException expectedOnUpgrade) { 158 } 159 try { 160 version = is.readInt(); 161 } catch (EOFException expectedOnUpgrade) { 162 version = 1; 163 } 164 if (version >= 5 && is.readBoolean()) { 165 ackMessageFileMapLocation = LocationMarshaller.INSTANCE.readPayload(is); 166 } else { 167 ackMessageFileMapLocation = null; 168 } 169 try { 170 openwireVersion = is.readInt(); 171 } catch (EOFException expectedOnUpgrade) { 172 openwireVersion = OpenWireFormat.DEFAULT_LEGACY_VERSION; 173 } 174 LOG.info("KahaDB is version " + version); 175 } 176 177 public void write(DataOutput os) throws IOException { 178 os.writeInt(state); 179 
os.writeLong(destinations.getPageId()); 180 181 if (lastUpdate != null) { 182 os.writeBoolean(true); 183 LocationMarshaller.INSTANCE.writePayload(lastUpdate, os); 184 } else { 185 os.writeBoolean(false); 186 } 187 188 if (firstInProgressTransactionLocation != null) { 189 os.writeBoolean(true); 190 LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os); 191 } else { 192 os.writeBoolean(false); 193 } 194 195 if (producerSequenceIdTrackerLocation != null) { 196 os.writeBoolean(true); 197 LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os); 198 } else { 199 os.writeBoolean(false); 200 } 201 os.writeInt(VERSION); 202 if (ackMessageFileMapLocation != null) { 203 os.writeBoolean(true); 204 LocationMarshaller.INSTANCE.writePayload(ackMessageFileMapLocation, os); 205 } else { 206 os.writeBoolean(false); 207 } 208 os.writeInt(this.openwireVersion); 209 } 210 } 211 212 class MetadataMarshaller extends VariableMarshaller<Metadata> { 213 @Override 214 public Metadata readPayload(DataInput dataIn) throws IOException { 215 Metadata rc = createMetadata(); 216 rc.read(dataIn); 217 return rc; 218 } 219 220 @Override 221 public void writePayload(Metadata object, DataOutput dataOut) throws IOException { 222 object.write(dataOut); 223 } 224 } 225 226 protected PageFile pageFile; 227 protected Journal journal; 228 protected Metadata metadata = new Metadata(); 229 230 protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller(); 231 232 protected boolean failIfDatabaseIsLocked; 233 234 protected boolean deleteAllMessages; 235 protected File directory = DEFAULT_DIRECTORY; 236 protected File indexDirectory = null; 237 protected Thread checkpointThread; 238 protected boolean enableJournalDiskSyncs=true; 239 protected boolean archiveDataLogs; 240 protected File directoryArchive; 241 protected AtomicLong journalSize = new AtomicLong(0); 242 long checkpointInterval = 5*1000; 243 long cleanupInterval = 30*1000; 244 int 
journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; 245 int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; 246 boolean enableIndexWriteAsync = false; 247 int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; 248 private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL.name(); 249 private String preallocationStrategy = Journal.PreallocationStrategy.SPARSE_FILE.name(); 250 251 protected AtomicBoolean opened = new AtomicBoolean(); 252 private boolean ignoreMissingJournalfiles = false; 253 private int indexCacheSize = 10000; 254 private boolean checkForCorruptJournalFiles = false; 255 private boolean checksumJournalFiles = true; 256 protected boolean forceRecoverIndex = false; 257 private final Object checkpointThreadLock = new Object(); 258 private boolean archiveCorruptedIndex = false; 259 private boolean useIndexLFRUEviction = false; 260 private float indexLFUEvictionFactor = 0.2f; 261 private boolean enableIndexDiskSyncs = true; 262 private boolean enableIndexRecoveryFile = true; 263 private boolean enableIndexPageCaching = true; 264 ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock(); 265 266 @Override 267 public void doStart() throws Exception { 268 load(); 269 } 270 271 @Override 272 public void doStop(ServiceStopper stopper) throws Exception { 273 unload(); 274 } 275 276 private void loadPageFile() throws IOException { 277 this.indexLock.writeLock().lock(); 278 try { 279 final PageFile pageFile = getPageFile(); 280 pageFile.load(); 281 pageFile.tx().execute(new Transaction.Closure<IOException>() { 282 @Override 283 public void execute(Transaction tx) throws IOException { 284 if (pageFile.getPageCount() == 0) { 285 // First time this is created.. 
Initialize the metadata 286 Page<Metadata> page = tx.allocate(); 287 assert page.getPageId() == 0; 288 page.set(metadata); 289 metadata.page = page; 290 metadata.state = CLOSED_STATE; 291 metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId()); 292 293 tx.store(metadata.page, metadataMarshaller, true); 294 } else { 295 Page<Metadata> page = tx.load(0, metadataMarshaller); 296 metadata = page.get(); 297 metadata.page = page; 298 } 299 metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE); 300 metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller()); 301 metadata.destinations.load(tx); 302 } 303 }); 304 // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted. 305 // Perhaps we should just keep an index of file 306 storedDestinations.clear(); 307 pageFile.tx().execute(new Transaction.Closure<IOException>() { 308 @Override 309 public void execute(Transaction tx) throws IOException { 310 for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) { 311 Entry<String, StoredDestination> entry = iterator.next(); 312 StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null); 313 storedDestinations.put(entry.getKey(), sd); 314 315 if (checkForCorruptJournalFiles) { 316 // sanity check the index also 317 if (!entry.getValue().locationIndex.isEmpty(tx)) { 318 if (entry.getValue().orderIndex.nextMessageId <= 0) { 319 throw new IOException("Detected uninitialized orderIndex nextMessageId with pending messages for " + entry.getKey()); 320 } 321 } 322 } 323 } 324 } 325 }); 326 pageFile.flush(); 327 } finally { 328 this.indexLock.writeLock().unlock(); 329 } 330 } 331 332 private void startCheckpoint() { 333 if (checkpointInterval == 0 && cleanupInterval == 0) { 334 LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean 
shutdown/restart"); 335 return; 336 } 337 synchronized (checkpointThreadLock) { 338 boolean start = false; 339 if (checkpointThread == null) { 340 start = true; 341 } else if (!checkpointThread.isAlive()) { 342 start = true; 343 LOG.info("KahaDB: Recovering checkpoint thread after death"); 344 } 345 if (start) { 346 checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") { 347 @Override 348 public void run() { 349 try { 350 long lastCleanup = System.currentTimeMillis(); 351 long lastCheckpoint = System.currentTimeMillis(); 352 // Sleep for a short time so we can periodically check 353 // to see if we need to exit this thread. 354 long sleepTime = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500); 355 while (opened.get()) { 356 Thread.sleep(sleepTime); 357 long now = System.currentTimeMillis(); 358 if( cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval) ) { 359 checkpointCleanup(true); 360 lastCleanup = now; 361 lastCheckpoint = now; 362 } else if( checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval )) { 363 checkpointCleanup(false); 364 lastCheckpoint = now; 365 } 366 } 367 } catch (InterruptedException e) { 368 // Looks like someone really wants us to exit this thread... 369 } catch (IOException ioe) { 370 LOG.error("Checkpoint failed", ioe); 371 brokerService.handleIOException(ioe); 372 } 373 } 374 }; 375 376 checkpointThread.setDaemon(true); 377 checkpointThread.start(); 378 } 379 } 380 } 381 382 public void open() throws IOException { 383 if( opened.compareAndSet(false, true) ) { 384 getJournal().start(); 385 try { 386 loadPageFile(); 387 } catch (Throwable t) { 388 LOG.warn("Index corrupted. Recovering the index through journal replay. 
Cause:" + t); 389 if (LOG.isDebugEnabled()) { 390 LOG.debug("Index load failure", t); 391 } 392 // try to recover index 393 try { 394 pageFile.unload(); 395 } catch (Exception ignore) {} 396 if (archiveCorruptedIndex) { 397 pageFile.archive(); 398 } else { 399 pageFile.delete(); 400 } 401 metadata = createMetadata(); 402 //The metadata was recreated after a detect corruption so we need to 403 //reconfigure anything that was configured on the old metadata on startup 404 configureMetadata(); 405 pageFile = null; 406 loadPageFile(); 407 } 408 startCheckpoint(); 409 recover(); 410 } 411 } 412 413 public void load() throws IOException { 414 this.indexLock.writeLock().lock(); 415 IOHelper.mkdirs(directory); 416 try { 417 if (deleteAllMessages) { 418 getJournal().start(); 419 getJournal().delete(); 420 getJournal().close(); 421 journal = null; 422 getPageFile().delete(); 423 LOG.info("Persistence store purged."); 424 deleteAllMessages = false; 425 } 426 427 open(); 428 store(new KahaTraceCommand().setMessage("LOADED " + new Date())); 429 } finally { 430 this.indexLock.writeLock().unlock(); 431 } 432 } 433 434 public void close() throws IOException, InterruptedException { 435 if( opened.compareAndSet(true, false)) { 436 checkpointLock.writeLock().lock(); 437 try { 438 if (metadata.page != null) { 439 checkpointUpdate(true); 440 } 441 pageFile.unload(); 442 metadata = createMetadata(); 443 } finally { 444 checkpointLock.writeLock().unlock(); 445 } 446 journal.close(); 447 synchronized (checkpointThreadLock) { 448 if (checkpointThread != null) { 449 checkpointThread.join(); 450 } 451 } 452 //clear the cache and journalSize on shutdown of the store 453 storeCache.clear(); 454 journalSize.set(0); 455 } 456 } 457 458 public void unload() throws IOException, InterruptedException { 459 this.indexLock.writeLock().lock(); 460 try { 461 if( pageFile != null && pageFile.isLoaded() ) { 462 metadata.state = CLOSED_STATE; 463 metadata.firstInProgressTransactionLocation = 
getInProgressTxLocationRange()[0]; 464 465 if (metadata.page != null) { 466 pageFile.tx().execute(new Transaction.Closure<IOException>() { 467 @Override 468 public void execute(Transaction tx) throws IOException { 469 tx.store(metadata.page, metadataMarshaller, true); 470 } 471 }); 472 } 473 } 474 } finally { 475 this.indexLock.writeLock().unlock(); 476 } 477 close(); 478 } 479 480 // public for testing 481 @SuppressWarnings("rawtypes") 482 public Location[] getInProgressTxLocationRange() { 483 Location[] range = new Location[]{null, null}; 484 synchronized (inflightTransactions) { 485 if (!inflightTransactions.isEmpty()) { 486 for (List<Operation> ops : inflightTransactions.values()) { 487 if (!ops.isEmpty()) { 488 trackMaxAndMin(range, ops); 489 } 490 } 491 } 492 if (!preparedTransactions.isEmpty()) { 493 for (List<Operation> ops : preparedTransactions.values()) { 494 if (!ops.isEmpty()) { 495 trackMaxAndMin(range, ops); 496 } 497 } 498 } 499 } 500 return range; 501 } 502 503 @SuppressWarnings("rawtypes") 504 private void trackMaxAndMin(Location[] range, List<Operation> ops) { 505 Location t = ops.get(0).getLocation(); 506 if (range[0]==null || t.compareTo(range[0]) <= 0) { 507 range[0] = t; 508 } 509 t = ops.get(ops.size() -1).getLocation(); 510 if (range[1]==null || t.compareTo(range[1]) >= 0) { 511 range[1] = t; 512 } 513 } 514 515 class TranInfo { 516 TransactionId id; 517 Location location; 518 519 class opCount { 520 int add; 521 int remove; 522 } 523 HashMap<KahaDestination, opCount> destinationOpCount = new HashMap<KahaDestination, opCount>(); 524 525 @SuppressWarnings("rawtypes") 526 public void track(Operation operation) { 527 if (location == null ) { 528 location = operation.getLocation(); 529 } 530 KahaDestination destination; 531 boolean isAdd = false; 532 if (operation instanceof AddOperation) { 533 AddOperation add = (AddOperation) operation; 534 destination = add.getCommand().getDestination(); 535 isAdd = true; 536 } else { 537 RemoveOperation 
removeOpperation = (RemoveOperation) operation; 538 destination = removeOpperation.getCommand().getDestination(); 539 } 540 opCount opCount = destinationOpCount.get(destination); 541 if (opCount == null) { 542 opCount = new opCount(); 543 destinationOpCount.put(destination, opCount); 544 } 545 if (isAdd) { 546 opCount.add++; 547 } else { 548 opCount.remove++; 549 } 550 } 551 552 @Override 553 public String toString() { 554 StringBuffer buffer = new StringBuffer(); 555 buffer.append(location).append(";").append(id).append(";\n"); 556 for (Entry<KahaDestination, opCount> op : destinationOpCount.entrySet()) { 557 buffer.append(op.getKey()).append('+').append(op.getValue().add).append(',').append('-').append(op.getValue().remove).append(';'); 558 } 559 return buffer.toString(); 560 } 561 } 562 563 @SuppressWarnings("rawtypes") 564 public String getTransactions() { 565 566 ArrayList<TranInfo> infos = new ArrayList<TranInfo>(); 567 synchronized (inflightTransactions) { 568 if (!inflightTransactions.isEmpty()) { 569 for (Entry<TransactionId, List<Operation>> entry : inflightTransactions.entrySet()) { 570 TranInfo info = new TranInfo(); 571 info.id = entry.getKey(); 572 for (Operation operation : entry.getValue()) { 573 info.track(operation); 574 } 575 infos.add(info); 576 } 577 } 578 } 579 synchronized (preparedTransactions) { 580 if (!preparedTransactions.isEmpty()) { 581 for (Entry<TransactionId, List<Operation>> entry : preparedTransactions.entrySet()) { 582 TranInfo info = new TranInfo(); 583 info.id = entry.getKey(); 584 for (Operation operation : entry.getValue()) { 585 info.track(operation); 586 } 587 infos.add(info); 588 } 589 } 590 } 591 return infos.toString(); 592 } 593 594 /** 595 * Move all the messages that were in the journal into long term storage. We 596 * just replay and do a checkpoint. 
597 * 598 * @throws IOException 599 * @throws IOException 600 * @throws IllegalStateException 601 */ 602 private void recover() throws IllegalStateException, IOException { 603 this.indexLock.writeLock().lock(); 604 try { 605 606 long start = System.currentTimeMillis(); 607 Location producerAuditPosition = recoverProducerAudit(); 608 Location ackMessageFileLocation = recoverAckMessageFileMap(); 609 Location lastIndoubtPosition = getRecoveryPosition(); 610 611 Location recoveryPosition = minimum(producerAuditPosition, ackMessageFileLocation); 612 recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition); 613 614 if (recoveryPosition != null) { 615 int redoCounter = 0; 616 LOG.info("Recovering from the journal @" + recoveryPosition); 617 while (recoveryPosition != null) { 618 try { 619 JournalCommand<?> message = load(recoveryPosition); 620 metadata.lastUpdate = recoveryPosition; 621 process(message, recoveryPosition, lastIndoubtPosition); 622 redoCounter++; 623 } catch (IOException failedRecovery) { 624 if (isIgnoreMissingJournalfiles()) { 625 LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery); 626 // track this dud location 627 journal.corruptRecoveryLocation(recoveryPosition); 628 } else { 629 throw new IOException("Failed to recover data at position:" + recoveryPosition, failedRecovery); 630 } 631 } 632 recoveryPosition = journal.getNextLocation(recoveryPosition); 633 if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) { 634 LOG.info("@" + recoveryPosition + ", " + redoCounter + " entries recovered .."); 635 } 636 } 637 if (LOG.isInfoEnabled()) { 638 long end = System.currentTimeMillis(); 639 LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds."); 640 } 641 } 642 643 // We may have to undo some index updates. 
644 pageFile.tx().execute(new Transaction.Closure<IOException>() { 645 @Override 646 public void execute(Transaction tx) throws IOException { 647 recoverIndex(tx); 648 } 649 }); 650 651 // rollback any recovered inflight local transactions, and discard any inflight XA transactions. 652 Set<TransactionId> toRollback = new HashSet<TransactionId>(); 653 Set<TransactionId> toDiscard = new HashSet<TransactionId>(); 654 synchronized (inflightTransactions) { 655 for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) { 656 TransactionId id = it.next(); 657 if (id.isLocalTransaction()) { 658 toRollback.add(id); 659 } else { 660 toDiscard.add(id); 661 } 662 } 663 for (TransactionId tx: toRollback) { 664 if (LOG.isDebugEnabled()) { 665 LOG.debug("rolling back recovered indoubt local transaction " + tx); 666 } 667 store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null); 668 } 669 for (TransactionId tx: toDiscard) { 670 if (LOG.isDebugEnabled()) { 671 LOG.debug("discarding recovered in-flight XA transaction " + tx); 672 } 673 inflightTransactions.remove(tx); 674 } 675 } 676 677 synchronized (preparedTransactions) { 678 for (TransactionId txId : preparedTransactions.keySet()) { 679 LOG.warn("Recovered prepared XA TX: [{}]", txId); 680 } 681 } 682 683 } finally { 684 this.indexLock.writeLock().unlock(); 685 } 686 } 687 688 @SuppressWarnings("unused") 689 private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) { 690 return TransactionIdConversion.convertToLocal(tx); 691 } 692 693 private Location minimum(Location producerAuditPosition, 694 Location lastIndoubtPosition) { 695 Location min = null; 696 if (producerAuditPosition != null) { 697 min = producerAuditPosition; 698 if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) { 699 min = lastIndoubtPosition; 700 } 701 } else { 702 min = lastIndoubtPosition; 703 } 704 return min; 705 
} 706 707 private Location recoverProducerAudit() throws IOException { 708 if (metadata.producerSequenceIdTrackerLocation != null) { 709 KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation); 710 try { 711 ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput()); 712 int maxNumProducers = getMaxFailoverProducersToTrack(); 713 int maxAuditDepth = getFailoverProducersAuditDepth(); 714 metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject(); 715 metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth); 716 metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers); 717 return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation); 718 } catch (Exception e) { 719 LOG.warn("Cannot recover message audit", e); 720 return journal.getNextLocation(null); 721 } 722 } else { 723 // got no audit stored so got to recreate via replay from start of the journal 724 return journal.getNextLocation(null); 725 } 726 } 727 728 @SuppressWarnings("unchecked") 729 private Location recoverAckMessageFileMap() throws IOException { 730 if (metadata.ackMessageFileMapLocation != null) { 731 KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation); 732 try { 733 ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput()); 734 metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject(); 735 return journal.getNextLocation(metadata.ackMessageFileMapLocation); 736 } catch (Exception e) { 737 LOG.warn("Cannot recover ackMessageFileMap", e); 738 return journal.getNextLocation(null); 739 } 740 } else { 741 // got no ackMessageFileMap stored so got to recreate via replay from start of the journal 742 return journal.getNextLocation(null); 743 } 744 } 745 746 protected void recoverIndex(Transaction tx) throws IOException { 747 long start = 
System.currentTimeMillis(); 748 // It is possible index updates got applied before the journal updates.. 749 // in that case we need to removed references to messages that are not in the journal 750 final Location lastAppendLocation = journal.getLastAppendLocation(); 751 long undoCounter=0; 752 753 // Go through all the destinations to see if they have messages past the lastAppendLocation 754 for (String key : storedDestinations.keySet()) { 755 StoredDestination sd = storedDestinations.get(key); 756 757 final ArrayList<Long> matches = new ArrayList<Long>(); 758 // Find all the Locations that are >= than the last Append Location. 759 sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) { 760 @Override 761 protected void matched(Location key, Long value) { 762 matches.add(value); 763 } 764 }); 765 766 for (Long sequenceId : matches) { 767 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); 768 if (keys != null) { 769 sd.locationIndex.remove(tx, keys.location); 770 sd.messageIdIndex.remove(tx, keys.messageId); 771 metadata.producerSequenceIdTracker.rollback(keys.messageId); 772 undoCounter++; 773 decrementAndSubSizeToStoreStat(key, keys.location.getSize()); 774 // TODO: do we need to modify the ack positions for the pub sub case? 775 } 776 } 777 } 778 779 if( undoCounter > 0 ) { 780 // The rolledback operations are basically in flight journal writes. To avoid getting 781 // these the end user should do sync writes to the journal. 782 if (LOG.isInfoEnabled()) { 783 long end = System.currentTimeMillis(); 784 LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds."); 785 } 786 } 787 788 undoCounter = 0; 789 start = System.currentTimeMillis(); 790 791 // Lets be extra paranoid here and verify that all the datafiles being referenced 792 // by the indexes still exists. 
793 794 final SequenceSet ss = new SequenceSet(); 795 for (StoredDestination sd : storedDestinations.values()) { 796 // Use a visitor to cut down the number of pages that we load 797 sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() { 798 int last=-1; 799 800 @Override 801 public boolean isInterestedInKeysBetween(Location first, Location second) { 802 if( first==null ) { 803 return !ss.contains(0, second.getDataFileId()); 804 } else if( second==null ) { 805 return true; 806 } else { 807 return !ss.contains(first.getDataFileId(), second.getDataFileId()); 808 } 809 } 810 811 @Override 812 public void visit(List<Location> keys, List<Long> values) { 813 for (Location l : keys) { 814 int fileId = l.getDataFileId(); 815 if( last != fileId ) { 816 ss.add(fileId); 817 last = fileId; 818 } 819 } 820 } 821 822 }); 823 } 824 HashSet<Integer> missingJournalFiles = new HashSet<Integer>(); 825 while (!ss.isEmpty()) { 826 missingJournalFiles.add((int) ss.removeFirst()); 827 } 828 829 for (Entry<Integer, Set<Integer>> entry : metadata.ackMessageFileMap.entrySet()) { 830 missingJournalFiles.add(entry.getKey()); 831 for (Integer i : entry.getValue()) { 832 missingJournalFiles.add(i); 833 } 834 } 835 836 missingJournalFiles.removeAll(journal.getFileMap().keySet()); 837 838 if (!missingJournalFiles.isEmpty()) { 839 LOG.warn("Some journal files are missing: " + missingJournalFiles); 840 } 841 842 ArrayList<BTreeVisitor.Predicate<Location>> knownCorruption = new ArrayList<BTreeVisitor.Predicate<Location>>(); 843 ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>(); 844 for (Integer missing : missingJournalFiles) { 845 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0))); 846 } 847 848 if (checkForCorruptJournalFiles) { 849 Collection<DataFile> dataFiles = journal.getFileMap().values(); 850 for (DataFile dataFile : dataFiles) { 851 int id = 
dataFile.getDataFileId();
                // eof to next file id
                missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0)));
                Sequence seq = dataFile.getCorruptedBlocks().getHead();
                while (seq != null) {
                    BTreeVisitor.BetweenVisitor<Location, Long> visitor =
                        new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1));
                    missingPredicates.add(visitor);
                    knownCorruption.add(visitor);
                    seq = seq.getNext();
                }
            }
        }

        if (!missingPredicates.isEmpty()) {
            for (Entry<String, StoredDestination> sdEntry : storedDestinations.entrySet()) {
                final StoredDestination sd = sdEntry.getValue();
                final ArrayList<Long> matches = new ArrayList<Long>();
                sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
                    @Override
                    protected void matched(Location key, Long value) {
                        matches.add(value);
                    }
                });

                // If some message references are affected by the missing data files...
                if (!matches.isEmpty()) {

                    // We either 'gracefully' recover dropping the missing messages or
                    // we error out.
                    if( ignoreMissingJournalfiles ) {
                        // Update the index to remove the references to the missing data
                        for (Long sequenceId : matches) {
                            MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
                            sd.locationIndex.remove(tx, keys.location);
                            sd.messageIdIndex.remove(tx, keys.messageId);
                            LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location);
                            undoCounter++;
                            decrementAndSubSizeToStoreStat(sdEntry.getKey(), keys.location.getSize());
                            // TODO: do we need to modify the ack positions for the pub sub case?
                        }
                    } else {
                        LOG.error("[" + sdEntry.getKey() + "] references corrupt locations. " + matches.size() + " messages affected.");
                        throw new IOException("Detected missing/corrupt journal files referenced by:[" + sdEntry.getKey() + "] " +matches.size()+" messages affected.");
                    }
                }
            }
        }

        // When missing files are not tolerated, any known corruption or missing
        // file is fatal even if no indexed message referenced it.
        if (!ignoreMissingJournalfiles) {
            if (!knownCorruption.isEmpty()) {
                LOG.error("Detected corrupt journal files. " + knownCorruption);
                throw new IOException("Detected corrupt journal files. " + knownCorruption);
            }

            if (!missingJournalFiles.isEmpty()) {
                LOG.error("Detected missing journal files. " + missingJournalFiles);
                throw new IOException("Detected missing journal files. " + missingJournalFiles);
            }
        }

        if( undoCounter > 0 ) {
            // The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
            // should do sync writes to the journal.
            if (LOG.isInfoEnabled()) {
                long end = System.currentTimeMillis();
                LOG.info("Detected missing/corrupt journal files.  Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
            }
        }
    }

    // Cursor state for incrementalRecover(): the next journal location to replay
    // (null once the end of the journal was reached) and the last one replayed.
    private Location nextRecoveryPosition;
    private Location lastRecoveryPosition;

    /**
     * Replays journal records into the index, resuming where the previous call
     * stopped. The index write lock is held for the whole replay.
     * <p>
     * On the first call the start position comes from {@link #getRecoveryPosition()};
     * on later calls replay continues with the location following the last record
     * that was replayed. Each replayed location is also recorded as
     * {@code metadata.lastUpdate}.
     *
     * @throws IOException if reading the journal or applying a record fails
     */
    public void incrementalRecover() throws IOException {
        this.indexLock.writeLock().lock();
        try {
            if( nextRecoveryPosition == null ) {
                if( lastRecoveryPosition==null ) {
                    nextRecoveryPosition = getRecoveryPosition();
                } else {
                    nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
                }
            }
            while (nextRecoveryPosition != null) {
                lastRecoveryPosition = nextRecoveryPosition;
                metadata.lastUpdate = lastRecoveryPosition;
                JournalCommand<?> message = load(lastRecoveryPosition);
                // No IndexAware callback is supplied when replaying.
                process(message, lastRecoveryPosition, (IndexAware) null);
                nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
            }
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }

    /**
     * @return the value currently held in {@code metadata.lastUpdate}; may be null
     * @throws IOException declared for callers; not thrown by this implementation
     */
    public Location getLastUpdatePosition() throws IOException {
        return metadata.lastUpdate;
    }

    /**
     * Chooses the journal location at which replay should start.
     * <p>
     * Unless {@code forceRecoverIndex} is set, the first in-progress transaction
     * location recorded in the metadata is preferred, then the location following
     * {@code metadata.lastUpdate}. Otherwise replay starts at the first location
     * in the journal.
     *
     * @return the start location as reported by the journal
     * @throws IOException if the journal cannot be read
     */
    private Location getRecoveryPosition() throws IOException {

        if (!this.forceRecoverIndex) {

            // If we need to recover the transactions..
            if (metadata.firstInProgressTransactionLocation != null) {
                return metadata.firstInProgressTransactionLocation;
            }

            // Perhaps there were no transactions...
            if( metadata.lastUpdate!=null) {
                // Start replay at the record after the last one recorded in the index file.
                return journal.getNextLocation(metadata.lastUpdate);
            }
        }
        // This loads the first position.
        return journal.getNextLocation(null);
    }

    /**
     * Runs a checkpoint, optionally followed by journal cleanup, if the store is open.
     * <p>
     * The open check and the start timestamp are taken while holding the index
     * write lock; the lock is released again before {@code checkpointUpdate(cleanup)}
     * is invoked. A message is logged at INFO when the elapsed time exceeds
     * {@code LOG_SLOW_ACCESS_TIME} (and that threshold is positive).
     *
     * @param cleanup passed through to {@code checkpointUpdate}
     * @throws IOException if the checkpoint fails
     */
    protected void checkpointCleanup(final boolean cleanup) throws IOException {
        long start;
        this.indexLock.writeLock().lock();
        try {
            start = System.currentTimeMillis();
            if( !opened.get() ) {
                return;
            }
        } finally {
            this.indexLock.writeLock().unlock();
        }
        checkpointUpdate(cleanup);
        long end = System.currentTimeMillis();
        if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
            if (LOG.isInfoEnabled()) {
                LOG.info("Slow KahaDB access: cleanup took " + (end - start));
            }
        }
    }

    /**
     * Serializes a journal command as one type byte (the number of
     * {@code data.type()}) followed by the command in framed form.
     *
     * @param data the command to serialize
     * @return the serialized bytes
     * @throws IOException if writing to the buffer fails
     */
    public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
        int size = data.serializedSizeFramed();
        // +1 for the leading type byte
        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
        os.writeByte(data.type().getNumber());
        data.writeFramed(os);
        return os.toByteSequence();
    }

    // /////////////////////////////////////////////////////////////////
    // Methods called by the broker to update and query the store.
1000 // ///////////////////////////////////////////////////////////////// 1001 public Location store(JournalCommand<?> data) throws IOException { 1002 return store(data, false, null,null); 1003 } 1004 1005 public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException { 1006 return store(data, false, null, null, onJournalStoreComplete); 1007 } 1008 1009 public Location store(JournalCommand<?> data, boolean sync, IndexAware before,Runnable after) throws IOException { 1010 return store(data, sync, before, after, null); 1011 } 1012 1013 /** 1014 * All updated are are funneled through this method. The updates are converted 1015 * to a JournalMessage which is logged to the journal and then the data from 1016 * the JournalMessage is used to update the index just like it would be done 1017 * during a recovery process. 1018 */ 1019 public Location store(JournalCommand<?> data, boolean sync, IndexAware before, Runnable after, Runnable onJournalStoreComplete) throws IOException { 1020 try { 1021 ByteSequence sequence = toByteSequence(data); 1022 1023 Location location; 1024 checkpointLock.readLock().lock(); 1025 try { 1026 1027 long start = System.currentTimeMillis(); 1028 location = onJournalStoreComplete == null ? 
journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ; 1029 long start2 = System.currentTimeMillis(); 1030 process(data, location, before); 1031 1032 long end = System.currentTimeMillis(); 1033 if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) { 1034 if (LOG.isInfoEnabled()) { 1035 LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms"); 1036 } 1037 } 1038 1039 } finally{ 1040 checkpointLock.readLock().unlock(); 1041 } 1042 if (after != null) { 1043 after.run(); 1044 } 1045 1046 if (checkpointThread != null && !checkpointThread.isAlive() && opened.get()) { 1047 startCheckpoint(); 1048 } 1049 return location; 1050 } catch (IOException ioe) { 1051 LOG.error("KahaDB failed to store to Journal", ioe); 1052 brokerService.handleIOException(ioe); 1053 throw ioe; 1054 } 1055 } 1056 1057 /** 1058 * Loads a previously stored JournalMessage 1059 * 1060 * @param location 1061 * @return 1062 * @throws IOException 1063 */ 1064 public JournalCommand<?> load(Location location) throws IOException { 1065 long start = System.currentTimeMillis(); 1066 ByteSequence data = journal.read(location); 1067 long end = System.currentTimeMillis(); 1068 if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) { 1069 if (LOG.isInfoEnabled()) { 1070 LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms"); 1071 } 1072 } 1073 DataByteArrayInputStream is = new DataByteArrayInputStream(data); 1074 byte readByte = is.readByte(); 1075 KahaEntryType type = KahaEntryType.valueOf(readByte); 1076 if( type == null ) { 1077 try { 1078 is.close(); 1079 } catch (IOException e) {} 1080 throw new IOException("Could not load journal record. 
Invalid location: "+location); 1081 } 1082 JournalCommand<?> message = (JournalCommand<?>)type.createMessage(); 1083 message.mergeFramed(is); 1084 return message; 1085 } 1086 1087 /** 1088 * do minimal recovery till we reach the last inDoubtLocation 1089 * @param data 1090 * @param location 1091 * @param inDoubtlocation 1092 * @throws IOException 1093 */ 1094 void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException { 1095 if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) { 1096 process(data, location, (IndexAware) null); 1097 } else { 1098 // just recover producer audit 1099 data.visit(new Visitor() { 1100 @Override 1101 public void visit(KahaAddMessageCommand command) throws IOException { 1102 metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId()); 1103 } 1104 }); 1105 } 1106 } 1107 1108 // ///////////////////////////////////////////////////////////////// 1109 // Journaled record processing methods. Once the record is journaled, 1110 // these methods handle applying the index updates. 
// These may be called
    // from the recovery method too so they need to be idempotent
    // /////////////////////////////////////////////////////////////////

    /**
     * Dispatches a journal command to the {@code process} overload for its
     * concrete type.
     * <p>
     * Producer-audit, ack-message-file-map and trace commands carry no index
     * change of their own here; for them only {@code metadata.lastUpdate} is
     * advanced via {@link #processLocation(Location)}.
     *
     * @param data the command to apply
     * @param location journal location of the command
     * @param onSequenceAssignedCallback forwarded to the add-message and commit
     *        handlers; may be null
     * @throws IOException if applying the command fails
     */
    void process(JournalCommand<?> data, final Location location, final IndexAware onSequenceAssignedCallback) throws IOException {
        data.visit(new Visitor() {
            @Override
            public void visit(KahaAddMessageCommand command) throws IOException {
                process(command, location, onSequenceAssignedCallback);
            }

            @Override
            public void visit(KahaRemoveMessageCommand command) throws IOException {
                process(command, location);
            }

            @Override
            public void visit(KahaPrepareCommand command) throws IOException {
                process(command, location);
            }

            @Override
            public void visit(KahaCommitCommand command) throws IOException {
                process(command, location, onSequenceAssignedCallback);
            }

            @Override
            public void visit(KahaRollbackCommand command) throws IOException {
                process(command, location);
            }

            @Override
            public void visit(KahaRemoveDestinationCommand command) throws IOException {
                process(command, location);
            }

            @Override
            public void visit(KahaSubscriptionCommand command) throws IOException {
                process(command, location);
            }

            @Override
            public void visit(KahaProducerAuditCommand command) throws IOException {
                processLocation(location);
            }

            @Override
            public void visit(KahaAckMessageFileMapCommand command) throws IOException {
                processLocation(location);
            }

            @Override
            public void visit(KahaTraceCommand command) {
                processLocation(location);
            }

            @Override
            public void visit(KahaUpdateMessageCommand command) throws IOException {
                process(command, location);
            }
        });
    }

    /**
     * Applies an add-message command.
     * <p>
     * A transactional add is only queued on its in-flight transaction. A
     * non-transactional add updates the index under the index write lock and,
     * still inside that page-file transaction, reports the assigned sequence to
     * {@code runWithIndexLock} when one is given.
     *
     * @param command the add command
     * @param location journal location of the command
     * @param runWithIndexLock callback receiving the assigned sequence; may be null
     * @throws IOException if the index update fails
     */
    @SuppressWarnings("rawtypes")
    protected void process(final KahaAddMessageCommand command, final Location location, final IndexAware runWithIndexLock) throws IOException {
        if (command.hasTransactionInfo()) {
            List<Operation> inflightTx = getInflightTx(command.getTransactionInfo());
            inflightTx.add(new AddOperation(command, location, runWithIndexLock));
        } else {
            this.indexLock.writeLock().lock();
            try {
                pageFile.tx().execute(new Transaction.Closure<IOException>() {
                    @Override
                    public void execute(Transaction tx) throws IOException {
                        long assignedIndex = updateIndex(tx, command, location);
                        if (runWithIndexLock != null) {
                            runWithIndexLock.sequenceAssignedWithIndexLocked(assignedIndex);
                        }
                    }
                });

            } finally {
                this.indexLock.writeLock().unlock();
            }
        }
    }

    /**
     * Applies an update-message command to the index inside a page-file
     * transaction, under the index write lock.
     *
     * @param command the update command
     * @param location journal location of the command
     * @throws IOException if the index update fails
     */
    protected void process(final KahaUpdateMessageCommand command, final Location location) throws IOException {
        this.indexLock.writeLock().lock();
        try {
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    updateIndex(tx, command, location);
                }
            });
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }

    /**
     * Applies a remove-message (ack) command.
     * <p>
     * A transactional remove is only queued on its in-flight transaction; a
     * non-transactional remove updates the index under the index write lock.
     *
     * @param command the remove command
     * @param location journal location of the command
     * @throws IOException if the index update fails
     */
    @SuppressWarnings("rawtypes")
    protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
        if (command.hasTransactionInfo()) {
            List<Operation> inflightTx = getInflightTx(command.getTransactionInfo());
            inflightTx.add(new RemoveOperation(command, location));
        } else {
            this.indexLock.writeLock().lock();
            try {
                pageFile.tx().execute(new Transaction.Closure<IOException>() {
                    @Override
                    public void execute(Transaction tx) throws IOException {
                        updateIndex(tx, command, location);
                    }
                });
            } finally {
                this.indexLock.writeLock().unlock();
            }
        }
    }

    /**
     * Applies a remove-destination command to the index inside a page-file
     * transaction, under the index write lock.
     *
     * @param command the remove-destination command
     * @param location journal location of the command
     * @throws IOException if the index update fails
     */
    protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
1232 this.indexLock.writeLock().lock(); 1233 try { 1234 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1235 @Override 1236 public void execute(Transaction tx) throws IOException { 1237 updateIndex(tx, command, location); 1238 } 1239 }); 1240 } finally { 1241 this.indexLock.writeLock().unlock(); 1242 } 1243 } 1244 1245 protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException { 1246 this.indexLock.writeLock().lock(); 1247 try { 1248 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1249 @Override 1250 public void execute(Transaction tx) throws IOException { 1251 updateIndex(tx, command, location); 1252 } 1253 }); 1254 } finally { 1255 this.indexLock.writeLock().unlock(); 1256 } 1257 } 1258 1259 protected void processLocation(final Location location) { 1260 this.indexLock.writeLock().lock(); 1261 try { 1262 metadata.lastUpdate = location; 1263 } finally { 1264 this.indexLock.writeLock().unlock(); 1265 } 1266 } 1267 1268 @SuppressWarnings("rawtypes") 1269 protected void process(KahaCommitCommand command, final Location location, final IndexAware before) throws IOException { 1270 TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); 1271 List<Operation> inflightTx; 1272 synchronized (inflightTransactions) { 1273 inflightTx = inflightTransactions.remove(key); 1274 if (inflightTx == null) { 1275 inflightTx = preparedTransactions.remove(key); 1276 } 1277 } 1278 if (inflightTx == null) { 1279 // only non persistent messages in this tx 1280 if (before != null) { 1281 before.sequenceAssignedWithIndexLocked(-1); 1282 } 1283 return; 1284 } 1285 1286 final List<Operation> messagingTx = inflightTx; 1287 indexLock.writeLock().lock(); 1288 try { 1289 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1290 @Override 1291 public void execute(Transaction tx) throws IOException { 1292 for (Operation op : messagingTx) { 1293 op.execute(tx); 1294 } 1295 } 1296 }); 1297 
metadata.lastUpdate = location; 1298 } finally { 1299 indexLock.writeLock().unlock(); 1300 } 1301 } 1302 1303 @SuppressWarnings("rawtypes") 1304 protected void process(KahaPrepareCommand command, Location location) { 1305 TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); 1306 synchronized (inflightTransactions) { 1307 List<Operation> tx = inflightTransactions.remove(key); 1308 if (tx != null) { 1309 preparedTransactions.put(key, tx); 1310 } 1311 } 1312 } 1313 1314 @SuppressWarnings("rawtypes") 1315 protected void process(KahaRollbackCommand command, Location location) throws IOException { 1316 TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); 1317 List<Operation> updates = null; 1318 synchronized (inflightTransactions) { 1319 updates = inflightTransactions.remove(key); 1320 if (updates == null) { 1321 updates = preparedTransactions.remove(key); 1322 } 1323 } 1324 } 1325 1326 // ///////////////////////////////////////////////////////////////// 1327 // These methods do the actual index updates. 1328 // ///////////////////////////////////////////////////////////////// 1329 1330 protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock(); 1331 private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>(); 1332 1333 long updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException { 1334 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1335 1336 // Skip adding the message to the index if this is a topic and there are 1337 // no subscriptions. 1338 if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) { 1339 return -1; 1340 } 1341 1342 // Add the message. 1343 int priority = command.getPrioritySupported() ? 
command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY; 1344 long id = sd.orderIndex.getNextMessageId(); 1345 Long previous = sd.locationIndex.put(tx, location, id); 1346 if (previous == null) { 1347 previous = sd.messageIdIndex.put(tx, command.getMessageId(), id); 1348 if (previous == null) { 1349 incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize()); 1350 sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location)); 1351 if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) { 1352 addAckLocationForNewMessage(tx, sd, id); 1353 } 1354 metadata.lastUpdate = location; 1355 } else { 1356 1357 MessageKeys messageKeys = sd.orderIndex.get(tx, previous); 1358 if (messageKeys != null && messageKeys.location.compareTo(location) < 0) { 1359 // If the message ID is indexed, then the broker asked us to store a duplicate before the message was dispatched and acked, we ignore this add attempt 1360 LOG.warn("Duplicate message add attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId()); 1361 } 1362 sd.messageIdIndex.put(tx, command.getMessageId(), previous); 1363 sd.locationIndex.remove(tx, location); 1364 id = -1; 1365 } 1366 } else { 1367 // restore the previous value.. Looks like this was a redo of a previously 1368 // added message. We don't want to assign it a new id as the other indexes would 1369 // be wrong.. 
1370 sd.locationIndex.put(tx, location, previous); 1371 // ensure sequence is not broken 1372 sd.orderIndex.revertNextMessageId(); 1373 metadata.lastUpdate = location; 1374 } 1375 // record this id in any event, initial send or recovery 1376 metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId()); 1377 1378 return id; 1379 } 1380 1381 void trackPendingAdd(KahaDestination destination, Long seq) { 1382 StoredDestination sd = storedDestinations.get(key(destination)); 1383 if (sd != null) { 1384 sd.trackPendingAdd(seq); 1385 } 1386 } 1387 1388 void trackPendingAddComplete(KahaDestination destination, Long seq) { 1389 StoredDestination sd = storedDestinations.get(key(destination)); 1390 if (sd != null) { 1391 sd.trackPendingAddComplete(seq); 1392 } 1393 } 1394 1395 void updateIndex(Transaction tx, KahaUpdateMessageCommand updateMessageCommand, Location location) throws IOException { 1396 KahaAddMessageCommand command = updateMessageCommand.getMessage(); 1397 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1398 1399 Long id = sd.messageIdIndex.get(tx, command.getMessageId()); 1400 if (id != null) { 1401 MessageKeys previousKeys = sd.orderIndex.put( 1402 tx, 1403 command.getPrioritySupported() ? 
command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY, 1404 id, 1405 new MessageKeys(command.getMessageId(), location) 1406 ); 1407 sd.locationIndex.put(tx, location, id); 1408 incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize()); 1409 // on first update previous is original location, on recovery/replay it may be the updated location 1410 if(previousKeys != null && !previousKeys.location.equals(location)) { 1411 sd.locationIndex.remove(tx, previousKeys.location); 1412 decrementAndSubSizeToStoreStat(command.getDestination(), previousKeys.location.getSize()); 1413 } 1414 metadata.lastUpdate = location; 1415 } else { 1416 //Add the message if it can't be found 1417 this.updateIndex(tx, command, location); 1418 } 1419 } 1420 1421 void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException { 1422 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1423 if (!command.hasSubscriptionKey()) { 1424 1425 // In the queue case we just remove the message from the index.. 
1426 Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId()); 1427 if (sequenceId != null) { 1428 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); 1429 if (keys != null) { 1430 sd.locationIndex.remove(tx, keys.location); 1431 decrementAndSubSizeToStoreStat(command.getDestination(), keys.location.getSize()); 1432 recordAckMessageReferenceLocation(ackLocation, keys.location); 1433 metadata.lastUpdate = ackLocation; 1434 } else if (LOG.isDebugEnabled()) { 1435 LOG.debug("message not found in order index: " + sequenceId + " for: " + command.getMessageId()); 1436 } 1437 } else if (LOG.isDebugEnabled()) { 1438 LOG.debug("message not found in sequence id index: " + command.getMessageId()); 1439 } 1440 } else { 1441 // In the topic case we need remove the message once it's been acked 1442 // by all the subs 1443 Long sequence = sd.messageIdIndex.get(tx, command.getMessageId()); 1444 1445 // Make sure it's a valid message id... 1446 if (sequence != null) { 1447 String subscriptionKey = command.getSubscriptionKey(); 1448 if (command.getAck() != UNMATCHED) { 1449 sd.orderIndex.get(tx, sequence); 1450 byte priority = sd.orderIndex.lastGetPriority(); 1451 sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority)); 1452 } 1453 1454 MessageKeys keys = sd.orderIndex.get(tx, sequence); 1455 if (keys != null) { 1456 recordAckMessageReferenceLocation(ackLocation, keys.location); 1457 } 1458 // The following method handles deleting un-referenced messages. 
1459 removeAckLocation(command, tx, sd, subscriptionKey, sequence); 1460 metadata.lastUpdate = ackLocation; 1461 } else if (LOG.isDebugEnabled()) { 1462 LOG.debug("on ack, no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey()); 1463 } 1464 1465 } 1466 } 1467 1468 private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) { 1469 Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId())); 1470 if (referenceFileIds == null) { 1471 referenceFileIds = new HashSet<Integer>(); 1472 referenceFileIds.add(messageLocation.getDataFileId()); 1473 metadata.ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds); 1474 } else { 1475 Integer id = Integer.valueOf(messageLocation.getDataFileId()); 1476 if (!referenceFileIds.contains(id)) { 1477 referenceFileIds.add(id); 1478 } 1479 } 1480 } 1481 1482 void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException { 1483 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1484 sd.orderIndex.remove(tx); 1485 1486 sd.locationIndex.clear(tx); 1487 sd.locationIndex.unload(tx); 1488 tx.free(sd.locationIndex.getPageId()); 1489 1490 sd.messageIdIndex.clear(tx); 1491 sd.messageIdIndex.unload(tx); 1492 tx.free(sd.messageIdIndex.getPageId()); 1493 1494 if (sd.subscriptions != null) { 1495 sd.subscriptions.clear(tx); 1496 sd.subscriptions.unload(tx); 1497 tx.free(sd.subscriptions.getPageId()); 1498 1499 sd.subscriptionAcks.clear(tx); 1500 sd.subscriptionAcks.unload(tx); 1501 tx.free(sd.subscriptionAcks.getPageId()); 1502 1503 sd.ackPositions.clear(tx); 1504 sd.ackPositions.unload(tx); 1505 tx.free(sd.ackPositions.getHeadPageId()); 1506 1507 sd.subLocations.clear(tx); 1508 sd.subLocations.unload(tx); 1509 tx.free(sd.subLocations.getHeadPageId()); 1510 } 1511 1512 String key = key(command.getDestination()); 1513 
storedDestinations.remove(key);
        metadata.destinations.remove(tx, key);
        clearStoreStats(command.getDestination());
        storeCache.remove(key(command.getDestination()));
    }

    /**
     * Creates or deletes a subscription in the destination's indexes.
     * <p>
     * A command carrying subscription info creates (or replaces) the
     * subscription; a command already recorded at the same journal location is
     * treated as a replay and ignored. A command without subscription info
     * deletes the subscription and, when no subscriptions remain, removes the
     * whole stored destination.
     *
     * @param tx the page-file transaction to update within
     * @param command the subscription command
     * @param location journal location of the command
     * @throws IOException if an index operation fails
     */
    void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
        final String subscriptionKey = command.getSubscriptionKey();

        // If set then we are creating it.. otherwise we are destroying the sub
        if (command.hasSubscriptionInfo()) {
            Location existing = sd.subLocations.get(tx, subscriptionKey);
            if (existing != null && existing.compareTo(location) == 0) {
                // replay on recovery, ignore
                LOG.trace("ignoring journal replay of replay of sub from: " + location);
                return;
            }

            sd.subscriptions.put(tx, subscriptionKey, command);
            sd.subLocations.put(tx, subscriptionKey, location);
            long ackLocation=NOT_ACKED;
            if (!command.getRetroactive()) {
                // Non-retroactive: start from the last sequence id handed out so far.
                ackLocation = sd.orderIndex.nextMessageId-1;
            } else {
                addAckLocationForRetroactiveSub(tx, sd, subscriptionKey);
            }
            sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation));
            sd.subscriptionCache.add(subscriptionKey);
        } else {
            // delete the sub...
            sd.subscriptions.remove(tx, subscriptionKey);
            sd.subLocations.remove(tx, subscriptionKey);
            sd.subscriptionAcks.remove(tx, subscriptionKey);
            sd.subscriptionCache.remove(subscriptionKey);
            removeAckLocationsForSub(command, tx, sd, subscriptionKey);

            if (sd.subscriptions.isEmpty(tx)) {
                // remove the stored destination
                KahaRemoveDestinationCommand removeDestinationCommand = new KahaRemoveDestinationCommand();
                removeDestinationCommand.setDestination(command.getDestination());
                updateIndex(tx, removeDestinationCommand, null);
                clearStoreStats(command.getDestination());
            }
        }
    }

    /**
     * Runs {@link #checkpointUpdate(Transaction, boolean)} in a page-file
     * transaction while holding the checkpoint write lock and, inside it, the
     * index write lock.
     *
     * @param cleanup whether journal files that are no longer referenced should
     *        be removed as part of the checkpoint
     * @throws IOException if the checkpoint fails
     */
    private void checkpointUpdate(final boolean cleanup) throws IOException {
        checkpointLock.writeLock().lock();
        try {
            this.indexLock.writeLock().lock();
            try {
                pageFile.tx().execute(new Transaction.Closure<IOException>() {
                    @Override
                    public void execute(Transaction tx) throws IOException {
                        checkpointUpdate(tx, cleanup);
                    }
                });
            } finally {
                this.indexLock.writeLock().unlock();
            }

        } finally {
            checkpointLock.writeLock().unlock();
        }
    }

    /**
     * Writes the producer audit and ack-message-file map to the journal, stores
     * the metadata page and flushes the page file. With {@code cleanup} set it
     * then works out which journal files are no longer referenced and removes
     * them.
     * <p>
     * A journal file stays out of the removal set when it holds the last update,
     * is being replicated, holds the just-written audit / ack-map records, lies
     * in the in-progress transaction range, is referenced by any destination's
     * location index or subscription location, or contains acks for a file that
     * is itself being kept.
     *
     * @param tx the page-file transaction to work within
     * @param cleanup whether unreferenced journal files should be removed
     * @throws IOException if a journal or index operation fails
     */
    void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
        LOG.debug("Checkpoint started.");

        // reflect last update exclusive of current checkpoint
        Location lastUpdate = metadata.lastUpdate;

        metadata.state = OPEN_STATE;
        metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
        metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap();
        Location[] inProgressTxRange = getInProgressTxLocationRange();
        metadata.firstInProgressTransactionLocation = inProgressTxRange[0];
        tx.store(metadata.page, metadataMarshaller, true);
        pageFile.flush();

        if( cleanup ) {

            final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
            final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);

            if (LOG.isTraceEnabled()) {
                LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet);
            }

            if (lastUpdate != null) {
                gcCandidateSet.remove(lastUpdate.getDataFileId());
            }

            // Don't GC files under replication
            if( journalFilesBeingReplicated!=null ) {
                gcCandidateSet.removeAll(journalFilesBeingReplicated);
            }

            if (metadata.producerSequenceIdTrackerLocation != null) {
                int dataFileId = metadata.producerSequenceIdTrackerLocation.getDataFileId();
                if (gcCandidateSet.contains(dataFileId) && gcCandidateSet.first() == dataFileId) {
                    // rewrite so we don't prevent gc
                    metadata.producerSequenceIdTracker.setModified(true);
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("rewriting producerSequenceIdTracker:" + metadata.producerSequenceIdTrackerLocation);
                    }
                }
                gcCandidateSet.remove(dataFileId);
                if (LOG.isTraceEnabled()) {
                    LOG.trace("gc candidates after producerSequenceIdTrackerLocation:" + dataFileId + ", " + gcCandidateSet);
                }
            }

            if (metadata.ackMessageFileMapLocation != null) {
                int dataFileId = metadata.ackMessageFileMapLocation.getDataFileId();
                gcCandidateSet.remove(dataFileId);
                if (LOG.isTraceEnabled()) {
                    LOG.trace("gc candidates after ackMessageFileMapLocation:" + dataFileId + ", " + gcCandidateSet);
                }
            }

            // Don't GC files referenced by in-progress tx
            if (inProgressTxRange[0] != null) {
                for (int pendingTx=inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) {
                    gcCandidateSet.remove(pendingTx);
                }
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace("gc candidates after tx range:" + Arrays.asList(inProgressTxRange) + ", " + gcCandidateSet);
            }

            // Go through all the destinations to see if any of them can remove GC candidates.
            for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) {
                if( gcCandidateSet.isEmpty() ) {
                    break;
                }

                // Use a visitor to cut down the number of pages that we load
                entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
                    int last=-1;
                    // NOTE: headSet/tailSet/subSet return live views of gcCandidateSet,
                    // so the subset.remove(...) calls below also remove the boundary
                    // file ids from the candidate set itself.
                    @Override
                    public boolean isInterestedInKeysBetween(Location first, Location second) {
                        if( first==null ) {
                            SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
                                subset.remove(second.getDataFileId());
                            }
                            return !subset.isEmpty();
                        } else if( second==null ) {
                            SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
                                subset.remove(first.getDataFileId());
                            }
                            return !subset.isEmpty();
                        } else {
                            SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
                                subset.remove(first.getDataFileId());
                            }
                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
                                subset.remove(second.getDataFileId());
                            }
                            return !subset.isEmpty();
                        }
                    }

                    @Override
                    public void visit(List<Location> keys, List<Long> values) {
                        for (Location l : keys) {
                            int fileId = l.getDataFileId();
                            // 'last' avoids repeating the removal for runs of keys in the same file
                            if( last != fileId ) {
                                gcCandidateSet.remove(fileId);
                                last = fileId;
                            }
                        }
                    }
                });

                // Durable Subscription
                if (entry.getValue().subLocations != null) {
                    Iterator<Entry<String, Location>> iter = entry.getValue().subLocations.iterator(tx);
                    while (iter.hasNext()) {
                        Entry<String, Location> subscription = iter.next();
                        int dataFileId = subscription.getValue().getDataFileId();

                        // Move subscription along if it has no outstanding messages that need ack'd
                        // and its in the last log file in the journal.
                        if (!gcCandidateSet.isEmpty() && gcCandidateSet.first() == dataFileId) {
                            final StoredDestination destination = entry.getValue();
                            final String subscriptionKey = subscription.getKey();
                            SequenceSet pendingAcks = destination.ackPositions.get(tx, subscriptionKey);

                            // When pending is size one that is the next message Id meaning there
                            // are no pending messages currently.
                            if (pendingAcks == null || pendingAcks.isEmpty() ||
                                (pendingAcks.size() == 1 && pendingAcks.getTail().range() == 1)) {

                                if (LOG.isTraceEnabled()) {
                                    LOG.trace("Found candidate for rewrite: {} from file {}", entry.getKey(), dataFileId);
                                }

                                final KahaSubscriptionCommand kahaSub =
                                    destination.subscriptions.get(tx, subscriptionKey);
                                destination.subLocations.put(
                                    tx, subscriptionKey, checkpointSubscriptionCommand(kahaSub));

                                // Skips the remove from candidates if we rewrote the subscription
                                // in order to prevent duplicate subscription commands on recover.
                                // If another subscription is on the same file and isn't rewritten
                                // then it will remove the file from the set.
                                continue;
                            }
                        }

                        gcCandidateSet.remove(dataFileId);
                    }
                }

                if (LOG.isTraceEnabled()) {
                    LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
                }
            }

            // check we are not deleting file with ack for in-use journal files
            if (LOG.isTraceEnabled()) {
                LOG.trace("gc candidates: " + gcCandidateSet);
                LOG.trace("ackMessageFileMap: " + metadata.ackMessageFileMap);
            }
            boolean ackMessageFileMapMod = false;
            Iterator<Integer> candidates = gcCandidateSet.iterator();
            while (candidates.hasNext()) {
                Integer candidate = candidates.next();
                Set<Integer> referencedFileIds = metadata.ackMessageFileMap.get(candidate);
                if (referencedFileIds != null) {
                    for (Integer referencedFileId : referencedFileIds) {
                        if (completeFileSet.contains(referencedFileId) && !gcCandidateSet.contains(referencedFileId)) {
                            // active file that is not targeted for deletion is referenced so don't delete
                            candidates.remove();
                            break;
                        }
                    }
                    // Still a candidate: its ack-map entry is no longer needed.
                    if (gcCandidateSet.contains(candidate)) {
                        ackMessageFileMapMod |= (metadata.ackMessageFileMap.remove(candidate) != null);
                    } else {
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("not removing data file: " + candidate
                                    + " as contained ack(s) refer to referenced file: " + referencedFileIds);
                        }
                    }
                }
            }

            if (!gcCandidateSet.isEmpty()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Cleanup removing the data files: " + gcCandidateSet);
                }
                journal.removeDataFiles(gcCandidateSet);
                // Drop references to the removed files from the remaining ack-map entries.
                for (Integer candidate : gcCandidateSet) {
                    for (Set<Integer> ackFiles : metadata.ackMessageFileMap.values()) {
                        ackMessageFileMapMod |= ackFiles.remove(candidate);
                    }
                }
                // The ack map changed, so checkpoint again (without cleanup) to persist it.
                if (ackMessageFileMapMod) {
                    checkpointUpdate(tx, false);
                }
            }
        }

        LOG.debug("Checkpoint done.");
    }

    // No-op completion callback handed to journal writes made during a checkpoint.
    final Runnable nullCompletionCallback = new Runnable() {
1790 @Override 1791 public void run() { 1792 } 1793 }; 1794 1795 private Location checkpointProducerAudit() throws IOException { 1796 if (metadata.producerSequenceIdTracker == null || metadata.producerSequenceIdTracker.modified()) { 1797 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 1798 ObjectOutputStream oout = new ObjectOutputStream(baos); 1799 oout.writeObject(metadata.producerSequenceIdTracker); 1800 oout.flush(); 1801 oout.close(); 1802 // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false 1803 Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback); 1804 try { 1805 location.getLatch().await(); 1806 } catch (InterruptedException e) { 1807 throw new InterruptedIOException(e.toString()); 1808 } 1809 return location; 1810 } 1811 return metadata.producerSequenceIdTrackerLocation; 1812 } 1813 1814 private Location checkpointAckMessageFileMap() throws IOException { 1815 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 1816 ObjectOutputStream oout = new ObjectOutputStream(baos); 1817 oout.writeObject(metadata.ackMessageFileMap); 1818 oout.flush(); 1819 oout.close(); 1820 // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false 1821 Location location = store(new KahaAckMessageFileMapCommand().setAckMessageFileMap(new Buffer(baos.toByteArray())), nullCompletionCallback); 1822 try { 1823 location.getLatch().await(); 1824 } catch (InterruptedException e) { 1825 throw new InterruptedIOException(e.toString()); 1826 } 1827 return location; 1828 } 1829 1830 private Location checkpointSubscriptionCommand(KahaSubscriptionCommand subscription) throws IOException { 1831 1832 ByteSequence sequence = toByteSequence(subscription); 1833 Location location = journal.write(sequence, nullCompletionCallback) ; 1834 1835 try { 1836 location.getLatch().await(); 1837 } catch (InterruptedException e) { 1838 throw new 
InterruptedIOException(e.toString()); 1839 } 1840 return location; 1841 } 1842 1843 public HashSet<Integer> getJournalFilesBeingReplicated() { 1844 return journalFilesBeingReplicated; 1845 } 1846 1847 // ///////////////////////////////////////////////////////////////// 1848 // StoredDestination related implementation methods. 1849 // ///////////////////////////////////////////////////////////////// 1850 1851 protected final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>(); 1852 1853 static class MessageKeys { 1854 final String messageId; 1855 final Location location; 1856 1857 public MessageKeys(String messageId, Location location) { 1858 this.messageId=messageId; 1859 this.location=location; 1860 } 1861 1862 @Override 1863 public String toString() { 1864 return "["+messageId+","+location+"]"; 1865 } 1866 } 1867 1868 protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> { 1869 final LocationSizeMarshaller locationSizeMarshaller = new LocationSizeMarshaller(); 1870 1871 @Override 1872 public MessageKeys readPayload(DataInput dataIn) throws IOException { 1873 return new MessageKeys(dataIn.readUTF(), locationSizeMarshaller.readPayload(dataIn)); 1874 } 1875 1876 @Override 1877 public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException { 1878 dataOut.writeUTF(object.messageId); 1879 locationSizeMarshaller.writePayload(object.location, dataOut); 1880 } 1881 } 1882 1883 class LastAck { 1884 long lastAckedSequence; 1885 byte priority; 1886 1887 public LastAck(LastAck source) { 1888 this.lastAckedSequence = source.lastAckedSequence; 1889 this.priority = source.priority; 1890 } 1891 1892 public LastAck() { 1893 this.priority = MessageOrderIndex.HI; 1894 } 1895 1896 public LastAck(long ackLocation) { 1897 this.lastAckedSequence = ackLocation; 1898 this.priority = MessageOrderIndex.LO; 1899 } 1900 1901 public LastAck(long ackLocation, byte priority) { 1902 this.lastAckedSequence = 
ackLocation; 1903 this.priority = priority; 1904 } 1905 1906 @Override 1907 public String toString() { 1908 return "[" + lastAckedSequence + ":" + priority + "]"; 1909 } 1910 } 1911 1912 protected class LastAckMarshaller implements Marshaller<LastAck> { 1913 1914 @Override 1915 public void writePayload(LastAck object, DataOutput dataOut) throws IOException { 1916 dataOut.writeLong(object.lastAckedSequence); 1917 dataOut.writeByte(object.priority); 1918 } 1919 1920 @Override 1921 public LastAck readPayload(DataInput dataIn) throws IOException { 1922 LastAck lastAcked = new LastAck(); 1923 lastAcked.lastAckedSequence = dataIn.readLong(); 1924 if (metadata.version >= 3) { 1925 lastAcked.priority = dataIn.readByte(); 1926 } 1927 return lastAcked; 1928 } 1929 1930 @Override 1931 public int getFixedSize() { 1932 return 9; 1933 } 1934 1935 @Override 1936 public LastAck deepCopy(LastAck source) { 1937 return new LastAck(source); 1938 } 1939 1940 @Override 1941 public boolean isDeepCopySupported() { 1942 return true; 1943 } 1944 } 1945 1946 1947 class StoredDestination { 1948 1949 MessageOrderIndex orderIndex = new MessageOrderIndex(); 1950 BTreeIndex<Location, Long> locationIndex; 1951 BTreeIndex<String, Long> messageIdIndex; 1952 1953 // These bits are only set for Topics 1954 BTreeIndex<String, KahaSubscriptionCommand> subscriptions; 1955 BTreeIndex<String, LastAck> subscriptionAcks; 1956 HashMap<String, MessageOrderCursor> subscriptionCursors; 1957 ListIndex<String, SequenceSet> ackPositions; 1958 ListIndex<String, Location> subLocations; 1959 1960 // Transient data used to track which Messages are no longer needed. 
1961 final TreeMap<Long, Long> messageReferences = new TreeMap<Long, Long>(); 1962 final HashSet<String> subscriptionCache = new LinkedHashSet<String>(); 1963 1964 public void trackPendingAdd(Long seq) { 1965 orderIndex.trackPendingAdd(seq); 1966 } 1967 1968 public void trackPendingAddComplete(Long seq) { 1969 orderIndex.trackPendingAddComplete(seq); 1970 } 1971 1972 @Override 1973 public String toString() { 1974 return "nextSeq:" + orderIndex.nextMessageId + ",lastRet:" + orderIndex.cursor + ",pending:" + orderIndex.pendingAdditions.size(); 1975 } 1976 } 1977 1978 protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> { 1979 1980 final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller(); 1981 1982 @Override 1983 public StoredDestination readPayload(final DataInput dataIn) throws IOException { 1984 final StoredDestination value = new StoredDestination(); 1985 value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong()); 1986 value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong()); 1987 value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong()); 1988 1989 if (dataIn.readBoolean()) { 1990 value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong()); 1991 value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong()); 1992 if (metadata.version >= 4) { 1993 value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, dataIn.readLong()); 1994 } else { 1995 // upgrade 1996 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1997 @Override 1998 public void execute(Transaction tx) throws IOException { 1999 LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<String, SequenceSet>(); 2000 2001 if (metadata.version >= 3) { 2002 // migrate 2003 BTreeIndex<Long, HashSet<String>> oldAckPositions = 2004 new BTreeIndex<Long, HashSet<String>>(pageFile, 
dataIn.readLong()); 2005 oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE); 2006 oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE); 2007 oldAckPositions.load(tx); 2008 2009 2010 // Do the initial build of the data in memory before writing into the store 2011 // based Ack Positions List to avoid a lot of disk thrashing. 2012 Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx); 2013 while (iterator.hasNext()) { 2014 Entry<Long, HashSet<String>> entry = iterator.next(); 2015 2016 for(String subKey : entry.getValue()) { 2017 SequenceSet pendingAcks = temp.get(subKey); 2018 if (pendingAcks == null) { 2019 pendingAcks = new SequenceSet(); 2020 temp.put(subKey, pendingAcks); 2021 } 2022 2023 pendingAcks.add(entry.getKey()); 2024 } 2025 } 2026 } 2027 // Now move the pending messages to ack data into the store backed 2028 // structure. 2029 value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate()); 2030 value.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE); 2031 value.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE); 2032 value.ackPositions.load(tx); 2033 for(String subscriptionKey : temp.keySet()) { 2034 value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey)); 2035 } 2036 2037 } 2038 }); 2039 } 2040 2041 if (metadata.version >= 5) { 2042 value.subLocations = new ListIndex<String, Location>(pageFile, dataIn.readLong()); 2043 } else { 2044 // upgrade 2045 pageFile.tx().execute(new Transaction.Closure<IOException>() { 2046 @Override 2047 public void execute(Transaction tx) throws IOException { 2048 value.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate()); 2049 value.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE); 2050 value.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE); 2051 value.subLocations.load(tx); 2052 } 2053 }); 2054 } 2055 } 2056 if (metadata.version >= 2) { 2057 value.orderIndex.lowPriorityIndex = new 
BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong()); 2058 value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong()); 2059 } else { 2060 // upgrade 2061 pageFile.tx().execute(new Transaction.Closure<IOException>() { 2062 @Override 2063 public void execute(Transaction tx) throws IOException { 2064 value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 2065 value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 2066 value.orderIndex.lowPriorityIndex.setValueMarshaller(messageKeysMarshaller); 2067 value.orderIndex.lowPriorityIndex.load(tx); 2068 2069 value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 2070 value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 2071 value.orderIndex.highPriorityIndex.setValueMarshaller(messageKeysMarshaller); 2072 value.orderIndex.highPriorityIndex.load(tx); 2073 } 2074 }); 2075 } 2076 2077 return value; 2078 } 2079 2080 @Override 2081 public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException { 2082 dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId()); 2083 dataOut.writeLong(value.locationIndex.getPageId()); 2084 dataOut.writeLong(value.messageIdIndex.getPageId()); 2085 if (value.subscriptions != null) { 2086 dataOut.writeBoolean(true); 2087 dataOut.writeLong(value.subscriptions.getPageId()); 2088 dataOut.writeLong(value.subscriptionAcks.getPageId()); 2089 dataOut.writeLong(value.ackPositions.getHeadPageId()); 2090 dataOut.writeLong(value.subLocations.getHeadPageId()); 2091 } else { 2092 dataOut.writeBoolean(false); 2093 } 2094 dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId()); 2095 dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId()); 2096 } 2097 } 2098 2099 static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> { 2100 final static 
KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller(); 2101 2102 @Override 2103 public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException { 2104 KahaSubscriptionCommand rc = new KahaSubscriptionCommand(); 2105 rc.mergeFramed((InputStream)dataIn); 2106 return rc; 2107 } 2108 2109 @Override 2110 public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException { 2111 object.writeFramed((OutputStream)dataOut); 2112 } 2113 } 2114 2115 protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException { 2116 String key = key(destination); 2117 StoredDestination rc = storedDestinations.get(key); 2118 if (rc == null) { 2119 boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC; 2120 rc = loadStoredDestination(tx, key, topic); 2121 // Cache it. We may want to remove/unload destinations from the 2122 // cache that are not used for a while 2123 // to reduce memory usage. 2124 storedDestinations.put(key, rc); 2125 } 2126 return rc; 2127 } 2128 2129 protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException { 2130 String key = key(destination); 2131 StoredDestination rc = storedDestinations.get(key); 2132 if (rc == null && metadata.destinations.containsKey(tx, key)) { 2133 rc = getStoredDestination(destination, tx); 2134 } 2135 return rc; 2136 } 2137 2138 /** 2139 * @param tx 2140 * @param key 2141 * @param topic 2142 * @return 2143 * @throws IOException 2144 */ 2145 private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException { 2146 // Try to load the existing indexes.. 2147 StoredDestination rc = metadata.destinations.get(tx, key); 2148 if (rc == null) { 2149 // Brand new destination.. allocate indexes for it. 
2150 rc = new StoredDestination(); 2151 rc.orderIndex.allocate(tx); 2152 rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate()); 2153 rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate()); 2154 2155 if (topic) { 2156 rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate()); 2157 rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate()); 2158 rc.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate()); 2159 rc.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate()); 2160 } 2161 metadata.destinations.put(tx, key, rc); 2162 } 2163 2164 // Configure the marshalers and load. 2165 rc.orderIndex.load(tx); 2166 2167 // Figure out the next key using the last entry in the destination. 2168 rc.orderIndex.configureLast(tx); 2169 2170 rc.locationIndex.setKeyMarshaller(new LocationSizeMarshaller()); 2171 rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE); 2172 rc.locationIndex.load(tx); 2173 2174 rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE); 2175 rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE); 2176 rc.messageIdIndex.load(tx); 2177 2178 //go through an upgrade old index if older than version 6 2179 if (metadata.version < 6) { 2180 for (Iterator<Entry<Location, Long>> iterator = rc.locationIndex.iterator(tx); iterator.hasNext(); ) { 2181 Entry<Location, Long> entry = iterator.next(); 2182 // modify so it is upgraded 2183 rc.locationIndex.put(tx, entry.getKey(), entry.getValue()); 2184 } 2185 //upgrade the order index 2186 for (Iterator<Entry<Long, MessageKeys>> iterator = rc.orderIndex.iterator(tx); iterator.hasNext(); ) { 2187 Entry<Long, MessageKeys> entry = iterator.next(); 2188 //call get so that the last priority is updated 2189 rc.orderIndex.get(tx, entry.getKey()); 2190 rc.orderIndex.put(tx, rc.orderIndex.lastGetPriority(), entry.getKey(), entry.getValue()); 2191 } 2192 } 2193 2194 // If it was a topic... 
2195 if (topic) { 2196 2197 rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE); 2198 rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE); 2199 rc.subscriptions.load(tx); 2200 2201 rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE); 2202 rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller()); 2203 rc.subscriptionAcks.load(tx); 2204 2205 rc.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE); 2206 rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE); 2207 rc.ackPositions.load(tx); 2208 2209 rc.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE); 2210 rc.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE); 2211 rc.subLocations.load(tx); 2212 2213 rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>(); 2214 2215 if (metadata.version < 3) { 2216 2217 // on upgrade need to fill ackLocation with available messages past last ack 2218 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) { 2219 Entry<String, LastAck> entry = iterator.next(); 2220 for (Iterator<Entry<Long, MessageKeys>> orderIterator = 2221 rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) { 2222 Long sequence = orderIterator.next().getKey(); 2223 addAckLocation(tx, rc, sequence, entry.getKey()); 2224 } 2225 // modify so it is upgraded 2226 rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue()); 2227 } 2228 } 2229 2230 // Configure the message references index 2231 Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx); 2232 while (subscriptions.hasNext()) { 2233 Entry<String, SequenceSet> subscription = subscriptions.next(); 2234 SequenceSet pendingAcks = subscription.getValue(); 2235 if (pendingAcks != null && !pendingAcks.isEmpty()) { 2236 Long lastPendingAck = pendingAcks.getTail().getLast(); 2237 for (Long sequenceId : pendingAcks) { 2238 Long current 
= rc.messageReferences.get(sequenceId); 2239 if (current == null) { 2240 current = new Long(0); 2241 } 2242 2243 // We always add a trailing empty entry for the next position to start from 2244 // so we need to ensure we don't count that as a message reference on reload. 2245 if (!sequenceId.equals(lastPendingAck)) { 2246 current = current.longValue() + 1; 2247 } else { 2248 current = Long.valueOf(0L); 2249 } 2250 2251 rc.messageReferences.put(sequenceId, current); 2252 } 2253 } 2254 } 2255 2256 // Configure the subscription cache 2257 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) { 2258 Entry<String, LastAck> entry = iterator.next(); 2259 rc.subscriptionCache.add(entry.getKey()); 2260 } 2261 2262 if (rc.orderIndex.nextMessageId == 0) { 2263 // check for existing durable sub all acked out - pull next seq from acks as messages are gone 2264 if (!rc.subscriptionAcks.isEmpty(tx)) { 2265 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) { 2266 Entry<String, LastAck> entry = iterator.next(); 2267 rc.orderIndex.nextMessageId = 2268 Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1); 2269 } 2270 } 2271 } else { 2272 // update based on ackPositions for unmatched, last entry is always the next 2273 if (!rc.messageReferences.isEmpty()) { 2274 Long nextMessageId = (Long) rc.messageReferences.keySet().toArray()[rc.messageReferences.size() - 1]; 2275 rc.orderIndex.nextMessageId = 2276 Math.max(rc.orderIndex.nextMessageId, nextMessageId); 2277 } 2278 } 2279 } 2280 2281 if (metadata.version < VERSION) { 2282 // store again after upgrade 2283 metadata.destinations.put(tx, key, rc); 2284 } 2285 return rc; 2286 } 2287 2288 /** 2289 * Clear the counter for the destination, if one exists. 
2290 * 2291 * @param kahaDestination 2292 */ 2293 protected void clearStoreStats(KahaDestination kahaDestination) { 2294 MessageStoreStatistics storeStats = getStoreStats(key(kahaDestination)); 2295 if (storeStats != null) { 2296 storeStats.reset(); 2297 } 2298 } 2299 2300 /** 2301 * Update MessageStoreStatistics 2302 * 2303 * @param kahaDestination 2304 * @param size 2305 */ 2306 protected void incrementAndAddSizeToStoreStat(KahaDestination kahaDestination, long size) { 2307 incrementAndAddSizeToStoreStat(key(kahaDestination), size); 2308 } 2309 2310 protected void incrementAndAddSizeToStoreStat(String kahaDestKey, long size) { 2311 MessageStoreStatistics storeStats = getStoreStats(kahaDestKey); 2312 if (storeStats != null) { 2313 storeStats.getMessageCount().increment(); 2314 if (size > 0) { 2315 storeStats.getMessageSize().addSize(size); 2316 } 2317 } 2318 } 2319 2320 protected void decrementAndSubSizeToStoreStat(KahaDestination kahaDestination, long size) { 2321 decrementAndSubSizeToStoreStat(key(kahaDestination), size); 2322 } 2323 2324 protected void decrementAndSubSizeToStoreStat(String kahaDestKey, long size) { 2325 MessageStoreStatistics storeStats = getStoreStats(kahaDestKey); 2326 if (storeStats != null) { 2327 storeStats.getMessageCount().decrement(); 2328 if (size > 0) { 2329 storeStats.getMessageSize().addSize(-size); 2330 } 2331 } 2332 } 2333 2334 /** 2335 * This is a map to cache DestinationStatistics for a specific 2336 * KahaDestination key 2337 */ 2338 protected final ConcurrentMap<String, MessageStore> storeCache = 2339 new ConcurrentHashMap<String, MessageStore>(); 2340 2341 /** 2342 * Locate the storeMessageSize counter for this KahaDestination 2343 * @param kahaDestination 2344 * @return 2345 */ 2346 protected MessageStoreStatistics getStoreStats(String kahaDestKey) { 2347 MessageStoreStatistics storeStats = null; 2348 try { 2349 MessageStore messageStore = storeCache.get(kahaDestKey); 2350 if (messageStore != null) { 2351 storeStats = 
messageStore.getMessageStoreStatistics(); 2352 } 2353 } catch (Exception e1) { 2354 LOG.error("Getting size counter of destination failed", e1); 2355 } 2356 2357 return storeStats; 2358 } 2359 2360 /** 2361 * Determine whether this Destination matches the DestinationType 2362 * 2363 * @param destination 2364 * @param type 2365 * @return 2366 */ 2367 protected boolean matchType(Destination destination, 2368 KahaDestination.DestinationType type) { 2369 if (destination instanceof Topic 2370 && type.equals(KahaDestination.DestinationType.TOPIC)) { 2371 return true; 2372 } else if (destination instanceof Queue 2373 && type.equals(KahaDestination.DestinationType.QUEUE)) { 2374 return true; 2375 } 2376 return false; 2377 } 2378 2379 class LocationSizeMarshaller implements Marshaller<Location> { 2380 2381 public LocationSizeMarshaller() { 2382 2383 } 2384 2385 @Override 2386 public Location readPayload(DataInput dataIn) throws IOException { 2387 Location rc = new Location(); 2388 rc.setDataFileId(dataIn.readInt()); 2389 rc.setOffset(dataIn.readInt()); 2390 if (metadata.version >= 6) { 2391 rc.setSize(dataIn.readInt()); 2392 } 2393 return rc; 2394 } 2395 2396 @Override 2397 public void writePayload(Location object, DataOutput dataOut) 2398 throws IOException { 2399 dataOut.writeInt(object.getDataFileId()); 2400 dataOut.writeInt(object.getOffset()); 2401 dataOut.writeInt(object.getSize()); 2402 } 2403 2404 @Override 2405 public int getFixedSize() { 2406 return 12; 2407 } 2408 2409 @Override 2410 public Location deepCopy(Location source) { 2411 return new Location(source); 2412 } 2413 2414 @Override 2415 public boolean isDeepCopySupported() { 2416 return true; 2417 } 2418 } 2419 2420 private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException { 2421 SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey); 2422 if (sequences == null) { 2423 sequences = new SequenceSet(); 2424 
sequences.add(messageSequence); 2425 sd.ackPositions.add(tx, subscriptionKey, sequences); 2426 } else { 2427 sequences.add(messageSequence); 2428 sd.ackPositions.put(tx, subscriptionKey, sequences); 2429 } 2430 2431 Long count = sd.messageReferences.get(messageSequence); 2432 if (count == null) { 2433 count = Long.valueOf(0L); 2434 } 2435 count = count.longValue() + 1; 2436 sd.messageReferences.put(messageSequence, count); 2437 } 2438 2439 // new sub is interested in potentially all existing messages 2440 private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2441 SequenceSet allOutstanding = new SequenceSet(); 2442 Iterator<Map.Entry<String, SequenceSet>> iterator = sd.ackPositions.iterator(tx); 2443 while (iterator.hasNext()) { 2444 SequenceSet set = iterator.next().getValue(); 2445 for (Long entry : set) { 2446 allOutstanding.add(entry); 2447 } 2448 } 2449 sd.ackPositions.put(tx, subscriptionKey, allOutstanding); 2450 2451 for (Long ackPosition : allOutstanding) { 2452 Long count = sd.messageReferences.get(ackPosition); 2453 2454 // There might not be a reference if the ackLocation was the last 2455 // one which is a placeholder for the next incoming message and 2456 // no value was added to the message references table. 
2457 if (count != null) { 2458 count = count.longValue() + 1; 2459 sd.messageReferences.put(ackPosition, count); 2460 } 2461 } 2462 } 2463 2464 // on a new message add, all existing subs are interested in this message 2465 private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException { 2466 for(String subscriptionKey : sd.subscriptionCache) { 2467 SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey); 2468 if (sequences == null) { 2469 sequences = new SequenceSet(); 2470 sequences.add(new Sequence(messageSequence, messageSequence + 1)); 2471 sd.ackPositions.add(tx, subscriptionKey, sequences); 2472 } else { 2473 sequences.add(new Sequence(messageSequence, messageSequence + 1)); 2474 sd.ackPositions.put(tx, subscriptionKey, sequences); 2475 } 2476 2477 Long count = sd.messageReferences.get(messageSequence); 2478 if (count == null) { 2479 count = Long.valueOf(0L); 2480 } 2481 count = count.longValue() + 1; 2482 sd.messageReferences.put(messageSequence, count); 2483 sd.messageReferences.put(messageSequence + 1, Long.valueOf(0L)); 2484 } 2485 } 2486 2487 private void removeAckLocationsForSub(KahaSubscriptionCommand command, 2488 Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2489 if (!sd.ackPositions.isEmpty(tx)) { 2490 SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey); 2491 if (sequences == null || sequences.isEmpty()) { 2492 return; 2493 } 2494 2495 ArrayList<Long> unreferenced = new ArrayList<Long>(); 2496 2497 for(Long sequenceId : sequences) { 2498 Long references = sd.messageReferences.get(sequenceId); 2499 if (references != null) { 2500 references = references.longValue() - 1; 2501 2502 if (references.longValue() > 0) { 2503 sd.messageReferences.put(sequenceId, references); 2504 } else { 2505 sd.messageReferences.remove(sequenceId); 2506 unreferenced.add(sequenceId); 2507 } 2508 } 2509 } 2510 2511 for(Long sequenceId : unreferenced) { 2512 
// Find all the entries that need to get deleted. 2513 ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>(); 2514 sd.orderIndex.getDeleteList(tx, deletes, sequenceId); 2515 2516 // Do the actual deletes. 2517 for (Entry<Long, MessageKeys> entry : deletes) { 2518 sd.locationIndex.remove(tx, entry.getValue().location); 2519 sd.messageIdIndex.remove(tx, entry.getValue().messageId); 2520 sd.orderIndex.remove(tx, entry.getKey()); 2521 decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize()); 2522 } 2523 } 2524 } 2525 } 2526 2527 /** 2528 * @param tx 2529 * @param sd 2530 * @param subscriptionKey 2531 * @param messageSequence 2532 * @throws IOException 2533 */ 2534 private void removeAckLocation(KahaRemoveMessageCommand command, 2535 Transaction tx, StoredDestination sd, String subscriptionKey, 2536 Long messageSequence) throws IOException { 2537 // Remove the sub from the previous location set.. 2538 if (messageSequence != null) { 2539 SequenceSet range = sd.ackPositions.get(tx, subscriptionKey); 2540 if (range != null && !range.isEmpty()) { 2541 range.remove(messageSequence); 2542 if (!range.isEmpty()) { 2543 sd.ackPositions.put(tx, subscriptionKey, range); 2544 } else { 2545 sd.ackPositions.remove(tx, subscriptionKey); 2546 } 2547 2548 // Check if the message is reference by any other subscription. 2549 Long count = sd.messageReferences.get(messageSequence); 2550 if (count != null) { 2551 long references = count.longValue() - 1; 2552 if (references > 0) { 2553 sd.messageReferences.put(messageSequence, Long.valueOf(references)); 2554 return; 2555 } else { 2556 sd.messageReferences.remove(messageSequence); 2557 } 2558 } 2559 2560 // Find all the entries that need to get deleted. 2561 ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>(); 2562 sd.orderIndex.getDeleteList(tx, deletes, messageSequence); 2563 2564 // Do the actual deletes. 
2565 for (Entry<Long, MessageKeys> entry : deletes) { 2566 sd.locationIndex.remove(tx, entry.getValue().location); 2567 sd.messageIdIndex.remove(tx, entry.getValue().messageId); 2568 sd.orderIndex.remove(tx, entry.getKey()); 2569 decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize()); 2570 } 2571 } 2572 } 2573 } 2574 2575 public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2576 return sd.subscriptionAcks.get(tx, subscriptionKey); 2577 } 2578 2579 public long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2580 SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey); 2581 if (messageSequences != null) { 2582 long result = messageSequences.rangeSize(); 2583 // if there's anything in the range the last value is always the nextMessage marker, so remove 1. 2584 return result > 0 ? result - 1 : 0; 2585 } 2586 2587 return 0; 2588 } 2589 2590 public long getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2591 //grab the messages attached to this subscription 2592 SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey); 2593 2594 long locationSize = 0; 2595 if (messageSequences != null) { 2596 Sequence head = messageSequences.getHead(); 2597 if (head != null) { 2598 //get an iterator over the order index starting at the first unacked message 2599 //and go over each message to add up the size 2600 Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, 2601 new MessageOrderCursor(head.getFirst())); 2602 2603 while (iterator.hasNext()) { 2604 Entry<Long, MessageKeys> entry = iterator.next(); 2605 locationSize += entry.getValue().location.getSize(); 2606 } 2607 } 2608 } 2609 2610 return locationSize; 2611 } 2612 2613 protected String key(KahaDestination destination) { 2614 return destination.getType().getNumber() + ":" + 
destination.getName(); 2615 } 2616 2617 // ///////////////////////////////////////////////////////////////// 2618 // Transaction related implementation methods. 2619 // ///////////////////////////////////////////////////////////////// 2620 @SuppressWarnings("rawtypes") 2621 private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>(); 2622 @SuppressWarnings("rawtypes") 2623 protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>(); 2624 protected final Set<String> ackedAndPrepared = new HashSet<String>(); 2625 protected final Set<String> rolledBackAcks = new HashSet<String>(); 2626 2627 // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback, 2628 // till then they are skipped by the store. 2629 // 'at most once' XA guarantee 2630 public void trackRecoveredAcks(ArrayList<MessageAck> acks) { 2631 this.indexLock.writeLock().lock(); 2632 try { 2633 for (MessageAck ack : acks) { 2634 ackedAndPrepared.add(ack.getLastMessageId().toProducerKey()); 2635 } 2636 } finally { 2637 this.indexLock.writeLock().unlock(); 2638 } 2639 } 2640 2641 public void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback) throws IOException { 2642 if (acks != null) { 2643 this.indexLock.writeLock().lock(); 2644 try { 2645 for (MessageAck ack : acks) { 2646 final String id = ack.getLastMessageId().toProducerKey(); 2647 ackedAndPrepared.remove(id); 2648 if (rollback) { 2649 rolledBackAcks.add(id); 2650 } 2651 } 2652 } finally { 2653 this.indexLock.writeLock().unlock(); 2654 } 2655 } 2656 } 2657 2658 @SuppressWarnings("rawtypes") 2659 private List<Operation> getInflightTx(KahaTransactionInfo info) { 2660 TransactionId key = TransactionIdConversion.convert(info); 2661 List<Operation> tx; 2662 synchronized (inflightTransactions) { 2663 tx = inflightTransactions.get(key); 2664 if (tx == null) 
{ 2665 tx = Collections.synchronizedList(new ArrayList<Operation>()); 2666 inflightTransactions.put(key, tx); 2667 } 2668 } 2669 return tx; 2670 } 2671 2672 @SuppressWarnings("unused") 2673 private TransactionId key(KahaTransactionInfo transactionInfo) { 2674 return TransactionIdConversion.convert(transactionInfo); 2675 } 2676 2677 abstract class Operation <T extends JournalCommand<T>> { 2678 final T command; 2679 final Location location; 2680 2681 public Operation(T command, Location location) { 2682 this.command = command; 2683 this.location = location; 2684 } 2685 2686 public Location getLocation() { 2687 return location; 2688 } 2689 2690 public T getCommand() { 2691 return command; 2692 } 2693 2694 abstract public void execute(Transaction tx) throws IOException; 2695 } 2696 2697 class AddOperation extends Operation<KahaAddMessageCommand> { 2698 final IndexAware runWithIndexLock; 2699 public AddOperation(KahaAddMessageCommand command, Location location, IndexAware runWithIndexLock) { 2700 super(command, location); 2701 this.runWithIndexLock = runWithIndexLock; 2702 } 2703 2704 @Override 2705 public void execute(Transaction tx) throws IOException { 2706 long seq = updateIndex(tx, command, location); 2707 if (runWithIndexLock != null) { 2708 runWithIndexLock.sequenceAssignedWithIndexLocked(seq); 2709 } 2710 } 2711 2712 } 2713 2714 class RemoveOperation extends Operation<KahaRemoveMessageCommand> { 2715 2716 public RemoveOperation(KahaRemoveMessageCommand command, Location location) { 2717 super(command, location); 2718 } 2719 2720 @Override 2721 public void execute(Transaction tx) throws IOException { 2722 updateIndex(tx, command, location); 2723 } 2724 } 2725 2726 // ///////////////////////////////////////////////////////////////// 2727 // Initialization related implementation methods. 
2728 // ///////////////////////////////////////////////////////////////// 2729 2730 private PageFile createPageFile() throws IOException { 2731 if( indexDirectory == null ) { 2732 indexDirectory = directory; 2733 } 2734 IOHelper.mkdirs(indexDirectory); 2735 PageFile index = new PageFile(indexDirectory, "db"); 2736 index.setEnableWriteThread(isEnableIndexWriteAsync()); 2737 index.setWriteBatchSize(getIndexWriteBatchSize()); 2738 index.setPageCacheSize(indexCacheSize); 2739 index.setUseLFRUEviction(isUseIndexLFRUEviction()); 2740 index.setLFUEvictionFactor(getIndexLFUEvictionFactor()); 2741 index.setEnableDiskSyncs(isEnableIndexDiskSyncs()); 2742 index.setEnableRecoveryFile(isEnableIndexRecoveryFile()); 2743 index.setEnablePageCaching(isEnableIndexPageCaching()); 2744 return index; 2745 } 2746 2747 private Journal createJournal() throws IOException { 2748 Journal manager = new Journal(); 2749 manager.setDirectory(directory); 2750 manager.setMaxFileLength(getJournalMaxFileLength()); 2751 manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles); 2752 manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles); 2753 manager.setWriteBatchSize(getJournalMaxWriteBatchSize()); 2754 manager.setArchiveDataLogs(isArchiveDataLogs()); 2755 manager.setSizeAccumulator(journalSize); 2756 manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs()); 2757 manager.setPreallocationScope(Journal.PreallocationScope.valueOf(preallocationScope.trim().toUpperCase())); 2758 manager.setPreallocationStrategy( 2759 Journal.PreallocationStrategy.valueOf(preallocationStrategy.trim().toUpperCase())); 2760 if (getDirectoryArchive() != null) { 2761 IOHelper.mkdirs(getDirectoryArchive()); 2762 manager.setDirectoryArchive(getDirectoryArchive()); 2763 } 2764 return manager; 2765 } 2766 2767 private Metadata createMetadata() { 2768 Metadata md = new Metadata(); 2769 md.producerSequenceIdTracker.setAuditDepth(getFailoverProducersAuditDepth()); 2770 
/**
 * Returns the write batch size applied to the index page file when it is
 * created.
 *
 * <p>Note that the backing field is itself named
 * {@code setIndexWriteBatchSize}; it is a plain {@code int}, not a method
 * reference.
 *
 * @return the configured index write batch size
 */
public int getIndexWriteBatchSize() {
    return setIndexWriteBatchSize;
}
this.journalMaxFileLength = journalMaxFileLength; 2842 } 2843 2844 public int getJournalMaxFileLength() { 2845 return journalMaxFileLength; 2846 } 2847 2848 public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) { 2849 this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack); 2850 } 2851 2852 public int getMaxFailoverProducersToTrack() { 2853 return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack(); 2854 } 2855 2856 public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) { 2857 this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth); 2858 } 2859 2860 public int getFailoverProducersAuditDepth() { 2861 return this.metadata.producerSequenceIdTracker.getAuditDepth(); 2862 } 2863 2864 public PageFile getPageFile() throws IOException { 2865 if (pageFile == null) { 2866 pageFile = createPageFile(); 2867 } 2868 return pageFile; 2869 } 2870 2871 public Journal getJournal() throws IOException { 2872 if (journal == null) { 2873 journal = createJournal(); 2874 } 2875 return journal; 2876 } 2877 2878 public boolean isFailIfDatabaseIsLocked() { 2879 return failIfDatabaseIsLocked; 2880 } 2881 2882 public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) { 2883 this.failIfDatabaseIsLocked = failIfDatabaseIsLocked; 2884 } 2885 2886 public boolean isIgnoreMissingJournalfiles() { 2887 return ignoreMissingJournalfiles; 2888 } 2889 2890 public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) { 2891 this.ignoreMissingJournalfiles = ignoreMissingJournalfiles; 2892 } 2893 2894 public int getIndexCacheSize() { 2895 return indexCacheSize; 2896 } 2897 2898 public void setIndexCacheSize(int indexCacheSize) { 2899 this.indexCacheSize = indexCacheSize; 2900 } 2901 2902 public boolean isCheckForCorruptJournalFiles() { 2903 return checkForCorruptJournalFiles; 2904 } 2905 2906 public void 
/**
 * Stores the broker service reference for later use by this store.
 *
 * @param brokerService the broker service to remember
 */
@Override
public void setBrokerService(BrokerService brokerService) {
    this.brokerService = brokerService;
}
/**
 * Mutable set of three read positions, one each for the default, low and
 * high priority order indexes. A position of 0 means "not set" and is left
 * alone by {@link #increment()}.
 */
class MessageOrderCursor {
    long defaultCursorPosition;
    long lowPriorityCursorPosition;
    long highPriorityCursorPosition;

    /** Creates a cursor with all three positions at 0. */
    MessageOrderCursor() {
    }

    /** Creates a cursor with all three positions set to {@code position}. */
    MessageOrderCursor(long position) {
        assign(position, position, position);
    }

    /** Creates a cursor holding the same positions as {@code other}. */
    MessageOrderCursor(MessageOrderCursor other) {
        assign(other.defaultCursorPosition, other.lowPriorityCursorPosition, other.highPriorityCursorPosition);
    }

    /** @return a new cursor with the same positions as this one */
    MessageOrderCursor copy() {
        return new MessageOrderCursor(this);
    }

    /** Sets all three positions back to 0. */
    void reset() {
        assign(0, 0, 0);
    }

    /** Advances every position that is not 0 by one. */
    void increment() {
        defaultCursorPosition = advance(defaultCursorPosition);
        highPriorityCursorPosition = advance(highPriorityCursorPosition);
        lowPriorityCursorPosition = advance(lowPriorityCursorPosition);
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("MessageOrderCursor:[def:");
        text.append(defaultCursorPosition);
        text.append(", low:").append(lowPriorityCursorPosition);
        text.append(", high:").append(highPriorityCursorPosition);
        text.append("]");
        return text.toString();
    }

    /** Copies all three positions from {@code other} into this cursor. */
    public void sync(MessageOrderCursor other) {
        assign(other.defaultCursorPosition, other.lowPriorityCursorPosition, other.highPriorityCursorPosition);
    }

    // Single place where the three positions are written together.
    private void assign(long defaultPosition, long lowPosition, long highPosition) {
        this.defaultCursorPosition = defaultPosition;
        this.lowPriorityCursorPosition = lowPosition;
        this.highPriorityCursorPosition = highPosition;
    }

    // A position of 0 stays 0; anything else moves forward by one.
    private long advance(long position) {
        return position == 0 ? position : position + 1;
    }
}
highPriorityIndex.setValueMarshaller(messageKeysMarshaller); 3095 highPriorityIndex.load(tx); 3096 } 3097 3098 void allocate(Transaction tx) throws IOException { 3099 defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 3100 if (metadata.version >= 2) { 3101 lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 3102 highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 3103 } 3104 } 3105 3106 void configureLast(Transaction tx) throws IOException { 3107 // Figure out the next key using the last entry in the destination. 3108 TreeSet<Long> orderedSet = new TreeSet<Long>(); 3109 3110 addLast(orderedSet, highPriorityIndex, tx); 3111 addLast(orderedSet, defaultPriorityIndex, tx); 3112 addLast(orderedSet, lowPriorityIndex, tx); 3113 3114 if (!orderedSet.isEmpty()) { 3115 nextMessageId = orderedSet.last() + 1; 3116 } 3117 } 3118 3119 private void addLast(TreeSet<Long> orderedSet, BTreeIndex<Long, MessageKeys> index, Transaction tx) throws IOException { 3120 if (index != null) { 3121 Entry<Long, MessageKeys> lastEntry = index.getLast(tx); 3122 if (lastEntry != null) { 3123 orderedSet.add(lastEntry.getKey()); 3124 } 3125 } 3126 } 3127 3128 void clear(Transaction tx) throws IOException { 3129 this.remove(tx); 3130 this.resetCursorPosition(); 3131 this.allocate(tx); 3132 this.load(tx); 3133 this.configureLast(tx); 3134 } 3135 3136 void remove(Transaction tx) throws IOException { 3137 defaultPriorityIndex.clear(tx); 3138 defaultPriorityIndex.unload(tx); 3139 tx.free(defaultPriorityIndex.getPageId()); 3140 if (lowPriorityIndex != null) { 3141 lowPriorityIndex.clear(tx); 3142 lowPriorityIndex.unload(tx); 3143 3144 tx.free(lowPriorityIndex.getPageId()); 3145 } 3146 if (highPriorityIndex != null) { 3147 highPriorityIndex.clear(tx); 3148 highPriorityIndex.unload(tx); 3149 tx.free(highPriorityIndex.getPageId()); 3150 } 3151 } 3152 3153 void resetCursorPosition() { 3154 this.cursor.reset(); 3155 
lastDefaultKey = null; 3156 lastHighKey = null; 3157 lastLowKey = null; 3158 } 3159 3160 void setBatch(Transaction tx, Long sequence) throws IOException { 3161 if (sequence != null) { 3162 Long nextPosition = new Long(sequence.longValue() + 1); 3163 lastDefaultKey = sequence; 3164 cursor.defaultCursorPosition = nextPosition.longValue(); 3165 lastHighKey = sequence; 3166 cursor.highPriorityCursorPosition = nextPosition.longValue(); 3167 lastLowKey = sequence; 3168 cursor.lowPriorityCursorPosition = nextPosition.longValue(); 3169 } 3170 } 3171 3172 void setBatch(Transaction tx, LastAck last) throws IOException { 3173 setBatch(tx, last.lastAckedSequence); 3174 if (cursor.defaultCursorPosition == 0 3175 && cursor.highPriorityCursorPosition == 0 3176 && cursor.lowPriorityCursorPosition == 0) { 3177 long next = last.lastAckedSequence + 1; 3178 switch (last.priority) { 3179 case DEF: 3180 cursor.defaultCursorPosition = next; 3181 cursor.highPriorityCursorPosition = next; 3182 break; 3183 case HI: 3184 cursor.highPriorityCursorPosition = next; 3185 break; 3186 case LO: 3187 cursor.lowPriorityCursorPosition = next; 3188 cursor.defaultCursorPosition = next; 3189 cursor.highPriorityCursorPosition = next; 3190 break; 3191 } 3192 } 3193 } 3194 3195 void stoppedIterating() { 3196 if (lastDefaultKey!=null) { 3197 cursor.defaultCursorPosition=lastDefaultKey.longValue()+1; 3198 } 3199 if (lastHighKey!=null) { 3200 cursor.highPriorityCursorPosition=lastHighKey.longValue()+1; 3201 } 3202 if (lastLowKey!=null) { 3203 cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1; 3204 } 3205 lastDefaultKey = null; 3206 lastHighKey = null; 3207 lastLowKey = null; 3208 } 3209 3210 void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId) 3211 throws IOException { 3212 if (defaultPriorityIndex.containsKey(tx, sequenceId)) { 3213 getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId); 3214 } else if (highPriorityIndex != null && 
highPriorityIndex.containsKey(tx, sequenceId)) { 3215 getDeleteList(tx, deletes, highPriorityIndex, sequenceId); 3216 } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) { 3217 getDeleteList(tx, deletes, lowPriorityIndex, sequenceId); 3218 } 3219 } 3220 3221 void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, 3222 BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException { 3223 3224 Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId, null); 3225 deletes.add(iterator.next()); 3226 } 3227 3228 long getNextMessageId() { 3229 return nextMessageId++; 3230 } 3231 3232 void revertNextMessageId() { 3233 nextMessageId--; 3234 } 3235 3236 MessageKeys get(Transaction tx, Long key) throws IOException { 3237 MessageKeys result = defaultPriorityIndex.get(tx, key); 3238 if (result == null) { 3239 result = highPriorityIndex.get(tx, key); 3240 if (result == null) { 3241 result = lowPriorityIndex.get(tx, key); 3242 lastGetPriority = LO; 3243 } else { 3244 lastGetPriority = HI; 3245 } 3246 } else { 3247 lastGetPriority = DEF; 3248 } 3249 return result; 3250 } 3251 3252 MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException { 3253 if (priority == javax.jms.Message.DEFAULT_PRIORITY) { 3254 return defaultPriorityIndex.put(tx, key, value); 3255 } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) { 3256 return highPriorityIndex.put(tx, key, value); 3257 } else { 3258 return lowPriorityIndex.put(tx, key, value); 3259 } 3260 } 3261 3262 Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{ 3263 return new MessageOrderIterator(tx,cursor,this); 3264 } 3265 3266 Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{ 3267 return new MessageOrderIterator(tx,m,this); 3268 } 3269 3270 public byte lastGetPriority() { 3271 return lastGetPriority; 3272 } 3273 3274 public boolean 
alreadyDispatched(Long sequence) { 3275 return (cursor.highPriorityCursorPosition > 0 && cursor.highPriorityCursorPosition >= sequence) || 3276 (cursor.defaultCursorPosition > 0 && cursor.defaultCursorPosition >= sequence) || 3277 (cursor.lowPriorityCursorPosition > 0 && cursor.lowPriorityCursorPosition >= sequence); 3278 } 3279 3280 public void trackPendingAdd(Long seq) { 3281 synchronized (pendingAdditions) { 3282 pendingAdditions.add(seq); 3283 } 3284 } 3285 3286 public void trackPendingAddComplete(Long seq) { 3287 synchronized (pendingAdditions) { 3288 pendingAdditions.remove(seq); 3289 } 3290 } 3291 3292 public Long minPendingAdd() { 3293 synchronized (pendingAdditions) { 3294 if (!pendingAdditions.isEmpty()) { 3295 return pendingAdditions.get(0); 3296 } else { 3297 return null; 3298 } 3299 } 3300 } 3301 3302 class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{ 3303 Iterator<Entry<Long, MessageKeys>>currentIterator; 3304 final Iterator<Entry<Long, MessageKeys>>highIterator; 3305 final Iterator<Entry<Long, MessageKeys>>defaultIterator; 3306 final Iterator<Entry<Long, MessageKeys>>lowIterator; 3307 3308 MessageOrderIterator(Transaction tx, MessageOrderCursor m, MessageOrderIndex messageOrderIndex) throws IOException { 3309 Long pendingAddLimiter = messageOrderIndex.minPendingAdd(); 3310 this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition, pendingAddLimiter); 3311 if (highPriorityIndex != null) { 3312 this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition, pendingAddLimiter); 3313 } else { 3314 this.highIterator = null; 3315 } 3316 if (lowPriorityIndex != null) { 3317 this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition, pendingAddLimiter); 3318 } else { 3319 this.lowIterator = null; 3320 } 3321 } 3322 3323 @Override 3324 public boolean hasNext() { 3325 if (currentIterator == null) { 3326 if (highIterator != null) { 3327 if (highIterator.hasNext()) { 3328 
currentIterator = highIterator; 3329 return currentIterator.hasNext(); 3330 } 3331 if (defaultIterator.hasNext()) { 3332 currentIterator = defaultIterator; 3333 return currentIterator.hasNext(); 3334 } 3335 if (lowIterator.hasNext()) { 3336 currentIterator = lowIterator; 3337 return currentIterator.hasNext(); 3338 } 3339 return false; 3340 } else { 3341 currentIterator = defaultIterator; 3342 return currentIterator.hasNext(); 3343 } 3344 } 3345 if (highIterator != null) { 3346 if (currentIterator.hasNext()) { 3347 return true; 3348 } 3349 if (currentIterator == highIterator) { 3350 if (defaultIterator.hasNext()) { 3351 currentIterator = defaultIterator; 3352 return currentIterator.hasNext(); 3353 } 3354 if (lowIterator.hasNext()) { 3355 currentIterator = lowIterator; 3356 return currentIterator.hasNext(); 3357 } 3358 return false; 3359 } 3360 3361 if (currentIterator == defaultIterator) { 3362 if (lowIterator.hasNext()) { 3363 currentIterator = lowIterator; 3364 return currentIterator.hasNext(); 3365 } 3366 return false; 3367 } 3368 } 3369 return currentIterator.hasNext(); 3370 } 3371 3372 @Override 3373 public Entry<Long, MessageKeys> next() { 3374 Entry<Long, MessageKeys> result = currentIterator.next(); 3375 if (result != null) { 3376 Long key = result.getKey(); 3377 if (highIterator != null) { 3378 if (currentIterator == defaultIterator) { 3379 lastDefaultKey = key; 3380 } else if (currentIterator == highIterator) { 3381 lastHighKey = key; 3382 } else { 3383 lastLowKey = key; 3384 } 3385 } else { 3386 lastDefaultKey = key; 3387 } 3388 } 3389 return result; 3390 } 3391 3392 @Override 3393 public void remove() { 3394 throw new UnsupportedOperationException(); 3395 } 3396 } 3397 } 3398 3399 private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> { 3400 final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller(); 3401 3402 @Override 3403 public void writePayload(HashSet<String> object, DataOutput dataOut) throws 
IOException { 3404 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 3405 ObjectOutputStream oout = new ObjectOutputStream(baos); 3406 oout.writeObject(object); 3407 oout.flush(); 3408 oout.close(); 3409 byte[] data = baos.toByteArray(); 3410 dataOut.writeInt(data.length); 3411 dataOut.write(data); 3412 } 3413 3414 @Override 3415 @SuppressWarnings("unchecked") 3416 public HashSet<String> readPayload(DataInput dataIn) throws IOException { 3417 int dataLen = dataIn.readInt(); 3418 byte[] data = new byte[dataLen]; 3419 dataIn.readFully(data); 3420 ByteArrayInputStream bais = new ByteArrayInputStream(data); 3421 ObjectInputStream oin = new ObjectInputStream(bais); 3422 try { 3423 return (HashSet<String>) oin.readObject(); 3424 } catch (ClassNotFoundException cfe) { 3425 IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe); 3426 ioe.initCause(cfe); 3427 throw ioe; 3428 } 3429 } 3430 } 3431 3432 public File getIndexDirectory() { 3433 return indexDirectory; 3434 } 3435 3436 public void setIndexDirectory(File indexDirectory) { 3437 this.indexDirectory = indexDirectory; 3438 } 3439 3440 interface IndexAware { 3441 public void sequenceAssignedWithIndexLocked(long index); 3442 } 3443 3444 public String getPreallocationScope() { 3445 return preallocationScope; 3446 } 3447 3448 public void setPreallocationScope(String preallocationScope) { 3449 this.preallocationScope = preallocationScope; 3450 } 3451 3452 public String getPreallocationStrategy() { 3453 return preallocationStrategy; 3454 } 3455 3456 public void setPreallocationStrategy(String preallocationStrategy) { 3457 this.preallocationStrategy = preallocationStrategy; 3458 } 3459}