001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network;
018
019import java.io.IOException;
020
021import org.apache.activemq.broker.region.RegionBroker;
022import org.apache.activemq.broker.region.Subscription;
023import org.apache.activemq.broker.region.TopicRegion;
024import org.apache.activemq.command.ActiveMQDestination;
025import org.apache.activemq.command.ConsumerId;
026import org.apache.activemq.command.ConsumerInfo;
027import org.apache.activemq.filter.DestinationFilter;
028import org.apache.activemq.transport.Transport;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032/**
033 * Consolidates subscriptions
034 */
035public class DurableConduitBridge extends ConduitBridge {
036    private static final Logger LOG = LoggerFactory.getLogger(DurableConduitBridge.class);
037
038    @Override
039    public String toString() {
040        return "DurableConduitBridge:" + configuration.getBrokerName() + "->" + getRemoteBrokerName();
041    }
042    /**
043     * Constructor
044     *
045     * @param configuration
046     *
047     * @param localBroker
048     * @param remoteBroker
049     */
050    public DurableConduitBridge(NetworkBridgeConfiguration configuration, Transport localBroker,
051                                Transport remoteBroker) {
052        super(configuration, localBroker, remoteBroker);
053    }
054
055    /**
056     * Subscriptions for these destinations are always created
057     *
058     */
059    @Override
060    protected void setupStaticDestinations() {
061        super.setupStaticDestinations();
062        ActiveMQDestination[] dests = configuration.isDynamicOnly() ? null : durableDestinations;
063        if (dests != null) {
064            for (ActiveMQDestination dest : dests) {
065                if (isPermissableDestination(dest) && !doesConsumerExist(dest)) {
066                    try {
067                        //Filtering by non-empty subscriptions, see AMQ-5875
068                        if (dest.isTopic()) {
069                            RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
070                            TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
071
072                            String candidateSubName = getSubscriberName(dest);
073                            for (Subscription subscription : topicRegion.getDurableSubscriptions().values()) {
074                                String subName = subscription.getConsumerInfo().getSubscriptionName();
075                                if (subName != null && subName.equals(candidateSubName)) {
076                                    DemandSubscription sub = createDemandSubscription(dest);
077                                    sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest));
078                                    sub.setStaticallyIncluded(true);
079                                    addSubscription(sub);
080                                    break;
081                                }
082                            }
083                        }
084                    } catch (IOException e) {
085                        LOG.error("Failed to add static destination {}", dest, e);
086                    }
087                    LOG.trace("Forwarding messages for durable destination: {}", dest);
088                }
089            }
090        }
091    }
092
093    @Override
094    protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
095        if (addToAlreadyInterestedConsumers(info)) {
096            return null; // don't want this subscription added
097        }
098        //add our original id to ourselves
099        info.addNetworkConsumerId(info.getConsumerId());
100
101        if (info.isDurable()) {
102            // set the subscriber name to something reproducible
103            info.setSubscriptionName(getSubscriberName(info.getDestination()));
104            // and override the consumerId with something unique so that it won't
105            // be removed if the durable subscriber (at the other end) goes away
106            info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),
107                               consumerIdGenerator.getNextSequenceId()));
108        }
109        info.setSelector(null);
110        return doCreateDemandSubscription(info);
111    }
112
113    protected String getSubscriberName(ActiveMQDestination dest) {
114        String subscriberName = DURABLE_SUB_PREFIX + configuration.getBrokerName() + "_" + dest.getPhysicalName();
115        return subscriberName;
116    }
117
118    protected boolean doesConsumerExist(ActiveMQDestination dest) {
119        DestinationFilter filter = DestinationFilter.parseFilter(dest);
120        for (DemandSubscription ds : subscriptionMapByLocalId.values()) {
121            if (filter.matches(ds.getLocalInfo().getDestination())) {
122                return true;
123            }
124        }
125        return false;
126    }
127}