1 /*
2 * Copyright 2017 Andrew Rucker Jones.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package com.opencsv.bean.concurrent;
17
18 import com.opencsv.bean.BeanVerifier;
19 import com.opencsv.bean.CsvToBeanFilter;
20 import com.opencsv.bean.MappingStrategy;
21 import com.opencsv.bean.exceptionhandler.CsvExceptionHandler;
22 import com.opencsv.bean.util.OpencsvUtils;
23 import com.opencsv.bean.util.OrderedObject;
24 import com.opencsv.exceptions.*;
25 import org.apache.commons.lang3.ArrayUtils;
26 import org.apache.commons.lang3.ObjectUtils;
27
28 import java.util.*;
29 import java.util.concurrent.BlockingQueue;
30
31 /**
32 * A class that encapsulates the job of creating a bean from a line of CSV input
33 * and making it possible to run those jobs in parallel.
34 * @param <T> The type of the bean being created
35 * @author Andrew Rucker Jones
36 * @since 4.0
37 */
38 public class ProcessCsvLine<T> implements Runnable {
39 private final long lineNumber;
40 private final MappingStrategy<? extends T> mapper;
41 private final CsvToBeanFilter filter;
42 private final List<BeanVerifier<T>> verifiers;
43 private final String[] line;
44 private final BlockingQueue<OrderedObject<T>> resultantBeanQueue;
45 private final BlockingQueue<OrderedObject<CsvException>> thrownExceptionsQueue;
46 private final SortedSet<Long> expectedRecords;
47 private final CsvExceptionHandler exceptionHandler;
48
49 /**
50 * The only constructor for creating a bean out of a line of input.
51 * @param lineNumber Which record in the input file is being processed
52 * @param mapper The mapping strategy to be used
53 * @param filter A filter to remove beans from the running, if necessary.
54 * May be null.
55 * @param verifiers The list of verifiers to run on beans after creation
56 * @param line The line of input to be transformed into a bean
57 * @param resultantBeanQueue A queue in which to place the bean created
58 * @param thrownExceptionsQueue A queue in which to place a thrown
59 * exception, if one is thrown
60 * @param expectedRecords A list of outstanding record numbers so gaps
61 * in ordering due to filtered input or exceptions
62 * while converting can be detected.
63 * @param exceptionHandler The handler for exceptions thrown during record
64 * processing
65 */
66 public ProcessCsvLine(
67 long lineNumber, MappingStrategy<? extends T> mapper, CsvToBeanFilter filter,
68 List<BeanVerifier<T>> verifiers, String[] line,
69 BlockingQueue<OrderedObject<T>> resultantBeanQueue,
70 BlockingQueue<OrderedObject<CsvException>> thrownExceptionsQueue,
71 SortedSet<Long> expectedRecords, CsvExceptionHandler exceptionHandler) {
72 this.lineNumber = lineNumber;
73 this.mapper = mapper;
74 this.filter = filter;
75 this.verifiers = ObjectUtils.defaultIfNull(new ArrayList<>(verifiers), Collections.emptyList());
76 this.line = ArrayUtils.clone(line);
77 this.resultantBeanQueue = resultantBeanQueue;
78 this.thrownExceptionsQueue = thrownExceptionsQueue;
79 this.expectedRecords = expectedRecords;
80 this.exceptionHandler = exceptionHandler;
81 }
82
83 @Override
84 public void run() {
85 try {
86 if (filter == null || filter.allowLine(line)) {
87 T obj = processLine();
88 ListIterator<BeanVerifier<T>> verifierList = verifiers.listIterator();
89 boolean keep = true;
90 while(keep && verifierList.hasNext()) {
91 keep = verifierList.next().verifyBean(obj);
92 }
93 if (keep) {
94 OpencsvUtils.queueRefuseToAcceptDefeat(
95 resultantBeanQueue,
96 new OrderedObject<>(lineNumber, obj));
97 }
98 else {
99 expectedRecords.remove(lineNumber);
100 }
101 }
102 else {
103 expectedRecords.remove(lineNumber);
104 }
105 } catch (CsvException e) {
106 expectedRecords.remove(lineNumber);
107 e.setLine(line);
108 OpencsvUtils.handleException(e, lineNumber, exceptionHandler, thrownExceptionsQueue);
109 } catch (Exception e) {
110 expectedRecords.remove(lineNumber);
111 throw new RuntimeException(e);
112 }
113 }
114
115 /**
116 * Creates a single object from a line from the CSV file.
117 * @return Object containing the values.
118 * @throws CsvBeanIntrospectionException Thrown on error creating bean.
119 * @throws CsvBadConverterException If a custom converter cannot be
120 * initialized properly
121 * @throws CsvFieldAssignmentException A more specific subclass of this
122 * exception is thrown for any problem decoding and assigning a field
123 * of the input to a bean field
124 * @throws CsvChainedException If multiple exceptions are thrown for the
125 * same input line
126 */
127 private T processLine()
128 throws CsvBeanIntrospectionException,
129 CsvBadConverterException, CsvFieldAssignmentException,
130 CsvChainedException {
131 return mapper.populateNewBean(line);
132 }
133 }