|
ActiveMQ example source code file (XmppTransport.java)
The ActiveMQ XmppTransport.java source code/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.transport.xmpp; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.net.URI; import javax.net.SocketFactory; import javax.xml.bind.JAXBContext; import javax.xml.bind.JAXBException; import javax.xml.bind.Marshaller; import javax.xml.bind.Unmarshaller; import javax.xml.namespace.QName; import javax.xml.stream.Location; import javax.xml.stream.XMLEventReader; import javax.xml.stream.XMLInputFactory; import javax.xml.stream.XMLOutputFactory; import javax.xml.stream.XMLReporter; import javax.xml.stream.XMLStreamException; import javax.xml.stream.XMLStreamWriter; import javax.xml.stream.events.Attribute; import javax.xml.stream.events.StartElement; import javax.xml.stream.events.XMLEvent; import ietf.params.xml.ns.xmpp_sasl.Mechanisms; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.Command; import org.apache.activemq.transport.tcp.TcpBufferedInputStream; import org.apache.activemq.transport.tcp.TcpBufferedOutputStream; import org.apache.activemq.transport.tcp.TcpTransport; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.wireformat.WireFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.jabber.etherx.streams.Features; /** * */ public class XmppTransport extends TcpTransport { protected static final QName ATTRIBUTE_TO = new QName("to"); private static final transient Logger LOG = LoggerFactory.getLogger(XmppTransport.class); protected OutputStream outputStream; protected InputStream inputStream; private static JAXBContext context; private XMLEventReader xmlReader; private Unmarshaller unmarshaller; private Marshaller marshaller; private XMLStreamWriter xmlWriter; private String to = "client"; private ProtocolConverter converter; private String from = "localhost"; private String brokerId = "broker-id-1"; public XmppTransport(WireFormat wireFormat, Socket socket) throws IOException { super(wireFormat, socket); init(); } public XmppTransport(WireFormat wireFormat, SocketFactory socketFactory, URI uri, URI uri1) throws IOException { super(wireFormat, socketFactory, uri, uri1); init(); } private void init() { LOG.debug("Creating new instance of XmppTransport"); converter = new ProtocolConverter(this); } @Override public void oneway(Object object) throws IOException { if (object instanceof Command) { Command command = (Command)object; if (command instanceof BrokerInfo) { BrokerInfo brokerInfo = (BrokerInfo)command; brokerId = brokerInfo.getBrokerId().toString(); from = brokerInfo.getBrokerName(); try { writeOpenStream(brokerId, from); } catch (XMLStreamException e) { throw IOExceptionSupport.create(e); } } else { try { converter.onActiveMQCommand(command); } catch (IOException e) { throw e; } catch (Exception e) { throw IOExceptionSupport.create(e); } } } else { LOG.warn("Unkown command: " + object); } } /** * Marshalls the given POJO to the client */ public void marshall(Object command) throws IOException { if (isStopped() || isStopping()) { LOG.warn("Not marshalling command as shutting down: " + command); return; } try { marshaller.marshal(command, xmlWriter); xmlWriter.flush(); outputStream.flush(); } catch (JAXBException e) { throw IOExceptionSupport.create(e); } catch (XMLStreamException e) { throw IOExceptionSupport.create(e); } } @Override public void doRun() throws IOException { LOG.debug("XMPP consumer thread starting"); try { XMLInputFactory xif = XMLInputFactory.newInstance(); xif.setXMLReporter(new XMLReporter() { public void report(String message, String errorType, Object relatedInformation, Location location) throws XMLStreamException { LOG.warn(message + " errorType: " + errorType + " relatedInfo: " + relatedInformation); } }); xmlReader = xif.createXMLEventReader(inputStream); XMLEvent docStart = xmlReader.nextEvent(); XMLEvent rootElement = xmlReader.nextTag(); if (rootElement instanceof StartElement) { StartElement startElement = (StartElement)rootElement; Attribute toAttribute = startElement.getAttributeByName(ATTRIBUTE_TO); if (toAttribute != null) { to = toAttribute.getValue(); } } while (true) { if (isStopped()) { break; } XMLEvent event = xmlReader.peek(); if (event.isStartElement()) { // unmarshal a new object Object object = unmarshaller.unmarshal(xmlReader); if (object != null) { LOG.debug("Unmarshalled new incoming event - " + object.getClass().getName()); converter.onXmppCommand(object); } } else { if (event.getEventType() == XMLEvent.END_ELEMENT) { break; } else if (event.getEventType() == XMLEvent.END_ELEMENT || event.getEventType() == XMLEvent.END_DOCUMENT) { break; } else { xmlReader.nextEvent(); } } } } catch (Exception e) { throw IOExceptionSupport.create(e); } } public String getFrom() { return from; } @Override protected void doStop(ServiceStopper stopper) throws Exception { if (xmlWriter != null) { try { xmlWriter.writeEndElement(); xmlWriter.writeEndDocument(); xmlWriter.close(); } catch (XMLStreamException e) { // the client may have closed first so ignore this LOG.info("Caught trying to close transport: " + e, e); } } if (xmlReader != null) { try { xmlReader.close(); } catch (XMLStreamException e) { // the client may have closed first so ignore this LOG.info("Caught trying to close transport: " + e, e); } } super.doStop(stopper); } @Override protected void initializeStreams() throws Exception { // TODO it would be preferable to use class discovery here! if ( context == null ) { context = JAXBContext.newInstance( "jabber.server:" + "jabber.server.dialback:" + "jabber.client:" + "jabber.iq._private:" + "jabber.iq.auth:" + "jabber.iq.gateway:" + "jabber.iq.version:" + "jabber.iq.roster:" + "jabber.iq.pass:" + "jabber.iq.last:" + "jabber.iq.oob:" + "jabber.iq.time:" + "storage.rosternotes:" + "ietf.params.xml.ns.xmpp_streams:" + "ietf.params.xml.ns.xmpp_sasl:" + "ietf.params.xml.ns.xmpp_stanzas:" + "ietf.params.xml.ns.xmpp_bind:" + "ietf.params.xml.ns.xmpp_tls:" + "org.jabber.protocol.muc:" + "org.jabber.protocol.rosterx:" + "org.jabber.protocol.disco_info:" + "org.jabber.protocol.disco_items:" + "org.jabber.protocol.activity:" + "org.jabber.protocol.amp_errors:" + "org.jabber.protocol.amp:" + "org.jabber.protocol.address:" + "org.jabber.protocol.muc_user:" + "org.jabber.protocol.muc_admin:" + "org.jabber.etherx.streams"); } inputStream = new TcpBufferedInputStream(socket.getInputStream(), 8 * 1024); outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), 16 * 1024); unmarshaller = context.createUnmarshaller(); marshaller = context.createMarshaller(); marshaller.setProperty(Marshaller.JAXB_FRAGMENT, true); } protected void writeOpenStream(String id, String from) throws IOException, XMLStreamException { LOG.debug("Sending initial stream element"); XMLOutputFactory factory = XMLOutputFactory.newInstance(); // factory.setProperty(XMLOutputFactory.IS_REPAIRING_NAMESPACES, true); xmlWriter = factory.createXMLStreamWriter(outputStream); xmlWriter.writeStartDocument(); xmlWriter.writeStartElement("stream", "stream", "http://etherx.jabber.org/streams"); xmlWriter.writeDefaultNamespace("jabber:client"); xmlWriter.writeNamespace("stream", "http://etherx.jabber.org/streams"); xmlWriter.writeAttribute("version", "1.0"); xmlWriter.writeAttribute("id", id); if (to == null) { to = "client"; } xmlWriter.writeAttribute("to", to); xmlWriter.writeAttribute("from", from); // now lets write the features Features features = new Features(); // TODO support TLS // features.getAny().add(new Starttls()); //Mechanisms mechanisms = new Mechanisms(); // TODO support SASL // mechanisms.getMechanism().add("DIGEST-MD5"); // mechanisms.getMechanism().add("PLAIN"); //features.getAny().add(mechanisms); features.getAny().add(new ietf.params.xml.ns.xmpp_bind.ObjectFactory().createBind()); features.getAny().add(new ietf.params.xml.ns.xmpp_session.ObjectFactory().createSession("")); marshall(features); LOG.debug("Initial stream element sent!"); } } Other ActiveMQ examples (source code examples)Here is a short list of links related to this ActiveMQ XmppTransport.java source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.