1 /*
2 * Copyright 2017 Andrew Rucker Jones.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package com.opencsv.bean.concurrent;
17
18 import com.opencsv.bean.util.OrderedObject;
19 import com.opencsv.exceptions.CsvException;
20 import org.apache.commons.collections4.ListValuedMap;
21
22 import java.util.SortedSet;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.ConcurrentMap;
25
26 /**
27 * The accumulator takes two queues of results of transforming text input
28 * into bean output or bean input into text output (output and exceptions)
29 * and orders them for later consumption.
30 * This task is delegated to a separate thread so threads can quickly queue
31 * their results in a (synchronized, thread-safe) queue and move on with other
32 * work, while the relatively expensive operation of ordering the results
33 * doesn't block other threads waiting for access to the ordered map.
34 * @param <T> Type of output being created (bean or strings)
35 * @author Andrew Rucker Jones
36 * @since 4.0
37 */
38 class AccumulateCsvResults<T> extends Thread {
39 private final BlockingQueue<OrderedObject<T>> resultantBeansQueue;
40 private final BlockingQueue<OrderedObject<CsvException>> thrownExceptionsQueue;
41 private final SortedSet<Long> expectedRecords;
42 private final ConcurrentMap<Long, T> resultantBeanMap;
43 private boolean mustStop = false;
44
45 /** <em>All access to this variable must be synchronized.</em> */
46 private final ListValuedMap<Long, CsvException> thrownExceptionsMap;
47
48 /**
49 * The only accepted constructor for the accumulator.
50 * @param resultantBeansQueue A queue of beans coming out of the pool of
51 * threads creating them. The accumulator pulls from this queue.
52 * @param thrownExceptionsQueue A queue of
53 * {@link com.opencsv.exceptions.CsvException} and its derivatives coming
54 * out of the pool of threads creating beans. The accumulator pulls from
55 * this queue.
56 * @param expectedRecords A list of outstanding record numbers so gaps
57 * in ordering due to filtered input or exceptions
58 * while converting can be detected.
59 * @param resultantBeanMap The (ordered) map of beans that have been
60 * created. The accumulator inserts into this map.
61 * @param thrownExceptionsMap The map of suppressed exceptions thrown
62 * during bean creation. The accumulator inserts into this map. <em>All
63 * access to this variable must be synchronized.</em>
64 */
65 AccumulateCsvResults(BlockingQueue<OrderedObject<T>> resultantBeansQueue,
66 BlockingQueue<OrderedObject<CsvException>> thrownExceptionsQueue,
67 SortedSet<Long> expectedRecords,
68 ConcurrentMap<Long, T> resultantBeanMap,
69 ListValuedMap<Long, CsvException> thrownExceptionsMap) {
70 super();
71 this.resultantBeansQueue = resultantBeansQueue;
72 this.thrownExceptionsQueue = thrownExceptionsQueue;
73 this.expectedRecords = expectedRecords;
74 this.resultantBeanMap = resultantBeanMap;
75 this.thrownExceptionsMap = thrownExceptionsMap;
76 }
77
78 /**
79 * Checks whether the accumulator should shut itself down.
80 * This method must always be used to check the value of the signal boolean,
81 * because it's synchronized.
82 * @return Whether the accumulator should stop
83 */
84 private synchronized boolean isMustStop() {
85 return mustStop;
86 }
87
88 /**
89 * Tells the accumulator whether it should stop.
90 * This method must always be used to set the value of the signal boolean,
91 * because it's synchronized.
92 * @param mustStop Whether the accumulator should stop
93 */
94 synchronized void setMustStop(boolean mustStop) {
95 this.mustStop = mustStop;
96 }
97
98 @Override
99 public void run() {
100 while(!isMustStop() || !resultantBeansQueue.isEmpty() || !thrownExceptionsQueue.isEmpty()) {
101 OrderedObject<T> orderedObject = null;
102
103 // Move the output objects from the unsorted queue to the
104 // navigable map. Only the next expected objects are moved;
105 // if a gap in numbering occurs, the thread waits until those
106 // results have been filled in before continuing.
107 if (!expectedRecords.isEmpty()) {
108 orderedObject = resultantBeansQueue.stream()
109 .filter(e -> expectedRecords.first().equals(e.getOrdinal()))
110 .findAny().orElse(null);
111 }
112 while(orderedObject != null) {
113 resultantBeansQueue.remove(orderedObject);
114 expectedRecords.remove(expectedRecords.first());
115 resultantBeanMap.put(orderedObject.getOrdinal(), orderedObject.getElement());
116 if(!expectedRecords.isEmpty()) {
117 orderedObject = resultantBeansQueue.stream()
118 .filter(e -> expectedRecords.first().equals(e.getOrdinal()))
119 .findAny().orElse(null);
120 }
121 else {
122 orderedObject = null;
123 }
124 }
125
126 // Move the exceptions from the unsorted queue to the navigable map.
127 while(!thrownExceptionsQueue.isEmpty()) {
128 OrderedObject<CsvException> capturedException = thrownExceptionsQueue.poll();
129 if(capturedException != null) {
130 synchronized (thrownExceptionsMap) {
131 thrownExceptionsMap.put(capturedException.getOrdinal(), capturedException.getElement());
132 }
133 }
134 }
135 Thread.yield();
136 }
137 }
138 }