001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.proxy; 018 019import org.apache.activemq.Service; 020import org.apache.activemq.transport.CompositeTransport; 021import org.apache.activemq.transport.Transport; 022import org.apache.activemq.transport.TransportAcceptListener; 023import org.apache.activemq.transport.TransportFactory; 024import org.apache.activemq.transport.TransportFilter; 025import org.apache.activemq.transport.TransportServer; 026import org.apache.activemq.util.ServiceStopper; 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029 030import java.io.IOException; 031import java.net.URI; 032import java.net.URISyntaxException; 033import java.util.Iterator; 034import java.util.concurrent.CopyOnWriteArrayList; 035 036/** 037 * @org.apache.xbean.XBean 038 */ 039public class ProxyConnector implements Service { 040 041 private static final Logger LOG = LoggerFactory.getLogger(ProxyConnector.class); 042 private TransportServer server; 043 private URI bind; 044 private URI remote; 045 private URI localUri; 046 private String name; 047 048 /** 049 * Should we proxy commands to the local broker using VM transport as well? 050 */ 051 private boolean proxyToLocalBroker = true; 052 053 private final CopyOnWriteArrayList<ProxyConnection> connections = new CopyOnWriteArrayList<ProxyConnection>(); 054 055 @Override 056 public void start() throws Exception { 057 058 this.getServer().setAcceptListener(new TransportAcceptListener() { 059 @Override 060 public void onAccept(Transport localTransport) { 061 ProxyConnection connection = null; 062 try { 063 Transport remoteTransport = createRemoteTransport(localTransport); 064 connection = new ProxyConnection(localTransport, remoteTransport); 065 connection.start(); 066 connections.add(connection); 067 } catch (Exception e) { 068 onAcceptError(e); 069 try { 070 if (connection != null) { 071 connection.stop(); 072 } 073 } catch (Exception eoc) { 074 LOG.error("Could not close broken connection: ", eoc); 075 } 076 } 077 } 078 079 @Override 080 public void onAcceptError(Exception error) { 081 LOG.error("Could not accept connection: ", error); 082 } 083 }); 084 getServer().start(); 085 LOG.info("Proxy Connector {} started", getName()); 086 } 087 088 @Override 089 public void stop() throws Exception { 090 ServiceStopper ss = new ServiceStopper(); 091 if (this.server != null) { 092 ss.stop(this.server); 093 } 094 095 for (Iterator<ProxyConnection> iter = connections.iterator(); iter.hasNext();) { 096 LOG.info("Connector stopped: Stopping proxy."); 097 ss.stop(iter.next()); 098 } 099 connections.clear(); 100 ss.throwFirstException(); 101 LOG.info("Proxy Connector {} stopped", getName()); 102 } 103 104 // Properties 105 // ------------------------------------------------------------------------- 106 107 public URI getLocalUri() { 108 return localUri; 109 } 110 111 public void setLocalUri(URI localURI) { 112 this.localUri = localURI; 113 } 114 115 public URI getBind() { 116 return bind; 117 } 118 119 public void setBind(URI bind) { 120 this.bind = bind; 121 } 122 123 public URI getRemote() { 124 return remote; 125 } 126 127 public void setRemote(URI remote) { 128 this.remote = remote; 129 } 130 131 public TransportServer getServer() throws IOException, URISyntaxException { 132 if (server == null) { 133 server = createServer(); 134 } 135 return server; 136 } 137 138 public void setServer(TransportServer server) { 139 this.server = server; 140 } 141 142 protected TransportServer createServer() throws IOException, URISyntaxException { 143 if (bind == null) { 144 throw new IllegalArgumentException("You must specify either a server or the bind property"); 145 } 146 return TransportFactory.bind(bind); 147 } 148 149 private Transport createRemoteTransport(final Transport local) throws Exception { 150 Transport transport = TransportFactory.compositeConnect(remote); 151 CompositeTransport ct = transport.narrow(CompositeTransport.class); 152 if (ct != null && localUri != null && proxyToLocalBroker) { 153 ct.add(false, new URI[] { localUri }); 154 } 155 156 // Add a transport filter so that we can track the transport life cycle 157 transport = new TransportFilter(transport) { 158 @Override 159 public void stop() throws Exception { 160 LOG.info("Stopping proxy."); 161 super.stop(); 162 ProxyConnection dummy = new ProxyConnection(local, this); 163 LOG.debug("Removing proxyConnection {}", dummy.toString()); 164 connections.remove(dummy); 165 } 166 }; 167 return transport; 168 } 169 170 public String getName() { 171 if (name == null) { 172 if (server != null) { 173 name = server.getConnectURI().toString(); 174 } else { 175 name = "proxy"; 176 } 177 } 178 return name; 179 } 180 181 public void setName(String name) { 182 this.name = name; 183 } 184 185 public boolean isProxyToLocalBroker() { 186 return proxyToLocalBroker; 187 } 188 189 public void setProxyToLocalBroker(boolean proxyToLocalBroker) { 190 this.proxyToLocalBroker = proxyToLocalBroker; 191 } 192 193 protected Integer getConnectionCount() { 194 return connections.size(); 195 } 196}