|
ActiveMQ example source code file (PList.java)
The ActiveMQ PList.java source code/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.store.kahadb.plist; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.apache.activemq.store.kahadb.plist.EntryLocation.EntryLocationMarshaller; import org.apache.kahadb.journal.Location; import org.apache.kahadb.page.Page; import org.apache.kahadb.page.Transaction; import org.apache.kahadb.util.ByteSequence; public class PList { final PListStore store; private String name; private long rootId = EntryLocation.NOT_SET; private long lastId = EntryLocation.NOT_SET; private final AtomicBoolean loaded = new AtomicBoolean(); private int size = 0; Object indexLock; PList(PListStore store) { this.store = store; this.indexLock = store.getIndexLock(); } public void setName(String name) { this.name = name; } /* * (non-Javadoc) * @see org.apache.activemq.beanstalk.JobScheduler#getName() */ public String getName() { return this.name; } public synchronized int size() { return this.size; } public synchronized boolean isEmpty() { return size == 0; } /** * @return the rootId */ public long getRootId() { return this.rootId; } /** * @param rootId * the rootId to set */ public void setRootId(long rootId) { this.rootId = rootId; } /** * @return the lastId */ public long getLastId() { return this.lastId; } /** * @param lastId * the lastId to set */ public void setLastId(long lastId) { this.lastId = lastId; } /** * @return the loaded */ public boolean isLoaded() { return this.loaded.get(); } void read(DataInput in) throws IOException { this.rootId = in.readLong(); this.name = in.readUTF(); } public void write(DataOutput out) throws IOException { out.writeLong(this.rootId); out.writeUTF(name); } public synchronized void destroy() throws IOException { synchronized (indexLock) { this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { public void execute(Transaction tx) throws IOException { destroy(tx); } }); } } void destroy(Transaction tx) throws IOException { // start from the first EntryLocation entry = getFirst(tx); while (entry != null) { EntryLocation toRemove = entry.copy(); entry = getNext(tx, entry.getNext()); doRemove(tx, toRemove); } } synchronized void load(Transaction tx) throws IOException { if (loaded.compareAndSet(false, true)) { final Page<EntryLocation> p = tx.load(this.rootId, null); if (p.getType() == Page.PAGE_FREE_TYPE) { // Need to initialize it.. EntryLocation root = createEntry(p, "root", EntryLocation.NOT_SET, EntryLocation.NOT_SET); storeEntry(tx, root); this.lastId = root.getPage().getPageId(); } else { // find last id long nextId = this.rootId; while (nextId != EntryLocation.NOT_SET) { EntryLocation next = getNext(tx, nextId); if (next != null) { this.lastId = next.getPage().getPageId(); nextId = next.getNext(); this.size++; } } } } } synchronized public void unload() { if (loaded.compareAndSet(true, false)) { this.rootId = EntryLocation.NOT_SET; this.lastId = EntryLocation.NOT_SET; this.size=0; } } synchronized public void addLast(final String id, final ByteSequence bs) throws IOException { final Location location = this.store.write(bs, false); synchronized (indexLock) { this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { public void execute(Transaction tx) throws IOException { addLast(tx, id, bs, location); } }); } } private void addLast(Transaction tx, String id, ByteSequence bs, Location location) throws IOException { EntryLocation entry = createEntry(tx, id, this.lastId, EntryLocation.NOT_SET); entry.setLocation(location); storeEntry(tx, entry); this.store.incrementJournalCount(tx, location); EntryLocation last = loadEntry(tx, this.lastId); last.setNext(entry.getPage().getPageId()); storeEntry(tx, last); this.lastId = entry.getPage().getPageId(); this.size++; } synchronized public void addFirst(final String id, final ByteSequence bs) throws IOException { final Location location = this.store.write(bs, false); synchronized (indexLock) { this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { public void execute(Transaction tx) throws IOException { addFirst(tx, id, bs, location); } }); } } private void addFirst(Transaction tx, String id, ByteSequence bs, Location location) throws IOException { EntryLocation entry = createEntry(tx, id, EntryLocation.NOT_SET, EntryLocation.NOT_SET); entry.setLocation(location); EntryLocation oldFirst = getFirst(tx); if (oldFirst != null) { oldFirst.setPrev(entry.getPage().getPageId()); storeEntry(tx, oldFirst); entry.setNext(oldFirst.getPage().getPageId()); } EntryLocation root = getRoot(tx); root.setNext(entry.getPage().getPageId()); storeEntry(tx, root); storeEntry(tx, entry); this.store.incrementJournalCount(tx, location); this.size++; } synchronized public boolean remove(final String id) throws IOException { final AtomicBoolean result = new AtomicBoolean(); synchronized (indexLock) { this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { public void execute(Transaction tx) throws IOException { result.set(remove(tx, id)); } }); } return result.get(); } synchronized public boolean remove(final int position) throws IOException { final AtomicBoolean result = new AtomicBoolean(); synchronized (indexLock) { this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { public void execute(Transaction tx) throws IOException { result.set(remove(tx, position)); } }); } return result.get(); } synchronized public boolean remove(final PListEntry entry) throws IOException { final AtomicBoolean result = new AtomicBoolean(); synchronized (indexLock) { this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { public void execute(Transaction tx) throws IOException { result.set(doRemove(tx, entry.getEntry())); } }); } return result.get(); } synchronized public PListEntry get(final int position) throws IOException { PListEntry result = null; final AtomicReference<EntryLocation> ref = new AtomicReference Other ActiveMQ examples (source code examples)Here is a short list of links related to this ActiveMQ PList.java source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.