alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

ActiveMQ example source code file (JournalEndpoint.java)

This example ActiveMQ source code file (JournalEndpoint.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example"™.

Java - ActiveMQ tags/keywords

asyncdatamanager, bytesequence, bytesequence, consumer, exception, exception, file, file, io, ioexception, ioexception, journalendpoint, last, override, runtimecamelexception

The ActiveMQ JournalEndpoint.java source code

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.camel.component;

import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.util.ByteSequence;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.ExchangePattern;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.DefaultProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JournalEndpoint extends DefaultEndpoint {

    private static final transient Logger LOG = LoggerFactory.getLogger(JournalEndpoint.class);

    private final File directory;
    private final AtomicReference<DefaultConsumer> consumer = new AtomicReference();
    private final Object activationMutex = new Object();
    private int referenceCount;
    private AsyncDataManager dataManager;
    private Thread thread;
    private Location lastReadLocation;
    private long idleDelay = 1000;
    private boolean syncProduce = true;
    private boolean syncConsume;

    public JournalEndpoint(String uri, JournalComponent journalComponent, File directory) {
        super(uri, journalComponent.getCamelContext());
        this.directory = directory;
    }

    public JournalEndpoint(String endpointUri, File directory) {
        super(endpointUri);
        this.directory = directory;
    }

    public boolean isSingleton() {
        return true;
    }

    public File getDirectory() {
        return directory;
    }

    public Consumer createConsumer(Processor processor) throws Exception {
        return new DefaultConsumer(this, processor) {
            @Override
            public void start() throws Exception {
                super.start();
                activateConsumer(this);
            }

            @Override
            public void stop() throws Exception {
                deactivateConsumer(this);
                super.stop();
            }
        };
    }

    protected void decrementReference() throws IOException {
        synchronized (activationMutex) {
            referenceCount--;
            if (referenceCount == 0) {
                LOG.debug("Closing data manager: " + directory);
                LOG.debug("Last mark at: " + lastReadLocation);
                dataManager.close();
                dataManager = null;
            }
        }
    }

    protected void incrementReference() throws IOException {
        synchronized (activationMutex) {
            referenceCount++;
            if (referenceCount == 1) {
                LOG.debug("Opening data manager: " + directory);
                dataManager = new AsyncDataManager();
                dataManager.setDirectory(directory);
                dataManager.start();

                lastReadLocation = dataManager.getMark();
                LOG.debug("Last mark at: " + lastReadLocation);
            }
        }
    }

    protected void deactivateConsumer(DefaultConsumer consumer) throws IOException {
        synchronized (activationMutex) {
            if (this.consumer.get() != consumer) {
                throw new RuntimeCamelException("Consumer was not active.");
            }
            this.consumer.set(null);
            try {
                thread.join();
            } catch (InterruptedException e) {
                throw new InterruptedIOException();
            }
            decrementReference();
        }
    }

    protected void activateConsumer(DefaultConsumer consumer) throws IOException {
        synchronized (activationMutex) {
            if (this.consumer.get() != null) {
                throw new RuntimeCamelException("Consumer already active: journal endpoints only support 1 active consumer");
            }
            incrementReference();
            this.consumer.set(consumer);
            thread = new Thread() {
                @Override
                public void run() {
                    dispatchToConsumer();
                }
            };
            thread.setName("Dipatch thread: " + getEndpointUri());
            thread.setDaemon(true);
            thread.start();
        }
    }

    protected void dispatchToConsumer() {
        try {
            DefaultConsumer consumer;
            while ((consumer = this.consumer.get()) != null) {
                // See if there is a new record to process
                Location location = dataManager.getNextLocation(lastReadLocation);
                if (location != null) {

                    // Send it on.
                    ByteSequence read = dataManager.read(location);
                    Exchange exchange = createExchange();
                    exchange.getIn().setBody(read);
                    exchange.getIn().setHeader("journal", getEndpointUri());
                    exchange.getIn().setHeader("location", location);
                    consumer.getProcessor().process(exchange);

                    // Setting the mark makes the data manager forget about
                    // everything
                    // before that record.
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Consumed record at: " + location);
                    }
                    dataManager.setMark(location, syncConsume);
                    lastReadLocation = location;
                } else {
                    // Avoid a tight CPU loop if there is no new record to read.
                    LOG.debug("Sleeping due to no records being available.");
                    Thread.sleep(idleDelay);
                }
            }
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }

    public Producer createProducer() throws Exception {
        return new DefaultProducer(this) {
            public void process(Exchange exchange) throws Exception {
                incrementReference();
                try {
                    ByteSequence body = exchange.getIn().getBody(ByteSequence.class);
                    if (body == null) {
                        byte[] bytes = exchange.getIn().getBody(byte[].class);
                        if (bytes != null) {
                            body = new ByteSequence(bytes);
                        }
                    }
                    if (body == null) {
                        throw new CamelExchangeException("In body message could not be converted to a ByteSequence or a byte array.", exchange);
                    }
                    dataManager.write(body, syncProduce);

                } finally {
                    decrementReference();
                }
            }
        };
    }

    public boolean isSyncConsume() {
        return syncConsume;
    }

    public void setSyncConsume(boolean syncConsume) {
        this.syncConsume = syncConsume;
    }

    public boolean isSyncProduce() {
        return syncProduce;
    }

    public void setSyncProduce(boolean syncProduce) {
        this.syncProduce = syncProduce;
    }

    boolean isOpen() {
        synchronized (activationMutex) {
            return referenceCount > 0;
        }
    }
}

Other ActiveMQ examples (source code examples)

Here is a short list of links related to this ActiveMQ JournalEndpoint.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.