/* * To change this template, choose Tools | Templates * and open the template in the editor. */ package org.das2.qstream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.WritableByteChannel; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.logging.Level; import java.util.logging.Logger; import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.ParserConfigurationException; import org.das2.datum.EnumerationUnits; import org.das2.datum.Units; import org.das2.datum.UnitsUtil; import org.das2.qds.DataSetOps; import org.das2.qds.DataSetUtil; import org.das2.qds.QDataSet; import org.das2.qds.QubeDataSetIterator; import org.das2.qds.SemanticOps; import org.das2.qds.ops.Ops; import org.w3c.dom.DOMException; import org.w3c.dom.Document; import org.w3c.dom.Element; /** * We need a class that can format a stream serially. SimpleStreamFormatter needs the whole dataset, which really * misses the point with QStreams, that you can process and create data streams serially on the server side. The * das2stream codes were first coded with this use case, but QStreams were not and because of this a good serial * generator was never created. * * See the main method for an example of how this is used. * * Note this class is not thread-safe and assumes that only one thread will be working on the stream. This may change. * * @author jbf */ public class SerialStreamFormatter { public static final int DEFAULT_TIME_DIGITS = 27; public static final String INOUTFORM_INLINE = "inline"; // put the values in the descriptor public static final String INOUTFORM_ONE_RECORD = "oneRecord"; // put the values in one record public static final String INOUTFORM_STREAMING = "streaming"; // put the values in records, one slice at a time. StreamDescriptor sd; StreamHandler sh; //Map planes; Map pds; Map transferTypes; Map unitsTransferTypes; /** * keeps track of names assigned to each dataset. Presently these are automatically assigned, but there * may also be a method for explicitly naming packets. */ Map names; Map namesRev; private static final char CHAR_NEWLINE = '\n'; private static final Logger logger= Logger.getLogger("qstream"); protected boolean asciiTypes = true; public boolean isAsciiTypes() { return asciiTypes; } public void setAsciiTypes(boolean asciiTypes) { this.asciiTypes = asciiTypes; } protected boolean bigEndian = false; public boolean isBigEndian() { return bigEndian; } public void setBigEndian(boolean bigEndian) { this.bigEndian = bigEndian; } public SerialStreamFormatter() { names= new LinkedHashMap(); namesRev= new LinkedHashMap(); transferTypes= new LinkedHashMap(); unitsTransferTypes= new LinkedHashMap(); } /** * send out the streamDescriptor, which contains just the name of the stream's default dataset. * @param name */ protected StreamDescriptor doStreamDescriptor( String name ) { try { StreamDescriptor sd = new StreamDescriptor( DocumentBuilderFactory.newInstance() ); Document document = sd.newDocument(sd); Element streamElement = document.createElement("stream"); streamElement.setAttribute("dataset_id", name); if ( asciiTypes == false) { streamElement.setAttribute("byte_order", isBigEndian() ? "big_endian" : "little_endian"); } sd.setDomElement(streamElement); sd.addDescriptor(sd); // allocate [00] for itself. return sd; } catch ( ParserConfigurationException ex ) { throw new RuntimeException(ex); } } /** * the name of the default dataset. * @param name */ public void init( String name, WritableByteChannel out ) throws IOException, StreamException { FormatStreamHandler sh= new FormatStreamHandler(); sh.setWritableByteChannel(out); init( name, sh ); } /** * initialize, sending data directly via the StreamHandler interface. This avoids parsing and formatting the XML. * @param name the name of the default dataset. */ public void init( String name, StreamHandler sh ) throws IOException, StreamException { pds= new HashMap(); this.sd= doStreamDescriptor(name); this.sh= sh; sh.streamDescriptor(sd); } /** * return a name for the thing that describes ds1. This will be used in * the descriptor, so if the descriptor doesn't contain the values, then * the name can be reused. Note no names are reused at this point. * @param ds1 * @return */ private synchronized String newNameFor(QDataSet ds1) { String name = names.get(ds1); if (name == null) { name = (String) ds1.property(QDataSet.NAME); } if (name == null) { name = "ds_" + names.size(); } if ( namesRev.containsKey(name) ) { String name0= name; int i=1; name= name0 + String.valueOf(i); while ( namesRev.containsKey(name) ) { i=i+1; name = name0 + String.valueOf(i); } } names.put( ds1, name ); namesRev.put( name, ds1.toString() ); return name; } /** * return a name for the thing that describes ds1. This will be used in * the descriptor, so if the descriptor doesn't contain the values, then * the name can be reused. Note no names are reused at this point. * @param ds1 * @return */ private synchronized String nameFor(QDataSet ds1) { System.err.println(""+ ds1 + " " + ds1.hashCode() ); String name = names.get(ds1); boolean assignName= name==null; if (name == null) { name = (String) ds1.property(QDataSet.NAME); } if (name == null) { name = "ds_" + names.size(); } if ( assignName ) { names.put( ds1, name ); namesRev.put( name,ds1.toString() ); } return name; } /** * return the transfer type for this unit. * @param ds * @return */ private TransferType getUnitTransferType( QDataSet ds ) { Units u= SemanticOps.getUnits(ds); return unitsTransferTypes.get(u); } private TransferType getTransferType( String name, QDataSet ds1 ) { TransferType tt= transferTypes.get(name); if ( tt==null ) { tt= getUnitTransferType(ds1); if ( tt!=null ) { return tt; } if ( asciiTypes ) { Units u= SemanticOps.getUnits(ds1); if ( UnitsUtil.isTimeLocation(u) ) { tt= new AsciiTimeTransferType( DEFAULT_TIME_DIGITS , u); } else { tt= new AsciiTransferType( 10, true ); } } else { if ( ds1.length()>125000 ) { logger.fine("using floats because we'll certainly run out of room otherwise"); tt= new FloatTransferType(); } else { tt= new DoubleTransferType(); } } } return tt; } /** * explicitly set the transfer type used to transfer data that is convertible to this * unit. * @param name * @param tt */ public void setUnitTransferType( Units u, TransferType tt ) { if ( asciiTypes && !tt.isAscii() ) { throw new IllegalArgumentException("stream is declared as ascii stream, but non-ascii transfer type specified for times: "+tt ); } unitsTransferTypes.put( u, tt ); } /** * explicitly set the transfer type. * @param name * @param tt */ public void setTransferType( String name, TransferType tt ) { if ( asciiTypes && !tt.isAscii() ) { throw new IllegalArgumentException("stream is declared as ascii stream, but non-ascii transfer type specified for "+name+": "+tt ); } transferTypes.put( name, tt ); } private Element doProperties(Document document, Map props, boolean slice) { Element properties = document.createElement("properties"); Element prop; for ( Entry e: props.entrySet() ) { String name= e.getKey(); prop= null; Object value = e.getValue(); String name1= name; // name of the unsliced dataset boolean allowRank0 = true; if ( slice ) { if ( name.startsWith("DEPEND_") ) { name1= "DEPEND_" + ( 1 + Integer.parseInt( name.substring(7) ) ); } else if ( name.equals("CONTEXT_0") ) { name1= "DEPEND_0"; allowRank0= false; } else if ( name.startsWith("BINS_") ) { name1= "BINS_" + ( 1 + Integer.parseInt( name.substring(5) ) ); } else if ( name.startsWith("BUNDLE_") ) { name1= "BUNDLE_" + ( 1 + Integer.parseInt( name.substring(7) ) ); } } if ( value==null ) { continue; // slice } if (value instanceof QDataSet) { QDataSet qds = (QDataSet) value; prop = document.createElement("property"); prop.setAttribute("name", name1); if ( qds.rank() == 0 && allowRank0 ) { SerializeDelegate r0d= (SerializeDelegate) SerializeRegistry.getByName("rank0dataset"); prop.setAttribute("type", "rank0dataset"); Units u= (Units) qds.property( QDataSet.UNITS ); if ( u!=null && u instanceof EnumerationUnits ) { continue; // TODO: this should do something! } else { prop.setAttribute("value", r0d.format(value) ); } } else { if ( !names.containsKey((QDataSet)value) ) { System.err.println("Unidentified "+name1 +"!!"); } prop.setAttribute("type", "qdataset"); prop.setAttribute("value", nameFor((QDataSet) value)); } } else { SerializeDelegate sd = SerializeRegistry.getDelegate(value.getClass()); if (sd == null) { System.err.println("dropping "+name1+" because unsupported type: "+value.getClass()); } else { prop = document.createElement("property"); prop.setAttribute("name", name1); if ( sd instanceof XMLSerializeDelegate ) { prop.appendChild( ((XMLSerializeDelegate)sd).xmlFormat(document,value) ); } else { prop.setAttribute("type", sd.typeId(value.getClass())); prop.setAttribute("value", sd.format(value)); } } } if ( prop!=null ) properties.appendChild(prop); } return properties; } private Element doValues( Document document, PacketDescriptor packetDescriptor, PlaneDescriptor planeDescriptor, QDataSet ds1 ) { Element values = document.createElement("values"); int[] qubeDims=null; if ( !packetDescriptor.isValuesInDescriptor() ) { values.setAttribute("encoding", planeDescriptor.getType().name()); qubeDims = DataSetUtil.qubeDims(ds1); } if (packetDescriptor.isStream()) { if (ds1.rank() > 0) { values.setAttribute("length", Util.encodeArray(qubeDims, 0, qubeDims.length) ); // must be a qube for stream. } else { values.setAttribute("length", ""); } } else { if ( qubeDims!=null ) { values.setAttribute("length", Util.encodeArray(qubeDims, 0, qubeDims.length) ); } if (packetDescriptor.isValuesInDescriptor()) { StringBuilder s = new StringBuilder(""); for (int i = 0; i < ds1.length(); i++) { s.append( "," ).append( ds1.value(i) ); } values.setAttribute("values", ds1.length() == 0 ? "" : s.substring(1)); if ( ds1.length()==0 ) { values.setAttribute("length", "0" ); } } } return values; } private void doValuesElement(QDataSet ds, PacketDescriptor pd, PlaneDescriptor planeDescriptor, Document document, Element qdatasetElement) throws DOMException { Object sunits= ds.property(QDataSet.UNITS); if ( sunits!=null && !(sunits instanceof Units) ) { throw new IllegalArgumentException( "UNITS property doesn't contain type units, it's type "+sunits.getClass()+": "+sunits ); } Units u = (Units) sunits; if ( u==null && SemanticOps.isRank1Bundle(ds) && ds.rank()==1 ) { QDataSet bundleDescriptor= (QDataSet) ds.property(QDataSet.BUNDLE_0); if ( bundleDescriptor==null ) { //TODO: check this schema, demoed by BundleBinsDemo.demo1(); } else { // uh-oh. just use a high-resolution format for now. for ( int i=0; i2 ) throw new IllegalArgumentException("more than two planes found!"); for (int iplane = 0; iplane < planeCount; iplane++) { PlaneDescriptor plane = pd.planes.get(iplane); TransferType tt = plane.getType(); QDataSet planeds; if ( iplane==planeCount-1 ) { planeds= ds1; } else { planeds= (QDataSet) ds1.property(QDataSet.CONTEXT_0); } boolean lastPlane = iplane == planeCount - 1; if (planeds.rank() == 0) { // format just the one number tt.write(planeds.value(), buffer); } else if (planeds.rank() == 1) { // format the 1-D array of numbers for (int j = 0; j < planeds.length(); j++) { tt.write(planeds.value(j), buffer); } } else { // format the flattened n-D array QDataSet slice = planeds; QubeDataSetIterator it = new QubeDataSetIterator(slice); while (it.hasNext()) { it.next(); tt.write(it.getValue(slice), buffer); } } if (lastPlane && tt.isAscii() && Character.isWhitespace(buffer.get(bufferSize - 1))) { buffer.put(bufferSize - 1, (byte) CHAR_NEWLINE); } } buffer.flip(); buffer.position(4); sh.packet( pd, buffer.slice() ); buffer.flip(); } else { int planeCount = pd.planes.size(); for (int iplane = 0; iplane < planeCount; iplane++) { PlaneDescriptor plane = pd.planes.get(iplane); TransferType tt = plane.getType(); QDataSet planeds = plane.getDs(); QubeDataSetIterator it = new QubeDataSetIterator(planeds); while (it.hasNext()) { it.next(); tt.write(it.getValue(planeds), buffer); } boolean lastPlane = iplane == planeCount - 1; if (lastPlane && tt.isAscii() && Character.isWhitespace(buffer.get(bufferSize - 1))) { buffer.put(bufferSize - 1, (byte) CHAR_NEWLINE); } } buffer.flip(); buffer.position(4); sh.packet( pd, buffer.slice() ); buffer.flip(); } List probs= new ArrayList(); if ( !DataSetUtil.validate(ds1, probs ) ) { throw new IllegalArgumentException("DataSet is not valid: "+probs.get(0)); } } /** * format the dataset which is part of a join dataset. * @param name the name for the dataset of which ds1 is all or a slice of, or null. * @param ds1 * @param inline if true, then the entire dataset will be present. * @throws IOException * @throws StreamException */ public void format( String name, QDataSet ds1, String inoutForm ) throws IOException, StreamException { format( null, name, ds1, inoutForm ); } public void join( String name, int rank, Map props ) throws StreamException, IOException { PacketDescriptor packetDescriptor = new PacketDescriptor(); packetDescriptor.setStream(true); packetDescriptor.setStreamRank(rank); try { Document document = sd.newDocument(packetDescriptor); Element packetElement = document.createElement("packet"); Element qdataset= document.createElement("qdataset"); packetElement.appendChild( qdataset ); qdataset.setAttribute( "id", name ); qdataset.setAttribute("rank", String.valueOf(rank) ); Element propsElement= doProperties( document, props, false ); qdataset.appendChild(propsElement); Element valuesElement= document.createElement("values"); valuesElement.setAttribute( "join", "ignore" ); // this property value is ignored, but must have length>0 qdataset.appendChild(valuesElement); packetDescriptor.setDomElement(packetElement); sh.packetDescriptor( packetDescriptor ); } catch ( ParserConfigurationException ex ) { throw new RuntimeException(ex); } //TODO: finish me! } /** * allow the stream to recycle the name. new packets with this name will issue a new packetDescriptor. * * @param name */ public void retire( String name ) { //planes.remove(name); PacketDescriptor pd= pds.get(name); sd.retireDescriptor(pd); } public static void main( String[] args ) throws FileNotFoundException, IOException, StreamException { boolean join; //QDataSet ds= Ops.ripplesTimeSeries(24); //QDataSet ds= Ops.ripplesVectorTimeSeries(24); //QDataSet ds= Ops.ripplesSpectrogramTimeSeries(24); QDataSet ds= Ops.ripplesJoinSpectrogramTimeSeries(24); join= SemanticOps.isJoin(ds); SerialStreamFormatter form= new SerialStreamFormatter(); String name= "Flux"; //File f = new File( name + ".qds" ); //FileOutputStream out = new FileOutputStream(f); // configure the tool form.setAsciiTypes(true); form.setTransferType( name, new AsciiTransferType(10,false) ); form.setUnitTransferType( Units.us2000, new AsciiTimeTransferType(17,Units.us2000 ) ); //FormatStreamHandler sh= new FormatStreamHandler(); //sh.setOutputStream( new FileOutputStream("/tmp/foo.serialStreamFormatter.via.formatStreamHandler.qds") ); //form.init( name, sh ); form.init( name, java.nio.channels.Channels.newChannel( new FileOutputStream("/tmp/foo.serialStreamFormatter.toStream.qds") ) ); //form.init( name, Channels.newChannel( System.out ) ); // if ( join ) { form.join( name, 3, new HashMap() ); for ( int j=0; j