Java many threadprogrammingtutorial

MasterJava many threadcore concepts, 提升程序performance and concurrentprocessingcapacity

tutorialoverview

本tutorial将详细介绍Java many threadprogramming core concepts and 实践techniques, includingthreadcreation, thread生命周期, threadsynchronization, threadsecurity, concurrentcollection and thread池etc. in 容. through本tutorial Learning, you willable towriting出 high 效, security concurrent程序, 充分利用 many 核processing器 performance优势.

threadcreation

in Javain, has 两种主要方式creationthread:

1. inheritanceThreadclass

public class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("threadrunin...");
    }
    
    public static void main(String[] args) {
        MyThread thread = new MyThread();
        thread.start(); // 启动thread
    }
}

2. implementationRunnableinterface

public class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("threadrunin...");
    }
    
    public static void main(String[] args) {
        MyRunnable runnable = new MyRunnable();
        Thread thread = new Thread(runnable);
        thread.start(); // 启动thread
    }
}

3. usingCallable and Future

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class MyCallable implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        System.out.println("threadrunin...");
        return 42;
    }
    
    public static void main(String[] args) {
        MyCallable callable = new MyCallable();
        FutureTask<Integer> futureTask = new FutureTask<>(callable);
        Thread thread = new Thread(futureTask);
        thread.start();
        
        try {
            Integer result = futureTask.get();
            System.out.println("thread返回结果: " + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

thread生命周期

Javathread具 has 以 under 生命周期status:

  • new 建(New): threadobject被creation但尚未启动
  • 就绪(Runnable): thread已经启动, etc.待CPU时间片
  • run(Running): thread正 in 执行run()method
  • 阻塞(Blocked): thread被暂停, etc.待某个条件满足
  • 终止(Terminated): thread执行完毕 or 被in断

threadsynchronization

threadsynchronization is for 了避免 many threadconcurrent访问共享resource时产生 data不一致issues. Javaproviding了 many 种synchronizationmechanism:

1. synchronized关键字

public class SynchronizedExample {
    private int count = 0;
    
    // synchronizationmethod
    public synchronized void increment() {
        count++;
    }
    
    // synchronizationcode块
    public void incrementWithBlock() {
        synchronized (this) {
            count++;
        }
    }
    
    public static void main(String[] args) {
        SynchronizedExample example = new SynchronizedExample();
        
        // creation many 个thread
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    example.increment();
                }
            }).start();
        }
        
        // etc.待所 has thread执行完毕
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println("最终计数: " + example.count);
    }
}

2. ReentrantLock

import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockExample {
    private int count = 0;
    private final ReentrantLock lock = new ReentrantLock();
    
    public void increment() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }
    
    public static void main(String[] args) {
        ReentrantLockExample example = new ReentrantLockExample();
        
        // creation many 个thread
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    example.increment();
                }
            }).start();
        }
        
        // etc.待所 has thread执行完毕
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println("最终计数: " + example.count);
    }
}

threadsecurity

threadsecurity is 指 many threadenvironment under code 正确执行capacity. 以 under is implementationthreadsecurity 几种method:

1. 不可变object

不可变object天然 is threadsecurity , 因 for 它们 status in creation after 不能被modify.

2. thread局部variable

public class ThreadLocalExample {
    private static final ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 0);
    
    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            final int threadId = i;
            new Thread(() -> {
                try {
                    // 设置thread局部variable
                    threadLocal.set(threadId);
                    System.out.println("thread " + threadId + "  局部variable值: " + threadLocal.get());
                    
                    // mock工作
                    Thread.sleep(100);
                    
                    System.out.println("thread " + threadId + "  局部variable值(工作 after ): " + threadLocal.get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // cleanthread局部variable
                    threadLocal.remove();
                }
            }).start();
        }
    }
}

3. 原子class

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicExample {
    private static final AtomicInteger count = new AtomicInteger(0);
    
    public static void main(String[] args) {
        // creation many 个thread
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    count.incrementAndGet();
                }
            }).start();
        }
        
        // etc.待所 has thread执行完毕
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println("最终计数: " + count.get());
    }
}

concurrentcollection

Javaproviding了专门 concurrentcollectionclass, 用于 in many threadenvironment under security地operationcollection:

1. ConcurrentHashMap

import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentHashMapExample {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        
        // creation many 个thread同时operationmap
        for (int i = 0; i < 5; i++) {
            final int threadId = i;
            new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    map.put("key" + threadId + "-" + j, j);
                }
                System.out.println("thread " + threadId + " completionoperation, map big  small : " + map.size());
            }).start();
        }
        
        // etc.待所 has thread执行完毕
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println("最终map big  small : " + map.size());
    }
}

2. CopyOnWriteArrayList

import java.util.concurrent.CopyOnWriteArrayList;

public class CopyOnWriteArrayListExample {
    public static void main(String[] args) {
        CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
        
        // 添加初始元素
        list.add("元素1");
        list.add("元素2");
        
        // creationthreadfor读取
        new Thread(() -> {
            for (String element : list) {
                System.out.println("读取元素: " + element);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        
        // creationthreadformodify
        new Thread(() -> {
            try {
                Thread.sleep(50);
                list.add("元素3");
                System.out.println("添加元素3");
                
                Thread.sleep(100);
                list.add("元素4");
                System.out.println("添加元素4");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

thread池

thread池可以重用thread, reducingthreadcreation and 销毁 开销, improving程序performance:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExample {
    public static void main(String[] args) {
        // creation固定 big  small  thread池
        ExecutorService executor = Executors.newFixedThreadPool(3);
        
        // submittingtask to thread池
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("task " + taskId + " 由thread " + Thread.currentThread().getName() + " 执行");
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        
        // 关闭thread池
        executor.shutdown();
        
        try {
            // etc.待所 has taskcompletion
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
        }
        
        System.out.println("所 has task执行完毕");
    }
}

// usingThreadPoolExecutorcreation自定义thread池
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CustomThreadPoolExample {
    public static void main(String[] args) {
        // creation自定义thread池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2, // corethread数
            5, // 最 big thread数
            60, // 空闲thread存活时间
            TimeUnit.SECONDS, // 时间单位
            new ArrayBlockingQueue<>(10), // 工作queue
            new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
        );
        
        // submittingtask
        for (int i = 0; i < 15; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("task " + taskId + " 由thread " + Thread.currentThread().getName() + " 执行");
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        
        // 关闭thread池
        executor.shutdown();
    }
}

实践case

many thread under 载器

本caseimplementation一个 simple many thread under 载器, using many 个thread同时 under 载file 不同部分, improving under 载速度.

import java.io.*;
import java.net.URL;
import java.net.URLConnection;
import java.util.concurrent.CountDownLatch;

public class MultiThreadDownloader {
    private static final int THREAD_COUNT = 4;
    private static final String FILE_URL = "https://example.com/large-file.zip";
    private static final String SAVE_PATH = "downloaded-file.zip";
    
    public static void main(String[] args) {
        try {
            URL url = new URL(FILE_URL);
            URLConnection connection = url.openConnection();
            int fileSize = connection.getContentLength();
            
            System.out.println("file big  small : " + fileSize + " 字节");
            
            // 计算每个thread under 载 file部分 big  small 
            int partSize = fileSize / THREAD_COUNT;
            
            // creation临时filearray
            File[] tempFiles = new File[THREAD_COUNT];
            for (int i = 0; i < THREAD_COUNT; i++) {
                tempFiles[i] = new File("temp" + i + ".part");
            }
            
            // creationCountDownLatchetc.待所 has threadcompletion
            CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
            
            // 启动 many 个 under 载thread
            for (int i = 0; i < THREAD_COUNT; i++) {
                final int threadId = i;
                final long startPos = i * (long) partSize;
                final long endPos = (i == THREAD_COUNT - 1) ? fileSize - 1 : (i + 1) * (long) partSize - 1;
                
                new Thread(() -> {
                    try {
                        downloadPart(url, startPos, endPos, tempFiles[threadId]);
                        System.out.println("thread " + threadId + "  under 载completion");
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        latch.countDown();
                    }
                }).start();
            }
            
            // etc.待所 has threadcompletion
            latch.await();
            System.out.println("所 has thread under 载completion, 开始mergefile");
            
            // merge临时file
            mergeFiles(tempFiles, new File(SAVE_PATH));
            System.out.println("filemergecompletion, 保存 to : " + SAVE_PATH);
            
            // delete临时file
            for (File tempFile : tempFiles) {
                tempFile.delete();
            }
            System.out.println("临时file已delete");
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    private static void downloadPart(URL url, long startPos, long endPos, File tempFile) throws Exception {
        URLConnection connection = url.openConnection();
        connection.setRequestProperty("Range", "bytes=" + startPos + "-" + endPos);
        
        try (InputStream in = connection.getInputStream();
             RandomAccessFile out = new RandomAccessFile(tempFile, "rw")) {
            
            byte[] buffer = new byte[4096];
            int bytesRead;
            while ((bytesRead = in.read(buffer)) != -1) {
                out.write(buffer, 0, bytesRead);
            }
        }
    }
    
    private static void mergeFiles(File[] tempFiles, File outputFile) throws Exception {
        try (RandomAccessFile out = new RandomAccessFile(outputFile, "rw")) {
            for (File tempFile : tempFiles) {
                try (RandomAccessFile in = new RandomAccessFile(tempFile, "r")) {
                    byte[] buffer = new byte[4096];
                    int bytesRead;
                    while ((bytesRead = in.read(buffer)) != -1) {
                        out.write(buffer, 0, bytesRead);
                    }
                }
            }
        }
    }
}

互动练习

练习1: threadcreation and 启动

creation一个Java程序, using两种不同 方式creation并启动thread, 每个thread输出自己 名称 and 一些information.

练习2: threadsynchronization

creation一个计数器class, usingsynchronized关键字确保 many threadenvironment under threadsecurity, 然 after creation many 个thread同时增加计数器 值.

练习3: thread池using

usingExecutorscreation一个thread池, submitting many 个task to thread池执行, 然 after 正确关闭thread池.

练习4: concurrentcollection

usingConcurrentHashMapstore键值 for , creation many 个thread同时读写该collection, verification其threadsecurity性.