001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.jdbc;
018
019import java.io.IOException;
020import java.sql.Connection;
021import java.sql.PreparedStatement;
022import java.sql.ResultSet;
023import java.sql.SQLException;
024import java.sql.Timestamp;
025import java.util.Date;
026import java.util.concurrent.TimeUnit;
027import org.apache.activemq.util.IOExceptionSupport;
028import org.apache.activemq.util.ServiceStopper;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032/**
033 * Represents an exclusive lease on a database to avoid multiple brokers running
034 * against the same logical database.
035 * 
036 * @org.apache.xbean.XBean element="lease-database-locker"
037 * 
038 */
039public class LeaseDatabaseLocker extends AbstractJDBCLocker {
040    private static final Logger LOG = LoggerFactory.getLogger(LeaseDatabaseLocker.class);
041
042    protected int maxAllowableDiffFromDBTime = 0;
043    protected long diffFromCurrentTime = Long.MAX_VALUE;
044    protected String leaseHolderId;
045    protected boolean handleStartException;
046
047    public void doStart() throws Exception {
048
049        if (lockAcquireSleepInterval < lockable.getLockKeepAlivePeriod()) {
050            LOG.warn("LockableService keep alive period: " + lockable.getLockKeepAlivePeriod()
051                    + ", which renews the lease, is greater than lockAcquireSleepInterval: " + lockAcquireSleepInterval
052                    + ", the lease duration. These values will allow the lease to expire.");
053        }
054
055        LOG.info(getLeaseHolderId() + " attempting to acquire exclusive lease to become the master");
056        String sql = getStatements().getLeaseObtainStatement();
057        LOG.debug(getLeaseHolderId() + " locking Query is "+sql);
058
059        long now = 0l;
060        while (!isStopping()) {
061            Connection connection = null;
062            PreparedStatement statement = null;
063            try {
064                connection = getConnection();
065                initTimeDiff(connection);
066
067                statement = connection.prepareStatement(sql);
068                setQueryTimeout(statement);
069
070                now = System.currentTimeMillis() + diffFromCurrentTime;
071                statement.setString(1, getLeaseHolderId());
072                statement.setLong(2, now + lockAcquireSleepInterval);
073                statement.setLong(3, now);
074
075                int result = statement.executeUpdate();
076                if (result == 1) {
077                    // we got the lease, verify we still have it
078                    if (keepAlive()) {
079                        break;
080                    }
081                }
082
083                reportLeasOwnerShipAndDuration(connection);
084
085            } catch (Exception e) {
086                LOG.warn(getLeaseHolderId() + " lease acquire failure: "+ e, e);
087                if (isStopping()) {
088                    throw new Exception(
089                            "Cannot start broker as being asked to shut down. "
090                                    + "Interrupted attempt to acquire lock: "
091                                    + e, e);
092                }
093                if (handleStartException) {
094                    lockable.getBrokerService().handleIOException(IOExceptionSupport.create(e));
095                }
096            } finally {
097                close(statement);
098                close(connection);
099            }
100
101            LOG.debug(getLeaseHolderId() + " failed to acquire lease.  Sleeping for " + lockAcquireSleepInterval + " milli(s) before trying again...");
102            TimeUnit.MILLISECONDS.sleep(lockAcquireSleepInterval);
103        }
104        if (isStopping()) {
105            throw new RuntimeException(getLeaseHolderId() + " failing lease acquire due to stop");
106        }
107
108        LOG.info(getLeaseHolderId() + ", becoming master with lease expiry " + new Date(now + lockAcquireSleepInterval) + " on dataSource: " + dataSource);
109    }
110
111    private void reportLeasOwnerShipAndDuration(Connection connection) throws SQLException {
112        PreparedStatement statement = null;
113        try {
114            statement = connection.prepareStatement(getStatements().getLeaseOwnerStatement());
115            ResultSet resultSet = statement.executeQuery();
116            while (resultSet.next()) {
117                LOG.debug(getLeaseHolderId() + " Lease held by " + resultSet.getString(1) + " till " + new Date(resultSet.getLong(2)));
118            }
119        } finally {
120            close(statement);
121        }
122    }
123
124    protected long initTimeDiff(Connection connection) throws SQLException {
125        if (Long.MAX_VALUE == diffFromCurrentTime) {
126            if (maxAllowableDiffFromDBTime > 0) {
127                diffFromCurrentTime = determineTimeDifference(connection);
128            } else {
129                diffFromCurrentTime = 0l;
130            }
131        }
132        return diffFromCurrentTime;
133    }
134
135    protected long determineTimeDifference(Connection connection) throws SQLException {
136        PreparedStatement statement = connection.prepareStatement(getStatements().getCurrentDateTime());
137        ResultSet resultSet = statement.executeQuery();
138        long result = 0l;
139        if (resultSet.next()) {
140            Timestamp timestamp = resultSet.getTimestamp(1);
141            long diff = System.currentTimeMillis() - timestamp.getTime();
142            if (Math.abs(diff) > maxAllowableDiffFromDBTime) {
143                // off by more than maxAllowableDiffFromDBTime so lets adjust
144                result = (-diff);
145            }
146            LOG.info(getLeaseHolderId() + " diff adjust from db: " + result + ", db time: " + timestamp);
147        }
148        return result;
149    }
150
151    public void doStop(ServiceStopper stopper) throws Exception {
152        if (lockable.getBrokerService() != null && lockable.getBrokerService().isRestartRequested()) {
153            // keep our lease for restart
154            return;
155        }
156        releaseLease();
157    }
158
159    private void releaseLease() {
160        Connection connection = null;
161        PreparedStatement statement = null;
162        try {
163            connection = getConnection();
164            statement = connection.prepareStatement(getStatements().getLeaseUpdateStatement());
165            statement.setString(1, null);
166            statement.setLong(2, 0l);
167            statement.setString(3, getLeaseHolderId());
168            if (statement.executeUpdate() == 1) {
169                LOG.info(getLeaseHolderId() + ", released lease");
170            }
171        } catch (Exception e) {
172            LOG.error(getLeaseHolderId() + " failed to release lease: " + e, e);
173        } finally {
174            close(statement);
175            close(connection);
176        }
177    }
178
179    @Override
180    public boolean keepAlive() throws IOException {
181        boolean result = false;
182        final String sql = getStatements().getLeaseUpdateStatement();
183        LOG.debug(getLeaseHolderId() + ", lease keepAlive Query is " + sql);
184
185        Connection connection = null;
186        PreparedStatement statement = null;
187        try {
188            connection = getConnection();
189
190            initTimeDiff(connection);
191            statement = connection.prepareStatement(sql);
192            setQueryTimeout(statement);
193
194            final long now = System.currentTimeMillis() + diffFromCurrentTime;
195            statement.setString(1, getLeaseHolderId());
196            statement.setLong(2, now + lockAcquireSleepInterval);
197            statement.setString(3, getLeaseHolderId());
198
199            result = (statement.executeUpdate() == 1);
200
201            if (!result) {
202                reportLeasOwnerShipAndDuration(connection);
203            }
204        } catch (Exception e) {
205            LOG.warn(getLeaseHolderId() + ", failed to update lease: " + e, e);
206            IOException ioe = IOExceptionSupport.create(e);
207            lockable.getBrokerService().handleIOException(ioe);
208            throw ioe;
209        } finally {
210            close(statement);
211            close(connection);
212        }
213        return result;
214    }
215
216    public String getLeaseHolderId() {
217        if (leaseHolderId == null) {
218            if (lockable.getBrokerService() != null) {
219                leaseHolderId = lockable.getBrokerService().getBrokerName();
220            }
221        }
222        return leaseHolderId;
223    }
224
225    public void setLeaseHolderId(String leaseHolderId) {
226        this.leaseHolderId = leaseHolderId;
227    }
228
229    public int getMaxAllowableDiffFromDBTime() {
230        return maxAllowableDiffFromDBTime;
231    }
232
233    public void setMaxAllowableDiffFromDBTime(int maxAllowableDiffFromDBTime) {
234        this.maxAllowableDiffFromDBTime = maxAllowableDiffFromDBTime;
235    }
236
237    public boolean isHandleStartException() {
238        return handleStartException;
239    }
240
241    public void setHandleStartException(boolean handleStartException) {
242        this.handleStartException = handleStartException;
243    }
244
245    @Override
246    public String toString() {
247        return "LeaseDatabaseLocker owner:" + leaseHolderId + ",duration:" + lockAcquireSleepInterval + ",renew:" + lockAcquireSleepInterval;
248    }
249}