명부 미래로 순서
- 순서
로 변환하려고 List<CompletableFuture<X>>
합니다 CompletableFuture<List<T>>
. 이것은 많은 비동기 작업이 있고 모든 작업의 결과를 얻어야 할 때 매우 유용합니다.
그중 하나라도 실패하면 최종 미래는 실패합니다. 이것이 내가 구현 한 방법입니다.
public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) {
if(com.isEmpty()){
throw new IllegalArgumentException();
}
Stream<? extends CompletableFuture<T>> stream = com.stream();
CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>());
return stream.reduce(init, (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> {
x.add(y);
return x;
},exec),exec), (a, b) -> a.thenCombineAsync(b,(ls1,ls2)-> {
ls1.addAll(ls2);
return ls1;
},exec));
}
실행하려면 :
ExecutorService executorService = Executors.newCachedThreadPool();
Stream<CompletableFuture<Integer>> que = IntStream.range(0,100000).boxed().map(x -> CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep((long) (Math.random() * 10));
} catch (InterruptedException e) {
e.printStackTrace();
}
return x;
}, executorService));
CompletableFuture<List<Integer>> sequence = sequence2(que.collect(Collectors.toList()), executorService);
그중 하나라도 실패하면 실패합니다. 백만 개의 선물이 있어도 예상대로 출력을 제공합니다. 제가 가진 문제는 : 5000 개 이상의 선물이 있고 그중 하나라도 실패하면 다음과 같은 결과를 얻습니다 StackOverflowError
.
java.util.concurrent.CompletableFuture.internalComplete (CompletableFuture.java:210) at java.util.concurrent.CompletableFuture $ ThenCompose.run (CompletableFuture.java) 스레드 "pool-1-thread-2611"java.lang.StackOverflowError 예외 : 1487) at java.util.concurrent.CompletableFuture.postComplete (CompletableFuture.java:193) at java.util.concurrent.CompletableFuture.internalComplete (CompletableFuture.java:210) at java.util.concurrent.CompletableFuture $ ThenCompose.run ( CompletableFuture.java:1487)
내가 뭘 잘못하고 있니?
참고 : 위의 반환 된 future는 미래가 실패 할 때 바로 실패합니다. 받아 들여진 대답도이 점을 취해야합니다.
사용 CompletableFuture.allOf(...)
:
static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[com.size()]))
.thenApply(v -> com.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
}
구현에 대한 몇 가지 의견 :
귀하의 사용 .thenComposeAsync
, .thenApplyAsync
그리고 .thenCombineAsync
가능성이 당신이 기대하는 일을하지 않습니다. 이러한 ...Async
메서드는 별도의 스레드에서 제공된 함수를 실행합니다. 따라서 귀하의 경우에는 제공된 실행기에서 실행되도록 목록에 새 항목을 추가합니다. 간단한 작업을 캐시 된 스레드 실행기에 넣을 필요가 없습니다. thenXXXXAsync
정당한 이유없이 방법을 사용하지 마십시오 .
또한 reduce
변이 가능한 용기에 축적하는 데 사용해서는 안됩니다. 스트림이 순차적 인 경우 올바르게 작동 할 수 있지만 스트림이 병렬로 만들어지면 실패합니다. 가변 감소를 수행하려면 .collect
대신 사용하십시오.
첫 번째 실패 직후 예외적으로 전체 계산을 완료하려면 sequence
방법 에서 다음을 수행하십시오 .
CompletableFuture<List<T>> result = CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[com.size()]))
.thenApply(v -> com.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
com.forEach(f -> f.whenComplete((t, ex) -> {
if (ex != null) {
result.completeExceptionally(ex);
}
}));
return result;
추가로, 당신은 첫 번째 실패의 나머지 작업을 취소하려면, 추가 exec.shutdownNow();
직후 result.completeExceptionally(ex);
. 물론 exec
이것은 하나의 계산에 대해서만 존재 한다고 가정합니다 . 그렇지 않은 경우 나머지 Future
각각을 개별적 으로 반복하고 취소해야합니다 .
으로 미샤는 지적 , 당신은 남용입니다 …Async
작업. 또한 프로그램 논리를 반영하지 않는 종속성을 모델링하는 복잡한 작업 체인을 구성하고 있습니다.
- 목록의 첫 번째 및 두 번째 작업에 의존하는 작업 x를 만듭니다.
- 직업 x와 목록의 세 번째 직업에 의존하는 직업 x + 1을 만듭니다.
- 작업 x + 1과 목록의 네 번째 작업에 따라 작업 x + 2를 만듭니다.
- …
- 작업 x + 4999와 목록의 마지막 작업에 의존하는 작업 x + 5000을 만듭니다.
그런 다음 (명시 적으로 또는 예외로 인해)이 재귀 적으로 구성된 작업을 취소하면 재귀 적으로 수행 될 수 있으며 StackOverflowError
. 이는 구현에 따라 다릅니다.
Misha가 이미 보여 주었 듯이 , allOf
원래 의도를 모델링하고 목록의 모든 작업에 의존하는 하나의 작업을 정의 할 수 있는 방법 이 있습니다.
그러나 그것조차 필요하지 않다는 점은 주목할 가치가 있습니다. 제한되지 않은 스레드 풀 실행기를 사용하고 있으므로 결과를 수집하는 비동기 작업을 목록에 게시하면 완료됩니다. 완료를 기다리는 것은 어쨌든 각 작업의 결과를 묻는 것으로 함축 됩니다.
ExecutorService executorService = Executors.newCachedThreadPool();
List<CompletableFuture<Integer>> que = IntStream.range(0, 100000)
.mapToObj(x -> CompletableFuture.supplyAsync(() -> {
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos((long)(Math.random()*10)));
return x;
}, executorService)).collect(Collectors.toList());
CompletableFuture<List<Integer>> sequence = CompletableFuture.supplyAsync(
() -> que.stream().map(CompletableFuture::join).collect(Collectors.toList()),
executorService);
스레드 수가 제한되고 작업이 추가 비동기 작업을 생성 할 수있는 경우 종속 작업을 구성하는 방법을 사용하는 것이 중요합니다. 대기중인 작업이 먼저 완료해야하는 작업에서 스레드를 훔쳐가는 것을 방지하기 위해 여기에는 해당되지 않습니다.
이 특정 경우에 하나의 작업이 단순히이 많은 수의 전제 조건 작업을 반복하고 필요한 경우 대기하는 것이이 많은 종속성을 모델링하고 각 작업이 완료에 대해 종속 된 작업을 알리는 것보다 더 효율적일 수 있습니다.
Spotify의 CompletableFutures
라이브러리 및 사용 allAsList
방법을 얻을 수 있습니다 . Guava의 Futures.allAsList
방법 에서 영감을 얻은 것 같습니다 .
public static <T> CompletableFuture<List<T>> allAsList(
List<? extends CompletionStage<? extends T>> stages) {
라이브러리를 사용하지 않으려는 경우 다음은 간단한 구현입니다.
public <T> CompletableFuture<List<T>> allAsList(final List<CompletableFuture<T>> futures) {
return CompletableFuture.allOf(
futures.toArray(new CompletableFuture[futures.size()])
).thenApply(ignored ->
futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
);
}
@Misha가 수락 한 답변에 추가하려면 수집기로 더 확장 할 수 있습니다.
public static <T> Collector<CompletableFuture<T>, ?, CompletableFuture<List<T>>> sequenceCollector() {
return Collectors.collectingAndThen(Collectors.toList(), com -> sequence(com));
}
이제 다음을 수행 할 수 있습니다.
Stream<CompletableFuture<Integer>> stream = Stream.of(
CompletableFuture.completedFuture(1),
CompletableFuture.completedFuture(2),
CompletableFuture.completedFuture(3)
);
CompletableFuture<List<Integer>> ans = stream.collect(sequenceCollector());
CompletableFuture에서 thenCombine을 사용한 시퀀스 작업의 예
public<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com){
CompletableFuture<List<T>> identity = CompletableFuture.completedFuture(new ArrayList<T>());
BiFunction<CompletableFuture<List<T>>,CompletableFuture<T>,CompletableFuture<List<T>>> combineToList =
(acc,next) -> acc.thenCombine(next,(a,b) -> { a.add(b); return a;});
BinaryOperator<CompletableFuture<List<T>>> combineLists = (a,b)-> a.thenCombine(b,(l1,l2)-> { l1.addAll(l2); return l1;}) ;
return com.stream()
.reduce(identity,
combineToList,
combineLists);
}
}
타사 라이브러리를 사용해도 괜찮다면 cyclops-react (저는 저자입니다)에는 CompletableFutures (및 Optionals, Streams 등)에 대한 일련의 유틸리티 메서드가 있습니다.
List<CompletableFuture<String>> listOfFutures;
CompletableFuture<ListX<String>> sequence =CompletableFutures.sequence(listOfFutures);
면책 조항 : 이것은 초기 질문에 완전히 대답하지 않습니다. "하나가 실패하면 모두 실패"부분이 부족합니다. 그러나이 질문의 중복으로 닫 혔기 때문에 실제적이고 더 일반적인 질문에 대답 할 수 없습니다. Java 8 CompletableFuture.allOf (...) with Collection 또는 List . 그래서 여기에 대답하겠습니다.
Java 8의 스트림 API
List<CompletableFuture<V>>
를CompletableFuture<List<V>>
사용 하도록 변환하는 방법은 무엇입니까?
요약 : 다음을 사용하십시오.
private <V> CompletableFuture<List<V>> sequence(List<CompletableFuture<V>> listOfFutures) {
CompletableFuture<List<V>> identity = CompletableFuture.completedFuture(new ArrayList<>());
BiFunction<CompletableFuture<List<V>>, CompletableFuture<V>, CompletableFuture<List<V>>> accumulator = (futureList, futureValue) ->
futureValue.thenCombine(futureList, (value, list) -> {
List<V> newList = new ArrayList<>(list.size() + 1);
newList.addAll(list);
newList.add(value);
return newList;
});
BinaryOperator<CompletableFuture<List<V>>> combiner = (futureList1, futureList2) -> futureList1.thenCombine(futureList2, (list1, list2) -> {
List<V> newList = new ArrayList<>(list1.size() + list2.size());
newList.addAll(list1);
newList.addAll(list2);
return newList;
});
return listOfFutures.stream().reduce(identity, accumulator, combiner);
}
사용 예 :
List<CompletableFuture<String>> listOfFutures = IntStream.range(0, numThreads)
.mapToObj(i -> loadData(i, executor)).collect(toList());
CompletableFuture<List<String>> futureList = sequence(listOfFutures);
완전한 예 :
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.stream.IntStream;
import static java.util.stream.Collectors.toList;
public class ListOfFuturesToFutureOfList {
public static void main(String[] args) {
ListOfFuturesToFutureOfList test = new ListOfFuturesToFutureOfList();
test.load(10);
}
public void load(int numThreads) {
final ExecutorService executor = Executors.newFixedThreadPool(numThreads);
List<CompletableFuture<String>> listOfFutures = IntStream.range(0, numThreads)
.mapToObj(i -> loadData(i, executor)).collect(toList());
CompletableFuture<List<String>> futureList = sequence(listOfFutures);
System.out.println("Future complete before blocking? " + futureList.isDone());
// this will block until all futures are completed
List<String> data = futureList.join();
System.out.println("Loaded data: " + data);
System.out.println("Future complete after blocking? " + futureList.isDone());
executor.shutdown();
}
public CompletableFuture<String> loadData(int dataPoint, Executor executor) {
return CompletableFuture.supplyAsync(() -> {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
System.out.println("Starting to load test data " + dataPoint);
try {
Thread.sleep(500 + rnd.nextInt(1500));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Successfully loaded test data " + dataPoint);
return "data " + dataPoint;
}, executor);
}
private <V> CompletableFuture<List<V>> sequence(List<CompletableFuture<V>> listOfFutures) {
CompletableFuture<List<V>> identity = CompletableFuture.completedFuture(new ArrayList<>());
BiFunction<CompletableFuture<List<V>>, CompletableFuture<V>, CompletableFuture<List<V>>> accumulator = (futureList, futureValue) ->
futureValue.thenCombine(futureList, (value, list) -> {
List<V> newList = new ArrayList<>(list.size() + 1);
newList.addAll(list);
newList.add(value);
return newList;
});
BinaryOperator<CompletableFuture<List<V>>> combiner = (futureList1, futureList2) -> futureList1.thenCombine(futureList2, (list1, list2) -> {
List<V> newList = new ArrayList<>(list1.size() + list2.size());
newList.addAll(list1);
newList.addAll(list2);
return newList;
});
return listOfFutures.stream().reduce(identity, accumulator, combiner);
}
}
In addition to Spotify Futures library you might try my code locate here: https://github.com/vsilaev/java-async-await/blob/master/net.tascalate.async.examples/src/main/java/net/tascalate/concurrent/CompletionStages.java (has a dependencies to other classes in same package)
It implements a logic to return "at least N out of M" CompletionStage-s with a policy how much errors it's allowed to tolerate. There are convinient methods for all/any cases, plus cancellation policy for the remaining futures, plus the code deals with CompletionStage-s (interface) rather than CompletableFuture (concrete class).
Javaslang has a very convenient Future
API. It also allows to make a future of collection out of a collection of futures.
List<Future<String>> listOfFutures = ...
Future<Seq<String>> futureOfList = Future.sequence(listOfFutures);
참고URL : https://stackoverflow.com/questions/30025428/listfuture-to-futurelist-sequence
'Development Tip' 카테고리의 다른 글
속성 라우팅 및 상속 (0) | 2020.11.21 |
---|---|
Dagger와 ButterKnife Android의 차이점 (0) | 2020.11.21 |
Xcode 7의 기존 프로젝트에 단위 및 UI 테스트 추가 (0) | 2020.11.21 |
java.util.ArrayList에 해당하는 Scala (0) | 2020.11.20 |
기본값이있는 옵션에 대한 자바 스크립트 디자인 패턴? (0) | 2020.11.20 |