Development Tip

ExecutorService (특히 ThreadPoolExecutor)는 스레드로부터 안전합니까?

yourdevel 2020. 10. 24. 11:59
반응형

ExecutorService (특히 ThreadPoolExecutor)는 스레드로부터 안전합니까?


ExecutorService스레드 안전성을 보장 합니까 ?

다른 스레드의 작업을 동일한 ThreadPoolExecutor에 제출할 예정입니다. 작업을 상호 작용 / 제출하기 전에 실행 프로그램에 대한 액세스를 동기화해야합니까?


사실, 문제의 JDK 클래스는 스레드로부터 안전한 작업 제출을 명시 적으로 보장하지 않는 것 같습니다. 그러나 실제로 라이브러리의 모든 ExecutorService 구현은 실제로 이러한 방식으로 스레드로부터 안전합니다. 이것에 의존하는 것이 합리적이라고 생각합니다. 이러한 기능을 구현하는 모든 코드가 퍼블릭 도메인에 배치 되었기 때문에 누구든지 완전히 다른 방식으로 다시 작성할 동기가 전혀 없습니다.


(다른 답변과 달리) 스레드 안전성 계약 문서화되어 있습니다. interface(메소드의 javadoc이 아닌) javadocs를 살펴보십시오 . 예를 들어 ExecutorService javadoc 의 맨 아래에서 다음 을 찾을 수 있습니다.

메모리 무결성 효과 : ExecutorService에로의 Runnable 또는 호출 가능 작업의 제출하기 전에 스레드에서 작업 전에 발생 차례로 해당 작업, 취한 조치 일 - 전에 결과가하는 Future.get을 통해 검색됩니다 ().

이것으로 충분합니다.

"작업을 상호 작용 / 제출하기 전에 실행 프로그램에 대한 액세스를 동기화해야합니까?"

아니에요. ExecutorService외부 동기화없이 작업을 구성하고 (올바르게 구현 된) 작업을 제출하는 것이 좋습니다. 이것이 주요 디자인 목표 중 하나입니다.

ExecutorServiceA는 동시 가 성능, 동기화를 필요로하지 않고 최대 범위까지 작동하도록 설계되어 있다는 말을하는 것입니다 유틸리티. (동기화로 인해 스레드 경합이 발생하여 특히 많은 수의 스레드로 확장 할 때 멀티 스레딩 효율성을 저하시킬 수 있습니다.)

작업이 실행되거나 완료 될 시간에 대한 보장은 없지만 (일부는 작업을 제출 한 동일한 스레드에서 즉시 실행될 수도 있음) 작업자 스레드는 제출하는 스레드가 수행 모든 효과를 확인할 수 있습니다. 제출 지점 . 따라서 작업 (실행되는 스레드)은 동기화, 스레드로부터 안전한 클래스 또는 다른 형태의 "안전한 게시"없이 사용하기 위해 생성 된 모든 데이터를 안전하게 읽을 수도 있습니다. 작업을 제출하는 행위는 입력 데이터를 작업에 "안전하게 게시"하기에 충분합니다. 작업이 실행되는 동안 입력 데이터가 어떤 식 으로든 수정되지 않도록하기 만하면됩니다.

마찬가지로를 통해 작업 결과를 다시 가져올 때 Future.get()검색 스레드는 실행기의 작업자 스레드가 만든 모든 효과를 볼 수 있습니다 (반환 된 결과와 작업자 스레드가 만들 수있는 모든 부작용 변경 사항 모두에서). .

이 계약은 또한 작업 자체가 더 많은 작업을 제출하는 것이 좋다는 것을 의미합니다.

"ExecutorService는 스레드 안전성을 보장합니까?"

이제 질문의이 부분은 훨씬 더 일반적입니다. 예를 들어 메소드에 대한 스레드 안전 계약의 어떤 설명도 찾을 수 없습니다 shutdownAndAwaitTermination. Javadoc의 코드 샘플은 동기화를 사용하지 않습니다. (예를 들어 작업자 스레드가 아니라 Executor를 생성 한 동일한 스레드에 의해 종료가 시작된다는 숨겨진 가정이있을 수 있지만?)

BTW 나는 동시 프로그래밍의 세계에 대한 좋은 기초를 위해 "Java Concurrency In Practice"라는 책을 추천합니다.


귀하의 질문은 다소 개방적입니다. ExecutorService인터페이스가 수행하는 모든 작업은 어딘가에서 제출 된 Runnable또는 Callable인스턴스를 처리 할 스레드를 보장하는 것 입니다.

제출 Runnable/ Callable참조가 다른 Runnable/ Callables 인스턴스 에서 액세스 할 수있는 공유 데이터 구조를 참조하는 경우 (잠재적으로 다른 스레드에서 동시에 처리됨) 이 데이터 구조에서 스레드 안전을 보장 하는 것은 사용자의 책임 입니다.

질문의 두 번째 부분에 답하려면 작업을 제출하기 전에 ThreadPoolExecutor에 액세스 할 수 있습니다. 예 :

BlockingQueue<Runnable> workQ = new LinkedBlockingQueue<Runnable>();
ExecutorService execService = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.SECONDS, workQ);
...
execService.submit(new Callable(...));

편집하다

Brian의 의견을 기반으로 귀하의 질문을 오해 한 경우 : 여러 생산자 스레드에서에 작업을 제출 ExecutorService하면 일반적으로 스레드로부터 안전합니다 (내가 말할 수있는 한 인터페이스의 API에 명시 적으로 언급되지 않았음에도 불구하고). 스레드 안전성을 제공하지 않는 모든 구현은 다중 스레드 환경에서 쓸모가 없으며 (다중 생산자 / 다중 소비자는 상당히 일반적인 패러다임이므로) 이것은 특히 ExecutorService(및 나머지 java.util.concurrent)를 위해 설계된 것입니다.


대한 ThreadPoolExecutor대답 단순히 . 모든 구현이 스레드로부터 안전하다는 것을 요구하거나 보장 ExecutorService하지 않으며 인터페이스이기 때문에 그렇게 할 수도 없습니다. 이러한 유형의 계약은 Java 인터페이스의 범위를 벗어납니다. 그러나 ThreadPoolExecutor둘 다 스레드로부터 안전한 것으로 명확하게 문서화되어 있습니다. 또한 모든 구현이 스레드로부터 안전하도록 요청하는 인터페이스를 ThreadPoolExecutor사용하여 작업 대기열을 관리합니다 java.util.concurrent.BlockingQueue. 의 모든 java.util.concurrent.*구현은 BlockingQueue스레드로부터 안전하다고 가정 할 수 있습니다. 비표준 구현은 그렇지 않을 수 있지만 누군가 BlockingQueue스레드로부터 안전하지 않은 구현 대기열 을 제공하는 경우 완전히 어리석은 일 입니다.

따라서 제목 질문에 대한 대답은 분명히 입니다. 둘 사이에 약간의 불일치가 있기 때문에 질문의 후속 본문에 대한 대답은 아마도것입니다 .


Luke Usherwood 의 답변이 주장 하는 것과는 달리 , ExecutorService구현이 스레드로부터 안전하다는 것을 문서에 암시하지 않습니다 . ThreadPoolExecutor구체적으로 질문에 대해서는 다른 답변을 참조하십시오.

예, 전 발생 관계가 지정되어 있지만 Miles가 언급 한 것처럼 메서드 자체의 스레드 안전성에 대한 어떤 것도 의미하지 않습니다 . 에서 누가 Usherwood 의 대답은 전자가 후자를 증명하기에 충분하지만, 실제 인수가 이루어지지 않습니다 것을 주장한다.

"스레드 안전"여기에 여러 가지 의미가 있지만, 할 수있는 간단한 카운터 - 예입니다 Executor(하지 ExecutorService사소하게 요구 충족하지만 차이는 없습니다) 발생-전에 관계하지만 스레드로부터 안전하지 않습니다 때문에에 비동기 액세스의 count필드 .

class CountingDirectExecutor implements Executor {

    private int count = 0;

    public int getExecutedTaskCount() {
        return count;
    }

    public void execute(Runnable command) {
        command.run();
    }
}

면책 조항 : 저는 전문가가 아니며 직접 답변을 찾고 있었기 때문에이 질문을 찾았습니다.


ThreadPoolExecutor의 경우 제출은 스레드로부터 안전합니다. jdk8에서 소스 코드를 볼 수 있습니다. 새 작업을 추가 할 때 스레드 안전을 보장하기 위해 mainLock을 사용합니다.

private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);

                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;

                for (;;) {
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }

            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());

                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }

참고 URL : https://stackoverflow.com/questions/1702386/is-executorservice-specifically-threadpoolexecutor-thread-safe

반응형