home | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (HashIndex.java)

This example ActiveMQ source code file (HashIndex.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - ActiveMQ tags/keywords

file, hashbin, hashbin, hashentry, hashentry, hashindex, hashpage, hashpage, io, ioexception, ioexception, pages, runtimeexception, storeentry, string, util

The ActiveMQ HashIndex.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.kaha.impl.index.hash;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.kaha.impl.index.Index;
import org.apache.activemq.kaha.impl.index.IndexManager;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.LRUCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * BTree implementation
 * 
 * 
 */
public class HashIndex implements Index, HashIndexMBean {
    public static final int DEFAULT_PAGE_SIZE;
    public static final int DEFAULT_KEY_SIZE;
    public static final int DEFAULT_BIN_SIZE;
    public static final int MAXIMUM_CAPACITY;
    public static final int DEFAULT_LOAD_FACTOR;
    private static final int LOW_WATER_MARK=1024*16;
    private static final String NAME_PREFIX = "hash-index-";
    private static final Logger LOG = LoggerFactory.getLogger(HashIndex.class);
    private final String name;
    private File directory;
    private File file;
    private RandomAccessFile indexFile;
    private IndexManager indexManager;
    private int pageSize = DEFAULT_PAGE_SIZE;
    private int keySize = DEFAULT_KEY_SIZE;
    private int numberOfBins = DEFAULT_BIN_SIZE;
    private int keysPerPage = this.pageSize /this.keySize;
    private DataByteArrayInputStream dataIn;
    private DataByteArrayOutputStream dataOut;
    private byte[] readBuffer;
    private HashBin[] bins;
    private Marshaller keyMarshaller;
    private long length;
    private LinkedList<HashPage> freeList = new LinkedList();
    private AtomicBoolean loaded = new AtomicBoolean();
    private LRUCache<Long, HashPage> pageCache;
    private boolean enablePageCaching=false;//this is off by default - see AMQ-1667
    private int pageCacheSize = 10;
    private int size;
    private int highestSize=0;
    private int activeBins;
    private int threshold;
    private int maximumCapacity=MAXIMUM_CAPACITY;
    private int loadFactor=DEFAULT_LOAD_FACTOR;
    
    
    /**
     * Constructor
     * 
     * @param directory
     * @param name
     * @param indexManager
     * @throws IOException
     */
    public HashIndex(File directory, String name, IndexManager indexManager) throws IOException {
        this.directory = directory;
        this.name = name;
        this.indexManager = indexManager;
        openIndexFile();
        pageCache = new LRUCache<Long, HashPage>(pageCacheSize, pageCacheSize, 0.75f, true);
    }

    /**
     * Set the marshaller for key objects
     * 
     * @param marshaller
     */
    public synchronized void setKeyMarshaller(Marshaller marshaller) {
        this.keyMarshaller = marshaller;
    }

    /**
     * @return the keySize
     */
    public synchronized int getKeySize() {
        return this.keySize;
    }

    /**
     * @param keySize the keySize to set
     */
    public synchronized void setKeySize(int keySize) {
        this.keySize = keySize;
        if (loaded.get()) {
            throw new RuntimeException("Pages already loaded - can't reset key size");
        }
    }

    /**
     * @return the pageSize
     */
    public synchronized int getPageSize() {
        return this.pageSize;
    }

    /**
     * @param pageSize the pageSize to set
     */
    public synchronized void setPageSize(int pageSize) {
        if (loaded.get() && pageSize != this.pageSize) {
            throw new RuntimeException("Pages already loaded - can't reset page size");
        }
        this.pageSize = pageSize;
    }
    
    /**
     * @return number of bins
     */
    public int getNumberOfBins() {
        return this.numberOfBins;
    }

    /**
     * @param numberOfBins
     */
    public void setNumberOfBins(int numberOfBins) {
        if (loaded.get() && numberOfBins != this.numberOfBins) {
            throw new RuntimeException("Pages already loaded - can't reset bin size");
        }
        this.numberOfBins = numberOfBins;
    }

    /**
     * @return the enablePageCaching
     */
    public synchronized boolean isEnablePageCaching() {
        return this.enablePageCaching;
    }

    /**
     * @param enablePageCaching the enablePageCaching to set
     */
    public synchronized void setEnablePageCaching(boolean enablePageCaching) {
        this.enablePageCaching = enablePageCaching;
    }

    /**
     * @return the pageCacheSize
     */
    public synchronized int getPageCacheSize() {
        return this.pageCacheSize;
    }

    /**
     * @param pageCacheSize the pageCacheSize to set
     */
    public synchronized void setPageCacheSize(int pageCacheSize) {
        this.pageCacheSize = pageCacheSize;
        pageCache.setMaxCacheSize(pageCacheSize);
    }

    public synchronized boolean isTransient() {
        return false;
    }
    
    /**
     * @return the threshold
     */
    public int getThreshold() {
        return threshold;
    }

    /**
     * @param threshold the threshold to set
     */
    public void setThreshold(int threshold) {
        this.threshold = threshold;
    }

    /**
     * @return the loadFactor
     */
    public int getLoadFactor() {
        return loadFactor;
    }

    /**
     * @param loadFactor the loadFactor to set
     */
    public void setLoadFactor(int loadFactor) {
        this.loadFactor = loadFactor;
    }
    
    /**
     * @return the maximumCapacity
     */
    public int getMaximumCapacity() {
        return maximumCapacity;
    }

    /**
     * @param maximumCapacity the maximumCapacity to set
     */
    public void setMaximumCapacity(int maximumCapacity) {
        this.maximumCapacity = maximumCapacity;
    }
    
    public synchronized int getSize() {
        return size;
    }
    
    public synchronized int getActiveBins(){
        return activeBins;
    }

    public synchronized void load() {
        if (loaded.compareAndSet(false, true)) {
            int capacity = 1;
            while (capacity < numberOfBins) {
                capacity <<= 1;
            }
            this.bins = new HashBin[capacity];
            this.numberOfBins=capacity;
            threshold = calculateThreashold();
            keysPerPage = pageSize / keySize;
            dataIn = new DataByteArrayInputStream();
            dataOut = new DataByteArrayOutputStream(pageSize);
            readBuffer = new byte[pageSize];
            try {
                openIndexFile();
                if (indexFile.length() > 0) {
                    doCompress();
                }
            } catch (IOException e) {
                LOG.error("Failed to load index ", e);
                throw new RuntimeException(e);
            }
        }
    }    

    public synchronized void unload() throws IOException {
        if (loaded.compareAndSet(true, false)) {
            if (indexFile != null) {
                indexFile.close();
                indexFile = null;
                freeList.clear();
                pageCache.clear();
                bins = new HashBin[bins.length];
            }
        }
    }

    public synchronized void store(Object key, StoreEntry value) throws IOException {
        load();
        HashEntry entry = new HashEntry();
        entry.setKey((Comparable)key);
        entry.setIndexOffset(value.getOffset());
        if (!getBin(key).put(entry)) {
            this.size++;
        }
        if (this.size >= this.threshold) {
            resize(2*bins.length);
        }
        if(this.size > this.highestSize) {
            this.highestSize=this.size;
        }
    }

    public synchronized StoreEntry get(Object key) throws IOException {
        load();
        HashEntry entry = new HashEntry();
        entry.setKey((Comparable)key);
        HashEntry result = getBin(key).find(entry);
        return result != null ? indexManager.getIndex(result.getIndexOffset()) : null;
    }

    public synchronized StoreEntry remove(Object key) throws IOException {
        load();
        StoreEntry result = null;
        HashEntry entry = new HashEntry();
        entry.setKey((Comparable)key);
        HashEntry he = getBin(key).remove(entry);
        if (he != null) {
            this.size--;
            result = this.indexManager.getIndex(he.getIndexOffset());
        }
        if (this.highestSize > LOW_WATER_MARK &&  this.highestSize > (this.size *2)) {
            int newSize = this.size/this.keysPerPage;
            newSize = Math.max(128, newSize);
            this.highestSize=0;
            resize(newSize);
            
        }
        return result;
    }

    public synchronized boolean containsKey(Object key) throws IOException {
        return get(key) != null;
    }

    public synchronized void clear() throws IOException {
        unload();
        delete();
        openIndexFile();
        load();
    }

    public synchronized void delete() throws IOException {
        unload();
        if (file.exists()) {
            file.delete();
        }
        length = 0;
    }

    HashPage lookupPage(long pageId) throws IOException {
        HashPage result = null;
        if (pageId >= 0) {
            result = getFromCache(pageId);
            if (result == null) {
                result = getFullPage(pageId);
                if (result != null) {
                    if (result.isActive()) {
                        addToCache(result);
                    } else {
                        throw new IllegalStateException("Trying to access an inactive page: " + pageId);
                    }
                }
            }
        }
        return result;
    }

    HashPage createPage(int binId) throws IOException {
        HashPage result = getNextFreePage();
        if (result == null) {  
            // allocate one
            result = new HashPage(keysPerPage);
            result.setId(length);
            result.setBinId(binId);
            writePageHeader(result);
            length += pageSize;
            indexFile.seek(length);
            indexFile.write(HashEntry.NOT_SET);
        }
        addToCache(result);
        return result;
    }

    void releasePage(HashPage page) throws IOException {
        removeFromCache(page);
        page.reset();
        page.setActive(false);
        writePageHeader(page);
        freeList.add(page);
    }

    private HashPage getNextFreePage() throws IOException {
        HashPage result = null;
        if(!freeList.isEmpty()) {
            result = freeList.removeFirst();
            result.setActive(true);
            result.reset();
            writePageHeader(result);
        }
        return result;
    }

    void writeFullPage(HashPage page) throws IOException {
        dataOut.reset();
        page.write(keyMarshaller, dataOut);
        if (dataOut.size() > pageSize) {
            throw new IOException("Page Size overflow: pageSize is " + pageSize + " trying to write " + dataOut.size());
        }
        indexFile.seek(page.getId());
        indexFile.write(dataOut.getData(), 0, dataOut.size());
    }

    void writePageHeader(HashPage page) throws IOException {
        dataOut.reset();
        page.writeHeader(dataOut);
        indexFile.seek(page.getId());
        indexFile.write(dataOut.getData(), 0, HashPage.PAGE_HEADER_SIZE);
    }

    HashPage getFullPage(long id) throws IOException {
        indexFile.seek(id);
        indexFile.readFully(readBuffer, 0, pageSize);
        dataIn.restart(readBuffer);
        HashPage page = new HashPage(keysPerPage);
        page.setId(id);
        page.read(keyMarshaller, dataIn);
        return page;
    }

    HashPage getPageHeader(long id) throws IOException {
        indexFile.seek(id);
        indexFile.readFully(readBuffer, 0, HashPage.PAGE_HEADER_SIZE);
        dataIn.restart(readBuffer);
        HashPage page = new HashPage(keysPerPage);
        page.setId(id);
        page.readHeader(dataIn);
        return page;
    }

    void addToBin(HashPage page) throws IOException {
        int index = page.getBinId();
        if (index >= this.bins.length) {
            resize(index+1);
        }
        HashBin bin = getBin(index);
        bin.addHashPageInfo(page.getId(), page.getPersistedSize());
    }

    private HashBin getBin(int index) {
        
        HashBin result = bins[index];
        if (result == null) {
            result = new HashBin(this, index, pageSize / keySize);
            bins[index] = result;
            activeBins++;
        }
        return result;
    }

    private void openIndexFile() throws IOException {
        if (indexFile == null) {
            file = new File(directory, NAME_PREFIX + IOHelper.toFileSystemSafeName(name));
            IOHelper.mkdirs(file.getParentFile());
            indexFile = new RandomAccessFile(file, "rw");
        }
    }
    
    private HashBin getBin(Object key) {
        int hash = hash(key);
        int i = indexFor(hash, bins.length);
        return getBin(i);
    }

    private HashPage getFromCache(long pageId) {
        HashPage result = null;
        if (enablePageCaching) {
            result = pageCache.get(pageId);
        }
        return result;
    }

    private void addToCache(HashPage page) {
        if (enablePageCaching) {
            pageCache.put(page.getId(), page);
        }
    }

    private void removeFromCache(HashPage page) {
        if (enablePageCaching) {
            pageCache.remove(page.getId());
        }
    }
    
    private void doLoad() throws IOException {
        long offset = 0;
        if (loaded.compareAndSet(false, true)) {
            while ((offset + pageSize) <= indexFile.length()) {
                indexFile.seek(offset);
                indexFile.readFully(readBuffer, 0, HashPage.PAGE_HEADER_SIZE);
                dataIn.restart(readBuffer);
                HashPage page = new HashPage(keysPerPage);
                page.setId(offset);
                page.readHeader(dataIn);
                if (!page.isActive()) {
                    page.reset();
                    freeList.add(page);
                } else {
                    addToBin(page);
                    size+=page.size();
                }
                offset += pageSize;
            }
            length=offset;
        }
    }
    
    private void doCompress() throws IOException {
        String backFileName = name + "-COMPRESS";
        HashIndex backIndex = new HashIndex(directory,backFileName,indexManager);
        backIndex.setKeyMarshaller(keyMarshaller);
        backIndex.setKeySize(getKeySize());
        backIndex.setNumberOfBins(getNumberOfBins());
        backIndex.setPageSize(getPageSize());
        backIndex.load();
        File backFile = backIndex.file;
        long offset = 0;
        while ((offset + pageSize) <= indexFile.length()) {
            indexFile.seek(offset);
            HashPage page = getFullPage(offset);
            if (page.isActive()) {
                for (HashEntry entry : page.getEntries()) {
                    backIndex.getBin(entry.getKey()).put(entry);
                    backIndex.size++;
                }
            }
            page=null;
            offset += pageSize;
        }
        backIndex.unload();
      
        unload();
        IOHelper.deleteFile(file);
        IOHelper.copyFile(backFile, file);
        IOHelper.deleteFile(backFile);
        openIndexFile();
        doLoad();
    }
    
    private void resize(int newCapacity) throws IOException {
        if (bins.length < getMaximumCapacity()) {
            if (newCapacity != numberOfBins) {
                int capacity = 1;
                while (capacity < newCapacity) {
                    capacity <<= 1;
                }
                newCapacity=capacity;
                if (newCapacity != numberOfBins) {
                    LOG.info("Resize hash bins " + this.name + " from " + numberOfBins + " to " + newCapacity);
                    String backFileName = name + "-REISZE";
                    HashIndex backIndex = new HashIndex(directory,backFileName,indexManager);
                    backIndex.setKeyMarshaller(keyMarshaller);
                    backIndex.setKeySize(getKeySize());
                    backIndex.setNumberOfBins(newCapacity);
                    backIndex.setPageSize(getPageSize());
                    backIndex.load();
                    File backFile = backIndex.file;
                    long offset = 0;
                    while ((offset + pageSize) <= indexFile.length()) {
                        indexFile.seek(offset);
                        HashPage page = getFullPage(offset);
                        if (page.isActive()) {
                            for (HashEntry entry : page.getEntries()) {
                                backIndex.getBin(entry.getKey()).put(entry);
                                backIndex.size++;
                            }
                        }
                        page=null;
                        offset += pageSize;
                    }
                    backIndex.unload();
                  
                    unload();
                    IOHelper.deleteFile(file);
                    IOHelper.copyFile(backFile, file);
                    IOHelper.deleteFile(backFile);
                    setNumberOfBins(newCapacity);
                    bins = new HashBin[newCapacity];
                    threshold = calculateThreashold();
                    openIndexFile();
                    doLoad();
                }
            }
        }else {
            threshold = Integer.MAX_VALUE;
            return;
        }
    }
    
    private int calculateThreashold() {
        return (int)(bins.length * loadFactor);
    }
    
    
    public String toString() {
        String str = "HashIndex"+System.identityHashCode(this)+": "+file.getName();
        return str;
    }
      

    static int hash(Object x) {
        int h = x.hashCode();
        h += ~(h << 9);
        h ^= h >>> 14;
        h += h << 4;
        h ^= h >>> 10;
        return h;
    }

    static int indexFor(int h, int length) {
        return h & (length - 1);
    }

    static {
        DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", "1024"));
        DEFAULT_KEY_SIZE = Integer.parseInt(System.getProperty("defaultKeySize", "96"));
        DEFAULT_BIN_SIZE= Integer.parseInt(System.getProperty("defaultBinSize", "1024"));
        MAXIMUM_CAPACITY = Integer.parseInt(System.getProperty("maximumCapacity", "16384"));
        DEFAULT_LOAD_FACTOR=Integer.parseInt(System.getProperty("defaultLoadFactor","50"));
    }
}

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ HashIndex.java source code file:

new blog posts

 

Copyright 1998-2016 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.