欢迎访问宙启技术站
智能推送

Java中的线程和并发函数介绍和应用示例

发布时间:2023-06-04 13:06:40

Java语言是一门面向对象的语言,同时也是一个支持并发编程的语言。通过Java中的线程和并发函数,我们可以实现多任务同时进行,提高程序的执行效率和响应速度。本文将介绍Java中的线程和并发函数的概念、特点和应用示例。

一、线程的概念和特点

线程是操作系统进程中的一个可运行实体,它是CPU分配资源的最小单位。在Java中,线程是一种轻量级的进程,它可以完成异步操作、并发执行以及多任务处理等功能。

Java中的线程主要有以下特点:

1. 独立性:每个线程都是独立运行的,互相之间不会影响。

2. 状态转换:线程在程序运行中会不断地变换状态,如新建、执行、等待、阻塞、终止等等。

3. 优先级:线程可以设置不同的优先级,优先级高的线程会优先被CPU调度执行。

4. 同步机制:Java提供了多种同步机制,如synchronized、Lock、Semaphore等,用于控制线程的并发执行。

二、线程的创建和启动

Java中创建线程的方式主要有两种:

1. 继承Thread类,重写run()方法。

public class MyThread extends Thread {
    @Override
    public void run() {  
        //实现线程的具体执行逻辑  
    }  
}

//创建并启动线程
MyThread myThread = new MyThread();
myThread.start();

2. 实现Runnable接口,重写run()方法。

public class MyThread implements Runnable{
    @Override
    public void run() {  
        //实现线程的具体执行逻辑  
    }  
}

//创建并启动线程
MyThread myThread = new MyThread();
Thread thread = new Thread(myThread);
thread.start();

三、线程的同步和互斥

线程同步和互斥是多线程编程中必不可少的概念,可以通过Java中提供的synchronized、Lock、Semaphore等关键字或类实现。

synchronized的同步机制:

public class MyThread implements Runnable{
    private int count = 0;
    @Override
    public void run() {  
        synchronized (this) { 
            count++;  
        } 
    }  
}

Lock的同步机制:

public class MyThread implements Runnable{
    private int count = 0;
    private Lock lock = new ReentrantLock();
    @Override
    public void run() {  
        try {
            lock.lock();  
            count++;  
        } finally {
            lock.unlock();  
        } 
    }  
}

Semaphore的同步机制:

public class MyThread implements Runnable{
    private int count = 0;
    private Semaphore semaphore = new Semaphore(1);
    @Override
    public void run() {  
        try {
            semaphore.acquire();  
            count++;  
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore.release();  
        } 
    }  
}

四、并发函数

Java中提供了多种并发函数,用于处理多线程并发执行过程中的一些常见操作。

1. CountDownLatch:等待其他线程执行完毕。

public class MyThread implements Runnable{
    private CountDownLatch countDownLatch;
    @Override
    public void run() {  
        //执行线程操作
        countDownLatch.countDown(); //线程执行完毕后进行计数,表示该线程已经完成执行
    }  
}

//主线程等待所有子线程执行完毕
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
while (countDownLatch.getCount() != 0) {
    countDownLatch.await();
}

2. CyclicBarrier:等待所有线程达到一个状态后再继续执行。

public class MyThread implements Runnable{
    private CyclicBarrier cyclicBarrier;
    @Override
    public void run() {  
        //执行线程操作
        cyclicBarrier.await(); //等待所有线程达到同一个状态
        //继续执行
    }  
}

//主线程等待所有子线程执行完毕
CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);
while (cyclicBarrier.getNumberWaiting() != threadCount) {
    Thread.sleep(100);
}

3. Semaphore:限制线程的并发数。

public class MyThread implements Runnable{
    private Semaphore semaphore;
    @Override
    public void run() {  
        semaphore.acquire(); //获取许可
        //执行线程操作
        semaphore.release(); //释放许可
    }  
}

//创建Semaphore实例,设置许可数量为并发数
Semaphore semaphore = new Semaphore(threadCount);

五、应用示例

1. 多线程并发下载文件

public class Downloader {
    public static void download(String url, String savePath, int threadCount) {
        try {
            URL fileUrl = new URL(url);
            HttpURLConnection conn = (HttpURLConnection) fileUrl.openConnection();
            long fileSize = conn.getContentLength();
            long blockSize = (fileSize + threadCount - 1) / threadCount;

            //创建Semaphore实例,设置许可数量为并发数
            Semaphore semaphore = new Semaphore(threadCount);

            //创建CountDownLatch实例,用于等待所有线程下载完毕
            CountDownLatch countDownLatch = new CountDownLatch(threadCount);

            for (int i = 0; i < threadCount; i++) {
                long startIndex = i * blockSize;
                long endIndex = Math.min(startIndex + blockSize - 1, fileSize - 1);
                DownloadThread thread = new DownloadThread(url, savePath, startIndex, endIndex, countDownLatch, semaphore);
                new Thread(thread).start();
            }

            //主线程等待所有子线程执行完毕
            while (countDownLatch.getCount() != 0) {
                countDownLatch.await();
            }

            conn.disconnect();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

class DownloadThread implements Runnable {
    private String url;
    private String savePath;
    private long startIndex;
    private long endIndex;
    private CountDownLatch countDownLatch;
    private Semaphore semaphore;

    public DownloadThread(String url, String savePath, long startIndex, long endIndex, CountDownLatch countDownLatch, Semaphore semaphore) {
        this.url = url;
        this.savePath = savePath;
        this.startIndex = startIndex;
        this.endIndex = endIndex;
        this.countDownLatch = countDownLatch;
        this.semaphore = semaphore;
    }

    @Override
    public void run() {
        try {
            semaphore.acquire(); //获取许可
            String range = "bytes=" + startIndex + "-" + endIndex;
            URL fileUrl = new URL(url);
            HttpURLConnection conn = (HttpURLConnection) fileUrl.openConnection();
            conn.setRequestProperty("Range", range);

            InputStream is = conn.getInputStream();
            RandomAccessFile file = new RandomAccessFile(savePath, "rw");
            file.seek(startIndex);

            byte[] buffer = new byte[1024];
            int length = -1;
            while ((length = is.read(buffer)) != -1) {
                file.write(buffer, 0, length);
            }

            file.close();
            is.close();
            conn.disconnect();

            countDownLatch.countDown(); //线程执行完毕后进行计数,表示该线程已经完成执行
            semaphore.release(); //释放许可
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

public class Main {
    public static void main(String[] args) {
        Downloader.download("http://example.com/file.zip", "file.zip", 8);
    }
}

2. 多线程并发求和

`java

public class CalculateSum {

public static int calculate(int[] array, int threadCount) {

int sum = 0;

int size = array.length / threadCount;

//创建CountDownLatch实例,用于等待所有线程执行完毕

CountDownLatch countDownLatch = new CountDownLatch(threadCount);

//创建Semaphore实例,设置许可数量为并发数

Semaphore semaphore = new Semaphore(threadCount);

for (int i = 0; i < threadCount; i++) {

int startIndex = i * size;

int endIndex = (i == threadCount - 1) ? array.length - 1 : (startIndex + size - 1);

CalculateThread thread = new CalculateThread(array, startIndex