package org.autoplot.hapi; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.InterruptedIOException; import java.io.OutputStreamWriter; import java.net.HttpURLConnection; import java.net.MalformedURLException; import java.net.URI; import java.net.URL; import java.net.URLDecoder; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Level; import java.util.logging.Logger; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; import org.das2.qds.buffer.BufferDataSet; import org.das2.dataset.NoDataInIntervalException; import org.das2.datum.CacheTag; import org.das2.datum.Datum; import org.das2.datum.DatumRange; import org.das2.datum.DatumRangeUtil; import org.das2.datum.EnumerationUnits; import org.das2.datum.Units; import org.das2.datum.UnitsUtil; import org.das2.util.LoggerManager; import org.das2.util.filesystem.FileSystemUtil; import org.das2.util.filesystem.HtmlUtil; import org.das2.util.filesystem.HttpUtil; import org.das2.util.monitor.NullProgressMonitor; import org.das2.util.monitor.ProgressMonitor; import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; import org.das2.qds.ArrayDataSet; import org.das2.qds.DDataSet; import org.das2.qds.DataSetUtil; import org.das2.qds.MutablePropertyDataSet; import 
org.das2.qds.QDataSet; import org.das2.qds.SemanticOps; import org.das2.qds.SparseDataSetBuilder; import org.das2.qds.WritableDataSet; import org.autoplot.datasource.AbstractDataSource; import org.autoplot.datasource.AutoplotSettings; import org.autoplot.datasource.DefaultTimeSeriesBrowse; import org.autoplot.datasource.URISplit; import org.autoplot.datasource.capability.Caching; import org.autoplot.datasource.capability.TimeSeriesBrowse; import org.das2.datum.TimeParser; import org.das2.datum.TimeUtil; import org.das2.fsm.FileStorageModel; import org.das2.qds.ops.Ops; import org.das2.qds.util.DataSetBuilder; import org.das2.qstream.TransferType; import org.das2.util.filesystem.FileSystem; import org.das2.util.monitor.CancelledOperationException; /** * HAPI data source uses transactions with HAPI servers to collect data. * @author jbf */ public final class HapiDataSource extends AbstractDataSource { protected final static Logger logger= LoggerManager.getLogger("apdss.hapi"); /** * this logger is for opening connections to remote sites. 
*/ protected static final Logger loggerUrl= org.das2.util.LoggerManager.getLogger( "das2.url" ); TimeSeriesBrowse tsb; public HapiDataSource(URI uri) { super(uri); tsb= new DefaultTimeSeriesBrowse(); String str= params.get( URISplit.PARAM_TIME_RANGE ); if ( str!=null ) { try { tsb.setURI(uri.toString()); } catch (ParseException ex) { logger.log(Level.SEVERE, ex.getMessage(), ex); } } addCapability( TimeSeriesBrowse.class, tsb ); addCapability( Caching.class, new Caching() { @Override public boolean satisfies(String surl) { return false; } @Override public void resetURI(String surl) { } @Override public void reset() { logger.fine("reset cache"); HapiDataSource.cache.clear(); } }); } private static QDataSet getJSONBins( JSONObject binsObject ) throws JSONException { JSONArray bins=null; if ( binsObject.has("values") ) { logger.fine("using deprecated bins"); bins= binsObject.getJSONArray("values"); } else if ( binsObject.has("centers") ) { bins= binsObject.getJSONArray("centers"); } JSONArray ranges= null; if ( binsObject.has("ranges") ) { ranges= binsObject.getJSONArray("ranges"); } int len; if ( ranges==null && bins==null ) { throw new IllegalArgumentException("ranges or centers must be specified"); } else { len= ranges==null ? 
bins.length() : ranges.length(); } DDataSet result= DDataSet.createRank1(len); DDataSet max= DDataSet.createRank1(len); DDataSet min= DDataSet.createRank1(len); boolean hasMin= false; boolean hasMax= false; boolean hasCenter= false; if ( len==0 ) { throw new IllegalArgumentException("bins must have ranges or centers specified"); } else { if ( bins!=null ) { hasCenter= true; Object o= bins.get(0); if ( o instanceof Number ) { for ( int j=0; j lastRecordFound= new HashMap<>(); private static String[] cacheFilesFor( URL url, ParamDescription[] pp, Datum xx ) { String s= AutoplotSettings.settings().resolveProperty(AutoplotSettings.PROP_FSCACHE); if ( s.endsWith("/") ) s= s.substring(0,s.length()-1); StringBuilder ub= new StringBuilder( url.getProtocol() + "/" + url.getHost() + "/" + url.getPath() ); if ( url.getQuery()!=null ) { String[] querys= url.getQuery().split("\\&"); Pattern p= Pattern.compile("id=(.+)"); for ( String q : querys ) { Matcher m= p.matcher(q); if ( m.matches() ) { ub.append("/").append(m.group(1)); break; } } } else { throw new IllegalArgumentException("query must be specified, implementation error"); } TimeParser tp= TimeParser.create( "$Y/$m/$Y$m$d" ); String sxx= tp.format(xx); String u= ub.toString(); String[] result= new String[pp.length]; for ( int i=0; i> cache= new ConcurrentHashMap<>(); /** * print the cache stats. 
* @see https://sourceforge.net/p/autoplot/bugs/1996/ */ public static void printCacheStats() { if ( cache==null || cache.isEmpty() ) { System.err.println( "(cache is empty)" ); } else { for ( Entry> s: cache.entrySet() ) { System.err.println( "" + s.getKey() +": "+s.getValue().size()+" records"); } } } private static void writeToCachedData(URL url, ParamDescription[] pp, Datum xx, String[] ss) throws IOException { String s= AutoplotSettings.settings().resolveProperty(AutoplotSettings.PROP_FSCACHE); if ( s.endsWith("/") ) s= s.substring(0,s.length()-1); StringBuilder ub= new StringBuilder( url.getProtocol() + "/" + url.getHost() + url.getPath() ); if ( url.getQuery()!=null ) { String[] querys= url.getQuery().split("\\&"); Pattern p= Pattern.compile("id=(.+)"); for ( String q : querys ) { Matcher m= p.matcher(q); if ( m.matches() ) { ub.append("/").append(m.group(1)); break; } } } else { throw new IllegalArgumentException("query must be specified, implementation error"); } TimeParser tp= TimeParser.create( "$Y/$m/$Y$m$d" ); String sxx= tp.format(xx); String u= ub.toString(); Datum t0= lastRecordFound.get( u + "/" + sxx ); if ( t0==null ) { String f= s + "/" + u + "/" + sxx + "." + pp[0].name + ".csv"; File ff= new File(f); if ( ff.exists() ) { BufferedReader read= new BufferedReader(new FileReader(ff)); String line= read.readLine(); String lastLine= null; while ( line!=null ) { lastLine= line; line= read.readLine(); } if ( lastLine!=null ) { try { t0= Units.us2000.parse(lastLine); lastRecordFound.put( u + "/" + sxx,t0); } catch (ParseException ex) { t0= null; } } else { t0= null; } } } if ( t0!=null && t0.ge(xx) ) { logger.log(Level.FINE, "clear all cached files for {0}", sxx); for (ParamDescription pp1 : pp) { String f = s + "/hapi/" + u + "/" + sxx + "." 
+ pp1.name + ".csv"; File ff= new File(f); if ( ff.exists() ) { if ( !ff.delete() ) logger.log(Level.INFO, "unable to delete file: {0}", ff); } } } int ifield=0; for (ParamDescription pp1 : pp) { String f = u + "/" + sxx + "." + pp1.name + ".csv" + "." + Thread.currentThread().getId(); logger.log(Level.FINER, "cache.get({0})", f); ArrayList sparam= cache.get(f); if ( sparam==null ) { sparam= new ArrayList<>(); cache.put(f,sparam); logger.log(Level.FINE, "cache.put({0},ArrayList({1}))", new Object[]{f, sparam.size()}); } StringBuilder build= new StringBuilder(); int length = pp1.nFields; for ( int k=0; k0 ) build.append(","); build.append( ss[ifield++] ); } sparam.add(build.toString()); } lastRecordFound.put( u + "/" + sxx,xx); } /** * TODO: this needs to use HAPI_DATA to locate the directory. * See https://sourceforge.net/p/autoplot/bugs/2043/ * */ private static void writeToCachedDataFinish(URL url, ParamDescription[] pp, Datum xx ) throws IOException { logger.log(Level.FINE, "writeToCachedDataFinish: {0}", xx); String s= AutoplotSettings.settings().resolveProperty(AutoplotSettings.PROP_FSCACHE); if ( s.endsWith("/") ) s= s.substring(0,s.length()-1); StringBuilder ub= new StringBuilder( url.getProtocol() + "/" + url.getHost() + url.getPath() ); if ( url.getQuery()!=null ) { // get the id from the url String[] querys= url.getQuery().split("\\&"); Pattern p= Pattern.compile("id=(.+)"); for ( String q : querys ) { Matcher m= p.matcher(q); if ( m.matches() ) { ub.append("/").append(m.group(1)); break; } } } else { throw new IllegalArgumentException("query must be specified, implementation error"); } long currentTimeMillis= pp[0].modifiedDateMillis; TimeParser tp= TimeParser.create( "$Y/$m/$Y$m$d" ); String sxx= tp.format(xx); String u= ub.toString(); for (ParamDescription pp1 : pp) { String f = u + "/" + sxx + "." 
+ pp1.name + ".csv" + "."+ Thread.currentThread().getId(); logger.log(Level.FINE, "remove from cache: {0}", f); ArrayList sparam= cache.remove(f); File ff= new File(s + "/hapi/" + u + "/" + sxx + "." + pp1.name + ".csv" +".gz"); if ( !ff.getParentFile().exists() ) { if ( !ff.getParentFile().mkdirs() ) { throw new IOException("unable to mkdirs "+ff.getParent() ); } } File ffTemp= new File(s + "/hapi/" + u + "/" + sxx + "." + pp1.name + ".csv"+".gz."+Thread.currentThread().getId() ); //int line=0; try (final BufferedWriter w = new BufferedWriter( new OutputStreamWriter( new GZIPOutputStream( new FileOutputStream(ff) ) ) ) ) { if ( sparam!=null ) { for ( String s123: sparam ) { //line++; w.write(s123); w.newLine(); } } } synchronized ( HapiDataSource.class ) { ffTemp.renameTo(ff); if ( currentTimeMillis>0 ) ff.setLastModified(currentTimeMillis); } } } /** * To assist in getting the CDAWeb HAPI server going, handle a few differences * like:
     * <ul>
     * <li> parameters are returned within the JSON code. </li>
     * <li> Epoch is needed. </li>
     * </ul>
     *
* @param monitor * @return * @throws Exception */ private QDataSet getDataSetCDAWeb( ProgressMonitor monitor) throws Exception { URI server = this.resourceURI; String id= getParam("id","" ); if ( id.equals("") ) throw new IllegalArgumentException("missing id"); id= URLDecoder.decode( id,"UTF-8" ); String pp= getParam("parameters",""); if ( !pp.equals("") && !pp.startsWith("Epoch,") ) { pp= "Epoch,"+pp; } if ( id.equals("") ) throw new IllegalArgumentException("missing id"); id= URLDecoder.decode(id,"UTF-8"); DatumRange tr; // TSB = DatumRangeUtil.parseTimeRange(timeRange); tr= tsb.getTimeRange(); URL url= HapiServer.getDataURL( server.toURL(), id, tr, pp ); url= new URL( url.toString()+"&include=header&format=json1" ); monitor.started(); monitor.setProgressMessage("server is preparing data"); long t0= System.currentTimeMillis() - 100; // -100 so it updates after receiving first record. int lineNum=0; StringBuilder builder= new StringBuilder(); logger.log(Level.FINE, "getDocument {0}", url.toString()); loggerUrl.log(Level.FINE, "GET {0}", new Object[] { url } ); HttpURLConnection httpConnect= ((HttpURLConnection)url.openConnection()); httpConnect.setConnectTimeout(FileSystem.settings().getConnectTimeoutMs()); httpConnect.setReadTimeout(FileSystem.settings().getReadTimeoutMs()); httpConnect= (HttpURLConnection) HttpUtil.checkRedirect( httpConnect ); try ( BufferedReader in= new BufferedReader( new InputStreamReader( httpConnect.getInputStream() ) ) ) { String line= in.readLine(); lineNum++; while ( line!=null ) { if ( System.currentTimeMillis()-t0 > 100 ) { monitor.setProgressMessage("reading line "+lineNum); t0= System.currentTimeMillis(); } builder.append(line); line= in.readLine(); } } catch ( IOException ex ) { ByteArrayOutputStream baos= new ByteArrayOutputStream(); FileSystemUtil.copyStream( httpConnect.getErrorStream(), baos, new NullProgressMonitor() ); String s= baos.toString("UTF-8"); if ( s.contains("No data available") ) { logger.log(Level.FINE, "No data 
available, server responded with {0}: {1}", new Object[]{httpConnect.getResponseCode(), httpConnect.getResponseMessage()}); throw new NoDataInIntervalException("No data available"); } else { if ( s.length()<256 ) { throw new IOException( ex.getMessage() + ": "+s ); } else { throw ex; } } } httpConnect.disconnect(); JSONObject o= new JSONObject(builder.toString()); JSONObject doc= o; ParamDescription[] pds= getParameterDescriptions(doc); monitor.setProgressMessage("parsing data"); int[] nfields= new int[pds.length]; for ( int i=0; i1 ) { int nf= nfields[ipd]; DDataSet column= DDataSet.createRank2(param.length(),nfields[ipd]); for ( int i=0; i0 ) { String[] pps= pp.split(","); Map map= new HashMap(); for ( int i=0; i1 ) { monitor.setTaskSize(nday*10); monitor.started(); } int iday=0; while ( currentDay.min().le(tr.max()) ) { logger.log(Level.FINER, "useCache, request {0}", currentDay); ProgressMonitor mon1= nday==1 ? monitor : monitor.getSubtaskMonitor( 10*iday, 10*(iday+1), "read "+currentDay ); QDataSet ds1= getDataSetViaCsv(totalFields, mon1, url, pds, currentDay, nparam, nfields); if ( ds1.length()>0 ) { dsall= Ops.append( dsall, ds1 ); } currentDay= currentDay.next(); iday++; } if ( dsall==null ) { logger.info("no records found"); return null; } logger.finer("done useCache, so make daily requests to form granules"); ds= dsall; ds= Ops.putProperty( ds, QDataSet.UNITS, null ); // kludge, otherwise time units are messed up. TODO: who puts unit here? } else { ds= getDataSetViaCsv(totalFields, monitor, url, pds, tr, nparam, nfields); } break; } if ( ds.length()==0 ) { monitor.finished(); throw new NoDataInIntervalException("no records found"); } ds = repackage(ds,pds,null); // install a cacheTag. The following code assumes depend_0 is mutable. 
QDataSet xds= (QDataSet) ds.property(QDataSet.DEPEND_0); if ( xds==null && ( UnitsUtil.isTimeLocation( SemanticOps.getUnits(ds) ) ) ) { xds= ds; } if ( timeStampLocation.equals("BEGIN") || timeStampLocation.equals("END" ) ) { if ( cadence==null ) { cadence= DataSetUtil.asDatum( DataSetUtil.guessCadenceNew( xds, null ) ); } if ( cadence!=null ) { if ( timeStampLocation.equals("BEGIN") ) { xds= Ops.add( xds, cadence.divide(2) ); } else if ( timeStampLocation.equals("END") ) { xds= Ops.subtract( xds, cadence.divide(2) ); } } else { logger.info("timetags are identified as BEGIN, but cadence was not available to center the data"); } } if ( xds!=null ) { ((MutablePropertyDataSet)xds).putProperty(QDataSet.CACHE_TAG, new CacheTag(tr,null) ); } monitor.setTaskProgress(100); monitor.finished(); return ds; } private boolean useCache() { boolean useCache= HapiServer.useCache(); String cacheParam= getParam( "cache", "" ); if ( cacheParam.equals("F") ) { useCache= false; } return useCache; } private QDataSet getDataSetViaCsv(int totalFields, ProgressMonitor monitor, URL url, ParamDescription[] pds, DatumRange tr, int nparam, int[] nfields) throws IllegalArgumentException, Exception, IOException { DataSetBuilder builder= new DataSetBuilder(2,100,totalFields); monitor.setProgressMessage("reading data"); monitor.setTaskProgress(20); long t0= System.currentTimeMillis() - 100; // -100 so it updates after receiving first record. boolean useCache= useCache(); if ( useCache ) { // round out data request to day boundaries. 
Datum minMidnight= TimeUtil.prevMidnight( tr.min() ); Datum maxMidnight= TimeUtil.nextMidnight( tr.max() ); tr= new DatumRange( minMidnight, maxMidnight ); URISplit split= URISplit.parse(url.toURI()); Map params= URISplit.parseParams(split.params); params.put("time.min",minMidnight.toString()); params.put("time.max",maxMidnight.toString()); split.params= URISplit.formatParams(params); String surl= URISplit.format(split); url= new URL(surl); } AbstractLineReader cacheReader; if ( useCache ) { // this branch posts the request, expecting that the server may respond with 304, indicating the cache should be used. String[] parameters= new String[pds.length]; for ( int i=0; i 100 ) { monitor.setProgressMessage("reading "+xx); t0= System.currentTimeMillis(); double d= DatumRangeUtil.normalize( tr, xx ); monitor.setTaskProgress( 20 + (int)( 75 * d ) ); if ( monitor.isCancelled() ) throw new CancelledOperationException("cancel was pressed"); } } catch ( ParseException ex ) { line= in.readLine(); continue; } // "close" the current file, gzipping it. if ( cacheReader==null && useCache && !currentDay.contains(xx) && tr.intersects(currentDay) && completeDay ) { // https://sourceforge.net/p/autoplot/bugs/1968/ HAPI caching must not cache after "modificationDate" or partial days remain in cache if ( pds[0].modifiedDateMillis==0 || currentDay.middle().doubleValue(Units.ms1970) - pds[0].modifiedDateMillis <= 0 ) { writeToCachedDataFinish( url, pds, currentDay.middle() ); } else { logger.fine("data after modification date is not cached."); } } while ( !currentDay.contains(xx) && tr.intersects(currentDay ) ) { currentDay= currentDay.next(); completeDay= tr.contains(currentDay); if ( cacheReader==null && useCache && !currentDay.contains(xx) && tr.intersects(currentDay ) ) { if ( pds[0].modifiedDateMillis==0 || currentDay.middle().doubleValue(Units.ms1970) - pds[0].modifiedDateMillis <= 0 ) { // put empty file which is placeholder. 
writeToCachedDataFinish( url, pds, currentDay.middle() ); } } } if ( !currentDay.contains(xx) ) { logger.fine("something's gone wrong, perhaps out-of-order timetags."); completeDay= false; } if ( completeDay ) { if ( cacheReader==null && useCache ) { if ( pds[0].modifiedDateMillis==0 || xx.doubleValue(Units.ms1970) - pds[0].modifiedDateMillis <= 0 ) { writeToCachedData( url, pds, xx, ss ); } } } builder.putValue( -1, ifield, xx ); ifield++; for ( int i=1; i cacheFiles= new HashMap<>(); cacheFiles.put( "cached", "true" ); builder.putProperty( QDataSet.USER_PROPERTIES, cacheFiles ); } monitor.setTaskProgress(95); QDataSet ds= builder.getDataSet(); return ds; } private QDataSet getDataSetViaBinary(int totalFields, ProgressMonitor monitor, URL url, ParamDescription[] pds, DatumRange tr, int nparam, int[] nfields) throws IllegalArgumentException, Exception, IOException { DataSetBuilder builder = new DataSetBuilder(2, 100, totalFields); monitor.setProgressMessage("reading data"); monitor.setTaskProgress(20); long t0 = System.currentTimeMillis() - 100; // -100 so it updates after receiving first record. 
loggerUrl.log(Level.FINE, "GET {0}", new Object[] { url } ); HttpURLConnection httpConnect = (HttpURLConnection) url.openConnection(); httpConnect.setConnectTimeout(FileSystem.settings().getConnectTimeoutMs()); httpConnect.setReadTimeout(FileSystem.settings().getReadTimeoutMs()); httpConnect.setRequestProperty("Accept-Encoding", "gzip"); httpConnect.connect(); boolean gzip = "gzip".equals(httpConnect.getContentEncoding()); int recordLengthBytes = 0; TransferType[] tts = new TransferType[pds.length]; for (int i = 0; i < pds.length; i++) { if (pds[i].type.startsWith("time")) { recordLengthBytes += Integer.parseInt(pds[i].type.substring(4)); tts[i] = TransferType.getForName(pds[i].type, Collections.singletonMap(QDataSet.UNITS, (Object)pds[i].units)); } else if (pds[i].type.startsWith("string")) { recordLengthBytes += pds[i].length; final Units u= pds[i].units; final int length= pds[i].length; final byte[] bytes= new byte[length]; tts[i] = new TransferType() { @Override public void write(double d, ByteBuffer buffer) { } @Override public double read(ByteBuffer buffer) { buffer.get(bytes); //buf2.get(bytes); String s= new String( bytes ); Datum d= ((EnumerationUnits)u).createDatum(s); return d.doubleValue(u); } @Override public int sizeBytes() { return length; } @Override public boolean isAscii() { return false; } @Override public String name() { return "string"+length; } }; } else { Object type= pds[i].type; recordLengthBytes += BufferDataSet.byteCount(type) * DataSetUtil.product(pds[i].size); tts[i] = TransferType.getForName( type.toString(), Collections.singletonMap(QDataSet.UNITS, (Object)pds[i].units) ); } if ( tts[i]==null ) { throw new IllegalArgumentException("unable to identify transfer type for \""+pds[i].type+"\""); } } totalFields= DataSetUtil.sum(nfields); double[] result = new double[totalFields]; try (InputStream in = gzip ? 
new GZIPInputStream(httpConnect.getInputStream()) : httpConnect.getInputStream()) { ByteBuffer buf = TransferType.allocate( recordLengthBytes,ByteOrder.LITTLE_ENDIAN ); byte[] bytes = buf.array(); int bytesRead = in.read(bytes); while (bytesRead != -1) { while ( bytesRead 100) { monitor.setProgressMessage("reading " + xx); t0 = System.currentTimeMillis(); double d = DatumRangeUtil.normalize(tr, xx); monitor.setTaskProgress(20 + (int) (75 * d)); } builder.putValue(-1, ifield, xx); ifield++; for (int i = 1; i < nparam; i++) { // nparam is number of parameters, which may have multiple fields. for (int j = 0; j < nfields[i]; j++) { builder.putValue(-1, ifield, result[ifield]); //TODO: fill? ifield++; } } builder.nextRecord(); buf.flip(); bytesRead = in.read(bytes); } } catch (IOException e) { logger.log( Level.WARNING, e.getMessage(), e ); monitor.finished(); throw new IOException(String.valueOf(httpConnect.getResponseCode()) + ":" + httpConnect.getResponseMessage()); } catch (Exception e) { logger.log( Level.WARNING, e.getMessage(), e ); monitor.finished(); throw e; } finally { httpConnect.disconnect(); } monitor.setTaskProgress(95); QDataSet ds = builder.getDataSet(); return ds; } /** * read data embedded within a JSON response. This current reads in the entire JSON document, * but the final version should use a streaming JSON library. * @param monitor * @return the dataset. * @throws Exception */ private QDataSet getDataSetViaJSON( int totalFields, ProgressMonitor monitor, URL url, ParamDescription[] pds, DatumRange tr, int nparam, int[] nfields) throws IllegalArgumentException, Exception, IOException { monitor.started(); monitor.setProgressMessage("server is preparing data"); long t0= System.currentTimeMillis() - 100; // -100 so it updates after receiving first record. 
int lineNum=0; StringBuilder builder= new StringBuilder(); logger.log(Level.FINE, "getDocument {0}", url.toString()); loggerUrl.log(Level.FINE, "GET {0}", new Object[] { url } ); HttpURLConnection httpConnect= ((HttpURLConnection)url.openConnection()); httpConnect.setConnectTimeout(FileSystem.settings().getConnectTimeoutMs()); httpConnect.setReadTimeout(FileSystem.settings().getReadTimeoutMs()); httpConnect= (HttpURLConnection) HttpUtil.checkRedirect(httpConnect); try ( BufferedReader in= new BufferedReader( new InputStreamReader( httpConnect.getInputStream() ) ) ) { String line= in.readLine(); lineNum++; while ( line!=null ) { if ( System.currentTimeMillis()-t0 > 100 ) { monitor.setProgressMessage("reading line "+lineNum); t0= System.currentTimeMillis(); } //if ( line.startsWith("{ \"data\" :") ) { // TODO: kludge for Jon's server // in.readLine(); // line= in.readLine(); //} builder.append(line); line= in.readLine(); } } catch ( IOException ex ) { ByteArrayOutputStream baos= new ByteArrayOutputStream(); FileSystemUtil.copyStream( httpConnect.getErrorStream(), baos, new NullProgressMonitor() ); String s= baos.toString("UTF-8"); if ( s.contains("No data available") ) { logger.log(Level.FINE, "No data available, server responded with {0}: {1}", new Object[]{httpConnect.getResponseCode(), httpConnect.getResponseMessage()}); throw new NoDataInIntervalException("No data available"); } else { if ( s.length()<256 ) { throw new IOException( ex.getMessage() + ": "+s ); } else { throw ex; } } } httpConnect.disconnect(); // See unix tcptrack which shows there are many connections to the server. 
jbf@gardenhousepi:~ $ sudo tcptrack -i eth0 monitor.setProgressMessage("parsing data"); JSONObject jo= new JSONObject(builder.toString()); JSONArray data= jo.getJSONArray("data"); DataSetBuilder build= new DataSetBuilder( 2, data.length(), totalFields ); for ( int i=0; i1 ) { JSONArray fields= record.getJSONArray(ipd); int nf= nfields[ipd]; int lastField= nf+ifield; for ( ; ifield getCacheFiles( URL url, String id, String[] parameters, DatumRange timeRange ) { String s= AutoplotSettings.settings().resolveProperty(AutoplotSettings.PROP_FSCACHE); if ( s.endsWith("/") ) s= s.substring(0,s.length()-1); String u= url.getProtocol() + "/" + url.getHost() + "/" + url.getPath(); u= u + "/data/" + id; LinkedHashMap result= new LinkedHashMap<>(); try { for (String parameter : parameters) { String theFile= s + "/hapi/"+ u ; FileStorageModel fsm = FileStorageModel.create(FileSystem.create( "file:" +theFile ), "$Y/$m/$Y$m$d." + parameter + ".csv.gz"); String[] ff= fsm.getNamesFor(null); for (String ff1 : ff) { DatumRange tr1= fsm.getRangeFor(ff1); if ( timeRange==null || timeRange.intersects(tr1)) { result.put(ff1,tr1); } } } } catch ( IOException | IllegalArgumentException ex) { logger.log(Level.FINE, "exception in cache", ex ); return null; } return result; } private static AbstractLineReader calculateCacheReader( File[][] files ) { ConcatenateBufferedReader cacheReader= new ConcatenateBufferedReader(); for ( int i=0; i trs= DatumRangeUtil.generateList( timeRange, aday ); long timeNow= System.currentTimeMillis(); // which granules are available for all parameters? boolean[][] hits= new boolean[trs.size()][parameters.length]; File[][] files= new File[trs.size()][parameters.length]; boolean staleCacheFiles= false; String u= ub.toString(); if ( ! 
new File( s + "/hapi/"+ u ).exists() ) { return null; } try { for ( int i=0; i1 ) { throw new IllegalArgumentException("implementation error, should get just one file per day."); } else if ( ff.length==0 ) { hits[i][j]= false; } else { File f= ff[0]; long ageMillis= timeNow - f.lastModified(); boolean isStale= ( ageMillis > HapiServer.cacheAgeLimitMillis() ); if ( lastModified>0 ) { isStale= f.lastModified() < lastModified; // Note FAT32 only has 4sec resolution, which could cause problems. if ( !isStale ) { logger.fine("server lastModified indicates the cache file can be used"); } else { logger.fine("server lastModified indicates the cache file should be updated"); } } if ( offline || !isStale ) { hits[i][j]= true; files[i][j]= f; } else { logger.log(Level.FINE, "cached file is too old to use: {0}", f); hits[i][j]= false; staleCacheFiles= true; } } } } } catch ( IOException | IllegalArgumentException ex) { logger.log(Level.FINE, "exception in cache", ex ); return null; } if ( staleCacheFiles && !offline ) { logger.fine("old cache files found, but new data is available and accessible"); return null; } boolean haveSomething= false; boolean haveAll= true; for ( int i=0; i1 ) { ds= Ops.reform( ds, ds.length(), pds[1].size ); } ds= Ops.putProperty( ds, QDataSet.DEPEND_0, depend0 ); ds= Ops.putProperty( ds, QDataSet.NAME, Ops.safeName(pds[1].name) ); ds= Ops.putProperty( ds, QDataSet.LABEL, pds[1].label ); ds= Ops.putProperty( ds, QDataSet.TITLE, pds[1].description ); ds= Ops.putProperty( ds, QDataSet.UNITS, pds[1].units ); if ( pds[1].hasFill ) { ds= Ops.putProperty( ds, QDataSet.FILL_VALUE, pds[1].fillValue ); } //if ( pds[1].depend1!=null ) { // ds= Ops.putProperty( ds, QDataSet.DEPEND_1, pds[1].depend1 ); //} if ( pds[1].depend!=null ) { for ( int j=0; j1 ) { //bdsb.putProperty( QDataSet.ELEMENT_DIMENSIONS, ifield-1, pds[i].size ); // not supported yet. 
sdsb.putProperty( QDataSet.ELEMENT_NAME, startIndex, Ops.safeName( pds[i].name ) ); sdsb.putProperty( QDataSet.ELEMENT_LABEL, startIndex, pds[i].name ); for ( int j=0; j1 ) { sdsb.putProperty( QDataSet.START_INDEX, startIndex + j, startIndex ); sdsb.putProperty( QDataSet.LABEL, startIndex + j, pds[i].name +" ch"+j ); sdsb.putProperty( QDataSet.NAME, startIndex + j, Ops.safeName(pds[i].name)+"_"+j ); } else { sdsb.putProperty( QDataSet.LABEL, startIndex + j, pds[i].name ); sdsb.putProperty( QDataSet.NAME, startIndex + j, Ops.safeName(pds[i].name) ); } sdsb.putProperty( QDataSet.TITLE, startIndex + j, pds[i].description ); sdsb.putProperty( QDataSet.UNITS, startIndex + j, pds[i].units ); if ( pds[i].hasFill ) { sdsb.putProperty( QDataSet.FILL_VALUE, startIndex + j, pds[i].fillValue ); } if ( nfields1>1 ) { sdsb.putProperty( QDataSet.START_INDEX, startIndex + j, startIndex ); } ifield++; } length1= nfields1; sdsbs[i]= sdsb; } int start= 1; WritableDataSet wds= Ops.copy( Ops.trim1( ds, start, start+length1 ) ); start= start+length1; wds.putProperty( QDataSet.DEPEND_0, depend0 ); wds.putProperty( QDataSet.BUNDLE_1, sdsbs[1].getDataSet() ); for ( int i=1; i1 ) { //bdsb.putProperty( QDataSet.ELEMENT_DIMENSIONS, ifield-1, pds[i].size ); // not supported yet. 
sdsb.putProperty( QDataSet.ELEMENT_NAME, startIndex, Ops.safeName( pds[i].name ) ); sdsb.putProperty( QDataSet.ELEMENT_LABEL, startIndex, pds[i].name ); for ( int j=0; j1 ) { sdsb.putProperty( QDataSet.START_INDEX, startIndex + j, startIndex ); sdsb.putProperty( QDataSet.LABEL, startIndex + j, pds[i].name +" ch"+j ); sdsb.putProperty( QDataSet.NAME, startIndex + j, Ops.safeName(pds[i].name)+"_"+j ); } else { sdsb.putProperty( QDataSet.LABEL, startIndex + j, pds[i].name ); sdsb.putProperty( QDataSet.NAME, startIndex + j, Ops.safeName(pds[i].name) ); } sdsb.putProperty( QDataSet.TITLE, startIndex + j, pds[i].description ); sdsb.putProperty( QDataSet.UNITS, startIndex + j, pds[i].units ); if ( pds[i].hasFill ) { sdsb.putProperty( QDataSet.FILL_VALUE, startIndex + j, pds[i].fillValue ); } if ( nfields1>1 ) { sdsb.putProperty( QDataSet.START_INDEX, startIndex + j, startIndex ); } ifield++; } } ds= Ops.copy( Ops.trim1( ds, 1, ds.length(0) ) ); ds= Ops.putProperty( ds, QDataSet.DEPEND_0, depend0 ); ds= Ops.putProperty( ds, QDataSet.BUNDLE_1, sdsb.getDataSet() ); } return ds; } }