/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 */
package org.autoplot.datasource.jython;

import java.beans.ExceptionListener;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.LineNumberReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.text.ParseException;
import java.util.ConcurrentModificationException;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.das2.CancelledOperationException;
import org.das2.dataset.NoDataInIntervalException;
import org.das2.datum.CacheTag;
import org.das2.datum.DatumRangeUtil;
import org.das2.util.LoggerManager;
import org.das2.util.monitor.NullProgressMonitor;
import org.das2.util.monitor.ProgressMonitor;
import org.python.core.Py;
import org.python.core.PyArray;
import org.python.core.PyDictionary;
import org.python.core.PyException;
import org.python.core.PyFloat;
import org.python.core.PyInteger;
import org.python.core.PyList;
import org.python.core.PyObject;
import org.python.core.PyStringMap;
import org.python.util.PythonInterpreter;
import org.das2.qds.DataSetOps;
import org.das2.qds.MutablePropertyDataSet;
import org.das2.qds.QDataSet;
import org.autoplot.datasource.AbstractDataSource;
import org.autoplot.datasource.DataSetURI;
import org.autoplot.datasource.LogNames;
import org.autoplot.datasource.ReferenceCache;
import org.autoplot.datasource.URISplit;
import org.autoplot.datasource.capability.Caching;
import org.autoplot.datasource.capability.TimeSeriesBrowse;
import org.das2.qds.ops.Ops;
import org.autoplot.jythonsupport.JythonOps;
import org.autoplot.jythonsupport.JythonRefactory;
import org.autoplot.jythonsupport.JythonUtil;
import org.autoplot.jythonsupport.PyQDataSet;

/**
 * Use a Jython script to read and process data from a number of sources.
 * Special parameters:
 * <ul>
 * <li>timerange if used then TimeSeriesBrowse is added.</li>
 * </ul>
 *
 * The interpreter that ran the script is kept in {@link #interp} between calls
 * to {@link #getDataSet(ProgressMonitor)} so that later requests can be
 * evaluated against the same interpreter; {@link #reset()} discards it.
 *
 * NOTE(review): this copy of the file appears to have lost text. Generic type
 * arguments are absent (raw {@code Map}, {@code Entry}, {@code Iterator}) and
 * three spans inside {@code getDataSet} are truncated; each is marked below.
 * Restore them from the upstream file before attempting to compile.
 *
 * @author jbf
 */
public class JythonDataSource extends AbstractDataSource implements Caching {

    /** Copied from the factory in the constructor; told about PyExceptions raised while the script runs. */
    ExceptionListener listener;

    /** Built by getDataSet from the script's "metadata" dictionary; stays null until getDataSet has reached that step. */
    private Map metadata;

    /** URI parameter naming the script to run, used when the script is not the resource itself. */
    protected final static String PARAM_SCRIPT= "script";
    protected final static String PARAM_TIMERANGE= "timerange";
    /** Name under which the resource URI is handed to the script, both as a variable and in autoplot.params. */
    protected final static String PARAM_RESOURCE_URI= "resourceURI";

    private static final Logger logger= LoggerManager.getLogger( LogNames.APDSS_JYDS );

    // Cleared in the constructor when a TimeSeriesBrowse is found.
    // NOTE(review): never read anywhere in the visible text of this class.
    private boolean notCheckedTsb= true;

    /**
     * Create the data source, register the Caching capability, and inspect the
     * script for a TimeSeriesBrowse capability.  When
     * JythonDataSourceTimeSeriesBrowse.checkForTimeSeriesBrowse returns non-null
     * the result is registered as the TimeSeriesBrowse capability and kept in
     * {@link #tsb}.  A ParseException or IOException raised during this check is
     * logged at SEVERE and not rethrown, leaving the source without that capability.
     *
     * @param uri the URI for the data source
     * @param factory the creating factory; its listener becomes this source's exception listener
     */
    public JythonDataSource(URI uri, JythonDataSourceFactory factory) {
        super(uri);
        addCapability(Caching.class, this); //TODO: check for parameter inputs!
        this.listener = factory.listener;
        if ( true ) {
            try {
                File jythonScript= getScript(); // this assumes the user can go without progress feedback.
                JythonDataSourceTimeSeriesBrowse tsb1= JythonDataSourceTimeSeriesBrowse.checkForTimeSeriesBrowse( uri.toString(), jythonScript );
                if ( tsb1!=null ) {
                    tsb1.setJythonDataSource(this);
                    addCapability( TimeSeriesBrowse.class, tsb1 );
                    tsb= tsb1;
                    notCheckedTsb= false;
                }
            } catch (ParseException ex) {
                logger.severe( ex.toString() );
            } catch ( IOException ex ) {
                logger.severe( ex.toString() );
            }
        }
    }

    /**
     * Get the script file, which is non-trivial since it can be identified by
     * either the resource URI or the script= parameter.  When script= is
     * present its value is treated as a URL and fetched with getFile; otherwise
     * the resource itself is fetched.  A NullProgressMonitor is used either way.
     *
     * @return the File returned by getFile for the script
     * @throws IOException if creating the URL or getFile throws it
     */
    private File getScript() throws IOException {
        File jythonScript; // script to run.
        if ( params.get( PARAM_SCRIPT )!=null ) {
            // getFile( resourceURI ) //TODO: since we don't getFile(resourceURI), we can't use filePollUpdating. Also, why do we have local variable?
            jythonScript= getFile( new URL(params.get( PARAM_SCRIPT )), new NullProgressMonitor() );
        } else {
            jythonScript= getFile(new NullProgressMonitor());
        }
        return jythonScript;
    }

    /**
     * Read the next chunk of script text for the line-by-line ("debug")
     * execution path in getDataSet.  A line starting with "def ", "if" or
     * "else" is joined with the following lines that are empty or begin with
     * whitespace, and with any following "else" lines and their bodies.  The
     * first line read that does not belong to the chunk is stored in
     * nextLine[0] and is used as the start of the next call.
     *
     * NOTE(review): at end of input readLine() returns null and this method
     * substitutes "", so it never returns null; the caller's
     * {@code while (s != null)} loop therefore cannot end on its own.
     * NOTE(review): startsWith("if") / startsWith("else") also match
     * identifiers such as "iframe" or "elsewhere".
     *
     * @param reader the source of script lines
     * @param nextLine one-element array carrying the look-ahead line between calls; element 0 may be null
     * @return the statement or compound statement text, with lines joined by "\n"
     * @throws IOException if the reader throws it
     */
    private String nextExec( LineNumberReader reader, String[] nextLine ) throws IOException {
        StringBuilder s;
        if ( nextLine[0]!=null ) {
            // consume the look-ahead left by the previous call.
            s= new StringBuilder(nextLine[0]);
            nextLine[0]= null;
        } else {
            String ss= reader.readLine();
            if ( ss==null ) ss="";
            s = new StringBuilder(ss);
        }
        String stest= s.toString();
        if ( ( stest.startsWith("def ") || stest.startsWith("if") || stest.startsWith("else") ) ) {
            String s1= reader.readLine();
            // gather the indented (or blank) lines that make up the body.
            while ( s1!=null && ( s1.length()==0 || Character.isWhitespace(s1.charAt(0)) ) ) {
                s.append("\n").append(s1);
                s1= reader.readLine();
            }
            while ( s1!=null && s1.startsWith("else") ) { // TODO: under implementation, use python parser for ideal solution
                s.append("\n").append(s1);
                s1= reader.readLine();
                while ( s1!=null && ( s1.length()==0 || Character.isWhitespace(s1.charAt(0)) ) ) {
                    s.append("\n").append(s1);
                    s1= reader.readLine();
                }
            }
            // s1 is the first line past the chunk (or null at end of input); hand it to the next call.
            nextLine[0]= s1;
        }
        return s.toString();
    }

    /**
     * Evaluate the raw scheme-specific part of the URI as a Jython expression
     * in a newly created interpreter, which replaces {@link #interp}.  A PyList
     * result is converted with JythonOps.dataset; anything else is converted
     * with __tojava__(QDataSet.class).
     *
     * @param uri the URI whose raw scheme-specific part is the expression
     * @return the dataset produced by the expression
     * @throws Exception declared by the signature; the visible body adds no handling of its own
     */
    private synchronized QDataSet getInlineDataSet(URI uri) throws Exception {
        interp = JythonUtil.createInterpreter(false);
        PyObject result= interp.eval(uri.getRawSchemeSpecificPart());
        QDataSet res;
        if (result instanceof PyList) {
            res = JythonOps.dataset((PyList) result);
        } else {
            res = (QDataSet) result.__tojava__(QDataSet.class);
        }
        return res;
    }

    /**
     * Run the script (or reuse the interpreter left by an earlier run) and
     * return the requested dataset.  In outline:
     * <ol>
     * <li>The URI is taken from {@link #tsb} when one is present, otherwise from this source.</li>
     * <li>An "inline" scheme is delegated to getInlineDataSet.</li>
     * <li>When the system property named by ReferenceCache.PROP_ENABLE_REFERENCE_CACHE is "true",
     *   the reference cache is consulted using the URI with arg_0 removed as the lock key; if another
     *   thread is loading, this thread parks and then returns the cached dataset when it is still available.</li>
     * <li>allowCaching=F discards the kept interpreter so the script is run again.</li>
     * <li>When there is no interpreter, one is created, "monitor", "PWD" and autoplot.params are set,
     *   and the script is executed.</li>
     * <li>The result is read from the interpreter: with no arg_0 the variable "result" and then "data";
     *   with arg_0, the entry in "outputParams" if present, otherwise the expression is evaluated.</li>
     * <li>The script's "metadata" dictionary is copied into {@link #metadata}.</li>
     * </ol>
     * A PyException raised while the script executes is remembered in causedBy.  If it wraps a
     * FileNotFoundException, NoDataInIntervalException or either CancelledOperationException, that
     * Java exception is thrown immediately; otherwise it is logged, passed to the listener, and
     * thrown near the end of the method after the cached interpreter state has been cleared.
     * The monitor is started on entry and finished in the finally block if it is not already finished.
     *
     * NOTE(review): the middle of this method is truncated in this copy (see the markers below), so
     * the conversion of the script result into the returned dataset is not visible here.
     *
     * @param mon progress monitor; also handed to the script as the variable "monitor"
     * @return the dataset, or null when the reference cache holds ReferenceCache.NULL for the URI
     * @throws Exception exceptions from fetching or running the script, including PyException
     */
    @Override
    public synchronized QDataSet getDataSet(ProgressMonitor mon) throws Exception {
        mon.started();

        File jythonScript; // script to run.
        String lresourceURI; // optional resource URI that is argument to script, excluding script argument.

        String suri= DataSetURI.fromUri(uri);
        if ( tsb!=null ) {
            //tsb.setURI(suri);
            suri= tsb.getURI();
        }

        URISplit split= URISplit.parse(suri);
        Map paramsl= URISplit.parseParams(split.params); // abstract datasource params don't update.

        if ( split.scheme.equals("inline") ) { // note this is handled elsewhere, in InlineDataSource
            return getInlineDataSet(new URI(uri.getRawSchemeSpecificPart()));
        }

        boolean useReferenceCache= "true".equals(System.getProperty(org.autoplot.datasource.ReferenceCache.PROP_ENABLE_REFERENCE_CACHE, "false" ) );

        // suri becomes the canonical URI including arg_0; lockUri is the same URI with arg_0 removed.
        suri= URISplit.makeCanonical(suri);
        URISplit split1= URISplit.parse(suri);
        Map params1= URISplit.parseParams(split1.params);
        split1.params= URISplit.formatParams(params1);
        suri= URISplit.format( split1 );
        // NOTE(review): in the collapsed source the next statement directly follows a "//".
        // It is kept as live code because lockUri would otherwise be identical to suri -- confirm upstream.
        //
        params1.remove("arg_0");
        split1.params= URISplit.formatParams(params1);
        String lockUri= URISplit.format(split1);

        ReferenceCache.ReferenceCacheEntry rcent=null;
        if ( useReferenceCache ) {
            rcent= ReferenceCache.getInstance().getDataSetOrLock( lockUri, mon);
            if ( !rcent.shouldILoad( Thread.currentThread() ) ) {
                // some other thread is loading; wait for it and then look for our own URI in the cache.
                rcent.park( mon );
                ReferenceCache.ReferenceCacheEntry entry= ReferenceCache.getInstance().getReferenceCacheEntry(suri);
                if ( entry!=null ) {
                    QDataSet result= ReferenceCache.getInstance().getDataSet(suri); // get a strong reference before a GC
                    if ( result==null ) {
                        logger.fine("garbage collector got the data before a non-weak reference could be made");
                        logger.log(Level.FINE, "miss {0}", suri);
                        rcent= null;
                        mon= new NullProgressMonitor(); // we can't reuse monitor after finished is called.
                    } else if ( result==ReferenceCache.NULL ) {
                        return null;
                    } else {
                        return result;
                    }
                } else {
                    logger.log(Level.FINE, "referenceCache doesn''t know the URI: {0}", suri);
                    //What's to be done here? It could be the name was wrong, so should
                    //we just assume this is an error?
                    rcent= null; // go through as before.
                    mon= new NullProgressMonitor(); // we can't reuse monitor after finished is called.
                }
            } else {
                logger.log(Level.FINE, "reference cache in use, {0} is loading {1}", new Object[] { Thread.currentThread().toString(), resourceURI } );
            }
        }

        boolean allowCaching= !( "F".equals( params.get("allowCaching") ) );
        if ( !allowCaching ) interp= null;

        // remembers a PyException from script execution so it can be thrown after the result is evaluated.
        PyException causedBy = null;
        try {

            if ( params.get( PARAM_SCRIPT )!=null ) {
                jythonScript= getFile( new URL(params.get( PARAM_SCRIPT )), new NullProgressMonitor() );
                mon.setProgressMessage( "loading "+uri );
                split.params= null;
                lresourceURI= DataSetURI.fromUri( DataSetURI.getResourceURI(URISplit.format(split)) );
            } else {
                lresourceURI= null;
                jythonScript= getFile(new NullProgressMonitor());
            }

            if ( interp == null ) { // caching might leave the interpreter open. This needs to be tweaked--the TSB could set interp to null for example.
                logger.log(Level.FINE, "running script {0} {1}", new Object[] { jythonScript, paramsl } );
                mon.setProgressMessage( "initialize Jython interpreter...");
                interp = JythonUtil.createInterpreter(false);
                mon.setProgressMessage( "done initializing Jython interpreter");
                try {
                    interp.set("monitor", mon);
                } catch ( ConcurrentModificationException ex ) {
                    logger.warning("avoiding strange concurrent modification bug that occurs within Jython on the server...");
                    try {
                        Thread.sleep(100);
                    } catch ( InterruptedException ex2 ) {
                        // NOTE(review): the interrupt is swallowed here and the thread's interrupt status is not restored.
                    }
                    interp.set("monitor", mon);
                    logger.warning("done.");
                }
                interp.set("PWD",split.path);
                interp.exec("import autoplot2017 as autoplot");
                interp.exec("autoplot.params=dict()");
                // copy each non-empty URI parameter, other than arg_0 and script, into autoplot.params.
                // NOTE(review): the parameter name s is concatenated into Python source without escaping;
                // only the value goes through JythonUtil.maybeQuoteString.
                for ( Entry e : paramsl.entrySet()) {
                    String s= e.getKey();
                    if (!s.equals("arg_0") && !s.equals("script") ) {
                        String sval= e.getValue();
                        if ( sval.length()>0 ) {
                            sval= JythonUtil.maybeQuoteString( sval );
                            logger.log(Level.FINE, "autoplot.params[''{0}'']={1}", new Object[]{s, sval});
                            interp.exec("autoplot.params['" + s + "']=" + sval);
                        }
                    }
                }

                if ( lresourceURI!=null ) {
                    interp.set( PARAM_RESOURCE_URI, lresourceURI); // legacy
                    interp.exec("autoplot.params['"+PARAM_RESOURCE_URI+"']="+ JythonUtil.maybeQuoteString( lresourceURI ) );
                }

                mon.setProgressMessage( "executing script");

                LineNumberReader reader=null;
                try {
                    boolean debug = false; //TODO: exceptions will have the wrong line number in this mode.
                    if (debug) {
                        // line-by-line execution with per-chunk timing; disabled by the constant above.
                        // NOTE(review): fr is closed only on the normal path, and nextExec never returns null,
                        // so this loop ends only through the cancel check or an exception.
                        FileReader fr= new FileReader( jythonScript );
                        reader = new LineNumberReader( fr );
                        String[] nextLine= new String[1];
                        String s = nextExec( reader, nextLine );
                        long t0= System.currentTimeMillis();
                        while (s != null) {
                            logger.log(Level.FINEST, "{0}: {1}", new Object[]{reader.getLineNumber(), s});
                            interp.exec(s);
                            logger.finest( String.format( "line=%d time=%dms %s\n", reader.getLineNumber(), (System.currentTimeMillis()-t0), s ) );
                            if ( mon.isCancelled() ) break;
                            mon.setProgressMessage("exec line "+reader.getLineNumber() );
                            s = nextExec( reader, nextLine );
                            t0= System.currentTimeMillis();
                        }
                        fr.close();
                    } else {
                        // NOTE(review): in is closed only when no exception escapes the inner try/catch.
                        InputStream in = new FileInputStream( jythonScript );
                        try {
                            in= JythonRefactory.fixImports(in);
                            logger.log(Level.FINE, "executing script {0}", jythonScript.getName());
                            interp.execfile(in,jythonScript.getName());
                            logger.log(Level.FINE, "done executing script {0}", jythonScript.getName());
                        } catch ( PyException ex ) {
                            if ( ex.toString().contains("checkForComodification") ) {
                                // reopen the script and run it once more after a short pause.
                                in.close();
                                in = new FileInputStream( jythonScript );
                                logger.warning("avoiding second strange concurrent modification bug that occurs within Jython on the server. Run the whole thing again.");
                                Thread.sleep(200);
                                in= JythonRefactory.fixImports(in);
                                interp.execfile(in,jythonScript.getName());
                            } else {
                                throw ex; // This exception is caught again by the PyException handler of the enclosing try.
                            }
                        }
                        in.close();
                    }
                    mon.setProgressMessage( "done executing script");
                } catch (PyException ex) {
                    if ( reader!=null ) {
                        //ex.lineno= ex.lineno+iline;
                        logger.log(Level.FINE, "debugging line number={0}", reader.getLineNumber());
                    }
                    causedBy = ex;
                    Object javaClass= ex.value.__tojava__(Exception.class);
                    // since FileNotFoundException is a special exception where we don't want to interrupt the user with a popup, handle it specially.
                    if ( javaClass instanceof java.io.FileNotFoundException ) {
                        throw (Exception)javaClass;
                    } else if ( javaClass instanceof NoDataInIntervalException ) {
                        throw (Exception)javaClass;
                    } else if ( javaClass instanceof CancelledOperationException ) {
                        throw (Exception)javaClass;
                    } else if ( javaClass instanceof org.das2.util.monitor.CancelledOperationException ) { //TODO: why are there two?
                        throw (Exception)javaClass;
                    }
                    // any other script error: log it, tell the listener, and carry on so that
                    // the result lookup below can still be attempted; causedBy is thrown later.
                    logger.warning( ex.toString() );
                    if (listener != null) {
                        listener.exceptionThrown(ex);
                    }
                } catch (Exception ex) {
                    throw ex;
                }
                if (causedBy == null && allowCaching ) {
                    // record what was run and when, for useCache.
                    cacheDate = resourceDate(this.uri);
                    cacheUrl = cacheUrl(this.uri);
                }
            } else {
                logger.fine("using existing interpreter to provide caching");
            }

            String expr = params.get("arg_0");
            PyObject result=null;
            String label= null;
            if (expr == null) {
                try {
                    result = interp.eval("result"); // legacy
                } catch ( PyException ex ) {
                    try {
                        result = interp.eval("data");
                    } catch ( PyException ex2 ) {
                        if ( causedBy!=null ) {
                            throw ex2;
                        } else {
                            throw new IllegalArgumentException("neither \"data\" nor \"result\" is defined");
                        }
                    }
                }
            } else {
                // prefer an entry in the script's outputParams dictionary, falling back to evaluating the expression.
                Object o= interp.get("outputParams");
                if ( o!=null && o instanceof PyDictionary ) {
                    PyDictionary dict= (PyDictionary)o;
                    result= dict.get(Py.newString(expr));
                }
                if ( result==null || result==Py.None ) {
                    result = interp.eval(expr);
                }
                label= expr;
            }

            metadata= new LinkedHashMap();
            PyObject pymeta;
            try {
                pymeta= interp.eval("metadata");
                if ( pymeta instanceof PyDictionary ) {
                    PyDictionary dict= ((PyDictionary)pymeta);
                    PyList keys= dict.keys();
                    for ( Iterator i= keys.iterator(); i.hasNext(); ) {
                        Object key= i.next();
                        String name= key.toString();
                        Object o= dict.get( Py.java2py(key) );
                        if ( o instanceof PyList ) {
                            String[] arr= new String[ ((PyList)o).__len__() ];
                            // NOTE(review): TRUNCATED SOURCE. Text is missing between "i2" and "m=" on the next
                            // line. The lost span presumably held the rest of the metadata loop, the conversion
                            // of "result" into "res", and the opening of an "if ( rcent!=null )" block that
                            // declares "t" -- "res" and "t" are used below but not declared in the visible text.
                            // The indentation from here on is a guess at the original nesting.
                            for ( int i2=0; i2 m= URISplit.parseParams( t.params );
                String s= m.remove("arg_0");
                PyStringMap locals= (PyStringMap) interp.getLocals();
                PyList keys= locals.keys();
                PyList values= locals.values();
                boolean useOutputParams= false;
                Object o= interp.get("outputParams");
                if ( o!=null && o instanceof PyDictionary ) {
                    if ( ((PyDictionary)o).__len__()>0 ) {
                        useOutputParams= true;
                    }
                }
                if ( useOutputParams==false ) {
                    logger.fine("loading local datasets to cache");
                    // NOTE(review): TRUNCATED SOURCE. The loop header and body are missing between "i" and
                    // the tail of a log call on the next line; "uri1" and "value" are declared in the lost text.
                    for ( int i=0; i{1}", new Object[]{uri1, value});
                            }
                        }
                } else {
                    logger.fine("loading output params to cache");
                    PyDictionary dict= (PyDictionary)o;
                    assert dict!=null;
                    keys= dict.keys();
                    // NOTE(review): TRUNCATED SOURCE. Same loss as in the branch above.
                    for ( int i=0; i{1}", new Object[]{uri1, value});
                            }
                        }
                }
                // the lock was taken on the URI without arg_0; when this request named an arg_0 the lock
                // entry is finished with a placeholder dataset in place of res.
                if ( s==null ) {
                    rcent.finished(res);
                } else {
                    rcent.finished( Ops.dataset( "1971-01-01T00:00" ) );
                }
            }

            if (causedBy != null) {
                // the script failed earlier; drop the cached interpreter state and report the original error.
                interp = null;
                cacheUrl = null;
                cacheDate = null;
                logger.log(Level.WARNING, "exception in processing: {0}", causedBy);
                throw causedBy;
            }

            if ( !allowCaching ) {
                logger.log(Level.FINE, "reset caching because allowCaching is false" );
                interp= null;
            }

            return res;

        } catch (PyException ex) {
            if ( rcent!=null ) rcent.exception(ex);
            if (causedBy != null) {
                logger.log(Level.FINE, "rethrow causedBy" );
                throw causedBy;
            }
            logger.log(Level.FINE, "resetting caching because of PyException" );
            interp = null;
            cacheUrl = null;
            cacheDate = null;
            throw ex;
        } catch ( Exception ex ) {
            if ( rcent!=null ) rcent.exception(ex);
            throw ex;
        } finally {
            if ( !mon.isFinished() ) mon.finished();
        }
    }

    /**
     * Return the metadata map assigned by the most recent getDataSet call.
     * The monitor is not used.
     *
     * @param mon ignored
     * @return the metadata map, or null if getDataSet has not yet assigned one
     * @throws Exception declared by the signature; the body throws nothing itself
     */
    @Override
    public Map getMetadata(ProgressMonitor mon) throws Exception {
        return metadata;
    }

    /** Interpreter kept between getDataSet calls; null means the script must be run again. */
    PythonInterpreter interp = null;

    /** Set by the constructor when a TimeSeriesBrowse was detected for the script. */
    TimeSeriesBrowse tsb= null; // if the script has getParam('timerange','2011-001')

    /**
     * Return the URI with the arg_0 parameter removed, which is the form
     * stored in and compared against {@link #cacheUrl}.
     *
     * @param uri the URI to normalize
     * @return the formatted URI without arg_0
     */
    private String cacheUrl(URI uri) {
        URISplit split = URISplit.parse(uri);
        Map params2 = URISplit.parseParams(split.params);
        params2.remove("arg_0");
        split.params = URISplit.formatParams(params2);
        return URISplit.format(split);
    }

    /**
     * Return the last-modified time of the file DataSetURI.getFile provides for the URI.
     *
     * @param uri the URI whose file is examined
     * @return the file's last-modified time as a Date
     * @throws IOException if DataSetURI.getFile throws it
     */
    private Date resourceDate(URI uri) throws IOException {
        File src = DataSetURI.getFile( uri.toString(), true, new NullProgressMonitor()); //TODO: this is probably wrong, because it should always be the script...
        return new Date(src.lastModified());
    }

    /** Value of resourceDate(uri) recorded after a successful script run with caching allowed; null otherwise. */
    Date cacheDate = null;

    /** Value of cacheUrl(uri) recorded together with cacheDate; null otherwise. */
    String cacheUrl = null;

    /**
     * Report whether the state left by the last run can serve the given URI:
     * a run must have been recorded, the resource must not have been modified
     * after the recorded date, the URI without arg_0 must equal the recorded
     * one, and the URI text must not contain "allowCaching=F".
     *
     * @param uri the URI being requested
     * @return true if the cached state applies; false otherwise, including when an IOException occurs
     */
    private synchronized boolean useCache(URI uri) {
        try {
            if ((cacheDate != null && !resourceDate(uri).after(cacheDate)) && (cacheUrl != null && cacheUrl.equals(cacheUrl(uri)))) {
                if ( uri.toString().contains("allowCaching=F") ) {
                    return false;
                } else {
                    return true;
                }
            }
            return false;
        } catch (IOException ex) {
            return false;
        }
    }

    /**
     * Report whether this data source can serve the given URI from its cached
     * state.  Only URIs whose vap scheme is "vap+jyds" are considered.
     *
     * @param surl the URI string being requested
     * @return the result of useCache for the URI; false for other schemes or when the URI cannot be parsed
     */
    @Override
    public boolean satisfies(String surl) {
        URISplit split= URISplit.parse(surl);
        if ( !"vap+jyds".equals( split.vapScheme ) ) {
            return false;
        }
        try {
            return useCache( DataSetURI.getURI(surl) );
        } catch (URISyntaxException ex) {
            return false;
        }
    }

    /**
     * Point this data source at a new URI, replacing uri, params and resourceURI.
     * The cached interpreter is left as it is.
     *
     * @param surl the new URI string
     * @throws RuntimeException wrapping the URISyntaxException when the string cannot be converted
     */
    @Override
    public void resetURI(String surl) {
        try {
            this.uri = DataSetURI.getURI(surl);
            URISplit split = URISplit.parse(uri);
            params = URISplit.parseParams(split.params);
            resourceURI = DataSetURI.toUri(split.file);
        } catch (URISyntaxException ex) {
            throw new RuntimeException(ex);
        }
    }

    /**
     * Explicitly reset the interpreter, which is cached to provide results.
     * cacheDate and cacheUrl are not cleared here.
     */
    @Override
    public synchronized void reset() {
        logger.fine("JythonDataSource.reset() clears cache");
        interp= null;
    }
}