/*
 * AggregatingDataSource.java
 *
 * Created on October 25, 2007, 10:29 AM
 *
 * To change this template, choose Tools | Template Manager
 * and open the template in the editor.
 */
package org.autoplot.aggregator;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.das2.qds.buffer.BufferDataSet;
import org.das2.dataset.NoDataInIntervalException;
import org.das2.datum.CacheTag;
import org.das2.datum.Datum;
import org.das2.datum.DatumRange;
import org.das2.datum.DatumRangeUtil;
import org.das2.datum.EnumerationUnits;
import org.das2.datum.Units;
import org.das2.datum.UnitsUtil;
import org.das2.fsm.FileStorageModel;
import org.das2.util.LoggerManager;
import org.das2.util.filesystem.FileSystem;
import org.das2.util.monitor.NullProgressMonitor;
import org.das2.util.monitor.ProgressMonitor;
import org.das2.qds.ArrayDataSet;
import org.das2.qds.DDataSet;
import org.das2.qds.DataSetOps;
import org.das2.qds.DataSetUtil;
import org.das2.qds.JoinDataSet;
import org.das2.qds.MutablePropertyDataSet;
import org.das2.qds.QDataSet;
import org.das2.qds.SemanticOps;
import org.das2.qds.WritableDataSet;
import org.autoplot.datasource.AbstractDataSource;
import org.autoplot.datasource.DataSetURI;
import org.autoplot.datasource.DataSource;
import org.autoplot.datasource.DataSourceFactory;
import org.autoplot.datasource.DataSourceUtil;
import org.autoplot.datasource.MetadataModel;
import org.autoplot.datasource.ReferenceCache;
import org.autoplot.datasource.URISplit;
import org.autoplot.datasource.Version;
import org.autoplot.datasource.capability.Streaming;
import org.autoplot.datasource.capability.TimeSeriesBrowse;
import org.autoplot.datasource.capability.Updating;
import org.das2.qds.ops.Ops;
import org.das2.qds.util.BundleBuilder;
import org.das2.qds.util.DataSetBuilder;
import org.das2.qds.util.Reduction;

/**
 * Data Source that aggregates (or combines) the data from granule files containing
 * data for intervals.  For example,
 * https://cdaweb.gsfc.nasa.gov/istp_public/data/polar/hydra/hyd_h0/$Y/po_h0_hyd_$Y$m$d_v$v.cdf?ELECTRON_DIFFERENTIAL_ENERGY_FLUX&timerange=20000109
 * is the aggregation of daily files from the CDAWeb.  This provides an
 * easy method for storing a long time series without having a complex
 * data server.
 *
 * The result of this is not guaranteed to be monotonically increasing in
 * time.  See https://sourceforge.net/p/autoplot/bugs/1326/
 *
 * @author jbf
 */
public final class AggregatingDataSource extends AbstractDataSource {

    private static final Logger logger= LoggerManager.getLogger("apdss.agg");

    /**
     * message used when no files are found in the interval.
     */
    public static final String MSG_NO_FILES_FOUND = "No files in interval";

    public static final String PARAM_AVAIL= "avail";

    private FileStorageModel fsm;

    DataSourceFactory delegateDataSourceFactory;

    AggregationPollUpdating upd; // allow a group of files to be watched.  This is experimental.

    String delegateVapScheme= null;

    /**
     * metadata from the last read.
     */
    Map metadata;

    MetadataModel metadataModel;

    TimeSeriesBrowse tsb;
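    // A sketch of the aggregation, using the template from the class comment
    // (the version number v01 here is hypothetical): the template
    //    .../hyd_h0/$Y/po_h0_hyd_$Y$m$d_v$v.cdf
    // with timerange=20000109 resolves to the single granule
    //    .../hyd_h0/2000/po_h0_hyd_20000109_v01.cdf
    // and a multi-day timerange resolves to one granule per day.  Each granule
    // is read by the delegate data source and the results are concatenated.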
    /**
     * Creates a new instance of AggregatingDataSource
     * @param uri the URI
     * @param delegateFactory the factory used to read each granule.
     * @throws java.net.MalformedURLException
     * @throws org.das2.util.filesystem.FileSystem.FileSystemOfflineException
     * @throws java.io.IOException
     * @throws java.text.ParseException
     */
    public AggregatingDataSource( URI uri, DataSourceFactory delegateFactory )
            throws MalformedURLException, FileSystem.FileSystemOfflineException, IOException, ParseException {
        super(uri);
        this.delegateDataSourceFactory = delegateFactory;
        try {
            URISplit split= URISplit.parse(uri);
            this.delegateVapScheme= split.vapScheme;
        } catch ( RuntimeException ex ) {
            logger.log( Level.WARNING, null, ex );
        }
        if ( AggregatingDataSourceFactory.hasTimeFields( uri.toString() ) ) {
            tsb= new AggTimeSeriesBrowse();
            addCapability( TimeSeriesBrowse.class, tsb );
            addCapability( Streaming.class, new StreamingCapability( uri, this ) );
        }
        String stimeRange= super.params.get( URISplit.PARAM_TIME_RANGE );
        if ( stimeRange!=null ) {
            if ( super.params.get("timeRange")!=null ) {
                stimeRange= super.params.get("timeRange");
            }
            if ( stimeRange==null ) {
                throw new IllegalArgumentException("timerange not found");
            }
            stimeRange= stimeRange.replaceAll("\\+"," ");
            viewRange= DatumRangeUtil.parseTimeRange( stimeRange );
        }
        String filePollUpdates= getParam( URISplit.PARAM_FILE_POLL_UPDATES, "" );
        if ( filePollUpdates.length()>0 ) {
            String surl = DataSetURI.fromUri( uri );
            FileStorageModel fsm1 = AggregatingDataSourceFactory.getFileStorageModel(surl);
            double ffilePollUpdates= Math.ceil( Double.parseDouble( filePollUpdates ) );
            upd= new AggregationPollUpdating( fsm1, viewRange, (long)ffilePollUpdates );
            addCapability( Updating.class, upd );
        }
    }
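    // A worked example of the timerange normalization above (illustrative value):
    //    "2000-01-09+00:00+to+2000-01-10+00:00".replaceAll("\\+"," ")
    // yields "2000-01-09 00:00 to 2000-01-10 00:00", which
    // DatumRangeUtil.parseTimeRange turns into the DatumRange to be loaded.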
    /**
     * It's easy for programs to mess up and produce timetags that are 24 hours off,
     * so check that the tags are not more than 50% outside the bounds.
     * @param bounds the expected bounds for the data.
     * @param ads0 the data which ought to be within these bounds.
     * @return the dataset, possibly trimmed to exclude miscalculated times.
     */
    private MutablePropertyDataSet checkBoundaries( DatumRange bounds, MutablePropertyDataSet ads0 ) {
        logger.entering( "org.autoplot.aggregator.AggregatingDataSource", "checkBoundaries" );
        QDataSet dep0_0= (QDataSet) ads0.property(QDataSet.DEPEND_0);
        if ( dep0_0==null && UnitsUtil.isTimeLocation( SemanticOps.getUnits(ads0) ) ) {
            dep0_0= ads0;
        } else if ( dep0_0==null ) {
            return ads0;
        }
        if ( !UnitsUtil.isTimeLocation( SemanticOps.getUnits(dep0_0) ) ) {
            return ads0;
        }
        if ( dep0_0.rank()!=1 ) return ads0;
        int ist= 0;
        while ( ist<dep0_0.length() && DatumRangeUtil.normalize( bounds, DataSetUtil.asDatum( dep0_0.slice(ist) ) ) < -0.5 ) ist++; // clip off krud at the beginning
        int ien= dep0_0.length();
        while ( ien>0 && DatumRangeUtil.normalize( bounds, DataSetUtil.asDatum( dep0_0.slice(ien-1) ) ) > 1.5 ) ien--; // clip off krud at the end
        if ( ist>0 || ien<dep0_0.length() ) {
            if ( ist>0 ) logger.log(Level.INFO, "trimming records 0-{0} to remove timetags outside the bounds.", new Object[] { (ist-1) } );
            if ( ien<dep0_0.length() ) logger.log(Level.INFO, "trimming records {0}-{1} to remove timetags outside the bounds.", new Object[] { ien, dep0_0.length() } );
            ads0= DataSetOps.trim( ads0, ist, ien-ist );
        }
        logger.exiting( "org.autoplot.aggregator.AggregatingDataSource", "checkBoundaries" );
        return ads0;
    }

    /**
     * Granule files often overlap slightly at their boundaries, so trim off the
     * leading records of the next granule which do not follow the last record
     * read so far.
     * @param ads0 the data read so far.
     * @param ads1 the next granule's data, which may overlap ads0.
     * @return ads1, possibly with leading records trimmed off.
     */
    private QDataSet trimOverlap( QDataSet ads0, QDataSet ads1 ) {
        logger.entering( "org.autoplot.aggregator.AggregatingDataSource", "trimOverlap" );
        QDataSet dep0_0= (QDataSet) ads0.property(QDataSet.DEPEND_0);
        QDataSet dep0_1= (QDataSet) ads1.property(QDataSet.DEPEND_0);
        if ( dep0_0==null || dep0_1==null || dep0_0.length()==0 || dep0_1.length()==0 ) {
            logger.exiting( "org.autoplot.aggregator.AggregatingDataSource", "trimOverlap" );
            return ads1;
        }
        if ( dep0_1.rank()>1 ) throw new IllegalArgumentException("expected rank 1 depend0");
        if ( Ops.gt( dep0_1.slice(0), dep0_0.slice(dep0_0.length()-1) ).value()==1 ) {
            logger.exiting( "org.autoplot.aggregator.AggregatingDataSource", "trimOverlap" );
            return ads1;
        } else {
            int i=0;
            while ( i<dep0_1.length() && Ops.le( dep0_1.slice(i), dep0_0.slice(dep0_0.length()-1) ).value()==1 ) i++;
            ads1= ads1.trim( i, dep0_1.length() );
            logger.exiting( "org.autoplot.aggregator.AggregatingDataSource", "trimOverlap" );
            return ads1;
        }
    }

    /**
     * the time range to load, from the timerange parameter or from setTimeRange.
     */
    private DatumRange viewRange= null;

    /**
     * the resolution to load, or null for the intrinsic resolution.
     */
    private Datum resolution= null;

    public void setResolution( Datum res ) {
        this.resolution= res;
    }

    /**
     * the parameters to forward to each granule read.
     */
    private String sparams= "";

    class AggTimeSeriesBrowse implements TimeSeriesBrowse {

        @Override
        public void setTimeRange( DatumRange dr ) {
            viewRange= dr;
        }

        @Override
        public void setTimeResolution( Datum d ) {
            resolution= d;
        }

        @Override
        public String getURI() {
            String surl = DataSetURI.fromUri( AggregatingDataSource.this.resourceURI ) + "?";
            if (sparams != null && !sparams.equals("") ) surl += sparams + "&";
            URISplit split = URISplit.parse(surl);
            Map<String,String> mparams = URISplit.parseParams(split.params);
            String stimeRange = viewRange.toString();
            mparams.put("timerange", stimeRange);
            if ( resolution!=null ) {
                mparams.put("resolution", String.valueOf(resolution));
            }
            split.params = URISplit.formatParams(mparams);
            URISplit split2= URISplit.parse(AggregatingDataSource.this.uri);
            split.vapScheme= split2.vapScheme;
            return URISplit.format(split);
        }

        @Override
        public DatumRange getTimeRange() {
            return viewRange;
        }

        @Override
        public Datum getTimeResolution() {
            return resolution;
        }

        @Override
        public String toString() {
            return "aggtsb: " + viewRange + "@" + ( resolution==null ? "intrinsic" : resolution );
        }

        @Override
        public void setURI(String suri) throws ParseException {
            viewRange= URISplit.parseTimeRange(suri);
            logger.log( Level.FINE, "setURI sets viewRange to {0}", viewRange);
        }

        @Override
        public String blurURI() {
            String surl = DataSetURI.fromUri( AggregatingDataSource.this.resourceURI ) + "?";
            if (sparams != null && !sparams.equals("") ) surl += sparams + "&";
            URISplit split = URISplit.parse(surl);
            Map<String,String> mparams = URISplit.parseParams(split.params);
            mparams.remove("resolution");
            split.params = URISplit.formatParams(mparams);
            URISplit split2= URISplit.parse(AggregatingDataSource.this.uri);
            split.vapScheme= split2.vapScheme;
            return URISplit.format(split);
        }
    }
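    // Round-trip sketch for AggTimeSeriesBrowse (hypothetical values): with
    // viewRange 2000-01-09 and a 60-second resolution, getURI() re-inserts
    // "timerange=2000-01-09" and a "resolution" parameter into the URI, while
    // blurURI() leaves "resolution" out so URIs can be compared ignoring it,
    // and setURI("...&timerange=2000-01-10") moves viewRange to the next day.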
    /**
     * read the data.  This supports reference caching.
     * @param mon monitor for the load.
     * @return the dataset.
     * @throws Exception
     */
    @Override
    public QDataSet getDataSet(ProgressMonitor mon) throws Exception {
        logger.log(Level.FINE, "getDataSet {0}", uri);

        boolean useReferenceCache= "true".equals( System.getProperty( ReferenceCache.PROP_ENABLE_REFERENCE_CACHE, "false" ) );

        String theUri= this.tsb!=null ? this.tsb.getURI() : this.uri.toString();

        ReferenceCache.ReferenceCacheEntry cacheEntry= null;
        if ( useReferenceCache ) {
            cacheEntry= ReferenceCache.getInstance().getDataSetOrLock( theUri, mon );
            if ( !cacheEntry.shouldILoad( Thread.currentThread() ) ) {
                logger.log(Level.FINE, "wait for other thread {0}", uri);
                QDataSet result= cacheEntry.park( mon );
                return result;
            } else {
                logger.log(Level.FINE, "reference cache in use, {0} is loading {1}", new Object[] { Thread.currentThread().toString(), theUri } );
            }
        }

        logger.log(Level.FINE, "reading {0}", uri );
        try {
            QDataSet result= getDataSet( mon, viewRange, resolution );
            if ( cacheEntry!=null ) cacheEntry.finished(result);
            return result;
        } catch ( Exception ex ) {
            if ( cacheEntry!=null ) cacheEntry.exception(ex);
            throw ex;
        }
    }

    /**
     * read the data, not using the reference cache.
     * @param mon monitor for the load.
     * @param lviewRange the time span to load.
     * @param lresolution the resolution to load, which is used where reduce=T.
     * @return the dataset.
     * @throws Exception
     */
    public QDataSet getDataSet( ProgressMonitor mon, DatumRange lviewRange, Datum lresolution ) throws Exception {
        try {
            mon.started();
            String[] ss = getFsm().getBestNamesFor( lviewRange, mon.getSubtaskMonitor(sparams) );

            if ( "true".equals( System.getProperty( Version.PROP_ENABLE_CLEAN_CACHE ) ) ) {
                logger.fine("enableCleanCache is true");
                getFsm().cacheCleanup();
            }

            boolean avail= getParam( "avail", "F" ).equals("T");
            boolean reduce= getParam( "reduce", "F" ).equals("T");

            if ( avail ) {
                logger.log(Level.FINE, "availability {0}", new Object[]{ lviewRange });
                DataSetBuilder build= new DataSetBuilder(2,ss.length,4);
                Units u= Units.us2000;
                EnumerationUnits eu= new EnumerationUnits("default");
                for ( String s: ss ) {
                    DatumRange dr= getFsm().getRangeFor(s);
                    //if ( getFsm().hasField("x") ) {
                    //    s= getFsm().getField( "x", s );
                    //}
                    build.putValues( -1, DDataSet.wrap( new double[] { dr.min().doubleValue(u), dr.max().doubleValue(u), 0x80FF80, eu.createDatum(s).doubleValue(eu) } ), 4 );
                    build.nextRecord();
                }
                DDataSet result= build.getDataSet();
                DDataSet bds= DDataSet.createRank2( 4, 0 );
                bds.putProperty( "NAME__0", "StartTime" );
                bds.putProperty( "UNITS__0", u );
                bds.putProperty( "NAME__1", "StopTime" );
                bds.putProperty( "UNITS__1", u );
                bds.putProperty( "NAME__2", "Color" );
                bds.putProperty( "NAME__3", "Filename" );
                bds.putProperty( "UNITS__3", eu );
                result.putProperty( QDataSet.BUNDLE_1, bds );
                result.putProperty( QDataSet.RENDER_TYPE, "eventsBar" );
                result.putProperty( QDataSet.LABEL, "Availability" );
                URISplit split= URISplit.parse( getURI() );
                result.putProperty( QDataSet.TITLE, split.file );
                mon.finished();
                return result;
            }
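            // Sketch of one availability record built above (the granule name and
            // times are hypothetical): a daily file covering 2000-01-09 yields
            //    [ 2000-01-09T00:00, 2000-01-10T00:00, 0x80FF80, "2000/po_h0_hyd_20000109_v01.cdf" ]
            // that is (StartTime, StopTime, Color, Filename), and the eventsBar
            // renderer draws one colored bar per granule file.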
            logger.log(Level.FINE, "aggregating {0} files for {1}", new Object[]{ ss.length, lviewRange });
            if ( logger.isLoggable(Level.FINE) ) {
                StringBuilder log= new StringBuilder( "== getDataSet will read the following ==" );
                for ( String s : ss ) {
                    log.append("\n").append(s);
                }
                logger.log( Level.FINE, log.toString() );
            }

            MutablePropertyDataSet result = null;
            JoinDataSet altResult= null;      // used when JoinDataSets are found
            DataSetBuilder dep0Builder= null; // used when joining non-time-series.

            if ( ss.length==0 ) {
                if ( null==getFsm().getRepresentativeFile( mon.getSubtaskMonitor("get representative file") ) ) {
                    throw new FileNotFoundException("Unable to find representative file: No files found matching "+getFsm().toString());
                } else {
                    throw new FileNotFoundException( MSG_NO_FILES_FOUND+" "+lviewRange );
                }
            }

            if (ss.length > 1) {
                mon.setTaskSize(ss.length * 10);
            }

            DatumRange cacheRange1 = null;

            EnumerationUnits exunits= EnumerationUnits.create("notes");
            DataSetBuilder notesBuilder= new DataSetBuilder( 2, ss.length/2, 3 ); // container for messages will be an events list.
            BundleBuilder bds= new BundleBuilder(3);
            bds.putProperty( QDataSet.NAME, 0, "startTime" );
            bds.putProperty( QDataSet.NAME, 1, "stopTime" );
            bds.putProperty( QDataSet.NAME, 2, "note" );
            bds.putProperty( QDataSet.UNITS, 0, Units.us2000 );
            bds.putProperty( QDataSet.UNITS, 1, Units.us2000 );
            bds.putProperty( QDataSet.UNITS, 2, exunits );
            notesBuilder.putProperty( QDataSet.BUNDLE_1, bds.getDataSet() );

            if ( delegateDataSourceFactory==null ) {
                throw new IllegalArgumentException("unable to identify data source");
            }

            boolean doThrow= false; // this will be set to true if we really do want to throw the exception instead of simply making a note of it.

            for (int i = 0; i < ss.length; i++) {
                String scompUrl = getFsm().getFileSystem().getRootURI().toString() + ss[i];
                if (!sparams.equals("")) {
                    scompUrl += "?" + sparams;
                }
                URI delegateUri;
                if ( delegateVapScheme!=null ) { //TODO: I don't believe delegateVapScheme will be null.
                    delegateUri = DataSetURI.getURIValid( delegateVapScheme+":"+scompUrl );
                } else {
                    delegateUri = DataSetURI.getURIValid( scompUrl );
                }
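                // Sketch of the delegate URI composed above (the granule name and
                // version are hypothetical): with the filesystem root
                //    https://cdaweb.gsfc.nasa.gov/istp_public/data/polar/hydra/hyd_h0/
                // granule name "2000/po_h0_hyd_20000109_v01.cdf", sparams
                // "ELECTRON_DIFFERENTIAL_ENERGY_FLUX", and delegateVapScheme
                // "vap+cdf", the delegate URI becomes
                //    vap+cdf:https://.../2000/po_h0_hyd_20000109_v01.cdf?ELECTRON_DIFFERENTIAL_ENERGY_FLUX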
                DataSource delegateDataSource = delegateDataSourceFactory.getDataSource( delegateUri );

                TimeSeriesBrowse delegateTsb= delegateDataSource.getCapability( TimeSeriesBrowse.class );
                if ( delegateTsb!=null ) {
                    delegateTsb.setTimeRange(lviewRange);
                    delegateTsb.setTimeResolution(lresolution);
                    setResolution( delegateTsb.getTimeResolution() );
                } else {
                    // resolution= null;  TODO: verify there's no reason to do this.
                }

                metadataModel = delegateDataSource.getMetadataModel();

                ProgressMonitor mon1;
                if (ss.length > 1) {
                    mon.setProgressMessage("getting " + ss[i]);
                    mon1 = mon.getSubtaskMonitor( i * 10, 10 * (i + 1), "getting "+ss[i] );
                    if ( mon1.isCancelled() ) break;
                    mon1.setTaskProgress(0); // cause it to paint
                } else if ( ss.length==1 ) {
                    mon.setProgressMessage("getting " + ss[0] );
                    mon1 = mon.getSubtaskMonitor( "getting " + ss[0] );
                    if ( mon1.isCancelled() ) break;
                    mon1.started();
                    mon1.setTaskProgress(0);
                } else {
                    mon1= mon.getSubtaskMonitor("no files found");
                    if ( mon1.isCancelled() ) break;
                }

                DatumRange drex= null; // in case there is an exception, where did it occur?
                try {
                    DatumRange dr1 = getFsm().getRangeFor(ss[i]);
                    drex= dr1;
                    logger.log(Level.FINER, "delegate URI: {0}", new Object[]{ delegateDataSource.getURI() } );

                    // Here is the single-granule read, the heart of aggregation.
                    QDataSet ds1 = delegateDataSource.getDataSet(mon1);
                    logger.log(Level.FINER, " read: {0}", new Object[]{ ds1 } );
                    //https://sourceforge.net/p/autoplot/bugs/2206/
                    //logger.fine("ask for a garbage collection to get rid of junk");
                    //System.gc();
                    //logger.fine("done ask for a garbage collection to get rid of junk");

                    if ( ds1==null ) {
                        logger.warning("delegate returned null");
                        ds1 = delegateDataSource.getDataSet(mon1); // try once more before skipping the granule.
                        if ( ds1==null ) continue;
                    }

                    // check to see if it is enumeration data and all values are present in the enumeration unit.
                    Units u= (Units) ds1.property(QDataSet.UNITS);
                    if ( u!=null && u instanceof EnumerationUnits && ds1.rank()==1 ) {
                        for ( int i2=0; i2<ds1.length(); i2++ ) {
                            u.createDatum( ds1.value(i2) ); // throws a RuntimeException when a value is not defined in the enumeration.
                        }
                    }

                    // sanity check that the granule's timetags fall within the interval its name implies.
                    QDataSet xds= SemanticOps.xtagsDataSet(ds1);
                    if ( xds!=null && xds.length()>0 ) {
                        QDataSet exds= Ops.extent(xds);
                        if ( !( UnitsUtil.isTimeLocation( dr1.getUnits() ) && UnitsUtil.isTimeLocation(SemanticOps.getUnits(exds)) ) ) {
                            logger.log(Level.WARNING, "Hey units! \"{0}\" \"{1}\"", new Object[] { dr1.getUnits(), SemanticOps.getUnits(exds) } );
                        }
                        if ( !dr1.intersects(DataSetUtil.asDatumRange(exds)) ) {
                            logger.log(Level.WARNING, "file for {0} contains data from an unexpected interval: {1}", new Object[] { dr1, exds } );
                        }
                    }

                    List<String> problems= new ArrayList<>();
                    if ( !DataSetUtil.validate(ds1, problems) ) {
                        for ( String p: problems ) {
                            System.err.println("problem with aggregation element "+ss[i]+": "+p);
                            logger.log(Level.WARNING, "problem with aggregation element {0}: {1}", new Object[]{ ss[i], p });
                        }
                    }

                    if ( reduce && ds1.rank()<3 && SemanticOps.isTimeSeries(ds1) ) {
                        QDataSet dep0= (QDataSet) ds1.property(QDataSet.DEPEND_0);
                        if ( dep0!=null ) {
                            if ( DataSetUtil.isMonotonic(dep0) ) {
                                logger.fine("trimming dataset to save memory");
                                mon1.setProgressMessage("trim to visible: "+lviewRange );
                                int imin= DataSetUtil.closestIndex( dep0, lviewRange.min() );
                                int imax= DataSetUtil.closestIndex( dep0, lviewRange.max() );
                                imax= imax+1;
                                if ( imin>0 || imax<dep0.length() ) {
                                    ds1= ds1.trim( imin, imax );
                                }
                            }
                        }
                    }

                    if ( result==null ) {
                        boolean isSeriesOfImages = ( ds1.rank()>2 || // dep0==null && ds1.rank()>2
                                ( ds1.rank()==2 && ds1.length(0)>QDataSet.MAX_PLANE_COUNT ) );
                        if ( isSeriesOfImages ) { // rfe521: experiment with aggregation types.
                            result= new JoinDataSet(ds1);
                            dep0Builder= new DataSetBuilder(1,ss.length);
                            dep0Builder.nextRecord(dr1.middle());
                        } else {
                            if ( ds1 instanceof BufferDataSet ) {
                                if ( ss.length==1 ) {
                                    result= BufferDataSet.maybeCopy(ds1);
                                } else {
                                    result = BufferDataSet.copy(ds1);
                                    int newSize= result.length()*ss.length;
                                    if ( newSize