alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (IndexManager.java)

This example ActiveMQ source code file (IndexManager.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example"™.

Java - ActiveMQ tags/keywords

atomiclong, file, indexitem, indexitem, indexmanager, indexmanager, io, ioexception, ioexception, name_prefix, randomaccessfile, storeindexreader, storeindexwriter, string, string

The ActiveMQ IndexManager.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.kaha.impl.index;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileLock;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.activemq.kaha.impl.DataManager;
import org.apache.activemq.util.IOHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages one index file made of fixed-size {@link IndexItem} slots.
 * <p>
 * The backing file is named {@link #NAME_PREFIX} followed by a file-system-safe form of the
 * manager's name and lives in the directory passed to the constructor. New items are handed out
 * by {@link #createNewIndex()}, which reuses a previously freed slot when one is available and
 * otherwise extends the logical length by {@code IndexItem.INDEX_SIZE}. Freed slots are chained
 * together through their "next item" offset; this class keeps the head and tail of that chain in
 * memory.
 * <p>
 * Every byte of logical length is also added to the shared {@code storeSize} counter supplied by
 * the caller.
 * <p>
 * All public methods except {@link #size()} and {@link #toString()} are synchronized on this
 * instance.
 */
public final class IndexManager {

    public static final String NAME_PREFIX = "index-";
    private static final Logger LOG = LoggerFactory.getLogger(IndexManager.class);
    private final String name;
    private final File directory;
    private File file;
    private RandomAccessFile indexFile;
    private StoreIndexReader reader;
    private StoreIndexWriter writer;
    private final DataManager redoLog;
    private final String mode;
    /** Logical length of the index in bytes; always advanced in steps of INDEX_SIZE by this class. */
    private long length;
    /** Head of the chain of free slots, or null when there is none. */
    private IndexItem firstFree;
    /** Tail of the chain of free slots, or null when there is none. */
    private IndexItem lastFree;
    /** True when something was written since the last successful {@link #force()}. */
    private boolean dirty;
    private final AtomicLong storeSize;
    /** Number of slots currently on the free chain (maintained, but not read in this class). */
    private int freeSize = 0;

    /**
     * Opens (creating if necessary) the index file and scans it to rebuild the free-slot chain.
     *
     * @param directory directory that holds the index file
     * @param name logical name of the index; used for the file name and in log messages
     * @param mode access mode passed straight to {@link RandomAccessFile}
     * @param redoLog data manager handed to the index writer
     * @param storeSize shared counter that is increased by the length of this index
     * @throws IOException if the file cannot be opened or scanned
     */
    public IndexManager(File directory, String name, String mode, DataManager redoLog, AtomicLong storeSize) throws IOException {
        this.directory = directory;
        this.name = name;
        this.mode = mode;
        this.redoLog = redoLog;
        this.storeSize = storeSize;
        initialize();
    }

    /**
     * @return true when no free slot is recorded and the logical length is zero
     */
    public synchronized boolean isEmpty() {
        return lastFree == null && length == 0;
    }

    /**
     * Reads the item stored at the given offset.
     *
     * @param offset byte offset of the slot
     * @return the item read, or null when {@code offset} is negative
     * @throws IOException if the read fails
     */
    public synchronized IndexItem getIndex(long offset) throws IOException {
        IndexItem result = null;
        if (offset >= 0) {
            result = reader.readItem(offset);
        }
        return result;
    }

    /**
     * Asks the reader to update the given item's indexes from the file.
     *
     * @param item item to refresh in place
     * @return the same instance that was passed in
     * @throws IOException if the read fails
     */
    public synchronized IndexItem refreshIndex(IndexItem item) throws IOException {
        reader.updateIndexes(item);
        return item;
    }

    /**
     * Releases a slot: the item is reset, marked inactive, appended to the tail of the free chain
     * and written back.
     *
     * @param item item whose slot is being released
     * @throws IOException if writing any of the touched items fails
     */
    public synchronized void freeIndex(IndexItem item) throws IOException {
        item.reset();
        item.setActive(false);
        if (lastFree == null) {
            // Empty chain: the released item is both head and tail.
            firstFree = item;
            lastFree = item;
        } else {
            lastFree.setNextItem(item.getOffset());
            if (lastFree.equals(firstFree)) {
                // Head and tail were equal; give the head its own copy before the tail moves on.
                // NOTE(review): presumably so the two references stop sharing state - confirm.
                firstFree = new IndexItem();
                firstFree.copyIndex(lastFree);
                writer.updateIndexes(firstFree);
            }
            writer.updateIndexes(lastFree);
            lastFree = item;
        }
        writer.updateIndexes(item);
        freeSize++;
        dirty = true;
    }

    /**
     * Writes the complete item through the index writer and marks the file dirty.
     *
     * @param index item to store
     * @throws IOException if the write fails
     */
    public synchronized void storeIndex(IndexItem index) throws IOException {
        writer.storeItem(index);
        dirty = true;
    }

    /**
     * Writes the item's index fields through the index writer and marks the file dirty.
     * <p>
     * Any failure raised by the writer is logged and NOT propagated, even though the signature
     * declares {@link IOException}; the file is marked dirty either way.
     *
     * @param index item whose indexes should be written
     * @throws IOException declared for callers; not thrown by the current implementation
     */
    public synchronized void updateIndexes(IndexItem index) throws IOException {
        try {
            writer.updateIndexes(index);
        } catch (Throwable e) {
            LOG.error(name + " error updating indexes ", e);
        }
        dirty = true;
    }

    /**
     * Replays a redo record through the index writer and marks the file dirty.
     *
     * @param redo record to replay
     * @throws IOException if the write fails
     */
    public synchronized void redo(final RedoStoreIndexItem redo) throws IOException {
        writer.redoStoreItem(redo);
        dirty = true;
    }

    /**
     * Provides a slot for a new item, reusing a free slot when one exists.
     * <p>
     * When no free slot is available a new item is placed at the current end of the index, and
     * both the logical length and the shared store size grow by {@code IndexItem.INDEX_SIZE}.
     *
     * @return an item positioned at a usable offset; never null
     * @throws IOException if a recycled slot cannot be read or written
     */
    public synchronized IndexItem createNewIndex() throws IOException {
        IndexItem result = getNextFreeIndex();
        if (result == null) {
            // allocate one
            result = new IndexItem();
            result.setOffset(length);
            length += IndexItem.INDEX_SIZE;
            storeSize.addAndGet(IndexItem.INDEX_SIZE);
        }
        return result;
    }

    /**
     * Closes the underlying file if it is open. Calling it again is a no-op.
     *
     * @throws IOException if closing the file fails
     */
    public synchronized void close() throws IOException {
        if (indexFile != null) {
            indexFile.close();
            indexFile = null;
        }
    }

    /**
     * Syncs the file descriptor to the storage device when the file is open and dirty.
     *
     * @throws IOException if the sync fails
     */
    public synchronized void force() throws IOException {
        if (indexFile != null && dirty) {
            indexFile.getFD().sync();
            dirty = false;
        }
    }

    /**
     * Forgets the free chain, closes the file if open and deletes it from disk.
     *
     * @return the result of {@link File#delete()} for the index file
     * @throws IOException if closing the file fails
     */
    public synchronized boolean delete() throws IOException {
        firstFree = null;
        lastFree = null;
        if (indexFile != null) {
            indexFile.close();
            indexFile = null;
        }
        return file.delete();
    }

    /**
     * Detaches the head of the free chain, resets it and writes it back.
     *
     * @return the recycled item, or null when the free chain is empty
     * @throws IOException if reading the next free slot or writing the recycled one fails
     */
    private synchronized IndexItem getNextFreeIndex() throws IOException {
        IndexItem result = null;
        if (firstFree != null) {
            if (firstFree.equals(lastFree)) {
                // Single entry on the chain: taking it empties the chain.
                result = firstFree;
                firstFree = null;
                lastFree = null;
            } else {
                result = firstFree;
                firstFree = getIndex(firstFree.getNextItem());
                if (firstFree == null) {
                    // getIndex returns null for a negative offset, so the chain ended here.
                    lastFree = null;
                }
            }
            result.reset();
            writer.updateIndexes(result);
            freeSize--;
        }
        return result;
    }

    /**
     * @return the logical length of the index in bytes
     */
    synchronized long getLength() {
        return length;
    }

    /**
     * Returns the logical length without taking this instance's monitor, unlike
     * {@link #getLength()}.
     *
     * @return the logical length of the index in bytes
     */
    public final long size() {
        return length;
    }

    /**
     * Sets the logical length and adjusts the shared store size by the difference.
     * <p>
     * Only the change relative to the previous length is applied to the shared counter, because
     * the previous length was already added to it when it was established. When the previous
     * length is zero this is the same as adding {@code value}.
     *
     * @param value new logical length in bytes
     */
    public synchronized void setLength(long value) {
        long delta = value - this.length;
        this.length = value;
        storeSize.addAndGet(delta);
    }

    /**
     * Tries to acquire an exclusive lock on the file region from 0 to the channel's current size.
     *
     * @return the lock, or whatever {@code FileChannel.tryLock} returns when it is not acquired
     * @throws IOException if the channel operation fails
     */
    public synchronized FileLock getLock() throws IOException {
        return indexFile.getChannel().tryLock(0, indexFile.getChannel().size(), false);
    }


    @Override
    public String toString() {
        return "IndexManager:(" + NAME_PREFIX + name + ")";
    }

    /**
     * Opens the index file and walks it slot by slot to rebuild the free chain.
     * <p>
     * Every slot whose item is not active is reset and appended to the chain; the previous tail
     * is re-written with its new "next item" offset. After the walk the logical length is the
     * offset just past the last complete slot, and that length is added to the shared store size.
     * <p>
     * If anything fails after the file has been opened, the file is closed again before the
     * exception propagates so that a failed construction does not leak the file handle.
     *
     * @throws IOException if the file cannot be opened or read
     */
    protected void initialize() throws IOException {
        file = new File(directory, NAME_PREFIX + IOHelper.toFileSystemSafeName(name));
        IOHelper.mkdirs(file.getParentFile());
        indexFile = new RandomAccessFile(file, mode);
        boolean initialized = false;
        try {
            reader = new StoreIndexReader(indexFile);
            writer = new StoreIndexWriter(indexFile, name, redoLog);
            long offset = 0;
            while ((offset + IndexItem.INDEX_SIZE) <= indexFile.length()) {
                IndexItem index = reader.readItem(offset);
                if (!index.isActive()) {
                    index.reset();
                    if (lastFree != null) {
                        lastFree.setNextItem(index.getOffset());
                        updateIndexes(lastFree);
                        lastFree = index;
                    } else {
                        lastFree = index;
                        firstFree = index;
                    }
                    freeSize++;
                }
                offset += IndexItem.INDEX_SIZE;
            }
            length = offset;
            storeSize.addAndGet(length);
            initialized = true;
        } finally {
            if (!initialized) {
                // Release the handle without masking the exception that is already propagating.
                try {
                    indexFile.close();
                } catch (IOException closeFailure) {
                    LOG.debug(name + " error closing index file after failed initialization", closeFailure);
                }
                indexFile = null;
            }
        }
    }
}

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ IndexManager.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.