/* * To change this template, choose Tools | Templates * and open the template in the editor. */ package org.das2.qstream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.logging.Level; import java.util.logging.Logger; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.ParserConfigurationException; import org.das2.datum.Datum; import org.das2.datum.EnumerationUnits; import org.das2.datum.TimeLocationUnits; import org.das2.datum.TimeUtil; import org.das2.datum.Units; import org.das2.datum.UnitsUtil; import org.das2.qds.DataSetIterator; import org.das2.qds.DataSetOps; import org.das2.qds.DataSetUtil; import org.das2.qds.QDataSet; import org.das2.qds.QubeDataSetIterator; import org.das2.qds.SemanticOps; import org.das2.qds.Slice0DataSet; import org.das2.qds.ops.Ops; import org.w3c.dom.DOMException; import org.w3c.dom.Document; import org.w3c.dom.Element; /** * Class for formatting QDataSets as QStreams. Although this is "SimpleStreamFormatter," this is the only formatter written * thus far, except for SerialStreamFormatter, which formats packet-by-packet. * @author jbf */ public class SimpleStreamFormatter { private static final char CHAR_NEWLINE = '\n'; boolean asciiTypes = true; boolean isBigEndian = true; //ByteOrder.nativeOrder() .equals( ByteOrder.BIG_ENDIAN ); private static final Logger logger= Logger.getLogger("qstream"); /** * newBundle true indicates try to format the bundle with new code. */ boolean newBundle= false; Document getNewDocument() { try { DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder(); return builder.newDocument(); } catch (ParserConfigurationException pce) { throw new RuntimeException(pce); } } Element getPacketElement(Document document) { Element packet = document.createElement("packet"); return packet; } Map names = new HashMap<>(); private void doValuesElement(QDataSet ds, PacketDescriptor pd, PlaneDescriptor planeDescriptor, Document document, Element qdatasetElement) throws DOMException { Object sunits= ds.property(QDataSet.UNITS); if ( sunits!=null && !(sunits instanceof Units) ) { throw new IllegalArgumentException( "UNITS property doesn't contain type units, it's type "+sunits.getClass()+": "+sunits ); } boolean highResKludge= false; Units u = (Units) sunits; if ( u==null && isBundle(ds) && ds.rank()==2 ) { QDataSet bundleDescriptor= (QDataSet) ds.property(QDataSet.BUNDLE_1); if ( bundleDescriptor==null ) { //TODO: check this schema, demoed by BundleBinsDemo.demo1(); } else { // uh-oh. just use a high-resolution format for now. for ( int i=0; i maxFp) { maxFp = fp; // check for all ints, for now. //TODO: check for highest precision. } if (fp > 0 && fp < minFp) { minFp = fp; } if (dd > 0 && dd < absMin) { absMin = dd; } if (d > max) { max = d; } } if (u instanceof EnumerationUnits) { planeDescriptor.setType(new AsciiIntegerTransferType(10)); } else if (u instanceof TimeLocationUnits) { AsciiTimeTransferType att = getTT(ds); planeDescriptor.setType(att); } else { //QDataSet diffs= Ops.subtract( ds, DataSetOps.slice0(ds,0) ); //QDataSet gcd= DataSetUtil.gcd( diffs, DataSetUtil.asDataSet(absMin/100) ); if ( highResKludge ) { // we don't have separate formatters for each bundled dataset, so just use a high resolution format for now. planeDescriptor.setType( new AsciiTransferType(18,true) ); } else if (maxFp == 0 && min > -1e8 && max < 1e8) { planeDescriptor.setType(new AsciiIntegerTransferType(10)); } else if (min > -10000 && max < 10000 && absMin > 0.0001) { planeDescriptor.setType(new AsciiTransferType(10, false)); } else if (min > -100000 && max < 100000 && minFp == 0.5 && maxFp == 0.5) { planeDescriptor.setType(new AsciiTransferType(10, false)); } else { if (absMin > 1e-100 && max < 1e100) { planeDescriptor.setType(new AsciiTransferType(10, true)); } else { planeDescriptor.setType(new AsciiTransferType(12, true)); } } } } else { if ( highResKludge ) { planeDescriptor.setType(new DoubleTransferType()); } else if (u instanceof EnumerationUnits) { planeDescriptor.setType(new IntegerTransferType()); } else if (u==Units.cdfTT2000 ) { planeDescriptor.setType(new LongTransferType()); //TODO: some cdfTT2000s will be truncated. } else if (u instanceof TimeLocationUnits) { planeDescriptor.setType(new DoubleTransferType()); } else { planeDescriptor.setType(new FloatTransferType()); } } } if (pd.isValuesInDescriptor() && ds.rank() == 2) { for (int i = 0; i < ds.length(); i++) { Element values = doValues(document, pd, planeDescriptor, DataSetOps.slice0(ds, i)); values.setAttribute("index", String.valueOf(i)); qdatasetElement.appendChild(values); } } else { if ( UnitsUtil.isNominalMeasurement(u) && pd.isValuesInDescriptor()) { Map enumerations= new HashMap<>(); DataSetIterator it= new QubeDataSetIterator(ds); assert u!=null; EnumerationUnits eu= (EnumerationUnits)u; while ( it.hasNext() ) { it.next(); int i= (int)it.getValue(ds); if ( !enumerations.containsKey(i) ) { Datum ed= eu.createDatum(i); String sed= ed.toString(); Element enumerationUnit= document.createElement("enumerationUnit"); enumerationUnit.setAttribute("name", eu.getId() ); enumerationUnit.setAttribute("value", String.valueOf(i) ); enumerationUnit.setAttribute("color", String.format("0x%06x", eu.getColor( ed ) ) ); enumerationUnit.setAttribute("label", sed ); qdatasetElement.appendChild(enumerationUnit); enumerations.put(i,sed); } } } Element values = doValues(document, pd, planeDescriptor, ds); qdatasetElement.appendChild(values); } } /** * * @param ds rank 0 dataset of time. * @return the number of characters needed to represent: 17,20,24,27,30. */ private int timeDigits( QDataSet ds ) { Units u= SemanticOps.getUnits(ds); double micros; if ( ds.rank()!=0 ) throw new IllegalArgumentException("data should be rank 0"); if ( UnitsUtil.isTimeLocation(u) ) { micros= TimeUtil.getMicroSecondsSinceMidnight( u.createDatum(ds.value()) ); } else { micros= u.convertDoubleTo( Units.microseconds,ds.value() ); } if ( micros==0 || ( micros>=60e6 && micros%60e6 <0.1 ) ) { return 17; // HH:MM } else if ( micros>=1e6 && micros%1e6 <0.001 ) { return 20; // HH:MM:SS } else if ( micros>=1000 && micros%1000 < 0.1 ) { return 24; // HH:MM:SS.mmm } else if ( micros>=1 && micros%1 < 0.001 ) { return 27; // HH:MM:SS.mmmmmm } else { return 30; // HH:MM:SS.mmmmmmmmm } } /** * pick a time formatter so that resolution is not reduced noticeably * in formatting. * @param ds * @return */ private AsciiTimeTransferType getTT( QDataSet ds ) { Units u= SemanticOps.getUnits(ds); QDataSet t0= DataSetOps.slice0(ds,0); int timeDigits; switch (t0.rank()) { case 1: timeDigits= timeDigits( t0.slice(0) ); for ( int i=1; i>t0.length(); i++ ) { timeDigits= Math.max( timeDigits, timeDigits(t0.slice(i) ) ); } break; case 0: timeDigits= timeDigits( t0 ); break; default: throw new IllegalArgumentException("time type rank limit"); } //TODO: same thing but with gcd added, use max of the two. QDataSet gcd; try { QDataSet diffs= Ops.subtract( ds, DataSetOps.slice0(ds,0) ); try { gcd= DataSetUtil.gcd( diffs, DataSetUtil.asDataSet( 1, Units.picoseconds ) ); } catch ( IndexOutOfBoundsException ex ) { diffs= Ops.diff( ds ); gcd= DataSetUtil.gcd( diffs, DataSetUtil.asDataSet( 1, Units.picoseconds ) ); } } catch ( IllegalArgumentException ex ) { logger.log( Level.FINE, ex.getMessage(), ex ); gcd= null; } if ( gcd==null ) { return new AsciiTimeTransferType(timeDigits, u); // millis } else { int timeDigitsGcd= timeDigits( gcd ); return new AsciiTimeTransferType( Math.max( timeDigits, timeDigitsGcd ), u ); } } /** * write out the plane descriptor for a bundle dataset. * @param document * @param pd * @param ds * @param streamRank * @return */ private PlaneDescriptor doPlaneDescriptorBundle(Document document, PacketDescriptor pd, QDataSet ds, int rank ) { Element qdatasetElement = document.createElement("qdataset"); qdatasetElement.setAttribute("id", nameFor(ds)); qdatasetElement.setAttribute("rank", String.valueOf(rank) ); logger.log( Level.FINE, "writing qdataset {0}", nameFor(ds)); if ( isBundle(ds) ) { for ( int i=0; i props = DataSetUtil.getProperties(ds); Element prop; for ( Entry e: props.entrySet()) { String name= e.getKey(); prop= null; Object value = e.getValue(); if (value instanceof QDataSet) { if ( name.equals(QDataSet.BUNDLE_1) && props.containsKey(QDataSet.BINS_1) && props.get(QDataSet.BINS_1).equals(QDataSet.VALUE_BINS_MIN_MAX) ) { continue; // Kludge Du Jour. } QDataSet qds = (QDataSet) value; String sliceName= (String) qds.property(QDataSet.NAME); // kludge to avoid formatting slice name, which is intended for help humans. if ( ds instanceof Slice0DataSet && sliceName!=null && sliceName.startsWith("slice") ) continue; //TODO: 3567174 this first-go at serializing datasets has a bug where it would drop DEPEND_1 property, which had been serialized in-line. // if ( ((QDataSet)value).rank()>0 && this.names.containsKey((QDataSet)value)==false ) continue; // we're not going to serialize this. if (qds.rank() == 0) { prop = document.createElement("property"); prop.setAttribute("name", name); SerializeDelegate r0d= (SerializeDelegate) SerializeRegistry.getByName("rank0dataset"); prop.setAttribute("type", "rank0dataset"); Units u= (Units) qds.property( QDataSet.UNITS ); if ( u!=null && u instanceof EnumerationUnits ) { //prop.setAttribute("value", r0d.format(value) ); //prop.setAttribute("value", String.valueOf(value) ); continue; // TODO: this should do something! } else { prop.setAttribute("value", r0d.format(value) ); } } else { prop = document.createElement("property"); prop.setAttribute("name", name); prop.setAttribute("type", "qdataset"); prop.setAttribute("value", nameFor((QDataSet) value)); } } else { SerializeDelegate sd = SerializeRegistry.getDelegate(value.getClass()); if (sd == null) { System.err.println("dropping "+name+" because unsupported type: "+value.getClass()); } else { prop = document.createElement("property"); prop.setAttribute("name", name); if ( sd instanceof XMLSerializeDelegate ) { prop.appendChild( ((XMLSerializeDelegate)sd).xmlFormat(document,value) ); } else { prop.setAttribute("type", sd.typeId(value.getClass())); prop.setAttribute("value", sd.format(value)); } } } if ( prop!=null ) properties.appendChild(prop); } return properties; } private StreamDescriptor doStreamDescriptor(QDataSet ds) throws ParserConfigurationException { StreamDescriptor sd = new StreamDescriptor(DocumentBuilderFactory.newInstance()); Document document = sd.newDocument(sd); Element streamElement = document.createElement("stream"); String name = nameFor(ds); streamElement.setAttribute("dataset_id", name); // version 1.1 changes how nominal data is encoded. streamElement.setAttribute("version", "1.1"); if (asciiTypes == false) { streamElement.setAttribute("byte_order", isBigEndian ? "big_endian" : "little_endian"); } sd.setDomElement(streamElement); sd.addDescriptor(sd); // allocate [00] for itself. return sd; } private Element doValues( Document document, PacketDescriptor packetDescriptor, PlaneDescriptor planeDescriptor, QDataSet ds ) { Element values = document.createElement("values"); int[] qubeDims=null; if ( !packetDescriptor.isValuesInDescriptor() ) { values.setAttribute("encoding", planeDescriptor.getType().name()); qubeDims = DataSetUtil.qubeDims(ds); } if (packetDescriptor.isStream()) { if (ds.rank() > 1) { assert qubeDims!=null; values.setAttribute("length", Util.encodeArray(qubeDims, 1, qubeDims.length - 1)); // must be a qube for stream. } else { values.setAttribute("length", ""); } } else { if ( qubeDims!=null ) { values.setAttribute("length", Util.encodeArray(qubeDims, 0, qubeDims.length)); } if (packetDescriptor.isValuesInDescriptor()) { Units u= (Units) ds.property(QDataSet.UNITS); StringBuilder s = new StringBuilder(""); if ( u!=null && u instanceof EnumerationUnits ) { for (int i = 0; i < ds.length(); i++) { s.append( "," ).append( (int)ds.value(i) ); } } else { for (int i = 0; i < ds.length(); i++) { s.append( "," ).append( ds.value(i) ); } } values.setAttribute("values", ds.length() == 0 ? "" : s.substring(1)); if ( ds.length()==0 ) { values.setAttribute("length", "0" ); } } } return values; } private void formatPackets(WritableByteChannel channel, StreamDescriptor sd, PacketDescriptor pd) throws IOException { int bufferSize = 4 + pd.sizeBytes(); byte[] bbuf = new byte[bufferSize]; ByteBuffer buffer = ByteBuffer.wrap(bbuf); //ByteBuffer.allocateDirect(bufferSize); if (isBigEndian) { buffer.order(ByteOrder.BIG_ENDIAN); } else { buffer.order(ByteOrder.LITTLE_ENDIAN); } String packetTag = String.format(":%02d:", sd.descriptorId(pd)); buffer.put(packetTag.getBytes()); // we only need to write it once. if (pd.isStream()) { int packetCount = pd.planes.get(0).getDs().length(); int planeCount = pd.planes.size(); Units[] checkEnumerationUnits= new Units[planeCount]; Map enumerations= new HashMap<>(); // these are the ones we've described. for (int iplane = 0; iplane < planeCount; iplane++) { PlaneDescriptor plane = pd.planes.get(iplane); TransferType tt = plane.getType(); QDataSet planeds = plane.getDs(); Units u= (Units) planeds.property(QDataSet.UNITS); if ( u!=null && u instanceof EnumerationUnits ) { checkEnumerationUnits[iplane]= u; } else { checkEnumerationUnits[iplane]= null; } } for (int i = 0; i < packetCount; i++) { for (int iplane = 0; iplane < planeCount; iplane++) { if ( checkEnumerationUnits[iplane]!=null ) { PlaneDescriptor plane = pd.planes.get(iplane); TransferType tt = plane.getType(); QDataSet planeds = plane.getDs(); double v=-1; if (planeds.rank() == 1) { v= planeds.value(i); } else if (planeds.rank() == 2) { throw new IllegalArgumentException("not supported rank 2 enumeration units"); } else { throw new IllegalArgumentException("not supported rank N enumeration units"); } int iv= (int)v; if ( !enumerations.containsKey(iv) ) { byte[] bytes; EnumerationUnits eu= (EnumerationUnits)checkEnumerationUnits[iplane]; Datum d= eu.createDatum(iv); int c= eu.getColor( d ); String label= d.toString(); String ss= String.format( "\n", eu.getId(), iv, c, label ); bytes= ss.getBytes( "UTF-8" ); channel.write( ByteBuffer.wrap( String.format( "[xx]%06d", bytes.length ).getBytes("UTF-8") ) ); channel.write( ByteBuffer.wrap( bytes ) ); enumerations.put( iv, label); } } } for (int iplane = 0; iplane < planeCount; iplane++) { PlaneDescriptor plane = pd.planes.get(iplane); TransferType tt = plane.getType(); QDataSet planeds = plane.getDs(); boolean lastPlane = iplane == planeCount - 1; if (planeds.rank() == 1) { tt.write(planeds.value(i), buffer); } else if (planeds.rank() == 2) { for (int j = 0; j < planeds.length(i); j++) { tt.write(planeds.value(i, j), buffer); } } else { QDataSet slice = DataSetOps.slice0(planeds, i); QubeDataSetIterator it = new QubeDataSetIterator(slice); //QubeDataSetIterator it = QubeDataSetIterator.sliceIterator(planeds,i); while (it.hasNext()) { it.next(); tt.write(it.getValue(slice), buffer); } } if (lastPlane && tt.isAscii() && Character.isWhitespace(buffer.get(bufferSize - 1))) { buffer.put(bufferSize - 1, (byte) CHAR_NEWLINE); } } buffer.flip(); channel.write(buffer); buffer.position(4); } buffer.flip(); } else { int planeCount = pd.planes.size(); for (int iplane = 0; iplane < planeCount; iplane++) { PlaneDescriptor plane = pd.planes.get(iplane); TransferType tt = plane.getType(); QDataSet planeds = plane.getDs(); QubeDataSetIterator it = new QubeDataSetIterator(planeds); while (it.hasNext()) { it.next(); tt.write(it.getValue(planeds), buffer); } boolean lastPlane = iplane == planeCount - 1; if (lastPlane && tt.isAscii() && Character.isWhitespace(buffer.get(bufferSize - 1))) { buffer.put(bufferSize - 1, (byte) CHAR_NEWLINE); } } buffer.flip(); channel.write(buffer); buffer.flip(); } } /** * set the names for all the joined datasets. * @param slice * @param name */ private synchronized void setNameFor( QDataSet slice, String name ) { if ( name.equals( names.get(slice) ) ) { // already named it. } else if ( names.get(slice)!=null ) { throw new IllegalArgumentException("already have name for: "+slice + " want to set to "+name); } else { maybePutName( slice, name ); } } /** * return a name for the thing that describes dep0. This will be used in * the descriptor, so if the descriptor doesn't contain the values, then * the name can be reused. Note no names are reused at this point. * @param dep0 * @return */ private synchronized String nameFor(QDataSet dep0) { String name = names.get(dep0); boolean assignName= name==null; if (name == null) { name = (String) dep0.property(QDataSet.NAME); } if (name == null) { name = "ds_" + names.size(); } if ( assignName ) maybePutName( dep0, name ); return name; } PacketDescriptor doPacketDescriptorNonQube( StreamDescriptor sd, QDataSet ds, boolean stream, boolean valuesInDescriptor, int streamRank ) throws ParserConfigurationException { PacketDescriptor packetDescriptor = new PacketDescriptor(); packetDescriptor.setStream(stream); packetDescriptor.setStreamRank(streamRank); if (valuesInDescriptor) { packetDescriptor.setValuesInDescriptor(true); } Document document = sd.newDocument(packetDescriptor); Element packetElement = getPacketElement(document); QDataSet dep0 = (QDataSet) ds.property(QDataSet.DEPEND_0); if (dep0 != null) { PlaneDescriptor planeDescriptor = doPlaneDescriptor(document, packetDescriptor, dep0, streamRank); packetDescriptor.addPlane(planeDescriptor); packetElement.appendChild(planeDescriptor.getDomElement()); } for (int i = 0; i < QDataSet.MAX_PLANE_COUNT; i++) { QDataSet plane0 = (QDataSet) ds.property("PLANE_" + i); if (plane0 != null) { PlaneDescriptor planeDescriptor = doPlaneDescriptor(document, packetDescriptor, plane0, streamRank); packetDescriptor.addPlane(planeDescriptor); packetElement.appendChild(planeDescriptor.getDomElement()); } else { break; } } PlaneDescriptor planeDescriptor = doPlaneDescriptor(document, packetDescriptor, ds, streamRank); packetDescriptor.addPlane(planeDescriptor); packetElement.appendChild(planeDescriptor.getDomElement()); packetDescriptor.setDomElement(packetElement); return packetDescriptor; } PacketDescriptor doPacketDescriptorJoin( StreamDescriptor sd, QDataSet ds, boolean stream, boolean valuesInDescriptor, int streamRank ) throws ParserConfigurationException { PacketDescriptor packetDescriptor = new PacketDescriptor(); packetDescriptor.setStream(stream); packetDescriptor.setStreamRank(streamRank); if (valuesInDescriptor) { packetDescriptor.setValuesInDescriptor(true); } Document document = sd.newDocument(packetDescriptor); Element packetElement = getPacketElement(document); Object join = ds.property(QDataSet.JOIN_0); if (join == null) { throw new IllegalArgumentException("expected join"); } PlaneDescriptor planeDescriptor = doPlaneDescriptor(document, packetDescriptor, ds, streamRank); packetDescriptor.addPlane(planeDescriptor); packetElement.appendChild(planeDescriptor.getDomElement()); packetDescriptor.setDomElement(packetElement); return packetDescriptor; } PacketDescriptor doPacketDescriptorBundle( StreamDescriptor sd, QDataSet ds ) throws ParserConfigurationException { PacketDescriptor packetDescriptor = new PacketDescriptor(); packetDescriptor.setStream(true); packetDescriptor.setStreamRank(2); packetDescriptor.setValuesInDescriptor(true); Document document = sd.newDocument(packetDescriptor); Element packetElement = getPacketElement(document); QDataSet bds= (QDataSet) ds.property(QDataSet.BUNDLE_1); if ( bds==null ) { throw new IllegalArgumentException("not supported"); } PlaneDescriptor planeDescriptor = doPlaneDescriptorBundle( document, packetDescriptor, ds, ds.rank() ); packetDescriptor.addPlane(planeDescriptor); packetElement.appendChild(planeDescriptor.getDomElement()); //PlaneDescriptor planeDescriptor = doPlaneDescriptor(document, packetDescriptor, bds, 2 ); //packetDescriptor.addPlane(planeDescriptor); //packetElement.appendChild(planeDescriptor.getDomElement()); packetDescriptor.setDomElement(packetElement); return packetDescriptor; } /** * create the packetDescriptor object. * @param sd the handle for the stream * @param ds the dataset that will be serialized to this packet descriptor. * @param stream the data will be streamed in packets. * @param valuesInDescriptor the data will be contained in the packet. * @param streamRank the rank of the stream. * @return * @throws ParserConfigurationException */ PacketDescriptor doPacketDescriptor(StreamDescriptor sd, QDataSet ds, boolean stream, boolean valuesInDescriptor, int streamRank, String joinId) throws ParserConfigurationException { if ( !valuesInDescriptor && DataSetUtil.isQube(ds) == false ) { throw new IllegalArgumentException("must be qube!"); } PacketDescriptor packetDescriptor = new PacketDescriptor(); packetDescriptor.setStream(stream); packetDescriptor.setStreamRank(streamRank); if (valuesInDescriptor) { packetDescriptor.setValuesInDescriptor(true); } Document document = sd.newDocument(packetDescriptor); Element packetElement = getPacketElement(document); QDataSet dep0 = (QDataSet) ds.property(QDataSet.DEPEND_0); if (dep0 != null) { PlaneDescriptor planeDescriptor = doPlaneDescriptor(document, packetDescriptor, dep0, streamRank); packetDescriptor.addPlane(planeDescriptor); packetElement.appendChild(planeDescriptor.getDomElement()); } // datasets with time-varying y tags for ( int i=1; i *
  • rank 0 *
  • rank 1 *
  • rank 2 qubes *
  • array of rank 2 qubes *
  • rank 3 qubes *
  • bundle datasets. * * The design goal is that all datasets can be serialized using this formatter, * however some schemas (e.g. high-rank non-qubes) will be inefficient. * * @param ds the dataset to serialize. * @param osout the output stream, which will not be closed here. * @param asciiTypes use ascii format types so that the stream is completely ascii. * @throws StreamException * @throws IOException */ public void format( QDataSet ds, OutputStream osout, boolean asciiTypes ) throws StreamException, IOException { try { this.asciiTypes = asciiTypes; WritableByteChannel out = Channels.newChannel(osout); StreamDescriptor sd = doStreamDescriptor(ds); List probs= new ArrayList(); if ( !DataSetUtil.validate(ds, probs ) ) { throw new IllegalArgumentException("DataSet is not valid: "+probs.get(0)); } List depPackets = new ArrayList(); sd.send(sd, out); int packetDescriptorCount; int streamRank; //TODO: describe this String dep0Name = null; if ( isBundle(ds) && asciiTypes ) { // we need to do more with this. right now there's just one planeDescriptor for the bunch. System.err.println("suboptimal implementation doesn't use different formatters for each bundled dataset"); } if (DataSetUtil.isQube(ds)) { packetDescriptorCount = 1; streamRank = 1; } else if ( isJoin(ds) ) { packetDescriptorCount = ds.length(); streamRank = 1; } else { packetDescriptorCount = ds.length(); streamRank = 2; } QDataSet dep0 = (QDataSet) ds.property(QDataSet.DEPEND_0); if (dep0 != null) { dep0Name = nameFor(dep0); } String joinDataSet=null; boolean isjoin= ds.property(QDataSet.JOIN_0) != null; if ( isjoin ) { PacketDescriptor join= doPacketDescriptorJoin(sd, ds, false, false, streamRank); sd.addDescriptor(join); sd.send(join, out); joinDataSet= nameFor(ds); } for (int ipacket = 0; ipacket < packetDescriptorCount; ipacket++) { PacketDescriptor mainPd; QDataSet packetDs; List retire= new ArrayList(); if (streamRank == 1 && !isjoin ) { packetDs = ds; } else { packetDs = ds.slice(ipacket); maybePutName( packetDs, nameFor(packetDs) ); if (dep0Name != null) { maybePutName( (QDataSet) packetDs.property(QDataSet.DEPEND_0), dep0Name ); //TODO: Planes are still a problem } } if ( isJoin(packetDs) ) { throw new IllegalArgumentException("join of join not supported"); } mainPd = doPacketDescriptor(sd, packetDs, true, false, streamRank, joinDataSet ); sd.addDescriptor(mainPd); // check for DEPEND_1 and DEPEND_2 datasets that need to be sent out first. for (int i = 1; i < QDataSet.MAX_RANK; i++) { QDataSet depi = (QDataSet) packetDs.property("DEPEND_" + i); if (depi != null) { if ( depi==dep0 ) { // kludge: if DEPEND_0==DEPEND_1, an invalid stream was created throw new RuntimeException("bug in QStream prevents DEPEND_0==DEPEND_1"); /*(System.err.println("DEPEND_0==DEPEND_1, copy kludge"); depi= DDataSet.copy(dep0); String name= (String) dep0.property(QDataSet.NAME); if ( name!=null ) { ((MutablePropertyDataSet)depi).putProperty( QDataSet.NAME, name + "_1" ); }*/ } if ( depi.rank()!=2 ) { // if it's rank 2, we'll send it out in each packet. PacketDescriptor pd; boolean valuesInDescriptor = true; // because it's a non-qube if ( depi.length()>100 ) { valuesInDescriptor= false; } pd = doPacketDescriptor(sd, depi, false, valuesInDescriptor, 1, null); pd.setValuesInDescriptor(valuesInDescriptor); sd.addDescriptor(pd); depPackets.add(pd); sd.send(pd, out); if ( valuesInDescriptor ) { retire.add(pd); } } } } // check for BUNDLE_1 datasets that need to be sent out first. for (int i = 0; i < 2; i++) { QDataSet depi = (QDataSet) packetDs.property("BUNDLE_" + i); if (depi != null) { PacketDescriptor pd; List retireBundleDep1= new ArrayList(); for ( int j=0; j