/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017package org.apache.activemq.broker.jmx;
018
019import java.io.IOException;
020import java.util.Set;
021
022import javax.jms.InvalidSelectorException;
023import javax.management.ObjectName;
024
025import org.apache.activemq.broker.BrokerService;
026import org.apache.activemq.broker.ConnectionContext;
027import org.apache.activemq.broker.region.Subscription;
028import org.apache.activemq.command.ActiveMQDestination;
029import org.apache.activemq.command.ActiveMQQueue;
030import org.apache.activemq.command.ActiveMQTopic;
031import org.apache.activemq.command.ConsumerInfo;
032import org.apache.activemq.filter.DestinationFilter;
033import org.apache.activemq.util.IOExceptionSupport;
034
035/**
036 *
037 */
038public class SubscriptionView implements SubscriptionViewMBean {
039
040    protected final Subscription subscription;
041    protected final String clientId;
042    protected final String userName;
043
044    /**
045     * Constructor
046     *
047     * @param subs
048     */
049    public SubscriptionView(String clientId, String userName, Subscription subs) {
050        this.clientId = clientId;
051        this.subscription = subs;
052        this.userName = userName;
053    }
054
055    /**
056     * @return the clientId
057     */
058    @Override
059    public String getClientId() {
060        return clientId;
061    }
062
063    /**
064     * @returns the ObjectName of the Connection that created this subscription
065     */
066    @Override
067    public ObjectName getConnection() {
068        ObjectName result = null;
069
070        if (clientId != null && subscription != null) {
071            ConnectionContext ctx = subscription.getContext();
072            if (ctx != null && ctx.getBroker() != null && ctx.getBroker().getBrokerService() != null) {
073                BrokerService service = ctx.getBroker().getBrokerService();
074                ManagementContext managementCtx = service.getManagementContext();
075                if (managementCtx != null) {
076
077                    try {
078                        ObjectName query = createConnectionQuery(managementCtx, service.getBrokerName());
079                        Set<ObjectName> names = managementCtx.queryNames(query, null);
080                        if (names.size() == 1) {
081                            result = names.iterator().next();
082                        }
083                    } catch (Exception e) {
084                    }
085                }
086            }
087        }
088        return result;
089    }
090
091
092
093    private ObjectName createConnectionQuery(ManagementContext ctx, String brokerName) throws IOException {
094        try {
095            return BrokerMBeanSupport.createConnectionQuery(ctx.getJmxDomainName(), brokerName, clientId);
096        } catch (Throwable e) {
097            throw IOExceptionSupport.create(e);
098        }
099    }
100
101    /**
102     * @return the id of the Connection the Subscription is on
103     */
104    @Override
105    public String getConnectionId() {
106        ConsumerInfo info = getConsumerInfo();
107        if (info != null) {
108            return info.getConsumerId().getConnectionId();
109        }
110        return "NOTSET";
111    }
112
113    /**
114     * @return the id of the Session the subscription is on
115     */
116    @Override
117    public long getSessionId() {
118        ConsumerInfo info = getConsumerInfo();
119        if (info != null) {
120            return info.getConsumerId().getSessionId();
121        }
122        return 0;
123    }
124
125    /**
126     * @return the id of the Subscription
127     */
128    @Deprecated
129    @Override
130    public long getSubcriptionId() {
131        return getSubscriptionId();
132    }
133
134    /**
135     * @return the id of the Subscription
136     */
137    @Override
138    public long getSubscriptionId() {
139        ConsumerInfo info = getConsumerInfo();
140        if (info != null) {
141            return info.getConsumerId().getValue();
142        }
143        return 0;
144    }
145
146    /**
147     * @return the destination name
148     */
149    @Override
150    public String getDestinationName() {
151        ConsumerInfo info = getConsumerInfo();
152        if (info != null) {
153            ActiveMQDestination dest = info.getDestination();
154            return dest.getPhysicalName();
155        }
156        return "NOTSET";
157    }
158
159    @Override
160    public String getSelector() {
161        if (subscription != null) {
162            return subscription.getSelector();
163        }
164        return null;
165    }
166
167    @Override
168    public void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException {
169        if (subscription != null) {
170            subscription.setSelector(selector);
171        } else {
172            throw new UnsupportedOperationException("No subscription object");
173        }
174    }
175
176    /**
177     * @return true if the destination is a Queue
178     */
179    @Override
180    public boolean isDestinationQueue() {
181        ConsumerInfo info = getConsumerInfo();
182        if (info != null) {
183            ActiveMQDestination dest = info.getDestination();
184            return dest.isQueue();
185        }
186        return false;
187    }
188
189    /**
190     * @return true of the destination is a Topic
191     */
192    @Override
193    public boolean isDestinationTopic() {
194        ConsumerInfo info = getConsumerInfo();
195        if (info != null) {
196            ActiveMQDestination dest = info.getDestination();
197            return dest.isTopic();
198        }
199        return false;
200    }
201
202    /**
203     * @return true if the destination is temporary
204     */
205    @Override
206    public boolean isDestinationTemporary() {
207        ConsumerInfo info = getConsumerInfo();
208        if (info != null) {
209            ActiveMQDestination dest = info.getDestination();
210            return dest.isTemporary();
211        }
212        return false;
213    }
214
215    /**
216     * @return true if the subscriber is active
217     */
218    @Override
219    public boolean isActive() {
220        return true;
221    }
222
223    @Override
224    public boolean isNetwork() {
225        ConsumerInfo info = getConsumerInfo();
226        if (info != null) {
227            return info.isNetworkSubscription();
228        }
229        return false;
230    }
231
232    /**
233     * The subscription should release as may references as it can to help the
234     * garbage collector reclaim memory.
235     */
236    public void gc() {
237        if (subscription != null) {
238            subscription.gc();
239        }
240    }
241
242    /**
243     * @return whether or not the subscriber is retroactive or not
244     */
245    @Override
246    public boolean isRetroactive() {
247        ConsumerInfo info = getConsumerInfo();
248        return info != null ? info.isRetroactive() : false;
249    }
250
251    /**
252     * @return whether or not the subscriber is an exclusive consumer
253     */
254    @Override
255    public boolean isExclusive() {
256        ConsumerInfo info = getConsumerInfo();
257        return info != null ? info.isExclusive() : false;
258    }
259
260    /**
261     * @return whether or not the subscriber is durable (persistent)
262     */
263    @Override
264    public boolean isDurable() {
265        ConsumerInfo info = getConsumerInfo();
266        return info != null ? info.isDurable() : false;
267    }
268
269    /**
270     * @return whether or not the subscriber ignores local messages
271     */
272    @Override
273    public boolean isNoLocal() {
274        ConsumerInfo info = getConsumerInfo();
275        return info != null ? info.isNoLocal() : false;
276    }
277
278    /**
279     * @return the maximum number of pending messages allowed in addition to the
280     *         prefetch size. If enabled to a non-zero value then this will
281     *         perform eviction of messages for slow consumers on non-durable
282     *         topics.
283     */
284    @Override
285    public int getMaximumPendingMessageLimit() {
286        ConsumerInfo info = getConsumerInfo();
287        return info != null ? info.getMaximumPendingMessageLimit() : 0;
288    }
289
290    /**
291     * @return the consumer priority
292     */
293    @Override
294    public byte getPriority() {
295        ConsumerInfo info = getConsumerInfo();
296        return info != null ? info.getPriority() : 0;
297    }
298
299    /**
300     * @return the name of the consumer which is only used for durable
301     *         consumers.
302     */
303    @Deprecated
304    @Override
305    public String getSubcriptionName() {
306        return getSubscriptionName();
307    }
308
309    /**
310     * @return the name of the consumer which is only used for durable
311     *         consumers.
312     */
313    @Override
314    public String getSubscriptionName() {
315        ConsumerInfo info = getConsumerInfo();
316        return info != null ? info.getSubscriptionName() : null;
317    }
318
319    /**
320     * @return number of messages pending delivery
321     */
322    @Override
323    public int getPendingQueueSize() {
324        return subscription != null ? subscription.getPendingQueueSize() : 0;
325    }
326
327    /**
328     * @return number of messages dispatched
329     */
330    @Override
331    public int getDispatchedQueueSize() {
332        return subscription != null ? subscription.getDispatchedQueueSize() : 0;
333    }
334
335    @Override
336    public int getMessageCountAwaitingAcknowledge() {
337        return getDispatchedQueueSize();
338    }
339
340    /**
341     * @return number of messages that matched the subscription
342     */
343    @Override
344    public long getDispatchedCounter() {
345        return subscription != null ? subscription.getDispatchedCounter() : 0;
346    }
347
348    /**
349     * @return number of messages that matched the subscription
350     */
351    @Override
352    public long getEnqueueCounter() {
353        return subscription != null ? subscription.getEnqueueCounter() : 0;
354    }
355
356    /**
357     * @return number of messages queued by the client
358     */
359    @Override
360    public long getDequeueCounter() {
361        return subscription != null ? subscription.getDequeueCounter() : 0;
362    }
363
364    protected ConsumerInfo getConsumerInfo() {
365        return subscription != null ? subscription.getConsumerInfo() : null;
366    }
367
368    /**
369     * @return pretty print
370     */
371    @Override
372    public String toString() {
373        return "SubscriptionView: " + getClientId() + ":" + getConnectionId();
374    }
375
376    /**
377     */
378    @Override
379    public int getPrefetchSize() {
380        return subscription != null ? subscription.getPrefetchSize() : 0;
381    }
382
383    @Override
384    public boolean isMatchingQueue(String queueName) {
385        if (isDestinationQueue()) {
386            return matchesDestination(new ActiveMQQueue(queueName));
387        }
388        return false;
389    }
390
391    @Override
392    public boolean isMatchingTopic(String topicName) {
393        if (isDestinationTopic()) {
394            return matchesDestination(new ActiveMQTopic(topicName));
395        }
396        return false;
397    }
398
399    /**
400     * Return true if this subscription matches the given destination
401     *
402     * @param destination the destination to compare against
403     * @return true if this subscription matches the given destination
404     */
405    public boolean matchesDestination(ActiveMQDestination destination) {
406        ActiveMQDestination subscriptionDestination = subscription.getActiveMQDestination();
407        DestinationFilter filter = DestinationFilter.parseFilter(subscriptionDestination);
408        return filter.matches(destination);
409    }
410
411    @Override
412    public boolean isSlowConsumer() {
413        return subscription.isSlowConsumer();
414    }
415
416    @Override
417    public String getUserName() {
418        return userName;
419    }
420
421    @Override
422    public void resetStatistics() {
423        if (subscription != null && subscription.getSubscriptionStatistics() != null){
424            subscription.getSubscriptionStatistics().reset();
425        }
426    }
427
428    @Override
429    public long getConsumedCount() {
430        return subscription != null ? subscription.getConsumedCount() : 0;
431    }
432}