View Javadoc
1   /*
2    * Copyright 2017 Andrew Rucker Jones.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package com.opencsv.bean.concurrent;
17  
18  import com.opencsv.bean.BeanVerifier;
19  import com.opencsv.bean.CsvToBeanFilter;
20  import com.opencsv.bean.MappingStrategy;
21  import com.opencsv.bean.exceptionhandler.CsvExceptionHandler;
22  import com.opencsv.bean.util.OpencsvUtils;
23  import com.opencsv.bean.util.OrderedObject;
24  import com.opencsv.exceptions.*;
25  import org.apache.commons.lang3.ArrayUtils;
26  import org.apache.commons.lang3.ObjectUtils;
27  
28  import java.util.*;
29  import java.util.concurrent.BlockingQueue;
30  
31  /**
32   * A class that encapsulates the job of creating a bean from a line of CSV input
33   * and making it possible to run those jobs in parallel.
34   * @param <T> The type of the bean being created
35   * @author Andrew Rucker Jones
36   * @since 4.0
37   */
38  public class ProcessCsvLine<T> implements Runnable {
39      private final long lineNumber;
40      private final MappingStrategy<? extends T> mapper;
41      private final CsvToBeanFilter filter;
42      private final List<BeanVerifier<T>> verifiers;
43      private final String[] line;
44      private final BlockingQueue<OrderedObject<T>> resultantBeanQueue;
45      private final BlockingQueue<OrderedObject<CsvException>> thrownExceptionsQueue;
46      private final SortedSet<Long> expectedRecords;
47      private final CsvExceptionHandler exceptionHandler;
48  
49      /**
50       * The only constructor for creating a bean out of a line of input.
51       * @param lineNumber Which record in the input file is being processed
52       * @param mapper The mapping strategy to be used
53       * @param filter A filter to remove beans from the running, if necessary.
54       *   May be null.
55       * @param verifiers The list of verifiers to run on beans after creation
56       * @param line The line of input to be transformed into a bean
57       * @param resultantBeanQueue A queue in which to place the bean created
58       * @param thrownExceptionsQueue A queue in which to place a thrown
59       *   exception, if one is thrown
60       * @param expectedRecords A list of outstanding record numbers so gaps
61       *                        in ordering due to filtered input or exceptions
62       *                        while converting can be detected.
63       * @param exceptionHandler The handler for exceptions thrown during record
64       *                         processing
65       */
66      public ProcessCsvLine(
67              long lineNumber, MappingStrategy<? extends T> mapper, CsvToBeanFilter filter,
68              List<BeanVerifier<T>> verifiers, String[] line,
69              BlockingQueue<OrderedObject<T>> resultantBeanQueue,
70              BlockingQueue<OrderedObject<CsvException>> thrownExceptionsQueue,
71              SortedSet<Long> expectedRecords, CsvExceptionHandler exceptionHandler) {
72          this.lineNumber = lineNumber;
73          this.mapper = mapper;
74          this.filter = filter;
75          this.verifiers = ObjectUtils.defaultIfNull(new ArrayList<>(verifiers), Collections.emptyList());
76          this.line = ArrayUtils.clone(line);
77          this.resultantBeanQueue = resultantBeanQueue;
78          this.thrownExceptionsQueue = thrownExceptionsQueue;
79          this.expectedRecords = expectedRecords;
80          this.exceptionHandler = exceptionHandler;
81      }
82  
83      @Override
84      public void run() {
85          try {
86              if (filter == null || filter.allowLine(line)) {
87                  T obj = processLine();
88                  ListIterator<BeanVerifier<T>> verifierList = verifiers.listIterator();
89                  boolean keep = true;
90                  while(keep && verifierList.hasNext()) {
91                      keep = verifierList.next().verifyBean(obj);
92                  }
93                  if (keep) {
94                      OpencsvUtils.queueRefuseToAcceptDefeat(
95                              resultantBeanQueue,
96                              new OrderedObject<>(lineNumber, obj));
97                  }
98                  else {
99                      expectedRecords.remove(lineNumber);
100                 }
101             }
102             else {
103                 expectedRecords.remove(lineNumber);
104             }
105         } catch (CsvException e) {
106             expectedRecords.remove(lineNumber);
107             e.setLine(line);
108             OpencsvUtils.handleException(e, lineNumber, exceptionHandler, thrownExceptionsQueue);
109         } catch (Exception e) {
110             expectedRecords.remove(lineNumber);
111             throw new RuntimeException(e);
112         }
113     }
114 
115     /**
116      * Creates a single object from a line from the CSV file.
117      * @return Object containing the values.
118      * @throws CsvBeanIntrospectionException Thrown on error creating bean.
119      * @throws CsvBadConverterException If a custom converter cannot be
120      *   initialized properly
121      * @throws CsvFieldAssignmentException A more specific subclass of this
122      *   exception is thrown for any problem decoding and assigning a field
123      *   of the input to a bean field
124      * @throws CsvChainedException If multiple exceptions are thrown for the
125      * same input line
126      */
127     private T processLine()
128             throws CsvBeanIntrospectionException,
129             CsvBadConverterException, CsvFieldAssignmentException,
130             CsvChainedException {
131         return mapper.populateNewBean(line);
132     }
133 }