|
ActiveMQ example source code file (TransactionContext.java)
The ActiveMQ TransactionContext.java source code/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.store.jdbc; import java.io.IOException; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Statement; import javax.sql.DataSource; import org.apache.activemq.util.IOExceptionSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Helps keep track of the current transaction/JDBC connection. * * */ public class TransactionContext { private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class); private final DataSource dataSource; private final JDBCPersistenceAdapter persistenceAdapter; private Connection connection; private boolean inTx; private PreparedStatement addMessageStatement; private PreparedStatement removedMessageStatement; private PreparedStatement updateLastAckStatement; // a cheap dirty level that we can live with private int transactionIsolation = Connection.TRANSACTION_READ_UNCOMMITTED; public TransactionContext(JDBCPersistenceAdapter persistenceAdapter) throws IOException { this.persistenceAdapter = persistenceAdapter; this.dataSource = persistenceAdapter.getDataSource(); } public Connection getConnection() throws IOException { if (connection == null) { try { connection = dataSource.getConnection(); boolean autoCommit = !inTx; if (connection.getAutoCommit() != autoCommit) { connection.setAutoCommit(autoCommit); } } catch (SQLException e) { JDBCPersistenceAdapter.log("Could not get JDBC connection: ", e); IOException ioe = IOExceptionSupport.create(e); persistenceAdapter.getBrokerService().handleIOException(ioe); throw ioe; } try { connection.setTransactionIsolation(transactionIsolation); } catch (Throwable e) { } } return connection; } public void executeBatch() throws SQLException { try { executeBatch(addMessageStatement, "Failed add a message"); } finally { addMessageStatement = null; try { executeBatch(removedMessageStatement, "Failed to remove a message"); } finally { removedMessageStatement = null; try { executeBatch(updateLastAckStatement, "Failed to ack a message"); } finally { updateLastAckStatement = null; } } } } private void executeBatch(PreparedStatement p, String message) throws SQLException { if (p == null) { return; } try { int[] rc = p.executeBatch(); for (int i = 0; i < rc.length; i++) { int code = rc[i]; if (code < 0 && code != Statement.SUCCESS_NO_INFO) { throw new SQLException(message + ". Response code: " + code); } } } finally { try { p.close(); } catch (Throwable e) { } } } public void close() throws IOException { if (!inTx) { try { /** * we are not in a transaction so should not be committing ?? * This was previously commented out - but had adverse affects * on testing - so it's back! * */ try { executeBatch(); } finally { if (connection != null && !connection.getAutoCommit()) { connection.commit(); } } } catch (SQLException e) { JDBCPersistenceAdapter.log("Error while closing connection: ", e); throw IOExceptionSupport.create(e); } finally { try { if (connection != null) { connection.close(); } } catch (Throwable e) { LOG.warn("Close failed: " + e.getMessage(), e); } finally { connection = null; } } } } public void begin() throws IOException { if (inTx) { throw new IOException("Already started."); } inTx = true; connection = getConnection(); } public void commit() throws IOException { if (!inTx) { throw new IOException("Not started."); } try { executeBatch(); if (!connection.getAutoCommit()) { connection.commit(); } } catch (SQLException e) { JDBCPersistenceAdapter.log("Commit failed: ", e); this.rollback(); throw IOExceptionSupport.create(e); } finally { inTx = false; close(); } } public void rollback() throws IOException { if (!inTx) { throw new IOException("Not started."); } try { if (addMessageStatement != null) { addMessageStatement.close(); addMessageStatement = null; } if (removedMessageStatement != null) { removedMessageStatement.close(); removedMessageStatement = null; } if (updateLastAckStatement != null) { updateLastAckStatement.close(); updateLastAckStatement = null; } connection.rollback(); } catch (SQLException e) { JDBCPersistenceAdapter.log("Rollback failed: ", e); throw IOExceptionSupport.create(e); } finally { inTx = false; close(); } } public PreparedStatement getAddMessageStatement() { return addMessageStatement; } public void setAddMessageStatement(PreparedStatement addMessageStatement) { this.addMessageStatement = addMessageStatement; } public PreparedStatement getUpdateLastAckStatement() { return updateLastAckStatement; } public void setUpdateLastAckStatement(PreparedStatement ackMessageStatement) { this.updateLastAckStatement = ackMessageStatement; } public PreparedStatement getRemovedMessageStatement() { return removedMessageStatement; } public void setRemovedMessageStatement(PreparedStatement removedMessageStatement) { this.removedMessageStatement = removedMessageStatement; } public void setTransactionIsolation(int transactionIsolation) { this.transactionIsolation = transactionIsolation; } } Other ActiveMQ examples (source code examples)Here is a short list of links related to this ActiveMQ TransactionContext.java source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.