package org.autoplot.datasource; import java.net.URI; import java.net.URISyntaxException; import java.util.Iterator; import java.util.logging.Level; import java.util.logging.Logger; import org.das2.datum.DatumRange; import org.das2.util.monitor.NullProgressMonitor; import org.das2.util.monitor.ProgressMonitor; import org.autoplot.aggregator.AggregatingDataSourceFactory; import org.das2.qds.DataSetOps; import org.das2.qds.DataSetUtil; import org.das2.qds.QDataSet; import org.autoplot.datasource.capability.Streaming; import org.autoplot.datasource.capability.TimeSeriesBrowse; import org.das2.datum.Datum; import org.das2.datum.TimeUtil; import org.das2.qds.MutablePropertyDataSet; import org.das2.qds.ops.Ops; import org.das2.qds.util.DataSetBuilder; /** * Iterates over the records of the dataset identified by a URI and a time range. * The constructor checks the data source for the Streaming capability. When it is * present and streaming is allowed, records are taken from the streaming iterator. * Otherwise the dataset is read with a single call and records are returned by slicing it. * An additional trim on the first element of each streamed record can be applied through * depend0Constraint, see constrainDepend0. * @author jbf */ public class RecordIterator implements Iterator { /* NOTE(review): Iterator appears without a type argument here and below, although results of next are assigned to QDataSet without a cast; the type argument was presumably lost in this copy of the file - confirm against version control. */ /* index of the next record to slice from src when not streaming */ int index; int lastIndex=-1; // -1 means we don't know, no constraint. /* bundled dataset that is sliced by next when not streaming */ QDataSet src=null; /* iterator obtained from the Streaming capability, null when not streaming */ Iterator streamingIterator=null; /* when not null, the streaming read-ahead in next stops at the first record whose first element is not less than the max of this range */ private DatumRange depend0Constraint=null; QDataSet nextRecord=null; // when depend0Constraint is set, we need to read to the next record in the hasNext method. /* when not null, next applies this as an index to each streamed record via DataSetOps.applyIndex */ QDataSet sortDataSet=null; private static final Logger logger= Logger.getLogger("apdss.recordIterator"); /** * Read the dataset for the URI and time range with a single getDataSet call. * When the factory or the data source provides the TimeSeriesBrowse capability, the time * range is set on it before reading. A null monitor is replaced with a NullProgressMonitor. * When the read returns null and the factory is an AggregatingDataSourceFactory, the read * is attempted a second time. * This will change as serial DataSources are available. 
* @param suri * @param timeRange * @param monitor * @return * @throws URISyntaxException * @throws Exception */ private QDataSet getDataSet( String suri, DatumRange timeRange, ProgressMonitor monitor ) throws URISyntaxException, Exception { logger.log( Level.FINE, "getDataSet(\"{0}\",DatumRangeUtil.parseTimeRange({1}),monitor)", new Object[]{suri, timeRange} ); URI uri = DataSetURI.getURI(suri); DataSourceFactory factory = DataSetURI.getDataSourceFactory(uri, new NullProgressMonitor()); if ( factory==null ) { throw new IllegalArgumentException("no data source factory found for URI: "+ uri ); } TimeSeriesBrowse tsb= factory.getCapability(TimeSeriesBrowse.class); // see if we can allow for URIs without timeranges. if ( tsb!=null ) { tsb.setURI(suri); tsb.setTimeRange( timeRange ); uri= new URI( tsb.getURI() ); } DataSource result = factory.getDataSource( uri ); if (monitor == null) { monitor = new NullProgressMonitor(); } Streaming streaming= result.getCapability( Streaming.class ); if ( streaming!=null ) { logger.fine("this data could be streamed"); } tsb= result.getCapability( TimeSeriesBrowse.class ); if ( tsb!=null ) { tsb.setTimeRange( timeRange ); } else { logger.fine("TimeSeriesBrowse capability not found, simply returning dataset."); } QDataSet rds= result.getDataSet(monitor); if ( rds==null && factory instanceof AggregatingDataSourceFactory ) { logger.info("strange condition where occasional null is returned because of reference caching. This needs to be studied more."); monitor = new NullProgressMonitor(); monitor.setLabel("strange condition where occasional null..."); rds= result.getDataSet(monitor); //TODO nasty kludge, just try reading again... } return rds; } /** * Create a new RecordIterator for the given URI and time range. This delegates to the * three-argument constructor with allowStream set to true, so the streaming capability * is used when the data source provides it. * @param suri the data URI * @param timeRange the time range * @throws Exception if the data read throws an exception. 
*/ public RecordIterator( String suri, DatumRange timeRange ) throws Exception { this( suri, timeRange, true ); } /** * Create a new RecordIterator for the given URI and time range. * The time range is widened with TimeUtil.prev and TimeUtil.next at TimeUtil.MINUTE before * it is handed to the TimeSeriesBrowse capability and to the read. When the data source has * the Streaming capability and allowStream is true, records come from streamDataSet. * Otherwise the dataset is read at once, and when it has a DEPEND_0 property that is * bundled together with the data into src. A null dataset leaves index and lastIndex at 0. * @param suri the data URI * @param timeRange the time range * @param allowStream if false, then don't use the streaming capability, even when it is available. * @throws IllegalArgumentException if no data source factory is found for the URI. * @throws Exception if the data read throws an exception. */ public RecordIterator( String suri, DatumRange timeRange, boolean allowStream ) throws Exception { URI uri = DataSetURI.getURI(suri); DataSourceFactory factory = DataSetURI.getDataSourceFactory(uri, new NullProgressMonitor()); if ( factory==null ) { throw new IllegalArgumentException("no data source factory found for URI: "+ uri ); } DatumRange timeRangeExt= new DatumRange( TimeUtil.prev( TimeUtil.MINUTE, timeRange.min() ), TimeUtil.next( TimeUtil.MINUTE, timeRange.max() ) ); TimeSeriesBrowse tsb= factory.getCapability(TimeSeriesBrowse.class); // see if we can allow for URIs without timeranges. if ( tsb!=null ) { tsb.setURI(suri); tsb.setTimeRange( timeRangeExt ); uri= new URI( tsb.getURI() ); } DataSource result = factory.getDataSource( uri ); Streaming streaming = result.getCapability( Streaming.class ); if ( streaming!=null && allowStream ) { streamingIterator= streaming.streamDataSet( new NullProgressMonitor() ); } else { QDataSet ds; try { ds= getDataSet( uri.toString(), timeRangeExt, new NullProgressMonitor() ); } catch ( Exception ex ) { throw ex; // breakpoint here } if ( ds==null ) { this.index= 0; this.lastIndex=0; return; } QDataSet dep0= (QDataSet) ds.property(QDataSet.DEPEND_0); if ( dep0!=null ) { if ( ds.rank()==1 ) { this.src= Ops.bundle( dep0, ds ); } else if ( ds.rank()==2 ) { QDataSet dep1= (QDataSet) ds.property(QDataSet.DEPEND_1); this.src= Ops.bundle( null, dep0 ); /* NOTE(review): the for statement that follows is cut off in this copy of the file - its condition, its body and the start of the next branch are missing. Restore from version control before compiling. */ for ( int i=0; i2 ) { // flatten the rank>2 to rank=2. 
int[] qube= DataSetUtil.qubeDims(ds.slice(0)); QDataSet dep2= (QDataSet) ds.property(QDataSet.DEPEND_2); if ( dep2!=null && dep2.rank()==3 ) { dep2= Ops.reform( dep2, dep2.length(), new int[] { DataSetUtil.product(qube) } ); } ds= Ops.reform( ds, ds.length(), new int[] { DataSetUtil.product(qube) } ); this.src= Ops.bundle( dep0, Ops.slice1(ds,0) ); /* NOTE(review): the for statement that follows is cut off in this copy of the file - restore from version control. */ for ( int i=1; i1 ) { this.src= Ops.bundle( this.src, Ops.slice1(dep2,0) ); /* NOTE(review): the for statement that follows is cut off as well. The text up to the statement return result belongs to code that is not visible here, presumably the rest of this constructor and further methods - confirm against version control. */ for ( int i=1; i2 streaming not supported"); } } return result; } /** * Return the next record. When streaming, this returns the record that was read ahead into * nextRecord, reordered with sortDataSet when that is set, and then reads ahead one more * record from the streaming iterator and passes it through normalize. The read-ahead record * is kept only while depend0Constraint is null or its first element is less than the max of * depend0Constraint, otherwise nextRecord becomes null. When not streaming, this returns * the slice of src at index and advances index. */ @Override public QDataSet next() { if ( this.streamingIterator!=null ) { QDataSet result= nextRecord; if ( this.sortDataSet!=null ) { result= DataSetOps.applyIndex( result, 0, sortDataSet, true ); } if ( streamingIterator.hasNext() ) { QDataSet nextRecord1= streamingIterator.next(); nextRecord1= normalize(nextRecord1); QDataSet dep0= (QDataSet) nextRecord1.slice(0); if ( depend0Constraint==null || DataSetUtil.asDatum(dep0).lt( depend0Constraint.max() ) ) { nextRecord= nextRecord1; } else { nextRecord= null; } } else { nextRecord= null; } return result; } else { return src.slice(index++); } } /** * Does nothing, removal is not implemented by this iterator. */ @Override public void remove() { //JAVA7: this can be removed when Java 8 is required. // do nothing. } /** * Do the opposite function: collect all the records from the iterator into one dataset * using DataSetBuilder. The first record is taken with next before hasNext is consulted, * so the iterator must provide at least one record. The rank and lengths of that first * record configure the builder, and a first record with rank greater than 3 results in * an IllegalArgumentException. When any record carries a CONTEXT_0 property, the dataset * from a second builder is attached to the result as DEPEND_0. 
* @param qds * @return */ public static QDataSet collect( Iterator qds ) { QDataSet rec= qds.next(); DataSetBuilder b; DataSetBuilder dep0b= new DataSetBuilder(1,100); switch ( rec.rank() ) { case 0: b= new DataSetBuilder(1,100); break; case 1: b= new DataSetBuilder(2,100,rec.length()); break; case 2: b= new DataSetBuilder(2,100,rec.length(),rec.length(0)); break; case 3: b= new DataSetBuilder(2,100,rec.length(),rec.length(0),rec.length(1)); break; default: throw new IllegalArgumentException("bad rank"); } /* NOTE(review): cases 2 and 3 of the switch pass 2 as the first DataSetBuilder argument, the same value as case 1 - confirm the intended rank. Also, dep0 is only tested for null below and dep0b.nextRecord() is called with no argument, so the CONTEXT_0 value itself is never handed to dep0b - confirm that this is intended. */ b.nextRecord(rec); QDataSet dep0= (QDataSet)rec.property(QDataSet.CONTEXT_0); if ( dep0!=null ) dep0b.nextRecord(); while ( qds.hasNext() ) { rec= qds.next(); b.nextRecord(rec); dep0= (QDataSet)rec.property(QDataSet.CONTEXT_0); if ( dep0!=null ) dep0b.nextRecord(); } MutablePropertyDataSet result= b.getDataSet(); if ( dep0b.getLength()>0 ) { result.putProperty( QDataSet.DEPEND_0, dep0b.getDataSet()); } return result; } }