001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.virtual;
018
019import java.io.IOException;
020import java.util.Set;
021import java.util.concurrent.CountDownLatch;
022import java.util.concurrent.atomic.AtomicReference;
023import org.apache.activemq.broker.Broker;
024import org.apache.activemq.broker.BrokerService;
025import org.apache.activemq.broker.ConnectionContext;
026import org.apache.activemq.broker.ProducerBrokerExchange;
027import org.apache.activemq.broker.region.Destination;
028import org.apache.activemq.broker.region.DestinationFilter;
029import org.apache.activemq.broker.region.Topic;
030import org.apache.activemq.command.ActiveMQDestination;
031import org.apache.activemq.command.ActiveMQQueue;
032import org.apache.activemq.command.ConnectionId;
033import org.apache.activemq.command.LocalTransactionId;
034import org.apache.activemq.command.Message;
035import org.apache.activemq.util.LRUCache;
036
037/**
038 * A Destination which implements <a href="http://activemq.org/site/virtual-destinations.html">Virtual Topic</a>
039 */
040public class VirtualTopicInterceptor extends DestinationFilter {
041
042    private final String prefix;
043    private final String postfix;
044    private final boolean local;
045    private final boolean concurrentSend;
046    private final boolean transactedSend;
047
048    private final LRUCache<ActiveMQDestination, ActiveMQQueue> cache = new LRUCache<ActiveMQDestination, ActiveMQQueue>();
049
050    public VirtualTopicInterceptor(Destination next, VirtualTopic virtualTopic) {
051        super(next);
052        this.prefix = virtualTopic.getPrefix();
053        this.postfix = virtualTopic.getPostfix();
054        this.local = virtualTopic.isLocal();
055        this.concurrentSend = virtualTopic.isConcurrentSend();
056        this.transactedSend = virtualTopic.isTransactedSend();
057    }
058
059    public Topic getTopic() {
060        return (Topic) this.next;
061    }
062
063    @Override
064    public void send(ProducerBrokerExchange context, Message message) throws Exception {
065        if (!message.isAdvisory() && !(local && message.getBrokerPath() != null)) {
066            ActiveMQDestination queueConsumers = getQueueConsumersWildcard(message.getDestination());
067            send(context, message, queueConsumers);
068        }
069        super.send(context, message);
070    }
071
072    @Override
073    protected void send(final ProducerBrokerExchange context, final Message message, ActiveMQDestination destination) throws Exception {
074        final Broker broker = context.getConnectionContext().getBroker();
075        final Set<Destination> destinations = broker.getDestinations(destination);
076        final int numDestinations = destinations.size();
077
078        final LocalTransactionId localBrokerTransactionToCoalesceJournalSync =
079                beginLocalTransaction(numDestinations, context.getConnectionContext(), message);
080        try {
081            if (concurrentSend && numDestinations > 1) {
082
083                final CountDownLatch concurrent = new CountDownLatch(destinations.size());
084                final AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<Exception>();
085                final BrokerService brokerService = broker.getBrokerService();
086
087                for (final Destination dest : destinations) {
088                    if (shouldDispatch(broker, message, dest)) {
089                        brokerService.getTaskRunnerFactory().execute(new Runnable() {
090                            @Override
091                            public void run() {
092                                try {
093                                    if (exceptionAtomicReference.get() == null) {
094                                        dest.send(context, message.copy());
095                                    }
096                                } catch (Exception e) {
097                                    exceptionAtomicReference.set(e);
098                                } finally {
099                                    concurrent.countDown();
100                                }
101                            }
102                        });
103                    } else {
104                        concurrent.countDown();
105                    }
106                }
107                concurrent.await();
108                if (exceptionAtomicReference.get() != null) {
109                    throw exceptionAtomicReference.get();
110                }
111
112            } else {
113                for (final Destination dest : destinations) {
114                    if (shouldDispatch(broker, message, dest)) {
115                        dest.send(context, message.copy());
116                    }
117                }
118            }
119        } finally {
120            commit(localBrokerTransactionToCoalesceJournalSync, context.getConnectionContext(), message);
121        }
122    }
123
124    private LocalTransactionId beginLocalTransaction(int numDestinations, ConnectionContext connectionContext, Message message) throws Exception {
125        LocalTransactionId result = null;
126        if (transactedSend && numDestinations > 1 && message.isPersistent() && message.getTransactionId() == null) {
127            result = new LocalTransactionId(new ConnectionId(message.getMessageId().getProducerId().toString()), message.getMessageId().getProducerSequenceId());
128            connectionContext.getBroker().beginTransaction(connectionContext, result);
129            connectionContext.setTransaction(connectionContext.getTransactions().get(result));
130            message.setTransactionId(result);
131        }
132        return result;
133    }
134
135    private void commit(LocalTransactionId tx, ConnectionContext connectionContext, Message message) throws Exception {
136        if (tx != null) {
137            connectionContext.getBroker().commitTransaction(connectionContext, tx, true);
138            connectionContext.getTransactions().remove(tx);
139            connectionContext.setTransaction(null);
140            message.setTransactionId(null);
141        }
142    }
143
144    protected boolean shouldDispatch(Broker broker, Message message, Destination dest) throws IOException {
145        //if can't find .* in the prefix, default back to old logic and return true
146        return prefix.contains(".*") ? dest.getName().startsWith(prefix.substring(0, prefix.indexOf(".*"))) : true;
147    }
148
149    protected ActiveMQDestination getQueueConsumersWildcard(ActiveMQDestination original) {
150        ActiveMQQueue queue;
151        synchronized (cache) {
152            queue = cache.get(original);
153            if (queue == null) {
154                queue = new ActiveMQQueue(prefix + original.getPhysicalName() + postfix);
155                cache.put(original, queue);
156            }
157        }
158        return queue;
159    }
160}