Development Tip

명부

yourdevel 2020. 11. 21. 09:08
반응형

명부 미래로 순서


로 변환하려고 List<CompletableFuture<X>>합니다 CompletableFuture<List<T>>. 이것은 많은 비동기 작업이 있고 모든 작업의 ​​결과를 얻어야 할 때 매우 유용합니다.

그중 하나라도 실패하면 최종 미래는 실패합니다. 이것이 내가 구현 한 방법입니다.

  public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) {
        if(com.isEmpty()){
            throw new IllegalArgumentException();
        }
        Stream<? extends CompletableFuture<T>> stream = com.stream();
        CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>());
        return stream.reduce(init, (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> {
            x.add(y);
            return x;
        },exec),exec), (a, b) -> a.thenCombineAsync(b,(ls1,ls2)-> {
            ls1.addAll(ls2);
            return ls1;
        },exec));
    }

실행하려면 :

ExecutorService executorService = Executors.newCachedThreadPool();
        Stream<CompletableFuture<Integer>> que = IntStream.range(0,100000).boxed().map(x -> CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep((long) (Math.random() * 10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return x;
        }, executorService));
CompletableFuture<List<Integer>> sequence = sequence2(que.collect(Collectors.toList()), executorService);

그중 하나라도 실패하면 실패합니다. 백만 개의 선물이 있어도 예상대로 출력을 제공합니다. 제가 가진 문제는 : 5000 개 이상의 선물이 있고 그중 하나라도 실패하면 다음과 같은 결과를 얻습니다 StackOverflowError.

java.util.concurrent.CompletableFuture.internalComplete (CompletableFuture.java:210) at java.util.concurrent.CompletableFuture $ ThenCompose.run (CompletableFuture.java) 스레드 "pool-1-thread-2611"java.lang.StackOverflowError 예외 : 1487) at java.util.concurrent.CompletableFuture.postComplete (CompletableFuture.java:193) at java.util.concurrent.CompletableFuture.internalComplete (CompletableFuture.java:210) at java.util.concurrent.CompletableFuture $ ThenCompose.run ( CompletableFuture.java:1487)

내가 뭘 잘못하고 있니?

참고 : 위의 반환 된 future는 미래가 실패 할 때 바로 실패합니다. 받아 들여진 대답도이 점을 취해야합니다.


사용 CompletableFuture.allOf(...):

static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
    return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[com.size()]))
            .thenApply(v -> com.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
            );
}

구현에 대한 몇 가지 의견 :

귀하의 사용 .thenComposeAsync, .thenApplyAsync그리고 .thenCombineAsync가능성이 당신이 기대하는 일을하지 않습니다. 이러한 ...Async메서드는 별도의 스레드에서 제공된 함수를 실행합니다. 따라서 귀하의 경우에는 제공된 실행기에서 실행되도록 목록에 새 항목을 추가합니다. 간단한 작업을 캐시 된 스레드 실행기에 넣을 필요가 없습니다. thenXXXXAsync정당한 이유없이 방법을 사용하지 마십시오 .

또한 reduce변이 가능한 용기에 축적하는 데 사용해서는 안됩니다. 스트림이 순차적 인 경우 올바르게 작동 할 수 있지만 스트림이 병렬로 만들어지면 실패합니다. 가변 감소를 수행하려면 .collect대신 사용하십시오.

첫 번째 실패 직후 예외적으로 전체 계산을 완료하려면 sequence방법 에서 다음을 수행하십시오 .

CompletableFuture<List<T>> result = CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[com.size()]))
        .thenApply(v -> com.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
        );

com.forEach(f -> f.whenComplete((t, ex) -> {
    if (ex != null) {
        result.completeExceptionally(ex);
    }
}));

return result;

추가로, 당신은 첫 번째 실패의 나머지 작업을 취소하려면, 추가 exec.shutdownNow();직후 result.completeExceptionally(ex);. 물론 exec이것은 하나의 계산에 대해서만 존재 한다고 가정합니다 . 그렇지 않은 경우 나머지 Future각각을 개별적 으로 반복하고 취소해야합니다 .


으로 미샤는 지적 , 당신은 남용입니다 …Async작업. 또한 프로그램 논리를 반영하지 않는 종속성을 모델링하는 복잡한 작업 체인을 구성하고 있습니다.

  • 목록의 첫 번째 및 두 번째 작업에 의존하는 작업 x를 만듭니다.
  • 직업 x와 목록의 세 번째 직업에 의존하는 직업 x + 1을 만듭니다.
  • 작업 x + 1과 목록의 네 번째 작업에 따라 작업 x + 2를 만듭니다.
  • 작업 x + 4999와 목록의 마지막 작업에 의존하는 작업 x + 5000을 만듭니다.

그런 다음 (명시 적으로 또는 예외로 인해)이 재귀 적으로 구성된 작업을 취소하면 재귀 적으로 수행 될 수 있으며 StackOverflowError. 이는 구현에 따라 다릅니다.

Misha가 이미 보여 주었 듯이 , allOf원래 의도를 모델링하고 목록의 모든 작업에 의존하는 하나의 작업을 정의 할 수 있는 방법 이 있습니다.

그러나 그것조차 필요하지 않다는 점은 주목할 가치가 있습니다. 제한되지 않은 스레드 풀 실행기를 사용하고 있으므로 결과를 수집하는 비동기 작업을 목록에 게시하면 완료됩니다. 완료를 기다리는 것은 어쨌든 각 작업의 결과를 묻는 것으로 함축 됩니다.

ExecutorService executorService = Executors.newCachedThreadPool();
List<CompletableFuture<Integer>> que = IntStream.range(0, 100000)
  .mapToObj(x -> CompletableFuture.supplyAsync(() -> {
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos((long)(Math.random()*10)));
    return x;
}, executorService)).collect(Collectors.toList());
CompletableFuture<List<Integer>> sequence = CompletableFuture.supplyAsync(
    () -> que.stream().map(CompletableFuture::join).collect(Collectors.toList()),
    executorService);

스레드 수가 제한되고 작업이 추가 비동기 작업을 생성 할 수있는 경우 종속 작업을 구성하는 방법을 사용하는 것이 중요합니다. 대기중인 작업이 먼저 완료해야하는 작업에서 스레드를 훔쳐가는 것을 방지하기 위해 여기에는 해당되지 않습니다.

이 특정 경우에 하나의 작업이 단순히이 많은 수의 전제 조건 작업을 반복하고 필요한 경우 대기하는 것이이 많은 종속성을 모델링하고 각 작업이 완료에 대해 종속 된 작업을 알리는 것보다 더 효율적일 수 있습니다.


Spotify의 CompletableFutures라이브러리 및 사용 allAsList방법을 얻을 수 있습니다 . Guava의 Futures.allAsList방법 에서 영감을 얻은 것 같습니다 .

public static <T> CompletableFuture<List<T>> allAsList(
    List<? extends CompletionStage<? extends T>> stages) {

라이브러리를 사용하지 않으려는 경우 다음은 간단한 구현입니다.

public <T> CompletableFuture<List<T>> allAsList(final List<CompletableFuture<T>> futures) {
    return CompletableFuture.allOf(
        futures.toArray(new CompletableFuture[futures.size()])
    ).thenApply(ignored ->
        futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
    );
}

@Misha가 수락 한 답변에 추가하려면 수집기로 더 확장 할 수 있습니다.

 public static <T> Collector<CompletableFuture<T>, ?, CompletableFuture<List<T>>> sequenceCollector() {
    return Collectors.collectingAndThen(Collectors.toList(), com -> sequence(com));
}

이제 다음을 수행 할 수 있습니다.

Stream<CompletableFuture<Integer>> stream = Stream.of(
    CompletableFuture.completedFuture(1),
    CompletableFuture.completedFuture(2),
    CompletableFuture.completedFuture(3)
);
CompletableFuture<List<Integer>> ans = stream.collect(sequenceCollector());

CompletableFuture에서 thenCombine을 사용한 시퀀스 작업의 예

public<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com){

    CompletableFuture<List<T>> identity = CompletableFuture.completedFuture(new ArrayList<T>());

    BiFunction<CompletableFuture<List<T>>,CompletableFuture<T>,CompletableFuture<List<T>>> combineToList = 
            (acc,next) -> acc.thenCombine(next,(a,b) -> { a.add(b); return a;});

    BinaryOperator<CompletableFuture<List<T>>> combineLists = (a,b)-> a.thenCombine(b,(l1,l2)-> { l1.addAll(l2); return l1;}) ;  

    return com.stream()
              .reduce(identity,
                      combineToList,
                      combineLists);  

   }
} 

타사 라이브러리를 사용해도 괜찮다면 cyclops-react (저는 저자입니다)에는 CompletableFutures (및 Optionals, Streams 등)에 대한 일련의 유틸리티 메서드가 있습니다.

  List<CompletableFuture<String>> listOfFutures;

  CompletableFuture<ListX<String>> sequence =CompletableFutures.sequence(listOfFutures);

면책 조항 : 이것은 초기 질문에 완전히 대답하지 않습니다. "하나가 실패하면 모두 실패"부분이 부족합니다. 그러나이 질문의 중복으로 닫 혔기 때문에 실제적이고 더 일반적인 질문에 대답 할 수 없습니다. Java 8 CompletableFuture.allOf (...) with Collection 또는 List . 그래서 여기에 대답하겠습니다.

Java 8의 스트림 API List<CompletableFuture<V>>CompletableFuture<List<V>>사용 하도록 변환하는 방법은 무엇입니까?

요약 : 다음을 사용하십시오.

private <V> CompletableFuture<List<V>> sequence(List<CompletableFuture<V>> listOfFutures) {
    CompletableFuture<List<V>> identity = CompletableFuture.completedFuture(new ArrayList<>());

    BiFunction<CompletableFuture<List<V>>, CompletableFuture<V>, CompletableFuture<List<V>>> accumulator = (futureList, futureValue) ->
        futureValue.thenCombine(futureList, (value, list) -> {
                List<V> newList = new ArrayList<>(list.size() + 1);
                newList.addAll(list);
                newList.add(value);
                return newList;
            });

    BinaryOperator<CompletableFuture<List<V>>> combiner = (futureList1, futureList2) -> futureList1.thenCombine(futureList2, (list1, list2) -> {
        List<V> newList = new ArrayList<>(list1.size() + list2.size());
        newList.addAll(list1);
        newList.addAll(list2);
        return newList;
    });

    return listOfFutures.stream().reduce(identity, accumulator, combiner);
}

사용 예 :

List<CompletableFuture<String>> listOfFutures = IntStream.range(0, numThreads)
    .mapToObj(i -> loadData(i, executor)).collect(toList());

CompletableFuture<List<String>> futureList = sequence(listOfFutures);

완전한 예 :

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.stream.IntStream;

import static java.util.stream.Collectors.toList;

public class ListOfFuturesToFutureOfList {

    public static void main(String[] args) {
        ListOfFuturesToFutureOfList test = new ListOfFuturesToFutureOfList();
        test.load(10);
    }

    public void load(int numThreads) {
        final ExecutorService executor = Executors.newFixedThreadPool(numThreads);

        List<CompletableFuture<String>> listOfFutures = IntStream.range(0, numThreads)
            .mapToObj(i -> loadData(i, executor)).collect(toList());

        CompletableFuture<List<String>> futureList = sequence(listOfFutures);

        System.out.println("Future complete before blocking? " + futureList.isDone());

        // this will block until all futures are completed
        List<String> data = futureList.join();
        System.out.println("Loaded data: " + data);

        System.out.println("Future complete after blocking? " + futureList.isDone());

        executor.shutdown();
    }

    public CompletableFuture<String> loadData(int dataPoint, Executor executor) {
        return CompletableFuture.supplyAsync(() -> {
            ThreadLocalRandom rnd = ThreadLocalRandom.current();

            System.out.println("Starting to load test data " + dataPoint);

            try {
                Thread.sleep(500 + rnd.nextInt(1500));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("Successfully loaded test data " + dataPoint);

            return "data " + dataPoint;
        }, executor);
    }

    private <V> CompletableFuture<List<V>> sequence(List<CompletableFuture<V>> listOfFutures) {
        CompletableFuture<List<V>> identity = CompletableFuture.completedFuture(new ArrayList<>());

        BiFunction<CompletableFuture<List<V>>, CompletableFuture<V>, CompletableFuture<List<V>>> accumulator = (futureList, futureValue) ->
            futureValue.thenCombine(futureList, (value, list) -> {
                    List<V> newList = new ArrayList<>(list.size() + 1);
                    newList.addAll(list);
                    newList.add(value);
                    return newList;
                });

        BinaryOperator<CompletableFuture<List<V>>> combiner = (futureList1, futureList2) -> futureList1.thenCombine(futureList2, (list1, list2) -> {
            List<V> newList = new ArrayList<>(list1.size() + list2.size());
            newList.addAll(list1);
            newList.addAll(list2);
            return newList;
        });

        return listOfFutures.stream().reduce(identity, accumulator, combiner);
    }

}

In addition to Spotify Futures library you might try my code locate here: https://github.com/vsilaev/java-async-await/blob/master/net.tascalate.async.examples/src/main/java/net/tascalate/concurrent/CompletionStages.java (has a dependencies to other classes in same package)

It implements a logic to return "at least N out of M" CompletionStage-s with a policy how much errors it's allowed to tolerate. There are convinient methods for all/any cases, plus cancellation policy for the remaining futures, plus the code deals with CompletionStage-s (interface) rather than CompletableFuture (concrete class).


Javaslang has a very convenient Future API. It also allows to make a future of collection out of a collection of futures.

List<Future<String>> listOfFutures = ... 
Future<Seq<String>> futureOfList = Future.sequence(listOfFutures);

See http://static.javadoc.io/io.javaslang/javaslang/2.0.5/javaslang/concurrent/Future.html#sequence-java.lang.Iterable-

참고URL : https://stackoverflow.com/questions/30025428/listfuture-to-futurelist-sequence

반응형