1 /*
2 * Copyright 2017 Andrew Rucker Jones.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package com.opencsv.bean.concurrent;
17
18 import com.opencsv.ICSVParser;
19 import com.opencsv.bean.util.OrderedObject;
20 import com.opencsv.exceptions.CsvException;
21 import org.apache.commons.collections4.ListValuedMap;
22 import org.apache.commons.collections4.multimap.ArrayListValuedHashMap;
23 import org.apache.commons.lang3.ArrayUtils;
24 import org.apache.commons.lang3.ObjectUtils;
25
26 import java.util.*;
27 import java.util.concurrent.*;
28 import java.util.function.Consumer;
29 import java.util.stream.Collectors;
30
31 /**
32 * This ThreadPoolExecutor automatically shuts down on any failed thread.
33 * <p>This is the historically established precedent for dealing with input errors
34 * in opencsv. This implementation expects all uncaught exceptions from its
35 * threads to be wrapped in a {@link java.lang.RuntimeException}. The number of
36 * threads in the pool is fixed.</p>
37 * <p>It is not intended for this executor to be instantiated and receive jobs
38 * directly. There are function-specific derived classes for that purpose.</p>
39 * <p>This executor adds significant logic to the basic idea of an
40 * {@link java.util.concurrent.Executor}, and thus must be used differently
41 * from other executors. Usage follows this pattern:
42 * <ol><li>{@link #prepare()}</li>
43 * <li>Submit tasks. This is not intended to be done directly to this class, but
44 * rather to one of the submission methods of the derived classes.</li>
45 * <li>{@link #complete()}</li>
 * <li>The results are obtained by creating a {@link java.util.stream.Stream} out of
47 * the executor itself. This is most easily done with
48 * {@link java.util.stream.StreamSupport#stream(Spliterator, boolean)}</li>
49 * <li>Possibly {@link #getCapturedExceptions()}</li></ol></p>
50 * <p>The execution structure of this class is:
51 * <ol><li>The main thread (outside of this executor) parses input and passes
52 * it on to</li>
53 * <li>This executor, which performs a number of conversions in parallel and
54 * passes these results and any resultant errors to</li>
55 * <li>The accumulator, which creates an ordered list of the results.</li></ol></p>
56 * <p>The threads in the executor queue their results in a thread-safe
57 * queue, which should be O(1), minimizing wait time due to synchronization.
58 * The accumulator then removes items from the queue and inserts them into a
59 * sorted data structure, which is O(log n) on average and O(n) in the worst
60 * case. If the user has told us she doesn't need sorted data, the
61 * accumulator is not necessary, and thus is not started.</p>
62 *
63 * @param <T> The type of the object being created by the threads run
64 * @author Andrew Rucker Jones
65 * @since 4.0
66 */
67 class IntolerantThreadPoolExecutor<T> extends ThreadPoolExecutor implements Spliterator<T> {
68
69 /** A queue of the beans created. */
70 protected final BlockingQueue<OrderedObject<T>> resultQueue = new LinkedBlockingQueue<>();
71
72 /** A queue of exceptions thrown by threads during processing. */
73 protected final BlockingQueue<OrderedObject<CsvException>> thrownExceptionsQueue = new LinkedBlockingQueue<>();
74
75 /** A sorted, concurrent map for the beans created. */
76 private ConcurrentNavigableMap<Long, T> resultantBeansMap = null;
77
78 /**
79 * A multi-valued map for any exceptions captured.
80 * <p>The multi-valued part is important because the same line can throw more
81 * than one exception.</p>
82 * <p><em>All access to this variable must be synchronized.</em></p>
83 * */
84 private ListValuedMap<Long, CsvException> thrownExceptionsMap = null;
85
86 /** A separate thread that accumulates and orders results. */
87 protected AccumulateCsvResults<T> accumulateThread = null;
88
89 /** A list of the ordinals of data records still to be expected by the accumulator. */
90 protected final SortedSet<Long> expectedRecords = new ConcurrentSkipListSet<>();
91
92 /**
93 * Determines whether resulting data sets have to be in the same order as
94 * the input.
95 */
96 private final boolean orderedResults;
97
98 /** The locale for error messages. */
99 protected final Locale errorLocale;
100
101 /** The exception that caused this Executor to stop executing. */
102 private Throwable terminalException;
103
104 /**
105 * Constructor for a thread pool executor that stops by itself as soon as
106 * any thread throws an exception.
107 * Threads never time out and the queue for inbound work is unbounded.
108 * @param orderedResults Whether order should be preserved in the results
109 * @param errorLocale The errorLocale to use for error messages.
110 */
111 IntolerantThreadPoolExecutor(boolean orderedResults, Locale errorLocale) {
112 super(Runtime.getRuntime().availableProcessors(),
113 Runtime.getRuntime().availableProcessors(), Long.MAX_VALUE,
114 TimeUnit.NANOSECONDS, new LinkedBlockingQueue<>());
115 this.orderedResults = orderedResults;
116 this.errorLocale = ObjectUtils.defaultIfNull(errorLocale, Locale.getDefault());
117 }
118
119 /**
120 * Prepares this Executor to receive jobs.
121 */
122 public void prepare() {
123 prestartAllCoreThreads();
124
125 // The ordered maps and accumulator are only necessary if ordering is
126 // stipulated. After this, the presence or absence of the accumulator is
127 // used to indicate ordering or not so as to guard against the unlikely
128 // problem that someone sets orderedResults right in the middle of
129 // processing.
130 if(orderedResults) {
131 resultantBeansMap = new ConcurrentSkipListMap<>();
132 thrownExceptionsMap = new ArrayListValuedHashMap<>();
133
134 // Start the process for accumulating results and cleaning up
135 accumulateThread = new AccumulateCsvResults<>(
136 resultQueue, thrownExceptionsQueue, expectedRecords,
137 resultantBeansMap, thrownExceptionsMap);
138 accumulateThread.start();
139 }
140 }
141
142 /**
143 * Sends a signal to the Executor that it should shut down once all threads
144 * have completed.
145 *
146 * @throws InterruptedException If the current thread is interrupted while
147 * waiting. Shouldn't be thrown, since the Executor
148 * waits indefinitely for all threads to end.
149 * @throws RejectedExecutionException If an exception during processing
150 * forced this Executor to shut down.
151 */
152 public void complete() throws InterruptedException {
153 // Normal termination
154 shutdown();
155 awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); // Wait indefinitely
156 if(accumulateThread != null) {
157 accumulateThread.setMustStop(true);
158 accumulateThread.join();
159 }
160
161 // There's one more possibility: The very last bean caused a problem.
162 if(terminalException != null) {
163 // Trigger a catch in the calling method
164 throw new RejectedExecutionException();
165 }
166 }
167
168 /**
169 * Returns exceptions captured during the conversion process if
170 * the conversion process was set not to propagate these errors
171 * up the call stack.
172 * The call is nondestructive.
173 *
174 * @return All exceptions captured
175 */
176 public List<CsvException> getCapturedExceptions() {
177 List<CsvException> returnList = null;
178 if(thrownExceptionsMap == null) {
179 returnList = thrownExceptionsQueue.stream()
180 .filter(Objects::nonNull)
181 .map(OrderedObject::getElement)
182 .collect(Collectors.toList());
183 }
184 else {
185 returnList = new LinkedList<>();
186 synchronized (thrownExceptionsMap) {
187 final List<CsvException> finalReturnList = returnList;
188 thrownExceptionsMap.keySet().stream()
189 .sorted()
190 .forEach(l -> finalReturnList.addAll(thrownExceptionsMap.get(l)));
191 }
192 }
193 return returnList;
194 }
195
196 @Override
197 public List<Runnable> shutdownNow() {
198 if(accumulateThread != null) {
199 accumulateThread.setMustStop(true);
200 try {
201 accumulateThread.join();
202 } catch (InterruptedException e) {
203 // Do nothing. Best faith effort.
204 }
205 }
206 return super.shutdownNow();
207 }
208
209 /**
210 * Shuts the Executor down if the thread ended in an exception.
211 * @param r {@inheritDoc}
212 * @param t {@inheritDoc}
213 */
214 @Override
215 protected void afterExecute(Runnable r, Throwable t) {
216 super.afterExecute(r, t);
217 if(t != null) {
218 if(t.getCause() != null) {
219 // Normally, everything that gets to this point should be
220 // wrapped in a RuntimeException to get past the lack of checked
221 // exceptions in Runnable.run().
222 terminalException = t.getCause();
223 }
224 else {
225 terminalException = t;
226 }
227 shutdownNow();
228 }
229 }
230
231 /**
232 * If an unrecoverable exception was thrown during processing, it can be
233 * retrieved here.
234 * @return The exception that halted one of the threads, which caused the
235 * executor to shut itself down
236 */
237 public Throwable getTerminalException() {
238 return terminalException;
239 }
240
241 /**
242 * Checks whether exceptions are available that should halt processing.
243 * This is the case with unrecoverable errors, such as parsing the input,
244 * or if exceptions in conversion should be thrown by request of the user.
245 */
246 protected void checkExceptions() {
247 if(terminalException != null) {
248 if(terminalException instanceof CsvException) {
249 CsvException csve = (CsvException) terminalException;
250 throw new RuntimeException(String.format(ResourceBundle.getBundle(ICSVParser.DEFAULT_BUNDLE_NAME, errorLocale).getString("parsing.error.linenumber"),
251 csve.getLineNumber(), String.join(",", ArrayUtils.nullToEmpty(csve.getLine()))), csve);
252 }
253 throw new RuntimeException(terminalException);
254 }
255 }
256
257 private boolean isConversionComplete() {
258 return isTerminated() && (accumulateThread == null || !accumulateThread.isAlive());
259 }
260
261 /**
262 * Determines whether more conversion results can be expected.
263 * Since {@link Spliterator}s have no way of indicating that they don't
264 * have a result at the moment, but might in the future, we must ensure
265 * that every call to {@link #tryAdvance(Consumer)} or {@link #trySplit()}
266 * only returns {@code null} if the entire conversion apparatus has shut
267 * down and all result queues are cleared. Thus, this method waits until
268 * either that is true, or there is truly at least one result that can be
269 * returned to users of the {@link Spliterator} interface.
270 *
271 * @return {@code false} if conversion is complete and no more results
272 * can ever be expected out of this {@link Spliterator}, {@code true}
273 * otherwise. If {@code true} is returned, it is guaranteed that at
274 * least one result is available immediately to the caller.
275 */
276 private boolean areMoreResultsAvailable() {
277 // If an exception has been thrown that needs to be passed on,
278 // throw it here.
279 checkExceptions();
280
281 // Check conditions for completion
282 boolean elementFound = false;
283 while(!elementFound && !isConversionComplete()) {
284 if(accumulateThread == null) {
285 if(resultQueue.isEmpty()) {
286 Thread.yield();
287 }
288 else {
289 elementFound = true;
290 }
291 }
292 else {
293 if(resultantBeansMap.isEmpty()) {
294 Thread.yield();
295 }
296 else {
297 elementFound = true;
298 }
299 }
300
301 // If an exception has been thrown that needs to be passed on,
302 // throw it here.
303 checkExceptions();
304 }
305
306 return accumulateThread == null ? !resultQueue.isEmpty() : !resultantBeansMap.isEmpty();
307 }
308
309 @Override
310 public boolean tryAdvance(Consumer<? super T> action) {
311 T bean = null;
312
313 if (areMoreResultsAvailable()) {
314 // Since we are now guaranteed to have a result, we don't
315 // really have to do all of the null checking below, but
316 // better safe than sorry.
317 if(accumulateThread == null) {
318 OrderedObject<T> orderedObject = resultQueue.poll();
319 if(orderedObject != null) {
320 bean = orderedObject.getElement();
321 }
322 }
323 else {
324 Map.Entry<Long, T> mapEntry = resultantBeansMap.pollFirstEntry();
325 if(mapEntry != null) {
326 bean = mapEntry.getValue();
327 }
328 }
329 if(bean != null) {
330 action.accept(bean);
331 }
332 }
333
334 return bean != null;
335 }
336
337 // WARNING! This code is untested because I have no way of telling the JDK
338 // streaming code how to do its job.
339 @Override
340 public Spliterator<T> trySplit() {
341 Spliterator<T> s = null;
342
343 // Check if all threads are through
344 if(areMoreResultsAvailable()) {
345 if(isConversionComplete()) {
346 // Return everything we have
347 if(accumulateThread == null) {
348 s = resultQueue.stream().map(OrderedObject::getElement).spliterator();
349 }
350 else {
351 s = resultantBeansMap.values().spliterator();
352 }
353 }
354 else {
355 int size;
356 ArrayList<T> c;
357 if(accumulateThread == null) {
358 // May seem like an odd implementation, but we can't use
359 // resultQueue.drainTo() because bulk operations are not
360 // thread-safe. So, we have to poll each object individually.
361 // We don't want to use a LinkedList for the Spliterator
362 // because another split would presumably be inefficient. With
363 // an ArrayList, on the other hand, we have to make sure we
364 // avoid a costly resize operation.
365 size = resultQueue.size();
366 c = new ArrayList<>(size);
367 for(int i = 0; i < size; i++) {
368 // Result guaranteed to exist through areMoreResultsAvailable()
369 OrderedObject<T> orderedObject = resultQueue.poll();
370 if(orderedObject != null) {
371 c.add(orderedObject.getElement());
372 }
373
374 }
375 }
376 else {
377 size = resultantBeansMap.size();
378 c = new ArrayList<>(size);
379 for(int i = 0; i < size; i++) {
380 Map.Entry<Long, T> mapEntry = resultantBeansMap.pollFirstEntry();
381 if(mapEntry != null) {
382 c.add(mapEntry.getValue());
383 }
384 }
385 }
386 s = c.spliterator();
387 }
388 }
389
390 return s;
391 }
392
393 // WARNING! This code is untested because I have no way of telling the JDK
394 // streaming code how to do its job.
395 @Override
396 public long estimateSize() {
397 return accumulateThread == null ? resultQueue.size() : resultantBeansMap.size();
398 }
399
400 @Override
401 public int characteristics() {
402 int characteristics = Spliterator.CONCURRENT | Spliterator.NONNULL;
403 if(accumulateThread != null) {
404 characteristics |= Spliterator.ORDERED;
405 }
406 return characteristics;
407 }
408 }