Recent Posts
Recent Comments
Link
01-09 05:47
Today
Total
관리 메뉴

삶 가운데 남긴 기록 AACII.TISTORY.COM

Thread Pool 본문

DEV&OPS/Java

Thread Pool

ALEPH.GEM 2022. 4. 20. 15:52

스레드 풀

스레드 풀은 작업 스레드를 제한된 개수(총 개수)만큼 정해놓고 작업 큐에 들어오는 작업들을 하나씩 스레드가 맡아서 처리합니다.

작업 처리가 끝난 스레드는 다시 작업 큐에서 새로운 작업을 가져와 처리 합니다.

스레드 풀(작업 큐)로 병렬 처리를 한다면 갑자기 스레드가 폭증하는 상황에서도 성능 저하를 최소화 할 수 있습니다.

스레드 풀은 java.util.concurrent 패키지의 ExecutorService 인터페이스와 Executors 클래스를 제공하고 있습니다.

 

스레드 풀 생성

Executors 클래스의 newCachedThreadPool()이나 newFixedThreadPool(int n Threads) 으로 생성 할 수 있습니다.

 

스레드 풀 종료

스레드 풀은 데몬 스레드가 아니기 때문에 메인 스레드가 종료되더라도 작업을 처리하기 위해 실행 상태로 남아있습니다.

그래서 메인 스레드가 종료되어도 애플리케이션 프로세스는 종료되지 않는데, 스레드 풀을 종료 시켜서 스레드들이 종료 상태가 되도록 처리 해줘야 합니다.

ExecutorService 의 메서드 shutdown()은 남은 작업을 마무리하고 종료하는 것이며, shutdownNow()는 남은 작업과 관계없이 강제 종료하는 메서드 입니다.

 

작업 생성

작업(task)는 Runnable 또는 Callable 을 구현해서 사용합니다.

Runnable의 run()은 리턴값이 없고 Callable의 call()은 리턴값이 있습니다.

Runnable task = new Runnable(){
	@Override
    public void run(){
    	//작업내용
    }
}

Callable<T> task = new Callabe<T>(){
	@Override
    public T call() throws Exception{
    	//작업 내용
        return T;
    }
}

 

작업 처리 요청

작업 처리 요청은 작업 큐에 등록하는 것을 말합니다.

execute()는 작업 처리 결과를 받지 못하고 작업도중 예외가 발생하면 스레드가 종료되고 풀에서도 제거 됩니다.

submit()은 작업 처리 결과를 받을 수 있도록 Future를 리턴하며 예외가 발생해도 다음 작업을 위해 재사용합니다.

그래서 오버헤드를 줄이기 위해 submit()사용을 추천합니다.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class ExecuteEx {

	public static void main(String[] args) throws InterruptedException {
		ExecutorService executorService = Executors.newFixedThreadPool(2);	//최대 스레드 수가 2인 스레드 풀 생성
		
		for(int i = 0; i < 10 ; i++) {
			Runnable runnable = new Runnable() {

				@Override
				public void run() {
					ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;
					int poolSize = threadPoolExecutor.getPoolSize();
					String threadName = Thread.currentThread().getName();
					System.out.println("총 스레드 수: "+poolSize);
					System.out.println("작업 스레드 이름: "+threadName);
					
					//강제 예외 발생
					int value = Integer.parseInt("0.1");
				}
				
			};
			
			//작업 처리 요청
			executorService.execute(runnable);
			//executorService.submit(runnable);
			
			Thread.sleep(100);
		}
		
		executorService.shutdown(); 	//스레드풀 종료
	}

}

 

위 예제에서 스레드 풀의 최대 스레드 개수는 2이지만 실행 스레드의 이름을 보면 모두 다른 스레드가 작업을 처리하고 있습니다.

왜냐하면 작업 처리 도중 예외가 발생했기 때문에 해당 스레드는 제거가 되고 새 스레드가 계속 생성되기 때문입니다.

작업 처리 요청 부분에서 executorSevice.execute(runnable);을 주석 처리하고 바로 아래 줄의 주석을 풀어서 execute대신 submit으로 실행한다면 스레드를 재사용해서 처리하고 있는 것을 볼 수 있습니다.

 

블로킹 방식의 작업 완료 통보

ExecutorService의 submit() 은 Future객체를 리턴합니다.

Future객체는 작업이 완료될 때까지 블로킹 되어 기다렸다가 최종 결과를 얻는데 사용합니다.

Future의 get() 메소드를 호출하면 스레드가 작업을 완료할 때까지 블로킹되었다가 처리 결과를 리턴합니다.

Future를 사용할 때 주의할 점은 스레드가 완료될 때까지 get()이 블로킹 되므로 다른 작업을 할 수 없다는 점입니다.

그래서 get()을 호출하는 스레드는 새로운 스레드이거나 스레드풀의 다른 스레드가 되어야 합니다.

 

아래 예제는 리턴값이 없는 작업의 완료 통보 예제입니다.

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class NoResultEx {

	public static void main(String[] args) {
		ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
		
		System.out.println("작업 처리 요청");
		Runnable runnable = new Runnable() {

			@Override
			public void run() {
				int sum = 0;
				for(int i = 1; i < 11; i++) {
					sum += i;
				}
				System.out.println("처리 결과: "+ sum);
			}
			
		};
		
		Future future = executorService.submit(runnable);
		
		try {
			future.get();
			System.out.println("작업 처리 완료");
		} catch (InterruptedException e1) {
			e1.printStackTrace();
		} catch (ExecutionException e2) {
			e2.printStackTrace();
		}
		
		executorService.shutdown();
	}

}

 

리턴값이 있는 작업의 완료 통보

리턴 값이 있는 경우 Callable 객체를 이용하는데 주의할 점은 제네릭 타입 파라메터T는 call() 메소드가 리턴하는 타입이 되어야 합니다.

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ResultCallableEx {

	public static void main(String[] args) {
		ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
		
		System.out.println("작업 처리 요청");
		Callable<Integer> task = new Callable<Integer>() {

			@Override
			public Integer call() throws Exception {
				int sum = 0;
				for(int i = 1; i < 11; i++) {
					sum += i;
				}
				return sum;
			}
			
		};
		
		Future<Integer> future = executorService.submit(task);
		
		try {
			int sum = future.get();
			System.out.println("처리 결과: "+sum);
			System.out.println("작업 처리 완료");
		} catch (InterruptedException e1) {
			e1.printStackTrace();
		} catch (ExecutionException e2) {
			e2.printStackTrace();
		}
		
		executorService.shutdown();
	}

}

 

작업 처리 결과를 외부 객체에 저장

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ResultRunnableEx {

	public static void main(String[] args) {
		ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
		
		System.out.println("작업 처리 요청");
		class Task implements Runnable {
			Result result;
			Task(Result result){
				
				this.result = result;
			}

			@Override
			public void run() {
				int sum = 0;
				for(int i = 1; i < 11; i++) {
					sum += i;
				}
				result.addValue(sum);
			}	
		}
		
		//두 task에 작업 처리를 요청
		Result result = new Result();
		Runnable task1 = new Task(result);
		Runnable task2 = new Task(result);		
		Future<Result> future1 = executorService.submit(task1, result);
		Future<Result> future2 = executorService.submit(task2, result);
		
		//두 작업 처리 결과를 취합
		try {
			result = future1.get();
			result = future2.get();
			System.out.println("처리 결과: "+result.accumValue);
			System.out.println("작업 처리 완료");
		} catch (Exception e) {
			e.printStackTrace();
		} 
		
		executorService.shutdown();
	}

}

//처리 결과를 저장하는 클래스
class Result{
	int accumValue;
	synchronized void addValue(int value) {
		accumValue += value;
	}
}

 

작업 완료 순으로 통보

스레드풀에서 작업 처리가 완료된 것만 통보받으려면 CompletionService 를 이용하면 됩니다.

완료된 작업을 가져오는 poll()과 take()메서드를 제공합니다.

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CompletionServiceEx extends Thread {

	public static void main(String[] args) {
		ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
		
		CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(executorService);
		
		System.out.println("작업 처리 요청");
		for(int i = 0; i < 3; i++) {
			completionService.submit(new Callable<Integer>() {

				@Override
				public Integer call() throws Exception {
					int sum = 0;
					for(int i = 1; i<=10; i++) {
						sum += i;
					}
					return sum;
				}
				
			});
		}
		
		System.out.println("처리 완료된 작업 확인");
		executorService.submit(new Runnable() {

			@Override
			public void run() {
				while(true) {
					try {
						Future<Integer> future = completionService.take();
						int value = future.get();
						System.out.println("처리결과: "+value);
					} catch (Exception e) {
						break;
					} 
				}
			}
			
		});
		
		try {
			Thread.sleep(3000);
		}catch(Exception e) {

		}
		
		executorService.shutdownNow();
		
	}

}

 

콜백 방식으로 작업 완료 통보

블로킹 방식과 다르게 콜백 방식은 작업 처리를 요청 후 결과를 기다릴 필요 없이 다른 작업을 수행하다가 요청한 작업이 완료되면 콜백 메서드를 호출해 결과를 전달 받을 수 있습니다.

콜백 기능을 가진 클래스를 직접 정의해도 좋지만, java.nio.channels.CompletionHandler를 이용해도 좋습니다.

CompletionHandler는 completed()와 failed() 콜백 메서드를 제공합니다.

V 타입 파라메터는 결과값의 타입이고 A타입은 첨부값입니다. 

만약 첨부값이 없다면 Void로 지정해줘야 합니다.

import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CallbackEx {
	private ExecutorService executorService;
	
	public CallbackEx() {
		executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
	}
	
	private CompletionHandler<Integer, Void> callback = new CompletionHandler<Integer, Void>(){

		@Override
		public void completed(Integer result, Void attachment) {
			System.out.println("completed() : "+result);
		}

		@Override
		public void failed(Throwable exc, Void attachment) {
			System.out.println("failed() : "+exc.toString());
		}
		
	};
	
	public void doWork(final String x, final String y) {
		Runnable task = new Runnable() {

			@Override
			public void run() {
				try {
					int intX = Integer.parseInt(x);
					int intY = Integer.parseInt(y);
					int result = intX + intY;
					callback.completed(result, null);
				}catch(Exception e){
					callback.failed(e, null);
				}
			}
			
		};
		executorService.submit(task);
	}
	
	public void finish() {
		executorService.shutdown();
	}

	public static void main(String[] args) {
		CallbackEx example = new CallbackEx();
		example.doWork("3", "3");
		example.doWork("3", "0.3");
		example.finish();
	}

}

실행 결과

completed() : 6
failed() : java.lang.NumberFormatException: For input string: "0.3"

 

 

 

 

 

 

 

 

 

728x90

'DEV&OPS > Java' 카테고리의 다른 글

람다식  (0) 2022.05.06
Generic  (0) 2022.04.21
JAVA Thread  (0) 2022.04.19
Format 클래스  (0) 2022.04.12
자바 날짜, 시간, 달력 다루기  (0) 2022.04.12