tutorialoverview
本tutorial将详细介绍Java many threadprogramming core concepts and 实践techniques, includingthreadcreation, thread生命周期, threadsynchronization, threadsecurity, concurrentcollection and thread池etc. in 容. through本tutorial Learning, you willable towriting出 high 效, security concurrent程序, 充分利用 many 核processing器 performance优势.
threadcreation
in Javain, has 两种主要方式creationthread:
1. inheritanceThreadclass
public class MyThread extends Thread {
@Override
public void run() {
System.out.println("threadrunin...");
}
public static void main(String[] args) {
MyThread thread = new MyThread();
thread.start(); // 启动thread
}
}
2. implementationRunnableinterface
public class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("threadrunin...");
}
public static void main(String[] args) {
MyRunnable runnable = new MyRunnable();
Thread thread = new Thread(runnable);
thread.start(); // 启动thread
}
}
3. usingCallable and Future
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class MyCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("threadrunin...");
return 42;
}
public static void main(String[] args) {
MyCallable callable = new MyCallable();
FutureTask<Integer> futureTask = new FutureTask<>(callable);
Thread thread = new Thread(futureTask);
thread.start();
try {
Integer result = futureTask.get();
System.out.println("thread返回结果: " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
thread生命周期
Javathread具 has 以 under 生命周期status:
- new 建(New): threadobject被creation但尚未启动
- 就绪(Runnable): thread已经启动, etc.待CPU时间片
- run(Running): thread正 in 执行run()method
- 阻塞(Blocked): thread被暂停, etc.待某个条件满足
- 终止(Terminated): thread执行完毕 or 被in断
threadsynchronization
threadsynchronization is for 了避免 many threadconcurrent访问共享resource时产生 data不一致issues. Javaproviding了 many 种synchronizationmechanism:
1. synchronized关键字
public class SynchronizedExample {
private int count = 0;
// synchronizationmethod
public synchronized void increment() {
count++;
}
// synchronizationcode块
public void incrementWithBlock() {
synchronized (this) {
count++;
}
}
public static void main(String[] args) {
SynchronizedExample example = new SynchronizedExample();
// creation many 个thread
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
example.increment();
}
}).start();
}
// etc.待所 has thread执行完毕
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("最终计数: " + example.count);
}
}
2. ReentrantLock
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockExample {
private int count = 0;
private final ReentrantLock lock = new ReentrantLock();
public void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
ReentrantLockExample example = new ReentrantLockExample();
// creation many 个thread
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
example.increment();
}
}).start();
}
// etc.待所 has thread执行完毕
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("最终计数: " + example.count);
}
}
threadsecurity
threadsecurity is 指 many threadenvironment under code 正确执行capacity. 以 under is implementationthreadsecurity 几种method:
1. 不可变object
不可变object天然 is threadsecurity , 因 for 它们 status in creation after 不能被modify.
2. thread局部variable
public class ThreadLocalExample {
private static final ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 0);
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
final int threadId = i;
new Thread(() -> {
try {
// 设置thread局部variable
threadLocal.set(threadId);
System.out.println("thread " + threadId + " 局部variable值: " + threadLocal.get());
// mock工作
Thread.sleep(100);
System.out.println("thread " + threadId + " 局部variable值(工作 after ): " + threadLocal.get());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// cleanthread局部variable
threadLocal.remove();
}
}).start();
}
}
}
3. 原子class
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicExample {
private static final AtomicInteger count = new AtomicInteger(0);
public static void main(String[] args) {
// creation many 个thread
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
count.incrementAndGet();
}
}).start();
}
// etc.待所 has thread执行完毕
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("最终计数: " + count.get());
}
}
concurrentcollection
Javaproviding了专门 concurrentcollectionclass, 用于 in many threadenvironment under security地operationcollection:
1. ConcurrentHashMap
import java.util.concurrent.ConcurrentHashMap;
public class ConcurrentHashMapExample {
public static void main(String[] args) {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// creation many 个thread同时operationmap
for (int i = 0; i < 5; i++) {
final int threadId = i;
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
map.put("key" + threadId + "-" + j, j);
}
System.out.println("thread " + threadId + " completionoperation, map big small : " + map.size());
}).start();
}
// etc.待所 has thread执行完毕
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("最终map big small : " + map.size());
}
}
2. CopyOnWriteArrayList
import java.util.concurrent.CopyOnWriteArrayList;
public class CopyOnWriteArrayListExample {
public static void main(String[] args) {
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
// 添加初始元素
list.add("元素1");
list.add("元素2");
// creationthreadfor读取
new Thread(() -> {
for (String element : list) {
System.out.println("读取元素: " + element);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
// creationthreadformodify
new Thread(() -> {
try {
Thread.sleep(50);
list.add("元素3");
System.out.println("添加元素3");
Thread.sleep(100);
list.add("元素4");
System.out.println("添加元素4");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
thread池
thread池可以重用thread, reducingthreadcreation and 销毁 开销, improving程序performance:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExample {
public static void main(String[] args) {
// creation固定 big small thread池
ExecutorService executor = Executors.newFixedThreadPool(3);
// submittingtask to thread池
for (int i = 0; i < 10; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("task " + taskId + " 由thread " + Thread.currentThread().getName() + " 执行");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// 关闭thread池
executor.shutdown();
try {
// etc.待所 has taskcompletion
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
}
System.out.println("所 has task执行完毕");
}
}
// usingThreadPoolExecutorcreation自定义thread池
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class CustomThreadPoolExample {
public static void main(String[] args) {
// creation自定义thread池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // corethread数
5, // 最 big thread数
60, // 空闲thread存活时间
TimeUnit.SECONDS, // 时间单位
new ArrayBlockingQueue<>(10), // 工作queue
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
// submittingtask
for (int i = 0; i < 15; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("task " + taskId + " 由thread " + Thread.currentThread().getName() + " 执行");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// 关闭thread池
executor.shutdown();
}
}
实践case
many thread under 载器
本caseimplementation一个 simple many thread under 载器, using many 个thread同时 under 载file 不同部分, improving under 载速度.
import java.io.*;
import java.net.URL;
import java.net.URLConnection;
import java.util.concurrent.CountDownLatch;
public class MultiThreadDownloader {
private static final int THREAD_COUNT = 4;
private static final String FILE_URL = "https://example.com/large-file.zip";
private static final String SAVE_PATH = "downloaded-file.zip";
public static void main(String[] args) {
try {
URL url = new URL(FILE_URL);
URLConnection connection = url.openConnection();
int fileSize = connection.getContentLength();
System.out.println("file big small : " + fileSize + " 字节");
// 计算每个thread under 载 file部分 big small
int partSize = fileSize / THREAD_COUNT;
// creation临时filearray
File[] tempFiles = new File[THREAD_COUNT];
for (int i = 0; i < THREAD_COUNT; i++) {
tempFiles[i] = new File("temp" + i + ".part");
}
// creationCountDownLatchetc.待所 has threadcompletion
CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
// 启动 many 个 under 载thread
for (int i = 0; i < THREAD_COUNT; i++) {
final int threadId = i;
final long startPos = i * (long) partSize;
final long endPos = (i == THREAD_COUNT - 1) ? fileSize - 1 : (i + 1) * (long) partSize - 1;
new Thread(() -> {
try {
downloadPart(url, startPos, endPos, tempFiles[threadId]);
System.out.println("thread " + threadId + " under 载completion");
} catch (Exception e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}).start();
}
// etc.待所 has threadcompletion
latch.await();
System.out.println("所 has thread under 载completion, 开始mergefile");
// merge临时file
mergeFiles(tempFiles, new File(SAVE_PATH));
System.out.println("filemergecompletion, 保存 to : " + SAVE_PATH);
// delete临时file
for (File tempFile : tempFiles) {
tempFile.delete();
}
System.out.println("临时file已delete");
} catch (Exception e) {
e.printStackTrace();
}
}
private static void downloadPart(URL url, long startPos, long endPos, File tempFile) throws Exception {
URLConnection connection = url.openConnection();
connection.setRequestProperty("Range", "bytes=" + startPos + "-" + endPos);
try (InputStream in = connection.getInputStream();
RandomAccessFile out = new RandomAccessFile(tempFile, "rw")) {
byte[] buffer = new byte[4096];
int bytesRead;
while ((bytesRead = in.read(buffer)) != -1) {
out.write(buffer, 0, bytesRead);
}
}
}
private static void mergeFiles(File[] tempFiles, File outputFile) throws Exception {
try (RandomAccessFile out = new RandomAccessFile(outputFile, "rw")) {
for (File tempFile : tempFiles) {
try (RandomAccessFile in = new RandomAccessFile(tempFile, "r")) {
byte[] buffer = new byte[4096];
int bytesRead;
while ((bytesRead = in.read(buffer)) != -1) {
out.write(buffer, 0, bytesRead);
}
}
}
}
}
}
互动练习
练习1: threadcreation and 启动
creation一个Java程序, using两种不同 方式creation并启动thread, 每个thread输出自己 名称 and 一些information.
练习2: threadsynchronization
creation一个计数器class, usingsynchronized关键字确保 many threadenvironment under threadsecurity, 然 after creation many 个thread同时增加计数器 值.
练习3: thread池using
usingExecutorscreation一个thread池, submitting many 个task to thread池执行, 然 after 正确关闭thread池.
练习4: concurrentcollection
usingConcurrentHashMapstore键值 for , creation many 个thread同时读写该collection, verification其threadsecurity性.
推荐tutorial
LearningJavalanguage basic语法 and core concepts
MasterJava 面向objectprogramming思想 and 实践
LearningJavacollectionframework using and 原理
MasterJava fileoperation and IO流processing
LearningJavaexceptionprocessingmechanism and best practices
MasterJava泛型programming core concepts and application
LearningJava 8 Lambda表达式 and function式programming
MasterJava 8 Stream API using and 原理
LearningJavaadvancedcollectionconcepts and 实践techniques
浏览所 has Javatutorial in 容