/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.memory;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStoreStatistics;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.util.LRUCache;
import org.apache.activemq.util.SubscriptionKey;

/**
 * A {@link TopicMessageStore} that keeps its subscription data in memory.
 * <p>
 * Two maps are maintained, both keyed by {@link SubscriptionKey}: one holding the
 * {@link SubscriptionInfo} of every registered subscription and one holding the
 * {@link MemoryTopicSub} that tracks the messages of each subscription.
 * <p>
 * Most operations are {@code synchronized} on this store instance;
 * {@link #getAllSubscriptions()} and {@link #resetBatching(String, String)} are not.
 */
public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore {

    /** Subscription metadata, keyed by client id / subscription name. */
    private final Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase;
    /** Per-subscription message tracking, keyed by client id / subscription name. */
    private final Map<SubscriptionKey, MemoryTopicSub> topicSubMap;
    /** The message table exactly as it was handed to the constructor (i.e. not the parent's wrapper). */
    private final Map<MessageId, Message> originalMessageTable;

    /**
     * Creates a store backed by an LRU cache configured with an initial capacity of 100
     * and a maximum cache size of 100, and by a fresh synchronized subscription map.
     *
     * @param destination the destination this store belongs to
     */
    public MemoryTopicMessageStore(ActiveMQDestination destination) {
        this(destination, new MemoryTopicMessageStoreLRUCache(100, 100, 0.75f, false), makeSubscriptionInfoMap());

        // Set the messageStoreStatistics after the super class is initialized so that the stats can be
        // properly updated on cache eviction
        MemoryTopicMessageStoreLRUCache cache = (MemoryTopicMessageStoreLRUCache) originalMessageTable;
        cache.setMessageStoreStatistics(messageStoreStatistics);
    }

    /**
     * Creates a store on top of caller-supplied maps.
     *
     * @param destination the destination this store belongs to
     * @param messageTable the map used to hold the messages; passed on to the parent class
     * @param subscriberDatabase the map used to hold the subscription metadata
     */
    public MemoryTopicMessageStore(ActiveMQDestination destination, Map<MessageId, Message> messageTable,
        Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase) {
        super(destination, messageTable);
        this.subscriberDatabase = subscriberDatabase;
        this.topicSubMap = makeSubMap();
        // This is only necessary so that messageStoreStatistics can be set if necessary.
        // We need the original reference since messageTable is wrapped in a synchronized map in the parent class.
        this.originalMessageTable = messageTable;
    }

    /**
     * @return a new, empty, synchronized map for subscription metadata
     */
    protected static Map<SubscriptionKey, SubscriptionInfo> makeSubscriptionInfoMap() {
        return Collections.synchronizedMap(new HashMap<SubscriptionKey, SubscriptionInfo>());
    }

    /**
     * @return a new, empty, synchronized map for per-subscription message tracking
     */
    protected static Map<SubscriptionKey, MemoryTopicSub> makeSubMap() {
        return Collections.synchronizedMap(new HashMap<SubscriptionKey, MemoryTopicSub>());
    }

    /**
     * Stores the message through the parent class and then registers it with every
     * known subscription.
     *
     * @param context the connection context, passed on to the parent class
     * @param message the message to add
     * @throws IOException if the parent class fails to add the message
     */
    @Override
    public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
        super.addMessage(context, message);
        // topicSubMap is only modified from methods synchronized on this store,
        // so holding this store's monitor is enough to iterate it here.
        for (MemoryTopicSub sub : topicSubMap.values()) {
            sub.addMessage(message.getMessageId(), message);
        }
    }

    /**
     * Removes the acknowledged message from the given subscription. Nothing happens
     * if the subscription is unknown.
     *
     * @param context the connection context (not used here)
     * @param clientId the client id of the subscription
     * @param subscriptionName the name of the subscription
     * @param messageId the id of the acknowledged message
     * @param ack the acknowledgement (not used here)
     */
    @Override
    public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
        MessageId messageId, MessageAck ack) throws IOException {
        MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
        if (sub != null) {
            sub.removeMessage(messageId);
        }
    }

    /**
     * @param clientId the client id of the subscription
     * @param subscriptionName the name of the subscription
     * @return the stored subscription info, or {@code null} if there is none for this key
     */
    @Override
    public synchronized SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
        return subscriberDatabase.get(new SubscriptionKey(clientId, subscriptionName));
    }

    /**
     * Registers a subscription, replacing any existing entry with the same key.
     *
     * @param info the subscription to register
     * @param retroactive if {@code true}, every message currently in the message table
     *        is added to the new subscription
     */
    @Override
    public synchronized void addSubscription(SubscriptionInfo info, boolean retroactive) throws IOException {
        SubscriptionKey key = new SubscriptionKey(info);
        MemoryTopicSub sub = new MemoryTopicSub();
        topicSubMap.put(key, sub);
        if (retroactive) {
            // messageTable is a synchronized map (wrapped by the parent class). Iterating one of
            // its collection views must be done while holding the map itself as the lock,
            // otherwise a concurrent update of the table can break the iteration.
            synchronized (messageTable) {
                for (Entry<MessageId, Message> entry : messageTable.entrySet()) {
                    sub.addMessage(entry.getKey(), entry.getValue());
                }
            }
        }
        subscriberDatabase.put(key, info);
    }

    /**
     * Removes the subscription metadata and its message tracking. Unknown keys are ignored.
     *
     * @param clientId the client id of the subscription
     * @param subscriptionName the name of the subscription
     */
    @Override
    public synchronized void deleteSubscription(String clientId, String subscriptionName) {
        SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
        subscriberDatabase.remove(key);
        topicSubMap.remove(key);
    }

    /**
     * Delegates recovery to the matching subscription. Nothing happens if the
     * subscription is unknown.
     *
     * @param clientId the client id of the subscription
     * @param subscriptionName the name of the subscription
     * @param listener the listener handed to the subscription
     * @throws Exception if the subscription's recovery fails
     */
    @Override
    public synchronized void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
        throws Exception {
        MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
        if (sub != null) {
            sub.recoverSubscription(listener);
        }
    }

    /**
     * Deletes the parent's content and clears all subscription data.
     */
    @Override
    public synchronized void delete() {
        super.delete();
        subscriberDatabase.clear();
        topicSubMap.clear();
    }

    /**
     * @return the info of every registered subscription
     */
    @Override
    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
        // This method is not synchronized on the store. Sizing the array from a separate size()
        // call would leave a null slot in the result if a subscription were removed between
        // the two calls, so let toArray allocate an array of the right size itself.
        return subscriberDatabase.values().toArray(new SubscriptionInfo[0]);
    }

    /**
     * @param clientId the client id of the subscription
     * @param subscriberName the name of the subscription
     * @return the size reported by the subscription, or 0 if the subscription is unknown
     */
    @Override
    public synchronized int getMessageCount(String clientId, String subscriberName) throws IOException {
        MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriberName));
        return sub != null ? sub.size() : 0;
    }

    /**
     * @param clientId the client id of the subscription
     * @param subscriberName the name of the subscription
     * @return the message size reported by the subscription, or 0 if the subscription is unknown
     */
    @Override
    public synchronized long getMessageSize(String clientId, String subscriberName) throws IOException {
        MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriberName));
        return sub != null ? sub.messageSize() : 0L;
    }

    /**
     * Delegates batched recovery to the matching subscription. Nothing happens if the
     * subscription is unknown.
     *
     * @param clientId the client id of the subscription
     * @param subscriptionName the name of the subscription
     * @param maxReturned the limit handed to the subscription
     * @param listener the listener handed to the subscription
     * @throws Exception if the subscription's recovery fails
     */
    @Override
    public synchronized void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
        MessageRecoveryListener listener) throws Exception {
        MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
        if (sub != null) {
            sub.recoverNextMessages(maxReturned, listener);
        }
    }

    /**
     * Resets the batching state of the matching subscription. Nothing happens if the
     * subscription is unknown.
     *
     * @param clientId the client id of the subscription
     * @param subscriptionName the name of the subscription
     */
    @Override
    public void resetBatching(String clientId, String subscriptionName) {
        MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
        if (sub != null) {
            sub.resetBatching();
        }
    }

    /**
     * Since we initialize the store with a LRUCache in some cases, we need to account for cache evictions
     * when computing the message store statistics.
     */
    private static class MemoryTopicMessageStoreLRUCache extends LRUCache<MessageId, Message> {
        private static final long serialVersionUID = -342098639681884413L;
        /** Statistics to adjust on eviction; assigned after construction via the setter. */
        private MessageStoreStatistics messageStoreStatistics;

        public MemoryTopicMessageStoreLRUCache(int initialCapacity, int maximumCacheSize,
            float loadFactor, boolean accessOrder) {
            super(initialCapacity, maximumCacheSize, loadFactor, accessOrder);
        }

        /**
         * @param messageStoreStatistics the statistics to adjust when an entry is evicted
         */
        public void setMessageStoreStatistics(MessageStoreStatistics messageStoreStatistics) {
            this.messageStoreStatistics = messageStoreStatistics;
        }

        /**
         * Adjusts the store statistics for the message that is being evicted.
         *
         * @param eldest the entry being evicted from the cache
         */
        @Override
        protected void onCacheEviction(Map.Entry<MessageId, Message> eldest) {
            decMessageStoreStatistics(messageStoreStatistics, eldest.getValue());
        }
    }
}