alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (MapContainerImpl.java)

This example ActiveMQ source code file (MapContainerImpl.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example"™.

Java - ActiveMQ tags/keywords

failed, failed, hashindex, indexitem, indexitem, io, ioexception, ioexception, mapcontainerimpl, object, object, runtimeexception, runtimestoreexception, storeentry, storelocation, util

The ActiveMQ MapContainerImpl.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.kaha.impl.container;

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import org.apache.activemq.kaha.ContainerId;
import org.apache.activemq.kaha.IndexMBean;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.RuntimeStoreException;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.kaha.impl.DataManager;
import org.apache.activemq.kaha.impl.data.Item;
import org.apache.activemq.kaha.impl.index.Index;
import org.apache.activemq.kaha.impl.index.IndexItem;
import org.apache.activemq.kaha.impl.index.IndexLinkedList;
import org.apache.activemq.kaha.impl.index.IndexManager;
import org.apache.activemq.kaha.impl.index.VMIndex;
import org.apache.activemq.kaha.impl.index.hash.HashIndex;
import org.apache.activemq.util.IOHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Implementation of a {@link MapContainer}.
 * <p>
 * Every entry is an {@link IndexItem} that is linked into the inherited
 * {@code indexList} (new entries are linked after the current last entry, see
 * {@link #write(Object, Object)}) and is additionally registered by key in an
 * {@link Index}. The index is a {@link HashIndex} when the container was
 * created with {@code persistentIndex == true} and a {@link VMIndex}
 * otherwise.
 * <p>
 * Methods that read or modify container state are {@code synchronized} on this
 * instance, and most of them call {@link #load()} first so the container is
 * loaded lazily on first use.
 */
public final class MapContainerImpl extends BaseContainerImpl implements MapContainer {

    private static final Logger LOG = LoggerFactory.getLogger(MapContainerImpl.class);

    /** Key lookup structure; created lazily by {@link #init()}. */
    protected Index index;
    /** Marshaller used to read and write keys; defaults to {@link Store#OBJECT_MARSHALLER}. */
    protected Marshaller keyMarshaller = Store.OBJECT_MARSHALLER;
    /** Marshaller used to read and write values; defaults to {@link Store#OBJECT_MARSHALLER}. */
    protected Marshaller valueMarshaller = Store.OBJECT_MARSHALLER;
    /** Directory handed to the {@link HashIndex} when a persistent index is created. */
    protected File directory;

    // Settings applied to the HashIndex in init(); they have no effect on a VMIndex.
    private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
    private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
    private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
    private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY;
    private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR;

    /**
     * Creates the container. The key index itself is not created here; that
     * happens in {@link #init()}.
     *
     * @param directory directory passed to the {@link HashIndex} constructor
     * @param id container id, forwarded to the superclass
     * @param root root index item, forwarded to the superclass
     * @param indexManager index manager, forwarded to the superclass
     * @param dataManager data manager, forwarded to the superclass
     * @param persistentIndex {@code true} to use a {@link HashIndex},
     *            {@code false} to use a {@link VMIndex}
     */
    public MapContainerImpl(File directory, ContainerId id, IndexItem root, IndexManager indexManager,
                            DataManager dataManager, boolean persistentIndex) {
        super(id, root, indexManager, dataManager, persistentIndex);
        this.directory = directory;
    }

    /**
     * Runs the superclass initialisation, creates the key index if it does not
     * exist yet, and hands the current key marshaller to the index.
     *
     * @throws RuntimeException if the {@link HashIndex} cannot be created
     */
    public synchronized void init() {
        super.init();
        if (index == null) {
            if (persistentIndex) {
                index = createHashIndex();
            } else {
                index = new VMIndex(indexManager);
            }
        }
        index.setKeyMarshaller(keyMarshaller);
    }

    /**
     * Builds a {@link HashIndex} named after this container and configured
     * from the {@code index*} properties of this class.
     *
     * @return the configured index
     * @throws RuntimeException wrapping the {@link IOException} raised by the
     *             {@link HashIndex} constructor
     */
    private Index createHashIndex() {
        String name = containerId.getDataContainerName() + "_" + containerId.getKey();
        try {
            HashIndex hashIndex = new HashIndex(directory, name, indexManager);
            hashIndex.setNumberOfBins(getIndexBinSize());
            hashIndex.setKeySize(getIndexKeySize());
            hashIndex.setPageSize(getIndexPageSize());
            hashIndex.setMaximumCapacity(getIndexMaxBinSize());
            hashIndex.setLoadFactor(getIndexLoadFactor());
            return hashIndex;
        } catch (IOException e) {
            LOG.error("Failed to create HashIndex", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Loads the container if it is not loaded yet: initialises and loads the
     * index, then walks the chain of index items starting after {@code root}
     * and adds each one to {@code indexList}. When the index reports itself as
     * transient, every key is also (re-)stored in the index.
     *
     * @throws RuntimeStoreException if reading the index or a key fails
     * @see org.apache.activemq.kaha.MapContainer#load()
     */
    public synchronized void load() {
        checkClosed();
        if (loaded) {
            return;
        }
        // The flag is raised before any work is done, so a failure below
        // leaves the container marked as loaded (unchanged legacy behaviour).
        loaded = true;
        try {
            init();
            index.load();
            long nextItem = root.getNextItem();
            while (nextItem != Item.POSITION_NOT_SET) {
                IndexItem item = indexManager.getIndex(nextItem);
                StoreLocation data = item.getKeyDataItem();
                // The key is read for every item, also when the index is not transient.
                Object key = dataManager.readItem(keyMarshaller, data);
                if (index.isTransient()) {
                    index.store(key, item);
                }
                indexList.add(item);
                nextItem = item.getNextItem();
            }
        } catch (IOException e) {
            LOG.error("Failed to load container " + getId(), e);
            throw new RuntimeStoreException(e);
        }
    }

    /**
     * Unloads the container if it is loaded: unloads the index and clears
     * {@code indexList}. A failure to unload the index is logged and otherwise
     * ignored.
     *
     * @see org.apache.activemq.kaha.MapContainer#unload()
     */
    public synchronized void unload() {
        checkClosed();
        if (loaded) {
            loaded = false;
            try {
                index.unload();
            } catch (IOException e) {
                LOG.warn("Failed to unload the index", e);
            }
            indexList.clear();
        }
    }

    /**
     * Unloads the container and deletes its index. A failure to delete the
     * index is logged and otherwise ignored. If the index was never created
     * (the container was never initialised) there is nothing to delete.
     */
    public synchronized void delete() {
        unload();
        if (index == null) {
            // init() never ran, so no index object exists to delete.
            return;
        }
        try {
            index.delete();
        } catch (IOException e) {
            LOG.warn("Failed to delete the index", e);
        }
    }

    /**
     * Sets the marshaller used for keys and, if the index already exists,
     * passes it on to the index as well.
     *
     * @param keyMarshaller the marshaller to use for keys
     */
    public synchronized void setKeyMarshaller(Marshaller keyMarshaller) {
        checkClosed();
        this.keyMarshaller = keyMarshaller;
        if (index != null) {
            index.setKeyMarshaller(keyMarshaller);
        }
    }

    /**
     * Sets the marshaller used for values.
     *
     * @param valueMarshaller the marshaller to use for values
     */
    public synchronized void setValueMarshaller(Marshaller valueMarshaller) {
        checkClosed();
        this.valueMarshaller = valueMarshaller;
    }

    /**
     * @return the number of items in {@code indexList}
     * @see org.apache.activemq.kaha.MapContainer#size()
     */
    public synchronized int size() {
        load();
        return indexList.size();
    }

    /**
     * @return {@code true} if {@code indexList} is empty
     * @see org.apache.activemq.kaha.MapContainer#isEmpty()
     */
    public synchronized boolean isEmpty() {
        load();
        return indexList.isEmpty();
    }

    /**
     * Asks the index whether it contains the key.
     *
     * @param key the key to look for
     * @return the answer given by the index
     * @throws RuntimeException if the index lookup fails with an
     *             {@link IOException}
     * @see org.apache.activemq.kaha.MapContainer#containsKey(java.lang.Object)
     */
    public synchronized boolean containsKey(Object key) {
        load();
        try {
            return index.containsKey(key);
        } catch (IOException e) {
            LOG.error("Failed trying to find key: " + key, e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Looks the key up in the index and reads the value of the entry found.
     *
     * @param key the key to look for
     * @return the value, or {@code null} if the index has no entry for the key
     * @throws RuntimeException if the index lookup fails with an
     *             {@link IOException}
     * @throws RuntimeStoreException if the value cannot be read
     * @see org.apache.activemq.kaha.MapContainer#get(java.lang.Object)
     */
    public synchronized Object get(Object key) {
        // getValue(null) yields null, which covers the "no entry" case.
        return getValue(getEntry(key));
    }

    /**
     * Get the StoreEntry associated with the key
     * 
     * @param key the key to look for
     * @return the StoreEntry returned by the index for the key
     * @throws RuntimeException if the index lookup fails with an
     *             {@link IOException}
     */
    public synchronized StoreEntry getEntry(Object key) {
        load();
        try {
            return index.get(key);
        } catch (IOException e) {
            LOG.error("Failed trying to get key: " + key, e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Scans the entries in list order for a value equal to {@code o}.
     *
     * @param o the value to look for; {@code null} never matches
     * @return {@code true} if an entry with an equal value exists
     * @see org.apache.activemq.kaha.MapContainer#containsValue(java.lang.Object)
     */
    public synchronized boolean containsValue(Object o) {
        load();
        return findItemByValue(o) != null;
    }

    /**
     * Puts every entry of the given map into this container by calling
     * {@link #put(Object, Object)} for each one.
     *
     * @param t the entries to add; {@code null} is ignored
     * @see org.apache.activemq.kaha.MapContainer#putAll(java.util.Map)
     */
    public synchronized void putAll(Map t) {
        load();
        if (t != null) {
            for (Iterator i = t.entrySet().iterator(); i.hasNext();) {
                Map.Entry entry = (Map.Entry)i.next();
                put(entry.getKey(), entry.getValue());
            }
        }
    }

    /**
     * @return a new {@code ContainerKeySet} over this container
     * @see org.apache.activemq.kaha.MapContainer#keySet()
     */
    public synchronized Set keySet() {
        load();
        return new ContainerKeySet(this);
    }

    /**
     * @return a new {@code ContainerValueCollection} over this container
     * @see org.apache.activemq.kaha.MapContainer#values()
     */
    public synchronized Collection values() {
        load();
        return new ContainerValueCollection(this);
    }

    /**
     * @return a new {@code ContainerEntrySet} over this container
     * @see org.apache.activemq.kaha.MapContainer#entrySet()
     */
    public synchronized Set entrySet() {
        load();
        return new ContainerEntrySet(this);
    }

    /**
     * Stores a key/value pair. Any existing entry for the key is removed
     * first; the new entry is then written, registered in the index and
     * appended to {@code indexList}.
     *
     * @param key the key
     * @param value the value
     * @return the value previously stored for the key, or {@code null} if
     *         there was none
     * @throws RuntimeException if removing the old entry or updating the index
     *             fails with an {@link IOException}
     * @throws RuntimeStoreException if writing the entry fails
     * @see org.apache.activemq.kaha.MapContainer#put(java.lang.Object,
     *      java.lang.Object)
     */
    public synchronized Object put(Object key, Object value) {
        load();
        Object result = remove(key);
        IndexItem item = write(key, value);
        try {
            index.store(key, item);
        } catch (IOException e) {
            LOG.error("Failed trying to insert key: " + key, e);
            throw new RuntimeException(e);
        }
        indexList.add(item);
        return result;
    }

    /**
     * Removes the entry for the key from the index and, if there was one,
     * from {@code indexList} and the store.
     *
     * @param key the key to remove
     * @return the value of the removed entry, or {@code null} if the index had
     *         no entry for the key
     * @throws RuntimeException if the removal fails with an
     *             {@link IOException}
     * @see org.apache.activemq.kaha.MapContainer#remove(java.lang.Object)
     */
    public synchronized Object remove(Object key) {
        load();
        try {
            Object result = null;
            IndexItem item = (IndexItem)index.remove(key);
            if (item != null) {
                // refresh the index
                item = (IndexItem)indexList.refreshEntry(item);
                result = getValue(item);
                // Neighbours must be captured before the item is unlinked.
                IndexItem prev = indexList.getPrevEntry(item);
                IndexItem next = indexList.getNextEntry(item);
                indexList.remove(item);
                delete(item, prev, next);
            }
            return result;
        } catch (IOException e) {
            LOG.error("Failed trying to remove key: " + key, e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Removes the first entry (in list order) whose value equals {@code o}.
     *
     * @param o the value to look for; {@code null} never matches
     * @return {@code true} if an entry with an equal value was found
     */
    public synchronized boolean removeValue(Object o) {
        load();
        IndexItem item = findItemByValue(o);
        if (item == null) {
            return false;
        }
        remove(item);
        return true;
    }

    /**
     * Removes the entry whose key is stored in the given item. Nothing is
     * removed when the key read from the item is {@code null}.
     *
     * @param item the item whose key identifies the entry to remove
     */
    protected synchronized void remove(IndexItem item) {
        Object key = getKey(item);
        if (key != null) {
            remove(key);
        }
    }

    /**
     * Clears the container: marks it as loaded, initialises it, clears the
     * index, and then runs the inherited {@code clear()} and {@code doClear()}.
     *
     * @throws RuntimeException if clearing the index fails with an
     *             {@link IOException}
     * @see org.apache.activemq.kaha.MapContainer#clear()
     */
    public synchronized void clear() {
        checkClosed();
        loaded = true;
        init();
        if (index != null) {
            try {
                index.clear();
            } catch (IOException e) {
                LOG.error("Failed trying clear index", e);
                throw new RuntimeException(e);
            }
        }
        super.clear();
        doClear();
    }

    /**
     * Add an entry to the Store Map. Works like {@link #put(Object, Object)}
     * but returns the new entry instead of the previous value.
     * 
     * @param key the key
     * @param value the value
     * @return the StoreEntry associated with the entry
     * @throws RuntimeException if updating the index fails with an
     *             {@link IOException}
     * @throws RuntimeStoreException if writing the entry fails
     */
    public synchronized StoreEntry place(Object key, Object value) {
        load();
        try {
            remove(key);
            IndexItem item = write(key, value);
            index.store(key, item);
            indexList.add(item);
            return item;
        } catch (IOException e) {
            LOG.error("Failed trying to place key: " + key, e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Remove an Entry from the Map. The key is read from the entry and removed
     * from the index; the entry is then unlinked from {@code indexList} and
     * deleted.
     * 
     * @param entry the entry to remove; it is cast to {@link IndexItem}, and
     *            {@code null} is ignored
     * @throws RuntimeException if removing the key from the index fails with
     *             an {@link IOException}
     */
    public synchronized void remove(StoreEntry entry) {
        load();
        IndexItem item = (IndexItem)entry;
        if (item != null) {
            Object key = getKey(item);
            try {
                index.remove(key);
            } catch (IOException e) {
                LOG.error("Failed trying to remove entry: " + entry, e);
                throw new RuntimeException(e);
            }
            // Neighbours must be captured before the item is unlinked.
            IndexItem prev = indexList.getPrevEntry(item);
            IndexItem next = indexList.getNextEntry(item);
            indexList.remove(item);
            delete(item, prev, next);
        }
    }

    /**
     * @return the first entry of {@code indexList}
     */
    public synchronized StoreEntry getFirst() {
        load();
        return indexList.getFirst();
    }

    /**
     * @return the last entry of {@code indexList}
     */
    public synchronized StoreEntry getLast() {
        load();
        return indexList.getLast();
    }

    /**
     * @param entry the current entry; it is cast to {@link IndexItem}
     * @return the entry that {@code indexList} reports as following
     *         {@code entry}
     */
    public synchronized StoreEntry getNext(StoreEntry entry) {
        load();
        IndexItem item = (IndexItem)entry;
        return indexList.getNextEntry(item);
    }

    /**
     * @param entry the current entry; it is cast to {@link IndexItem}
     * @return the entry that {@code indexList} reports as preceding
     *         {@code entry}
     */
    public synchronized StoreEntry getPrevious(StoreEntry entry) {
        load();
        IndexItem item = (IndexItem)entry;
        return indexList.getPrevEntry(item);
    }

    /**
     * @param entry the entry to look up again
     * @return the result of {@code indexList.getEntry(entry)}
     */
    public synchronized StoreEntry refresh(StoreEntry entry) {
        load();
        return indexList.getEntry(entry);
    }

    /**
     * Get the value from its location
     * 
     * @param item the entry whose value is wanted
     * @return the value associated with the store entry, or {@code null} if
     *         {@code item} is {@code null}
     * @throws RuntimeStoreException if the value cannot be read
     */
    public synchronized Object getValue(StoreEntry item) {
        load();
        if (item == null) {
            return null;
        }
        try {
            // ensure this value is up to date
            // item=indexList.getEntry(item);
            StoreLocation data = item.getValueDataItem();
            return dataManager.readItem(valueMarshaller, data);
        } catch (IOException e) {
            LOG.error("Failed to get value for " + item, e);
            throw new RuntimeStoreException(e);
        }
    }

    /**
     * Get the Key object from its location
     * 
     * @param item the entry whose key is wanted
     * @return the Key Object associated with the StoreEntry, or {@code null}
     *         if {@code item} is {@code null}
     * @throws RuntimeStoreException if the key cannot be read
     */
    public synchronized Object getKey(StoreEntry item) {
        load();
        if (item == null) {
            return null;
        }
        try {
            StoreLocation data = item.getKeyDataItem();
            return dataManager.readItem(keyMarshaller, data);
        } catch (IOException e) {
            LOG.error("Failed to get key for " + item, e);
            throw new RuntimeStoreException(e);
        }
    }

    /**
     * @return the inherited {@code indexList}
     */
    protected IndexLinkedList getItemList() {
        return indexList;
    }

    /**
     * Creates a new index item for the key/value pair, stores the key (and
     * the value, when it is not {@code null}) through the data manager, and
     * links the new item after the current last item of {@code indexList}
     * (or after the list root when the list has no last item). The item is
     * not added to {@code indexList} or to the key index here; callers do
     * that.
     *
     * @param key the key to store
     * @param value the value to store; no value data is written for
     *            {@code null}
     * @return the newly created item
     * @throws RuntimeStoreException if any store operation fails with an
     *             {@link IOException}
     */
    protected synchronized IndexItem write(Object key, Object value) {
        // Named "item" rather than "index" so the key-index field is not shadowed.
        IndexItem item = null;
        try {
            item = indexManager.createNewIndex();
            StoreLocation data = dataManager.storeDataItem(keyMarshaller, key);
            item.setKeyData(data);

            if (value != null) {
                data = dataManager.storeDataItem(valueMarshaller, value);
                item.setValueData(data);
            }
            IndexItem prev = indexList.getLast();
            prev = prev != null ? prev : indexList.getRoot();
            IndexItem next = indexList.getNextEntry(prev);
            prev.setNextItem(item.getOffset());
            item.setPreviousItem(prev.getOffset());
            updateIndexes(prev);
            if (next != null) {
                next.setPreviousItem(item.getOffset());
                item.setNextItem(next.getOffset());
                updateIndexes(next);
            }
            storeIndex(item);
        } catch (IOException e) {
            LOG.error("Failed to write " + key + " , " + value, e);
            throw new RuntimeStoreException(e);
        }
        return item;
    }

    /**
     * Finds the first item, in list order, whose value equals the argument.
     *
     * @param o the value to look for; {@code null} never matches
     * @return the matching item, or {@code null} if there is none
     */
    private IndexItem findItemByValue(Object o) {
        if (o == null) {
            return null;
        }
        IndexItem item = indexList.getFirst();
        while (item != null) {
            Object value = getValue(item);
            if (value != null && value.equals(o)) {
                return item;
            }
            item = indexList.getNextEntry(item);
        }
        return null;
    }

    /**
     * @return the number of bins passed to the {@link HashIndex} in
     *         {@link #init()}
     */
    public int getIndexBinSize() {
        return indexBinSize;
    }

    /**
     * @param indexBinSize the number of bins to pass to the {@link HashIndex};
     *            only read when the index is created in {@link #init()}
     */
    public void setIndexBinSize(int indexBinSize) {
        this.indexBinSize = indexBinSize;
    }

    /**
     * @return the key size passed to the {@link HashIndex} in {@link #init()}
     */
    public int getIndexKeySize() {
        return indexKeySize;
    }

    /**
     * @param indexKeySize the key size to pass to the {@link HashIndex}; only
     *            read when the index is created in {@link #init()}
     */
    public void setIndexKeySize(int indexKeySize) {
        this.indexKeySize = indexKeySize;
    }

    /**
     * @return the page size passed to the {@link HashIndex} in {@link #init()}
     */
    public int getIndexPageSize() {
        return indexPageSize;
    }

    /**
     * @param indexPageSize the page size to pass to the {@link HashIndex};
     *            only read when the index is created in {@link #init()}
     */
    public void setIndexPageSize(int indexPageSize) {
        this.indexPageSize = indexPageSize;
    }

    /**
     * @return the load factor passed to the {@link HashIndex} in
     *         {@link #init()}
     */
    public int getIndexLoadFactor() {
        return indexLoadFactor;
    }

    /**
     * @param loadFactor the load factor to pass to the {@link HashIndex}; only
     *            read when the index is created in {@link #init()}
     */
    public void setIndexLoadFactor(int loadFactor) {
        this.indexLoadFactor = loadFactor;
    }

    /**
     * @return the current index cast to {@link IndexMBean}; {@code null} if
     *         the index has not been created yet
     */
    public IndexMBean getIndexMBean() {
        return (IndexMBean) index;
    }

    /**
     * @return the maximum capacity passed to the {@link HashIndex} in
     *         {@link #init()}
     */
    public int getIndexMaxBinSize() {
        return indexMaxBinSize;
    }

    /**
     * @param maxBinSize the maximum capacity to pass to the {@link HashIndex};
     *            only read when the index is created in {@link #init()}
     */
    public void setIndexMaxBinSize(int maxBinSize) {
        this.indexMaxBinSize = maxBinSize;
    }

    /**
     * Renders the container as <code>{key1=value1, key2=value2}</code> by
     * iterating over {@link #entrySet()}. This loads the container and reads
     * every key and value.
     *
     * @return the textual form of all entries
     */
    @Override
    public String toString() {
        load();
        StringBuilder buf = new StringBuilder();
        buf.append("{");
        Iterator i = entrySet().iterator();
        boolean hasNext = i.hasNext();
        while (hasNext) {
            Map.Entry e = (Map.Entry) i.next();
            buf.append(e.getKey());
            buf.append("=");
            buf.append(e.getValue());
            hasNext = i.hasNext();
            if (hasNext) {
                buf.append(", ");
            }
        }
        buf.append("}");
        return buf.toString();
    }
}

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ MapContainerImpl.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.