|
ActiveMQ example source code file (VMTransport.java)
The ActiveMQ VMTransport.java source code/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.transport.vm; import java.io.IOException; import java.io.InterruptedIOException; import java.net.URI; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.thread.DefaultThreadPools; import org.apache.activemq.thread.Task; import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.thread.Valve; import org.apache.activemq.transport.FutureResponse; import org.apache.activemq.transport.ResponseCallback; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportDisposedIOException; import org.apache.activemq.transport.TransportListener; import org.apache.activemq.util.IOExceptionSupport; /** * A Transport implementation that uses direct method invocations. * * */ public class VMTransport implements Transport, Task { private static final Object DISCONNECT = new Object(); private static final AtomicLong NEXT_ID = new AtomicLong(0); protected VMTransport peer; protected TransportListener transportListener; protected boolean disposed; protected boolean marshal; protected boolean network; protected boolean async = true; protected int asyncQueueDepth = 2000; protected LinkedBlockingQueue<Object> messageQueue; protected boolean started; protected final URI location; protected final long id; private TaskRunner taskRunner; private final Object lazyInitMutext = new Object(); private final Valve enqueueValve = new Valve(true); protected final AtomicBoolean stopping = new AtomicBoolean(); private volatile int receiveCounter; public VMTransport(URI location) { this.location = location; this.id = NEXT_ID.getAndIncrement(); } public void setPeer(VMTransport peer) { this.peer = peer; } public void oneway(Object command) throws IOException { if (disposed) { throw new TransportDisposedIOException("Transport disposed."); } if (peer == null) { throw new IOException("Peer not connected."); } TransportListener transportListener=null; try { // Disable the peer from changing his state while we try to enqueue onto him. peer.enqueueValve.increment(); if (peer.disposed || peer.stopping.get()) { throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."); } if (peer.started) { if (peer.async) { peer.getMessageQueue().put(command); peer.wakeup(); } else { transportListener = peer.transportListener; } } else { peer.getMessageQueue().put(command); } } catch (InterruptedException e) { InterruptedIOException iioe = new InterruptedIOException(e.getMessage()); iioe.initCause(e); throw iioe; } finally { // Allow the peer to change state again... peer.enqueueValve.decrement(); } dispatch(peer, transportListener, command); } public void dispatch(VMTransport transport, TransportListener transportListener, Object command) { if( transportListener!=null ) { if( command == DISCONNECT ) { transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.")); } else { transport.receiveCounter++; transportListener.onCommand(command); } } } public void start() throws Exception { if (transportListener == null) { throw new IOException("TransportListener not set."); } try { enqueueValve.turnOff(); if (messageQueue != null && !async) { Object command; while ((command = messageQueue.poll()) != null && !stopping.get() ) { receiveCounter++; dispatch(this, transportListener, command); } } started = true; wakeup(); } finally { enqueueValve.turnOn(); } // If we get stopped while starting up, then do the actual stop now // that the enqueueValve is back on. if( stopping.get() ) { stop(); } } public void stop() throws Exception { stopping.set(true); // If stop() is called while being start()ed.. then we can't stop until we return to the start() method. if( enqueueValve.isOn() ) { // let the peer know that we are disconnecting.. try { peer.transportListener.onCommand(new ShutdownInfo()); } catch (Exception ignore) { } TaskRunner tr = null; try { enqueueValve.turnOff(); if (!disposed) { started = false; disposed = true; if (taskRunner != null) { tr = taskRunner; taskRunner = null; } } } finally { stopping.set(false); enqueueValve.turnOn(); } if (tr != null) { tr.shutdown(1000); } } } /** * @see org.apache.activemq.thread.Task#iterate() */ public boolean iterate() { final TransportListener tl; try { // Disable changing the state variables while we are running... enqueueValve.increment(); tl = transportListener; if (!started || disposed || tl == null || stopping.get()) { if( stopping.get() ) { // drain the queue it since folks could be blocked putting on to // it and that would not allow the stop() method for finishing up. getMessageQueue().clear(); } return false; } } catch (InterruptedException e) { return false; } finally { enqueueValve.decrement(); } LinkedBlockingQueue<Object> mq = getMessageQueue(); Object command = mq.poll(); if (command != null) { if( command == DISCONNECT ) { tl.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.")); } else { tl.onCommand(command); } return !mq.isEmpty(); } else { return false; } } public void setTransportListener(TransportListener commandListener) { try { try { enqueueValve.turnOff(); this.transportListener = commandListener; wakeup(); } finally { enqueueValve.turnOn(); } } catch (InterruptedException e) { throw new RuntimeException(e); } } private LinkedBlockingQueue<Object> getMessageQueue() { synchronized (lazyInitMutext) { if (messageQueue == null) { messageQueue = new LinkedBlockingQueue<Object>(this.asyncQueueDepth); } return messageQueue; } } public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException { throw new AssertionError("Unsupported Method"); } public Object request(Object command) throws IOException { throw new AssertionError("Unsupported Method"); } public Object request(Object command, int timeout) throws IOException { throw new AssertionError("Unsupported Method"); } public TransportListener getTransportListener() { return transportListener; } public <T> T narrow(Class Other ActiveMQ examples (source code examples)Here is a short list of links related to this ActiveMQ VMTransport.java source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.