百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

构架师-Java AIO使用

ccwgpt 2025-03-13 13:36 20 浏览 0 评论

AIO是什么

异步非阻塞,服务器实现模式为一个有效请求一个线程,客户端的I/O请求都是由OS先完成了再通知服务器应用去启动线程进行处理,每个线程不必亲自处理io,而是委派os来处理,并且也不需要等待io完成了,如果完成后,os会通知的。

本文所说的AIO特指Java环境下的AIOAIO是java中IO模型的一种,作为NIO的改进和增强随JDK1.7版本更新被集成在JDKnio包中,因此AIO也被称作是NIO2.0。区别于传统的BIO(Blocking IO,同步阻塞式模型,JDK1.4之前就存在于JDK中,NIOJDK1.4版本发布更新)的阻塞式读写,AIO提供了从建立连接到读、写的全异步操作。AIO可用于异步的文件读写网络通信。本文将介绍如何使用AIO实现一个简单的网络通信以及AIO的一些比较关键的API。

AIO流程

AIO主要API详解

实现一个最简单的AIO socket通信serverclient,主要需要这些相关的类和接口:

  • AsynchronousServerSocketChannel服务端Socket通道类,负责服务端Socket的创建和监听;
  • AsynchronousSocketChannel客户端Socket通道类,负责客户端消息读写;
  • CompletionHandler消息处理回调接口,是一个负责消费异步IO操作结果的消息处理器;
  • ByteBuffer负责承载通信过程中需要读、写的消息。
  • 此外,还有可选的用于异步通道资源共享的AsynchronousChannelGroup类,接下来将一一介绍这些类的主要接口及使用。

    3.1.1 AsynchronousServerSocketChannel


    AsynchronousServerSocketChannel
    是一个流式监听套接字的异步通道。


    AsynchronousServerSocketChannel的使用需要经过三个步骤:
    创建/打开通道绑定地址和端口监听客户端连接请求

    一、创建/打开通道:简单地,可以通过调用
    AsynchronousServerSocketChannel的静态方法open()来创建
    AsynchronousServerSocketChannel实例:

    try {
      AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
    } catch (IOException e) {
      e.printStackTrace();
    }

    当打开通道失败时,会抛出一个IOException异常。
    AsynchronousServerSocketChannel提供了设置通道分组(AsynchronousChannelGroup)的功能,以实现组内通道资源共享。可以调用open(AsynchronousChannelGroup)重载方法创建指定分组的通道:

    try {
      ExecutorService pool = Executors.newCachedThreadPool();
      AsynchronousChannelGroup group = AsynchronousChannelGroup.withCachedThreadPool(pool, 10);
      AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open(group);
    } catch (IOException e) {
      e.printStackTrace();
    }

    AsynchronousChannelGroup封装了处理由绑定到组的异步通道所触发的I/O操作完成所需的机制。每个AsynchronousChannelGroup关联了一个被用于提交处理I/O事件分发消费在组内通道上执行的异步操作结果的completion-handlers的线程池。除了处理I/O事件,该线程池还有可能处理其他一些用于支持完成异步I/O操作的任务。从上面例子可以看到,通过指定AsynchronousChannelGroup的方式打开
    AsynchronousServerSocketChannel,可以定制server channel执行的线程池。有关AsynchronousChannelGroup的详细介绍可以查看官方文档注释。如果不指定AsynchronousChannelGroup,则
    AsynchronousServerSocketChannel会归类到一个默认的分组中。

    二、绑定地址和端口:通过调用
    AsynchronousServerSocketChannel.bind(SocketAddress)方法来绑定监听地址和端口:

    // 构建一个InetSocketAddress实例以指定监听的地址和端口,如果需要指定ip,则调用InetSocketAddress(ip,port)构造方法创建即可
    serverSocketChannel.bind(new InetSocketAddress(port));

    三、监听和接收客户端连接请求:

    监听客户端连接请求,主要通过调用
    AsynchronousServerSocketChannel.accept()方法完成。accept()有两个重载方法:

    public abstract  void accept(A,CompletionHandler);
    public abstract Future accept();

    这两个重载方法的行为方式完全相同,事实上,AIO的很多异步API都封装了诸如此类的重载方法:提供CompletionHandle回调参数或者返回一个Future类型变量。用过Feture接口的都知道,可以调用Feture.get()方法阻塞等待调用结果。以第一个重载方法为例,当接受一个新的客户端连接,或者accept操作发生异常时,会通过CompletionHandler将结果返回给用户处理:

    serverSocketChannel
    .accept(serverSocketChannel, new CompletionHandler() {
              @Override
              public void completed(final AsynchronousSocketChannel result,
                                    final AsynchronousServerSocketChannel attachment) {
                // 接收到新的客户端连接时回调
                // result即和该客户端的连接会话
                // 此时可以通过result与客户端进行交互
              }
    
              @Override
              public void failed(final Throwable exc, final AsynchronousServerSocketChannel attachment) {
                // accept失败时回调
              }
            });

    需要注意的是,
    AsynchronousServerSocketChannel是线程安全的,但在任何时候
    同一时间内只能允许有一个accept操作。因此,必须得等待前一个accept操作完成之后才能启动下一个accept:

    serverSocketChannel
    .accept(serverSocketChannel, new CompletionHandler() {
              @Override
              public void completed(final AsynchronousSocketChannel result,
                                    final AsynchronousServerSocketChannel attachment) {
                // 接收到新的客户端连接,此时本次accept已经完成
                // 继续监听下一个客户端连接到来
                serverSocketChannel.accept(serverSocketChannel,this);
                // result即和该客户端的连接会话
                // 此时可以通过result与客户端进行交互
              }
              ...
            });

    此外,还可以通过以下方法获取和设置
    AsynchronousServerSocketChannel的socket选项:

    // 设置socket选项
    serverSocketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE,true);
    // 获取socket选项设置
    boolean keepAlive = serverSocketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);
    

    其中StandardSocketOptions类封装了常用的socket设置选项。

    获取本地地址:

    InetSocketAddress address = (InetSocketAddress) serverSocketChannel.getLocalAddress();
    

    3.1.2 AsynchronousSocketChannel

    AsynchronousSocketChannel是一个流式连接套接字的异步通道。

    AsynchronousSocketChannel表示服务端与客户端之间的连接通道。客户端可以通过调用AsynchronousSocketChannel静态方法open()创建,而服务端则通过调用
    AsynchronousServerSocketChannel.accept()方法后由AIO内部在合适的时候创建。下面
    以客户端实现为例,介绍AsynchronousSocketChannel。

    一、创建AsynchronousSocketChannel并连接到服务端:需要通过open()创建和打开一个AsynchronousSocketChannel实例,再调用其connect()方法连接到服务端,接着才可以与服务端交互:

    // 打开一个socket通道
    AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
    // 阻塞等待连接成功
    socketChannel.connect(new InetSocketAddress(ip,port)).get();
    // 连接成功,接下来可以进行read、write操作
    


    AsynchronousServerSocketChannel,AsynchronousSocketChannel也提供了open(AsynchronousChannelGroup)方法用于指定通道分组和定制线程池。socketChannel.connect()也提供了CompletionHandler回调和Future返回值两个重载方法,上面例子使用带
    Future返回值的重载,并调用get()方法阻塞等待连接建立完成。

    二、发送消息:

    可以构建一个ByteBuffer对象并调用socketChannel.write(ByteBuffer)方法异步发送消息,并通过CompletionHandler回调接收处理发送结果:

    ByteBuffer writeBuf = ByteBuffer.wrap("From socketChannel:Hello i am socketChannel".getBytes());
    socketChannel.write(writeBuf, null, new CompletionHandler() {
      @Override
      public void completed(final Integer result, final Object attachment) {
        // 发送完成,result:总共写入的字节数
      }
    
      @Override
      public void failed(final Throwable exc, final Object attachment) {
        // 发送失败
      }
    });
    

    三、读取消息:

    构建一个指定接收长度的ByteBuffer用于接收数据,调用socketChannel.read()方法读取消息并通过CompletionHandler处理读取结果:

    ByteBuffer readBuffer = ByteBuffer.allocate(128);
    socketChannel.read(readBuffer, null, new CompletionHandler() {
      @Override
      public void completed(final Integer result, final Object attachment) {
        // 读取完成,result:实际读取的字节数。如果通道中没有数据可读则result=-1。
      }
    
      @Override
      public void failed(final Throwable exc, final Object attachment) {
        // 读取失败
      }
    });
    

    此外,AsynchronousSocketChannel也封装了设置/获取socket选项的方法:

    // 设置socket选项
    socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE,true);
    // 获取socket选项设置
    boolean keepAlive = socketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);
    

    3.1.3 CompletionHandler

    CompletionHandler是一个用于消费异步I/O操作结果的处理器。

    AIO中定义的异步通道允许指定一个CompletionHandler处理器消费一个异步操作的结果。从上文中也可以看到,AIO中大部分的异步I/O操作接口都封装了一个带CompletionHandler类型参数的重载方法,使用CompletionHandler可以很方便地处理AIO中的异步I/O操作结果。CompletionHandler是一个具有两个泛型类型参数的接口,声明了两个接口方法:

    public interface CompletionHandler {
        void completed(V result, A attachment);
        void failed(Throwable exc, A attachment);
    }
    

    其中,泛型V表示I/O操作的结果类型,通过该类型参数消费I/O操作的结果;泛型A为附加到I/O操作中的对象类型,可以通过该类型参数将需要的变量传入到CompletionHandler实现中使用。因此,AIO中大部分的异步I/O操作都有一个类似这样的重载方法:

     void ioOperate(params,A attachment,CompletionHandler handler);
    

    例如,
    AsynchronousServerSocketChannel.accept()方法:

    public abstract  void accept(A attachment,CompletionHandler handler);
    


    AsynchronousSocketChannel.write()方法等:

    public final  void write(ByteBuffer src,A attachment,CompletionHandler handler)
    

    当I/O操作成功完成时,会回调到completed方法,failed方法则在I/O操作失败时被回调。需要注意的是:在CompletionHandler的实现中应当即使处理操作结果,以避免一直占用调用线程而不能分发其他的CompletionHandler处理器。

    相关推荐

    定时任务工具,《此刻我要...》软件体验

    之前果核给大家介绍过一款小众但实用的软件——小说规则下载器,可以把网页里的小说章节按照规则下载到本地,非常适合喜欢阅读小说的朋友。有意思的是,软件作者当时看到果核写的体验内容后,给反推荐到他的帖子里去...

    前端定时任务的神库:Node-cron,让你的项目更高效!

    在前端开发中,定时任务是一个常见的需求。无论是定时刷新数据、轮询接口,还是发送提醒,都需要一个可靠且灵活的定时任务解决方案。今天,我要向大家介绍一个强大的工具——Node-cron,它不仅能解决定时任...

    Shutter Pro!一款多功能定时执行任务工具

    这是一款可以在电脑上定时执行多种任务的小工具,使用它可以根据时间,电量等来设定一些定时任务,像定时打开程序、打开文件,定时关机重启,以及定时弹窗提醒等都可以轻松做到。这是个即开即用的小工具,无需安装,...

    深度解析 Redis 缓存击穿及解决方案

    在当今互联网大厂的后端开发体系中,Redis缓存占据着极为关键的地位。其凭借高性能、丰富的数据类型以及原子性操作等显著优势,助力众多高并发系统从容应对海量用户的访问冲击,已然成为后端开发从业者不可或...

    从零搭建体育比分网站完整步骤(比较好的体育比分软件)

    搭建一个体育比分网站是一个涉及前端、后端、数据源、部署和维护的完整项目。以下是从零开始搭建的详细流程:一、明确项目需求1.功能需求:实时比分展示(如足球、篮球、网球等)支持多个联赛和赛事历史数据查询比...

    告别复杂命令行:GoCron 图形界面让定时任务触手可及

    如果你是运维人员或者经常接触一些定时任务的配置,那么你一定希望有一款图形界面来帮助你方便的轻松配置定时任务,而GoCron就是这样一款软件,让你的配置可视化。什么是GoCron从名字你就可以大概猜到,...

    Java任务管理框架核心技术解析与分布式高并发实战指南

    在当今数字化时代,Java任务管理框架在众多应用场景中发挥着关键作用。随着业务规模的不断扩大,面对分布式高并发的复杂环境,掌握其核心技术并进行实战显得尤为重要。Java任务管理框架的核心技术涵盖多个方...

    链表和结构体实现:MCU软件定时器(链表在单片机中的应用)

    在一般的嵌入式产品设计中,介于成本、功耗等,所选型的MCU基本都是资源受限的,而里面的定时器的数量更是有限。在我们软件设计中往往有多种定时需求,例如脉冲输出、按键检测、LCD切屏延时等等,我们不可能...

    SpringBoot定时任务(springboot定时任务每小时执行一次)

    前言在我们开发中,经常碰到在某个时间点去执行某些操作,而我们不能人为的干预执行,这个时候就需要我们使用定时任务去完成该任务,下面我们来介绍下载springBoot中定时任务实现的方式。定时任务实现方式...

    定时任务新玩法!systemd timer 完整实战详解

    原文链接:「链接」Hello,大家好啊!今天给大家带来一篇使用systemdtimer实现定时任务调度的详细实战文章。相比传统的crontab,systemdtimer更加现代化、结构清晰...

    Celery与Django:打造高效DevOps的定时任务与异步处理神器

    本文详细介绍了Celery这一强大的异步任务队列系统,以及如何在Django框架中应用它来实现定时任务和异步处理,从而提高运维开发(DevOps)的效率和应用性能。下面我们先认识一下Cele...

    订单超时自动取消的7种方案,我用这种!

    前言在电商、外卖、票务等系统中,订单超时未支付自动取消是一个常见的需求。这个功能乍一看很简单,甚至很多初学者会觉得:"不就是加个定时器么?"但真到了实际工作中,细节的复杂程度往往会超...

    裸机下多任务框架设计与实现(gd32裸机配置lwip 网络ping不通)

    在嵌入式系统中,特别是在没有操作系统支持的裸机环境下,实现多任务执行是一个常见的挑战。本文将详细介绍一种基于定时器的多任务框架设计,通过全局时钟和状态机机制,实现任务的非阻塞调度,确保任务执行中不会出...

    亿级高性能通知系统构建,小白也能拿来即用

    作者介绍赵培龙,采货侠JAVA开发工程师分享概要一、服务划分二、系统设计1、首次消息发送2、重试消息发送三、稳定性的保障1、流量突增2、问题服务的资源隔离3、第三方服务的保护4、中间件的容错5、完善...

    运维实战:深度拆解Systemd定时任务原理,90%的人不知道的玩法

    运维实战:深度拆解Systemd定时任务原理,90%的人不知道的高效玩法一、Systemd定时任务的核心原理Systemd定时任务是Linux系统中替代传统cron的现代化解决方案,通过...

    取消回复欢迎 发表评论: