alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (HashIndex.java)

This example ActiveMQ source code file (HashIndex.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - ActiveMQ tags/keywords

default_minimum_bin_capacity, hashbin, hashbin, hashindex, io, ioexception, ioexception, key, key, marshaller, metadata, open_state, page, util, value, value

The ActiveMQ HashIndex.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kahadb.index;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kahadb.page.Page;
import org.apache.kahadb.page.PageFile;
import org.apache.kahadb.page.Transaction;
import org.apache.kahadb.util.Marshaller;
import org.apache.kahadb.util.VariableMarshaller;

/**
 * BTree implementation
 * 
 * 
 */
public class HashIndex<Key,Value> implements Index {

    public static final int CLOSED_STATE = 1;
    public static final int OPEN_STATE = 2;


    private static final Logger LOG = LoggerFactory.getLogger(HashIndex.class);

    public static final int DEFAULT_BIN_CAPACITY;
    public static final int DEFAULT_MAXIMUM_BIN_CAPACITY;
    public static final int DEFAULT_MINIMUM_BIN_CAPACITY;
    public static final int DEFAULT_LOAD_FACTOR;

    static {
        DEFAULT_BIN_CAPACITY = Integer.parseInt(System.getProperty("defaultBinSize", "1024"));
        DEFAULT_MAXIMUM_BIN_CAPACITY = Integer.parseInt(System.getProperty("maximumCapacity", "16384"));
        DEFAULT_MINIMUM_BIN_CAPACITY = Integer.parseInt(System.getProperty("minimumCapacity", "16"));
        DEFAULT_LOAD_FACTOR = Integer.parseInt(System.getProperty("defaultLoadFactor", "75"));
    }

    private AtomicBoolean loaded = new AtomicBoolean();


    private int increaseThreshold;
    private int decreaseThreshold;

    // Where the bin page array starts at.
    private int maximumBinCapacity = DEFAULT_MAXIMUM_BIN_CAPACITY;
    private int minimumBinCapacity = DEFAULT_MINIMUM_BIN_CAPACITY;



    // Once binsActive/binCapacity reaches the loadFactor, then we need to
    // increase the capacity
    private int loadFactor = DEFAULT_LOAD_FACTOR;

    private PageFile pageFile;
    // This page holds the index metadata.
    private long pageId;

    static class Metadata {
        
        private Page<Metadata> page;
        
        // When the index is initializing or resizing.. state changes so that
        // on failure it can be properly recovered.
        private int state;
        private long binPageId;
        private int binCapacity = DEFAULT_BIN_CAPACITY;
        private int binsActive;
        private int size;

        
        public void read(DataInput is) throws IOException {
            state = is.readInt();
            binPageId = is.readLong();
            binCapacity = is.readInt();
            size = is.readInt();
            binsActive = is.readInt();
        }
        public void write(DataOutput os) throws IOException {
            os.writeInt(state);
            os.writeLong(binPageId);
            os.writeInt(binCapacity);
            os.writeInt(size);
            os.writeInt(binsActive);
        }
        
        static class Marshaller extends VariableMarshaller<Metadata> {
            public Metadata readPayload(DataInput dataIn) throws IOException {
                Metadata rc = new Metadata();
                rc.read(dataIn);
                return rc;
            }

            public void writePayload(Metadata object, DataOutput dataOut) throws IOException {
                object.write(dataOut);
            }
        }
    }
    
    private Metadata metadata = new Metadata();
    
    private Metadata.Marshaller metadataMarshaller = new Metadata.Marshaller();
    private HashBin.Marshaller<Key,Value> hashBinMarshaller = new HashBin.Marshaller(this);
    private Marshaller<Key> keyMarshaller;
    private Marshaller<Value> valueMarshaller;

    
    /**
     * Constructor
     * 
     * @param directory
     * @param name
     * @param indexManager
     * @param numberOfBins
     * @throws IOException
     */
    public HashIndex(PageFile pageFile, long pageId) throws IOException {
        this.pageFile = pageFile;
        this.pageId = pageId;
    }

    public synchronized void load(Transaction tx) throws IOException {
        if (loaded.compareAndSet(false, true)) {
            final Page<Metadata> metadataPage = tx.load(pageId, metadataMarshaller);
            // Is this a brand new index?
            if (metadataPage.getType() == Page.PAGE_FREE_TYPE) {
                // We need to create the pages for the bins
                Page binPage = tx.allocate(metadata.binCapacity);
                metadata.binPageId = binPage.getPageId();
                metadata.page = metadataPage;
                metadataPage.set(metadata);
                clear(tx);

                // If failure happens now we can continue initializing the
                // the hash bins...
            } else {

                metadata = metadataPage.get();
                metadata.page = metadataPage;
                
                // If we did not have a clean shutdown...
                if (metadata.state == OPEN_STATE ) {
                    // Figure out the size and the # of bins that are
                    // active. Yeah This loads the first page of every bin. :(
                    // We might want to put this in the metadata page, but
                    // then that page would be getting updated on every write.
                    metadata.size = 0;
                    for (int i = 0; i < metadata.binCapacity; i++) {
                        int t = sizeOfBin(tx, i);
                        if (t > 0) {
                            metadata.binsActive++;
                        }
                        metadata.size += t;
                    }
                }
            }

            calcThresholds();

            metadata.state = OPEN_STATE;
            tx.store(metadataPage, metadataMarshaller, true);
            
            LOG.debug("HashIndex loaded. Using "+metadata.binCapacity+" bins starting at page "+metadata.binPageId);
        }
    }

    public synchronized void unload(Transaction tx) throws IOException {
        if (loaded.compareAndSet(true, false)) {
            metadata.state = CLOSED_STATE;
            tx.store(metadata.page, metadataMarshaller, true);
        }
    }

    private int sizeOfBin(Transaction tx, int index) throws IOException {
        return getBin(tx, index).size();
    }

    public synchronized Value get(Transaction tx, Key key) throws IOException {
        assertLoaded();
        return getBin(tx, key).get(key);
    }
    
    public synchronized boolean containsKey(Transaction tx, Key key) throws IOException {
        assertLoaded();
        return getBin(tx, key).containsKey(key);
    }

    synchronized public Value put(Transaction tx, Key key, Value value) throws IOException {
        assertLoaded();
        HashBin<Key,Value> bin = getBin(tx, key);

        int originalSize = bin.size();
        Value result = bin.put(key,value);
        store(tx, bin);

        int newSize = bin.size();

        if (newSize != originalSize) {
            metadata.size++;
            if (newSize == 1) {
                metadata.binsActive++;
            }
        }

        if (metadata.binsActive >= this.increaseThreshold) {
            newSize = Math.min(maximumBinCapacity, metadata.binCapacity*2);
            if(metadata.binCapacity!=newSize) {
                resize(tx, newSize);
            }
        }
        return result;
    }
    
    synchronized public Value remove(Transaction tx, Key key) throws IOException {
        assertLoaded();

        HashBin<Key,Value> bin = getBin(tx, key);
        int originalSize = bin.size();
        Value result = bin.remove(key);
        int newSize = bin.size();
        
        if (newSize != originalSize) {
            store(tx, bin);

            metadata.size--;
            if (newSize == 0) {
                metadata.binsActive--;
            }
        }

        if (metadata.binsActive <= this.decreaseThreshold) {
            newSize = Math.max(minimumBinCapacity, metadata.binCapacity/2);
            if(metadata.binCapacity!=newSize) {
                resize(tx, newSize);
            }
        }
        return result;
    }
    

    public synchronized void clear(Transaction tx) throws IOException {
        assertLoaded();
        for (int i = 0; i < metadata.binCapacity; i++) {
            long pageId = metadata.binPageId + i;
            clearBinAtPage(tx, pageId);
        }
        metadata.size = 0;
        metadata.binsActive = 0;
    }
    
    public Iterator<Entry iterator(Transaction tx) throws IOException, UnsupportedOperationException {
        throw new UnsupportedOperationException();
    }


    /**
     * @param tx
     * @param pageId
     * @throws IOException
     */
    private void clearBinAtPage(Transaction tx, long pageId) throws IOException {
        Page<HashBin page = tx.load(pageId, null);
        HashBin<Key, Value> bin = new HashBin();
        bin.setPage(page);
        page.set(bin);
        store(tx, bin);
    }

    public String toString() {
        String str = "HashIndex" + System.identityHashCode(this) + ": " + pageFile;
        return str;
    }

    // /////////////////////////////////////////////////////////////////
    // Implementation Methods
    // /////////////////////////////////////////////////////////////////

    private void assertLoaded() throws IllegalStateException {
        if( !loaded.get() ) {
            throw new IllegalStateException("The HashIndex is not loaded");
        }
    }

    public synchronized void store(Transaction tx, HashBin<Key,Value> bin) throws IOException {
        tx.store(bin.getPage(), hashBinMarshaller, true);
    }

    // While resizing, the following contains the new resize data.
    
    private void resize(Transaction tx, final int newSize) throws IOException {
        LOG.debug("Resizing to: "+newSize);
        
        int resizeCapacity = newSize;
        long resizePageId = tx.allocate(resizeCapacity).getPageId();

        // In Phase 1 we copy the data to the new bins..
        // Initialize the bins..
        for (int i = 0; i < resizeCapacity; i++) {
            long pageId = resizePageId + i;
            clearBinAtPage(tx, pageId);
        }

        metadata.binsActive = 0;
        // Copy the data from the old bins to the new bins.
        for (int i = 0; i < metadata.binCapacity; i++) {
            
            HashBin<Key,Value> bin = getBin(tx, i);
            for (Map.Entry<Key, Value> entry : bin.getAll(tx).entrySet()) {
                HashBin<Key,Value> resizeBin = getBin(tx, entry.getKey(), resizePageId, resizeCapacity);
                resizeBin.put(entry.getKey(), entry.getValue());
                store(tx, resizeBin);
                if( resizeBin.size() == 1) {
                    metadata.binsActive++;
                }
            }
        }
        
        // In phase 2 we free the old bins and switch the the new bins.
        tx.free(metadata.binPageId, metadata.binCapacity);
        
        metadata.binCapacity = resizeCapacity;
        metadata.binPageId = resizePageId;
        metadata.state = OPEN_STATE;
        tx.store(metadata.page, metadataMarshaller, true);
        calcThresholds();

        LOG.debug("Resizing done.  New bins start at: "+metadata.binPageId);        
        resizeCapacity=0;
        resizePageId=0;
    }

    private void calcThresholds() {
        increaseThreshold = (metadata.binCapacity * loadFactor)/100;
        decreaseThreshold = (metadata.binCapacity * loadFactor * loadFactor ) / 20000;
    }
    
    private HashBin<Key,Value> getBin(Transaction tx, Key key) throws IOException {
        return getBin(tx, key, metadata.binPageId, metadata.binCapacity);
    }

    private HashBin<Key,Value> getBin(Transaction tx, int i) throws IOException {
        return getBin(tx, i, metadata.binPageId);
    }
    
    private HashBin<Key,Value> getBin(Transaction tx, Key key, long basePage, int capacity) throws IOException {
        int i = indexFor(key, capacity);
        return getBin(tx, i, basePage);
    }

    private HashBin<Key,Value> getBin(Transaction tx, int i, long basePage) throws IOException {
        Page<HashBin page = tx.load(basePage + i, hashBinMarshaller);
        HashBin<Key, Value> rc = page.get();
        rc.setPage(page);
        return rc;
    }

    int indexFor(Key x, int length) {
        return Math.abs(x.hashCode()%length);
    }

    // /////////////////////////////////////////////////////////////////
    // Property Accessors
    // /////////////////////////////////////////////////////////////////

    public Marshaller<Key> getKeyMarshaller() {
        return keyMarshaller;
    }

    /**
     * Set the marshaller for key objects
     * 
     * @param marshaller
     */
    public synchronized void setKeyMarshaller(Marshaller<Key> marshaller) {
        this.keyMarshaller = marshaller;
    }

    public Marshaller<Value> getValueMarshaller() {
        return valueMarshaller;
    }
    /**
     * Set the marshaller for value objects
     * 
     * @param marshaller
     */
    public void setValueMarshaller(Marshaller<Value> valueMarshaller) {
        this.valueMarshaller = valueMarshaller;
    }
    
    /**
     * @return number of bins in the index
     */
    public int getBinCapacity() {
        return metadata.binCapacity;
    }

    /**
     * @param binCapacity
     */
    public void setBinCapacity(int binCapacity) {
        if (loaded.get() && binCapacity != metadata.binCapacity) {
            throw new RuntimeException("Pages already loaded - can't reset bin capacity");
        }
        metadata.binCapacity = binCapacity;
    }

    public boolean isTransient() {
        return false;
    }

    /**
     * @return the loadFactor
     */
    public int getLoadFactor() {
        return loadFactor;
    }

    /**
     * @param loadFactor the loadFactor to set
     */
    public void setLoadFactor(int loadFactor) {
        this.loadFactor = loadFactor;
    }

    /**
     * @return the maximumCapacity
     */
    public int setMaximumBinCapacity() {
        return maximumBinCapacity;
    }

    /**
     * @param maximumCapacity the maximumCapacity to set
     */
    public void setMaximumBinCapacity(int maximumCapacity) {
        this.maximumBinCapacity = maximumCapacity;
    }

    public synchronized int size(Transaction tx) {
        return metadata.size;
    }

    public synchronized int getActiveBins() {
        return metadata.binsActive;
    }

    public long getBinPageId() {
        return metadata.binPageId;
    }

    public PageFile getPageFile() {
        return pageFile;
    }

    public int getBinsActive() {
        return metadata.binsActive;
    }

}

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ HashIndex.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.