package org.autoplot.csv; import com.csvreader.CsvReader; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.URI; import java.text.ParseException; import java.util.Arrays; import java.util.Iterator; import java.util.logging.Level; import java.util.logging.Logger; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.das2.dataset.NoDataInIntervalException; import org.das2.datum.EnumerationUnits; import org.das2.datum.TimeLocationUnits; import org.das2.datum.TimeUtil; import org.das2.datum.Units; import org.das2.datum.UnitsUtil; import org.das2.util.LoggerManager; import org.das2.util.monitor.ProgressMonitor; import org.das2.qds.DDataSet; import org.das2.qds.QDataSet; import org.das2.qds.SemanticOps; import org.das2.qds.SparseDataSet; import org.autoplot.datasource.AbstractDataSource; import org.autoplot.datasource.DataSetURI; import org.autoplot.datasource.capability.Streaming; import org.autoplot.html.AsciiTableStreamer; import org.das2.datum.Datum; import org.das2.datum.DatumUtil; import org.das2.datum.InconvertibleUnitsException; import org.das2.qds.ops.Ops; import org.das2.qds.util.AsciiParser; import org.das2.qds.util.DataSetBuilder; import org.das2.util.monitor.NullProgressMonitor; /** * Specialized reader only reads csv files. These csv files must be simple tables with the same number of fields in each record. 
* @author jbf */ public class CsvDataSource extends AbstractDataSource { private static final Logger logger= LoggerManager.getLogger("apdss.csv"); /** * initializer * @param uri the URI */ public CsvDataSource(URI uri) { super(uri); addCapability( Streaming.class, new CsvTableStreamingSource() ); } private QDataSet parseHeader( int icol, String header, String sval ) { header= header.trim(); DDataSet result= DDataSet.create( new int[0] ); // rank 0 dataset Units u= guessUnits(sval); if ( u!=Units.dimensionless ) result.putProperty( QDataSet.UNITS, u ); if ( UnitsUtil.isTimeLocation(u) ) result.putProperty( QDataSet.NAME, "UTC" ); if ( header.length()==0 ) { try { result.putValue(u.parse(sval).doubleValue(u)); } catch (ParseException ex) { logger.log(Level.SEVERE, ex.getMessage(), ex); } return result; } else { Pattern p= Pattern.compile( "([a-zA-Z0-9\\-\\+ ]*)(\\(([a-zA-Z-0-9\\-\\+ ]*)\\))?"); Matcher m= p.matcher(header); if ( m.matches() ) { String label= m.group(1).trim(); String sunits= m.group(3); if ( header.length()>0 ) result.putProperty( QDataSet.NAME, Ops.safeName(label) ); result.putProperty(QDataSet.LABEL,label); if ( sunits!=null ) result.putProperty(QDataSet.UNITS, SemanticOps.lookupUnits(sunits.trim()) ); } return result; } } private static Units guessUnits( String sval ) { try { Units.dimensionless.parse(sval); return Units.dimensionless; } catch ( ParseException ex ) { logger.log(Level.FINER, "fails to parse as number: {0}", sval); } try { AsciiParser.UNIT_UTC.parse(sval); return AsciiParser.UNIT_UTC; } catch ( ParseException ex ) { logger.log(Level.FINER, "fails to parse as time: {0}", sval); } return EnumerationUnits.create("enum"); } @Override public QDataSet getDataSet(ProgressMonitor mon) throws Exception { InputStream in = DataSetURI.getInputStream(uri, mon.getSubtaskMonitor("load input stream")); //char delimiter= TableOps.getDelim(thein); String sdelimiter= getParam("delim", ","); if ( sdelimiter.equals("COMMA") ) sdelimiter= ","; if ( 
sdelimiter.equals("SEMICOLON") ) sdelimiter= ";"; char delimiter= sdelimiter.charAt(0); BufferedReader breader= new BufferedReader( new InputStreamReader(in) ); String skip= getParam( "skipLines", "" ); if ( skip.length()==0 ) skip= getParam( "skip", "" ); if ( skip.length()>0 ) { int iskip= Integer.parseInt(skip); // TODO: getIntegerParam( "skip", -1, "min=0,max=100" ); for ( int i=0; i0 && irecCount<(Integer.MAX_VALUE-irecStart) ) { irecCount+= irecStart; } CsvReader reader= new CsvReader( breader ); if ( delimiter!=',' ) reader.setDelimiter(delimiter); String[] columnHeaders; columnHeaders= CsvDataSourceFactory.getColumnHeaders(reader,true); String column= getParam( "column", null ); /** * icolumn is the column we are reading, or -1 when reading multiple columns. */ int icolumn; if ( column==null ) { icolumn= -1; } else { icolumn= TableOps.columnIndex( column, columnHeaders ); if ( icolumn==-1 ) { throw new IllegalArgumentException("column not found: "+column); } } QDataSet icolumnDs=null; // metadata for column String bundle= getParam( "bundle", null ); int[] cols; if ( bundle==null ) { cols= null; } else { cols= TableOps.parseRangeStr( bundle, columnHeaders ); icolumn= cols[0]; // get the units from this column } String time= getParam("time",null); // time or depend0 can be used. String dep0column= getParam( "depend0", time ); int idep0column; if ( dep0column==null ) { idep0column= -1; } else { idep0column= TableOps.columnIndex( dep0column, columnHeaders ); if ( idep0column==-1 ) { throw new IllegalArgumentException("column not found: "+dep0column); } } QDataSet dep0ds= null; Units dep0u= Units.dimensionless; Units u= Units.dimensionless; Units[] columnUnits= null; double tb=0, cb=0; // temporary holders for data double[] bundleb=null; // temporary holders for each column. 
if ( cols!=null ) { bundleb= new double[ cols[1]-cols[0] ]; } DataSetBuilder builder; if ( bundleb!=null ) { builder= new DataSetBuilder( 2, 100, bundleb.length ); } else { builder= new DataSetBuilder( 1, 100 ); } DataSetBuilder tbuilder= new DataSetBuilder( 1, 100 ); mon.setTaskSize(-1); mon.started(); int line=0; double fill= 0; boolean needToCheckHeader= true; // check to see if the first line of data was mistaken for a header logger.log(Level.FINE, "reading csv data from input stream: {0}", uri); while ( reader.readRecord() ) { line++; if ( line % 100 == 0 ) { logger.log(Level.FINER, "read line {0}", line); mon.setProgressMessage("read line "+line); } if ( columnUnits==null ) { boolean foundColumnNumbers= false; columnUnits= new Units[reader.getColumnCount()]; for ( int j=0; j=2 && reader.getColumnCount()<=5 ) { builder= new DataSetBuilder( 2, 100, reader.getColumnCount() ); u= AsciiParser.UNIT_UTC; bundleb= new double[reader.getColumnCount()]; icolumn= 0; } else { icolumn= reader.getColumnCount()-1; } } if ( idep0column==-1 && reader.getColumnCount()==2 ) { idep0column= 0; } Units oldDep0u= dep0u; Units oldU= u; if ( idep0column>=0 && !(dep0u instanceof TimeLocationUnits) ) { dep0ds= parseHeader( idep0column, reader.getHeader(idep0column),reader.get(idep0column) ); dep0u= SemanticOps.getUnits(dep0ds); } if ( !( u instanceof TimeLocationUnits ) && bundleb==null ) { icolumnDs= parseHeader( icolumn, reader.getHeader(icolumn),reader.get(icolumn) ); u= SemanticOps.getUnits(icolumnDs); } if ( columnUnits!=null ) { if ( oldDep0u != dep0u || oldU!=u ) { if ( bundleb!=null ) { builder= new DataSetBuilder( 2, 100, bundleb.length ); } else { builder= new DataSetBuilder( 1, 100 ); } tbuilder= new DataSetBuilder( 1, 100 ); } } } String badTimeTag= null; if ( columnUnits!=null ) try { if ( idep0column>=0 ) { if ( dep0u instanceof EnumerationUnits ) { tb= ((EnumerationUnits)dep0u).createDatum( reader.get(idep0column) ).doubleValue(dep0u) ; } else { tb= 
dep0u.parse(reader.get(idep0column)).doubleValue(dep0u); } } if ( bundleb!=null ) { int validCount= 0; for ( int j=0; j1 && ((int)columnHeaders[icol].charAt(0))==0xFEFF ) { //Excel UTF non-space columnHeaders[icol]= columnHeaders[icol].substring(1); } } Units u1= columnUnits[icol]; if ( columnHeaders.length<=icol ) { logger.log(Level.FINER, "too few column headers {0}<={1}", new Object[]{columnHeaders.length, icol}); yepItsData= false; continue; } if ( u1 instanceof EnumerationUnits ) { cbs[icol]= ((EnumerationUnits)u1).createDatum( columnHeaders[icol] ).doubleValue(u1); } else { try { cbs[icol]= u1.parse(columnHeaders[icol]).doubleValue(u1); } catch ( InconvertibleUnitsException ex ) { Datum d= DatumUtil.parse(columnHeaders[icol]); cbs[icol]= d.doubleValue(d.getUnits()); if ( yepItsData && !UnitsUtil.isNominalMeasurement(d.getUnits()) ) columnUnits[icol]= d.getUnits(); } } } catch ( ParseException ex ) { logger.log(Level.FINER, "parse exception at icol={0}", icol); yepItsData= false; } } if ( yepItsData && builder.getLength()=0 ) { tbuilder.putValue( -1, cbs[idep0column] ); tbuilder.nextRecord(); } if ( bundleb!=null ) { for ( int j=0; j=0 ) { tbuilder.putValue( -1, tb ); tbuilder.nextRecord(); } if ( bundleb!=null ) { for ( int j=0; j0 ) { // TODO: skip records, so that memory isn't consumed. ds= (DDataSet)ds.trim(irecStart,ds.length()); } if ( idep0column>=0 && dep0ds!=null ) { DDataSet tds= tbuilder.getDataSet(); tds.putProperty(QDataSet.UNITS,dep0u); tds.putProperty(QDataSet.NAME, dep0ds.property(QDataSet.NAME)); tds.putProperty(QDataSet.LABEL,dep0ds.property(QDataSet.LABEL)); ds.putProperty(QDataSet.DEPEND_0, tds); } if ( bundleb!=null ) { SparseDataSet bds= SparseDataSet.createRankLen( 2, bundleb.length ); for ( int j=0; j *
  • delimiter is not automatic. *
  • rank2 is always used. * */
    /**
     * Streaming capability for this data source.  Each line of the input is
     * split on the delimiter and handed to an AsciiTableStreamer as one record.
     * Limitations noted by the original author: the delimiter is not detected
     * automatically, and rank 2 is always used.
     */
    private class CsvTableStreamingSource implements Streaming {

        public CsvTableStreamingSource() {
        }

        /**
         * Start a thread named "CsvTableDataStreamer" which reads the input
         * line by line and adds each line's fields to the returned streamer.
         * The "delim" parameter (default ",", with COMMA and SEMICOLON accepted
         * as aliases) is used as a literal delimiter, not as a regular
         * expression.
         *
         * @param mon progress monitor; not used here, the input stream is
         *    opened with a NullProgressMonitor.
         * @return the AsciiTableStreamer which receives the records
         * @throws Exception when the input stream cannot be opened
         */
        @Override
        public Iterator streamDataSet(ProgressMonitor mon) throws Exception {
            final AsciiTableStreamer result= new AsciiTableStreamer();
            final BufferedReader reader = new BufferedReader( new InputStreamReader( getInputStream(new NullProgressMonitor() ) ) );
            Runnable run= new Runnable() {
                @Override
                public void run() {
                    try {
                        String sdelimiter= getParam("delim", ",");
                        if ( sdelimiter.equals("COMMA") ) sdelimiter= ",";
                        if ( sdelimiter.equals("SEMICOLON") ) sdelimiter= ";";
                        // quote the delimiter so that characters such as '|' or '.'
                        // are matched literally, and compile it once for all lines.
                        final Pattern splitter= Pattern.compile( Pattern.quote(sdelimiter) );
                        String line;
                        while ( (line=reader.readLine())!=null ) {
                            String[] fields= splitter.split(line);
                            result.addRecord( Arrays.asList(fields) );
                        }
                        logger.log(Level.FINE, "Done parsing {0}", getURI() );
                    } catch ( IOException ex ) {
                        // report the failure rather than dropping it silently.
                        logger.log( Level.WARNING, ex.getMessage(), ex );
                    } finally {
                        // always mark the stream as finished, including when reading
                        // failed, so the end of data is signalled on every path.
                        result.setHasNext(false);
                        try {
                            reader.close();
                        } catch ( IOException ex ) {
                            logger.log( Level.WARNING, ex.getMessage(), ex );
                        }
                    }
                }
            };
            new Thread( run, "CsvTableDataStreamer" ).start();
            return result;
        }
    }
}