001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.peer; 018 019import java.io.IOException; 020import java.net.URI; 021import java.net.URISyntaxException; 022import java.util.HashMap; 023import java.util.Map; 024import java.util.concurrent.ConcurrentHashMap; 025import java.util.concurrent.ConcurrentMap; 026 027import org.apache.activemq.broker.BrokerFactoryHandler; 028import org.apache.activemq.broker.BrokerService; 029import org.apache.activemq.broker.TransportConnector; 030import org.apache.activemq.transport.Transport; 031import org.apache.activemq.transport.TransportFactory; 032import org.apache.activemq.transport.TransportServer; 033import org.apache.activemq.transport.vm.VMTransportFactory; 034import org.apache.activemq.util.IOExceptionSupport; 035import org.apache.activemq.util.IdGenerator; 036import org.apache.activemq.util.IntrospectionSupport; 037import org.apache.activemq.util.URISupport; 038 039public class PeerTransportFactory extends TransportFactory { 040 041 public static final ConcurrentMap BROKERS = new ConcurrentHashMap(); 042 public static final ConcurrentMap CONNECTORS = new ConcurrentHashMap(); 043 public static final ConcurrentMap SERVERS = new ConcurrentHashMap(); 044 private static final IdGenerator ID_GENERATOR = new IdGenerator("peer-"); 045 046 @Override 047 public Transport doConnect(URI location) throws Exception { 048 VMTransportFactory vmTransportFactory = createTransportFactory(location); 049 return vmTransportFactory.doConnect(location); 050 } 051 052 @Override 053 public Transport doCompositeConnect(URI location) throws Exception { 054 VMTransportFactory vmTransportFactory = createTransportFactory(location); 055 return vmTransportFactory.doCompositeConnect(location); 056 } 057 058 /** 059 * @param location 060 * @return the converted URI 061 * @throws URISyntaxException 062 */ 063 private VMTransportFactory createTransportFactory(URI location) throws IOException { 064 try { 065 String group = location.getHost(); 066 String broker = URISupport.stripPrefix(location.getPath(), "/"); 067 068 if (group == null) { 069 group = "default"; 070 } 071 if (broker == null || broker.length() == 0) { 072 broker = ID_GENERATOR.generateSanitizedId(); 073 } 074 075 final Map<String, String> brokerOptions = new HashMap<String, String>(URISupport.parseParameters(location)); 076 if (!brokerOptions.containsKey("persistent")) { 077 brokerOptions.put("persistent", "false"); 078 } 079 080 final URI finalLocation = new URI("vm://" + broker); 081 final String finalBroker = broker; 082 final String finalGroup = group; 083 VMTransportFactory rc = new VMTransportFactory() { 084 @Override 085 public Transport doConnect(URI ignore) throws Exception { 086 return super.doConnect(finalLocation); 087 }; 088 089 @Override 090 public Transport doCompositeConnect(URI ignore) throws Exception { 091 return super.doCompositeConnect(finalLocation); 092 }; 093 }; 094 rc.setBrokerFactoryHandler(new BrokerFactoryHandler() { 095 @Override 096 public BrokerService createBroker(URI brokerURI) throws Exception { 097 BrokerService service = new BrokerService(); 098 IntrospectionSupport.setProperties(service, brokerOptions); 099 service.setBrokerName(finalBroker); 100 TransportConnector c = service.addConnector("tcp://0.0.0.0:0"); 101 c.setDiscoveryUri(new URI("multicast://default?group=" + finalGroup)); 102 service.addNetworkConnector("multicast://default?group=" + finalGroup); 103 return service; 104 } 105 }); 106 return rc; 107 108 } catch (URISyntaxException e) { 109 throw IOExceptionSupport.create(e); 110 } 111 } 112 113 @Override 114 public TransportServer doBind(URI location) throws IOException { 115 throw new IOException("This protocol does not support being bound."); 116 } 117 118}