View Javadoc
1   package com.opencsv.bean.concurrent;
2   
3   import com.opencsv.ICSVParser;
4   import com.opencsv.bean.BeanVerifier;
5   import com.opencsv.bean.CsvToBeanFilter;
6   import com.opencsv.bean.MappingStrategy;
7   import com.opencsv.bean.exceptionhandler.CsvExceptionHandler;
8   import com.opencsv.exceptions.CsvMalformedLineException;
9   
10  import java.util.Arrays;
11  import java.util.List;
12  import java.util.Locale;
13  import java.util.ResourceBundle;
14  import java.util.concurrent.RejectedExecutionException;
15  
16  /**
17   * A specific derivative of {@link IntolerantThreadPoolExecutor} intended for
18   * submitting lines of input to be converted to beans.
19   *
20   * @param <T> The type of the bean being converted to
21   * @author Andrew Rucker Jones
22   * @since 5.0
23   */
24  public class LineExecutor<T> extends IntolerantThreadPoolExecutor<T> {
25  
26      private final CompleteFileReader<T> completeFileReader;
27  
28      /**
29       * The only constructor available for this class.
30       * @param orderedResults Whether order should be preserved in the results
31       * @param errorLocale The locale to use for error messages
32       * @param completeFileReader The thread that reads lines of input and feeds the
33       *                   results to this Executor
34       */
35      public LineExecutor(boolean orderedResults, Locale errorLocale, CompleteFileReader<T> completeFileReader) {
36          super(orderedResults, errorLocale);
37          this.completeFileReader = completeFileReader;
38      }
39  
40      @Override
41      public void prepare() {
42          Thread readerThread = new Thread(completeFileReader);
43          completeFileReader.setExecutor(this);
44          super.prepare();
45          readerThread.start();
46      }
47  
48      @Override
49      protected void checkExceptions() {
50          Throwable t = completeFileReader.getTerminalException();
51  
52          // RejectedExecutionException indicates a problem encountered when
53          // submitting a line for processing by the Executor, specifically that
54          // the Executor has shut down, but the base class will take care of
55          // errors that would cause the Executor to shut down.
56          if(t != null && !(t instanceof RejectedExecutionException)) {
57              shutdownNow();
58              if(t instanceof CsvMalformedLineException) {
59                  // Exception during parsing. Always unrecoverable.
60                  CsvMalformedLineException cmle = (CsvMalformedLineException) t;
61                  throw new RuntimeException(String.format(ResourceBundle.getBundle(ICSVParser.DEFAULT_BUNDLE_NAME, errorLocale).getString("parsing.error.full"),
62                          cmle.getLineNumber(), cmle.getContext()), cmle);
63              }
64              throw new RuntimeException(String.format(ResourceBundle.getBundle(ICSVParser.DEFAULT_BUNDLE_NAME, errorLocale).getString("parsing.error.full"),
65                      completeFileReader.getLineProcessed(), Arrays.toString(completeFileReader.getLine())), t);
66          }
67          super.checkExceptions();
68      }
69  
70      /**
71       * Submit one record for conversion to a bean.
72       *
73       * @param lineNumber Which record in the input file is being processed
74       * @param mapper The mapping strategy to be used
75       * @param filter A filter to remove beans from the running, if necessary.
76       *   May be null.
77       * @param verifiers The list of verifiers to run on beans after creation
78       * @param line The line of input to be transformed into a bean
79       * @param exceptionHandler The handler for exceptions thrown during record
80       *                         processing
81       */
82      public void submitLine(
83              long lineNumber, MappingStrategy<? extends T> mapper, CsvToBeanFilter filter,
84              List<BeanVerifier<T>> verifiers, String[] line,
85              CsvExceptionHandler exceptionHandler) {
86          if (accumulateThread != null) {
87              expectedRecords.add(lineNumber);
88          }
89          try {
90              execute(new ProcessCsvLine<>(
91                      lineNumber, mapper, filter, verifiers, line,
92                      resultQueue, thrownExceptionsQueue,
93                      expectedRecords, exceptionHandler));
94          } catch (Exception e) {
95              if(accumulateThread != null) {
96                  expectedRecords.remove(lineNumber);
97                  accumulateThread.setMustStop(true);
98              }
99              throw e;
100         }
101     }
102 }