|
ActiveMQ example source code file (Statements.java)
The ActiveMQ Statements.java source code/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.store.jdbc; /** * * * @org.apache.xbean.XBean element="statements" * */ public class Statements { protected String messageTableName = "ACTIVEMQ_MSGS"; protected String durableSubAcksTableName = "ACTIVEMQ_ACKS"; protected String lockTableName = "ACTIVEMQ_LOCK"; protected String binaryDataType = "BLOB"; protected String containerNameDataType = "VARCHAR(250)"; protected String msgIdDataType = "VARCHAR(250)"; protected String sequenceDataType = "BIGINT"; protected String longDataType = "BIGINT"; protected String stringIdDataType = "VARCHAR(250)"; protected boolean useExternalMessageReferences; private String tablePrefix = ""; private String addMessageStatement; private String updateMessageStatement; private String removeMessageStatement; private String findMessageSequenceIdStatement; private String findMessageStatement; private String findMessageByIdStatement; private String findAllMessagesStatement; private String findLastSequenceIdInMsgsStatement; private String findLastSequenceIdInAcksStatement; private String createDurableSubStatement; private String findDurableSubStatement; private String findAllDurableSubsStatement; private String updateLastPriorityAckRowOfDurableSubStatement; private String deleteSubscriptionStatement; private String findAllDurableSubMessagesStatement; private String findDurableSubMessagesStatement; private String findDurableSubMessagesByPriorityStatement; private String findAllDestinationsStatement; private String removeAllMessagesStatement; private String removeAllSubscriptionsStatement; private String deleteOldMessagesStatement; private String[] createSchemaStatements; private String[] dropSchemaStatements; private String lockCreateStatement; private String lockUpdateStatement; private String nextDurableSubscriberMessageStatement; private String durableSubscriberMessageCountStatement; private String lastAckedDurableSubscriberMessageStatement; private String destinationMessageCountStatement; private String findNextMessagesStatement; private String findNextMessagesByPriorityStatement; private boolean useLockCreateWhereClause; private String findAllMessageIdsStatement; private String lastProducerSequenceIdStatement; private String selectDurablePriorityAckStatement; private String insertDurablePriorityAckStatement; private String updateDurableLastAckStatement; private String deleteOldMessagesStatementWithPriority; private String durableSubscriberMessageCountStatementWithPriority; private String dropAckPKAlterStatementEnd; public String[] getCreateSchemaStatements() { if (createSchemaStatements == null) { createSchemaStatements = new String[] { "CREATE TABLE " + getFullMessageTableName() + "(" + "ID " + sequenceDataType + " NOT NULL" + ", CONTAINER " + containerNameDataType + ", MSGID_PROD " + msgIdDataType + ", MSGID_SEQ " + sequenceDataType + ", EXPIRATION " + longDataType + ", MSG " + (useExternalMessageReferences ? stringIdDataType : binaryDataType) + ", PRIMARY KEY ( ID ) )", "CREATE INDEX " + getFullMessageTableName() + "_MIDX ON " + getFullMessageTableName() + " (MSGID_PROD,MSGID_SEQ)", "CREATE INDEX " + getFullMessageTableName() + "_CIDX ON " + getFullMessageTableName() + " (CONTAINER)", "CREATE INDEX " + getFullMessageTableName() + "_EIDX ON " + getFullMessageTableName() + " (EXPIRATION)", "CREATE TABLE " + getFullAckTableName() + "(" + "CONTAINER " + containerNameDataType + " NOT NULL" + ", SUB_DEST " + stringIdDataType + ", CLIENT_ID " + stringIdDataType + " NOT NULL" + ", SUB_NAME " + stringIdDataType + " NOT NULL" + ", SELECTOR " + stringIdDataType + ", LAST_ACKED_ID " + sequenceDataType + ", PRIMARY KEY ( CONTAINER, CLIENT_ID, SUB_NAME))", "CREATE TABLE " + getFullLockTableName() + "( ID " + longDataType + " NOT NULL, TIME " + longDataType + ", BROKER_NAME " + stringIdDataType + ", PRIMARY KEY (ID) )", "INSERT INTO " + getFullLockTableName() + "(ID) VALUES (1)", "ALTER TABLE " + getFullMessageTableName() + " ADD PRIORITY " + sequenceDataType, "CREATE INDEX " + getFullMessageTableName() + "_PIDX ON " + getFullMessageTableName() + " (PRIORITY)", "ALTER TABLE " + getFullAckTableName() + " ADD PRIORITY " + sequenceDataType + " DEFAULT 5 NOT NULL", "ALTER TABLE " + getFullAckTableName() + " " + getDropAckPKAlterStatementEnd(), "ALTER TABLE " + getFullAckTableName() + " ADD PRIMARY KEY (CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY)", }; } return createSchemaStatements; } public String getDropAckPKAlterStatementEnd() { if (dropAckPKAlterStatementEnd == null) { dropAckPKAlterStatementEnd = "DROP PRIMARY KEY"; } return dropAckPKAlterStatementEnd; } public void setDropAckPKAlterStatementEnd(String dropAckPKAlterStatementEnd) { this.dropAckPKAlterStatementEnd = dropAckPKAlterStatementEnd; } public String[] getDropSchemaStatements() { if (dropSchemaStatements == null) { dropSchemaStatements = new String[] {"DROP TABLE " + getFullAckTableName() + "", "DROP TABLE " + getFullMessageTableName() + "", "DROP TABLE " + getFullLockTableName() + ""}; } return dropSchemaStatements; } public String getAddMessageStatement() { if (addMessageStatement == null) { addMessageStatement = "INSERT INTO " + getFullMessageTableName() + "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, PRIORITY, MSG) VALUES (?, ?, ?, ?, ?, ?, ?)"; } return addMessageStatement; } public String getUpdateMessageStatement() { if (updateMessageStatement == null) { updateMessageStatement = "UPDATE " + getFullMessageTableName() + " SET MSG=? WHERE ID=?"; } return updateMessageStatement; } public String getRemoveMessageStatement() { if (removeMessageStatement == null) { removeMessageStatement = "DELETE FROM " + getFullMessageTableName() + " WHERE ID=?"; } return removeMessageStatement; } public String getFindMessageSequenceIdStatement() { if (findMessageSequenceIdStatement == null) { findMessageSequenceIdStatement = "SELECT ID, PRIORITY FROM " + getFullMessageTableName() + " WHERE MSGID_PROD=? AND MSGID_SEQ=? AND CONTAINER=?"; } return findMessageSequenceIdStatement; } public String getFindMessageStatement() { if (findMessageStatement == null) { findMessageStatement = "SELECT MSG FROM " + getFullMessageTableName() + " WHERE MSGID_PROD=? AND MSGID_SEQ=?"; } return findMessageStatement; } public String getFindMessageByIdStatement() { if (findMessageByIdStatement == null) { findMessageByIdStatement = "SELECT MSG FROM " + getFullMessageTableName() + " WHERE ID=?"; } return findMessageByIdStatement; } public String getFindAllMessagesStatement() { if (findAllMessagesStatement == null) { findAllMessagesStatement = "SELECT ID, MSG FROM " + getFullMessageTableName() + " WHERE CONTAINER=? ORDER BY ID"; } return findAllMessagesStatement; } public String getFindAllMessageIdsStatement() { // this needs to be limited maybe need to use getFindLastSequenceIdInMsgsStatement // and work back for X if (findAllMessageIdsStatement == null) { findAllMessageIdsStatement = "SELECT ID, MSGID_PROD, MSGID_SEQ FROM " + getFullMessageTableName() + " ORDER BY ID DESC"; } return findAllMessageIdsStatement; } public String getFindLastSequenceIdInMsgsStatement() { if (findLastSequenceIdInMsgsStatement == null) { findLastSequenceIdInMsgsStatement = "SELECT MAX(ID) FROM " + getFullMessageTableName(); } return findLastSequenceIdInMsgsStatement; } public String getLastProducerSequenceIdStatement() { if (lastProducerSequenceIdStatement == null) { lastProducerSequenceIdStatement = "SELECT MAX(MSGID_SEQ) FROM " + getFullMessageTableName() + " WHERE MSGID_PROD=?"; } return lastProducerSequenceIdStatement; } public String getFindLastSequenceIdInAcksStatement() { if (findLastSequenceIdInAcksStatement == null) { findLastSequenceIdInAcksStatement = "SELECT MAX(LAST_ACKED_ID) FROM " + getFullAckTableName(); } return findLastSequenceIdInAcksStatement; } public String getCreateDurableSubStatement() { if (createDurableSubStatement == null) { createDurableSubStatement = "INSERT INTO " + getFullAckTableName() + "(CONTAINER, CLIENT_ID, SUB_NAME, SELECTOR, LAST_ACKED_ID, SUB_DEST, PRIORITY) " + "VALUES (?, ?, ?, ?, ?, ?, ?)"; } return createDurableSubStatement; } public String getFindDurableSubStatement() { if (findDurableSubStatement == null) { findDurableSubStatement = "SELECT SELECTOR, SUB_DEST " + "FROM " + getFullAckTableName() + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?"; } return findDurableSubStatement; } public String getFindAllDurableSubsStatement() { if (findAllDurableSubsStatement == null) { findAllDurableSubsStatement = "SELECT SELECTOR, SUB_NAME, CLIENT_ID, SUB_DEST" + " FROM " + getFullAckTableName() + " WHERE CONTAINER=? AND PRIORITY=0"; } return findAllDurableSubsStatement; } public String getUpdateLastPriorityAckRowOfDurableSubStatement() { if (updateLastPriorityAckRowOfDurableSubStatement == null) { updateLastPriorityAckRowOfDurableSubStatement = "UPDATE " + getFullAckTableName() + " SET LAST_ACKED_ID=?" + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=? AND PRIORITY=?"; } return updateLastPriorityAckRowOfDurableSubStatement; } public String getDeleteSubscriptionStatement() { if (deleteSubscriptionStatement == null) { deleteSubscriptionStatement = "DELETE FROM " + getFullAckTableName() + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?"; } return deleteSubscriptionStatement; } public String getFindAllDurableSubMessagesStatement() { if (findAllDurableSubMessagesStatement == null) { findAllDurableSubMessagesStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, " + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" + " ORDER BY M.PRIORITY DESC, M.ID"; } return findAllDurableSubMessagesStatement; } public String getFindDurableSubMessagesStatement() { if (findDurableSubMessagesStatement == null) { findDurableSubMessagesStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, " + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" + " AND M.ID > ?" + " ORDER BY M.ID"; } return findDurableSubMessagesStatement; } public String getFindDurableSubMessagesByPriorityStatement() { if (findDurableSubMessagesByPriorityStatement == null) { findDurableSubMessagesByPriorityStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M," + " " + getFullAckTableName() + " D" + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" + " AND M.CONTAINER=D.CONTAINER" + " AND M.PRIORITY=D.PRIORITY AND M.ID > D.LAST_ACKED_ID" + " AND M.ID > ? AND M.PRIORITY = ?" + " ORDER BY M.ID"; } return findDurableSubMessagesByPriorityStatement; } public String findAllDurableSubMessagesStatement() { if (findAllDurableSubMessagesStatement == null) { findAllDurableSubMessagesStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, " + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" + " ORDER BY M.ID"; } return findAllDurableSubMessagesStatement; } public String getNextDurableSubscriberMessageStatement() { if (nextDurableSubscriberMessageStatement == null) { nextDurableSubscriberMessageStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, " + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" + " AND M.CONTAINER=D.CONTAINER AND M.ID > ?" + " ORDER BY M.ID "; } return nextDurableSubscriberMessageStatement; } /** * @return the durableSubscriberMessageCountStatement */ public String getDurableSubscriberMessageCountStatement() { if (durableSubscriberMessageCountStatement == null) { durableSubscriberMessageCountStatement = "SELECT COUNT(*) FROM " + getFullMessageTableName() + " M, " + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" + " AND M.CONTAINER=D.CONTAINER " + " AND M.ID >" + " ( SELECT LAST_ACKED_ID FROM " + getFullAckTableName() + " WHERE CONTAINER=D.CONTAINER AND CLIENT_ID=D.CLIENT_ID" + " AND SUB_NAME=D.SUB_NAME )"; } return durableSubscriberMessageCountStatement; } public String getDurableSubscriberMessageCountStatementWithPriority() { if (durableSubscriberMessageCountStatementWithPriority == null) { durableSubscriberMessageCountStatementWithPriority = "SELECT COUNT(*) FROM " + getFullMessageTableName() + " M, " + getFullAckTableName() + " D " + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" + " AND M.CONTAINER=D.CONTAINER " + " AND M.PRIORITY=D.PRIORITY " + " AND M.ID > D.LAST_ACKED_ID"; } return durableSubscriberMessageCountStatementWithPriority; } public String getFindAllDestinationsStatement() { if (findAllDestinationsStatement == null) { findAllDestinationsStatement = "SELECT DISTINCT CONTAINER FROM " + getFullAckTableName(); } return findAllDestinationsStatement; } public String getRemoveAllMessagesStatement() { if (removeAllMessagesStatement == null) { removeAllMessagesStatement = "DELETE FROM " + getFullMessageTableName() + " WHERE CONTAINER=?"; } return removeAllMessagesStatement; } public String getRemoveAllSubscriptionsStatement() { if (removeAllSubscriptionsStatement == null) { removeAllSubscriptionsStatement = "DELETE FROM " + getFullAckTableName() + " WHERE CONTAINER=?"; } return removeAllSubscriptionsStatement; } public String getDeleteOldMessagesStatementWithPriority() { if (deleteOldMessagesStatementWithPriority == null) { deleteOldMessagesStatementWithPriority = "DELETE FROM " + getFullMessageTableName() + " WHERE ( EXPIRATION<>0 AND EXPIRATION |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.