/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.tcp;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.net.ServerSocketFactory;
import javax.net.ssl.SSLServerSocket;

import org.apache.activemq.Service;
import org.apache.activemq.ThreadPriorities;
import org.apache.activemq.TransportLoggerSupport;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.TransportServerThreadSupport;
import org.apache.activemq.transport.nio.SelectorManager;
import org.apache.activemq.transport.nio.SelectorSelection;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.ServiceListener;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A TCP based implementation of {@link TransportServer}.
 * <p>
 * Accepted sockets are either handled directly on the accepting thread or, when
 * {@link #isUseQueueForAccept()} is true, queued and handled by a dedicated daemon thread
 * that is created in {@link #doStart()}.
 */
public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener {

    private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class);

    /** The listening socket; created in {@link #bind()} and set to null in {@link #doStop(ServiceStopper)}. */
    protected ServerSocket serverSocket;

    /** Selector registration used by {@link #run()} when the server socket exposes a channel. */
    protected SelectorSelection selector;

    /** Backlog value passed to the {@link ServerSocketFactory} when the server socket is created. */
    protected int backlog = 5000;

    protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
    protected final TcpTransportFactory transportFactory;

    // The following values are copied verbatim into the option map that doHandleSocket()
    // hands to TransportFactory.serverConfigure(); the (misspelled) names are the option keys.
    protected long maxInactivityDuration = 30000;
    protected long maxInactivityDurationInitalDelay = 10000;
    protected int minmumWireFormatVersion;

    /** When true, accepted sockets are queued and processed by the socket handler thread. */
    protected boolean useQueueForAccept = true;
    protected boolean allowLinkStealing;

    /**
     * trace=true -> the Transport stack where this TcpTransport object will be, will have a TransportLogger layer
     * trace=false -> the Transport stack where this TcpTransport object will be, will NOT have a TransportLogger layer,
     * and therefore will never be able to print logging messages. This parameter is most probably set in Connection or
     * TransportConnector URIs.
     */
    protected boolean trace = false;

    protected int soTimeout = 0;
    protected int socketBufferSize = 64 * 1024;
    protected int connectionTimeout = 30000;

    /**
     * Name of the LogWriter implementation to use. Names are mapped to classes in the
     * resources/META-INF/services/org/apache/activemq/transport/logwriters directory. This parameter is most probably
     * set in Connection or TransportConnector URIs.
     */
    protected String logWriterName = TransportLoggerSupport.defaultLogWriterName;

    /**
     * Specifies if the TransportLogger will be manageable by JMX or not. Also, as long as there is at least 1
     * TransportLogger which is manageable, a TransportLoggerControl MBean will be created.
     */
    protected boolean dynamicManagement = false;

    /**
     * startLogging=true -> the TransportLogger object of the Transport stack will initially write messages to the log.
     * startLogging=false -> the TransportLogger object of the Transport stack will initially NOT write messages to the
     * log. This parameter only has an effect if trace == true. This parameter is most probably set in Connection or
     * TransportConnector URIs.
     */
    protected boolean startLogging = true;
    protected final ServerSocketFactory serverSocketFactory;

    /** Hand-off queue between the accepting code in {@link #run()} and the socket handler thread. */
    protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
    protected Thread socketHandlerThread;

    /**
     * The maximum number of sockets allowed for this server
     */
    protected int maximumConnections = Integer.MAX_VALUE;

    /** Incremented in {@link #doHandleSocket(Socket)}, decremented in {@link #stopped(Service)} and on failed accepts. */
    protected AtomicInteger currentTransportCount = new AtomicInteger();

    /**
     * Creates an unbound transport server; call {@link #bind()} to open the server socket.
     *
     * @param transportFactory
     *            factory stored in every {@link TransportInfo} produced by {@link #configureTransport}
     * @param location
     *            the bind location, passed to the superclass constructor
     * @param serverSocketFactory
     *            factory used by {@link #bind()} to create the server socket
     * @throws IOException
     *             declared for subclasses/superclass; not thrown by this constructor body itself
     * @throws URISyntaxException
     *             declared for subclasses/superclass; not thrown by this constructor body itself
     */
    public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException,
        URISyntaxException {
        super(location);
        this.transportFactory = transportFactory;
        this.serverSocketFactory = serverSocketFactory;
    }

    /**
     * Creates and configures the server socket for the bind location and publishes the resulting connect URI.
     * An empty or missing host in the bind location is treated as "localhost".
     *
     * @throws IOException
     *             if the host cannot be resolved, the socket cannot be created or configured, or no valid
     *             connect URI can be built
     */
    public void bind() throws IOException {
        URI bind = getBindLocation();

        String host = bind.getHost();
        host = (host == null || host.length() == 0) ? "localhost" : host;
        InetAddress addr = InetAddress.getByName(host);

        try {
            this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
            configureServerSocket(this.serverSocket);
        } catch (IOException e) {
            throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
        }
        try {
            setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(),
                bind.getQuery(), bind.getFragment()));
        } catch (URISyntaxException e) {

            // it could be that the host name contains invalid characters such
            // as _ on unix platforms so lets try use the IP address instead
            try {
                setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(),
                    bind.getQuery(), bind.getFragment()));
            } catch (URISyntaxException e2) {
                throw IOExceptionSupport.create(e2);
            }
        }
    }

    /**
     * Applies the accept timeout and any configured transport options to the freshly created server socket.
     *
     * @param socket
     *            the server socket to configure
     * @throws SocketException
     *             if the timeout cannot be set or the enabledCipherSuites option cannot be applied to an SSL
     *             server socket
     */
    private void configureServerSocket(ServerSocket socket) throws SocketException {
        // A finite accept timeout lets the blocking accept loop in run() re-check isStopped() periodically.
        socket.setSoTimeout(2000);
        if (transportOptions != null) {

            // If the enabledCipherSuites option is invalid we don't want to ignore it as the call
            // to SSLServerSocket to configure it has a side effect on the socket rendering it
            // useless as all suites are enabled many of which are considered as insecure. We
            // instead trap that option here and throw an exception. We should really consider
            // all invalid options as breaking and not start the transport but the current design
            // doesn't really allow for this.
            //
            // see: https://issues.apache.org/jira/browse/AMQ-4582
            //
            if (socket instanceof SSLServerSocket) {
                if (transportOptions.containsKey("enabledCipherSuites")) {
                    Object cipherSuites = transportOptions.remove("enabledCipherSuites");

                    if (!IntrospectionSupport.setProperty(socket, "enabledCipherSuites", cipherSuites)) {
                        throw new SocketException(String.format(
                            "Invalid transport options {enabledCipherSuites=%s}", cipherSuites));
                    }
                }
            }

            IntrospectionSupport.setProperties(socket, transportOptions);
        }
    }

    /**
     * @return Returns the wireFormatFactory.
     */
    public WireFormatFactory getWireFormatFactory() {
        return wireFormatFactory;
    }

    /**
     * @param wireFormatFactory
     *            The wireFormatFactory to set.
     */
    public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
        this.wireFormatFactory = wireFormatFactory;
    }

    /**
     * Associates a broker info with the transport server so that the transport can do discovery advertisements of the
     * broker. This implementation ignores the value.
     *
     * @param brokerInfo
     *            the broker info (unused here)
     */
    @Override
    public void setBrokerInfo(BrokerInfo brokerInfo) {
    }

    public long getMaxInactivityDuration() {
        return maxInactivityDuration;
    }

    public void setMaxInactivityDuration(long maxInactivityDuration) {
        this.maxInactivityDuration = maxInactivityDuration;
    }

    public long getMaxInactivityDurationInitalDelay() {
        return this.maxInactivityDurationInitalDelay;
    }

    public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) {
        this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
    }

    public int getMinmumWireFormatVersion() {
        return minmumWireFormatVersion;
    }

    public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
        this.minmumWireFormatVersion = minmumWireFormatVersion;
    }

    public boolean isTrace() {
        return trace;
    }

    public void setTrace(boolean trace) {
        this.trace = trace;
    }

    public String getLogWriterName() {
        return logWriterName;
    }

    public void setLogWriterName(String logFormat) {
        this.logWriterName = logFormat;
    }

    public boolean isDynamicManagement() {
        return dynamicManagement;
    }

    public void setDynamicManagement(boolean useJmx) {
        this.dynamicManagement = useJmx;
    }

    public boolean isStartLogging() {
        return startLogging;
    }

    public void setStartLogging(boolean startLogging) {
        this.startLogging = startLogging;
    }

    /**
     * @return the backlog
     */
    public int getBacklog() {
        return backlog;
    }

    /**
     * @param backlog
     *            the backlog to set
     */
    public void setBacklog(int backlog) {
        this.backlog = backlog;
    }

    /**
     * @return the useQueueForAccept
     */
    public boolean isUseQueueForAccept() {
        return useQueueForAccept;
    }

    /**
     * @param useQueueForAccept
     *            the useQueueForAccept to set
     */
    public void setUseQueueForAccept(boolean useQueueForAccept) {
        this.useQueueForAccept = useQueueForAccept;
    }

    /**
     * pull Sockets from the ServerSocket
     * <p>
     * If the server socket has a channel, the channel is switched to non-blocking mode and registered with the
     * {@link SelectorManager}; accepts then happen in the selector callback. Otherwise this method loops on the
     * blocking {@link ServerSocket#accept()} until the server is stopped.
     */
    @Override
    public void run() {
        final ServerSocketChannel chan = serverSocket.getChannel();
        if (chan != null) {
            try {
                chan.configureBlocking(false);
                selector = SelectorManager.getInstance().register(chan, new SelectorManager.Listener() {
                    @Override
                    public void onSelect(SelectorSelection sel) {
                        try {
                            SocketChannel sc = chan.accept();
                            if (sc != null) {
                                if (isStopped() || getAcceptListener() == null) {
                                    // Nobody can take the connection; drop it immediately.
                                    sc.close();
                                } else {
                                    if (useQueueForAccept) {
                                        socketQueue.put(sc.socket());
                                    } else {
                                        handleSocket(sc.socket());
                                    }
                                }
                            }
                        } catch (Exception e) {
                            onError(sel, e);
                        }
                    }

                    @Override
                    public void onError(SelectorSelection sel, Throwable error) {
                        // onAcceptError() takes an Exception, so wrap anything that is not one.
                        Exception e = (error instanceof Exception) ? (Exception) error : new Exception(error);
                        if (!isStopping()) {
                            onAcceptError(e);
                        } else if (!isStopped()) {
                            LOG.warn("run()", e);
                            onAcceptError(e);
                        }
                    }
                });
                selector.setInterestOps(SelectionKey.OP_ACCEPT);
                selector.enable();
            } catch (IOException ex) {
                // Without a selector registration nothing will ever be accepted on this channel,
                // so make the failure visible instead of swallowing it.
                LOG.warn("Failed to register server socket channel with the selector for: " + this, ex);
                selector = null;
            }
        } else {
            while (!isStopped()) {
                Socket socket = null;
                try {
                    socket = serverSocket.accept();
                    if (socket != null) {
                        if (isStopped() || getAcceptListener() == null) {
                            socket.close();
                        } else {
                            if (useQueueForAccept) {
                                socketQueue.put(socket);
                            } else {
                                handleSocket(socket);
                            }
                        }
                    }
                } catch (SocketTimeoutException ste) {
                    // expect this to happen: configureServerSocket() sets an accept timeout
                } catch (Exception e) {
                    if (!isStopping()) {
                        onAcceptError(e);
                    } else if (!isStopped()) {
                        LOG.warn("run()", e);
                        onAcceptError(e);
                    }
                }
            }
        }
    }

    /**
     * Allow derived classes to override the Transport implementation that this transport server creates.
     *
     * @param socket
     *            the accepted client socket
     * @param format
     *            the wire format to use on the new transport
     *
     * @return a new Transport instance.
     *
     * @throws IOException
     *             if the transport cannot be created
     */
    protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
        return new TcpTransport(format, socket);
    }

    /**
     * @return pretty print of this
     */
    @Override
    public String toString() {
        return String.valueOf(getBindLocation());
    }

    /**
     * Determines the host name to advertise in the connect URI.
     *
     * @param socket
     *            the server socket
     * @param bindAddress
     *            the address that was requested for binding; used when the socket is not bound
     * @return real hostName
     * @throws UnknownHostException
     *             if the local host name cannot be determined
     */
    protected String resolveHostName(ServerSocket socket, InetAddress bindAddress) throws UnknownHostException {
        if (!socket.isBound()) {
            return bindAddress.getCanonicalHostName();
        }
        if (socket.getInetAddress().isAnyLocalAddress()) {
            // make it more human readable and useful, an alternative to 0.0.0.0
            return InetAddressUtil.getLocalHostName();
        }
        return socket.getInetAddress().getCanonicalHostName();
    }

    /**
     * Starts the socket handler thread (when accepts are queued) and then delegates to the superclass.
     */
    @Override
    protected void doStart() throws Exception {
        if (useQueueForAccept) {
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    try {
                        while (!isStopped() && !isStopping()) {
                            // Poll with a timeout so the stop flags are re-checked regularly.
                            Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
                            if (sock != null) {
                                try {
                                    handleSocket(sock);
                                } catch (Throwable thrown) {
                                    if (!isStopping()) {
                                        onAcceptError(new Exception(thrown));
                                    } else if (!isStopped()) {
                                        LOG.warn("Unexpected error thrown during accept handling: ", thrown);
                                        onAcceptError(new Exception(thrown));
                                    }
                                }
                            }
                        }

                    } catch (InterruptedException e) {
                        // doStop() interrupts this thread on purpose; only report an interrupt that
                        // arrives while the server is neither stopped nor in the middle of stopping.
                        if (!isStopped() && !isStopping()) {
                            LOG.info("socketQueue interrupted - stopping");
                            onAcceptError(e);
                        }
                    }
                }
            };
            socketHandlerThread = new Thread(null, run, "ActiveMQ Transport Server Thread Handler: " + toString(), getStackSize());
            socketHandlerThread.setDaemon(true);
            socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT - 1);
            socketHandlerThread.start();
        }
        super.doStart();
    }

    /**
     * Releases the selector registration, closes the server socket, interrupts the socket handler thread and
     * then delegates to the superclass. Each released field is set to null.
     */
    @Override
    protected void doStop(ServiceStopper stopper) throws Exception {
        if (selector != null) {
            selector.disable();
            selector.close();
            selector = null;
        }
        if (serverSocket != null) {
            serverSocket.close();
            serverSocket = null;
        }
        if (socketHandlerThread != null) {
            socketHandlerThread.interrupt();
            socketHandlerThread = null;
        }
        super.doStop(stopper);
    }

    /**
     * @return the local address of the server socket. Dereferences {@link #serverSocket}, which is null
     *         before {@link #bind()} and after {@link #doStop(ServiceStopper)}.
     */
    @Override
    public InetSocketAddress getSocketAddress() {
        return (InetSocketAddress) serverSocket.getLocalSocketAddress();
    }

    /**
     * Hook invoked for every accepted socket; the default simply delegates to {@link #doHandleSocket(Socket)}.
     *
     * @param socket
     *            the accepted client socket
     */
    protected void handleSocket(Socket socket) {
        doHandleSocket(socket);
    }

    /**
     * Turns an accepted socket into a configured {@link Transport} and passes it to the accept listener.
     * <p>
     * The connection count is reserved first (enforcing {@link #maximumConnections}). Until the socket has
     * been wrapped by {@link #configureTransport}, any failure - including a socket timeout - closes the
     * socket and gives the reserved slot back. Exceptions other than {@link SocketTimeoutException} are
     * additionally reported through {@code onAcceptError}.
     *
     * @param socket
     *            the accepted client socket
     */
    final protected void doHandleSocket(Socket socket) {
        boolean closeSocket = true;
        boolean countIncremented = false;
        try {
            int currentCount;
            do {
                currentCount = currentTransportCount.get();
                if (currentCount >= this.maximumConnections) {
                    throw new ExceededMaximumConnectionsException(
                        "Exceeded the maximum number of allowed client connections. See the '" +
                        "maximumConnections' property on the TCP transport configuration URI " +
                        "in the ActiveMQ configuration file (e.g., activemq.xml)");
                }

            //Increment this value before configuring the transport
            //This is necessary because some of the transport servers must read from the
            //socket during configureTransport() so we want to make sure this value is
            //accurate as the transport server could pause here waiting for data to be sent from a client
            } while(!currentTransportCount.compareAndSet(currentCount, currentCount + 1));
            countIncremented = true;

            HashMap<String, Object> options = new HashMap<String, Object>();
            options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
            options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay));
            options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion));
            options.put("trace", Boolean.valueOf(trace));
            options.put("soTimeout", Integer.valueOf(soTimeout));
            options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
            options.put("connectionTimeout", Integer.valueOf(connectionTimeout));
            options.put("logWriterName", logWriterName);
            options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
            options.put("startLogging", Boolean.valueOf(startLogging));
            // transportOptions may legitimately be unset (configureServerSocket() guards for that too);
            // putAll(null) would fail every accept with a NullPointerException.
            if (transportOptions != null) {
                options.putAll(transportOptions);
            }

            TransportInfo transportInfo = configureTransport(this, socket);
            // From here on the transport owns the socket.
            closeSocket = false;

            if (transportInfo.transport instanceof ServiceSupport) {
                // stopped(Service) gives the connection slot back when the transport stops.
                ((ServiceSupport) transportInfo.transport).addServiceListener(this);
            }

            Transport configuredTransport = transportInfo.transportFactory.serverConfigure(
                    transportInfo.transport, transportInfo.format, options);

            getAcceptListener().onAccept(configuredTransport);

        } catch (SocketTimeoutException ste) {
            // expect this to happen, so it is not reported - but a socket that never made it into a
            // transport must still be closed and its connection slot released.
            if (closeSocket) {
                releaseUnclaimedSocket(socket, countIncremented);
            }
        } catch (Exception e) {
            if (closeSocket) {
                releaseUnclaimedSocket(socket, countIncremented);
            }

            if (!isStopping()) {
                onAcceptError(e);
            } else if (!isStopped()) {
                LOG.warn("run()", e);
                onAcceptError(e);
            }
        }
    }

    /**
     * Gives back the connection slot (if one was reserved) and closes a socket that was never handed to a
     * transport. Failures while closing are ignored because the socket is being discarded anyway.
     */
    private void releaseUnclaimedSocket(Socket socket, boolean countIncremented) {
        try {
            //if closing the socket, only decrement the count it was actually incremented
            //where it was incremented
            if (countIncremented) {
                currentTransportCount.decrementAndGet();
            }
            socket.close();
        } catch (Exception ignore) {
        }
    }

    /**
     * Creates the wire format and transport for an accepted socket.
     *
     * @param server
     *            the server that accepted the socket (not used by this implementation)
     * @param socket
     *            the accepted client socket
     * @return the wire format, transport and transport factory to use for the connection
     * @throws Exception
     *             if the transport cannot be created
     */
    protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception {
        WireFormat format = wireFormatFactory.createWireFormat();
        Transport transport = createTransport(socket, format);
        return new TransportInfo(format, transport, transportFactory);
    }

    /**
     * Immutable holder for the pieces {@link #doHandleSocket(Socket)} needs to finish configuring a transport.
     */
    protected class TransportInfo {
        final WireFormat format;
        final Transport transport;
        final TransportFactory transportFactory;

        public TransportInfo(WireFormat format, Transport transport, TransportFactory transportFactory) {
            this.format = format;
            this.transport = transport;
            this.transportFactory = transportFactory;
        }
    }

    public int getSoTimeout() {
        return soTimeout;
    }

    public void setSoTimeout(int soTimeout) {
        this.soTimeout = soTimeout;
    }

    public int getSocketBufferSize() {
        return socketBufferSize;
    }

    public void setSocketBufferSize(int socketBufferSize) {
        this.socketBufferSize = socketBufferSize;
    }

    public int getConnectionTimeout() {
        return connectionTimeout;
    }

    public void setConnectionTimeout(int connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    }

    /**
     * @return the maximumConnections
     */
    public int getMaximumConnections() {
        return maximumConnections;
    }

    /**
     * @param maximumConnections
     *            the maximumConnections to set
     */
    public void setMaximumConnections(int maximumConnections) {
        this.maximumConnections = maximumConnections;
    }

    /**
     * @return the live counter of accepted transports (the same instance this server updates)
     */
    public AtomicInteger getCurrentTransportCount() {
        return currentTransportCount;
    }

    /**
     * Service listener callback; nothing to do when a transport starts.
     */
    @Override
    public void started(Service service) {
    }

    /**
     * Service listener callback; releases the connection slot held by the stopped transport.
     */
    @Override
    public void stopped(Service service) {
        this.currentTransportCount.decrementAndGet();
    }

    @Override
    public boolean isSslServer() {
        return false;
    }

    @Override
    public boolean isAllowLinkStealing() {
        return allowLinkStealing;
    }

    @Override
    public void setAllowLinkStealing(boolean allowLinkStealing) {
        this.allowLinkStealing = allowLinkStealing;
    }
}