001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.jdbc; 018 019import java.io.IOException; 020import java.sql.Connection; 021import java.sql.PreparedStatement; 022import java.sql.ResultSet; 023import java.sql.SQLException; 024import java.sql.Timestamp; 025import java.util.Date; 026import java.util.concurrent.TimeUnit; 027import org.apache.activemq.util.IOExceptionSupport; 028import org.apache.activemq.util.ServiceStopper; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032/** 033 * Represents an exclusive lease on a database to avoid multiple brokers running 034 * against the same logical database. 035 * 036 * @org.apache.xbean.XBean element="lease-database-locker" 037 * 038 */ 039public class LeaseDatabaseLocker extends AbstractJDBCLocker { 040 private static final Logger LOG = LoggerFactory.getLogger(LeaseDatabaseLocker.class); 041 042 protected int maxAllowableDiffFromDBTime = 0; 043 protected long diffFromCurrentTime = Long.MAX_VALUE; 044 protected String leaseHolderId; 045 protected boolean handleStartException; 046 047 public void doStart() throws Exception { 048 049 if (lockAcquireSleepInterval < lockable.getLockKeepAlivePeriod()) { 050 LOG.warn("LockableService keep alive period: " + lockable.getLockKeepAlivePeriod() 051 + ", which renews the lease, is greater than lockAcquireSleepInterval: " + lockAcquireSleepInterval 052 + ", the lease duration. These values will allow the lease to expire."); 053 } 054 055 LOG.info(getLeaseHolderId() + " attempting to acquire exclusive lease to become the master"); 056 String sql = getStatements().getLeaseObtainStatement(); 057 LOG.debug(getLeaseHolderId() + " locking Query is "+sql); 058 059 long now = 0l; 060 while (!isStopping()) { 061 Connection connection = null; 062 PreparedStatement statement = null; 063 try { 064 connection = getConnection(); 065 initTimeDiff(connection); 066 067 statement = connection.prepareStatement(sql); 068 setQueryTimeout(statement); 069 070 now = System.currentTimeMillis() + diffFromCurrentTime; 071 statement.setString(1, getLeaseHolderId()); 072 statement.setLong(2, now + lockAcquireSleepInterval); 073 statement.setLong(3, now); 074 075 int result = statement.executeUpdate(); 076 if (result == 1) { 077 // we got the lease, verify we still have it 078 if (keepAlive()) { 079 break; 080 } 081 } 082 083 reportLeasOwnerShipAndDuration(connection); 084 085 } catch (Exception e) { 086 LOG.warn(getLeaseHolderId() + " lease acquire failure: "+ e, e); 087 if (isStopping()) { 088 throw new Exception( 089 "Cannot start broker as being asked to shut down. " 090 + "Interrupted attempt to acquire lock: " 091 + e, e); 092 } 093 if (handleStartException) { 094 lockable.getBrokerService().handleIOException(IOExceptionSupport.create(e)); 095 } 096 } finally { 097 close(statement); 098 close(connection); 099 } 100 101 LOG.debug(getLeaseHolderId() + " failed to acquire lease. Sleeping for " + lockAcquireSleepInterval + " milli(s) before trying again..."); 102 TimeUnit.MILLISECONDS.sleep(lockAcquireSleepInterval); 103 } 104 if (isStopping()) { 105 throw new RuntimeException(getLeaseHolderId() + " failing lease acquire due to stop"); 106 } 107 108 LOG.info(getLeaseHolderId() + ", becoming master with lease expiry " + new Date(now + lockAcquireSleepInterval) + " on dataSource: " + dataSource); 109 } 110 111 private void reportLeasOwnerShipAndDuration(Connection connection) throws SQLException { 112 PreparedStatement statement = null; 113 try { 114 statement = connection.prepareStatement(getStatements().getLeaseOwnerStatement()); 115 ResultSet resultSet = statement.executeQuery(); 116 while (resultSet.next()) { 117 LOG.debug(getLeaseHolderId() + " Lease held by " + resultSet.getString(1) + " till " + new Date(resultSet.getLong(2))); 118 } 119 } finally { 120 close(statement); 121 } 122 } 123 124 protected long initTimeDiff(Connection connection) throws SQLException { 125 if (Long.MAX_VALUE == diffFromCurrentTime) { 126 if (maxAllowableDiffFromDBTime > 0) { 127 diffFromCurrentTime = determineTimeDifference(connection); 128 } else { 129 diffFromCurrentTime = 0l; 130 } 131 } 132 return diffFromCurrentTime; 133 } 134 135 protected long determineTimeDifference(Connection connection) throws SQLException { 136 PreparedStatement statement = connection.prepareStatement(getStatements().getCurrentDateTime()); 137 ResultSet resultSet = statement.executeQuery(); 138 long result = 0l; 139 if (resultSet.next()) { 140 Timestamp timestamp = resultSet.getTimestamp(1); 141 long diff = System.currentTimeMillis() - timestamp.getTime(); 142 if (Math.abs(diff) > maxAllowableDiffFromDBTime) { 143 // off by more than maxAllowableDiffFromDBTime so lets adjust 144 result = (-diff); 145 } 146 LOG.info(getLeaseHolderId() + " diff adjust from db: " + result + ", db time: " + timestamp); 147 } 148 return result; 149 } 150 151 public void doStop(ServiceStopper stopper) throws Exception { 152 if (lockable.getBrokerService() != null && lockable.getBrokerService().isRestartRequested()) { 153 // keep our lease for restart 154 return; 155 } 156 releaseLease(); 157 } 158 159 private void releaseLease() { 160 Connection connection = null; 161 PreparedStatement statement = null; 162 try { 163 connection = getConnection(); 164 statement = connection.prepareStatement(getStatements().getLeaseUpdateStatement()); 165 statement.setString(1, null); 166 statement.setLong(2, 0l); 167 statement.setString(3, getLeaseHolderId()); 168 if (statement.executeUpdate() == 1) { 169 LOG.info(getLeaseHolderId() + ", released lease"); 170 } 171 } catch (Exception e) { 172 LOG.error(getLeaseHolderId() + " failed to release lease: " + e, e); 173 } finally { 174 close(statement); 175 close(connection); 176 } 177 } 178 179 @Override 180 public boolean keepAlive() throws IOException { 181 boolean result = false; 182 final String sql = getStatements().getLeaseUpdateStatement(); 183 LOG.debug(getLeaseHolderId() + ", lease keepAlive Query is " + sql); 184 185 Connection connection = null; 186 PreparedStatement statement = null; 187 try { 188 connection = getConnection(); 189 190 initTimeDiff(connection); 191 statement = connection.prepareStatement(sql); 192 setQueryTimeout(statement); 193 194 final long now = System.currentTimeMillis() + diffFromCurrentTime; 195 statement.setString(1, getLeaseHolderId()); 196 statement.setLong(2, now + lockAcquireSleepInterval); 197 statement.setString(3, getLeaseHolderId()); 198 199 result = (statement.executeUpdate() == 1); 200 201 if (!result) { 202 reportLeasOwnerShipAndDuration(connection); 203 } 204 } catch (Exception e) { 205 LOG.warn(getLeaseHolderId() + ", failed to update lease: " + e, e); 206 IOException ioe = IOExceptionSupport.create(e); 207 lockable.getBrokerService().handleIOException(ioe); 208 throw ioe; 209 } finally { 210 close(statement); 211 close(connection); 212 } 213 return result; 214 } 215 216 public String getLeaseHolderId() { 217 if (leaseHolderId == null) { 218 if (lockable.getBrokerService() != null) { 219 leaseHolderId = lockable.getBrokerService().getBrokerName(); 220 } 221 } 222 return leaseHolderId; 223 } 224 225 public void setLeaseHolderId(String leaseHolderId) { 226 this.leaseHolderId = leaseHolderId; 227 } 228 229 public int getMaxAllowableDiffFromDBTime() { 230 return maxAllowableDiffFromDBTime; 231 } 232 233 public void setMaxAllowableDiffFromDBTime(int maxAllowableDiffFromDBTime) { 234 this.maxAllowableDiffFromDBTime = maxAllowableDiffFromDBTime; 235 } 236 237 public boolean isHandleStartException() { 238 return handleStartException; 239 } 240 241 public void setHandleStartException(boolean handleStartException) { 242 this.handleStartException = handleStartException; 243 } 244 245 @Override 246 public String toString() { 247 return "LeaseDatabaseLocker owner:" + leaseHolderId + ",duration:" + lockAcquireSleepInterval + ",renew:" + lockAcquireSleepInterval; 248 } 249}