/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport;

import java.io.IOException;
import java.net.URI;
import java.security.cert.X509Certificate;

import org.apache.activemq.wireformat.WireFormat;

/**
 * Pass-through wrapper around another {@link Transport}.
 * <p>
 * Calls made on this object through the {@link Transport} interface are
 * delegated to {@link #next}; events received through the
 * {@link TransportListener} interface are delegated to the listener registered
 * with {@link #setTransportListener(TransportListener)}. Subclasses override
 * individual methods to add behaviour around the delegation.
 */
public class TransportFilter implements TransportListener, Transport {
    /** The wrapped transport that every {@link Transport} call is delegated to. */
    protected final Transport next;
    /** Receiver of the events this filter gets from {@link #next}; may be {@code null}. */
    protected TransportListener transportListener;

    /**
     * Creates a filter around the given transport.
     *
     * @param next the transport to delegate to
     */
    public TransportFilter(Transport next) {
        this.next = next;
    }

    /**
     * @return the listener currently registered on this filter, or
     *         {@code null} if none has been set
     */
    @Override
    public TransportListener getTransportListener() {
        return transportListener;
    }

    /**
     * Registers the listener that receives events passing through this filter.
     * <p>
     * A non-null listener causes this filter to register itself as the
     * listener of {@link #next}; a {@code null} listener clears the listener
     * of {@link #next} as well.
     *
     * @param channelListener the listener to forward events to, or
     *            {@code null} to unregister
     */
    @Override
    public void setTransportListener(TransportListener channelListener) {
        this.transportListener = channelListener;
        if (channelListener == null) {
            next.setTransportListener(null);
        } else {
            next.setTransportListener(this);
        }
    }

    /**
     * Starts the wrapped transport.
     *
     * @see org.apache.activemq.Service#start()
     * @throws IOException
     *             if the next channel or the listener has not been set.
     * @throws Exception
     *             if starting the wrapped transport fails.
     */
    @Override
    public void start() throws Exception {
        if (next == null) {
            throw new IOException("The next channel has not been set.");
        }
        if (transportListener == null) {
            throw new IOException("The command listener has not been set.");
        }
        next.start();
    }

    /**
     * Stops the wrapped transport.
     *
     * @see org.apache.activemq.Service#stop()
     * @throws Exception
     *             if stopping the wrapped transport fails.
     */
    @Override
    public void stop() throws Exception {
        next.stop();
    }

    /**
     * Forwards a received command to the registered listener.
     *
     * @param command the command received from the wrapped transport
     */
    @Override
    public void onCommand(Object command) {
        transportListener.onCommand(command);
    }

    /**
     * @return Returns the next.
     */
    public Transport getNext() {
        return next;
    }

    /**
     * @return the string representation of the wrapped transport
     */
    @Override
    public String toString() {
        return next.toString();
    }

    /**
     * Delegates a one-way send to the wrapped transport.
     *
     * @param command the command to send
     * @throws IOException if the wrapped transport fails to send
     */
    @Override
    public void oneway(Object command) throws IOException {
        next.oneway(command);
    }

    /**
     * Delegates an asynchronous request to the wrapped transport.
     *
     * @param command the command to send
     * @param responseCallback the callback supplied by the caller; handed on
     *            unchanged to the wrapped transport
     * @return the future returned by the wrapped transport
     * @throws IOException if the wrapped transport fails to send
     */
    @Override
    public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
        // Hand the caller's callback on; passing null here would silently drop it.
        return next.asyncRequest(command, responseCallback);
    }

    /**
     * Delegates a synchronous request to the wrapped transport.
     *
     * @param command the command to send
     * @return the response returned by the wrapped transport
     * @throws IOException if the wrapped transport fails
     */
    @Override
    public Object request(Object command) throws IOException {
        return next.request(command);
    }

    /**
     * Delegates a synchronous request with a timeout to the wrapped transport.
     *
     * @param command the command to send
     * @param timeout the timeout value, passed through unchanged
     * @return the response returned by the wrapped transport
     * @throws IOException if the wrapped transport fails
     */
    @Override
    public Object request(Object command, int timeout) throws IOException {
        return next.request(command, timeout);
    }

    /**
     * Forwards a transport error to the registered listener.
     *
     * @param error the error reported by the wrapped transport
     */
    @Override
    public void onException(IOException error) {
        transportListener.onException(error);
    }

    /**
     * Forwards the "transport interrupted" event to the registered listener.
     */
    @Override
    public void transportInterupted() {
        transportListener.transportInterupted();
    }

    /**
     * Forwards the "transport resumed" event to the registered listener.
     */
    @Override
    public void transportResumed() {
        transportListener.transportResumed();
    }

    /**
     * Looks for an object of the requested type in the transport chain.
     *
     * @param target the type being looked for
     * @return this filter if it is an instance of {@code target}; otherwise
     *         whatever the wrapped transport returns for the same request
     */
    @Override
    public <T> T narrow(Class<T> target) {
        if (target.isAssignableFrom(getClass())) {
            return target.cast(this);
        }
        return next.narrow(target);
    }

    /**
     * @return the remote address reported by the wrapped transport
     */
    @Override
    public String getRemoteAddress() {
        return next.getRemoteAddress();
    }

    /**
     * @return the value reported by the wrapped transport
     * @see org.apache.activemq.transport.Transport#isFaultTolerant()
     */
    @Override
    public boolean isFaultTolerant() {
        return next.isFaultTolerant();
    }

    /**
     * @return the value reported by the wrapped transport
     */
    @Override
    public boolean isDisposed() {
        return next.isDisposed();
    }

    /**
     * @return the value reported by the wrapped transport
     */
    @Override
    public boolean isConnected() {
        return next.isConnected();
    }

    /**
     * Delegates the reconnect request to the wrapped transport.
     *
     * @param uri the URI passed through to the wrapped transport
     * @throws IOException if the wrapped transport fails to reconnect
     */
    @Override
    public void reconnect(URI uri) throws IOException {
        next.reconnect(uri);
    }

    /**
     * @return the receive counter reported by the wrapped transport
     */
    @Override
    public int getReceiveCounter() {
        return next.getReceiveCounter();
    }

    /**
     * @return the value reported by the wrapped transport
     */
    @Override
    public boolean isReconnectSupported() {
        return next.isReconnectSupported();
    }

    /**
     * @return the value reported by the wrapped transport
     */
    @Override
    public boolean isUpdateURIsSupported() {
        return next.isUpdateURIsSupported();
    }

    /**
     * Delegates the URI update to the wrapped transport.
     *
     * @param rebalance passed through unchanged to the wrapped transport
     * @param uris passed through unchanged to the wrapped transport
     * @throws IOException if the wrapped transport fails to apply the update
     */
    @Override
    public void updateURIs(boolean rebalance, URI[] uris) throws IOException {
        next.updateURIs(rebalance, uris);
    }

    /**
     * @return the peer certificates reported by the wrapped transport
     */
    @Override
    public X509Certificate[] getPeerCertificates() {
        return next.getPeerCertificates();
    }

    /**
     * Delegates storing the peer certificates to the wrapped transport.
     *
     * @param certificates the certificates passed through to the wrapped transport
     */
    @Override
    public void setPeerCertificates(X509Certificate[] certificates) {
        next.setPeerCertificates(certificates);
    }

    /**
     * @return the wire format reported by the wrapped transport
     */
    @Override
    public WireFormat getWireFormat() {
        return next.getWireFormat();
    }
}