alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (KahaStore.java)

This example ActiveMQ source code file (KahaStore.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - ActiveMQ tags/keywords

containerid, containerid, datamanager, datamanager, indexitem, indexmanager, io, ioexception, ioexception, iterator, listcontainerimpl, mapcontainerimpl, store, string, string, threading, threads, util

The ActiveMQ KahaStore.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.kaha.impl;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileLock;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.activemq.kaha.ContainerId;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.kaha.impl.async.DataManagerFacade;
import org.apache.activemq.kaha.impl.container.ListContainerImpl;
import org.apache.activemq.kaha.impl.container.MapContainerImpl;
import org.apache.activemq.kaha.impl.data.DataManagerImpl;
import org.apache.activemq.kaha.impl.data.Item;
import org.apache.activemq.kaha.impl.data.RedoListener;
import org.apache.activemq.kaha.impl.index.IndexItem;
import org.apache.activemq.kaha.impl.index.IndexManager;
import org.apache.activemq.kaha.impl.index.RedoStoreIndexItem;
import org.apache.activemq.util.IOHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Store Implementation
 * 
 * 
 */
public class KahaStore implements Store {

    private static final String PROPERTY_PREFIX = "org.apache.activemq.kaha.Store";
    private static final boolean BROKEN_FILE_LOCK = "true".equals(System.getProperty(PROPERTY_PREFIX
                                                                                     + ".FileLockBroken",
                                                                                     "false"));
    private static final boolean DISABLE_LOCKING = "true".equals(System.getProperty(PROPERTY_PREFIX
                                                                                    + ".DisableLocking",
                                                                                    "false"));
    //according to the String javadoc, all constant strings are interned so this will be the same object throughout the vm
    //and we can use it as a monitor for the lockset.
    private final static String LOCKSET_MONITOR = PROPERTY_PREFIX + ".Lock.Monitor";
    private static final Logger LOG = LoggerFactory.getLogger(KahaStore.class);

    private final File directory;
    private final String mode;
    private IndexRootContainer mapsContainer;
    private IndexRootContainer listsContainer;
    private final Map<ContainerId, ListContainerImpl> lists = new ConcurrentHashMap();
    private final Map<ContainerId, MapContainerImpl> maps = new ConcurrentHashMap();
    private final Map<String, DataManager> dataManagers = new ConcurrentHashMap();
    private final Map<String, IndexManager> indexManagers = new ConcurrentHashMap();
    private boolean closed;
    private boolean initialized;
    private boolean logIndexChanges;
    private boolean useAsyncDataManager;
    private long maxDataFileLength = 1024 * 1024 * 32;
    private FileLock lock;
    private boolean persistentIndex = true;
    private RandomAccessFile lockFile;
    private final AtomicLong storeSize;
    private String defaultContainerName = DEFAULT_CONTAINER_NAME;

    
    public KahaStore(String name, String mode) throws IOException {
    	this(new File(IOHelper.toFileSystemDirectorySafeName(name)), mode, new AtomicLong());
    }

    public KahaStore(File directory, String mode) throws IOException {
    	this(directory, mode, new AtomicLong());
    }

    public KahaStore(String name, String mode,AtomicLong storeSize) throws IOException {
    	this(new File(IOHelper.toFileSystemDirectorySafeName(name)), mode, storeSize);
    }
    
    public KahaStore(File directory, String mode, AtomicLong storeSize) throws IOException {
        this.mode = mode;
        this.storeSize = storeSize;
        this.directory = directory;
        IOHelper.mkdirs(this.directory);
    }

    public synchronized void close() throws IOException {
        if (!closed) {
            closed = true;
            if (initialized) {
                unlock();
                for (ListContainerImpl container : lists.values()) {
                    container.close();
                }
                lists.clear();
                for (MapContainerImpl container : maps.values()) {
                    container.close();
                }
                maps.clear();
                for (Iterator<IndexManager> iter = indexManagers.values().iterator(); iter.hasNext();) {
                    IndexManager im = iter.next();
                    im.close();
                    iter.remove();
                }
                for (Iterator<DataManager> iter = dataManagers.values().iterator(); iter.hasNext();) {
                    DataManager dm = iter.next();
                    dm.close();
                    iter.remove();
                }
            }
            if (lockFile!=null) {
                lockFile.close();
                lockFile=null;
            }
        }
    }

    public synchronized void force() throws IOException {
        if (initialized) {
            for (Iterator<IndexManager> iter = indexManagers.values().iterator(); iter.hasNext();) {
                IndexManager im = iter.next();
                im.force();
            }
            for (Iterator<DataManager> iter = dataManagers.values().iterator(); iter.hasNext();) {
                DataManager dm = iter.next();
                dm.force();
            }
        }
    }

    public synchronized void clear() throws IOException {
        initialize();
        for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) {
            ContainerId id = (ContainerId)i.next();
            MapContainer container = getMapContainer(id.getKey(), id.getDataContainerName());
            container.clear();
        }
        for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) {
            ContainerId id = (ContainerId)i.next();
            ListContainer container = getListContainer(id.getKey(), id.getDataContainerName());
            container.clear();
        }

    }

    public synchronized boolean delete() throws IOException {
        boolean result = true;
        if (initialized) {
            clear();
            for (Iterator<IndexManager> iter = indexManagers.values().iterator(); iter.hasNext();) {
                IndexManager im = iter.next();
                result &= im.delete();
                iter.remove();
            }
            for (Iterator<DataManager> iter = dataManagers.values().iterator(); iter.hasNext();) {
                DataManager dm = iter.next();
                result &= dm.delete();
                iter.remove();
            }
        }
        if (directory != null && directory.isDirectory()) {
            result =IOHelper.deleteChildren(directory);
            String str = result ? "successfully deleted" : "failed to delete";
            LOG.info("Kaha Store " + str + " data directory " + directory);
        }
        return result;
    }

    public synchronized boolean isInitialized() {
        return initialized;
    }

    public boolean doesMapContainerExist(Object id) throws IOException {
        return doesMapContainerExist(id, defaultContainerName);
    }

    public synchronized boolean doesMapContainerExist(Object id, String containerName) throws IOException {
        initialize();
        ContainerId containerId = new ContainerId(id, containerName);
        return maps.containsKey(containerId) || mapsContainer.doesRootExist(containerId);
    }

    public MapContainer getMapContainer(Object id) throws IOException {
        return getMapContainer(id, defaultContainerName);
    }

    public MapContainer getMapContainer(Object id, String containerName) throws IOException {
        return getMapContainer(id, containerName, persistentIndex);
    }

    public synchronized MapContainer getMapContainer(Object id, String containerName, boolean persistentIndex)
        throws IOException {
        initialize();
        ContainerId containerId = new ContainerId(id, containerName);
        MapContainerImpl result = maps.get(containerId);
        if (result == null) {
            DataManager dm = getDataManager(containerName);
            IndexManager im = getIndexManager(dm, containerName);

            IndexItem root = mapsContainer.getRoot(im, containerId);
            if (root == null) {
                root = mapsContainer.addRoot(im, containerId);
            }
            result = new MapContainerImpl(directory, containerId, root, im, dm, persistentIndex);
            maps.put(containerId, result);
        }
        return result;
    }

    public void deleteMapContainer(Object id) throws IOException {
        deleteMapContainer(id, defaultContainerName);
    }

    public void deleteMapContainer(Object id, String containerName) throws IOException {
        ContainerId containerId = new ContainerId(id, containerName);
        deleteMapContainer(containerId);
    }

    public synchronized void deleteMapContainer(ContainerId containerId) throws IOException {
        initialize();
        MapContainerImpl container = maps.remove(containerId);
        if (container != null) {
            container.clear();
            mapsContainer.removeRoot(container.getIndexManager(), containerId);
            container.close();
        }
    }

    public synchronized Set<ContainerId> getMapContainerIds() throws IOException {
        initialize();
        Set<ContainerId> set = new HashSet();
        for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) {
            ContainerId id = (ContainerId)i.next();
            set.add(id);
        }
        return set;
    }

    public boolean doesListContainerExist(Object id) throws IOException {
        return doesListContainerExist(id, defaultContainerName);
    }

    public synchronized boolean doesListContainerExist(Object id, String containerName) throws IOException {
        initialize();
        ContainerId containerId = new ContainerId(id, containerName);
        return lists.containsKey(containerId) || listsContainer.doesRootExist(containerId);
    }

    public ListContainer getListContainer(Object id) throws IOException {
        return getListContainer(id, defaultContainerName);
    }

    public ListContainer getListContainer(Object id, String containerName) throws IOException {
        return getListContainer(id, containerName, persistentIndex);
    }

    public synchronized ListContainer getListContainer(Object id, String containerName,
                                                       boolean persistentIndex) throws IOException {
        initialize();
        ContainerId containerId = new ContainerId(id, containerName);
        ListContainerImpl result = lists.get(containerId);
        if (result == null) {
            DataManager dm = getDataManager(containerName);
            IndexManager im = getIndexManager(dm, containerName);

            IndexItem root = listsContainer.getRoot(im, containerId);
            if (root == null) {
                root = listsContainer.addRoot(im, containerId);
            }
            result = new ListContainerImpl(containerId, root, im, dm, persistentIndex);
            lists.put(containerId, result);
        }
        return result;
    }

    public void deleteListContainer(Object id) throws IOException {
        deleteListContainer(id, defaultContainerName);
    }

    public synchronized void deleteListContainer(Object id, String containerName) throws IOException {
        ContainerId containerId = new ContainerId(id, containerName);
        deleteListContainer(containerId);
    }

    public synchronized void deleteListContainer(ContainerId containerId) throws IOException {
        initialize();
        ListContainerImpl container = lists.remove(containerId);
        if (container != null) {
            listsContainer.removeRoot(container.getIndexManager(), containerId);
            container.clear();
            container.close();
        }
    }

    public synchronized Set<ContainerId> getListContainerIds() throws IOException {
        initialize();
        Set<ContainerId> set = new HashSet();
        for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) {
            ContainerId id = (ContainerId)i.next();
            set.add(id);
        }
        return set;
    }

    /**
     * @return the listsContainer
     */
    public IndexRootContainer getListsContainer() {
        return this.listsContainer;
    }

    /**
     * @return the mapsContainer
     */
    public IndexRootContainer getMapsContainer() {
        return this.mapsContainer;
    }

    public synchronized DataManager getDataManager(String name) throws IOException {
        DataManager dm = dataManagers.get(name);
        if (dm == null) {
            if (isUseAsyncDataManager()) {
                AsyncDataManager t = new AsyncDataManager(storeSize);
                t.setDirectory(directory);
                t.setFilePrefix("async-data-" + name + "-");
                t.setMaxFileLength((int)maxDataFileLength);
                t.start();
                dm = new DataManagerFacade(t, name);
            } else {
                DataManagerImpl t = new DataManagerImpl(directory, name,storeSize);
                t.setMaxFileLength(maxDataFileLength);
                dm = t;
            }
            if (logIndexChanges) {
                recover(dm);
            }
            dataManagers.put(name, dm);
        }
        return dm;
    }

    public synchronized IndexManager getIndexManager(DataManager dm, String name) throws IOException {
        IndexManager im = indexManagers.get(name);
        if (im == null) {
            im = new IndexManager(directory, name, mode, logIndexChanges ? dm : null,storeSize);
            indexManagers.put(name, im);
        }
        return im;
    }

    private void recover(final DataManager dm) throws IOException {
        dm.recoverRedoItems(new RedoListener() {
            public void onRedoItem(StoreLocation item, Object o) throws Exception {
                RedoStoreIndexItem redo = (RedoStoreIndexItem)o;
                // IndexManager im = getIndexManager(dm, redo.getIndexName());
                IndexManager im = getIndexManager(dm, dm.getName());
                im.redo(redo);
            }
        });
    }

    public synchronized boolean isLogIndexChanges() {
        return logIndexChanges;
    }

    public synchronized void setLogIndexChanges(boolean logIndexChanges) {
        this.logIndexChanges = logIndexChanges;
    }

    /**
     * @return the maxDataFileLength
     */
    public synchronized long getMaxDataFileLength() {
        return maxDataFileLength;
    }

    /**
     * @param maxDataFileLength the maxDataFileLength to set
     */
    public synchronized void setMaxDataFileLength(long maxDataFileLength) {
        this.maxDataFileLength = maxDataFileLength;
    }

    /**
     * @return the default index type
     */
    public synchronized String getIndexTypeAsString() {
        return persistentIndex ? "PERSISTENT" : "VM";
    }

    /**
     * Set the default index type
     * 
     * @param type "PERSISTENT" or "VM"
     */
    public synchronized void setIndexTypeAsString(String type) {
        if (type.equalsIgnoreCase("VM")) {
            persistentIndex = false;
        } else {
            persistentIndex = true;
        }
    }
    
    public boolean isPersistentIndex() {
		return persistentIndex;
	}

	public void setPersistentIndex(boolean persistentIndex) {
		this.persistentIndex = persistentIndex;
	}
	

    public synchronized boolean isUseAsyncDataManager() {
        return useAsyncDataManager;
    }

    public synchronized void setUseAsyncDataManager(boolean useAsyncWriter) {
        this.useAsyncDataManager = useAsyncWriter;
    }

    /**
     * @return size of store
     * @see org.apache.activemq.kaha.Store#size()
     */
    public long size(){
        return storeSize.get();
    }

    public String getDefaultContainerName() {
        return defaultContainerName;
    }

    public void setDefaultContainerName(String defaultContainerName) {
        this.defaultContainerName = defaultContainerName;
    }

    public synchronized void initialize() throws IOException {
        if (closed) {
            throw new IOException("Store has been closed.");
        }
        if (!initialized) {       
            LOG.info("Kaha Store using data directory " + directory);
            lockFile = new RandomAccessFile(new File(directory, "lock"), "rw");
            lock();
            DataManager defaultDM = getDataManager(defaultContainerName);
            IndexManager rootIndexManager = getIndexManager(defaultDM, defaultContainerName);
            IndexItem mapRoot = new IndexItem();
            IndexItem listRoot = new IndexItem();
            if (rootIndexManager.isEmpty()) {
                mapRoot.setOffset(0);
                rootIndexManager.storeIndex(mapRoot);
                listRoot.setOffset(IndexItem.INDEX_SIZE);
                rootIndexManager.storeIndex(listRoot);
                rootIndexManager.setLength(IndexItem.INDEX_SIZE * 2);
            } else {
                mapRoot = rootIndexManager.getIndex(0);
                listRoot = rootIndexManager.getIndex(IndexItem.INDEX_SIZE);
            }
            initialized = true;
            mapsContainer = new IndexRootContainer(mapRoot, rootIndexManager, defaultDM);
            listsContainer = new IndexRootContainer(listRoot, rootIndexManager, defaultDM);
            /**
             * Add interest in data files - then consolidate them
             */
            generateInterestInMapDataFiles();
            generateInterestInListDataFiles();
            for (Iterator<DataManager> i = dataManagers.values().iterator(); i.hasNext();) {
                DataManager dm = i.next();
                dm.consolidateDataFiles();
            }
        }
    }

    private void lock() throws IOException {
        synchronized (LOCKSET_MONITOR) {
            if (!DISABLE_LOCKING && directory != null && lock == null) {
                String key = getPropertyKey();
                String property = System.getProperty(key);
                if (null == property) {
                    if (!BROKEN_FILE_LOCK) {
                        lock = lockFile.getChannel().tryLock(0, lockFile.getChannel().size(), false);
                        if (lock == null) {
                            throw new StoreLockedExcpetion("Kaha Store " + directory.getName() + "  is already opened by another application");
                        } else
                            System.setProperty(key, new Date().toString());
                    }
                } else { //already locked
                    throw new StoreLockedExcpetion("Kaha Store " + directory.getName() + " is already opened by this application.");
                }
            }
        }
    }

    private void unlock() throws IOException {
        synchronized (LOCKSET_MONITOR) {
            if (!DISABLE_LOCKING && (null != directory) && (null != lock)) {
                System.getProperties().remove(getPropertyKey());
                if (lock.isValid()) {
                    lock.release();
                }
                lock = null;
            }
        }
    }


    private String getPropertyKey() throws IOException {
        return getClass().getName() + ".lock." + directory.getCanonicalPath();
    }

    /**
     * scans the directory and builds up the IndexManager and DataManager
     * 
     * @throws IOException if there is a problem accessing an index or data file
     */
    private void generateInterestInListDataFiles() throws IOException {
        for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) {
            ContainerId id = (ContainerId)i.next();
            DataManager dm = getDataManager(id.getDataContainerName());
            IndexManager im = getIndexManager(dm, id.getDataContainerName());
            IndexItem theRoot = listsContainer.getRoot(im, id);
            long nextItem = theRoot.getNextItem();
            while (nextItem != Item.POSITION_NOT_SET) {
                IndexItem item = im.getIndex(nextItem);
                item.setOffset(nextItem);
                dm.addInterestInFile(item.getKeyFile());
                dm.addInterestInFile(item.getValueFile());
                nextItem = item.getNextItem();
            }
        }
    }

    /**
     * scans the directory and builds up the IndexManager and DataManager
     * 
     * @throws IOException if there is a problem accessing an index or data file
     */
    private void generateInterestInMapDataFiles() throws IOException {
        for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) {
            ContainerId id = (ContainerId)i.next();
            DataManager dm = getDataManager(id.getDataContainerName());
            IndexManager im = getIndexManager(dm, id.getDataContainerName());
            IndexItem theRoot = mapsContainer.getRoot(im, id);
            long nextItem = theRoot.getNextItem();
            while (nextItem != Item.POSITION_NOT_SET) {
                IndexItem item = im.getIndex(nextItem);
                item.setOffset(nextItem);
                dm.addInterestInFile(item.getKeyFile());
                dm.addInterestInFile(item.getValueFile());
                nextItem = item.getNextItem();
            }

        }
    }
}

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ KahaStore.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.