|
ActiveMQ example source code file (NIODataFileAppender.java)
The ActiveMQ NIODataFileAppender.java source code/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.kaha.impl.async; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; /** * An AsyncDataFileAppender that uses NIO ByteBuffers and File chanels to more * efficently copy data to files. * * */ class NIODataFileAppender extends DataFileAppender { public NIODataFileAppender(AsyncDataManager fileManager) { super(fileManager); } /** * The async processing loop that writes to the data files and does the * force calls. * * Since the file sync() call is the slowest of all the operations, this * algorithm tries to 'batch' or group together several file sync() requests * into a single file sync() call. The batching is accomplished attaching * the same CountDownLatch instance to every force request in a group. * */ protected void processQueue() { DataFile dataFile = null; RandomAccessFile file = null; FileChannel channel = null; WriteBatch wb = null; try { ByteBuffer header = ByteBuffer.allocateDirect(AsyncDataManager.ITEM_HEAD_SPACE); ByteBuffer footer = ByteBuffer.allocateDirect(AsyncDataManager.ITEM_FOOT_SPACE); ByteBuffer buffer = ByteBuffer.allocateDirect(maxWriteBatchSize); // Populate the static parts of the headers and footers.. header.putInt(0); // size header.put((byte)0); // type header.put(RESERVED_SPACE); // reserved header.put(AsyncDataManager.ITEM_HEAD_SOR); footer.put(AsyncDataManager.ITEM_HEAD_EOR); while (true) { Object o = null; // Block till we get a command. synchronized (enqueueMutex) { while (true) { if (nextWriteBatch != null) { o = nextWriteBatch; nextWriteBatch = null; break; } if (shutdown) { return; } enqueueMutex.wait(); } enqueueMutex.notify(); } wb = (WriteBatch)o; if (dataFile != wb.dataFile) { if (file != null) { dataFile.closeRandomAccessFile(file); } dataFile = wb.dataFile; file = dataFile.openRandomAccessFile(true); channel = file.getChannel(); } WriteCommand write = wb.first; // Write all the data. // Only need to seek to first location.. all others // are in sequence. file.seek(write.location.getOffset()); boolean forceToDisk=false; // // is it just 1 big write? if (wb.size == write.location.getSize()) { forceToDisk = write.sync | write.onComplete!=null; header.clear(); header.putInt(write.location.getSize()); header.put(write.location.getType()); header.clear(); transfer(header, channel); ByteBuffer source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(), write.data.getLength()); transfer(source, channel); footer.clear(); transfer(footer, channel); } else { // Combine the smaller writes into 1 big buffer while (write != null) { forceToDisk |= write.sync | write.onComplete!=null; header.clear(); header.putInt(write.location.getSize()); header.put(write.location.getType()); header.clear(); copy(header, buffer); assert !header.hasRemaining(); ByteBuffer source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(), write.data.getLength()); copy(source, buffer); assert !source.hasRemaining(); footer.clear(); copy(footer, buffer); assert !footer.hasRemaining(); write = (WriteCommand)write.getNext(); } // Fully write out the buffer.. buffer.flip(); transfer(buffer, channel); buffer.clear(); } if( forceToDisk ) { file.getChannel().force(false); } WriteCommand lastWrite = (WriteCommand)wb.first.getTailNode(); dataManager.setLastAppendLocation(lastWrite.location); // Now that the data is on disk, remove the writes from the in // flight // cache. write = wb.first; while (write != null) { if (!write.sync) { inflightWrites.remove(new WriteKey(write.location)); } if (write.onComplete != null) { try { write.onComplete.run(); } catch (Throwable e) { e.printStackTrace(); } } write = (WriteCommand)write.getNext(); } // Signal any waiting threads that the write is on disk. wb.latch.countDown(); } } catch (IOException e) { synchronized (enqueueMutex) { firstAsyncException = e; if (wb != null) { wb.latch.countDown(); wb.exception.set(e); } if (nextWriteBatch != null) { nextWriteBatch.latch.countDown(); nextWriteBatch.exception.set(e); } } } catch (InterruptedException e) { } finally { try { if (file != null) { dataFile.closeRandomAccessFile(file); dataFile = null; file.close(); file = null; } if (channel != null) { channel.close(); channel = null; } } catch (IOException e) { } shutdownDone.countDown(); running = false; } } /** * Copy the bytes in header to the channel. * * @param header - source of data * @param channel - destination where the data will be written. * @throws IOException */ private void transfer(ByteBuffer header, FileChannel channel) throws IOException { while (header.hasRemaining()) { channel.write(header); } } private int copy(ByteBuffer src, ByteBuffer dest) { int rc = Math.min(dest.remaining(), src.remaining()); if (rc > 0) { // Adjust our limit so that we don't overflow the dest buffer. int limit = src.limit(); src.limit(src.position() + rc); dest.put(src); // restore the limit. src.limit(limit); } return rc; } } Other ActiveMQ examples (source code examples)Here is a short list of links related to this ActiveMQ NIODataFileAppender.java source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.