/*
* Copyright 2017 Andrew Rucker Jones.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.opencsv.bean.concurrent;
import com.opencsv.ICSVParser;
import com.opencsv.bean.util.OrderedObject;
import com.opencsv.exceptions.CsvException;
import org.apache.commons.collections4.ListValuedMap;
import org.apache.commons.collections4.multimap.ArrayListValuedHashMap;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.ObjectUtils;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
* This ThreadPoolExecutor automatically shuts down on any failed thread.
* <p>This is the historically established precedent for dealing with input errors
* in opencsv. This implementation expects all uncaught exceptions from its
* threads to be wrapped in a {@link java.lang.RuntimeException}. The number of
* threads in the pool is fixed.</p>
* <p>It is not intended for this executor to be instantiated and receive jobs
* directly. There are function-specific derived classes for that purpose.</p>
* <p>This executor adds significant logic to the basic idea of an
* {@link java.util.concurrent.Executor}, and thus must be used differently
* from other executors. Usage follows this pattern (see the sketch after the list):
* <ol><li>{@link #prepare()}</li>
* <li>Submit tasks. This is not intended to be done directly to this class, but
* rather to one of the submission methods of the derived classes.</li>
* <li>{@link #complete()}</li>
* <li>The results are retrieved by creating a {@link java.util.stream.Stream}
* out of the executor itself. This is most easily done with
* {@link java.util.stream.StreamSupport#stream(Spliterator, boolean)}.</li>
* <li>Possibly {@link #getCapturedExceptions()}</li></ol></p>
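* <p>A minimal sketch of this pattern, assuming a hypothetical bean type
* {@code Bean} and leaving task submission to a derived class:</p>
* <pre>{@code
* IntolerantThreadPoolExecutor<Bean> executor =
*         new IntolerantThreadPoolExecutor<>(true, Locale.getDefault());
* executor.prepare();
* // ... submit tasks through a derived class's submission method ...
* executor.complete(); // may throw InterruptedException (handling elided)
* List<Bean> beans = StreamSupport.stream(executor, false)
*         .collect(Collectors.toList());
* List<CsvException> problems = executor.getCapturedExceptions();
* }</pre>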
* <p>The execution structure of this class is:
* <ol><li>The main thread (outside of this executor) parses input and passes
* it on to</li>
* <li>This executor, which performs a number of conversions in parallel and
* passes these results and any resultant errors to</li>
* <li>The accumulator, which creates an ordered list of the results.</li></ol></p>
* <p>The threads in the executor queue their results in a thread-safe
* queue, which should be O(1), minimizing wait time due to synchronization.
* The accumulator then removes items from the queue and inserts them into a
* sorted data structure, which is O(log n) on average and O(n) in the worst
* case. If the user has told us she doesn't need sorted data, the
* accumulator is not necessary, and thus is not started.</p>
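* <p>An illustrative sketch of that handoff (not the actual accumulator
* code; assumes {@link com.opencsv.bean.util.OrderedObject} exposes its
* ordinal via {@code getOrdinal()}):</p>
* <pre>{@code
* OrderedObject<T> next = resultQueue.poll();        // O(1)
* if (next != null) {
*     // The ConcurrentSkipListMap keeps results sorted by ordinal
*     resultantBeansMap.put(next.getOrdinal(), next.getElement()); // O(log n)
* }
* }</pre>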
*
* @param <T> The type of the object created by the threads this executor runs
* @author Andrew Rucker Jones
* @since 4.0
*/
class IntolerantThreadPoolExecutor<T> extends ThreadPoolExecutor implements Spliterator<T> {
/** A queue of the beans created. */
protected final BlockingQueue<OrderedObject<T>> resultQueue = new LinkedBlockingQueue<>();
/** A queue of exceptions thrown by threads during processing. */
protected final BlockingQueue<OrderedObject<CsvException>> thrownExceptionsQueue = new LinkedBlockingQueue<>();
/** A sorted, concurrent map for the beans created. */
private ConcurrentNavigableMap<Long, T> resultantBeansMap = null;
/**
* A multi-valued map for any exceptions captured.
* <p>The multi-valued part is important because the same line can throw more
* than one exception.</p>
* <p><em>All access to this variable must be synchronized.</em></p>
*/
private ListValuedMap<Long, CsvException> thrownExceptionsMap = null;
/** A separate thread that accumulates and orders results. */
protected AccumulateCsvResults<T> accumulateThread = null;
/** A sorted set of the ordinals of data records the accumulator still expects. */
protected final SortedSet<Long> expectedRecords = new ConcurrentSkipListSet<>();
/**
* Determines whether resulting data sets have to be in the same order as
* the input.
*/
private final boolean orderedResults;
/** The locale for error messages. */
protected final Locale errorLocale;
/** The exception that caused this Executor to stop executing. */
private Throwable terminalException;
/**
* Constructor for a thread pool executor that stops by itself as soon as
* any thread throws an exception.
* Threads never time out and the queue for inbound work is unbounded.
* @param orderedResults Whether order should be preserved in the results
* @param errorLocale The errorLocale to use for error messages.
*/
IntolerantThreadPoolExecutor(boolean orderedResults, Locale errorLocale) {
super(Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(), Long.MAX_VALUE,
TimeUnit.NANOSECONDS, new LinkedBlockingQueue<>());
this.orderedResults = orderedResults;
this.errorLocale = ObjectUtils.defaultIfNull(errorLocale, Locale.getDefault());
}
/**
* Prepares this Executor to receive jobs.
*/
public void prepare() {
prestartAllCoreThreads();
// The ordered maps and accumulator are only necessary if ordering is
// stipulated. After this, the presence or absence of the accumulator is
// used to indicate ordering or not so as to guard against the unlikely
// problem that someone sets orderedResults right in the middle of
// processing.
if(orderedResults) {
resultantBeansMap = new ConcurrentSkipListMap<>();
thrownExceptionsMap = new ArrayListValuedHashMap<>();
// Start the process for accumulating results and cleaning up
accumulateThread = new AccumulateCsvResults<>(
resultQueue, thrownExceptionsQueue, expectedRecords,
resultantBeansMap, thrownExceptionsMap);
accumulateThread.start();
}
}
/**
* Sends a signal to the Executor that it should shut down once all threads
* have completed.
*
* @throws InterruptedException If the current thread is interrupted while
* waiting. Shouldn't be thrown, since the Executor
* waits indefinitely for all threads to end.
* @throws RejectedExecutionException If an exception during processing
* forced this Executor to shut down.
*/
public void complete() throws InterruptedException {
// Normal termination
shutdown();
awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); // Wait indefinitely
if(accumulateThread != null) {
accumulateThread.setMustStop(true);
accumulateThread.join();
}
// There's one more possibility: The very last bean caused a problem.
if(terminalException != null) {
// Trigger a catch in the calling method
throw new RejectedExecutionException();
}
}
/**
* Returns the exceptions captured during conversion, provided the
* conversion process was configured not to propagate these errors
* up the call stack.
* The call is nondestructive.
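* <p>For example (illustrative only):</p>
* <pre>{@code
* for (CsvException e : executor.getCapturedExceptions()) {
*     System.err.println(e.getLineNumber() + ": " + e.getMessage());
* }
* }</pre>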
*
* @return All exceptions captured
*/
public List<CsvException> getCapturedExceptions() {
List<CsvException> returnList = null;
if(thrownExceptionsMap == null) {
returnList = thrownExceptionsQueue.stream()
.filter(Objects::nonNull)
.map(OrderedObject::getElement)
.collect(Collectors.toList());
}
else {
returnList = new LinkedList<>();
synchronized (thrownExceptionsMap) {
final List<CsvException> finalReturnList = returnList;
thrownExceptionsMap.keySet().stream()
.sorted()
.forEach(l -> finalReturnList.addAll(thrownExceptionsMap.get(l)));
}
}
return returnList;
}
@Override
public List<Runnable> shutdownNow() {
if(accumulateThread != null) {
accumulateThread.setMustStop(true);
try {
accumulateThread.join();
} catch (InterruptedException e) {
// Best-effort attempt; restore the interrupt status and move on.
Thread.currentThread().interrupt();
}
}
return super.shutdownNow();
}
/**
* Shuts the Executor down if the thread ended in an exception.
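* <p>Tasks are therefore expected to wrap checked exceptions, along the
* lines of this sketch ({@code processLine()} is a hypothetical task body):</p>
* <pre>{@code
* executor.execute(() -> {
*     try {
*         processLine();
*     } catch (CsvException e) {
*         throw new RuntimeException(e); // unwrapped again in afterExecute()
*     }
* });
* }</pre>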
* @param r {@inheritDoc}
* @param t {@inheritDoc}
*/
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if(t != null) {
if(t.getCause() != null) {
// Normally, everything that gets to this point should be
// wrapped in a RuntimeException to get past the lack of checked
// exceptions in Runnable.run().
terminalException = t.getCause();
}
else {
terminalException = t;
}
shutdownNow();
}
}
/**
* If an unrecoverable exception was thrown during processing, it can be
* retrieved here.
* @return The exception that halted one of the threads, which caused the
* executor to shut itself down
*/
public Throwable getTerminalException() {
return terminalException;
}
/**
* Checks whether exceptions are available that should halt processing.
* This is the case with unrecoverable errors, such as errors while
* parsing the input, or if the user has requested that exceptions
* from conversion be thrown.
*/
protected void checkExceptions() {
if(terminalException != null) {
if(terminalException instanceof CsvException) {
CsvException csve = (CsvException) terminalException;
throw new RuntimeException(String.format(ResourceBundle.getBundle(ICSVParser.DEFAULT_BUNDLE_NAME, errorLocale).getString("parsing.error.linenumber"),
csve.getLineNumber(), String.join(",", ArrayUtils.nullToEmpty(csve.getLine()))), csve);
}
throw new RuntimeException(terminalException);
}
}
private boolean isConversionComplete() {
return isTerminated() && (accumulateThread == null || !accumulateThread.isAlive());
}
/**
* Determines whether more conversion results can be expected.
* Since {@link Spliterator}s have no way of indicating that they don't
* have a result at the moment, but might in the future, we must ensure
* that every call to {@link #tryAdvance(Consumer)} or {@link #trySplit()}
* only returns {@code null} if the entire conversion apparatus has shut
* down and all result queues are cleared. Thus, this method waits until
* either that is true, or there is truly at least one result that can be
* returned to users of the {@link Spliterator} interface.
*
* @return {@code false} if conversion is complete and no more results
* can ever be expected out of this {@link Spliterator}, {@code true}
* otherwise. If {@code true} is returned, it is guaranteed that at
* least one result is available immediately to the caller.
*/
private boolean areMoreResultsAvailable() {
// If an exception has been thrown that needs to be passed on,
// throw it here.
checkExceptions();
// Check conditions for completion
boolean elementFound = false;
while(!elementFound && !isConversionComplete()) {
if(accumulateThread == null) {
if(resultQueue.isEmpty()) {
Thread.yield();
}
else {
elementFound = true;
}
}
else {
if(resultantBeansMap.isEmpty()) {
Thread.yield();
}
else {
elementFound = true;
}
}
// If an exception has been thrown that needs to be passed on,
// throw it here.
checkExceptions();
}
return accumulateThread == null ? !resultQueue.isEmpty() : !resultantBeansMap.isEmpty();
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
T bean = null;
if (areMoreResultsAvailable()) {
// Since we are now guaranteed to have a result, we don't
// really have to do all of the null checking below, but
// better safe than sorry.
if(accumulateThread == null) {
OrderedObject<T> orderedObject = resultQueue.poll();
if(orderedObject != null) {
bean = orderedObject.getElement();
}
}
else {
Map.Entry<Long, T> mapEntry = resultantBeansMap.pollFirstEntry();
if(mapEntry != null) {
bean = mapEntry.getValue();
}
}
if(bean != null) {
action.accept(bean);
}
}
return bean != null;
}
// WARNING! This code is untested because I have no way of telling the JDK
// streaming code how to do its job.
@Override
public Spliterator<T> trySplit() {
Spliterator<T> s = null;
// Check if all threads are through
if(areMoreResultsAvailable()) {
if(isConversionComplete()) {
// Return everything we have
if(accumulateThread == null) {
s = resultQueue.stream().map(OrderedObject::getElement).spliterator();
}
else {
s = resultantBeansMap.values().spliterator();
}
}
else {
int size;
ArrayList<T> c;
if(accumulateThread == null) {
// May seem like an odd implementation, but we can't use
// resultQueue.drainTo() because bulk operations are not
// thread-safe. So, we have to poll each object individually.
// We don't want to use a LinkedList for the Spliterator
// because another split would presumably be inefficient. With
// an ArrayList, on the other hand, we have to make sure we
// avoid a costly resize operation.
size = resultQueue.size();
c = new ArrayList<>(size);
for(int i = 0; i < size; i++) {
// Result guaranteed to exist through areMoreResultsAvailable()
OrderedObject<T> orderedObject = resultQueue.poll();
if(orderedObject != null) {
c.add(orderedObject.getElement());
}
}
}
else {
size = resultantBeansMap.size();
c = new ArrayList<>(size);
for(int i = 0; i < size; i++) {
Map.Entry<Long, T> mapEntry = resultantBeansMap.pollFirstEntry();
if(mapEntry != null) {
c.add(mapEntry.getValue());
}
}
}
s = c.spliterator();
}
}
return s;
}
// WARNING! This code is untested because I have no way of telling the JDK
// streaming code how to do its job.
@Override
public long estimateSize() {
return accumulateThread == null ? resultQueue.size() : resultantBeansMap.size();
}
@Override
public int characteristics() {
int characteristics = Spliterator.CONCURRENT | Spliterator.NONNULL;
if(accumulateThread != null) {
characteristics |= Spliterator.ORDERED;
}
return characteristics;
}
}