alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (Transaction.java)

This example ActiveMQ source code file (Transaction.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example"™.

Java - ActiveMQ tags/keywords

bytesequence, databytearrayinputstream, databytearrayoutputstream, io, ioexception, ioexception, iterator, page, page, pagewrite, sequenceset, suppresswarnings, suppresswarnings, t, t, util

The ActiveMQ Transaction.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kahadb.page;

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.NoSuchElementException;

import org.apache.kahadb.page.PageFile.PageWrite;
import org.apache.kahadb.util.ByteSequence;
import org.apache.kahadb.util.DataByteArrayInputStream;
import org.apache.kahadb.util.DataByteArrayOutputStream;
import org.apache.kahadb.util.Marshaller;
import org.apache.kahadb.util.Sequence;
import org.apache.kahadb.util.SequenceSet;

/**
 * The class used to read/update a PageFile object.  Using a transaction allows you to
 * do multiple update operations in a single unit of work.
 */
public class Transaction implements Iterable<Page> {
    
    /**
     * Raised when the data written to a page does not fit into a single page
     * and spilling over into additional pages was not allowed.
     */
    public class PageOverflowIOException extends IOException {
        public PageOverflowIOException(final String message) {
            super(message);
        }
    }
    
    /**
     * Raised when a page operation is attempted with an invalid page id.
     * The offending id travels with the exception so callers can report it.
     */
    public class InvalidPageIOException extends IOException {
        private final long page;

        public InvalidPageIOException(final String message, final long page) {
            super(message);
            this.page = page;
        }

        /**
         * @return the page id that was rejected
         */
        public long getPage() {
            return this.page;
        }
    }
    
    /**
     * Callback for a unit of work that runs against a transaction and produces no result.
     * Instances are handed to {@link Transaction#execute(Closure)}.
     *
     * @param <T> the type of exception the operation may throw
     */
    public interface Closure <T extends Throwable> {
        void execute(Transaction tx) throws T;
    }

    /**
     * Callback for a unit of work that runs against a transaction and hands back a result.
     * Instances are handed to {@link Transaction#execute(CallableClosure)}.
     *
     * @param <R> the type of result the closure produces
     * @param <T> the type of exception the operation may throw
     */
    public interface CallableClosure<R, T extends Throwable> {
        R execute(Transaction tx) throws T;
    }
    

    // The page file that this Transaction operates against.
    private final PageFile pageFile;
    // Id of the write transaction, assigned lazily by getWriteTransactionId().
    // Stays at -1 while this transaction has not updated anything (see isReadOnly()).
    private long writeTransactionId = -1;
    // Page writes queued by this transaction, keyed by page id; flushed on commit().
    private final HashMap<Long, PageWrite> writes = new HashMap<Long, PageWrite>();
    // List of pages allocated in this transaction (handed back on rollback()).
    private final SequenceSet allocateList = new SequenceSet();
    // List of pages freed in this transaction (released on commit()).
    private final SequenceSet freeList = new SequenceSet();

    /**
     * Creates a transaction bound to the given page file.  The constructor is
     * package-private, so only code in this package can create transactions.
     *
     * @param pageFile the page file this transaction reads from and writes to
     */
    Transaction(PageFile pageFile) {
        this.pageFile = pageFile;
    }

    /**
     * Gives access to the page file this transaction operates against.
     *
     * @return the page file that was handed to this transaction at construction time
     */
    public PageFile getPageFile() {
        return pageFile;
    }

    /**
     * Allocates a single free page that data can be written to.  This is a
     * shorthand for {@code allocate(1)}.
     *
     * @return the newly allocated page
     * @throws IOException
     *         if a disk error occurred
     * @throws IllegalStateException
     *         presumably raised by the page file when it is not loaded
     */
    public <T> Page allocate() throws IOException {
        final int singlePage = 1;
        return allocate(singlePage);
    }

    /**
     * Allocates a run of sequential free pages that data can be written to.
     * The whole run is recorded in this transaction's allocate list, which
     * {@link #rollback()} uses to hand the pages back to the page file.
     *
     * @param count the number of sequential pages to allocate
     * @return the first page of the sequential run
     * @throws IOException
     *         if a disk error occurred
     * @throws IllegalStateException
     *         presumably raised by the page file when it is not loaded
     */
    public <T> Page allocate(int count) throws IOException {
        Page<T> first = pageFile.allocate(count);

        // Track the ids [firstId, firstId + count - 1] as allocated by this transaction.
        long firstId = first.getPageId();
        long lastId = firstId + count - 1;
        allocateList.add(new Sequence(firstId, lastId));

        return first;
    }

    /**
     * Frees up a previously allocated page so that it can be re-allocated again.
     * The page is loaded without a marshaller and then passed to {@link #free(Page)}.
     *
     * @param pageId the id of the page to free up
     * @throws IOException
     *         if a disk error occurred
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public void free(long pageId) throws IOException {
        Page target = load(pageId, null);
        free(target);
    }

    /**
     * Frees up a previously allocated sequence of pages so that they can be re-allocated again.
     * The first page is loaded without a marshaller and then passed to {@link #free(Page, int)}.
     *
     * @param pageId the id of the initial page of the sequence that will be getting freed
     * @param count the number of pages in the sequence
     *
     * @throws IOException
     *         if a disk error occurred
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public void free(long pageId, int count) throws IOException {
        Page first = load(pageId, null);
        free(first, count);
    }

    /**
     * Frees up a previously allocated sequence of pages so that they can be re-allocated again.
     * The supplied page is freed as-is; each following page of the sequence is loaded by id
     * (initial id + offset) before being freed.
     *
     * @param page the initial page of the sequence that will be getting freed
     * @param count the number of pages in the sequence; nothing is freed when it is not positive
     *
     * @throws IOException
     *         if a disk error occurred
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public <T> void free(Page page, int count) throws IOException {
        pageFile.assertLoaded();
        final long firstId = page.getPageId();
        for (int i = 0; i < count; i++) {
            Page target = page;
            if (i > 0) {
                // Only the first page was handed in; the others must be fetched.
                target = load(firstId + i, null);
            }
            free(target);
        }
    }
    
    /**
     * Frees up a previously allocated page so that it can be re-allocated again.
     * If the page is the head of a chain of part pages, the pages it links to are freed too.
     * Freeing stops as soon as a page that is already free is encountered.
     *
     * @param page the page to free up; {@code null} is a no-op
     * @throws IOException
     *         if a disk error occurred
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public <T> void free(Page page) throws IOException {
        pageFile.assertLoaded();

        // One pass per page of the chain.
        Page current = page;
        while (current != null) {

            // Nothing left to do once we hit a page that is already free.
            if (current.getType() == Page.PAGE_FREE_TYPE) {
                return;
            }

            // Resolve the follow-up page before this page's header is rewritten.
            Page<T> successor = null;
            if (current.getType() == Page.PAGE_PART_TYPE) {
                successor = load(current.getNext(), null);
            }

            current.makeFree(getWriteTransactionId());

            // Queue a write of the page in its freed state.
            DataByteArrayOutputStream header = new DataByteArrayOutputStream(pageFile.getPageSize());
            current.write(header);
            write(current, header.getData());

            freeList.add(current.getPageId());
            current = successor;
        }
    }

    /**
     * Stores a page by opening an output stream on it, marshalling the page data into that
     * stream and closing the stream.
     *
     * @param page
     *        the page to write. The Page object must be fully populated with a valid pageId, type, and data.
     * @param marshaller
     *        the marshaller used to write the data portion of the Page, may be null if you do not wish to write the data.
     * @param overflow
     *        If true, then if the page data marshalls to a bigger size than can fit in one page, additional
     *        overflow pages are automatically allocated and chained to this page to store all the data.  If false,
     *        and the overflow condition would occur, then the PageOverflowIOException is thrown.
     * @throws IOException
     *         if a disk error occurred
     * @throws PageOverflowIOException
     *         if the page data marshalls to a size larger than the page size and overflow was false
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public <T> void store(Page page, Marshaller marshaller, final boolean overflow) throws IOException {
        OutputStream stream = openOutputStream(page, overflow);
        DataByteArrayOutputStream out = (DataByteArrayOutputStream) stream;
        if (marshaller != null) {
            marshaller.writePayload(page.get(), out);
        }
        // Closing the stream is what queues the final page write.
        out.close();
    }

    /**
     * Opens an output stream that stores whatever is written to it as the data of the given page.
     * The page writes are only queued on this transaction; the last page of the data is queued
     * when the returned stream is closed.
     *
     * @param page
     *        the page to write to; a copy of it is taken (and added to the page file cache) before writing
     * @param overflow
     *        if true, data that does not fit in one page is continued in chained pages; if false,
     *        a PageOverflowIOException is thrown as soon as the data reaches the page size
     * @return the stream to write the page data to
     * @throws IOException
     *         if a disk error occurred
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public OutputStream openOutputStream(Page page, final boolean overflow) throws IOException {
        pageFile.assertLoaded();

        // Copy to protect against the end user changing
        // the page instance while we are doing a write.
        final Page copy = page.copy();
        pageFile.addToCache(copy);

        //
        // To support writing VERY large data, we override the output stream so
        // that we do the page writes incrementally while the data is being
        // marshalled.
        // NOTE(review): the constructor argument (two pages worth of bytes) is presumably
        // the initial buffer capacity - confirm against DataByteArrayOutputStream.
        DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageFile.getPageSize() * 2) {
            // The page of the chain that the buffered data currently belongs to.
            Page current = copy;

            // NOTE(review): presumably invoked by DataByteArrayOutputStream after each
            // write so that a full page can be flushed - confirm against that class.
            @SuppressWarnings("unchecked")
            @Override
            protected void onWrite() throws IOException {

                // Are we at an overflow condition?
                final int pageSize = pageFile.getPageSize();
                if (pos >= pageSize) {
                    // If overflow is allowed
                    if (overflow) {

                        // Re-use the page already chained after the current one if there
                        // is one, otherwise allocate a fresh page to continue into.
                        Page next;
                        if (current.getType() == Page.PAGE_PART_TYPE) {
                            next = load(current.getNext(), null);
                        } else {
                            next = allocate();
                        }

                        next.txId = current.txId;

                        // Write the page header at the start of the buffer; the write
                        // position is remembered so the overflow data can be moved below.
                        int oldPos = pos;
                        pos = 0;

                        current.makePagePart(next.getPageId(), getWriteTransactionId());
                        current.write(this);

                        // Do the page write: queue the first pageSize bytes of the buffer
                        // as the content of the current page.
                        byte[] data = new byte[pageSize];
                        System.arraycopy(buf, 0, data, 0, pageSize);
                        Transaction.this.write(current, data);

                        // Reset for the next page chunk
                        pos = 0;
                        // The page header marshalled after the data is written.
                        skip(Page.PAGE_HEADER_SIZE);
                        // Move the overflow data (the bytes beyond pageSize) after the header.
                        System.arraycopy(buf, pageSize, buf, pos, oldPos - pageSize);
                        pos += oldPos - pageSize;
                        current = next;

                    } else {
                        throw new PageOverflowIOException("Page overflow.");
                    }
                }

            }

            // Finishes the page chain: marks the current page as the end page and queues its write.
            @SuppressWarnings("unchecked")
            @Override
            public void close() throws IOException {
                super.close();

                // We need to free up the rest of the page chain..
                // (the current page is still linked to pages the new data no longer needs)
                if (current.getType() == Page.PAGE_PART_TYPE) {
                    free(current.getNext());
                }

                // The end page records the current write position.
                current.makePageEnd(pos, getWriteTransactionId());

                // Write the header..
                pos = 0;
                current.write(this);

                Transaction.this.write(current, buf);
            }
        };

        // The page header marshaled after the data is written.
        out.skip(Page.PAGE_HEADER_SIZE);
        return out;
    }

    /**
     * Loads the page with the given id by creating a new Page object for that id and
     * populating it through {@link #load(Page, Marshaller)}.
     *
     * @param pageId
     *        the id of the page to load
     * @param marshaller
     *        the marshaller used to load the data portion of the Page, may be null if you do not wish to load the data.
     * @return the page with the given id
     * @throws IOException
     *         if a disk error occurred
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public <T> Page load(long pageId, Marshaller marshaller) throws IOException {
        pageFile.assertLoaded();
        Page<T> loaded = new Page(pageId);
        load(loaded, marshaller);
        return loaded;
    }

    /**
     * Populates the given page.  Sources are tried in this order: a write pending in this
     * transaction, the page file cache, and finally the page file itself.  When reading from
     * the page file, the full data is unmarshalled (and the page cached) if a marshaller is
     * given; otherwise only the page header is read and the page data is set to null.
     *
     * @param page - The pageId field must be properly set
     * @param marshaller
     *        the marshaller used to load the data portion of the Page, may be null if you do not wish to load the data.
     * @throws IOException
     *         if a disk error occurred
     * @throws InvalidPageIOException
     *         if the page id is negative
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    @SuppressWarnings("unchecked")
    public <T> void load(Page page, Marshaller marshaller) throws IOException {
        pageFile.assertLoaded();

        // Negative ids can never be loaded.
        final long pageId = page.getPageId();
        if (pageId < 0) {
            throw new InvalidPageIOException("Page id is not valid", pageId);
        }

        // A page modified by this transaction wins over everything else.
        PageWrite pending = writes.get(pageId);
        if (pending != null) {
            page.copy(pending.getPage());
            return;
        }

        // Next best: the page file cache.
        Page<T> cached = pageFile.getFromCache(pageId);
        if (cached != null) {
            page.copy(cached);
            return;
        }

        if (marshaller == null) {
            // Header-only read; such pages are not cached.
            DataByteArrayInputStream header = new DataByteArrayInputStream(new byte[Page.PAGE_HEADER_SIZE]);
            pageFile.readPage(pageId, header.getRawData());
            page.read(header);
            page.set(null);
            return;
        }

        // Full page read, followed by caching the fully loaded page.
        InputStream is = openInputStream(page);
        DataInputStream dataIn = new DataInputStream(is);
        page.set(marshaller.readPayload(dataIn));
        is.close();
        pageFile.addToCache(page);
    }

    /**
     * Opens a stream over the data stored in the page chain that starts at the given page.
     * The first page is read from the page file right away (populating {@code p} from its
     * header); follow-up pages of the chain are read on demand while the stream is consumed.
     * The stream supports {@code mark}/{@code reset}.
     *
     * @param p the first page of the chain; its pageId must be set
     * @return a stream over the page data
     * @throws EOFException
     *         if the page is a free page
     * @throws IOException
     *         if a disk error occurred
     * @see org.apache.kahadb.page.Transaction#load(org.apache.kahadb.page.Page,
     *      org.apache.kahadb.util.Marshaller)
     */
    public InputStream openInputStream(final Page p) throws IOException {

        return new InputStream() {

            // Buffer holding the page currently being consumed; within this stream
            // chunk.offset is the read position and chunk.length the end of the data.
            private ByteSequence chunk = new ByteSequence(new byte[pageFile.getPageSize()]);
            private Page page = readPage(p);

            // State captured by mark(); null until mark() has been called.
            private Page markPage;
            private ByteSequence markChunk;

            // Reads the given page into the chunk buffer and positions the chunk on its data.
            private Page readPage(Page page) throws IOException {
                // Read the page data
                pageFile.readPage(page.getPageId(), chunk.getData());

                chunk.setOffset(0);
                chunk.setLength(pageFile.getPageSize());

                DataByteArrayInputStream in = new DataByteArrayInputStream(chunk);
                page.read(in);

                // The data starts right after the header; for an end page the header's
                // "next" field is used as the end position of the data.
                chunk.setOffset(Page.PAGE_HEADER_SIZE);
                if (page.getType() == Page.PAGE_END_TYPE) {
                    chunk.setLength((int)(page.getNext()));
                }

                if (page.getType() == Page.PAGE_FREE_TYPE) {
                    throw new EOFException("Chunk stream does not exist at page: " + page.getPageId());
                }

                return page;
            }

            @Override
            public int read() throws IOException {
                if (!atEOF()) {
                    return chunk.data[chunk.offset++] & 0xff;
                } else {
                    return -1;
                }
            }

            // Returns false only when at least one unread byte is available in the chunk,
            // loading the next page of the chain if the current one is exhausted.
            private boolean atEOF() throws IOException {
                if (chunk.offset < chunk.length) {
                    return false;
                }
                if (page.getType() == Page.PAGE_END_TYPE) {
                    return true;
                }
                fill();
                return chunk.offset >= chunk.length;
            }

            // Advances to the next page of the chain.
            private void fill() throws IOException {
                page = readPage(new Page(page.getNext()));
            }

            @Override
            public int read(byte[] b) throws IOException {
                return read(b, 0, b.length);
            }

            @Override
            public int read(byte b[], int off, int len) throws IOException {
                if (atEOF()) {
                    return -1;
                }
                int rc = 0;
                // atEOF() == false guarantees at least one unread byte in the chunk, so
                // every pass makes progress; the destination index advances with rc so
                // that a read can continue across page boundaries.
                while (rc < len && !atEOF()) {
                    int n = Math.min(len - rc, chunk.length - chunk.offset);
                    System.arraycopy(chunk.data, chunk.offset, b, off + rc, n);
                    chunk.offset += n;
                    rc += n;
                }
                return rc;
            }

            @Override
            public long skip(long len) throws IOException {
                // Per the InputStream contract the number of bytes actually skipped is
                // returned, which is 0 at end of stream or for a non-positive request.
                long skipped = 0;
                while (skipped < len && !atEOF()) {
                    int n = (int) Math.min(len - skipped, (long) (chunk.length - chunk.offset));
                    chunk.offset += n;
                    skipped += n;
                }
                return skipped;
            }

            @Override
            public int available() {
                return chunk.length - chunk.offset;
            }

            @Override
            public boolean markSupported() {
                return true;
            }

            @Override
            public void mark(int markpos) {
                markPage = page;
                markChunk = snapshot(chunk);
            }

            @Override
            public void reset() throws IOException {
                if (markChunk == null) {
                    throw new IOException("Stream has not been marked");
                }
                page = markPage;
                // Restore from a copy: the live chunk is overwritten by subsequent page
                // reads, and the mark must survive that so reset() can be repeated.
                chunk = snapshot(markChunk);
            }

            // Creates an independent copy (data, offset and length) of the given chunk.
            private ByteSequence snapshot(ByteSequence source) {
                byte data[] = new byte[pageFile.getPageSize()];
                System.arraycopy(source.getData(), 0, data, 0, pageFile.getPageSize());
                return new ByteSequence(data, source.getOffset(), source.getLength());
            }

        };
    }

    /**
     * Iterates over all active Pages of the page file.  Pages with type Page.PAGE_FREE_TYPE are
     * left out; use {@link #iterator(boolean)} to include them.
     *
     * Pages removed with Iterator.remove() are freed through this transaction, so they will not
     * actually get removed until the transaction commits.
     *
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public Iterator<Page> iterator() {
        final boolean includeFreePages = false;
        return iterator(includeFreePages);
    }

    /**
     * Allows you to iterate through all active Pages in this object.  You can optionally include free pages in the pages
     * iterated.  Pages are loaded without a marshaller, in ascending page id order.
     *
     * Iterator.remove() frees the last returned page through this transaction and throws a
     * RuntimeException wrapping the IOException if freeing the page fails.
     *
     * @param includeFreePages - if true, free pages are included in the iteration
     * @return an iterator over the pages of the page file
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public Iterator<Page> iterator(final boolean includeFreePages) {

        pageFile.assertLoaded();

        return new Iterator<Page>() {
            // Id of the next page to examine.
            long nextId;
            // Page looked up by findNextPage() but not yet handed out by next().
            Page nextPage;
            // Page most recently handed out by next(); the target of remove().
            Page lastPage;

            private void findNextPage() {
                if (!pageFile.isLoaded()) {
                    throw new IllegalStateException("Cannot iterate the pages when the page file is not loaded");
                }

                if (nextPage != null) {
                    return;
                }

                try {
                    while (nextId < pageFile.getPageCount()) {

                        Page page = load(nextId, null);

                        if (includeFreePages || page.getType() != Page.PAGE_FREE_TYPE) {
                            nextPage = page;
                            return;
                        } else {
                            nextId++;
                        }
                    }
                } catch (IOException ignored) {
                    // NOTE(review): kept as best-effort - a read failure leaves nextPage
                    // unset, so the iteration ends (hasNext() reports false) instead of
                    // failing.  Confirm callers really want I/O errors hidden here.
                }
            }

            public boolean hasNext() {
                findNextPage();
                return nextPage != null;
            }

            public Page next() {
                findNextPage();
                if (nextPage != null) {
                    lastPage = nextPage;
                    nextPage = null;
                    nextId++;
                    return lastPage;
                } else {
                    throw new NoSuchElementException();
                }
            }

            @SuppressWarnings("unchecked")
            public void remove() {
                if (lastPage == null) {
                    throw new IllegalStateException();
                }
                try {
                    free(lastPage);
                    lastPage = null;
                } catch (IOException e) {
                    // Iterator.remove() cannot throw checked exceptions, so the failure is
                    // surfaced unchecked with the original exception as its cause.
                    throw new RuntimeException(e);
                }
            }
        };
    }

    ///////////////////////////////////////////////////////////////////
    // Commit / Rollback related methods..
    ///////////////////////////////////////////////////////////////////
    
    /**
     * Commits the transaction to the PageFile as a single 'Unit of Work': the queued page
     * writes are handed to the page file in one call, the pages freed in this transaction are
     * released, and the transaction is reset to its read-only state.  Does nothing if the
     * transaction has not performed any update.
     *
     * NOTE(review): the all-or-nothing guarantee depends on PageFile.write(), which is not
     * visible here.
     */
    public void commit() throws IOException {
        if (writeTransactionId == -1) {
            // Nothing was updated, so there is nothing to commit.
            return;
        }

        // Actually do the page writes...
        pageFile.write(writes.entrySet());
        // Release the pages that were freed up in the transaction..
        freePages(freeList);

        // Back to a pristine, read-only transaction.
        freeList.clear();
        allocateList.clear();
        writes.clear();
        writeTransactionId = -1;
    }

    /**
     * Rolls back the transaction: the pages allocated in this transaction are handed back to
     * the page file, the queued page writes are discarded, and the transaction is reset to its
     * read-only state.  Does nothing if the transaction has not performed any update.
     */
    public void rollback() throws IOException {
        if (writeTransactionId == -1) {
            // Nothing was updated, so there is nothing to undo.
            return;
        }

        // Release the pages that were allocated in the transaction...
        freePages(allocateList);

        // Back to a pristine, read-only transaction.
        freeList.clear();
        allocateList.clear();
        writes.clear();
        writeTransactionId = -1;
    }

    /**
     * Returns the write transaction id of this transaction, requesting one from the page
     * file the first time it is needed.
     */
    private long getWriteTransactionId() {
        if (writeTransactionId != -1) {
            return writeTransactionId;
        }
        writeTransactionId = pageFile.getNextWriteTransactionId();
        return writeTransactionId;
    }

    /**
     * Queues up a page write that should get done when commit() gets called.  A write queued
     * earlier for the same page id is replaced.
     *
     * @param page the page being written
     * @param data the bytes to write for that page
     */
    @SuppressWarnings("unchecked")
    private void write(final Page page, byte[] data) throws IOException {
        // TODO: if a large update transaction is in progress, we may want to move
        // all the current updates to a temp file so that we don't keep using
        // up memory.
        PageWrite pending = new PageWrite(page, data);
        writes.put(page.getPageId(), pending);
    }

    /**
     * Hands every page id contained in the given set back to the page file.
     *
     * @param list the page ids to free
     * @throws RuntimeException
     */
    private void freePages(SequenceSet list) throws RuntimeException {
        // The callback keeps no state, so one instance serves every sequence.
        final Sequence.Closure<RuntimeException> releasePage = new Sequence.Closure<RuntimeException>() {
            public void execute(long value) {
                pageFile.freePage(value);
            }
        };

        for (Sequence seq = list.getHead(); seq != null; seq = seq.getNext()) {
            seq.each(releasePage);
        }
    }
    
    /**
     * Tells whether this transaction is free of uncommitted updates, i.e. whether no write
     * transaction id has been assigned since the last commit or rollback.
     *
     * @return true if there are no uncommitted page file updates associated with this transaction.
     */
    public boolean isReadOnly() {
        final boolean updating = writeTransactionId != -1;
        return !updating;
    }
    
    ///////////////////////////////////////////////////////////////////
    // Transaction closure helpers...
    ///////////////////////////////////////////////////////////////////
    
    /**
     * Executes a closure and if it does not throw any exceptions, then it commits the transaction.
     * If the closure throws an Exception, then the transaction is rolled back.
     *
     * @param <T> the type of exception the closure may throw
     * @param closure - the work to get executed.
     * @throws T if the closure throws it
     * @throws IOException If the commit or the rollback fails.
     */
    public <T extends Throwable> void execute(Closure<T> closure) throws T, IOException {
        boolean success = false;
        try {
            closure.execute(this);
            success = true;
        } finally {
            // Decided in finally so that the rollback also runs for unchecked exceptions and errors.
            if (success) {
                commit();
            } else {
                rollback();
            }
        }
    }

    /**
     * Executes a closure and if it does not throw any exceptions, then it commits the transaction
     * and returns the closure's result.
     * If the closure throws an Exception, then the transaction is rolled back.
     *
     * @param <R> the type of result the closure produces
     * @param <T> the type of exception the closure may throw
     * @param closure - the work to get executed.
     * @return the value returned by the closure
     * @throws T if the closure throws it
     * @throws IOException If the commit or the rollback fails.
     */
    public <R, T extends Throwable> R execute(CallableClosure<R, T> closure) throws T, IOException {
        boolean success = false;
        try {
            R rc = closure.execute(this);
            success = true;
            return rc;
        } finally {
            // Decided in finally so that the rollback also runs for unchecked exceptions and errors.
            if (success) {
                commit();
            } else {
                rollback();
            }
        }
    }

}

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ Transaction.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.