View Javadoc
1   /*
2    * Copyright 2017 Andrew Rucker Jones.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package com.opencsv.bean.concurrent;
17  
18  import com.opencsv.ICSVParser;
19  import com.opencsv.bean.util.OrderedObject;
20  import com.opencsv.exceptions.CsvException;
21  import org.apache.commons.collections4.ListValuedMap;
22  import org.apache.commons.collections4.multimap.ArrayListValuedHashMap;
23  import org.apache.commons.lang3.ArrayUtils;
24  import org.apache.commons.lang3.ObjectUtils;
25  
26  import java.util.*;
27  import java.util.concurrent.*;
28  import java.util.function.Consumer;
29  import java.util.stream.Collectors;
30  
31  /**
32   * This ThreadPoolExecutor automatically shuts down on any failed thread.
33   * <p>This is the historically established precedent for dealing with input errors
34   * in opencsv. This implementation expects all uncaught exceptions from its
35   * threads to be wrapped in a {@link java.lang.RuntimeException}. The number of
36   * threads in the pool is fixed.</p>
37   * <p>It is not intended for this executor to be instantiated and receive jobs
38   * directly. There are function-specific derived classes for that purpose.</p>
39   * <p>This executor adds significant logic to the basic idea of an
40   * {@link java.util.concurrent.Executor}, and thus must be used differently
41   * from other executors. Usage follows this pattern:
42   * <ol><li>{@link #prepare()}</li>
43   * <li>Submit tasks. This is not intended to be done directly to this class, but
44   * rather to one of the submission methods of the derived classes.</li>
45   * <li>{@link #complete()}</li>
46   * <li>The results are had by creating a {@link java.util.stream.Stream} out of
47   * the executor itself. This is most easily done with
48   * {@link java.util.stream.StreamSupport#stream(Spliterator, boolean)}</li>
49   * <li>Possibly {@link #getCapturedExceptions()}</li></ol>
50   * <p>The execution structure of this class is:
51   * <ol><li>The main thread (outside of this executor) parses input and passes
52   * it on to</li>
53   * <li>This executor, which performs a number of conversions in parallel and
54   * passes these results and any resultant errors to</li>
55   * <li>The accumulator, which creates an ordered list of the results.</li></ol>
56   * <p>The threads in the executor queue their results in a thread-safe
57   * queue, which should be O(1), minimizing wait time due to synchronization.
58   * The accumulator then removes items from the queue and inserts them into a
59   * sorted data structure, which is O(log n) on average and O(n) in the worst
60   * case. If the user has told us she doesn't need sorted data, the
61   * accumulator is not necessary, and thus is not started.</p>
62   *
63   * @param <T> The type of the object being created by the threads run
64   * @author Andrew Rucker Jones
65   * @since 4.0
66   */
67  class IntolerantThreadPoolExecutor<T> extends ThreadPoolExecutor implements Spliterator<T> {
68  
69      /** A queue of the beans created. */
70      protected final BlockingQueue<OrderedObject<T>> resultQueue = new LinkedBlockingQueue<>();
71  
72      /** A queue of exceptions thrown by threads during processing. */
73      protected final BlockingQueue<OrderedObject<CsvException>> thrownExceptionsQueue = new LinkedBlockingQueue<>();
74  
75      /** A sorted, concurrent map for the beans created. */
76      private ConcurrentNavigableMap<Long, T> resultantBeansMap = null;
77  
78      /**
79       * A multi-valued map for any exceptions captured.
80       * <p>The multi-valued part is important because the same line can throw more
81       * than one exception.</p>
82       * <p><em>All access to this variable must be synchronized.</em></p>
83       * */
84      private ListValuedMap<Long, CsvException> thrownExceptionsMap = null;
85  
86      /** A separate thread that accumulates and orders results. */
87      protected AccumulateCsvResults<T> accumulateThread = null;
88  
89      /** A list of the ordinals of data records still to be expected by the accumulator. */
90      protected final SortedSet<Long> expectedRecords = new ConcurrentSkipListSet<>();
91  
92      /**
93       * Determines whether resulting data sets have to be in the same order as
94       * the input.
95       */
96      private final boolean orderedResults;
97  
98      /** The locale for error messages. */
99      protected final Locale errorLocale;
100 
101     /** The exception that caused this Executor to stop executing. */
102     private Throwable terminalException;
103 
104     /**
105      * Constructor for a thread pool executor that stops by itself as soon as
106      * any thread throws an exception.
107      * Threads never time out and the queue for inbound work is unbounded.
108      * @param orderedResults Whether order should be preserved in the results
109      * @param errorLocale The errorLocale to use for error messages.
110      */
111     IntolerantThreadPoolExecutor(boolean orderedResults, Locale errorLocale) {
112         super(Runtime.getRuntime().availableProcessors(),
113                 Runtime.getRuntime().availableProcessors(), Long.MAX_VALUE,
114                 TimeUnit.NANOSECONDS, new LinkedBlockingQueue<>());
115         this.orderedResults = orderedResults;
116         this.errorLocale = ObjectUtils.defaultIfNull(errorLocale, Locale.getDefault());
117     }
118 
119     /**
120      * Prepares this Executor to receive jobs.
121      */
122     public void prepare() {
123         prestartAllCoreThreads();
124 
125         // The ordered maps and accumulator are only necessary if ordering is
126         // stipulated. After this, the presence or absence of the accumulator is
127         // used to indicate ordering or not so as to guard against the unlikely
128         // problem that someone sets orderedResults right in the middle of
129         // processing.
130         if(orderedResults) {
131             resultantBeansMap = new ConcurrentSkipListMap<>();
132             thrownExceptionsMap = new ArrayListValuedHashMap<>();
133 
134             // Start the process for accumulating results and cleaning up
135             accumulateThread = new AccumulateCsvResults<>(
136                     resultQueue, thrownExceptionsQueue, expectedRecords,
137                     resultantBeansMap, thrownExceptionsMap);
138             accumulateThread.start();
139         }
140     }
141 
142     /**
143      * Sends a signal to the Executor that it should shut down once all threads
144      * have completed.
145      *
146      * @throws InterruptedException If the current thread is interrupted while
147      * waiting. Shouldn't be thrown, since the Executor
148      * waits indefinitely for all threads to end.
149      * @throws RejectedExecutionException If an exception during processing
150      * forced this Executor to shut down.
151      */
152     public void complete() throws InterruptedException {
153         // Normal termination
154         shutdown();
155         awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); // Wait indefinitely
156         if(accumulateThread != null) {
157             accumulateThread.setMustStop(true);
158             accumulateThread.join();
159         }
160 
161         // There's one more possibility: The very last bean caused a problem.
162         if(terminalException != null) {
163             // Trigger a catch in the calling method
164             throw new RejectedExecutionException();
165         }
166     }
167 
168     /**
169      * Returns exceptions captured during the conversion process if
170      * the conversion process was set not to propagate these errors
171      * up the call stack.
172      * The call is nondestructive.
173      *
174      * @return All exceptions captured
175      */
176     public List<CsvException> getCapturedExceptions() {
177         List<CsvException> returnList = null;
178         if(thrownExceptionsMap == null) {
179             returnList = thrownExceptionsQueue.stream()
180                     .filter(Objects::nonNull)
181                     .map(OrderedObject::getElement)
182                     .collect(Collectors.toList());
183         }
184         else {
185             returnList = new LinkedList<>();
186             synchronized (thrownExceptionsMap) {
187                 final List<CsvException> finalReturnList = returnList;
188                 thrownExceptionsMap.keySet().stream()
189                         .sorted()
190                         .forEach(l -> finalReturnList.addAll(thrownExceptionsMap.get(l)));
191             }
192         }
193         return returnList;
194     }
195 
196     @Override
197     public List<Runnable> shutdownNow() {
198         if(accumulateThread != null) {
199             accumulateThread.setMustStop(true);
200             try {
201                 accumulateThread.join();
202             } catch (InterruptedException e) {
203                 // Do nothing. Best faith effort.
204             }
205         }
206         return super.shutdownNow();
207     }
208 
209     /**
210      * Shuts the Executor down if the thread ended in an exception.
211      * @param r {@inheritDoc}
212      * @param t {@inheritDoc} 
213      */
214     @Override
215     protected void afterExecute(Runnable r, Throwable t) {
216         super.afterExecute(r, t);
217         if(t != null) {
218             if(t.getCause() != null) {
219                 // Normally, everything that gets to this point should be
220                 // wrapped in a RuntimeException to get past the lack of checked
221                 // exceptions in Runnable.run().
222                 terminalException = t.getCause();
223             }
224             else {
225                 terminalException = t;
226             }
227             shutdownNow();
228         }
229     }
230     
231     /**
232      * If an unrecoverable exception was thrown during processing, it can be
233      * retrieved here.
234      * @return The exception that halted one of the threads, which caused the
235      *   executor to shut itself down
236      */
237     public Throwable getTerminalException() {
238         return terminalException;
239     }
240 
241     /**
242      * Checks whether exceptions are available that should halt processing.
243      * This is the case with unrecoverable errors, such as parsing the input,
244      * or if exceptions in conversion should be thrown by request of the user.
245      */
246     protected void checkExceptions() {
247         if(terminalException != null) {
248             if(terminalException instanceof CsvException) {
249                 CsvException csve = (CsvException) terminalException;
250                 throw new RuntimeException(String.format(ResourceBundle.getBundle(ICSVParser.DEFAULT_BUNDLE_NAME, errorLocale).getString("parsing.error.linenumber"),
251                         csve.getLineNumber(), String.join(",", ArrayUtils.nullToEmpty(csve.getLine()))), csve);
252             }
253             throw new RuntimeException(terminalException);
254         }
255     }
256 
257     private boolean isConversionComplete() {
258         return isTerminated() && (accumulateThread == null || !accumulateThread.isAlive());
259     }
260 
261     /**
262      * Determines whether more conversion results can be expected.
263      * Since {@link Spliterator}s have no way of indicating that they don't
264      * have a result at the moment, but might in the future, we must ensure
265      * that every call to {@link #tryAdvance(Consumer)} or {@link #trySplit()}
266      * only returns {@code null} if the entire conversion apparatus has shut
267      * down and all result queues are cleared. Thus, this method waits until
268      * either that is true, or there is truly at least one result that can be
269      * returned to users of the {@link Spliterator} interface.
270      *
271      * @return {@code false} if conversion is complete and no more results
272      *   can ever be expected out of this {@link Spliterator}, {@code true}
273      *   otherwise. If {@code true} is returned, it is guaranteed that at
274      *   least one result is available immediately to the caller.
275      */
276     private boolean areMoreResultsAvailable() {
277         // If an exception has been thrown that needs to be passed on,
278         // throw it here.
279         checkExceptions();
280 
281         // Check conditions for completion
282         boolean elementFound = false;
283         while(!elementFound && !isConversionComplete()) {
284             if(accumulateThread == null) {
285                 if(resultQueue.isEmpty()) {
286                     Thread.yield();
287                 }
288                 else {
289                     elementFound = true;
290                 }
291             }
292             else {
293                 if(resultantBeansMap.isEmpty()) {
294                     Thread.yield();
295                 }
296                 else {
297                     elementFound = true;
298                 }
299             }
300 
301             // If an exception has been thrown that needs to be passed on,
302             // throw it here.
303             checkExceptions();
304         }
305 
306         return accumulateThread == null ? !resultQueue.isEmpty() : !resultantBeansMap.isEmpty();
307     }
308 
309     @Override
310     public boolean tryAdvance(Consumer<? super T> action) {
311         T bean = null;
312 
313         if (areMoreResultsAvailable()) {
314             // Since we are now guaranteed to have a result, we don't
315             // really have to do all of the null checking below, but
316             // better safe than sorry.
317             if(accumulateThread == null) {
318                 OrderedObject<T> orderedObject = resultQueue.poll();
319                 if(orderedObject != null) {
320                     bean = orderedObject.getElement();
321                 }
322             }
323             else {
324                 Map.Entry<Long, T> mapEntry = resultantBeansMap.pollFirstEntry();
325                 if(mapEntry != null) {
326                     bean = mapEntry.getValue();
327                 }
328             }
329             if(bean != null) {
330                 action.accept(bean);
331             }
332         }
333 
334         return bean != null;
335     }
336 
337     // WARNING! This code is untested because I have no way of telling the JDK
338     // streaming code how to do its job.
339     @Override
340     public Spliterator<T> trySplit() {
341         Spliterator<T> s = null;
342 
343         // Check if all threads are through
344         if(areMoreResultsAvailable()) {
345             if(isConversionComplete()) {
346                 // Return everything we have
347                 if(accumulateThread == null) {
348                     s = resultQueue.stream().map(OrderedObject::getElement).spliterator();
349                 }
350                 else {
351                     s = resultantBeansMap.values().spliterator();
352                 }
353             }
354             else {
355                 int size;
356                 ArrayList<T> c;
357                 if(accumulateThread == null) {
358                     // May seem like an odd implementation, but we can't use
359                     // resultQueue.drainTo() because bulk operations are not
360                     // thread-safe. So, we have to poll each object individually.
361                     // We don't want to use a LinkedList for the Spliterator
362                     // because another split would presumably be inefficient. With
363                     // an ArrayList, on the other hand, we have to make sure we
364                     // avoid a costly resize operation.
365                     size = resultQueue.size();
366                     c = new ArrayList<>(size);
367                     for(int i = 0; i < size; i++) {
368                         // Result guaranteed to exist through areMoreResultsAvailable()
369                         OrderedObject<T> orderedObject = resultQueue.poll();
370                         if(orderedObject != null) {
371                             c.add(orderedObject.getElement());
372                         }
373 
374                     }
375                 }
376                 else {
377                     size = resultantBeansMap.size();
378                     c = new ArrayList<>(size);
379                     for(int i = 0; i < size; i++) {
380                         Map.Entry<Long, T> mapEntry = resultantBeansMap.pollFirstEntry();
381                         if(mapEntry != null) {
382                             c.add(mapEntry.getValue());
383                         }
384                     }
385                 }
386                 s = c.spliterator();
387             }
388         }
389 
390         return s;
391     }
392 
393     // WARNING! This code is untested because I have no way of telling the JDK
394     // streaming code how to do its job.
395     @Override
396     public long estimateSize() {
397         return accumulateThread == null ? resultQueue.size() : resultantBeansMap.size();
398     }
399 
400     @Override
401     public int characteristics() {
402         int characteristics = Spliterator.CONCURRENT | Spliterator.NONNULL;
403         if(accumulateThread != null) {
404             characteristics |= Spliterator.ORDERED;
405         }
406         return characteristics;
407     }
408 }