alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (Statements.java)

This example ActiveMQ source code file (Statements.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - ActiveMQ tags/keywords

and, and, client_id, container, from, from, id, priority, select, string, string, sub_name, where, where

The ActiveMQ Statements.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.jdbc;

/**
 * 
 * 
 * @org.apache.xbean.XBean element="statements"
 * 
 */
public class Statements {

    protected String messageTableName = "ACTIVEMQ_MSGS";
    protected String durableSubAcksTableName = "ACTIVEMQ_ACKS";
    protected String lockTableName = "ACTIVEMQ_LOCK";
    protected String binaryDataType = "BLOB";
    protected String containerNameDataType = "VARCHAR(250)";
    protected String msgIdDataType = "VARCHAR(250)";
    protected String sequenceDataType = "BIGINT";
    protected String longDataType = "BIGINT";
    protected String stringIdDataType = "VARCHAR(250)";
    protected boolean useExternalMessageReferences;

    private String tablePrefix = "";
    private String addMessageStatement;
    private String updateMessageStatement;
    private String removeMessageStatement;
    private String findMessageSequenceIdStatement;
    private String findMessageStatement;
    private String findMessageByIdStatement;
    private String findAllMessagesStatement;
    private String findLastSequenceIdInMsgsStatement;
    private String findLastSequenceIdInAcksStatement;
    private String createDurableSubStatement;
    private String findDurableSubStatement;
    private String findAllDurableSubsStatement;
    private String updateLastPriorityAckRowOfDurableSubStatement;
    private String deleteSubscriptionStatement;
    private String findAllDurableSubMessagesStatement;
    private String findDurableSubMessagesStatement;
    private String findDurableSubMessagesByPriorityStatement;
    private String findAllDestinationsStatement;
    private String removeAllMessagesStatement;
    private String removeAllSubscriptionsStatement;
    private String deleteOldMessagesStatement;
    private String[] createSchemaStatements;
    private String[] dropSchemaStatements;
    private String lockCreateStatement;
    private String lockUpdateStatement;
    private String nextDurableSubscriberMessageStatement;
    private String durableSubscriberMessageCountStatement;
    private String lastAckedDurableSubscriberMessageStatement;
    private String destinationMessageCountStatement;
    private String findNextMessagesStatement;
    private String findNextMessagesByPriorityStatement;
    private boolean useLockCreateWhereClause;
    private String findAllMessageIdsStatement;
    private String lastProducerSequenceIdStatement;
    private String selectDurablePriorityAckStatement;

    private String insertDurablePriorityAckStatement;
    private String updateDurableLastAckStatement;
    private String deleteOldMessagesStatementWithPriority;
    private String durableSubscriberMessageCountStatementWithPriority;
    private String dropAckPKAlterStatementEnd;

    public String[] getCreateSchemaStatements() {
        if (createSchemaStatements == null) {
            createSchemaStatements = new String[] {
                "CREATE TABLE " + getFullMessageTableName() + "(" + "ID " + sequenceDataType + " NOT NULL"
                    + ", CONTAINER " + containerNameDataType + ", MSGID_PROD " + msgIdDataType + ", MSGID_SEQ "
                    + sequenceDataType + ", EXPIRATION " + longDataType + ", MSG "
                    + (useExternalMessageReferences ? stringIdDataType : binaryDataType)
                    + ", PRIMARY KEY ( ID ) )",
                "CREATE INDEX " + getFullMessageTableName() + "_MIDX ON " + getFullMessageTableName() + " (MSGID_PROD,MSGID_SEQ)",
                "CREATE INDEX " + getFullMessageTableName() + "_CIDX ON " + getFullMessageTableName() + " (CONTAINER)",
                "CREATE INDEX " + getFullMessageTableName() + "_EIDX ON " + getFullMessageTableName() + " (EXPIRATION)",
                "CREATE TABLE " + getFullAckTableName() + "(" + "CONTAINER " + containerNameDataType + " NOT NULL"
                    + ", SUB_DEST " + stringIdDataType 
                    + ", CLIENT_ID " + stringIdDataType + " NOT NULL" + ", SUB_NAME " + stringIdDataType
                    + " NOT NULL" + ", SELECTOR " + stringIdDataType + ", LAST_ACKED_ID " + sequenceDataType
                    + ", PRIMARY KEY ( CONTAINER, CLIENT_ID, SUB_NAME))", 
                "CREATE TABLE " + getFullLockTableName() 
                    + "( ID " + longDataType + " NOT NULL, TIME " + longDataType 
                    + ", BROKER_NAME " + stringIdDataType + ", PRIMARY KEY (ID) )",
                "INSERT INTO " + getFullLockTableName() + "(ID) VALUES (1)", 
                "ALTER TABLE " + getFullMessageTableName() + " ADD PRIORITY " + sequenceDataType,
                "CREATE INDEX " + getFullMessageTableName() + "_PIDX ON " + getFullMessageTableName() + " (PRIORITY)",
                "ALTER TABLE " + getFullAckTableName() + " ADD PRIORITY " + sequenceDataType  + " DEFAULT 5 NOT NULL",
                "ALTER TABLE " + getFullAckTableName() + " " + getDropAckPKAlterStatementEnd(),
                "ALTER TABLE " + getFullAckTableName() + " ADD PRIMARY KEY (CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY)",
            };
        }
        return createSchemaStatements;
    }

    public String getDropAckPKAlterStatementEnd() {
        if (dropAckPKAlterStatementEnd == null) {
            dropAckPKAlterStatementEnd = "DROP PRIMARY KEY";
        }
        return dropAckPKAlterStatementEnd;
    }

    public void setDropAckPKAlterStatementEnd(String dropAckPKAlterStatementEnd) {
        this.dropAckPKAlterStatementEnd = dropAckPKAlterStatementEnd;
    }

    public String[] getDropSchemaStatements() {
        if (dropSchemaStatements == null) {
            dropSchemaStatements = new String[] {"DROP TABLE " + getFullAckTableName() + "",
                                                 "DROP TABLE " + getFullMessageTableName() + "",
                                                 "DROP TABLE " + getFullLockTableName() + ""};
        }
        return dropSchemaStatements;
    }

    public String getAddMessageStatement() {
        if (addMessageStatement == null) {
            addMessageStatement = "INSERT INTO "
                                  + getFullMessageTableName()
                                  + "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, PRIORITY, MSG) VALUES (?, ?, ?, ?, ?, ?, ?)";
        }
        return addMessageStatement;
    }

    public String getUpdateMessageStatement() {
        if (updateMessageStatement == null) {
            updateMessageStatement = "UPDATE " + getFullMessageTableName() + " SET MSG=? WHERE ID=?";
        }
        return updateMessageStatement;
    }

    public String getRemoveMessageStatement() {
        if (removeMessageStatement == null) {
            removeMessageStatement = "DELETE FROM " + getFullMessageTableName() + " WHERE ID=?";
        }
        return removeMessageStatement;
    }

    public String getFindMessageSequenceIdStatement() {
        if (findMessageSequenceIdStatement == null) {
            findMessageSequenceIdStatement = "SELECT ID, PRIORITY FROM " + getFullMessageTableName()
                                             + " WHERE MSGID_PROD=? AND MSGID_SEQ=? AND CONTAINER=?";
        }
        return findMessageSequenceIdStatement;
    }

    public String getFindMessageStatement() {
        if (findMessageStatement == null) {
            findMessageStatement = "SELECT MSG FROM " + getFullMessageTableName() + " WHERE MSGID_PROD=? AND MSGID_SEQ=?";
        }
        return findMessageStatement;
    }

    public String getFindMessageByIdStatement() {
        if (findMessageByIdStatement == null) {
        	findMessageByIdStatement = "SELECT MSG FROM " + getFullMessageTableName() + " WHERE ID=?";
        }
        return findMessageByIdStatement;
    }
    
    public String getFindAllMessagesStatement() {
        if (findAllMessagesStatement == null) {
            findAllMessagesStatement = "SELECT ID, MSG FROM " + getFullMessageTableName()
                                       + " WHERE CONTAINER=? ORDER BY ID";
        }
        return findAllMessagesStatement;
    }
    
    public String getFindAllMessageIdsStatement() {
        //  this needs to be limited maybe need to use getFindLastSequenceIdInMsgsStatement
        // and work back for X
        if (findAllMessageIdsStatement == null) {
            findAllMessageIdsStatement = "SELECT ID, MSGID_PROD, MSGID_SEQ FROM " + getFullMessageTableName()
                                       + " ORDER BY ID DESC";
        }
        return findAllMessageIdsStatement;
    }

    public String getFindLastSequenceIdInMsgsStatement() {
        if (findLastSequenceIdInMsgsStatement == null) {
            findLastSequenceIdInMsgsStatement = "SELECT MAX(ID) FROM " + getFullMessageTableName();
        }
        return findLastSequenceIdInMsgsStatement;
    }

    public String getLastProducerSequenceIdStatement() {
        if (lastProducerSequenceIdStatement == null) {
            lastProducerSequenceIdStatement = "SELECT MAX(MSGID_SEQ) FROM " + getFullMessageTableName()
                                            + " WHERE MSGID_PROD=?";
        }
        return lastProducerSequenceIdStatement;
    }


    public String getFindLastSequenceIdInAcksStatement() {
        if (findLastSequenceIdInAcksStatement == null) {
            findLastSequenceIdInAcksStatement = "SELECT MAX(LAST_ACKED_ID) FROM " + getFullAckTableName();
        }
        return findLastSequenceIdInAcksStatement;
    }

    public String getCreateDurableSubStatement() {
        if (createDurableSubStatement == null) {
            createDurableSubStatement = "INSERT INTO "
                                        + getFullAckTableName()
                                        + "(CONTAINER, CLIENT_ID, SUB_NAME, SELECTOR, LAST_ACKED_ID, SUB_DEST, PRIORITY) "
                                        + "VALUES (?, ?, ?, ?, ?, ?, ?)";
        }
        return createDurableSubStatement;
    }

    public String getFindDurableSubStatement() {
        if (findDurableSubStatement == null) {
            findDurableSubStatement = "SELECT SELECTOR, SUB_DEST " + "FROM " + getFullAckTableName()
                                      + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
        }
        return findDurableSubStatement;
    }

    public String getFindAllDurableSubsStatement() {
        if (findAllDurableSubsStatement == null) {
            findAllDurableSubsStatement = "SELECT SELECTOR, SUB_NAME, CLIENT_ID, SUB_DEST" + " FROM "
                                          + getFullAckTableName() + " WHERE CONTAINER=? AND PRIORITY=0";
        }
        return findAllDurableSubsStatement;
    }

    public String getUpdateLastPriorityAckRowOfDurableSubStatement() {
        if (updateLastPriorityAckRowOfDurableSubStatement == null) {
            updateLastPriorityAckRowOfDurableSubStatement = "UPDATE " + getFullAckTableName() + " SET LAST_ACKED_ID=?"
                                                 + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=? AND PRIORITY=?";
        }
        return updateLastPriorityAckRowOfDurableSubStatement;
    }

    public String getDeleteSubscriptionStatement() {
        if (deleteSubscriptionStatement == null) {
            deleteSubscriptionStatement = "DELETE FROM " + getFullAckTableName()
                                          + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
        }
        return deleteSubscriptionStatement;
    }

    public String getFindAllDurableSubMessagesStatement() {
        if (findAllDurableSubMessagesStatement == null) {
            findAllDurableSubMessagesStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName()
                                                 + " M, " + getFullAckTableName() + " D "
                                                 + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
                                                 + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"
                                                 + " ORDER BY M.PRIORITY DESC, M.ID";
        }
        return findAllDurableSubMessagesStatement;
    }

    public String getFindDurableSubMessagesStatement() {
        if (findDurableSubMessagesStatement == null) {
            findDurableSubMessagesStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, "
                                              + getFullAckTableName() + " D "
                                              + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
                                              + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"
                                              + " AND M.ID > ?"
                                              + " ORDER BY M.ID";
        }
        return findDurableSubMessagesStatement;
    }
    
    public String getFindDurableSubMessagesByPriorityStatement() {
        if (findDurableSubMessagesByPriorityStatement == null) {
            findDurableSubMessagesByPriorityStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M,"
                                              + " " + getFullAckTableName() + " D"
                                              + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
                                              + " AND M.CONTAINER=D.CONTAINER"
                                              + " AND M.PRIORITY=D.PRIORITY AND M.ID > D.LAST_ACKED_ID"
                                              + " AND M.ID > ? AND M.PRIORITY = ?"
                                              + " ORDER BY M.ID";
        }
        return findDurableSubMessagesByPriorityStatement;
    }    

    public String findAllDurableSubMessagesStatement() {
        if (findAllDurableSubMessagesStatement == null) {
            findAllDurableSubMessagesStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName()
                                                 + " M, " + getFullAckTableName() + " D "
                                                 + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
                                                 + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"
                                                 + " ORDER BY M.ID";
        }
        return findAllDurableSubMessagesStatement;
    }

    public String getNextDurableSubscriberMessageStatement() {
        if (nextDurableSubscriberMessageStatement == null) {
            nextDurableSubscriberMessageStatement = "SELECT M.ID, M.MSG FROM "
                                                    + getFullMessageTableName()
                                                    + " M, "
                                                    + getFullAckTableName()
                                                    + " D "
                                                    + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
                                                    + " AND M.CONTAINER=D.CONTAINER AND M.ID > ?"
                                                    + " ORDER BY M.ID ";
        }
        return nextDurableSubscriberMessageStatement;
    }

    /**
     * @return the durableSubscriberMessageCountStatement
     */

    public String getDurableSubscriberMessageCountStatement() {
        if (durableSubscriberMessageCountStatement == null) {
            durableSubscriberMessageCountStatement = "SELECT COUNT(*) FROM "
                                                     + getFullMessageTableName()
                                                     + " M, "
                                                     + getFullAckTableName()
                                                     + " D "
                                                     + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
                                                     + " AND M.CONTAINER=D.CONTAINER "
                                                     + "     AND M.ID >"
                                                     + "          ( SELECT LAST_ACKED_ID FROM " + getFullAckTableName()
                                                     + "           WHERE CONTAINER=D.CONTAINER AND CLIENT_ID=D.CLIENT_ID"
                                                     + "           AND SUB_NAME=D.SUB_NAME )";

        }
        return durableSubscriberMessageCountStatement;
    }

    public String getDurableSubscriberMessageCountStatementWithPriority() {
        if (durableSubscriberMessageCountStatementWithPriority == null) {
            durableSubscriberMessageCountStatementWithPriority = "SELECT COUNT(*) FROM "
                                                     + getFullMessageTableName()
                                                     + " M, "
                                                     + getFullAckTableName()
                                                     + " D "
                                                     + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
                                                     + " AND M.CONTAINER=D.CONTAINER "
                                                     + " AND M.PRIORITY=D.PRIORITY "
                                                     + " AND M.ID > D.LAST_ACKED_ID";
        }

        return durableSubscriberMessageCountStatementWithPriority;
    }

    public String getFindAllDestinationsStatement() {
        if (findAllDestinationsStatement == null) {
            findAllDestinationsStatement = "SELECT DISTINCT CONTAINER FROM " + getFullAckTableName();
        }
        return findAllDestinationsStatement;
    }

    public String getRemoveAllMessagesStatement() {
        if (removeAllMessagesStatement == null) {
            removeAllMessagesStatement = "DELETE FROM " + getFullMessageTableName() + " WHERE CONTAINER=?";
        }
        return removeAllMessagesStatement;
    }

    public String getRemoveAllSubscriptionsStatement() {
        if (removeAllSubscriptionsStatement == null) {
            removeAllSubscriptionsStatement = "DELETE FROM " + getFullAckTableName() + " WHERE CONTAINER=?";
        }
        return removeAllSubscriptionsStatement;
    }

    public String getDeleteOldMessagesStatementWithPriority() {
        if (deleteOldMessagesStatementWithPriority == null) {
            deleteOldMessagesStatementWithPriority = "DELETE FROM " + getFullMessageTableName()
                                         + " WHERE ( EXPIRATION<>0 AND EXPIRATION

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ Statements.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.