Chapter 13 쓰레드(Thread)

1. 프로세스와 쓰레드

프로세스(process)란 실행 중인 프로그램(program)

  • 프로세스는 프로그램을 수행하는데 필요한 데이터와 메모리 등의 자원 그리고 쓰레드로 구성되어 있다.
  • 실제로 작업을 수행하는 것이 바로 쓰레드(thread)이다.
프로세스(공장), 쓰레드(일꾼) : 싱글 쓰레드
프로세스(공장), 쓰레드들(일꾼 들) : 멀티 쓰레드

쓰레드를 가벼운 프로세스, 즉 경량 프로세스(LWP,light-weight process)라고 부르기도 한다.

멀티 쓰레드의 장점

  • CPU의 사용률을 향상 시킨다.
  • 자원을 보다 효율적으로 사용할 수 있다.
  • 사용자에 대한 응답성이 향상된다.
  • 작업이 분리되어 코드가 간결해진다.

멀티 쓰레드 고려할 사항

  • 여러 쓰레드가 같은 프로세스 내에서 자원을 공유하면서 작업을 하기 때문에
  • 동기화(synchronization), 교착상태(deadlock) 같은 문제들을 고려해서 프로그래밍 해야 한다.

2. 쓰레드의 구현과 실행

쓰레드의 구현

  • Thread 클래스를 상속받는 방법
  • Runnable 인터페이스를 구현하는 방법

일반적으로 Runnable 인터페이스를 구현하는 방식을 사용함

  • 재사용성(reusability)를 높이고 코드의 일관성(consistency)를 유지할 수 있다.
public class ThreadEx1 {
    public static void main(String[] args) {
        ThreadEx1_1 t1 = new ThreadEx1_1();

        Runnable r = new ThreadEx1_2();
        Thread t2 = new Thread(r);

        t1.start();
        t2.start();
    }
}

class ThreadEx1_1 extends Thread {
    public void run() {
        for(int i = 0; i < 5; i++) {
            // Thread 클래스 메소드 직접 호출 가능
            System.out.println(getName());
        }
    }
}

class ThreadEx1_2 implements Runnable {
    @Override
    public void run() {
        for(int i = 0; i < 5; i++) {
            // Runnable을 구현하면 Thread클래스의 currentThread()를 호출하여 
            // 쓰레드에 대한 참조를 얻어와서 호출 해야 함
            System.out.println(Thread.currentThread().getName());
        }
    }
}

쓰레드의 실행

  t1.start() // 쓰레드 t1을 실행시킨다.
  t2.start() // 쓰레드 t2를 실행시킨다.
  • start()가 호출 되었다고 해서 바로 실행되는 것이 아니라, 일단 실행 대기 상태에 있다가 자신의 차례가 되어야 실행상태가 된다.
  • 한가지 더 주의할 사항
    • 하나의 쓰레드에 대해 start()가 한 번만 호출될 수 있다
    • 두 번 실행시 IllegalThreadStateException이 발생한다

3. start()와 run()

쓰레드를 실행 시킬 때 run()이 아닌 start()를 호출하는 것에 대한 다소 의문이 들었을 것이다.

@ main 메서드에서 run()을 호출하는 경우의 call stack
  |        |
  ----------
  |run     |
  ----------
  |main    |
  ----------

@ main 매서드에서 start()를 호출하는 경우
 #step 1.
  |        |
  ----------
  |start   |
  ----------
  |main    |
  ----------
 #step 2.
  |        |     |        | 
  ----------     |        |
  |start   |     |        |
  ----------     |        |
  |main    |     |        |
  ----------     ----------  
 #step 3. 
  |        |     |        | 
  ----------     |        |
  |start   |     |        |
  ----------     ----------
  |main    |     |run     |
  ----------     ----------
 #step 4.
  |        |     |        |
  |        |     |        |
  ----------     ----------
  |main    |     |run     |
  ----------     ----------
 #step 5. 
  |        |     |        |
  |        |     |        |
  |        |     ----------
  |        |     |run     |
  ----------     ----------
  • main 메서드에서 run()을 호출하는 것은 생성된 쓰레드를 실행시키는 것이 아니라 단순히 클래스에 선언된 메서드를 호출하는 것일 뿐이다.

실행 중인 사용자 쓰레드가 하나도 없을 때 프로그램은 종료된다.

  • 쓰레드의 종류
    • 사용자 쓰레드(user thread)
    • 데몬 쓰레드(deamon thread)

4. 싱글쓰레드와 멀티쓰레드

싱글 쓰레드와 멀티 쓰레드의 차이 (싱글코어)

  • 하나의 쓰레드로 두개의 작업을 수행하는 경우
     |  A 작업    |   B 작업   |
    
  • 두개의 쓰레드로 두개의 작업을 수행하는 경우
     |A|B|A|B|A|B|A|B|A|B|A|B|
    
    싱글코어에서 단순히 CPU만을 사용하는 계산 작업이라면 오히려 멀티쓰레드보다 싱글 쓰레드로 프로그래밍 하는 것이 더 효율적이다.
    • 이유: Context switching : 쓰레드간 전환 작업 (현재의 작업 상태를 저장하는 과정)

병행과 병렬

  • 싱글 코어로 두개의 쓰레드를 실행 및 처리하는 경우 : 병행(concurrent)
  • 멀티 코어로 두개의 쓰레드를 실행 및 처리하는 경우 : 병렬(parallel)

두 쓰레드가 서로 다른 자원을 사용하는 작업의 경우에는 싱글쓰레드 프로세스보다 멀티 쓰레드 프로세스가 더 효율적이다.

  • 예를 들면, 사용자로부터 데이터를 입력받는 작업, 네트워크로 파일을 주고받는 작업, 프린터로 파일을 출력하는 작업과같이 외부기기와의 입출력이 필요로 하는 경우가 이에 해당된다.

5. 쓰레드의 우선순위

쓰레드는 우선순위(priority)에 따라서 실행 시간이 달라진다.

  void setPriority(int newPriority) // Thread 우선순위를 지정한 값으로 변경한다.
  int getPriority() // Thread의 우선순위를 반환한다.

  public static final int MAX_PRIORITY = 10;
  public static final int MIN_PRIORITY = 1;
  public static final int NORM_PRIORITY = 5; // 보통 우선 순위
  • 싱글코어에서는 우선 순위의 영향을 받음
  • 멀티코어에서는 쓰레드의 우선순위를 더 주면 더 많은 실행 시간과 실행 기회를 갖는 것을 보장하지 못한다.
    • OS의 스케쥴링 정책과 JVM의 구현에 따라 다름
    • 작업에 우선 순위를 두어 PriorityQueue에 저장해놓고 우선순위가 높은 작업이 먼저 처리도도록 하는 것이 나을 수 있다.

6. 쓰레드 그룹(thread group)

쓰레드 그룹은 보안상의 이유로 도입된 개념으로, 자신이 속한 쓰레드 그룹이나 하위 쓰레드 그룹은 변경할 수 있지만 다른 쓰레드 그룹의 쓰레드를 변경할 수 없다.

Thread(ThreadGroup group, String name);
Thread(ThreadGroup group, Runnable target);
Thread(ThreadGroup group, Runnable target, String name);
Thread(ThreadGroup group, Runnable target ,String name, long stackSize);

java application 을 실행하면 JVM은 main과 system이라는 쓰레드 그룹을 만든다.

  • main method 를 수행하는 main 이라는 쓰레드는 main 쓰레드 그룹에 속함
  • 카비지컬렉션을 수행하는 Finalizer 쓰레드는 system 쓰레드에 속함.

7. 데몬 쓰레드(daemon thread)

데몬 쓰레드 : 다른 일반 쓰레드(데몬 쓰레드가 아닌 쓰레드)의 작업을 돕는 보조적인 역할을 수행하는 쓰레드이다.

  • 데몬 쓰레드는 모두 종료되면 데몬 쓰레드는 강제적으로 자동 종료된다
  • 이점을 제외하고는 일반 쓰레드와 차이가 없다.
  • 예를 들면, 가비지컬렉터, 워드프로세서의 자동저장, 화면 자동 갱신 등이 있다

데몬 쓰레드는 무한 루프와 조건문을 이용해서 실행 후 대기하고 있다가 특정 조건이 만족되면 작업이 수행되고 다시 대기하도록 작성한다.

  boolean isDaemon() // 쓰레드가 데몬 쓰레드인지 확인한다.
  void setDaemon(boolean on) // 쓰레드를 데몬 쓰레드로 또는 사용자 쓰레드로 변경한다.
// Daemon thread example
public class ThreadEx10 implements Runnable {
    static boolean autoSave = false;

    public static void main(String[] args) {
        Thread t = new Thread(new ThreadEx10());
        t.setDaemon(true); // **이 부분이 없으면 종료되지 않는다.**
        t.start();

        for(int i = 1; i <= 10; i++) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println(i);
            if(i==5) {
                autoSave = true;                
            }
        }

        System.out.println("프로그램을 종료합니다.");
    }

    @Override
    public void run() {
        // **데몬쓰레드 실행을 위한 무한루프 실행 및 특정 조건 일때 실행**
        while(true) {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            if(autoSave) {
                autoSave();
            }
        }
    }

    public void autoSave() {
        System.out.println("작업파일이 자동저장되었습니다.");
    }

}

8. 쓰레드의 실행제어

Thread의 상태는 아래와 같다.

상태 설명
NEW 쓰레드가 생성되고 아직 start()가 호출 되지 않은 상태
RUNNABLE 실행 중 또는 실행 가능한 상태
BLOCKED 동기화 블럭에 의해서 일시 정지된 상태 (lock이 풀릴 때까지 기다리는 상태)
WAITING, TIMED_WAITING 쓰레드의 작업이 종료되지는 않았지만 실행가능하지 않은 (unrunnable) 일시정지 상태. TIMED_WAITING은 일시정지 시간이 지정된 경우를 의미
TERMINATED 쓰레드의 작업이 종료된 상태
  • Thread의 상태는 getState() 메서드를 호출해서 확인이 가능함 (JDK1.5 추가됨)

[참고] resume(), stop(), suspend()는 쓰레드를 교착상태(deadlock)로 만들기 쉽기 때문에 deprecated 되었다.

class ThreadEx15{
    public static void main(String args[]) {
        A th1 = new A();
        B th2 = new B();

        th1.start();
        th2.start();

        try {
            th1.sleep(5000);    
            // sleep(): 작업 흐름 대기시간 설정한다. 
            // 5초동안 대기시간 갖은 후에 다음 문자의 실행흐름을 이어 나간다.
        } catch(InterruptedException e) {}

        System.out.print("<<main 종료>>");
    } // main
}

class A extends Thread {
    public void run() {
        for(int i=0; i < 300; i++) {
            System.out.print("-");
        }
        System.out.print("<<th1 종료>>");
    } // run()
}

class B extends Thread {
    public void run() {
        for(int i=0; i < 300; i++) {
            System.out.print("|");
        }
        System.out.print("<<th2 종료>>");
    } // run()
}
  • 결과
    • th1 종료 이 가장 늦게 종료 될 것으로 예상
    • th1 종료 -> th2 종료 -> main 종료
    • 이유 : sleep()이 항상 실행 중인 쓰레드에 대해 작동하기 때문에 실제 영향을 받는 것은 main 메서드 이기 때문
    • 그래서 sleep()은 static 으로 선언되어 있으며, 참조 변수를 이용해서 실행하는 것보다 Thread.sleep(2000); 과 같이 호출

interrupt()와 interrupted()

진행 중인 쓰레드의 작업이 끝나기 전에 취소시켜야 할 때가 있는 경우 사용

  • 예를 들어 큰 파일을 다운로드 받을 때 시간이 너무 오래 걸리면 중간에 다운르도를 포기하고 취소할 수 있어야 한다.
  • interrupt()는 쓰레드에게 작업을 멈추라고 요청한다.
  • 실제 동작 : 그저 쓰레드의 interrupted 상태 (인스턴스 변수)를 바꾸는 것일 뿐이다.
 void interrupt(); // 쓰레드의 interrupted 상태를 false에서 true로 변경
 boolean isInterrupted(); // 쓰레드의 interrupted 상태를 반환
 static boolean isInterrupted(); // 현재 쓰레드의 interrupted 상태를 알려주고, false로 초기화
public class ThreadEx11{
    public static void main(String[] args) {
        ThreadEx11_1 t1 = new ThreadEx11_1();
        t1.start();

        String input = JOptionPane.showInputDialog("아무값이나 입력하시오.");
        System.out.println("input value is " + input);
        t1.interrupt();

        System.out.println("isInterrupted(): " + t1.isInterrupted());
    }
}

class ThreadEx11_1 extends Thread {
    public void run() {
        int i = 10;
        while(i!=0 && !isInterrupted()) {
            System.out.println(i--);

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // interrupted();
            }
        }
        System.out.println("카운트가 종료되었습니다.")
    }
}
  • 실행 결과
10
9
8
input value is TTT
inInterrupted(): false
7
6
5
4
3
2
1
카운트가 종료되었습니다.
  • 코드 수정
    • interrupted(); //추가
    • 이유: Thread.sleep(1000) 에서 InterruptedException 발생
    • sleep() 에 의해 쓰레드가 멈춰있을 때, interrupt()를 호출하면 InterruptedException 발생되고 interrupted 상태는 false로 자동 초기화됨

suspend(), resume(), stop()

  • suspend(): sleep() 처럼 쓰레드를 멈추게 한다.
  • resume(): suspend() 에 의해 정지된 쓰레드는 resume()을 호출해야 다시 실행 대기 상태가 된다.
  • stop(): 호출되는 즉시 쓰레드가 종료된다.

쓰레드의 실행을 제어하는 가장 손쉬운 방법이지만, suspend()와 stop()이 교착상태(deadlock)을 일으키기 쉽게 작성되어 있으므로 사용이 권장되지는 않는다. (Deprecated)

yeild()

  • yeild() : 다른 쓰레드에게 양보한다.
  • 예를 들어 스케쥴러에 의해 1초 실행 시간을 할당받는 쓰레드가 0.5초의 시간동안 작업한 상태에서 yeild를 호출되면, 나머지 0.5초는 포기하고 다시 실행 대기 상태가 된다.

yeild()와 interrupt()를 적절히 사용하면 프로그램의 응답성을 높이고 보다 효율적인 실행이 가능하게 할 수 있다.

  • 참고 예제는 교재 p760~761 참고

join()

  • join() : 쓰레드 자신이 하던 작업을 멈추고 다른 쓰레드가 지정된 시간 동안 작업을 수행하도록 할 때 join()을 사용한다.
  • 참고:
    • join()도 sleep()처럼 interrupt()에 의해 대기상태에서 벗어날 수 있다.
    • try-catch문으로 감싸야 한다. (InterruptedException)
    • sleep()과 다른 점은 join()은 특정 쓰레드에 대해 동작하므로 static 메서드가 아니라는 점이다.

public class ThreadEx19{
    static long startTime = 0;
    public static void main(String[] args) {
        ThreadEx19_1 t1 = new ThreadEx19_1();
        ThreadEx19_2 t2 = new ThreadEx19_2();

        t1.start();
        t2.start();

        startTime = System.currentTimeMillis();
        try {
            t1.join(); // main쓰레드가 t1의 작업이 끝날때까지 기다린다.
            t2.join(); // main쓰레드가 t2의 작업이 끝날때까지 기다린다.
        } catch (InterruptedException e) {
        }

        System.out.println("소요시간: " + (System.currentTimeMillis() - startTime));
    }
}

class ThreadEx19_1 extends Thread {
    public void run() {
        // ...
    }
}

class ThreadEx19_2 extends Thread {
    public void run() {
        // ...
    }
}

public class ThreadEx19{
    public static void main(String args[]) {
        ThreadEx19_1 gc = new ThreadEx19_1();//가비지 컬렉션역할의 스레드 생성
        gc.setDaemon(true);//GargabeCollection 스레드를 데몬 스레드로 설정
        gc.start();//가비지 컬렉션 스레드 시작
        int requiredMemory = 0;//필요한 메모리의 크기.
        for(int i=0; i < 20; i++) {
            requiredMemory = (int)(Math.random() * 10) * 20;
            //필요한 메모리의 크기를 난수를 사용해서 설정한다. 
            //즉, 컴퓨터에서 동작하는 프로그램에서 사용하려는 메모리의 크기를 흉내낸 것이다. 
            // 필요한 메모리가 사용할 수 있는 양보다 크거나 전체 메모리의 60%이상을
            // 사용했을 경우 gc를 깨운다.
            if(gc.freeMemory() < requiredMemory ||
                gc.freeMemory() < gc.totalMemory() * 0.4) {
                gc.interrupt();//잠자고 있는 데몬 스레드를 깨운다.
            }
            gc.usedMemory += requiredMemory;
            //사용 중인 메모리 크기가 프로그램에서 사용하는 메모리 크기 만큼 증가한다.
            System.out.println("usedMemory:"+gc.usedMemory);
            //사용중인 메모리 크기를 출력
        }
    }
}

class ThreadEx19_1 extends Thread {

    final static int MAX_MEMORY = 1000;
    //메모리의 최대 크기, 즉 메모리 크기가 1000 이상이 될 수 없슴
    int usedMemory = 0;//사용 중인 메모리 크기

    public void run() {
        while(true) {
            try {
                Thread.sleep(10 * 1000);//10초를 자고 일어나서 가비지 컬렉션을 수행한다.
            } catch(InterruptedException e) {
                System.out.println("Awaken by interrupt().");
                //gc.interrupt();에 의해서 스레드가 잠에서 깰 때 이 예외가 발생한다.
            }
            gc();//10초 동안 자고 일어나서 garbage collection을 수행한다. 
            System.out.println("Garbage Collected. Free Memory :" + freeMemory());
            //가비지 컬렉션을 수행한 후의 메모리 크기를 출력한다.
        }
    }

    public void gc() {//가비지 컬렉션을 수행하는 메소드
        usedMemory -= 300;//사용중인 메모리 크기를 줄인다.
        if(usedMemory < 0) usedMemory = 0;//사용중인 메모리 값이 마이너스가 되지 않도록한다.
    }

    public int totalMemory() {//전체 메모리 크기를 알려주는 메소드
        return MAX_MEMORY;
    }

    public int freeMemory() {//사용가능한 메모리 크기를 알려주는 메소드
        return MAX_MEMORY - usedMemory;
    }
}

결과

  • 위의 스레드 프로그램은 메모리의 최대 크기가 1000으로 설정되었기 때문에, 어떠한 경우에도 출력된 메모리의 크기가 1000을 넘을 수 없다.
  • 그러나, 반복해서 실행하다 보면 사용 중인 메모리의 크기가 1000을 넘어가는 현상을 발견할 수 있다.

이유

  • main 쓰레드가 interrupt() 호출 후에 자신의 작업을 진행하기 때문
  • 해결책 : join() 메서드로 main 쓰레드는 잠시 대기
    // 깨우고 나서 일정한 시간을 기다려서 
    // 가비지 컬렉션 작업을 할 수 있도록 작업 시간을 보장해 준다. 
    gc.interrupt();//잠자고 있는 데몬 스레드를 깨운다. 
    try {
       gc.join(100);//가비지 컬렉션이 0.1초 동안 일을 할 동안 기다린다.
    } catch (InterruptedException e) { }
    

9. 쓰레드의 동기화

멀티 쓰레드 프로세스의 경우 여러 쓰레드가 같은 프로세스 내의 자원을 공유하게 되므로 쓰레드의 동기화 개념이 필요하게 되었다.

(참고: 싱글 쓰레드 프로세스의 경우 프로세스 내에서 단 하나의 스레드만 작업하기 때문에 프로세스의 자원을 가지고 작업하는데 별 문제가 없음)

임계영역(Critical Section)과 잠금(락, lock)

  • 한 쓰레드가 특정 작업을 마치기 전까지 다른 쓰레드에 의해 방해받지 않도록 함
  • 공유 데이터를 사용하는 코드 영역을 임계영역으로 지정해놓고, 공유 데이터(객체)가 가지고 있는 lock을 획득한 단 하나의 쓰레드만 이 영역 내의 코드를 수행할 수 있도록 한다.

쓰레드의 동기화(synchronization)

  • 한 쓰레드가 진행 중인 작업을 다른 쓰레드가 간섭하지 못하도록 막는 것
  • 자바에서는 synchronized 블럭을 이용해서 동기화를 지원
  • JDK1.5부터는 java.util.concurrent.locks와 java.util.concurrent.atomic 패키지를 통해서 다양한 방식으로 동기화를 구현할 수 있도록 지원함

9.1 synchronized를 이용한 동기화

synchronized를 이용한 동기화 방법

  • 가능하면 메서드에 synchronized를 사용하는 메서드 단위의 동기화를 권장
// 1. 특정한 객체에 lock을 걸고자 할 때
synchronized(객체의 참조변수){
    // ...
}

// 2. 메서드에 lock을 걸고자할 때
public synchronized void calcSum(){
    // ...
}

class example {
    public static void main(String[] args) {
        MyThread r = new MyThread();
        Thread t1 = new Thread(r);
        Thread t2 = new Thread(r);

        t1.start();
        t2.start();
    }
}

class Account{
    //**balance에 synchronized 를 위한 private 멤버 변수로 정의**
    private int balance = 1000;
    public int getBalance() {
      return balance;
    }

    // **동기화 적용 (메서드 단위의 동기화를 추천한다.)**
    public synchronized void withDraw(int money){
        /* 객체에 lock을 걸 경우
        synchronized (this) {
            여기에 소스 코드를 집어 넣어도 된다.
        }
        */
        if(balance >= money){
            try{
                Thread.sleep(1000);
            }catch (Exception e) {}
            balance -= money;
        }
    }
}

class MyThread implements Runnable{
    Account acc = new Account();

    @Override
    public void run() {
        while(acc.getBalance > 0){
            // 100, 200, 300 중의 한 값을 임의로 선택해서 출금(withDraw)
            int money = (int)(Math.random() * 3 + 1) * 100;
            acc.withDraw(money);
            System.out.println("balance:" + acc.getBalance);
        }
    }
}

9.2 wait()과 notify()

쓰레드를 동기화할 때 동기화의 효율을 높이기 위해 wait()와 notify()를 함께 사용할 수 있다.

한 쓰레드가 객체에 lock을 걸고 어떤 조건이 만족될 때까지 기다려야 하는 경우, 이 쓰레드를 그대로 놔두면 이 객체를 사용하려는 다른 쓰레드들은 lock이 풀릴 때까지 같이 기다려야 하는 상황이 발생한다.

이러한 비효율을 개선하기 위해서 wait()와 notify()를 사용한다.

 void wait(); // 쓰레드가 락을 반납하고 대기실(waiting pool)에서 통지를 기다린다.
 void wait(long timeout);
 void wait(long timeout, int nanos);
 void notify(); // 작업을 중단했던 쓰레드가 다시 락을 얻어서 작업을 진행하게 된다.
 void notifyAll();
 // wait()에 의해 lock을 반납했다가,
 // 다시 lock을 얻어서 임계 영역에 들어오는 것을 **재진입(reentrance)**라고 한다.

wait(), notify(), notifyAll()

  • Object로 정의되어 있다.
  • 동기화 블록(synchronized 블록) 내에서만 사용할 수 있다.
  • 보다 효율적인 동기화를 가능하게 한다.

notify()가 호출되면, 해당 객체의 대기실에 있던 모든 쓰레드 중에서 임의의 쓰레드만 통지를 받는다. notifyAll()은 기다리고 있는 모든 쓰레드에게 통보를 하지만, 그래도 lock을 얻지 못하면 다시 lock을 기다리게 된다.

기아 상태와 경쟁 상태

지독히 운이 없는 Thread는 계속 통지를 받지 못하고 오랫동안 기다리게 되는데 이것을 기아(starvation) 현상이라고 한다.

이 현상을 막으려면 notify() 대신 notifyAll()을 사용해야 한다. 일단 모든 쓰레드에게 통지를 하면, 쓰레드가 waiting pool에 들어가더라도 쓰레드는 결국 lock을 얻어서 작업을 진행할 수 있기 때문이다.

nofifyAll()로 쓰레드의 기아현상을 막았지만 여러 쓰레드들이 불가피하게 lock을 얻기 위해 경쟁을 하게 된다.

이처럼 여러 쓰레드가 lock을 얻기위해 서로 경쟁하는 것을 경쟁상태('race condition')이라고 한다.

아래 Lock과 condition을 이용하면 wait()와 notify()로는 불가능한 선별적인 통지가 가능해진다.

9.3 Lock과 Condition을 이용한 동기화

동기화할 수 있는 방법은 synchronized 블럭 외에도 'java.util.concurrent.locks' 패키지가 제공하는 lock 클래스들을 이용하는 방법이 있다.


ReentrantLock
ReentrantReadWriteLock
StampedLock

ReentrantLock은 가장 일반적인 lock이다. 'reentrant(재진입할 수 있는)'이라는 단어가 앞에 붙은 이유는 우리가 앞서 wait() & notify()에서 배운 것처럼, 특정 조건에서 lock을 풀고 나중에 다시 lock을 얻고 임계영역으로 들어와서 이후의 작업을 수행할 수 있기 때문이다.

StampedLock은 lock을 걸거나 해지할 때 '스탭프(long타입의 정수값)'를 사용하며, 읽기와 쓰기를 위한 lock외에 낙관적 읽기 lock이 추가된 것이다. 읽기 lock이 걸려있으면, 쓰기 lock을 얻기 위해서는 읽기 lock이 푸릴ㄹ 때까지 기다려야 하는데 비해 '낙관적 읽기 lock'은 쓰기 lock에 의해 바로 풀린다. 그래서 낙관적 ㅇ릭기에 실패하면 읽기 lock을 얻어서 다시 읽어와야 한다. 무조건 읽기 lock을 걸지 않고, 쓰기와 읽기가 충돌할 때만 쓰기가 끝난 후 읽기 lock을 거는 것이다.


int getBalance() {
    long stamp = lock.tryOptimisticRead();

    int curBalance = this.balance;

    if(lock.validate(stamp)) {
        stamp = lock.readLock();

        try {
            curBalance = this.balance;
        } finally {
            lock.unlockRead(stamp);
        }
    }
    return curBalance;
}

아직 lock 클래스들에 대해 자세히 배우지 않았지만, 낙관적 읽기 lock을 어떻게 사용하는 지 감을 잡을 수 있을 것이다.

ReentrantLock의 생성자 ReentrantLock은 다음과 같이 두 개의 생성자를 가지고 있다.


ReentrantLock()
ReentrantLock(boolean fair)

생성자의 매개변수를 true로 주면, lock이 풀렸을 때 가장 오래 기다린 쓰레드가 lock을 획득할 수 있게, 즉 공정하게 처리한다.


void lock(); //lock을 잠근다
void unlock(); //lock을 해지한다
boolean isLocked(); //lock이 잠겼는지 확인한다

자동적으로 lock의 잠금과 해제가 관리되는 synchronized블럭과 달리, ReentrantLock과 같은 lock클래스들은 수종으로 lock을 잠금고 해제해야 한다. 그대로 lock을 잠금고 푸는 것은 간단하다. 그저 메서드를 호출하기만 하면 될 뿐이다. lock을 걸고 나서 푸는 것을 잊어버리는 실수를 하지 않도록 주의를 기울여야 한다.

synchronized(lock) {
    // 임계영역
}
lock.lock();
// 임계영역 , 이렇게 하면 중간에 예외가 발생하면 lock이 안풀린다.
lock.unlock();

lock.lock();
try{
    // 임계영역, 이렇게 하면 임계영역에서 예외나 에러가 발생해도 finally에서 unlock 시켜서 안전
} finally {
    lock.unlock();
}

이외에도 tryLock()ㅇ;라는 메서드가 있는데, 이 메서드는 lock()과는 달리, 다른 쓰레드에 의해 lock이 걸려있으면 lock을 얻으려고 기다리지 않는다. 또는 지정된 시간만큼만 기다린다. lock을 얻으면 true를 반환하고 얻지 못하면 false를 반환한다.


boolean tryLock();
boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException;

lock()은 lock을 얻을 때까지 쓰레드를 블락시키므로 쓰레드의 응답성이 나빠질 수 있다. 응답성이 중요한 경우 tryLock()을 이용해서 지정된 시간동안 lock을 얻지 못하면 다시 작업을 시도할 것인지 포기할 것인지를 사용자가 결정할 수 있게 하는 것이 좋다.

ReentrantLock과 Condition

앞서 wait() & notify() 예제에 요리사 쓰레드와 손님 쓰레드로 구분해서 통지하지 못한다는 단점이 있다는 것을 기억할 것이다. Condition은 이 문제를 해결하기 위한 것이다.

wait() & notify()로 쓰레드의 종류를 구분하지 않고, 공유 객체의 waiting pool 에 같이 몰아넣는 대신, 손님 쓰레드를 위한 Condition과 요리사 쓰레드를 위한 Condition을 만들어서 각각의 waiting pool에서 따로 기다리도록 하면 문제는 해결된다. 말로 설명하는 것보다 직접 코드를 봅시다~


 private ReentrantKLock lock = new ReentrantLock(); // lock을 생성
 private Condition forCook = lock.newCondition();
 private Condition forCust = lock.newCondition();

 public void add(String dish){

     lock.lock();

     try{
         while(dishes.size() >= MAX_FOOD) {
             String name = Thread.currentThread().getName();
             System.out.println(name + " is waiting.");
             try{
                 forCook.await();
                 Thread.sleep(500);
             }catch(InterruptedException e) {}
         }
     } finally{
         lock.unlock();
     }

 }

 import java.util.ArrayList;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.Condition;

 class Customer implements Runnable {
     private Table table;
     private String food;

     Customer (Table table, String food){
         this.table = table;
         this.food = food;
     }

     public void run() {
         while(true) {
             try { Thread.sleep(100); } catch (InterruptedException e) {}
             String name = Thread.currentThread().getName();

             table.remove(food);
             System.out.println(name + " ate a " + food);


         }
     }

9.4 volatile

싱글 코어 프로세서가 장착된 컴퓨터에서는 예제 13-16이 아무런 문제없이 실해오딜 것이다. 그러나 요즘에 대부분 멀티 코어 프로세서가 장착된 컴퓨터를 사용하기 때문에 이 예제에서 문제가 발생한 가능성이 있다.

코에는 메모리에서 읽어온 값을 캐시에 저장하고 캐시에서 값을 읽어서 작업한다. 다시 같은 값을 읽어올 때는 먼터 캐시에 있는지 확인하고 없을 때만 메모리에서 읽어온다.

그러다보니 도중에 메모리에 저장된 변수의 값이 변경되었는데도 캐시에 저장된 값이 갱신되지 않아서 메모리에 저장된 갑싱 다른 경우가 발생한다. 그래서 변수 stopped의 값이 바뀌었는데도 쓰레드가 멈추지 않고 계속 실행되는 것이다.


boolean suspended = false;
boolean stopped = false;

volatile boolean suspended = false;
volatile boolean stopped = false;

변수에 volatile을 붙이는 대신에 synchronized 블럭을 사용해도 같은 효과를 얻을 수 있다. 쓰레드가 synchronized블럭으로 들어갈 때와 나올 때 캐시와 메모리간의 동기화가 이루어지기 때문에 값의 불일치가 해소되기 때문이다.

아.. Thread-safe 하다는게 이거구나.. ㅋㅋㅋㅋ

volatile로 long과 double을 원자화합니다. long과 double은 한 개 이상의 4바이트(=32bit)단위로 처리하기 때문에 int 와 int보다 작은 타입은 한번에 쓰는 게 가능합니다. 하지만 long, double은 여러 개의 4바이트 블록을 쓰기 때문에 여러 쓰레드가 하나의 변수의 블록을 접근할 수 있다.

그래서 long, double 변수에 volatile을 붙이면 Thread-safe 해집니다!!

9.5 fork & join 프레임웍

10년 전까지만 해도 CPU 속도는 매년 거의 2배씩 빠르게 향상되어왔다. 그러나 이제 그 한계에 도달하여 속도보다는 코어의 개수를 늘려서 CPU의 성능을 향상시키는 방향으로 발전해가고 있다.

이러한 하드웨어의 변화에 발맞춰 프로그래밍도 멀티 코어를 잘 활용할 수 있는 멀티쓰레드 프로그래밍이 점점 더 중요해지고 있다.지금까지 배워서 잘 알겠지만 멀티쓰레드 프로그래밍이 그리 쉽지는 않다.

그래서 JDK1.7부터 fork & join 프레임웍이 추가되었고, 이 프레임웍은 하나의 작업을 작은 단위로 나눠서 여러 쓰레드가 동시에 처리하는 것을 쉽게 만들어준다.

먼저 수행할 작업에 따라 RecursiveAction과 RecursiveTask, 두 클래스 중에 하나를 상속받아 구현해야 한다.


RecursiveAction // 반환값이 없는 작업을 구현할 때 사용
RecursiveTask // 반환값이 있는 작업을 구현할 때 사용

두 클래스 모두 compute() 라는 추상 메서드를 가지고 있는데, 우리는 상속을 통해 이 추상 메서드를 구현하기만 하면 되낟.


public abstract class RecursiveAction extends ForkJoinAction<Void>()
{
    protected abstract void compute();
}
public abstract class RecursiveAction extends ForkJoinTask<V>()
{
    protected abstract V compute();
}

에를들어 1부터 n까지의 합을 계산해서 결과를 돌려주는 작업을 구현하려면 다음과 같이 한다.

class SumTask extends RecursiveTask<Long>{
    long form;
    long to;

    SumTask(long from, long to){
        this.from = from;
        this.to = to;
    }

    public Long compute(){
    }
}

ForkJoinPool pool = new ForkJoinPool();
SumTask task = new SumTask(from, to);

Long result = pool.invoke(task);

compute의 구현 compute()를 구현하 ㄹ때는 수행할 작업 외에도 작업을 어떻게 나눌 것인가에 대해서도 알려줘야 한다.


public Long compute(){

    long size = to - from;
    if (size <= 5)
        return sum();

    long half = (from+to)/2;

    SumTask leftSum = new SumTask(from, half);
    SumTask rightSum = new SumTask(half+1, to);

    leftSum.fork();

    return rightSum.compute() + leftSum.join();
}

이렇게 작업을 나눠서 할당하는 거는 Divide & Conquare 같다.

1~8 을 나누면 1~4 5~8로 나뉘고 다시 1~4는 1~2 3~4로 나뉜다.

다른 쓰레드의 작업을 훔쳐오기 (work stealing) 작업 훔쳐오기는 모두 쓰레드풀에 의해 자동적으로 이루어진다.

큐가 비어있는 쓰레드는 일이 있는 쓰레드의 일을 자동적으로 가져옵니다.

fork와 join은 더이상 작업을 나눌 수 없을 때까지 나눈다. 나뉘진 작업은 각 쓰레드가 골고루 나눠서 처리하고 작업의 결과는 oin을 호출해서 얻을 수 있다.

fork() // 해당 작업을 쓰레드 풀의 작업 큐에 넣는다. 동기 메서드
join() // 해당 작업의 수행이 끝날 때까지 기다렸다가 수행이 끝나면 그 결과를 반환한다.
import java.util.concurrent.*;

class ForJoinEx1 {
    static final ForkJoinPool pool = new ForkJoinPool();

    public static void main(String[] args) {
        long from = 1L;
        long to = 100_000_000L;

        SumTask task = new SumTask(from, to);

        long start = System.currentTimeMillis();
        Long result = pool.invoke(task);

        System.out.println("Elapsed time(4 Core):" + (System.currentTimeMillis() - start));

        System.out.printf("sum of %d~%d = %d/n", from, to, result);
        System.out.println();

        result = 0L;

        start = System.currentTimeMillis();
        for(long i = from ;  i<= to;i++){
            result += i;
        }
        System.out.println("Elapsed time(1 Core):" + (System.currentTimeMillis() - start));
        System.out.printf("sum of %d~%d=%d%n", from, to, result);

    }// main의 끝
}

class SumTask extends RecursiveTask<Long>{
    public long form;
    public long to;

    SumTask(long from, long to){
        this.from = from;
        this.to = to;
    }

    public Long compute(){

        long size = this.to - this.from;
        if (size <= 5)
            return sum();

        long half = (this.from+this.to)/2;

        SumTask leftSum = new SumTask(this.from, half);
        SumTask rightSum = new SumTask(half+1, this.to);

        leftSum.fork();

        return rightSum.compute() + leftSum.join();
    }

    long sum() {
        long tmp = 0L;
        for(long i = this.from; i<= to ; i++)
            tmp += i;
        return tmp;
    }
}