001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.fanout; 018 019import java.io.IOException; 020import java.io.InterruptedIOException; 021import java.net.URI; 022import java.security.cert.X509Certificate; 023import java.util.ArrayList; 024import java.util.Iterator; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.ConcurrentMap; 027import java.util.concurrent.atomic.AtomicInteger; 028 029import org.apache.activemq.command.Command; 030import org.apache.activemq.command.ConsumerInfo; 031import org.apache.activemq.command.Message; 032import org.apache.activemq.command.RemoveInfo; 033import org.apache.activemq.command.Response; 034import org.apache.activemq.state.ConnectionStateTracker; 035import org.apache.activemq.thread.Task; 036import org.apache.activemq.thread.TaskRunner; 037import org.apache.activemq.thread.TaskRunnerFactory; 038import org.apache.activemq.transport.CompositeTransport; 039import org.apache.activemq.transport.DefaultTransportListener; 040import org.apache.activemq.transport.FutureResponse; 041import org.apache.activemq.transport.ResponseCallback; 042import org.apache.activemq.transport.Transport; 043import org.apache.activemq.transport.TransportFactory; 044import org.apache.activemq.transport.TransportListener; 045import org.apache.activemq.util.IOExceptionSupport; 046import org.apache.activemq.util.ServiceStopper; 047import org.apache.activemq.util.ServiceSupport; 048import org.apache.activemq.wireformat.WireFormat; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052/** 053 * A Transport that fans out a connection to multiple brokers. 054 */ 055public class FanoutTransport implements CompositeTransport { 056 057 private static final Logger LOG = LoggerFactory.getLogger(FanoutTransport.class); 058 059 private TransportListener transportListener; 060 private boolean disposed; 061 private boolean connected; 062 063 private final Object reconnectMutex = new Object(); 064 private final ConnectionStateTracker stateTracker = new ConnectionStateTracker(); 065 private final ConcurrentMap<Integer, RequestCounter> requestMap = new ConcurrentHashMap<Integer, RequestCounter>(); 066 067 private final TaskRunnerFactory reconnectTaskFactory; 068 private final TaskRunner reconnectTask; 069 private boolean started; 070 071 private final ArrayList<FanoutTransportHandler> transports = new ArrayList<FanoutTransportHandler>(); 072 private int connectedCount; 073 074 private int minAckCount = 2; 075 076 private long initialReconnectDelay = 10; 077 private long maxReconnectDelay = 1000 * 30; 078 private long backOffMultiplier = 2; 079 private final boolean useExponentialBackOff = true; 080 private int maxReconnectAttempts; 081 private Exception connectionFailure; 082 private FanoutTransportHandler primary; 083 private boolean fanOutQueues = false; 084 085 static class RequestCounter { 086 087 final Command command; 088 final AtomicInteger ackCount; 089 090 RequestCounter(Command command, int count) { 091 this.command = command; 092 this.ackCount = new AtomicInteger(count); 093 } 094 095 @Override 096 public String toString() { 097 return command.getCommandId() + "=" + ackCount.get(); 098 } 099 } 100 101 class FanoutTransportHandler extends DefaultTransportListener { 102 103 private final URI uri; 104 private Transport transport; 105 106 private int connectFailures; 107 private long reconnectDelay = initialReconnectDelay; 108 private long reconnectDate; 109 110 public FanoutTransportHandler(URI uri) { 111 this.uri = uri; 112 } 113 114 @Override 115 public void onCommand(Object o) { 116 Command command = (Command) o; 117 if (command.isResponse()) { 118 Integer id = new Integer(((Response) command).getCorrelationId()); 119 RequestCounter rc = requestMap.get(id); 120 if (rc != null) { 121 if (rc.ackCount.decrementAndGet() <= 0) { 122 requestMap.remove(id); 123 transportListenerOnCommand(command); 124 } 125 } else { 126 transportListenerOnCommand(command); 127 } 128 } else { 129 transportListenerOnCommand(command); 130 } 131 } 132 133 @Override 134 public void onException(IOException error) { 135 try { 136 synchronized (reconnectMutex) { 137 if (transport == null || !transport.isConnected()) { 138 return; 139 } 140 141 LOG.debug("Transport failed, starting up reconnect task", error); 142 143 ServiceSupport.dispose(transport); 144 transport = null; 145 connectedCount--; 146 if (primary == this) { 147 primary = null; 148 } 149 reconnectTask.wakeup(); 150 } 151 } catch (InterruptedException e) { 152 Thread.currentThread().interrupt(); 153 if (transportListener != null) { 154 transportListener.onException(new InterruptedIOException()); 155 } 156 } 157 } 158 } 159 160 public FanoutTransport() { 161 // Setup a task that is used to reconnect the a connection async. 162 reconnectTaskFactory = new TaskRunnerFactory(); 163 reconnectTaskFactory.init(); 164 reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() { 165 @Override 166 public boolean iterate() { 167 return doConnect(); 168 } 169 }, "ActiveMQ Fanout Worker: " + System.identityHashCode(this)); 170 } 171 172 /** 173 * @return 174 */ 175 private boolean doConnect() { 176 long closestReconnectDate = 0; 177 synchronized (reconnectMutex) { 178 179 if (disposed || connectionFailure != null) { 180 reconnectMutex.notifyAll(); 181 } 182 183 if (transports.size() == connectedCount || disposed || connectionFailure != null) { 184 return false; 185 } else { 186 187 if (transports.isEmpty()) { 188 // connectionFailure = new IOException("No uris available to 189 // connect to."); 190 } else { 191 192 // Try to connect them up. 193 Iterator<FanoutTransportHandler> iter = transports.iterator(); 194 while (iter.hasNext() && !disposed) { 195 196 long now = System.currentTimeMillis(); 197 198 FanoutTransportHandler fanoutHandler = iter.next(); 199 if (fanoutHandler.transport != null) { 200 continue; 201 } 202 203 // Are we waiting a little to try to reconnect this one? 204 if (fanoutHandler.reconnectDate != 0 && fanoutHandler.reconnectDate > now) { 205 if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) { 206 closestReconnectDate = fanoutHandler.reconnectDate; 207 } 208 continue; 209 } 210 211 URI uri = fanoutHandler.uri; 212 try { 213 LOG.debug("Stopped: " + this); 214 LOG.debug("Attempting connect to: " + uri); 215 Transport t = TransportFactory.compositeConnect(uri); 216 fanoutHandler.transport = t; 217 t.setTransportListener(fanoutHandler); 218 if (started) { 219 restoreTransport(fanoutHandler); 220 } 221 LOG.debug("Connection established"); 222 fanoutHandler.reconnectDelay = initialReconnectDelay; 223 fanoutHandler.connectFailures = 0; 224 if (primary == null) { 225 primary = fanoutHandler; 226 } 227 connectedCount++; 228 } catch (Exception e) { 229 LOG.debug("Connect fail to: " + uri + ", reason: " + e); 230 231 if (fanoutHandler.transport != null) { 232 ServiceSupport.dispose(fanoutHandler.transport); 233 fanoutHandler.transport = null; 234 } 235 236 if (maxReconnectAttempts > 0 && ++fanoutHandler.connectFailures >= maxReconnectAttempts) { 237 LOG.error("Failed to connect to transport after: " + fanoutHandler.connectFailures + " attempt(s)"); 238 connectionFailure = e; 239 reconnectMutex.notifyAll(); 240 return false; 241 } else { 242 243 if (useExponentialBackOff) { 244 // Exponential increment of reconnect delay. 245 fanoutHandler.reconnectDelay *= backOffMultiplier; 246 if (fanoutHandler.reconnectDelay > maxReconnectDelay) { 247 fanoutHandler.reconnectDelay = maxReconnectDelay; 248 } 249 } 250 251 fanoutHandler.reconnectDate = now + fanoutHandler.reconnectDelay; 252 253 if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) { 254 closestReconnectDate = fanoutHandler.reconnectDate; 255 } 256 } 257 } 258 } 259 260 if (transports.size() == connectedCount || disposed) { 261 reconnectMutex.notifyAll(); 262 return false; 263 } 264 } 265 } 266 } 267 268 try { 269 long reconnectDelay = closestReconnectDate - System.currentTimeMillis(); 270 if (reconnectDelay > 0) { 271 LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. "); 272 Thread.sleep(reconnectDelay); 273 } 274 } catch (InterruptedException e1) { 275 Thread.currentThread().interrupt(); 276 } 277 return true; 278 } 279 280 @Override 281 public void start() throws Exception { 282 synchronized (reconnectMutex) { 283 LOG.debug("Started."); 284 if (started) { 285 return; 286 } 287 started = true; 288 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) { 289 FanoutTransportHandler th = iter.next(); 290 if (th.transport != null) { 291 restoreTransport(th); 292 } 293 } 294 connected = true; 295 } 296 } 297 298 @Override 299 public void stop() throws Exception { 300 try { 301 synchronized (reconnectMutex) { 302 ServiceStopper ss = new ServiceStopper(); 303 304 if (!started) { 305 return; 306 } 307 started = false; 308 disposed = true; 309 connected = false; 310 311 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) { 312 FanoutTransportHandler th = iter.next(); 313 if (th.transport != null) { 314 ss.stop(th.transport); 315 } 316 } 317 318 LOG.debug("Stopped: " + this); 319 ss.throwFirstException(); 320 } 321 } finally { 322 reconnectTask.shutdown(); 323 reconnectTaskFactory.shutdownNow(); 324 } 325 } 326 327 public int getMinAckCount() { 328 return minAckCount; 329 } 330 331 public void setMinAckCount(int minAckCount) { 332 this.minAckCount = minAckCount; 333 } 334 335 public long getInitialReconnectDelay() { 336 return initialReconnectDelay; 337 } 338 339 public void setInitialReconnectDelay(long initialReconnectDelay) { 340 this.initialReconnectDelay = initialReconnectDelay; 341 } 342 343 public long getMaxReconnectDelay() { 344 return maxReconnectDelay; 345 } 346 347 public void setMaxReconnectDelay(long maxReconnectDelay) { 348 this.maxReconnectDelay = maxReconnectDelay; 349 } 350 351 public long getReconnectDelayExponent() { 352 return backOffMultiplier; 353 } 354 355 public void setReconnectDelayExponent(long reconnectDelayExponent) { 356 this.backOffMultiplier = reconnectDelayExponent; 357 } 358 359 public int getMaxReconnectAttempts() { 360 return maxReconnectAttempts; 361 } 362 363 public void setMaxReconnectAttempts(int maxReconnectAttempts) { 364 this.maxReconnectAttempts = maxReconnectAttempts; 365 } 366 367 @Override 368 public void oneway(Object o) throws IOException { 369 final Command command = (Command) o; 370 try { 371 synchronized (reconnectMutex) { 372 373 // Wait for transport to be connected. 374 while (connectedCount < minAckCount && !disposed && connectionFailure == null) { 375 LOG.debug("Waiting for at least " + minAckCount + " transports to be connected."); 376 reconnectMutex.wait(1000); 377 } 378 379 // Still not fully connected. 380 if (connectedCount < minAckCount) { 381 382 Exception error; 383 384 // Throw the right kind of error.. 385 if (disposed) { 386 error = new IOException("Transport disposed."); 387 } else if (connectionFailure != null) { 388 error = connectionFailure; 389 } else { 390 error = new IOException("Unexpected failure."); 391 } 392 393 if (error instanceof IOException) { 394 throw (IOException) error; 395 } 396 throw IOExceptionSupport.create(error); 397 } 398 399 // If it was a request and it was not being tracked by 400 // the state tracker, 401 // then hold it in the requestMap so that we can replay 402 // it later. 403 boolean fanout = isFanoutCommand(command); 404 if (stateTracker.track(command) == null && command.isResponseRequired()) { 405 int size = fanout ? minAckCount : 1; 406 requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size)); 407 } 408 409 // Send the message. 410 if (fanout) { 411 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) { 412 FanoutTransportHandler th = iter.next(); 413 if (th.transport != null) { 414 try { 415 th.transport.oneway(command); 416 } catch (IOException e) { 417 LOG.debug("Send attempt: failed."); 418 th.onException(e); 419 } 420 } 421 } 422 } else { 423 try { 424 primary.transport.oneway(command); 425 } catch (IOException e) { 426 LOG.debug("Send attempt: failed."); 427 primary.onException(e); 428 } 429 } 430 } 431 } catch (InterruptedException e) { 432 // Some one may be trying to stop our thread. 433 Thread.currentThread().interrupt(); 434 throw new InterruptedIOException(); 435 } 436 } 437 438 /** 439 * @param command 440 * @return 441 */ 442 private boolean isFanoutCommand(Command command) { 443 if (command.isMessage()) { 444 if (fanOutQueues) { 445 return true; 446 } 447 return ((Message) command).getDestination().isTopic(); 448 } 449 if (command.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE || command.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) { 450 return false; 451 } 452 return true; 453 } 454 455 @Override 456 public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException { 457 throw new AssertionError("Unsupported Method"); 458 } 459 460 @Override 461 public Object request(Object command) throws IOException { 462 throw new AssertionError("Unsupported Method"); 463 } 464 465 @Override 466 public Object request(Object command, int timeout) throws IOException { 467 throw new AssertionError("Unsupported Method"); 468 } 469 470 public void reconnect() { 471 LOG.debug("Waking up reconnect task"); 472 try { 473 reconnectTask.wakeup(); 474 } catch (InterruptedException e) { 475 Thread.currentThread().interrupt(); 476 } 477 } 478 479 @Override 480 public TransportListener getTransportListener() { 481 return transportListener; 482 } 483 484 @Override 485 public void setTransportListener(TransportListener commandListener) { 486 this.transportListener = commandListener; 487 } 488 489 @Override 490 public <T> T narrow(Class<T> target) { 491 if (target.isAssignableFrom(getClass())) { 492 return target.cast(this); 493 } 494 495 synchronized (reconnectMutex) { 496 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) { 497 FanoutTransportHandler th = iter.next(); 498 if (th.transport != null) { 499 T rc = th.transport.narrow(target); 500 if (rc != null) { 501 return rc; 502 } 503 } 504 } 505 } 506 507 return null; 508 } 509 510 protected void restoreTransport(FanoutTransportHandler th) throws Exception, IOException { 511 th.transport.start(); 512 stateTracker.setRestoreConsumers(th.transport == primary); 513 stateTracker.restore(th.transport); 514 for (Iterator<RequestCounter> iter2 = requestMap.values().iterator(); iter2.hasNext();) { 515 RequestCounter rc = iter2.next(); 516 th.transport.oneway(rc.command); 517 } 518 } 519 520 @Override 521 public void add(boolean reblance, URI uris[]) { 522 synchronized (reconnectMutex) { 523 for (int i = 0; i < uris.length; i++) { 524 URI uri = uris[i]; 525 526 boolean match = false; 527 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) { 528 FanoutTransportHandler th = iter.next(); 529 if (th.uri.equals(uri)) { 530 match = true; 531 break; 532 } 533 } 534 535 if (!match) { 536 FanoutTransportHandler th = new FanoutTransportHandler(uri); 537 transports.add(th); 538 reconnect(); 539 } 540 } 541 } 542 } 543 544 @Override 545 public void remove(boolean rebalance, URI uris[]) { 546 synchronized (reconnectMutex) { 547 for (int i = 0; i < uris.length; i++) { 548 URI uri = uris[i]; 549 550 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) { 551 FanoutTransportHandler th = iter.next(); 552 if (th.uri.equals(uri)) { 553 if (th.transport != null) { 554 ServiceSupport.dispose(th.transport); 555 connectedCount--; 556 } 557 iter.remove(); 558 break; 559 } 560 } 561 } 562 } 563 } 564 565 @Override 566 public void reconnect(URI uri) throws IOException { 567 add(true, new URI[] { uri }); 568 } 569 570 @Override 571 public boolean isReconnectSupported() { 572 return true; 573 } 574 575 @Override 576 public boolean isUpdateURIsSupported() { 577 return true; 578 } 579 580 @Override 581 public void updateURIs(boolean reblance, URI[] uris) throws IOException { 582 add(reblance, uris); 583 } 584 585 @Override 586 public String getRemoteAddress() { 587 if (primary != null) { 588 if (primary.transport != null) { 589 return primary.transport.getRemoteAddress(); 590 } 591 } 592 return null; 593 } 594 595 protected void transportListenerOnCommand(Command command) { 596 if (transportListener != null) { 597 transportListener.onCommand(command); 598 } 599 } 600 601 @Override 602 public boolean isFaultTolerant() { 603 return true; 604 } 605 606 public boolean isFanOutQueues() { 607 return fanOutQueues; 608 } 609 610 public void setFanOutQueues(boolean fanOutQueues) { 611 this.fanOutQueues = fanOutQueues; 612 } 613 614 @Override 615 public boolean isDisposed() { 616 return disposed; 617 } 618 619 @Override 620 public boolean isConnected() { 621 return connected; 622 } 623 624 @Override 625 public int getReceiveCounter() { 626 int rc = 0; 627 synchronized (reconnectMutex) { 628 for (FanoutTransportHandler th : transports) { 629 if (th.transport != null) { 630 rc += th.transport.getReceiveCounter(); 631 } 632 } 633 } 634 return rc; 635 } 636 637 @Override 638 public X509Certificate[] getPeerCertificates() { 639 return null; 640 } 641 642 @Override 643 public void setPeerCertificates(X509Certificate[] certificates) { 644 645 } 646 647 @Override 648 public WireFormat getWireFormat() { 649 return null; 650 } 651}