package org.autoplot.hapi; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.InterruptedIOException; import java.io.OutputStreamWriter; import java.net.HttpURLConnection; import java.net.MalformedURLException; import java.net.URI; import java.net.URISyntaxException; import java.net.URL; import java.net.URLDecoder; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.FileChannel; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Level; import java.util.logging.Logger; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; import org.das2.qds.buffer.BufferDataSet; import org.das2.dataset.NoDataInIntervalException; import org.das2.datum.CacheTag; import org.das2.datum.Datum; import org.das2.datum.DatumRange; import org.das2.datum.DatumRangeUtil; import org.das2.datum.EnumerationUnits; import org.das2.datum.Units; import org.das2.datum.UnitsUtil; import org.das2.util.LoggerManager; import org.das2.util.filesystem.FileSystemUtil; import org.das2.util.filesystem.HtmlUtil; import org.das2.util.filesystem.HttpUtil; import org.das2.util.monitor.NullProgressMonitor; import org.das2.util.monitor.ProgressMonitor; import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; import org.das2.qds.ArrayDataSet; import org.das2.qds.DDataSet; import org.das2.qds.DataSetUtil; import org.das2.qds.MutablePropertyDataSet; import org.das2.qds.QDataSet; import org.das2.qds.SemanticOps; import org.das2.qds.SparseDataSetBuilder; import org.das2.qds.WritableDataSet; import org.autoplot.datasource.AbstractDataSource; import org.autoplot.datasource.AutoplotSettings; import org.autoplot.datasource.DefaultTimeSeriesBrowse; import org.autoplot.datasource.URISplit; import org.autoplot.datasource.capability.Caching; import org.autoplot.datasource.capability.TimeSeriesBrowse; import org.das2.datum.DatumUtil; import org.das2.datum.TimeParser; import org.das2.datum.TimeUtil; import org.das2.fsm.FileStorageModel; import org.das2.qds.ops.Ops; import org.das2.qds.util.DataSetBuilder; import org.das2.qstream.TransferType; import org.das2.util.filesystem.FileSystem; import org.das2.util.filesystem.FileSystemSettings; import org.das2.util.monitor.CancelledOperationException; /** * HAPI data source uses transactions with HAPI servers to collect data. * @author jbf */ public final class HapiDataSource extends AbstractDataSource { protected final static Logger logger= LoggerManager.getLogger("apdss.hapi"); /** * this logger is for opening connections to remote sites. 
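     * Requests are recorded here at Level.FINE, for example "GET {0}" with the URL being opened.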
*/ protected static final Logger loggerUrl= org.das2.util.LoggerManager.getLogger( "das2.url" ); TimeSeriesBrowse tsb; public HapiDataSource(URI uri) { super(uri); tsb= new DefaultTimeSeriesBrowse(); String str= params.get( URISplit.PARAM_TIME_RANGE ); if ( str!=null ) { try { tsb.setURI(uri.toString()); } catch (ParseException ex) { logger.log(Level.SEVERE, ex.getMessage(), ex); } } addCapability( TimeSeriesBrowse.class, tsb ); addCapability( Caching.class, new Caching() { @Override public boolean satisfies(String surl) { return false; } @Override public void resetURI(String surl) { } @Override public void reset() { logger.fine("reset cache"); HapiDataSource.csvCache.clear(); } }); } private static QDataSet getJSONBins( JSONObject binsObject ) throws JSONException { JSONArray bins=null; if ( binsObject.has("values") ) { logger.fine("using deprecated bins"); bins= binsObject.getJSONArray("values"); } else if ( binsObject.has("centers") ) { bins= binsObject.getJSONArray("centers"); } JSONArray ranges= null; if ( binsObject.has("ranges") ) { ranges= binsObject.getJSONArray("ranges"); } int len; if ( ranges==null && bins==null ) { throw new IllegalArgumentException("ranges or centers must be specified"); } else { len= ranges==null ? bins.length() : ranges.length(); } DDataSet result= DDataSet.createRank1(len); DDataSet max= DDataSet.createRank1(len); DDataSet min= DDataSet.createRank1(len); boolean hasMin= false; boolean hasMax= false; boolean hasCenter= false; if ( len==0 ) { throw new IllegalArgumentException("bins must have ranges or centers specified"); } else { if ( bins!=null ) { hasCenter= true; Object o= bins.get(0); if ( o instanceof Number ) { for ( int j=0; j params= URISplit.parseParams(split.params); String smin= tr.min().toString(); String smax= tr.max().toString(); if ( smin.endsWith("00:00:00.000Z") ) smin= smin.substring(0,smin.length()-14) + "T00:00Z"; if ( smax.endsWith("00:00:00.000Z") ) smax= smax.substring(0,smax.length()-14) + "T00:00Z"; params.put("time.min",smin); params.put("time.max",smax); split.params= URISplit.formatParams(params); String surl= URISplit.format(split); url= new URL(surl); return url; } catch (URISyntaxException | MalformedURLException ex) { throw new RuntimeException(ex); } } public static class ParamDescription { boolean hasFill= false; double fillValue= -1e38; Units units= Units.dimensionless; String name= ""; String description= ""; String label=""; String type= ""; /** * number of indices in each index. */ int[] size= new int[0]; /** * total number of fields */ int nFields= 1; /** * length in bytes when transferring with binary. */ int length= 0; QDataSet[] depend= null; /** * for time-varying depend1 (not in HAPI1.1) */ String[] dependName= null; /** * date the parameter was last modified, or 0 if not known. 
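     * The value is milliseconds since 1970-01-01T00:00Z, as it is compared against Units.ms1970 values elsewhere in this class.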
*/ long modifiedDateMillis= 0; /** * may contain hint for renderer, such as nnspectrogram */ String renderType=null; private ParamDescription( String name ) { this.name= name; } @Override public String toString() { return this.name; } } private static final Map lastRecordFound= new HashMap<>(); private static String[] cacheFilesFor( URL url, ParamDescription[] pp, Datum xx ) { String s= AutoplotSettings.settings().resolveProperty(AutoplotSettings.PROP_FSCACHE); if ( s.endsWith("/") ) s= s.substring(0,s.length()-1); StringBuilder ub= new StringBuilder( url.getProtocol() + "/" + url.getHost() + "/" + url.getPath() ); if ( url.getQuery()!=null ) { String[] querys= url.getQuery().split("\\&"); Pattern p= Pattern.compile("id=(.+)"); for ( String q : querys ) { Matcher m= p.matcher(q); if ( m.matches() ) { ub.append("/").append(m.group(1)); break; } } } else { throw new IllegalArgumentException("query must be specified, implementation error"); } TimeParser tp= TimeParser.create( "$Y/$m/$Y$m$d" ); String sxx= tp.format(xx); String u= ub.toString(); String[] result= new String[pp.length]; for ( int i=0; i> binaryCache= new ConcurrentHashMap<>(); private static final Map> csvCache= new ConcurrentHashMap<>(); /** * print the cache stats. * @see https://sourceforge.net/p/autoplot/bugs/1996/ */ public static void printCacheStats() { if ( csvCache==null || csvCache.isEmpty() ) { System.err.println( "(cache is empty)" ); } else { for ( Entry> s: csvCache.entrySet() ) { System.err.println( "" + s.getKey() +": "+s.getValue().size()+" records"); } } if ( binaryCache==null || binaryCache.isEmpty() ) { System.err.println( "(cache is empty)" ); } else { for ( Entry> s: binaryCache.entrySet() ) { System.err.println( "" + s.getKey() +": "+s.getValue().size()+" records"); } } } /** * return the local folder of the cache for HAPI data. This will end with * a slash. * @return the local folder of the cache for HAPI data. */ public static String getHapiCache() { String hapiCache= System.getProperty("HAPI_DATA"); if ( hapiCache!=null ) { String home=System.getProperty("user.home") ; if ( hapiCache.contains("${HOME}") ) { // the filesystem settings used ${}, %{} seems more conventional. 
hapiCache= hapiCache.replace("${HOME}", home ); } else if ( hapiCache.contains("%{HOME}") ) { hapiCache= hapiCache.replace("%{HOME}", home ); } } if ( hapiCache!=null && hapiCache.contains("\\") ) { // Windows path sep hapiCache= hapiCache.replaceAll("\\\\", "/" ); } if ( hapiCache==null ) { String s= AutoplotSettings.settings().resolveProperty(AutoplotSettings.PROP_FSCACHE); if ( s.endsWith("/") ) s= s.substring(0,s.length()-1); hapiCache= s + "/hapi/"; } if ( !hapiCache.endsWith("/") ) hapiCache= hapiCache + "/"; if ( !hapiCache.endsWith("/") ) { throw new IllegalArgumentException("hapiCache must end with /"); } if ( HapiServer.useCache() ) { if ( !new File(hapiCache).exists() ) { new File(hapiCache).mkdirs(); } } return hapiCache; } private static void writeToBinaryCachedData(URL url, ParamDescription[] pp, Datum xx, ByteBuffer buf ) throws IOException { StringBuilder ub= new StringBuilder( url.getProtocol() + "/" + url.getHost() + url.getPath() ); if ( url.getQuery()!=null ) { String[] querys= url.getQuery().split("\\&"); Pattern p= Pattern.compile("id=(.+)"); for ( String q : querys ) { Matcher m= p.matcher(q); if ( m.matches() ) { ub.append("/").append(m.group(1)); break; } } } else { throw new IllegalArgumentException("query must be specified, implementation error"); } String hapiCache= getHapiCache(); TimeParser tp= TimeParser.create( "$Y/$m/$Y$m$d" ); String sxx= tp.format(xx); String format= "binary"; String u= ub.toString(); Datum t0= lastRecordFound.get( u + "/" + sxx ); if ( t0==null ) { String f= hapiCache + u + "/" + sxx + "." + pp[0].name + "." + format; File ff= new File(f); if ( ff.exists() ) { BufferedReader read= new BufferedReader(new FileReader(ff)); String line= read.readLine(); String lastLine= null; while ( line!=null ) { lastLine= line; line= read.readLine(); } if ( lastLine!=null ) { try { t0= Units.us2000.parse(lastLine); lastRecordFound.put( u + "/" + sxx,t0); } catch (ParseException ex) { t0= null; } } else { t0= null; } } } if ( t0!=null && t0.ge(xx) ) { logger.log(Level.FINE, "clear all cached files for {0}", sxx); for (ParamDescription pp1 : pp) { String f = hapiCache + u + "/" + sxx + "." + pp1.name + "." +format; File ff= new File(f); if ( ff.exists() ) { if ( !ff.delete() ) logger.log(Level.INFO, "unable to delete file: {0}", ff); } } } for (ParamDescription pp1 : pp) { String f = u + "/" + sxx + "." + pp1.name + "." + format + "." 
+ Thread.currentThread().getId(); logger.log(Level.FINER, "cache.get({0})", f); ArrayList sparam= binaryCache.get(f); if ( sparam==null ) { sparam= new ArrayList<>(); binaryCache.put(f,sparam); logger.log(Level.FINE, "cache.put({0},ArrayList({1}))", new Object[]{f, sparam.size()}); } ByteBuffer buf2= ByteBuffer.allocate( buf.capacity() ); buf2.put(buf); sparam.add( buf2 ); } lastRecordFound.put( u + "/" + sxx,xx); } private static void writeToCsvCachedData(URL url, ParamDescription[] pp, Datum xx, String[] ss) throws IOException { StringBuilder ub= new StringBuilder( url.getProtocol() + "/" + url.getHost() + url.getPath() ); if ( url.getQuery()!=null ) { String[] querys= url.getQuery().split("\\&"); Pattern p= Pattern.compile("id=(.+)"); for ( String q : querys ) { Matcher m= p.matcher(q); if ( m.matches() ) { ub.append("/").append(m.group(1)); break; } } } else { throw new IllegalArgumentException("query must be specified, implementation error"); } String hapiCache= getHapiCache(); TimeParser tp= TimeParser.create( "$Y/$m/$Y$m$d" ); String sxx= tp.format(xx); String u= ub.toString(); Datum t0= lastRecordFound.get( u + "/" + sxx ); if ( t0==null ) { String f= hapiCache + u + "/" + sxx + "." + pp[0].name + ".csv"; File ff= new File(f); if ( ff.exists() ) { BufferedReader read= new BufferedReader(new FileReader(ff)); String line= read.readLine(); String lastLine= null; while ( line!=null ) { lastLine= line; line= read.readLine(); } if ( lastLine!=null ) { try { t0= Units.us2000.parse(lastLine); lastRecordFound.put( u + "/" + sxx,t0); } catch (ParseException ex) { t0= null; } } else { t0= null; } } } if ( t0!=null && t0.ge(xx) ) { logger.log(Level.FINE, "clear all cached files for {0}", sxx); for (ParamDescription pp1 : pp) { String f = hapiCache + u + "/" + sxx + "." + pp1.name + ".csv"; File ff= new File(f); if ( ff.exists() ) { if ( !ff.delete() ) logger.log(Level.INFO, "unable to delete file: {0}", ff); } } } int ifield=0; for (ParamDescription pp1 : pp) { String f = u + "/" + sxx + "." + pp1.name + ".csv" + "." + Thread.currentThread().getId(); logger.log(Level.FINER, "cache.get({0})", f); ArrayList sparam= csvCache.get(f); if ( sparam==null ) { sparam= new ArrayList<>(); csvCache.put(f,sparam); logger.log(Level.FINE, "cache.put({0},ArrayList({1}))", new Object[]{f, sparam.size()}); } StringBuilder build= new StringBuilder(); int length = pp1.nFields; for ( int k=0; k0 ) build.append(","); build.append( ss[ifield++] ); } sparam.add(build.toString()); } lastRecordFound.put( u + "/" + sxx,xx); } /** * See https://sourceforge.net/p/autoplot/bugs/2043/ * @param url url used to locate position in cache. * @param pp parameters * @param xx time used to id the file. 
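     * @throws IOException if the cache directory cannot be created or a granule file cannot be written.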
*/ private static void writeToBinaryCachedDataFinish(URL url, ParamDescription[] pp, Datum xx ) throws IOException { logger.log(Level.FINE, "writeToBinaryCachedDataFinish: {0}", xx); StringBuilder ub= new StringBuilder( url.getProtocol() + "/" + url.getHost() + url.getPath() ); if ( url.getQuery()!=null ) { // get the id from the url String[] querys= url.getQuery().split("\\&"); Pattern p= Pattern.compile("id=(.+)"); for ( String q : querys ) { Matcher m= p.matcher(q); if ( m.matches() ) { ub.append("/").append(m.group(1)); break; } } } else { throw new IllegalArgumentException("query must be specified, implementation error"); } String hapiCache= getHapiCache(); String format= "binary"; long currentTimeMillis= pp[0].modifiedDateMillis; TimeParser tp= TimeParser.create( "$Y/$m/$Y$m$d" ); String sxx= tp.format(xx); String u= ub.toString(); int ipos=0; for (ParamDescription pp1 : pp) { String f = u + "/" + sxx + "." + pp1.name + "." + format + "."+ Thread.currentThread().getId(); logger.log(Level.FINE, "remove from cache: {0}", f); File ff= new File( hapiCache + u + "/" + sxx + "." + pp1.name + "." + format +""); if ( !ff.getParentFile().exists() ) { if ( !ff.getParentFile().mkdirs() ) { throw new IOException("unable to mkdirs "+ff.getParent() ); } } File ffTemp= new File( hapiCache + u + "/" + sxx + "." + pp1.name + "."+ format + "."+Thread.currentThread().getId() ); ArrayList data= binaryCache.get(f); //TODO: use "remove" after debugging. int ilen= BufferDataSet.byteCount(pp1.type) * DataSetUtil.product(pp1.size); try ( FileChannel ffTempChannel= new FileOutputStream(ffTemp).getChannel() ) { for ( ByteBuffer buf: data ) { buf.position(ipos); buf.limit(ipos+ilen); ffTempChannel.write(buf); } } ipos+= pp1.length; } synchronized ( HapiDataSource.class ) { for (ParamDescription pp1 : pp) { File ffTemp= new File( hapiCache + u + "/" + sxx + "." + pp1.name + "."+ format + "."+Thread.currentThread().getId() ); File ff= new File( hapiCache + u + "/" + sxx + "." + pp1.name + "." + format +""); ffTemp.renameTo(ff); if ( currentTimeMillis>0 ) ff.setLastModified(currentTimeMillis); } } } /** * See https://sourceforge.net/p/autoplot/bugs/2043/ * @param url url used to locate position in cache. * @param pp parameters * @param xx time used to id the file. */ private static void writeToCsvCachedDataFinish(URL url, ParamDescription[] pp, Datum xx ) throws IOException { logger.log(Level.FINE, "writeToCachedDataFinish: {0}", xx); StringBuilder ub= new StringBuilder( url.getProtocol() + "/" + url.getHost() + url.getPath() ); if ( url.getQuery()!=null ) { // get the id from the url String[] querys= url.getQuery().split("\\&"); Pattern p= Pattern.compile("id=(.+)"); for ( String q : querys ) { Matcher m= p.matcher(q); if ( m.matches() ) { ub.append("/").append(m.group(1)); break; } } } else { throw new IllegalArgumentException("query must be specified, implementation error"); } String hapiCache= getHapiCache(); String format= "csv"; long currentTimeMillis= pp[0].modifiedDateMillis; TimeParser tp= TimeParser.create( "$Y/$m/$Y$m$d" ); String sxx= tp.format(xx); String u= ub.toString(); for (ParamDescription pp1 : pp) { String f = u + "/" + sxx + "." + pp1.name + "." + format + "."+ Thread.currentThread().getId(); logger.log(Level.FINE, "remove from cache: {0}", f); ArrayList sparam= csvCache.remove(f); File ff= new File( hapiCache + u + "/" + sxx + "." + pp1.name + "." 
+ format +".gz"); if ( !ff.getParentFile().exists() ) { if ( !ff.getParentFile().mkdirs() ) { throw new IOException("unable to mkdirs "+ff.getParent() ); } } File ffTemp= new File( hapiCache + u + "/" + sxx + "." + pp1.name + "."+ format + ".gz."+Thread.currentThread().getId() ); //int line=0; try (final BufferedWriter w = new BufferedWriter( new OutputStreamWriter( new GZIPOutputStream( new FileOutputStream(ffTemp) ) ) ) ) { if ( sparam!=null ) { for ( String s123: sparam ) { //line++; w.write(s123); w.newLine(); } } } } synchronized ( HapiDataSource.class ) { for (ParamDescription pp1 : pp) { File ff= new File( hapiCache + u + "/" + sxx + "." + pp1.name + "." + format +".gz"); File ffTemp= new File( hapiCache + u + "/" + sxx + "." + pp1.name + "."+ format + ".gz."+Thread.currentThread().getId() ); ffTemp.renameTo(ff); if ( currentTimeMillis>0 ) ff.setLastModified(currentTimeMillis); } } } /** * To assist in getting the CDAWeb HAPI server going, handle a few differences * like:
     * <ul>
     * <li>parameters are returned within the JSON code.</li>
     * <li>Epoch is needed.</li>
     * </ul>
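     * The request appends "&include=header&format=json1" to the standard HAPI data URL, e.g.
     * (hypothetical server and id) {@code https://cdaweb.example/hapi/data?id=dataset1&time.min=2020-01-01T00:00Z&time.max=2020-01-02T00:00Z&include=header&format=json1}.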
* @param monitor * @return * @throws Exception */ private QDataSet getDataSetCDAWeb( ProgressMonitor monitor) throws Exception { URI server = this.resourceURI; String id= getParam("id","" ); if ( id.equals("") ) throw new IllegalArgumentException("missing id"); id= URLDecoder.decode( id,"UTF-8" ); String pp= getParam("parameters",""); if ( !pp.equals("") && !pp.startsWith("Epoch,") ) { pp= "Epoch,"+pp; } if ( id.equals("") ) throw new IllegalArgumentException("missing id"); id= URLDecoder.decode(id,"UTF-8"); DatumRange tr; // TSB = DatumRangeUtil.parseTimeRange(timeRange); tr= tsb.getTimeRange(); URL url= HapiServer.getDataURL( server.toURL(), id, tr, pp ); url= new URL( url.toString()+"&include=header&format=json1" ); monitor.started(); monitor.setProgressMessage("server is preparing data"); long t0= System.currentTimeMillis() - 100; // -100 so it updates after receiving first record. int lineNum=0; StringBuilder builder= new StringBuilder(); logger.log(Level.FINE, "getDocument {0}", url.toString()); loggerUrl.log(Level.FINE, "GET {0}", new Object[] { url } ); HttpURLConnection httpConnect= ((HttpURLConnection)url.openConnection()); httpConnect.setConnectTimeout(FileSystem.settings().getConnectTimeoutMs()); httpConnect.setReadTimeout(FileSystem.settings().getReadTimeoutMs()); httpConnect= (HttpURLConnection) HttpUtil.checkRedirect( httpConnect ); try ( BufferedReader in= new BufferedReader( new InputStreamReader( httpConnect.getInputStream() ) ) ) { String line= in.readLine(); lineNum++; while ( line!=null ) { if ( System.currentTimeMillis()-t0 > 100 ) { monitor.setProgressMessage("reading line "+lineNum); t0= System.currentTimeMillis(); } builder.append(line); line= in.readLine(); } } catch ( IOException ex ) { ByteArrayOutputStream baos= new ByteArrayOutputStream(); FileSystemUtil.copyStream( httpConnect.getErrorStream(), baos, new NullProgressMonitor() ); String s= baos.toString("UTF-8"); if ( s.contains("No data available") ) { logger.log(Level.FINE, "No data available, server responded with {0}: {1}", new Object[]{httpConnect.getResponseCode(), httpConnect.getResponseMessage()}); throw new NoDataInIntervalException("No data available"); } else { if ( s.length()<256 ) { throw new IOException( ex.getMessage() + ": "+s ); } else { throw ex; } } } httpConnect.disconnect(); JSONObject o= new JSONObject(builder.toString()); JSONObject doc= o; ParamDescription[] pds= getParameterDescriptions(doc); monitor.setProgressMessage("parsing data"); int[] nfields= new int[pds.length]; for ( int i=0; i1 ) { int nf= nfields[ipd]; DDataSet column= DDataSet.createRank2(param.length(),nfields[ipd]); for ( int i=0; i0 ) { String[] pps= pp.split(","); Map map= new HashMap(); for ( int i=0; i1 ) { monitor.setTaskSize(nday*10); monitor.started(); } int iday=0; while ( currentDay.min().le(tr.max()) ) { logger.log(Level.FINER, "useCache, request {0}", currentDay); ProgressMonitor mon1= nday==1 ? monitor : monitor.getSubtaskMonitor( 10*iday, 10*(iday+1), "read "+currentDay ); QDataSet ds1; try { DatumRange oneDaysRange= DatumRangeUtil.sloppyIntersection( currentDay, startStopDate ); if ( oneDaysRange.width().value()>0 ) { URL url1= replaceTimeRangeURL(url,oneDaysRange); ds1 = getDataSetViaCsv(totalFields, mon1, url1, pds, oneDaysRange, nparam, nfields, getParam( "cache", "" ) ); if ( ds1.length()>0 ) { dsall= Ops.append( dsall, ds1 ); } } } catch ( NoDataInIntervalException ex ) { if ( ! 
FileSystem.settings().isOffline() ) { throw ex; } else { logger.log(Level.FINE, "no granule found for day, but we are offline: {0}", currentDay); } } currentDay= currentDay.next(); iday++; } if ( dsall==null ) { logger.info("no records found"); return null; } logger.finer("done useCache, so make daily requests to form granules"); ds= dsall; ds= Ops.putProperty( ds, QDataSet.UNITS, null ); // kludge, otherwise time units are messed up. TODO: who puts unit here? } else { ds= getDataSetViaCsv(totalFields, monitor, url, pds, tr, nparam, nfields, getParam( "cache", "" ) ); } break; } if ( ds.length()==0 ) { monitor.finished(); throw new NoDataInIntervalException("no records found"); } ds = repackage(ds,pds,null); Units u= (Units) ds.property(QDataSet.UNITS); if ( u!=null && u.toString().trim().length()>0 ) { String l= (String) ds.property(QDataSet.LABEL); if ( l==null ) { ds= Ops.putProperty( ds, QDataSet.LABEL, "%{UNITS}" ); } else { ds= Ops.putProperty( ds, QDataSet.LABEL, l.trim() + " (%{UNITS})" ); } } // install a cacheTag. The following code assumes depend_0 is mutable. QDataSet xds= (QDataSet) ds.property(QDataSet.DEPEND_0); if ( xds==null && ( UnitsUtil.isTimeLocation( SemanticOps.getUnits(ds) ) ) ) { xds= ds; } if ( timeStampLocation.equalsIgnoreCase("BEGIN") || timeStampLocation.equalsIgnoreCase("END" ) ) { if ( cadence==null ) { cadence= DataSetUtil.asDatum( DataSetUtil.guessCadenceNew( xds, null ) ); } if ( cadence!=null ) { if ( timeStampLocation.equalsIgnoreCase("BEGIN") ) { xds= Ops.add( xds, cadence.divide(2) ); } else if ( timeStampLocation.equalsIgnoreCase("END") ) { xds= Ops.subtract( xds, cadence.divide(2) ); } } else { logger.info("timetags are identified as BEGIN, but cadence was not available to center the data"); } } if ( xds!=null ) { ((MutablePropertyDataSet)xds).putProperty(QDataSet.CACHE_TAG, new CacheTag(tr,null) ); } monitor.setTaskProgress(100); monitor.finished(); return ds; } private static boolean useCache( String useCacheUriParam ) { boolean useCache= HapiServer.useCache(); String cacheParam= useCacheUriParam; if ( cacheParam.equals("F") ) { useCache= false; } return useCache; } /** * read the interval using CSV. * @param totalFields * @param monitor * @param url * @param pds * @param tr * @param nparam * @param nfields * @param useCacheUriParam * @return * @throws IllegalArgumentException * @throws Exception * @throws IOException */ public static QDataSet getDataSetViaCsv(int totalFields, ProgressMonitor monitor, URL url, ParamDescription[] pds, DatumRange tr, int nparam, int[] nfields, String useCacheUriParam ) throws IllegalArgumentException, Exception, IOException { DataSetBuilder builder= new DataSetBuilder(2,100,totalFields); monitor.setProgressMessage("reading data"); monitor.setTaskProgress(20); long t0= System.currentTimeMillis() - 100; // -100 so it updates after receiving first record. boolean cacheIsEnabled= useCache(useCacheUriParam); AbstractLineReader cacheReader; if ( cacheIsEnabled ) { // this branch posts the request, expecting that the server may respond with 304, indicating the cache should be used. 
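            // The CSV cache keeps one gzipped granule per parameter per UTC day, using the template
            // $Y/$m/$Y$m$d.<parameter>.csv.gz under a folder derived from the request URL, for example
            // (hypothetical server and id): <hapiCache>/http/server.example/hapi/data/dataset1/2020/01/20200115.Time.csv.gz.
            // See getCacheFiles and writeToCsvCachedDataFinish for the exact layout.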
String[] parameters= new String[pds.length]; for ( int i=0; i 100 ) { monitor.setProgressMessage("reading "+xx); t0= System.currentTimeMillis(); double d= DatumRangeUtil.normalize( tr, xx ); monitor.setTaskProgress( 20 + (int)( 75 * d ) ); if ( monitor.isCancelled() ) throw new CancelledOperationException("cancel was pressed"); } } catch ( ParseException ex ) { if ( timeWarningCount>0 ) { logger.log(Level.INFO, "malformed time: {0}", ss[ifield]); timeWarningCount--; } line= in.readLine(); continue; } // if a cache file is opened for the previous day, then close the file for the day, gzipping it. if ( writeDataToCache ) { if ( !currentDay.contains(xx) && tr.intersects(currentDay) && completeDay ) { // https://sourceforge.net/p/autoplot/bugs/1968/ HAPI caching must not cache after "modificationDate" or partial days remain in cache if ( pds[0].modifiedDateMillis==0 || currentDay.middle().doubleValue(Units.ms1970) - pds[0].modifiedDateMillis <= 0 ) { writeToCsvCachedDataFinish(url, pds, currentDay.middle() ); } else { logger.fine("data after modification date is not cached."); } } while ( !currentDay.contains(xx) && tr.intersects(currentDay ) ) { // find the new current day, writing empty files for any missing days. currentDay= currentDay.next(); completeDay= tr.contains(currentDay); if ( !currentDay.contains(xx) && tr.intersects(currentDay ) ) { if ( pds[0].modifiedDateMillis==0 || currentDay.middle().doubleValue(Units.ms1970) - pds[0].modifiedDateMillis <= 0 ) { // put empty file which is placeholder. writeToCsvCachedDataFinish(url, pds, currentDay.middle() ); } } } } if ( !currentDay.contains(xx) ) { logger.info("something's gone wrong, perhaps out-of-order timetags."); completeDay= false; } if ( writeDataToCache ) { if ( completeDay ) { if ( pds[0].modifiedDateMillis==0 || xx.doubleValue(Units.ms1970) - pds[0].modifiedDateMillis <= 0 ) { writeToCsvCachedData( url, pds, xx, ss ); } } } builder.putValue( -1, ifield, xx ); ifield++; for ( int i=1; i cacheFiles= new HashMap<>(); cacheFiles.put( "cached", "true" ); builder.putProperty( QDataSet.USER_PROPERTIES, cacheFiles ); } monitor.setTaskProgress(95); QDataSet ds= builder.getDataSet(); return ds; } private static TransferType getTimeTransferType( ParamDescription pdsi ) { final Units u= pdsi.units; final int length= pdsi.length; final byte[] bytes= new byte[length]; return new TransferType() { @Override public void write(double d, ByteBuffer buffer) { } @Override public double read(ByteBuffer buffer) { buffer.get(bytes); //buf2.get(bytes); String s= new String( bytes ); Datum d= ((EnumerationUnits)u).createDatum(s); return d.doubleValue(u); } @Override public int sizeBytes() { return length; } @Override public boolean isAscii() { return false; } @Override public String name() { return "string"+length; } }; } /** * read the interval using binary. * @param totalFields * @param monitor * @param url * @param pds * @param tr * @param nparam * @param nfields * @param useCacheUriParam * @return * @throws IllegalArgumentException * @throws Exception * @throws IOException */ public static QDataSet getDataSetViaBinary(int totalFields, ProgressMonitor monitor, URL url, ParamDescription[] pds, DatumRange tr, int nparam, int[] nfields, String useCacheUriParam ) throws IllegalArgumentException, Exception, IOException { DataSetBuilder builder = new DataSetBuilder(2, 100, totalFields); monitor.setProgressMessage("reading data"); monitor.setTaskProgress(20); long t0 = System.currentTimeMillis() - 100; // -100 so it updates after receiving first record. 
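        // As with the CSV path above, consult the local daily-granule cache before requesting data from the server.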
boolean cacheIsEnabled= useCache(useCacheUriParam); AbstractBinaryRecordReader cacheReader; if ( cacheIsEnabled ) { // this branch posts the request, expecting that the server may respond with 304, indicating the cache should be used. String[] parameters= new String[pds.length]; for ( int i=0; i 100) { monitor.setProgressMessage("reading " + xx); t0 = System.currentTimeMillis(); double d = DatumRangeUtil.normalize(tr, xx); monitor.setTaskProgress(20 + (int) (75 * d)); if ( monitor.isCancelled() ) throw new CancelledOperationException("cancel was pressed"); } } catch ( RuntimeException ex ) { if ( timeWarningCount>0 ) { logger.log(Level.INFO, "malformed time: {0}", result[0] ); timeWarningCount--; } buf.flip(); bytesRead = in.readRecord(buf); continue; } // if a cache file is opened for the previous day, then close the file for the day, gzipping it. if ( writeDataToCache ) { if ( !currentDay.contains(xx) && tr.intersects(currentDay) && completeDay ) { // https://sourceforge.net/p/autoplot/bugs/1968/ HAPI caching must not cache after "modificationDate" or partial days remain in cache if ( pds[0].modifiedDateMillis==0 || currentDay.middle().doubleValue(Units.ms1970) - pds[0].modifiedDateMillis <= 0 ) { writeToBinaryCachedDataFinish(url, pds, currentDay.middle() ); } else { logger.fine("data after modification date is not cached."); } } while ( !currentDay.contains(xx) && tr.intersects(currentDay ) ) { // find the new current day, writing empty files for any missing days. currentDay= currentDay.next(); completeDay= tr.contains(currentDay); if ( !currentDay.contains(xx) && tr.intersects(currentDay ) ) { if ( pds[0].modifiedDateMillis==0 || currentDay.middle().doubleValue(Units.ms1970) - pds[0].modifiedDateMillis <= 0 ) { // put empty file which is placeholder. writeToBinaryCachedDataFinish(url, pds, currentDay.middle() ); } } } } if ( !currentDay.contains(xx) ) { logger.info("something's gone wrong, perhaps out-of-order timetags."); completeDay= false; } if ( writeDataToCache ) { if ( completeDay ) { if ( pds[0].modifiedDateMillis==0 || xx.doubleValue(Units.ms1970) - pds[0].modifiedDateMillis <= 0 ) { buf.flip(); writeToBinaryCachedData( url, pds, xx, buf ); } } } builder.putValue(-1, ifield, xx); ifield++; for (int i = 1; i < nparam; i++) { // nparam is number of parameters, which may have multiple fields. for (int j = 0; j < nfields[i]; j++) { builder.putValue(-1, ifield, result[ifield]); //TODO: fill? ifield++; } } builder.nextRecord(); buf.flip(); bytesRead = in.readRecord(buf); } if ( writeDataToCache ) { while ( completeDay && tr.intersects(currentDay) ) { if ( pds[0].modifiedDateMillis==0 || currentDay.middle().doubleValue(Units.ms1970) - pds[0].modifiedDateMillis <= 0 ) { // put empty file which is placeholder. 
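                        // the empty granule marks this day as already retrieved, so later reads can use the cache
                        // instead of re-requesting an interval known to contain no records.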
writeToBinaryCachedDataFinish(url, pds, currentDay.middle() ); } currentDay= currentDay.next(); completeDay= tr.contains(currentDay); } } } catch (IOException e) { logger.log( Level.WARNING, e.getMessage(), e ); monitor.finished(); throw new IOException(String.valueOf(httpConnect.getResponseCode()) + ":" + httpConnect.getResponseMessage()); } catch (Exception e) { logger.log( Level.WARNING, e.getMessage(), e ); monitor.finished(); throw e; } finally { if ( httpConnect!=null ) httpConnect.disconnect(); } if ( cacheReader!=null ) { Map cacheFiles= new HashMap<>(); cacheFiles.put( "cached", "true" ); builder.putProperty( QDataSet.USER_PROPERTIES, cacheFiles ); } monitor.setTaskProgress(95); QDataSet ds = builder.getDataSet(); return ds; } /** * read data embedded within a JSON response. This current reads in the entire JSON document, * but the final version should use a streaming JSON library. * @param monitor * @return the dataset. * @throws Exception */ private static QDataSet getDataSetViaJSON( int totalFields, ProgressMonitor monitor, URL url, ParamDescription[] pds, DatumRange tr, int nparam, int[] nfields) throws IllegalArgumentException, Exception, IOException { monitor.started(); monitor.setProgressMessage("server is preparing data"); long t0= System.currentTimeMillis() - 100; // -100 so it updates after receiving first record. int lineNum=0; StringBuilder builder= new StringBuilder(); logger.log(Level.FINE, "getDocument {0}", url.toString()); loggerUrl.log(Level.FINE, "GET {0}", new Object[] { url } ); HttpURLConnection httpConnect= ((HttpURLConnection)url.openConnection()); httpConnect.setConnectTimeout(FileSystem.settings().getConnectTimeoutMs()); httpConnect.setReadTimeout(FileSystem.settings().getReadTimeoutMs()); httpConnect= (HttpURLConnection) HttpUtil.checkRedirect(httpConnect); try ( BufferedReader in= new BufferedReader( new InputStreamReader( httpConnect.getInputStream() ) ) ) { String line= in.readLine(); lineNum++; while ( line!=null ) { if ( System.currentTimeMillis()-t0 > 100 ) { monitor.setProgressMessage("reading line "+lineNum); t0= System.currentTimeMillis(); } //if ( line.startsWith("{ \"data\" :") ) { // TODO: kludge for Jon's server // in.readLine(); // line= in.readLine(); //} builder.append(line); line= in.readLine(); } } catch ( IOException ex ) { ByteArrayOutputStream baos= new ByteArrayOutputStream(); FileSystemUtil.copyStream( httpConnect.getErrorStream(), baos, new NullProgressMonitor() ); String s= baos.toString("UTF-8"); if ( s.contains("No data available") ) { logger.log(Level.FINE, "No data available, server responded with {0}: {1}", new Object[]{httpConnect.getResponseCode(), httpConnect.getResponseMessage()}); throw new NoDataInIntervalException("No data available"); } else { if ( s.length()<256 ) { throw new IOException( ex.getMessage() + ": "+s ); } else { throw ex; } } } httpConnect.disconnect(); // See unix tcptrack which shows there are many connections to the server. 
jbf@gardenhousepi:~ $ sudo tcptrack -i eth0 monitor.setProgressMessage("parsing data"); JSONObject jo= new JSONObject(builder.toString()); JSONArray data= jo.getJSONArray("data"); DataSetBuilder build= new DataSetBuilder( 2, data.length(), totalFields ); for ( int i=0; i1 ) { JSONArray fields= record.getJSONArray(ipd); int nf= nfields[ipd]; int lastField= nf+ifield; for ( ; ifield getCacheFiles( URL url, String id, String[] parameters, DatumRange timeRange, String format ) { String s= getHapiCache(); if ( s.endsWith("/") ) s= s.substring(0,s.length()-1); String u= url.getProtocol() + "/" + url.getHost() + url.getPath(); u= u + "/data/" + id.replaceAll(" ","+"); LinkedHashMap result= new LinkedHashMap<>(); try { for (String parameter : parameters) { String theFile= s + "/"+ u ; FileStorageModel fsm = FileStorageModel.create(FileSystem.create( "file:" +theFile ), "$Y/$m/$Y$m$d." + parameter + "."+format+".gz"); String[] ff= fsm.getNamesFor(null); for (String ff1 : ff) { DatumRange tr1= fsm.getRangeFor(ff1); if ( timeRange==null || timeRange.intersects(tr1)) { result.put(ff1,tr1); } } } } catch ( IOException | IllegalArgumentException ex) { logger.log(Level.FINE, "exception in cache", ex ); return null; } return result; } private static AbstractLineReader calculateCsvCacheReader( File[][] files ) { ConcatenateBufferedReader cacheReader= new ConcatenateBufferedReader(); for ( int i=0; i trs= DatumRangeUtil.generateList( timeRange, aday ); // which granules are available for all parameters? boolean[][] hits= new boolean[trs.size()][parameters.length]; File[][] files= new File[trs.size()][parameters.length]; boolean staleCacheFiles; String u= ub.toString(); if ( ! new File( s + "/" + u ).exists() ) { return null; } try { FileSystem fs= FileSystem.create( "file:" + s + "/"+ u ); staleCacheFiles= getCacheFilesWithTime( trs, parameters, fs, "csv", hits, files, offline, lastModified ); } catch ( IOException | IllegalArgumentException ex) { logger.log(Level.FINE, "exception in cache", ex ); return null; } if ( staleCacheFiles && !offline ) { logger.fine("old cache files found, but new data is available and accessible"); return null; } boolean haveSomething= false; boolean haveAll= true; for ( int i=0; i trs= DatumRangeUtil.generateList( timeRange, aday ); // which granules are available for all parameters? boolean[][] hits= new boolean[trs.size()][parameters.length]; File[][] files= new File[trs.size()][parameters.length]; boolean staleCacheFiles; String u= ub.toString(); if ( ! 
new File( s + "/" + u ).exists() ) { return null; } try { FileSystem fs= FileSystem.create( "file:" + s + "/"+ u ); staleCacheFiles= getCacheFilesWithTime( trs, parameters, fs, "binary", hits, files, offline, lastModified ); } catch ( IOException | IllegalArgumentException ex) { logger.log(Level.FINE, "exception in cache", ex ); return null; } if ( staleCacheFiles && !offline ) { logger.fine("old cache files found, but new data is available and accessible"); return null; } boolean haveSomething= false; boolean haveAll= true; for ( int i=0; i trs, boolean[][] hits, DatumRange timeRange) throws IllegalArgumentException { DatumRange missingRange=null; for ( int i=0; i trs, String[] parameters, FileSystem fs, String format, boolean[][] hits, File[][] files, boolean offline, long lastModified) throws IOException, IllegalArgumentException { boolean staleCacheFiles; long timeNow= System.currentTimeMillis(); staleCacheFiles= false; for ( int i=0; i1 ) { throw new IllegalArgumentException("implementation error, should get just one file per day."); } else if ( ff.length==0 ) { hits[i][j]= false; } else { File f= ff[0]; long ageMillis= timeNow - f.lastModified(); boolean isStale= ( ageMillis > HapiServer.cacheAgeLimitMillis() ); if ( lastModified>0 ) { isStale= f.lastModified() < lastModified; // Note FAT32 only has 4sec resolution, which could cause problems. if ( !isStale ) { logger.fine("server lastModified indicates the cache file can be used"); } else { logger.fine("server lastModified indicates the cache file should be updated"); } } if ( offline || !isStale ) { hits[i][j]= true; files[i][j]= f; } else { logger.log(Level.FINE, "cached file is too old to use: {0}", f); hits[i][j]= false; staleCacheFiles= true; } } } } return staleCacheFiles; } public static ParamDescription[] getParameterDescriptions(JSONObject doc) throws IllegalArgumentException, ParseException, JSONException { JSONArray parameters= doc.getJSONArray("parameters"); int nparameters= parameters.length(); long modificationDate= 0L; if ( doc.has("modificationDate") ) { String s= doc.getString("modificationDate"); Datum d= Units.ms1970.parse(s); modificationDate= (long)( d.doubleValue(Units.ms1970) ); } ParamDescription[] pds= new ParamDescription[nparameters]; for ( int i=0; i1 ) { ds= Ops.reform( ds, ds.length(), pds[1].size ); } ds= Ops.putProperty( ds, QDataSet.DEPEND_0, depend0 ); ds= Ops.putProperty( ds, QDataSet.NAME, Ops.safeName(pds[1].name) ); ds= Ops.putProperty( ds, QDataSet.LABEL, pds[1].label ); ds= Ops.putProperty( ds, QDataSet.TITLE, pds[1].description ); ds= Ops.putProperty( ds, QDataSet.UNITS, pds[1].units ); if ( pds[1].hasFill ) { ds= Ops.putProperty( ds, QDataSet.FILL_VALUE, pds[1].fillValue ); } //if ( pds[1].depend1!=null ) { // ds= Ops.putProperty( ds, QDataSet.DEPEND_1, pds[1].depend1 ); //} if ( pds[1].depend!=null ) { for ( int j=0; j1 ) { //bdsb.putProperty( QDataSet.ELEMENT_DIMENSIONS, ifield-1, pds[i].size ); // not supported yet. 
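                    // record per-field metadata (ELEMENT_NAME/ELEMENT_LABEL for the group, then NAME, LABEL,
                    // TITLE, UNITS and FILL_VALUE per channel) in the sparse bundle descriptor for this parameter.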
sdsb.putProperty( QDataSet.ELEMENT_NAME, startIndex, Ops.safeName( pds[i].name ) ); sdsb.putProperty( QDataSet.ELEMENT_LABEL, startIndex, pds[i].name ); for ( int j=0; j1 ) { sdsb.putProperty( QDataSet.START_INDEX, startIndex + j, startIndex ); sdsb.putProperty( QDataSet.LABEL, startIndex + j, pds[i].name +" ch"+j ); sdsb.putProperty( QDataSet.NAME, startIndex + j, Ops.safeName(pds[i].name)+"_"+j ); } else { sdsb.putProperty( QDataSet.LABEL, startIndex + j, pds[i].name ); sdsb.putProperty( QDataSet.NAME, startIndex + j, Ops.safeName(pds[i].name) ); } sdsb.putProperty( QDataSet.TITLE, startIndex + j, pds[i].description ); sdsb.putProperty( QDataSet.UNITS, startIndex + j, pds[i].units ); if ( pds[i].hasFill ) { sdsb.putProperty( QDataSet.FILL_VALUE, startIndex + j, pds[i].fillValue ); } if ( nfields1>1 ) { sdsb.putProperty( QDataSet.START_INDEX, startIndex + j, startIndex ); } ifield++; } length1= nfields1; sdsbs[i]= sdsb; } int start= 1; WritableDataSet wds= Ops.copy( Ops.trim1( ds, start, start+length1 ) ); start= start+length1; wds.putProperty( QDataSet.DEPEND_0, depend0 ); wds.putProperty( QDataSet.BUNDLE_1, sdsbs[1].getDataSet() ); for ( int i=1; i1 ) { //bdsb.putProperty( QDataSet.ELEMENT_DIMENSIONS, ifield-1, pds[i].size ); // not supported yet. sdsb.putProperty( QDataSet.ELEMENT_NAME, startIndex, Ops.safeName( pds[i].name ) ); sdsb.putProperty( QDataSet.ELEMENT_LABEL, startIndex, pds[i].name ); for ( int j=0; j1 ) { sdsb.putProperty( QDataSet.START_INDEX, startIndex + j, startIndex ); sdsb.putProperty( QDataSet.LABEL, startIndex + j, pds[i].name +" ch"+j ); sdsb.putProperty( QDataSet.NAME, startIndex + j, Ops.safeName(pds[i].name)+"_"+j ); } else { sdsb.putProperty( QDataSet.LABEL, startIndex + j, pds[i].name ); sdsb.putProperty( QDataSet.NAME, startIndex + j, Ops.safeName(pds[i].name) ); } sdsb.putProperty( QDataSet.TITLE, startIndex + j, pds[i].description ); sdsb.putProperty( QDataSet.UNITS, startIndex + j, pds[i].units ); if ( pds[i].hasFill ) { sdsb.putProperty( QDataSet.FILL_VALUE, startIndex + j, pds[i].fillValue ); } if ( nfields1>1 ) { sdsb.putProperty( QDataSet.START_INDEX, startIndex + j, startIndex ); } ifield++; } } ds= Ops.copy( Ops.trim1( ds, 1, ds.length(0) ) ); ds= Ops.putProperty( ds, QDataSet.DEPEND_0, depend0 ); ds= Ops.putProperty( ds, QDataSet.BUNDLE_1, sdsb.getDataSet() ); } return ds; } }