一、并发工具包

1、什么是并发工具包(JUC)

JUC中主要包括5个模块的内容,分别是atomic(原子操作)、locks(锁和条件变量)collections(阻塞队列和线程安全集合)executor(线程池)tools(线程同步工具)

并发工具包的构成

2、同步工具类

(1)CountDownLatch

等待所有线程都完成后再往下进行,是一个计数器,它允许一个或多个线程等待其他线程完成操作

假设有一个场景,有三名学生在考试,而老师负责等待所有学生考试完成后计算学生的平均分。这时,我们可以使用CountDownLatch来实现。

CountDownLatch

/**
 * @document: java同步工具类
 * CountDownLatch----计数器,等待所有的任务都完成以后才会继续执行下一步操作
 * @Author:SmallG
 * @CreateTime:2023/8/11+14:18
 */

public class Demo01 {
    static class Student implements Runnable {
        //定义一个计数器
        private final CountDownLatch latch;
        private final String name; //学生姓名:学生 1,学生 2,学生 3
        private static int[] scores = new int[3];

        //定义构造方法
        public Student(CountDownLatch latch, String name) {
            this.latch = latch;
            this.name = name;
        }

        @Override
        public void run() {
            // 模拟考试
            // 生成一个成绩,Math.random()生成一个0到1的浮点数
            int score = (int) (Math.random() * 100);
            System.out.println(name + "考试结束,分数是:" + score);
            // 把成绩放在数组中
            int index = Integer.parseInt(name.split(" ")[1]) - 1; //计算分数在数组中的位置
            scores[index] = score;
            //学生交卷,计数器减1
            latch.countDown();

        }
    }

    public static void main(String[] args) {
        //定义三名学生
        int numStudents = 3;
        //定义计数器
        CountDownLatch latch = new CountDownLatch(numStudents);
        //循环创建三名学生
        for (int i = 1; i <= numStudents; i++) {
            Student student = new Student(latch, "学生 " + i);
            //启动线程
            new Thread(student).start();
        }
        //等待所有学生考试结束
        try {
            latch.await();
            System.out.println("所有学生考试结束");
            //计算学生的平均分
            int totalScore = 0; //定义总分
            //计算总分
            for (int i = 0; i < numStudents; i++) {
                totalScore += Student.scores[i];
            }
            double avgScore = (double) totalScore / numStudents;
            System.out.println("平均分为:" + avgScore);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

(2)CyclicBarrier

是一个同步屏障,是环形的屏障,它允许多个线程相互等待,直到到达某个公共屏障点,才能继续执行。

假设有一个场景,三名学生约好在某个地点碰面,然后一起去看电影。我们希望所有人都到达后再一起出发,这时可以使用CyclicBarrier来实现。

CyclicBarrier

/**
 * @document: 线程同步工具类
 * CyclicBarrier -- 循环屏障,所有线程到达一个屏障点后再继续执行
 * @Author:SmallG
 * @CreateTime:2023/8/11+15:07
 */

public class Demo02 {
    static class Student implements Runnable {
        private final CyclicBarrier barrier;
        private final int threadId;

        public Student(CyclicBarrier barrier, int threadId) {
            this.barrier = barrier;
            this.threadId = threadId;
        }

        @Override
        public void run() {
            try {
                //学生到达集合点等待时间
                TimeUnit.MILLISECONDS.sleep(threadId * 1000);
                System.out.println("学生" + threadId + "已经到达集合点");
                //等待其他学生到齐后 继续执行下一步操作
                barrier.await();
                System.out.println("学生" + threadId + "出发去电影院");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        //定义三名学生
        int threadCount = 3;
        //创建一个屏障点
        CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
            //所有学生到达屏障点后执行的任务
            System.out.println("所有学生已经到齐");
        });

        //创建三名学生
        for (int i = 1; i <= threadCount; i++) {
            Student student = new Student(barrier, i);
            new Thread(student).start();
        }
    }
}

(3)Semaphore

计数信号量,它通过计数器来控制线程同时访问数量。允许每个任务最多有几个并发,超过了的线程则等待。

Semaphore的核心概念是许可证计数器

  • 创建Semaphore对象时,需要指定初始的许可证数量
  • 线程在访问资源之前,需要调用acquire()方法获取许可证,如果许可证数量大于零,则线程可以继续执行,许可证数量减一
  • 如果许可证数量为零,则线程会被阻塞,直到其他线程释放许可证
  • 线程在使用完资源后,需要调用release()方法释放许可证,许可证数量加一
/**
 * @document: 同步工具类
 * Semaphore 信号量
 * @Author:SmallG
 * @CreateTime:2023/8/11+15:29
 */

public class Demo03 {
    static class Student implements Runnable {
        private final Semaphore semaphore;
        private final String name;

        public Student(Semaphore semaphore, String name) {
            this.semaphore = semaphore;
            this.name = name;
        }

        @Override
        public void run() {
            try {
                System.out.println(name + "已到达名人故居,申请参观");
                semaphore.acquire();
                //获得许可证
                System.out.println(name + "获得许可证,开始参观");

                //参观时间随机
                TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 5000));

                //离开名人故居,释放许可证
                System.out.println(name + "结束参观,离开名人故居");
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        //定义6名学生
        int numStudents = 6;
        int numPermits = 2; //许可证两个
        //创建信号量
        Semaphore semaphore = new Semaphore(numPermits);

        //创建6名学生
        for (int i = 1; i <= numStudents; i++) {
            Student student = new Student(semaphore, "学生" + i);
            new Thread(student).start();
        }
    }
}

同步工具类实例:

/**
 * @document: 基于Semaphore实现多线程交替打印ABC
 * @Author:SmallG
 * @CreateTime:2023/8/11+16:02
 */

public class Demo04 {
    public static void main(String[] args) {
        //创建三个信号量
        Semaphore semaphoreA = new Semaphore(1);//初始有一个许可证
        Semaphore semaphoreB = new Semaphore(0);//初始没有许可证
        Semaphore semaphoreC = new Semaphore(0);

        //创建三个线程
        new Thread(new PrintTask("A", semaphoreA, semaphoreB)).start();
        new Thread(new PrintTask("B", semaphoreB, semaphoreC)).start();
        new Thread(new PrintTask("C", semaphoreC, semaphoreA)).start();

    }

    static class PrintTask implements Runnable {
        private final String content; //当前打印字母
        private final Semaphore currentSemaphore;//当前的信号量
        private final Semaphore nextSemaphore; //下一个信号量

        public PrintTask(String content, Semaphore currentSemaphore, Semaphore nextSemaphore) {
            this.content = content;
            this.currentSemaphore = currentSemaphore;
            this.nextSemaphore = nextSemaphore;
        }

        @Override
        public void run() {
            //打印五组
            for (int i = 0; i < 5; i++) {
                try {
                    //获取许可证
                    currentSemaphore.acquire();
                    System.out.println(content);
                    //释放下一个信号量的许可证
                    nextSemaphore.release();
                    if (content.equals("C")) {
                        System.out.println("-----------");
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

二、线程池

线程池维护多个线程,等待监督程序分配任务并发执行。

通过维护线程池,提高了性能,避免了频繁创建和销毁线程。

线程池

1、ExecutorService

可以简化异步模式下运行的任务。一般来说,ExecutorService 会自动提供一个线程池和一个用于为其分配任务的API。

ExecutorService接口继承Executor接口,在Executor的基础上扩展了对Executor进行控制的方法:

  • submit() 方法用于提交Runnable任务
  • invokeAll()方法用于批量提交任务
  • shutdown()方法用于停止启动新的任务

在Java中,ExecutorService接口的实现类基于线程池来实现。

创建ExecutorService实例的方法如下:

  • Executors.newSingleThreadExecutor():创建一个使用单个工作线程在无界队列上运行的Executor
  • Executors.newFixedThreadPool(int nThreads):创建一个线程池,该线程池复用在共享无界队列上运行的固定数量的线程
  • Executors.newScheduledThreadPool(int corePoolSize):创建一个线程池,可以配置为在给定延迟后运行,或者定期执行

ExecutorService接口常用的方法如下:

  • void execute(Runnable command):在将来的某个时间执行给定的任务,根据实际使用的Executor的实现类,该任务可以在新线程、线程池或调用线程中执行。

ExecutorService示例:创建线程池,普通线程池。

/**
 * @document: ExecutorService创建线程池
 * @Author:SmallG
 * @CreateTime:2023/8/11+16:45
 */

public class Demo05 {
    public static void main(String[] args) {
        //创建线程池 其中有四个线程
        ExecutorService executorService = Executors.newFixedThreadPool(4);

        //创建线程对象
        MyRun1 run1 = new MyRun1();
        for (int i = 0; i < 4; i++) {
            executorService.execute(run1); //提交任务,相当于启动线程
        }
        //关闭线程池
        executorService.shutdown();

    }
}

class MyRun1 implements Runnable {

    int num = 0;

    @Override
    public void run() {
        while (true) {
            synchronized (this) {
                if (num > 10000) {
                    break;
                }
                String name = Thread.currentThread().getName();
                System.out.println(name + "打印" + num);
                num++;
            }
        }
    }
}

ScheduledExecutorService示例:创建线程池,有延时功能。

/**
 * @document: ScheduledExecutorService创建线程池 有延时功能
 * @Author:SmallG
 * @CreateTime:2023/8/11+17:15
 */

public class Demo06 {
    public static void main(String[] args) {
        //创建一个线程池 包含四个线程
        ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
        //创建任务
        Runnable run1 = () -> {
            String name = Thread.currentThread().getName();
            System.out.println(name + ":::" + LocalTime.now());
        };
        Runnable run2 = () -> {
            String name = Thread.currentThread().getName();
            System.out.println(name + "====>" + LocalTime.now());
        };
        /**
         * 参数一:任务
         * 参数二:延时多久执行任务
         * 参数三:每隔多久执行一次任务
         * 参数四:时间单位
         */
        service.scheduleAtFixedRate(run1,1,2, TimeUnit.SECONDS);
        service.scheduleAtFixedRate(run2,2,1, TimeUnit.SECONDS);
    }
}

2、获取并发任务的结果

不管是Thread还是Runnable都没有返回值,那么线程在执行的时候没有结果,如果想要获得并发任务的结果,可以使用Callable接口

Callable接口

Callable接口是创建线程的第三种方式,只有有一个call()方法,与run()方法相似。

不同的是call()方法可以返回指定的泛型类的对象

Future接口

用于接收callable返回的结果。

  • cancel()方法:用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false
  • isCancelled()方法:表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true
  • isDone()方法:表示任务是否已经完成,若任务完成,则返回true
  • get()方法:用来获取执行结果,如果这个方法产生阻塞,一直等到任务执行完毕才返回执行结果
  • get(long timeout, TimeUnit unit)方法:用来获取执行结果,如果在指定时间内没能获取到结果,直接返回null
/**
 * @document: 创建线程的第三种方式 实现Callable接口 实现call()方法
 * 通过Future来获取线程执行的结果
 * @Author:SmallG
 * @CreateTime:2023/8/11+17:37
 */

public class Demo07 {
    public static void main(String[] args) {
        //创建能提交任务的线程池
        ExecutorService service = Executors.newFixedThreadPool(4);
        //创建线程任务
        MyCall myCall = new MyCall();

        //通过Future来获取线程执行的结果
        Future<Integer> submit = service.submit(myCall);//提交任务
        System.out.println("是否执行完毕:" + submit.isDone());
        try {
            Integer integer = submit.get(); //执行完毕后获取结果,会有延时
            System.out.println("执行的结果:" + integer);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        service.shutdown(); // 关闭线程池
    }
}

class MyCall implements Callable<Integer> {
    Random random = new Random();
    int count = 1;  //控制计算次数
    int result = 0; //用于返回计算结果

    @Override
    public Integer call() throws Exception {
        while (true) {
            synchronized (this) {
                if (count > 100) {
                    break;
                }
                int num = random.nextInt(10);
                result += num;
                System.out.println(Thread.currentThread().getName() + ",num = " + num + ",result = " + result);
                count++;
            }
            Thread.yield(); //当前的线程让出CPU让其他线程执行
        }
        return result;
    }
}

FutureTask

实现了Future接口,常用来封装Callable和Runnable,也可以作为一个任务提交到线程池中执行。

FutureTask

/**
 * @document: FutureTask 类 实现了Future接口
 * @Author:SmallG
 * @CreateTime:2023/8/14+9:15
 */

public class Demo08 {

    //定义一个静态内部类
    static class Student implements Callable<Integer> {
        private final String name;

        public Student(String name) {
            this.name = name;
        }

        @Override
        public Integer call() throws Exception {
            //参加考试
            //随机生成成绩,用math.random生成随机数,左闭右开
            int score = (int) (Math.random() * 100);
            System.out.println(name + "考试结束,成绩为:" + score);
            return score;
        }
    }

    public static void main(String[] args) {
        //定义声明三名学生
        int numStudent = 3;

        //接收线程的返回结果
        FutureTask<Integer>[] ftArray = new FutureTask[numStudent];
        for (int i = 0; i < numStudent; i++) {
            //创建学生
            Student student = new Student("学生" + (i + 1));
            //创建FutureTask对象,一个对象只能接收一个结果
            FutureTask<Integer> ft = new FutureTask<>(student);
            //把对象存入数组
            ftArray[i] = ft;
            //启动线程
            new Thread(ft).start();
        }
        //计算总分
        int total = 0;
        for (int i = 0; i < numStudent; i++) {
            //取出一名学生成绩
            try {
                total += ftArray[i].get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        double avg = (double) total / numStudent;
        System.out.println("所有学生的平均分是:" + avg);
    }
}

三、锁

1、并发中的锁

锁可以控制线程的执行顺序,做线程同步的时候,为了保证资源安全性引入了锁。

(这个图很重要)**

锁

2、从synchronized看锁的分类

synchronized是一个:

非公平锁(不排队,谁抢到是谁的)

悲观锁(锁住同步资源)

可重入锁(可以嵌套synchronized)

排他锁(多个线程不能同时持有synchronized的锁)

3、Lock接口及实现

Lock接口提供了与synchronized关键字类似的同步功能,只是在使用时需要显式地获取和释放锁。

Lock接口提供的并且synchronized关键字所不具备的主要特性如下:

  • 尝试非阻塞地获取锁:如果当前线程尝试获取锁,如果这一时刻锁没有被其他线程获取到,则成功获取并持有锁,而synchronized就会相互抢夺资源。
  • 能被中断地获取锁:如果获取到锁的线程能够响应中断,当获取到锁的线程被中断时,中断异常将会被抛出,同时锁会被释放
  • 超时获取锁:一段时间后,锁还没得到,则返回。

Lock接口的常用方法如下:

  • lock():获取锁,调用该方法的当前线程会尝试获取锁,获得锁后,从该方法返回
  • lockInterruptibly():可中断地获取锁,该方法会响应中断,即在锁的获取中可以中断当前线程
  • tryLock():尝试非阻塞的获取锁,调用该方法后立刻返回,如果能否获取则返回true,否则返回false
  • tryLock(long time, TimeUnit unit):超时的获取锁,超时时间结束,返回false
  • unlock():释放锁
/**
 * @document: Lock锁的实现
 * @Author:SmallG
 * @CreateTime:2023/8/14+10:20
 */

public class Demo09 {
    public static void main(String[] args) {
        MyRun2 myRun2 = new MyRun2();
        //创建一个包含四个线程的线程池
        ExecutorService service = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 4; i++) {
            service.execute(myRun2);
        }
        //关闭线程池
        service.shutdown();
    }

}

class MyRun2 implements Runnable {
    int i = 0; //计数器
    //创建锁,lock是个接口不能实例化,用他的实现类
    Lock lock = new ReentrantLock();

    @Override
    public void run() {
        while (true) {
            lock.lock(); //获取锁
            String name = Thread.currentThread().getName(); // 获取当前线程的名字
            try {
                if (i > 50) {
                    break;
                }
                System.out.println(name + " " + i);
                i++;
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock(); // 释放锁
            }
        }
    }
}

ReentrantLock

该锁能够支持一个线程对资源的重复加锁,支持公平锁,要求排队。

可重入锁

/**
 * @document: 公平锁和非公平锁
 * @Author:SmallG
 * @CreateTime:2023/8/14+10:44
 */

public class Demo10 {
    public static void main(String[] args) {
        //声明是公平锁/非公平锁
        boolean fairFlag = false;
        //创建锁(支持公平锁/非公平锁)
        Lock lock = new ReentrantLock(fairFlag);
        MyRun3 run = new MyRun3(lock,fairFlag?"公平锁":"非公平锁");
        ExecutorService service = Executors.newFixedThreadPool(8);

        for (int i=0;i<16;i++){
            service.execute(run);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

class MyRun3 implements Runnable {
    //在外边创建锁
    Lock lock;
    String label;

    public MyRun3(Lock lock, String label) {
        this.lock = lock;
        this.label = label;
    }

    @Override
    public void run() {
        //获取锁
        lock.lock();
        String name = Thread.currentThread().getName();
        //打印标签和名字
        System.out.println(label + ": name:" + name + "获取到锁");
        try {
            //程序休眠一秒
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //释放锁
        lock.unlock();
    }
}

一写多读场景

一写多读,并发写入频率低,并发读取频率高,是一种非常普遍的并发场景。在读取的时候不应该让信息进行修改,所以有了读写锁。

一写多读

读写锁(面试容易问到)

将读线程和锁线程相互排斥,但是读线程和读线程之间可以不互斥。写的时候可以读,但是读的时候不能写。

读写锁

ReadWriteLock

Java中使用ReadWriteLock接口作为读写锁的父接口,该接口定义了两个方法:

  • Lock readLock():返回该读写锁对中的读锁对象
  • Lock writeLock():返回该读写锁对中的写锁对象

ReentrantReadWriteLock作为ReadWriteLock接口的实现类,除了实现了上述的2个方法外,还提供了一些便于外界监控器内部工作状态的方法:

  • getReadLockCount():返回当前读锁被获取的次数,该次数不等于获取读锁的线程数
  • getReadHoldCount():返回当前线程获取读锁的次数
  • isWriteLocked():判断写锁是否被获取
  • getWriteHoldCount():返回当前写锁被获取的次数
/**
 * @document: 一写多读场景:读写锁
 * @Author:SmallG
 * @CreateTime:2023/8/14+11:31
 */

public class Demo11 {
    public static void main(String[] args) {
        StudentInfoManagerSystem system = new StudentInfoManagerSystem();
        //创建读线程
        Runnable readTask = () -> {
            for (int i = 0; i < 5; i++) {
                system.getStudents(); //五个读
            }
        };
        //创建写的线程
        Runnable writeTask = () -> {
            system.addStudent("Tom");
        };
        //创建包含10个线程的线程池
        ExecutorService service = Executors.newFixedThreadPool(10);
        //读取和写入学生信息
        for (int i = 0; i < 10; i++) {
            if (i % 4 == 0) {
                service.execute(writeTask);
            } else {
                service.execute(readTask);
            }
        }
        service.shutdown();
    }
}

class StudentInfoManagerSystem {
    //定义保存所有学生的集合,频繁修改的数据加上volatile,告诉编译器随时可能会改变
    private volatile List<String> studentList = new ArrayList<>();
    //创建读写锁
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    //添加学生
    public void addStudent(String studentName) {
        //获取写锁,保证数据的安全性
        lock.writeLock().lock();
        try {
            //写入学生信息
            studentList.add(studentName);
            System.out.println("添加了一名学生:" + studentName);
        } finally {
            //释放写锁
            lock.writeLock().unlock();
        }
    }

    /**
     * 获取学生信息,读操作
     */
    public void getStudents() {
        lock.readLock().lock(); //加上读锁,只能看不能读
        try {
            System.out.println("所有学生信息:" + studentList);
        } finally {
            lock.readLock().unlock();
        }
    }
}

四、网络编程基础

计算机网络:将地理位置不同的计算机通过通信线路连接起来,实现资源共享和信息传递。

1、通信协议

在计算机网络中实现通信必须要有一定的规则和约定。

通信协议

2、TCP/IP和UDP协议

(1)数据链路层(网络接口层)

功能:实现了网卡接口的网络驱动程序,以处理数据在物理媒介(如以太网、令牌环等)上的传输。

对应设备:网线、网桥、集线器、交换机

常用协议:

(1)ARP(地址解析协议):它实现IP地址到物理地址(通常是MAC地址,通俗的理解就是网卡地址)的转换。

(2)RARP(逆地址解析协议):顾名思义,它和ARP是相反的,它是实现从物理地址到IP地址的转换。

那它们的用途是什么呢???

ARP用途:网络层使用IP地址寻找一台机器,而数据链路层则是使用物理地址寻找一台机器,因此网络层必须先将目标机器的IP地址转化成物理地址,才能使用数据链路层提供的服务。

RARP用途:RARP协议仅用于网络上的某些无盘工作站,因为缺少储存设备,无盘工作站无法记录自己的IP地址,然而通过RARP就可以看到从物理地址到IP地址的映射。

(2)网络层(网络互联层)

功能:实现数据包的选路和转发。

对应设备:路由器

常用协议:

(1)IP协议(英特网协议)根据数据包的目的IP地址来决定如何将它发送给目标主机。如果数据包不能直接发送给目标主机,那么IP协议为它寻找一个合适的下一跳路由器,将数据包交给路由器来转发,多次之后数据包将到达目标主机,或者因发送失败而被丢弃。

(2)ICMP协议是网络层的另一个重要协议,它是IP协议的重要补充,主要用于检测网络连接。

8位类型:将ICMP报文分为两大类:一类是差错报文,比如目标不可达(类型值为3)和重定向(类型值为5);另一类是查询报文,用来查询网络信息。

有的ICMP报文还用8位代码字段细分不同的条件。比如代码值0表示网络重定向,代码值1表示主机重定向。

16位校验和:对整个报文(包括头部和内容部分)进行循环冗余校验(CRC)。

注意:ICMP协议并非严格意义上的网络层协议,因为它使用了处于同一层的IP协议提供的服务,而一般来说,上层协议使用下层协议提供的服务。

(3)传输层

功能:为两台主机上的应用程序提供端到端的通信。与网络层使用的逐跳通信方式不同,传输层只关心通信的起始端和目的端,而不在乎数据包的中转过程。

主要协议:

(1)TCP协议(传输控制协议):为应用层提供可靠的面向连接的和流式服务,点对点的发送信息。

(2)UDP协议(用户数据报协议):为应用层提供不可靠的无连接的和数据报服务,传输效率高,但数据传输不安全。

(3)SCTP协议(流控制传输协议)它是为在英特网上传输电话信号而设计的。

TCP协议和UDP协议

(4)应用层

功能:负责处理应用程序的逻辑,比如文件传输,名称查询和网络管理等。

注意:数据链路层、网络层、传输层复制处理网络通信 细节,所以这些部分必须稳定且高效,因此它们都在内核空间实现,而应用层在用户空间中实现,因为它负责众多逻辑,在内核中实现的话,则会使内核变得非常庞大。也有少数服务器程序是在内核中实现,这样代码就不用在用户空间和内核空间中来回切换(主要是数据的复制)提高了工作效率。

五、练习

1 多线程数据统计

在data文件夹下提供了5个txt文件,名称分别为1.txt ~ 5.txt。这5个文件是5个气象站在2013年提交的气象数据,数据字段说明详见“天气数据说明.txt”文件。

请基于多线程API和I/O API编写程序,实现如下需求:

1、启动5个子线程,异步的统计1.txt ~ 5.txt中TEMP列数据的平均值,每个子线程负责统计1个文件。

2、在主线程中获取5个子线程的统计结果,在控制台输出平均值,结果保留小数点后2位。

提示:

1、请注意跳过缺失数据,即值为9999.9的数据。

程序运行效果如下所示:

练习1

/**
 * @document: 多线程数据统计
 * @Author:SmallG
 * @CreateTime:2023/8/14+19:13
 */

public class HomeWork {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        String[] pathArray = new String[5];
        for (int i = 0; i < pathArray.length; i++) {
            pathArray[i] = "src/day06/data/" + (i + 1) + ".txt";
        }
        List<Callable<Double>> callableList = new ArrayList<>();
        for (String path : pathArray) {
            MyCall myCall = new MyCall(path);
            callableList.add(myCall);
        }
        //创建线程池
        ExecutorService service = Executors.newFixedThreadPool(5);
        //批量提交任务
        List<Future<Double>> futureList = service.invokeAll(callableList);
        //统计多线程执行结果
        double sumResult = 0;
        for (Future<Double> future : futureList) {
            sumResult += future.get();
        }
        double avg = sumResult / 5;
        avg = Math.round(avg * 100) / 100.0;
        System.out.println("avgResult: " + avg);
        service.shutdown();
    }
}

class MyCall implements Callable<Double> {

    String path; //文件路径

    public MyCall(String path) {
        this.path = path;
    }

    @Override
    public Double call() throws Exception {
        double sum = 0;
        int count = 0;
        try (
                FileReader fr = new FileReader(path);
                BufferedReader br = new BufferedReader(fr)
        ) {
            String line = br.readLine();
            line = br.readLine();
            while (line != null) {
                String tempStr = line.substring(24, 30).trim();
                if (tempStr.equals("9999.9")) {
                    line = br.readLine();
                    continue;
                }
                double temp = Double.parseDouble(tempStr);
                sum += temp;
                count++;
                line = br.readLine();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return sum / count;
    }
}
最后修改:2023 年 08 月 14 日
如果觉得我的文章对你有用,请随意赞赏