多线程

前言:目前大部分操作系统都是以线程为CPU调度和分派基本单位。多线程在日常程序中运用的还是比较多的,潜在的我们web容器帮我们在http层面可以同时处理多个请求,这些可能多我们是无感的。平常的开发中,我们需要结合业务场景来合理运用多线程,例如大文件的IO操作,大量消息发送,都可以运用多线程来处理,充分利用多核CPU性能。

一、创建线程的方式

继承Thread类创建线程
实现Runnable接口创建线程
使用Callable和Future创建线程
使用线程池
a.继承Thread类,重写run()

public class Test extends Thread {
@Override
public void run() {
System.out.println(“hello,word!”);
}
public static void main(String[] args) {
Test test = new Test();
test.start();
}
}

b.实现Runnable接口创建线程

public class Test implements Runnable {
@Override
public void run() {
System.out.println(“hello,word!”);
}
public static void main(String[] args) {
Thread thread = new Thread(new Test());
thread.start();
}
}
//Java 8 lambda写法
public class Test {
public static void main(String[] args) {
Thread thread = new Thread(
() -> System.out.println(“hello,word!”));
thread.start();
}
}
c.使用Callable和Future创建线程

public class Test implements Callable {
@Override
public String call() throws Exception {
return “hello,word!”;
}

public static void main(String[] args) {
    Callable<String> test = new Test();
    try {
        FutureTask<String> futureTask = new FutureTask<>(test);
        Thread thread = new Thread(futureTask);
        thread.run();
        System.out.println(futureTask.get());
    } catch (Exception e) {
        e.printStackTrace();
    }
}

}
//java8 lambda匿名类写法
public class Test {
public static void main(String[] args) {
FutureTask futureTask=new FutureTask<>(()->”hello,word!”);
Thread thread = new Thread(futureTask);
thread.start();
try {
System.out.println(futureTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
d.使用线程池

public class Test {
public static void main(String[] args) {
FutureTask futureTask=new FutureTask<>(()->”hello,word!”);
ExecutorService service=Executors.newFixedThreadPool(2);
service.submit(futureTask);
try {
System.out.println( futureTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

## 二、Callable
Callable和Runnbale一样代表着任务,区别在于Callable有返回值并且可以抛出异常!Callable是一种可以返回结果的任务,这是它与Runnable的区别,但是通过适配器模式可以使Runnable与Callable类似。Future代表了一个异步的计算,可以从中得到计算结果、查看计算状态,其实现FutureTask可以被提交给Executor执行,多个线程可以从中得到计算结果。Callable和Future是配合使用的,当从Future中get结果时,如果结果还没被计算出来,那么线程将会被挂起,FutureTak内部使用一个单链表维持等待的线程;当计算结果出来后,将会对等待线程解除挂起,等待线程就都可以得到计算结果了。

# 三、Future接口、FutureTask类

Future是一个接口,代表了一个异步计算的结果。接口中的方法用来检查计算是否完成、等待完成和得到计算的结果。当计算完成后,只能通过get()方法得到结果,get方法会阻塞直到结果准备好了。如果想取消,那么调用cancel()方法。其他方法用于确定任务是正常完成还是取消了。一旦计算完成了,那么这个计算就不能被取消。

FutureTask类实现了RunnableFuture接口,而RunnnableFuture接口继承了Runnable和Future接口,所以说FutureTask是一个提供异步计算的结果的任务。 FutureTask可以用来包装Callable或者Runnbale对象。因为FutureTask实现了Runnable接口,所以FutureTask也可以被提交给Executor。

//FutureTask 2构造方法
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}

public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}


# 四、线程池工厂

>Executors面对开发人员的创建线程池的工厂类,它和线程执行器ExecutorService关系很紧密。

可以看到,提供了很多静态的方法,我们主要关注以下方法:

* newFixedThreadPool(int nThreads) 固定线程数量的线程池
* newScheduledThreadPool(int corePoolSize) 拥有至少corePoolSize数量的线程池,可以延迟或者固定频率执行
* newCachedThreadPool() 线程不固定线程池,常用于多且小的异步任务使用
* newSingleThreadExecutor() 串行按照提交顺序执行任务,的单线程化线程池
* newWorkStealingPool() 任务窃取线程池,JDK1.8以后加的,充分利用现有的cpu资源创建线程池,一个线程维护一个任务队列,常用在任务执行时间差别较大的业务

```java
public class ExecutorsDemo {

private static ExecutorService service = Executors.newCachedThreadPool();
private static ExecutorService service2 = Executors.newFixedThreadPool(3);
private static ExecutorService service3 = Executors.newScheduledThreadPool(3);
private static ExecutorService service4 = Executors.newSingleThreadExecutor();
private static ExecutorService service5 = Executors.newWorkStealingPool();

public static void main(String[] args) throws Exception {
addTask("newCachedThreadPool", service);
addTask("newFixedThreadPool", service2);
addTask("newScheduledThreadPool", service3);
addTask("newSingleThreadExecutor", service4);
addTask("newWorkStealingPool", service5);
}

private static void addTask(String type, ExecutorService service) throws Exception {
for (int i = 0; i < 10; i++) {
service.execute(() -> System.out.println(type + "," + Thread.currentThread().getName()));
}
service.shutdown();
service.awaitTermination(5, TimeUnit.MILLISECONDS);//等待任务完成再关闭主main线程
}
}

注意:虽然service.shutdown()是关闭线程,此时已经提交的线程会被执行完毕,但是打印需要时间,所以再最后加了一句 service.awaitTermination(5, TimeUnit.MILLISECONDS),目的是为了看到打印消息。

    • ScheduledExecutorService

      ScheduledExecutorService接口里面的4个方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public interface ScheduledExecutorService extends ExecutorService {

public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,long delay,TimeUnit unit);
}
```

# 五、线程基础

* stop方法(Deprecated)

stop()会放弃持有的线程的锁,假设在为多个变量赋值的时候,刚好赋值到中间,执行了stop方法,很容易导致数据的不一致,此方法已经被废弃,尽量不要用,下面演示个demo

``` java
public class ThreadTest {
static Integer i, j;

static Runnable r1 = () -> {
while (true) {
i = new Random().nextInt();
j = i;
}
};

static Runnable r2 = () -> {
while (true) {
if (i != j) {
System.out.println("i:" + i + ",j:" + j);
} else {
System.out.println("00000");
}
}
};

public static void main(String[] args) {
Thread read = new Thread(r2);
read.start();
while (true) {
Thread write = new Thread(r1);
write.start();
try {
Thread.sleep(25);
} catch (InterruptedException e) {
e.printStackTrace();
}
write.stop();//Deprecated
}
}
}
  • 打印结果

    00000
    00000
    i:2071039910,j:1508352969
    i:-1148719051,j:-743674023
    00000
    00000

  • 线程中断来替代stop

    上面代码读取的不一致,核心原因是一以为线程突然停止,那么有没有一种优雅的停止呢,JDK为我们提供了更优雅的方式,中断。我们将上述代码稍做改变,数据的一致性立马可以得到保证。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
     public class ThreadTest {
    static Integer i, j;

    static Runnable r1 = () -> {
    while (true) {
    if(Thread.currentThread().isInterrupted()) {//判断是否有中断标志
    i = new Random().nextInt();
    j = i;
    break;
    }
    }
    };

    static Runnable r2 = () -> {
    while (true) {
    if (i != j) {
    System.out.println("i:" + i + ",j:" + j);
    } else {
    System.out.println("00000");
    }
    }
    };

    public static void main(String[] args) {
    Thread read = new Thread(r2);
    read.start();
    while (true) {
    Thread write = new Thread(r1);
    write.start();
    try {
    Thread.sleep(25);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    write.interrupt();// 设置中断标志
    }
    }
    }
  • 结果
    00000
    00000
    ···

注意点:若是被设置了中断的线程使用了sleep方法,则运行时可能会抛出中断异常且中断标志会被清空,这里贴出异常,代码就不写出来了。

1
2
3
4
5
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at com.andyfan.thread.ThreadTest.lambda$static$0(ThreadTest.java:20)
at com.andyfan.thread.ThreadTest$$Lambda$1/558638686.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
  • 等待、唤醒 wait/notify

wait和notify时Object里面定义的方法。

  • 工作原理:obj.wait(),线程进入等待队列,obj.notify() 则从等待队列里面随机选择一个线程继续执行等待前代码,意味着唤醒是非公平的。
  • wait和notify的调用必须在synchronized 代码块里,这也意味着必须要获得对象的监视器(锁)。
  • wait方法被调用后会主动释放监视器(锁),这个是sleep方法重要区分点。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class ThreadTest2 {
final static Object object = new Object();

static Runnable r1 = () -> {
synchronized (object) {
long start = 0l;
try {
System.out.println("wait方法被执行了");
start = System.currentTimeMillis();
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println();
System.out.println("wait方法释放后被执行了,等待了notify方法释放锁" + (System.currentTimeMillis() - start)/1000 + "秒");
}
};

static Runnable r2 = () -> {
synchronized (object) {
object.notify();
System.out.println("notify方法被执行了");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("notify休眠1秒后");
}
};

public static void main(String[] args) {
Thread t1 = new Thread(r1);
Thread t2 = new Thread(r2);
t1.start();
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
t2.start();
}
}
  • 打印结果
    wait方法被执行了
    notify方法被执行了
    notify休眠1秒后
    wait方法释放后被执行了,等待了notify方法释放锁1秒

  • suspend 和resume (Deprecated)

    suspend线程会被挂起,它是不释放监视器(锁)的,必须等到resume 才会释放锁。这2个方法也被废弃了,因为若resume在suspend前执行,会导致线程永远别挂起,自己和其他线程由于获取不到监视器根本没办法正常工作,下面演示个被一直被挂起例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class ThreadTest3 {
final static Object object = new Object();

static Runnable r = () -> {
synchronized (object) {
String name = Thread.currentThread().getName();
System.out.println(name);
Thread.currentThread().suspend();//Deprecated
System.out.println(name + "被刮挂起后执行。。。");
}
};

public static void main(String[] args) {
Thread t1 = new Thread(r);
Thread t2 = new Thread(r);
t1.start();
t2.start();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
t1.resume();//Deprecated
t2.resume();//在t2的suspend前执行,导致t2一直被挂起!
try {
t1.join();
t2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
  • 用jstack -l pid 命令 查看堆栈信息,可以看到线程t2一直处于Runnable状态
1
2
3
4
5
6
7
8
9
10
11
12
"Thread-1" #11 prio=5 os_prio=31 tid=0x00007fdbd703f800 nid=0x5903 runnable [0x000070000aa79000]
java.lang.Thread.State: RUNNABLE
at java.lang.Thread.suspend0(Native Method)
at java.lang.Thread.suspend(Thread.java:1029)
at com.andyfan.thread.ThreadTest3.lambda$static$0(ThreadTest3.java:14)
- locked <0x000000076abf9f58> (a java.lang.Object)
at com.andyfan.thread.ThreadTest3$$Lambda$1/558638686.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
```

* join和yield
* join 方法其实内部调用的是wait方法,JDK实现代码块:
   if (millis == 0) {
        while (isAlive()) {
            wait(0);
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
        
* yield(谦让) 个人觉得这个方法使用需要勇气,有点鸡肋,因为它执行了,表明是让出当前CPU时间片,让和它同等优先级的线程优先执行,但是它持有的锁又不释放,共有资源还会再接下来时间竞争,且时间还不固定。

```java
public class ThreadTest4 {
static int j = 0;

public static void main(String[] args) throws Exception {
Thread thread = new Thread(() -> {
for (int i = 0; i < 10; i++) {
j = i;
}
});
thread.start();
thread.join();//main的输出,等待线程执行完再执行,不然输出可能是0或者很小的值
System.out.println(j);
}
}