LineExecutor.java
package com.opencsv.bean.concurrent;
import com.opencsv.ICSVParser;
import com.opencsv.bean.BeanVerifier;
import com.opencsv.bean.CsvToBeanFilter;
import com.opencsv.bean.MappingStrategy;
import com.opencsv.bean.exceptionhandler.CsvExceptionHandler;
import com.opencsv.exceptions.CsvMalformedLineException;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.ResourceBundle;
import java.util.concurrent.RejectedExecutionException;
/**
* A specific derivative of {@link IntolerantThreadPoolExecutor} intended for
* submitting lines of input to be converted to beans.
*
* @param <T> The type of the bean being converted to
* @author Andrew Rucker Jones
* @since 5.0
*/
public class LineExecutor<T> extends IntolerantThreadPoolExecutor<T> {
private final CompleteFileReader<T> completeFileReader;
/**
* The only constructor available for this class.
* @param orderedResults Whether order should be preserved in the results
* @param errorLocale The locale to use for error messages
* @param completeFileReader The thread that reads lines of input and feeds the
* results to this Executor
*/
public LineExecutor(boolean orderedResults, Locale errorLocale, CompleteFileReader<T> completeFileReader) {
super(orderedResults, errorLocale);
this.completeFileReader = completeFileReader;
}
@Override
public void prepare() {
Thread readerThread = new Thread(completeFileReader);
completeFileReader.setExecutor(this);
super.prepare();
readerThread.start();
}
@Override
protected void checkExceptions() {
Throwable t = completeFileReader.getTerminalException();
// RejectedExecutionException indicates a problem encountered when
// submitting a line for processing by the Executor, specifically that
// the Executor has shut down, but the base class will take care of
// errors that would cause the Executor to shut down.
if(t != null && !(t instanceof RejectedExecutionException)) {
shutdownNow();
if(t instanceof CsvMalformedLineException) {
// Exception during parsing. Always unrecoverable.
CsvMalformedLineException cmle = (CsvMalformedLineException) t;
throw new RuntimeException(String.format(ResourceBundle.getBundle(ICSVParser.DEFAULT_BUNDLE_NAME, errorLocale).getString("parsing.error.full"),
cmle.getLineNumber(), cmle.getContext()), cmle);
}
throw new RuntimeException(String.format(ResourceBundle.getBundle(ICSVParser.DEFAULT_BUNDLE_NAME, errorLocale).getString("parsing.error.full"),
completeFileReader.getLineProcessed(), Arrays.toString(completeFileReader.getLine())), t);
}
super.checkExceptions();
}
/**
* Submit one record for conversion to a bean.
*
* @param lineNumber Which record in the input file is being processed
* @param mapper The mapping strategy to be used
* @param filter A filter to remove beans from the running, if necessary.
* May be null.
* @param verifiers The list of verifiers to run on beans after creation
* @param line The line of input to be transformed into a bean
* @param exceptionHandler The handler for exceptions thrown during record
* processing
*/
public void submitLine(
long lineNumber, MappingStrategy<? extends T> mapper, CsvToBeanFilter filter,
List<BeanVerifier<T>> verifiers, String[] line,
CsvExceptionHandler exceptionHandler) {
if (accumulateThread != null) {
expectedRecords.add(lineNumber);
}
try {
execute(new ProcessCsvLine<>(
lineNumber, mapper, filter, verifiers, line,
resultQueue, thrownExceptionsQueue,
expectedRecords, exceptionHandler));
} catch (Exception e) {
if(accumulateThread != null) {
expectedRecords.remove(lineNumber);
accumulateThread.setMustStop(true);
}
throw e;
}
}
}