View Javadoc
1   /*
2    * Copyright 2017 Andrew Rucker Jones.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package com.opencsv.bean.concurrent;
17  
18  import com.opencsv.bean.util.OrderedObject;
19  import com.opencsv.exceptions.CsvException;
20  import org.apache.commons.collections4.ListValuedMap;
21  
22  import java.util.SortedSet;
23  import java.util.concurrent.BlockingQueue;
24  import java.util.concurrent.ConcurrentMap;
25  
26  /**
27   * The accumulator takes two queues of results of transforming text input
28   * into bean output or bean input into text output (output and exceptions)
29   * and orders them for later consumption.
30   * This task is delegated to a separate thread so threads can quickly queue
31   * their results in a (synchronized, thread-safe) queue and move on with other
32   * work, while the relatively expensive operation of ordering the results
33   * doesn't block other threads waiting for access to the ordered map.
34   * @param <T> Type of output being created (bean or strings)
35   * @author Andrew Rucker Jones
36   * @since 4.0
37   */
38  class AccumulateCsvResults<T> extends Thread {
39      private final BlockingQueue<OrderedObject<T>> resultantBeansQueue;
40      private final BlockingQueue<OrderedObject<CsvException>> thrownExceptionsQueue;
41      private final SortedSet<Long> expectedRecords;
42      private final ConcurrentMap<Long, T> resultantBeanMap;
43      private boolean mustStop = false;
44  
45      /** <em>All access to this variable must be synchronized.</em> */
46      private final ListValuedMap<Long, CsvException> thrownExceptionsMap;
47  
48      /**
49       * The only accepted constructor for the accumulator.
50       * @param resultantBeansQueue A queue of beans coming out of the pool of
51       *   threads creating them. The accumulator pulls from this queue.
52       * @param thrownExceptionsQueue A queue of
53       *   {@link com.opencsv.exceptions.CsvException} and its derivatives coming
54       *   out of the pool of threads creating beans. The accumulator pulls from
55       *   this queue.
56       * @param expectedRecords A list of outstanding record numbers so gaps
57       *                        in ordering due to filtered input or exceptions
58       *                        while converting can be detected.
59       * @param resultantBeanMap The (ordered) map of beans that have been
60       *   created. The accumulator inserts into this map.
61       * @param thrownExceptionsMap The map of suppressed exceptions thrown
62       *   during bean creation. The accumulator inserts into this map. <em>All
63       *   access to this variable must be synchronized.</em>
64       */
65      AccumulateCsvResults(BlockingQueue<OrderedObject<T>> resultantBeansQueue,
66                           BlockingQueue<OrderedObject<CsvException>> thrownExceptionsQueue,
67                           SortedSet<Long> expectedRecords,
68                           ConcurrentMap<Long, T> resultantBeanMap,
69                           ListValuedMap<Long, CsvException> thrownExceptionsMap) {
70          super();
71          this.resultantBeansQueue = resultantBeansQueue;
72          this.thrownExceptionsQueue = thrownExceptionsQueue;
73          this.expectedRecords = expectedRecords;
74          this.resultantBeanMap = resultantBeanMap;
75          this.thrownExceptionsMap = thrownExceptionsMap;
76      }
77  
78      /**
79       * Checks whether the accumulator should shut itself down.
80       * This method must always be used to check the value of the signal boolean,
81       * because it's synchronized.
82       * @return Whether the accumulator should stop
83       */
84      private synchronized boolean isMustStop() {
85          return mustStop;
86      }
87  
88      /**
89       * Tells the accumulator whether it should stop.
90       * This method must always be used to set the value of the signal boolean,
91       * because it's synchronized.
92       * @param mustStop Whether the accumulator should stop
93       */
94      synchronized void setMustStop(boolean mustStop) {
95          this.mustStop = mustStop;
96      }
97  
98      @Override
99      public void run() {
100         while(!isMustStop() || !resultantBeansQueue.isEmpty() || !thrownExceptionsQueue.isEmpty()) {
101             OrderedObject<T> orderedObject = null;
102 
103             // Move the output objects from the unsorted queue to the
104             // navigable map. Only the next expected objects are moved;
105             // if a gap in numbering occurs, the thread waits until those
106             // results have been filled in before continuing.
107             if (!expectedRecords.isEmpty()) {
108                 orderedObject = resultantBeansQueue.stream()
109                         .filter(e -> expectedRecords.first().equals(e.getOrdinal()))
110                         .findAny().orElse(null);
111             }
112             while(orderedObject != null) {
113                 resultantBeansQueue.remove(orderedObject);
114                 expectedRecords.remove(expectedRecords.first());
115                 resultantBeanMap.put(orderedObject.getOrdinal(), orderedObject.getElement());
116                 if(!expectedRecords.isEmpty()) {
117                     orderedObject = resultantBeansQueue.stream()
118                             .filter(e -> expectedRecords.first().equals(e.getOrdinal()))
119                             .findAny().orElse(null);
120                 }
121                 else {
122                     orderedObject = null;
123                 }
124             }
125 
126             // Move the exceptions from the unsorted queue to the navigable map.
127             while(!thrownExceptionsQueue.isEmpty()) {
128                 OrderedObject<CsvException> capturedException = thrownExceptionsQueue.poll();
129                 if(capturedException != null) {
130                     synchronized (thrownExceptionsMap) {
131                         thrownExceptionsMap.put(capturedException.getOrdinal(), capturedException.getElement());
132                     }
133                 }
134             }
135             Thread.yield();
136         }
137     }
138 }