alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (DefaultJDBCAdapter.java)

This example ActiveMQ source code file (DefaultJDBCAdapter.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - ActiveMQ tags/keywords

activemqdestination, exception, io, ioexception, ioexception, jdbc, jdbcmessagerecoverylistener, messageid, preparedstatement, preparedstatement, resultset, sql, sqlexception, sqlexception, string, string, subscriptioninfo, util

The ActiveMQ DefaultJDBCAdapter.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.jdbc.adapter;

import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.jdbc.JDBCAdapter;
import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener;
import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.Statements;
import org.apache.activemq.store.jdbc.TransactionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter. <p/> sub-classing is
 * encouraged to override the default implementation of methods to account for differences in JDBC Driver
 * implementations. <p/> The JDBCAdapter inserts and extracts BLOB data using the getBytes()/setBytes() operations. 

* The databases/JDBC drivers that use this adapter are: * <ul> * <li> * </ul> * * @org.apache.xbean.XBean element="defaultJDBCAdapter" * * */ public class DefaultJDBCAdapter implements JDBCAdapter { private static final Logger LOG = LoggerFactory.getLogger(DefaultJDBCAdapter.class); public static final int MAX_ROWS = 10000; protected Statements statements; protected boolean batchStatments = true; protected boolean prioritizedMessages; protected ReadWriteLock cleanupExclusiveLock = new ReentrantReadWriteLock(); // needs to be min twice the prefetch for a durable sub and large enough for selector range protected int maxRows = MAX_ROWS; protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException { s.setBytes(index, data); } protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException { return rs.getBytes(index); } public void doCreateTables(TransactionContext c) throws SQLException, IOException { Statement s = null; cleanupExclusiveLock.writeLock().lock(); try { // Check to see if the table already exists. If it does, then don't // log warnings during startup. // Need to run the scripts anyways since they may contain ALTER // statements that upgrade a previous version // of the table boolean alreadyExists = false; ResultSet rs = null; try { rs = c.getConnection().getMetaData().getTables(null, null, this.statements.getFullMessageTableName(), new String[] { "TABLE" }); alreadyExists = rs.next(); } catch (Throwable ignore) { } finally { close(rs); } s = c.getConnection().createStatement(); String[] createStatments = this.statements.getCreateSchemaStatements(); for (int i = 0; i < createStatments.length; i++) { // This will fail usually since the tables will be // created already. try { LOG.debug("Executing SQL: " + createStatments[i]); s.execute(createStatments[i]); } catch (SQLException e) { if (alreadyExists) { LOG.debug("Could not create JDBC tables; The message table already existed." + " Failure was: " + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() + " Vendor code: " + e.getErrorCode()); } else { LOG.warn("Could not create JDBC tables; they could already exist." + " Failure was: " + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() + " Vendor code: " + e.getErrorCode()); JDBCPersistenceAdapter.log("Failure details: ", e); } } } c.getConnection().commit(); } finally { cleanupExclusiveLock.writeLock().unlock(); try { s.close(); } catch (Throwable e) { } } } public void doDropTables(TransactionContext c) throws SQLException, IOException { Statement s = null; cleanupExclusiveLock.writeLock().lock(); try { s = c.getConnection().createStatement(); String[] dropStatments = this.statements.getDropSchemaStatements(); for (int i = 0; i < dropStatments.length; i++) { // This will fail usually since the tables will be // created already. try { LOG.debug("Executing SQL: " + dropStatments[i]); s.execute(dropStatments[i]); } catch (SQLException e) { LOG.warn("Could not drop JDBC tables; they may not exist." + " Failure was: " + dropStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() + " Vendor code: " + e.getErrorCode()); JDBCPersistenceAdapter.log("Failure details: ", e); } } c.getConnection().commit(); } finally { cleanupExclusiveLock.writeLock().unlock(); try { s.close(); } catch (Throwable e) { } } } public long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException { PreparedStatement s = null; ResultSet rs = null; cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement()); rs = s.executeQuery(); long seq1 = 0; if (rs.next()) { seq1 = rs.getLong(1); } rs.close(); s.close(); s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInAcksStatement()); rs = s.executeQuery(); long seq2 = 0; if (rs.next()) { seq2 = rs.getLong(1); } long seq = Math.max(seq1, seq2); return seq; } finally { cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } } public byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException { PreparedStatement s = null; ResultSet rs = null; cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement( this.statements.getFindMessageByIdStatement()); s.setLong(1, storeSequenceId); rs = s.executeQuery(); if (!rs.next()) { return null; } return getBinaryData(rs, 1); } finally { cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } } public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data, long expiration, byte priority) throws SQLException, IOException { PreparedStatement s = c.getAddMessageStatement(); cleanupExclusiveLock.readLock().lock(); try { if (s == null) { s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement()); if (this.batchStatments) { c.setAddMessageStatement(s); } } s.setLong(1, sequence); s.setString(2, messageID.getProducerId().toString()); s.setLong(3, messageID.getProducerSequenceId()); s.setString(4, destination.getQualifiedName()); s.setLong(5, expiration); s.setLong(6, priority); setBinaryData(s, 7, data); if (this.batchStatments) { s.addBatch(); } else if (s.executeUpdate() != 1) { throw new SQLException("Failed add a message"); } } finally { cleanupExclusiveLock.readLock().unlock(); if (!this.batchStatments) { if (s != null) { s.close(); } } } } public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, long expirationTime, String messageRef) throws SQLException, IOException { PreparedStatement s = c.getAddMessageStatement(); cleanupExclusiveLock.readLock().lock(); try { if (s == null) { s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement()); if (this.batchStatments) { c.setAddMessageStatement(s); } } s.setLong(1, messageID.getBrokerSequenceId()); s.setString(2, messageID.getProducerId().toString()); s.setLong(3, messageID.getProducerSequenceId()); s.setString(4, destination.getQualifiedName()); s.setLong(5, expirationTime); s.setString(6, messageRef); if (this.batchStatments) { s.addBatch(); } else if (s.executeUpdate() != 1) { throw new SQLException("Failed add a message"); } } finally { cleanupExclusiveLock.readLock().unlock(); if (!this.batchStatments) { s.close(); } } } public long[] getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException { PreparedStatement s = null; ResultSet rs = null; cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement()); s.setString(1, messageID.getProducerId().toString()); s.setLong(2, messageID.getProducerSequenceId()); s.setString(3, destination.getQualifiedName()); rs = s.executeQuery(); if (!rs.next()) { return new long[]{0,0}; } return new long[]{rs.getLong(1), rs.getLong(2)}; } finally { cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } } public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException { PreparedStatement s = null; ResultSet rs = null; cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement()); s.setString(1, id.getProducerId().toString()); s.setLong(2, id.getProducerSequenceId()); rs = s.executeQuery(); if (!rs.next()) { return null; } return getBinaryData(rs, 1); } finally { cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } } public String doGetMessageReference(TransactionContext c, long seq) throws SQLException, IOException { PreparedStatement s = null; ResultSet rs = null; cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement()); s.setLong(1, seq); rs = s.executeQuery(); if (!rs.next()) { return null; } return rs.getString(1); } finally { cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } } public void doRemoveMessage(TransactionContext c, long seq) throws SQLException, IOException { PreparedStatement s = c.getRemovedMessageStatement(); cleanupExclusiveLock.readLock().lock(); try { if (s == null) { s = c.getConnection().prepareStatement(this.statements.getRemoveMessageStatement()); if (this.batchStatments) { c.setRemovedMessageStatement(s); } } s.setLong(1, seq); if (this.batchStatments) { s.addBatch(); } else if (s.executeUpdate() != 1) { throw new SQLException("Failed to remove message"); } } finally { cleanupExclusiveLock.readLock().unlock(); if (!this.batchStatments && s != null) { s.close(); } } } public void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener) throws Exception { PreparedStatement s = null; ResultSet rs = null; cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getFindAllMessagesStatement()); s.setString(1, destination.getQualifiedName()); rs = s.executeQuery(); if (this.statements.isUseExternalMessageReferences()) { while (rs.next()) { if (!listener.recoverMessageReference(rs.getString(2))) { break; } } } else { while (rs.next()) { if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { break; } } } } finally { cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } } public void doMessageIdScan(TransactionContext c, int limit, JDBCMessageIdScanListener listener) throws SQLException, IOException { PreparedStatement s = null; ResultSet rs = null; cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement()); s.setMaxRows(limit); rs = s.executeQuery(); // jdbc scrollable cursor requires jdbc ver > 1.0 and is often implemented locally so avoid LinkedList<MessageId> reverseOrderIds = new LinkedList(); while (rs.next()) { reverseOrderIds.addFirst(new MessageId(rs.getString(2), rs.getLong(3))); } if (LOG.isDebugEnabled()) { LOG.debug("messageIdScan with limit (" + limit + "), resulted in: " + reverseOrderIds.size() + " ids"); } for (MessageId id : reverseOrderIds) { listener.messageId(id); } } finally { cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } } public void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long seq, long prio) throws SQLException, IOException { PreparedStatement s = c.getUpdateLastAckStatement(); cleanupExclusiveLock.readLock().lock(); try { if (s == null) { s = c.getConnection().prepareStatement(this.statements.getUpdateLastPriorityAckRowOfDurableSubStatement()); if (this.batchStatments) { c.setUpdateLastAckStatement(s); } } s.setLong(1, seq); s.setString(2, destination.getQualifiedName()); s.setString(3, clientId); s.setString(4, subscriptionName); s.setLong(5, prio); if (this.batchStatments) { s.addBatch(); } else if (s.executeUpdate() != 1) { throw new SQLException("Failed update last ack with priority: " + prio + ", for sub: " + subscriptionName); } } finally { cleanupExclusiveLock.readLock().unlock(); if (!this.batchStatments) { close(s); } } } public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long seq, long priority) throws SQLException, IOException { PreparedStatement s = c.getUpdateLastAckStatement(); cleanupExclusiveLock.readLock().lock(); try { if (s == null) { s = c.getConnection().prepareStatement(this.statements.getUpdateDurableLastAckStatement()); if (this.batchStatments) { c.setUpdateLastAckStatement(s); } } s.setLong(1, seq); s.setString(2, destination.getQualifiedName()); s.setString(3, clientId); s.setString(4, subscriptionName); if (this.batchStatments) { s.addBatch(); } else if (s.executeUpdate() != 1) { throw new IOException("Could not update last ack seq : " + seq + ", for sub: " + subscriptionName); } } finally { cleanupExclusiveLock.readLock().unlock(); if (!this.batchStatments) { close(s); } } } public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception { // dumpTables(c, // destination.getQualifiedName(),clientId,subscriptionName); PreparedStatement s = null; ResultSet rs = null; cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubMessagesStatement()); s.setString(1, destination.getQualifiedName()); s.setString(2, clientId); s.setString(3, subscriptionName); rs = s.executeQuery(); if (this.statements.isUseExternalMessageReferences()) { while (rs.next()) { if (!listener.recoverMessageReference(rs.getString(2))) { break; } } } else { while (rs.next()) { if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { break; } } } } finally { cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } } public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception { PreparedStatement s = null; ResultSet rs = null; cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement()); s.setMaxRows(Math.max(maxReturned * 2, maxRows)); s.setString(1, destination.getQualifiedName()); s.setString(2, clientId); s.setString(3, subscriptionName); s.setLong(4, seq); rs = s.executeQuery(); int count = 0; if (this.statements.isUseExternalMessageReferences()) { while (rs.next() && count < maxReturned) { if (listener.recoverMessageReference(rs.getString(1))) { count++; } } } else { while (rs.next() && count < maxReturned) { if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { count++; } } } } finally { cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } } public void doRecoverNextMessagesWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception { PreparedStatement s = null; ResultSet rs = null; cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement()); s.setMaxRows(maxRows); s.setString(1, destination.getQualifiedName()); s.setString(2, clientId); s.setString(3, subscriptionName); s.setLong(4, seq); s.setLong(5, priority); rs = s.executeQuery(); int count = 0; if (this.statements.isUseExternalMessageReferences()) { while (rs.next() && count < maxReturned) { if (listener.recoverMessageReference(rs.getString(1))) { count++; } } } else { while (rs.next() && count < maxReturned) { if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { count++; } } } } finally { cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } } public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, boolean isPrioritizedMessages) throws SQLException, IOException { PreparedStatement s = null; ResultSet rs = null; int result = 0; cleanupExclusiveLock.readLock().lock(); try { if (isPrioritizedMessages) { s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatementWithPriority()); } else { s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement()); } s.setString(1, destination.getQualifiedName()); s.setString(2, clientId); s.setString(3, subscriptionName); rs = s.executeQuery(); if (rs.next()) { result = rs.getInt(1); } } finally { cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } return result; } /** * @param c * @param info * @param retroactive * @throws SQLException * @throws IOException */ public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive, boolean isPrioritizedMessages) throws SQLException, IOException { // dumpTables(c, destination.getQualifiedName(), clientId, // subscriptionName); PreparedStatement s = null; cleanupExclusiveLock.readLock().lock(); try { long lastMessageId = -1; if (!retroactive) { s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement()); ResultSet rs = null; try { rs = s.executeQuery(); if (rs.next()) { lastMessageId = rs.getLong(1); } } finally { close(rs); close(s); } } s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement()); int maxPriority = 1; if (isPrioritizedMessages) { maxPriority = 10; } for (int priority = 0; priority < maxPriority; priority++) { s.setString(1, info.getDestination().getQualifiedName()); s.setString(2, info.getClientId()); s.setString(3, info.getSubscriptionName()); s.setString(4, info.getSelector()); s.setLong(5, lastMessageId); s.setString(6, info.getSubscribedDestination().getQualifiedName()); s.setLong(7, priority); if (s.executeUpdate() != 1) { throw new IOException("Could not create durable subscription for: " + info.getClientId()); } } } finally { cleanupExclusiveLock.readLock().unlock(); close(s); } } public SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName) throws SQLException, IOException { PreparedStatement s = null; ResultSet rs = null; cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getFindDurableSubStatement()); s.setString(1, destination.getQualifiedName()); s.setString(2, clientId); s.setString(3, subscriptionName); rs = s.executeQuery(); if (!rs.next()) { return null; } SubscriptionInfo subscription = new SubscriptionInfo(); subscription.setDestination(destination); subscription.setClientId(clientId); subscription.setSubscriptionName(subscriptionName); subscription.setSelector(rs.getString(1)); subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(2), ActiveMQDestination.QUEUE_TYPE)); return subscription; } finally { cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } } public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException { PreparedStatement s = null; ResultSet rs = null; cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubsStatement()); s.setString(1, destination.getQualifiedName()); rs = s.executeQuery(); ArrayList<SubscriptionInfo> rc = new ArrayList(); while (rs.next()) { SubscriptionInfo subscription = new SubscriptionInfo(); subscription.setDestination(destination); subscription.setSelector(rs.getString(1)); subscription.setSubscriptionName(rs.getString(2)); subscription.setClientId(rs.getString(3)); subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(4), ActiveMQDestination.QUEUE_TYPE)); rc.add(subscription); } return rc.toArray(new SubscriptionInfo[rc.size()]); } finally { cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } } public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException, IOException { PreparedStatement s = null; cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getRemoveAllMessagesStatement()); s.setString(1, destinationName.getQualifiedName()); s.executeUpdate(); s.close(); s = c.getConnection().prepareStatement(this.statements.getRemoveAllSubscriptionsStatement()); s.setString(1, destinationName.getQualifiedName()); s.executeUpdate(); } finally { cleanupExclusiveLock.readLock().unlock(); close(s); } } public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName) throws SQLException, IOException { PreparedStatement s = null; cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getDeleteSubscriptionStatement()); s.setString(1, destination.getQualifiedName()); s.setString(2, clientId); s.setString(3, subscriptionName); s.executeUpdate(); } finally { cleanupExclusiveLock.readLock().unlock(); close(s); } } public void doDeleteOldMessages(TransactionContext c, boolean isPrioritizedMessages) throws SQLException, IOException { PreparedStatement s = null; cleanupExclusiveLock.writeLock().lock(); try { if (isPrioritizedMessages) { LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatementWithPriority()); s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatementWithPriority()); } else { LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatement()); s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatement()); } s.setLong(1, System.currentTimeMillis()); int i = s.executeUpdate(); LOG.debug("Deleted " + i + " old message(s)."); } finally { cleanupExclusiveLock.writeLock().unlock(); close(s); } } public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriberName) throws SQLException, IOException { PreparedStatement s = null; ResultSet rs = null; long result = -1; cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement()); s.setString(1, destination.getQualifiedName()); s.setString(2, clientId); s.setString(3, subscriberName); rs = s.executeQuery(); if (rs.next()) { result = rs.getLong(1); } } finally { cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } return result; } private static void close(PreparedStatement s) { try { s.close(); } catch (Throwable e) { } } private static void close(ResultSet rs) { try { rs.close(); } catch (Throwable e) { } } public Set<ActiveMQDestination> doGetDestinations(TransactionContext c) throws SQLException, IOException { HashSet<ActiveMQDestination> rc = new HashSet(); PreparedStatement s = null; ResultSet rs = null; cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getFindAllDestinationsStatement()); rs = s.executeQuery(); while (rs.next()) { rc.add(ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE)); } } finally { cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } return rc; } /** * @return true if batchStements */ public boolean isBatchStatments() { return this.batchStatments; } /** * @param batchStatments */ public void setBatchStatments(boolean batchStatments) { this.batchStatments = batchStatments; } public void setUseExternalMessageReferences(boolean useExternalMessageReferences) { this.statements.setUseExternalMessageReferences(useExternalMessageReferences); } /** * @return the statements */ public Statements getStatements() { return this.statements; } public void setStatements(Statements statements) { this.statements = statements; } public int getMaxRows() { return maxRows; } public void setMaxRows(int maxRows) { this.maxRows = maxRows; } /** * @param c * @param destination * @param clientId * @param subscriberName * @return * @throws SQLException * @throws IOException */ public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriberName) throws SQLException, IOException { PreparedStatement s = null; ResultSet rs = null; cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getNextDurableSubscriberMessageStatement()); s.setString(1, destination.getQualifiedName()); s.setString(2, clientId); s.setString(3, subscriberName); rs = s.executeQuery(); if (!rs.next()) { return null; } return getBinaryData(rs, 1); } finally { close(rs); cleanupExclusiveLock.readLock().unlock(); close(s); } } public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException { PreparedStatement s = null; ResultSet rs = null; int result = 0; cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getDestinationMessageCountStatement()); s.setString(1, destination.getQualifiedName()); rs = s.executeQuery(); if (rs.next()) { result = rs.getInt(1); } } finally { cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } return result; } public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq, long priority, int maxReturned, boolean isPrioritizedMessages, JDBCMessageRecoveryListener listener) throws Exception { PreparedStatement s = null; ResultSet rs = null; cleanupExclusiveLock.readLock().lock(); try { if (isPrioritizedMessages) { s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesByPriorityStatement()); } else { s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement()); } s.setMaxRows(Math.max(maxReturned * 2, maxRows)); s.setString(1, destination.getQualifiedName()); s.setLong(2, nextSeq); if (isPrioritizedMessages) { s.setLong(3, priority); s.setLong(4, priority); } rs = s.executeQuery(); int count = 0; if (this.statements.isUseExternalMessageReferences()) { while (rs.next() && count < maxReturned) { if (listener.recoverMessageReference(rs.getString(1))) { count++; } else { LOG.debug("Stopped recover next messages"); break; } } } else { while (rs.next() && count < maxReturned) { if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { count++; } else { LOG.debug("Stopped recover next messages"); break; } } } } catch (Exception e) { e.printStackTrace(); } finally { cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } } /* public void dumpTables(Connection c, String destinationName, String clientId, String subscriptionName) throws SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); PreparedStatement s = c.prepareStatement("SELECT M.ID, D.LAST_ACKED_ID FROM " + "ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D " + "WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" + " ORDER BY M.ID"); s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName); printQuery(s,System.out); } public void dumpTables(Connection c) throws SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); } private void printQuery(Connection c, String query, PrintStream out) throws SQLException { printQuery(c.prepareStatement(query), out); } private void printQuery(PreparedStatement s, PrintStream out) throws SQLException { ResultSet set = null; try { set = s.executeQuery(); ResultSetMetaData metaData = set.getMetaData(); for (int i = 1; i <= metaData.getColumnCount(); i++) { if (i == 1) out.print("||"); out.print(metaData.getColumnName(i) + "||"); } out.println(); while (set.next()) { for (int i = 1; i <= metaData.getColumnCount(); i++) { if (i == 1) out.print("|"); out.print(set.getString(i) + "|"); } out.println(); } } finally { try { set.close(); } catch (Throwable ignore) { } try { s.close(); } catch (Throwable ignore) { } } } */ public long doGetLastProducerSequenceId(TransactionContext c, ProducerId id) throws SQLException, IOException { PreparedStatement s = null; ResultSet rs = null; cleanupExclusiveLock.readLock().lock(); try { s = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement()); s.setString(1, id.toString()); rs = s.executeQuery(); long seq = -1; if (rs.next()) { seq = rs.getLong(1); } return seq; } finally { cleanupExclusiveLock.readLock().unlock(); close(rs); close(s); } } }

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ DefaultJDBCAdapter.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.