1 package com.opencsv.bean.concurrent;
2
3 import com.opencsv.ICSVParser;
4 import com.opencsv.bean.BeanVerifier;
5 import com.opencsv.bean.CsvToBeanFilter;
6 import com.opencsv.bean.MappingStrategy;
7 import com.opencsv.bean.exceptionhandler.CsvExceptionHandler;
8 import com.opencsv.exceptions.CsvMalformedLineException;
9
10 import java.util.Arrays;
11 import java.util.List;
12 import java.util.Locale;
13 import java.util.ResourceBundle;
14 import java.util.concurrent.RejectedExecutionException;
15
16
17
18
19
20
21
22
23
24 public class LineExecutor<T> extends IntolerantThreadPoolExecutor<T> {
25
26 private final CompleteFileReader<T> completeFileReader;
27
28
29
30
31
32
33
34
35 public LineExecutor(boolean orderedResults, Locale errorLocale, CompleteFileReader<T> completeFileReader) {
36 super(orderedResults, errorLocale);
37 this.completeFileReader = completeFileReader;
38 }
39
40 @Override
41 public void prepare() {
42 Thread readerThread = new Thread(completeFileReader);
43 completeFileReader.setExecutor(this);
44 super.prepare();
45 readerThread.start();
46 }
47
48 @Override
49 protected void checkExceptions() {
50 Throwable t = completeFileReader.getTerminalException();
51
52
53
54
55
56 if(t != null && !(t instanceof RejectedExecutionException)) {
57 shutdownNow();
58 if(t instanceof CsvMalformedLineException) {
59
60 CsvMalformedLineException cmle = (CsvMalformedLineException) t;
61 throw new RuntimeException(String.format(ResourceBundle.getBundle(ICSVParser.DEFAULT_BUNDLE_NAME, errorLocale).getString("parsing.error.full"),
62 cmle.getLineNumber(), cmle.getContext()), cmle);
63 }
64 throw new RuntimeException(String.format(ResourceBundle.getBundle(ICSVParser.DEFAULT_BUNDLE_NAME, errorLocale).getString("parsing.error.full"),
65 completeFileReader.getLineProcessed(), Arrays.toString(completeFileReader.getLine())), t);
66 }
67 super.checkExceptions();
68 }
69
70
71
72
73
74
75
76
77
78
79
80
81
82 public void submitLine(
83 long lineNumber, MappingStrategy<? extends T> mapper, CsvToBeanFilter filter,
84 List<BeanVerifier<T>> verifiers, String[] line,
85 CsvExceptionHandler exceptionHandler) {
86 if (accumulateThread != null) {
87 expectedRecords.add(lineNumber);
88 }
89 try {
90 execute(new ProcessCsvLine<>(
91 lineNumber, mapper, filter, verifiers, line,
92 resultQueue, thrownExceptionsQueue,
93 expectedRecords, exceptionHandler));
94 } catch (Exception e) {
95 if(accumulateThread != null) {
96 expectedRecords.remove(lineNumber);
97 accumulateThread.setMustStop(true);
98 }
99 throw e;
100 }
101 }
102 }