package org.das2.qstream; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; import java.util.logging.Logger; import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.ParserConfigurationException; import org.das2.dataset.NoDataInIntervalException; import org.das2.datum.LoggerManager; import org.w3c.dom.Document; import org.w3c.dom.Element; /** * Writes the stream based on the messages sent to it. This overlaps with the SimpleStreamFormatter, * but was needed to support streams. The SimpleStreamFormatter took a QDataSet and formatted it. This formats * based on the callbacks. * * Note the library was poorly designed, and this is pretty simple because most of the hard work is buried within * the StreamDescriptor. StreamDescriptor should be simplified, and the code should be moved to here. * * @author jbf */ public class FormatStreamHandler implements StreamHandler { private static final Logger logger= LoggerManager.getLogger("qstream"); WritableByteChannel out; StreamDescriptor sd; public void setOutputStream( OutputStream outs ) { this.out= Channels.newChannel(outs); } public void setWritableByteChannel( WritableByteChannel outs ) { this.out= outs; } /** * create a stream descriptor packet. TODO: createPacketDescriptor. See SerialStreamFormatter for examples of how this * would be done. * @param name * @param asciiTypes * @param isBigEndian * @return */ public StreamDescriptor createStreamDescriptor( String name, boolean asciiTypes, boolean isBigEndian ) { try { StreamDescriptor lsd = new StreamDescriptor(DocumentBuilderFactory.newInstance()); Document document = lsd.newDocument(lsd); Element streamElement = document.createElement("stream"); streamElement.setAttribute("dataset_id", name); if (asciiTypes == false) { streamElement.setAttribute("byte_order", isBigEndian ? "big_endian" : "little_endian"); } return lsd; } catch ( ParserConfigurationException ex ) { throw new RuntimeException(ex); } } @Override public void streamDescriptor(StreamDescriptor sd) throws StreamException { this.sd= sd; this.sd.setFactory( DocumentBuilderFactory.newInstance() ); try { this.sd.addDescriptor(sd); // allocate [00] for itself. This is very goofy, I realize now... this.sd.newDocument(sd); sd.send( sd, out ); } catch ( IOException ex ) { throw new StreamException(ex); } catch ( ParserConfigurationException ex ) { throw new StreamException(ex); } } @Override public void packetDescriptor(PacketDescriptor pd) throws StreamException { this.sd.addDescriptor(pd); try { Document d= sd.newDocument(pd); if ( pd.getDomElement()==null ) { Element ele= d.createElement("packet"); for ( PlaneDescriptor pld: pd.getPlanes() ) { // all this begs the question, where is the information stored? Is it in the PlaneDescriptor, or is it in the Document??? Executive decision 2013-10-23: it is in the document, and xpath should be used to process. Element qdatasetElement= d.createElement("qdataset"); qdatasetElement.setAttribute( "id", pld.getName() ); qdatasetElement.setAttribute( "rank", String.valueOf(pld.getRank()) ); ele.appendChild(qdatasetElement); } pd.setDomElement( ele ); throw new IllegalStateException("this implementation is not complete. See SimpleStreamFormatter"); } this.sd.send(pd,out); } catch ( IOException ex ) { throw new StreamException(ex); } catch ( ParserConfigurationException ex ) { throw new StreamException(ex); } } @Override public void packet(PacketDescriptor pd, ByteBuffer data) throws StreamException { int id; try { id= sd.descriptorId(pd); } catch ( IllegalArgumentException ex ) { logger.fine("stream doesn't recognize this packet type."); id= pd.getPacketId(); } try { out.write( ByteBuffer.wrap( String.format( ":%02d:", id ).getBytes("US-ASCII")) ); out.write(data); } catch ( IOException ex ) { throw new StreamException(ex); } } @Override public void streamClosed(StreamDescriptor sd) throws StreamException { } private String xmlSafe( String s ) { s= s.replaceAll("\'", ""); return s; } @Override public void streamException(StreamException se) throws StreamException { String type= "StreamException"; if ( se.getCause() instanceof NoDataInIntervalException ) { type= "NoDataInInterval"; } String msg= String.format("\n", type, xmlSafe(se.getMessage()) ); try { out.write( ByteBuffer.wrap( String.format( "[xx]%06d", msg.length() ).getBytes("US-ASCII") ) ); out.write( ByteBuffer.wrap( msg.getBytes("US-ASCII") ) ); } catch ( IOException ex ) { throw new StreamException(ex); } } @Override public void streamComment(StreamComment se) throws StreamException { String msg= String.format("\n", se.getType(), xmlSafe(se.getMessage()) ); try { out.write( ByteBuffer.wrap( String.format( "[xx]%06d", msg.length() ).getBytes("US-ASCII") ) ); out.write( ByteBuffer.wrap( msg.getBytes("US-ASCII") ) ); } catch ( IOException ex ) { throw new StreamException(ex); } } }