扑街前言:前面说了netty的基本运用、Java的NIO等一系列的知识,这些知识已经可以做一个简单的rpc框架,本篇和下篇我们一起了解一个怎么完成一个rpc框架,当然个只是为了更好的了解rpc框架的基本逻辑,并不是真的可以用于业务使用。(认识到自己是菜鸟的第47天,今天突然记起来是多少天了)
        在编写具体代码之前,我们要了解什么是rpc框架,它是由什么结构组成的,而最常见RPC框架就是Dubbo。
        RPC 
的主要功能目标是让构建分布式计算(应用)更容易,是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议规范,简单的来说就是像调用本地服务一样调用远程服务,对开发者而言是透明的。
RPC的优势
 * RPC框架一般使用长链接,不必每次通信都要3次握手,减少网络开销。
 * RPC框架一般都有注册中心,有丰富的监控管理。
 * 发布、下线接口、动态扩展等,对调用方来说是无感知、统一化的操作。
 * 协议私密,安全性较高。
 * rpc 能做到协议更简单内容更小,效率更高。
 * rpc是面向服务的更高级的抽象,支持服务注册发现,负载均衡,超时重试,熔断降级等高级特性。 
RPC架构设计
        一幅图片解释整个rpc的架构设计
        
上述简单描述一下rpc的一些简单概念,那么首先来编写服务端的代码,因为服务端的编写难度要小于客户端。从上面图片可以看出服务端主要是:服务注册(这里用zookeeper作为注册中心)、监听端口接收连接(包括请求解码、响应编码、请求处理,而编码和解码又有一次编、解码和二次编、解码)。当然这是简单的基本功能,比如限流、健康监测之类的后续再说,先迈出第一步很重要。
 zookeeper
        
在代码开始之前,还需要简单的了解zookeeper的安装和使用,安装就是在zookeeper的官网下载一下最新的稳定版本,然后解压,打开bin目录,运行zkServer.cmd即可。具体的详细下篇文章再说,本篇文章重点是rpc框架的基本编写。至于Java中使用zookeeper,可以类比Redis的使用,Redis为Java提供了两个客户端 
Jedis 和 Redisson,下面代码我们也是使用zookeeper了一个客户端 zkclient。
代码逻辑示例
        上述内容结合之前文章的相关的网络编程内容,可以先写一个服务端,代码如下。 
首先结合上面的流程图,大致说一下逻辑。1、需要一个引导类,用于引导整个rpc服务的启动;2、有一个启动器,在启动器中完成服务注册和基于netty的监听;3、需要一个服务注册的server,基于zookeeper实现服务注册,需要封装zookeeper的连接和调用;4、需要一个netty的server,实现一次、二次编解码,并且实现请求处理调用具体的业务逻辑。
引导类
        
这里单独说明一下@PostConstruct注解,这个具体作用是当这个类被注册bean的时候,会运行一次被修饰的方法,但是在jdk9之后就去除了。
import org.springframework.beans.factory.annotation.Autowired; import 
org.springframework.context.annotation.Configuration; import 
javax.annotation.PostConstruct; @Configuration public class RpcServerBootstrap 
{ @Autowired private RpcServerRnner rpcServerRnner; @PostConstruct public void 
initRpcServer (){ // 运行启动器 rpcServerRnner.run(); } } 
启动器
import com.rpc.server.registry.RpcRegistry; import 
org.springframework.beans.factory.annotation.Autowired; import 
org.springframework.stereotype.Component; import javax.annotation.Resource; 
@Component public class RpcServerRnner { @Autowired private RpcRegistry 
rpcRegistry; @Resource private RpcServer rpcServer; /** * 用于服务注册和netty监听 */ 
public void run () { // 服务注册 rpcRegistry.serviceRegistry(); // 启动服务,监听端口,接收连接请求 
rpcServer.start(); } } 
配置信息对象
import lombok.Data; import org.springframework.beans.factory.annotation.Value; 
import org.springframework.stereotype.Component; @Data @Component public class 
RpcServerConfiguration { /** * ZK根节点名称 */ @Value("${rpc.server.zk.root}") 
private String zkRoot; /** * ZK地址信息 */ @Value("${rpc.server.zk.addr}") private 
String zkAddr; /** * RPC通讯端口 */ @Value("${rpc.network.port}") private int 
rpcPort; /** * Spring Boot 服务端口 */ @Value("${server.port}") private int 
serverPort; /** * ZK连接超时时间配置 */ @Value("${rpc.server.zk.timeout:10000}") 
private int connectTimeout; } 
zookeeper客户端连接
import org.I0Itec.zkclient.ZkClient; import 
org.springframework.beans.factory.annotation.Autowired; import 
org.springframework.context.annotation.Bean; import 
org.springframework.context.annotation.Configuration; @Configuration public 
class ServerZkClientConfig { /** * RPC服务端配置 */ @Autowired private 
RpcServerConfiguration rpcServerConfiguration; /** * 声音ZK客户端 * @return */ @Bean 
public ZkClient zkClient() { return new 
ZkClient(rpcServerConfiguration.getZkAddr(), 
rpcServerConfiguration.getConnectTimeout()); } } 
zookeeper连接操作接口
import org.I0Itec.zkclient.ZkClient; import 
org.springframework.beans.factory.annotation.Autowired; import 
org.springframework.stereotype.Component; /** * Zookeeper连接操作接口 */ @Component 
public class ServerZKit { @Autowired private ZkClient zkClient; @Autowired 
private RpcServerConfiguration rpcServerConfiguration; /*** * 根节点创建 */ public 
void createRootNode() { boolean exists = 
zkClient.exists(rpcServerConfiguration.getZkRoot()); if (!exists) { 
zkClient.createPersistent(rpcServerConfiguration.getZkRoot()); } } /*** * 
创建其他节点 * @param path */ public void createPersistentNode(String path) { String 
pathName = rpcServerConfiguration.getZkRoot() + "/" + path; boolean exists = 
zkClient.exists(pathName); if (!exists) { zkClient.createPersistent(pathName); 
} } /*** * 创建节点 * @param path */ public void createNode(String path) { String 
pathName = rpcServerConfiguration.getZkRoot() + "/" + path; boolean exists = 
zkClient.exists(pathName); if (!exists) { zkClient.createEphemeral(pathName); } 
} } 
用于服务请求连接的注解
import org.springframework.core.annotation.AliasFor; import 
org.springframework.stereotype.Component; import java.lang.annotation.*; 
@Component @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) 
@Documented public @interface HrpcService { /** * 等同于@Component的value * @return 
*/ @AliasFor(annotation = Component.class) String value() default ""; /** * 
服务接口Class * @return */ Class<?> interfaceClass() default void.class; /** * 
服务接口名称 * @return */ String interfaceName() default ""; /** * 服务版本号 * @return */ 
String version() default ""; /** * 服务分组 * @return */ String group() default ""; 
} 
创建一个Spring的Bean工厂,用于封装获取IOC容器中的bean信息
import org.springframework.beans.BeansException; import 
org.springframework.beans.factory.support.DefaultListableBeanFactory; import 
org.springframework.context.ApplicationContext; import 
org.springframework.context.ApplicationContextAware; import 
org.springframework.stereotype.Component; import 
java.lang.annotation.Annotation; import java.util.Map; @Component public class 
SpringBeanFactory implements ApplicationContextAware { /** * ioc容器 */ private 
static ApplicationContext context; @Override public void 
setApplicationContext(ApplicationContext applicationContext) throws 
BeansException { context = applicationContext; } /*public static 
ApplicationContext getApplicationContext() { return context; }*/ /** * 
根据Class获取bean * @param cls * @param <T> * @return */ public static <T> T 
getBean(Class<T> cls) { return context.getBean(cls); } /** * 根据beanName获取bean * 
@param beanName * @return */ public static Object getBean(String beanName) { 
return context.getBean(beanName); } /*** * 获取有指定注解的对象 * @param annotationClass 
* @return */ public static Map<String, Object> 
getBeanListByAnnotationClass(Class<? extends Annotation> annotationClass) { 
return context.getBeansWithAnnotation(annotationClass); } /** * 向容器注册单例bean * 
@param bean */ public static void registerSingleton(Object bean) { 
DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) 
context.getAutowireCapableBeanFactory(); // 
让bean完成Spring初始化过程中所有增强器检验,只是不重新创建bean 
beanFactory.applyBeanPostProcessorsAfterInitialization(bean,bean.getClass().getName()); 
//将bean以单例的形式入驻到容器中,此时通过bean.getClass().getName()或bean.getClass()都可以拿到放入Spring容器的Bean 
beanFactory.registerSingleton(bean.getClass().getName(),bean); } } 
服务注册
        上面的准备工作基本上就做完了,下面开始正式的逻辑代码。这里提一点,当spring 
boot整合这个自定义框架的时候,可以有很多方式,这个不再细说,可以参考文章spring boot的自动配置,这里可以直接粗暴一点在spring 
boot项目的启动类上@SpringBootApplication(scanBasePackages ={"包路径","包路径"})。
        再说一下@component 注解,这是由spring 提供,被其修饰的类被声明为spring 
的组件,简单来说就是创建bean并放置IOC容器中。
/** * 服务注册接口 */ public interface RpcRegistry { /** * 服务注册 */ void 
serviceRegistry(); } import com.rpc.annotation.HrpcService; import 
com.rpc.server.config.RpcServerConfiguration; import 
com.rpc.server.registry.RpcRegistry; import com.rpc.spring.SpringBeanFactory; 
import com.rpc.util.IpUtil; import lombok.extern.slf4j.Slf4j; import 
org.springframework.stereotype.Component; import java.util.Map; @Component 
@Slf4j public class ZkRegistry implements RpcRegistry { /** * 封装的bean工厂 */ 
@Autowired private SpringBeanFactory springBeanFactory; /** * 封装的zookeeper的客户端 
*/ @Autowired private ServerZKit zKitClient; /** * 配置信息对象 */ @Autowired private 
RpcServerConfiguration rpcServerConfiguration; @Override public void 
serviceRegistry() { /* * 1、首先要获取被HrpcService 注解修饰的,IOC中的所有的bean信息 * 
2、拿到bean之后,再获取bean上的HrpcService 注解对象 * 3、拿到HrpcService 注解上面的接口信息 * 
4、创建zookeeper上的根节点,并获取服务端ip和配置文件中的zookeeper端口,创建以接口名称为key,ip+端口为value的子节点 * 
5、注册成功 */ // 获取被HrpcService 注解修饰的,IOC中的所有的bean信息 Map<String, Object> 
annotationClass = 
springBeanFactory.getBeanListByAnnotationClass(HrpcService.class); // 
没被注册信息,直接结束 if (annotationClass == null || annotationClass.size() < 0){ return; 
} // 迭代所有的bean for (Object bean : annotationClass.values()) { // 获取HrpcService 
注解信息 HrpcService hrpcService = 
bean.getClass().getAnnotation(HrpcService.class); // 获取HrpcService 
注解的interfaceClass属性,也就是接口对象 Class<?> interfaceClass = 
hrpcService.interfaceClass(); // 获取接口的名称 String name = 
interfaceClass.getName(); /* * 开始往zookeeper添加节点 */ // 根节点 
zKitClient.createRootNode(); // 子节点,用于接口名称 
zKitClient.createPersistentNode(name); // 获取ip String ip = IpUtil.getRealIp(); 
// ip + 端口 String node = ip + rpcServerConfiguration.getZkAddr(); // 子节点对应下级节点 
zKitClient.createNode(name + "/" + node); // 打印日志 log.info("服务{}-{}注册成功", name, 
node); } } } 
序列化工具
import io.protostuff.LinkedBuffer; import io.protostuff.ProtostuffIOUtil; 
import io.protostuff.Schema; import io.protostuff.runtime.RuntimeSchema; import 
lombok.extern.slf4j.Slf4j; import java.util.*; import 
java.util.concurrent.CopyOnWriteArrayList; /** * @description * @author: ts * 
@create:2021-04-08 10:31 */ @Slf4j public class ProtostuffUtil { 
//存储因为无法直接序列化/反序列化 而需要被包装的类型Class private static final Set<Class<?>> 
WRAPPER_SET = new HashSet<Class<?>>(); static { WRAPPER_SET.add(List.class); 
WRAPPER_SET.add(ArrayList.class); WRAPPER_SET.add(CopyOnWriteArrayList.class); 
WRAPPER_SET.add(LinkedList.class); WRAPPER_SET.add(Stack.class); 
WRAPPER_SET.add(Vector.class); WRAPPER_SET.add(Map.class); 
WRAPPER_SET.add(HashMap.class); WRAPPER_SET.add(TreeMap.class); 
WRAPPER_SET.add(LinkedHashMap.class); WRAPPER_SET.add(Hashtable.class); 
WRAPPER_SET.add(SortedMap.class); WRAPPER_SET.add(Object.class); } 
//注册需要使用包装类进行序列化的Class对象 public static void registerWrapperClass(Class<?> 
clazz) { WRAPPER_SET.add(clazz); } /** * 将对象序列化为字节数组 * @param t * @param 
useWrapper 为true完全使用包装模式 为false则选择性的使用包装模式 * @param <T> * @return */ public 
static <T> byte[] serialize(T t,boolean useWrapper) { Object serializerObj = t; 
if (useWrapper) { serializerObj = SerializeDeserializeWrapper.build(t); } 
return serialize(serializerObj); } /** * 将对象序列化为字节数组 * @param t * @param <T> * 
@return */ public static <T> byte[] serialize(T t) { //获取序列化对象的class Class<T> 
clazz = (Class<T>) t.getClass(); Object serializerObj = t; if 
(WRAPPER_SET.contains(clazz)) { serializerObj = 
SerializeDeserializeWrapper.build(t);//将原始序列化对象进行包装 } return 
doSerialize(serializerObj); } /** * 执行序列化 * @param t * @param <T> * @return */ 
public static <T> byte[] doSerialize(T t) { //获取序列化对象的class Class<T> clazz = 
(Class<T>) t.getClass(); //获取Schema // RuntimeSchema<T> schema = 
RuntimeSchema.createFrom(clazz);//根据给定的class创建schema /** * this is lazily 
created and cached by RuntimeSchema * so its safe to call 
RuntimeSchema.getSchema() over and over The getSchema method is also 
thread-safe */ Schema<T> schema = RuntimeSchema.getSchema(clazz);//内部有缓存机制 /** 
* Re-use (manage) this buffer to avoid allocating on every serialization */ 
LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); 
byte[] protostuff = null; try { protostuff = ProtostuffIOUtil.toByteArray(t, 
schema, buffer); } catch (Exception e){ log.error("protostuff serialize 
error,{}",e.getMessage()); }finally { buffer.clear(); } return protostuff; } 
/** * 反序列化 * @param data * @param clazz * @param <T> * @return */ public static 
<T> T deserialize(byte[] data,Class<T> clazz) { //判断是否经过包装 if 
(WRAPPER_SET.contains(clazz)) { SerializeDeserializeWrapper<T> wrapper = new 
SerializeDeserializeWrapper<T>(); 
ProtostuffIOUtil.mergeFrom(data,wrapper,RuntimeSchema.getSchema(SerializeDeserializeWrapper.class)); 
return wrapper.getData(); }else { Schema<T> schema = 
RuntimeSchema.getSchema(clazz); T newMessage = schema.newMessage(); 
ProtostuffIOUtil.mergeFrom(data,newMessage,schema); return newMessage; } } 
private static class SerializeDeserializeWrapper<T> { //被包装的数据 T data; public 
static <T> SerializeDeserializeWrapper<T> build(T data){ 
SerializeDeserializeWrapper<T> wrapper = new SerializeDeserializeWrapper<T>(); 
wrapper.setData(data); return wrapper; } public T getData() { return data; } 
public void setData(T data) { this.data = data; } } } 
一次编码
import io.netty.handler.codec.LengthFieldBasedFrameDecoder; /** * 一次解码 */ 
public class FrameDecoder extends LengthFieldBasedFrameDecoder { public 
FrameDecoder() { super(Integer.MAX_VALUE, 0, 4, 0, 4); } } 
二次编码
import com.rpc.util.ProtostuffUtil; import io.netty.buffer.ByteBuf; import 
io.netty.channel.ChannelHandlerContext; import 
io.netty.handler.codec.MessageToMessageEncoder; import 
lombok.extern.slf4j.Slf4j; import java.util.List; /** * 服务端的二次编码 */ @Slf4j 
public class RpcResponseEncoder extends MessageToMessageEncoder<ByteBuf> { 
@Override protected void encode(ChannelHandlerContext ctx, ByteBuf msg, 
List<Object> out) throws Exception { /* * 首先将传入的ByteBuf 序列化为一个byte 数组 * 
然后用ChannelHandlerContext 构建一个buffer对象 * 最后写入buffer,添加out写出 */ try { // 
使用序列化工具,将msg序列化 byte[] bytes = ProtostuffUtil.serialize(msg); // 
由ctx分配构建一个buffer对象 ByteBuf buffer = ctx.alloc().buffer(bytes.length); // 
将数据交给buffer buffer.writeBytes(bytes); // 添加写出 out.add(buffer); } catch 
(Exception e) { // 异常 log.error("RpcResponseEncoder exception 
,msg={}",e.getMessage()); } } } 
一次解码
import io.netty.handler.codec.LengthFieldPrepender; /** * 一次解码 */ public class 
FrameEncoder extends LengthFieldPrepender { public FrameEncoder() { super(4); } 
} 
二次解码
import com.itheima.rpc.util.ProtostuffUtil; import io.netty.buffer.ByteBuf; 
import io.netty.channel.ChannelHandlerContext; import 
io.netty.handler.codec.MessageToMessageEncoder; import 
lombok.extern.slf4j.Slf4j; import java.util.List; /** * 服务端的二次编码 */ @Slf4j 
public class RpcResponseEncoder extends MessageToMessageEncoder<ByteBuf> { 
@Override protected void encode(ChannelHandlerContext ctx, ByteBuf msg, 
List<Object> out) throws Exception { /* * 首先将传入的ByteBuf 序列化为一个byte 数组 * 
然后用ChannelHandlerContext 构建一个buffer对象 * 最后写入buffer,添加out写出 */ try { // 
使用序列化工具,将msg序列化 byte[] bytes = ProtostuffUtil.serialize(msg); // 
由ctx分配构建一个buffer对象 ByteBuf buffer = ctx.alloc().buffer(bytes.length); // 
将数据交给buffer buffer.writeBytes(bytes); // 添加写出 out.add(buffer); } catch 
(Exception e) { // 异常 log.error("RpcResponseEncoder exception 
,msg={}",e.getMessage()); } } } 
响应对象
import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; 
import lombok.NoArgsConstructor; @Data @Builder @NoArgsConstructor 
@AllArgsConstructor public class RpcResponse { private String requestId; 
private Object result; private Throwable cause; public boolean isError() { 
return cause != null; } } 
请求对象
import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; 
import lombok.NoArgsConstructor; @Data @Builder @NoArgsConstructor 
@AllArgsConstructor public class RpcRequest { private String requestId; private 
String className; private String methodName; private Class<?>[] parameterTypes; 
private Object[] parameters; } 
业务逻辑调用Handler
import com.itheima.rpc.data.RpcRequest; import 
com.itheima.rpc.data.RpcResponse; import 
com.itheima.rpc.spring.SpringBeanFactory; import 
io.netty.channel.ChannelHandlerContext; import 
io.netty.channel.SimpleChannelInboundHandler; import lombok.extern.slf4j.Slf4j; 
import java.lang.reflect.InvocationTargetException; import 
java.lang.reflect.Method; /** * 客户端请求业务调用 */ @Slf4j public class 
RpcRequestHandler extends SimpleChannelInboundHandler<RpcRequest> { @Override 
protected void channelRead0(ChannelHandlerContext ctx, RpcRequest rpcRequest) 
throws Exception { /* * 先创建请求对应的响应对象 * 从请求对象中获取相关接口信息,接口名称、方法名称、参数类型、参数 * 
根据接名称从容器中获取bean * 用反射,根据方法和参数类型拿到Method 对象 * 将参数传入Method 对象,然后运行,拿到返回值 * 
将返回值给到响应对象 * 一定要将响应对象回写给客户端 */ log.info("服务端收到的请求是:{}",rpcRequest); // 构建响应对象 
RpcResponse rpcResponse = new RpcResponse(); // 于请求对象关联 
rpcResponse.setRequestId(rpcRequest.getRequestId()); try { // 接口名称 String 
interfaceName = rpcRequest.getClassName(); // 方法名称 String methodName = 
rpcRequest.getMethodName(); // 参数类型 Class<?>[] parameterTypes = 
rpcRequest.getParameterTypes(); // 实际参数 Object[] parameters = 
rpcRequest.getParameters(); // 从容器中获取bean实例 Object bean = 
SpringBeanFactory.getBean(Class.forName(interfaceName)); // 反射获取method 对象 
Method method = bean.getClass().getMethod(methodName, parameterTypes); // 
执行对应方法,拿到返回值 Object result = method.invoke(bean, parameters); // 添加到响应对象 
rpcResponse.setResult(result); } catch (Exception e) { 
log.error("RpcRequestHandler exception,msg={}",e.getMessage()); 
rpcResponse.setCause(e); } finally { // 将结果写回 
log.info("向客户端发送响应,{}",rpcResponse); ctx.writeAndFlush(rpcResponse); } } } 
netty代码实现
import com.rpc.netty.codec.FrameDecoder; import 
com.rpc.netty.codec.FrameEncoder; import com.rpc.netty.codec.RpcRequestDecoder; 
import com.rpc.netty.codec.RpcResponseEncoder; import 
com.rpc.netty.handler.RpcRequestHandler; import com.rpc.server.boot.RpcServer; 
import com.rpc.server.config.RpcServerConfiguration; import 
io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelInitializer; import 
io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import 
io.netty.channel.nio.NioEventLoopGroup; import 
io.netty.channel.socket.SocketChannel; import 
io.netty.channel.socket.nio.NioServerSocketChannel; import 
io.netty.util.NettyRuntime; import 
io.netty.util.concurrent.DefaultThreadFactory; import 
org.springframework.beans.factory.annotation.Autowired; public class NettServer 
implements RpcServer { @Autowired private RpcServerConfiguration 
rpcServerConfiguration; @Override public void start() { /* * 
首先要获取三个线程池,用于注册serverSocketChannel、socketChannel 还有业务逻辑处理(即请求调用) * 
再构建引导类,并配置先关信息,将注册请求和处理读写的线程池配置到引导类中,然后配置好相关的 
handler(第一、二次编解码,请求调用),注意请求调用使用线程池做处理 * 最后启动引导类,绑定监听端口,设置同步 * 监控等待关闭 * 优雅的关闭线程池 
*/ // 构建注册serverSocketChannel 的线程池 NioEventLoopGroup boss = new 
NioEventLoopGroup(1, new DefaultThreadFactory("boss")); // 构建注册socketChannel 
的线程池 NioEventLoopGroup worker = new NioEventLoopGroup(0, new 
DefaultThreadFactory("worker")); // 构建业务调用的线程池 NioEventLoopGroup 
rpcRequestHandler = new NioEventLoopGroup(NettyRuntime.availableProcessors() * 
2, new DefaultThreadFactory("reqRequestHandler")); // 业务逻辑调用 RpcRequestHandler 
requestHandler = new RpcRequestHandler(); try { // 构建引导类 ServerBootstrap 
serverBootstrap = new ServerBootstrap(); // 配置引导类 serverBootstrap.group(boss, 
worker) .channel(NioServerSocketChannel.class) 
.option(ChannelOption.SO_BACKLOG,1024) 
.childOption(ChannelOption.TCP_NODELAY,true) 
.childOption(ChannelOption.SO_KEEPALIVE,true) .childHandler(new 
ChannelInitializer<SocketChannel>() { @Override protected void 
initChannel(SocketChannel socketChannel) throws Exception { // 获取 pipeline 
ChannelPipeline pipeline = socketChannel.pipeline(); /* * 配置 handler */ // 一级编码 
pipeline.addLast("FrameEncoder", new FrameEncoder()); // 二级编码 
pipeline.addLast("RpcResponseEncoder", new RpcResponseEncoder()); // 一级解码 
pipeline.addLast("FrameDecoder", new FrameDecoder()); // 二级解码 
pipeline.addLast("RpcRequestDecoder", new RpcRequestDecoder()); // 业务线程池调用 
pipeline.addLast(rpcRequestHandler, "requestHandler", requestHandler); } }); // 
启动引导类,监听端口,设置同步 ChannelFuture future = 
serverBootstrap.bind(rpcServerConfiguration.getRpcPort()).sync(); // 监控等待关闭 
future.channel().closeFuture().sync(); } catch (Exception e) { // 异常 
e.printStackTrace(); } finally { /* * 优雅的关闭各个线程池 */ boss.shutdownGracefully(); 
worker.shutdownGracefully(); rpcRequestHandler.shutdownGracefully(); } } } 
        上述内容就已经完成了一个服务端的创建,后续文章在说客户端,本次结束。