DangerWind-RPC-framework---七、序列化算法

avatar
作者
猴君
阅读量:1

        客户端与服务端的在交互过程中需要互相发送交换数据,这就需要对传输对象进行序列化与反序列化操作,此过程涉及到序列化算法的使用。本RPC框架支持多种序列化算法,通过SPI机制进行扩展。

       比如客户端在向服务端发送数据时,首先是客户端的序列化操作:

   Serializer serializer = ExtensionLoader.getExtensionLoader(Serializer.class)                         .getExtension(codecName);    bodyBytes = serializer.serialize(rpcMessage.getData());

       服务端接收到数据后要进行反序列化操作:

   Serializer serializer = ExtensionLoader.getExtensionLoader(Serializer.class)                     .getExtension(codecName);    if (messageType == RpcConstants.REQUEST_TYPE) {          RpcRequest tmpValue = serializer.deserialize(bs, RpcRequest.class);          rpcMessage.setData(tmpValue);    } else {          RpcResponse tmpValue = serializer.deserialize(bs, RpcResponse.class);          rpcMessage.setData(tmpValue);    }

       本次主要支持三种序列化算法,分别是Kryo、Hessian、ProtoStuff,首先是Kryo序列化算法:

    private final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(() -> {         Kryo kryo = new Kryo();         // 注册类是一种优化手段,可以提高序列化和反序列化的效率         kryo.register(RpcResponse.class);         kryo.register(RpcRequest.class);         return kryo;     });      @Override     public byte[] serialize(Object obj) {         try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();              Output output = new Output(byteArrayOutputStream)) {             Kryo kryo = kryoThreadLocal.get();             // Object->byte:将对象序列化为byte数组             kryo.writeObject(output, obj);             kryoThreadLocal.remove();             return output.toBytes();         } catch (Exception e) {             throw new SerializeException("Serialization failed");         }     }      @Override     public <T> T deserialize(byte[] bytes, Class<T> clazz) {         try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);              Input input = new Input(byteArrayInputStream)) {             Kryo kryo = kryoThreadLocal.get();             // byte->Object:从byte数组中反序列化出对对象             Object o = kryo.readObject(input, clazz);             kryoThreadLocal.remove();             return clazz.cast(o);         } catch (Exception e) {             throw new SerializeException("Deserialization failed");         }     }

       Kryo序列化算法性能较好,但是是线程不安全的,因此需要进行线程隔离,通过ThreadLocal为每个线程维护一个Kryo对象,使用完毕后就remove掉,之后需要使用时再次在threadLocal中初始化一个Kryo对象,这也是withInitial懒加载的过程。

      接下来是Hessian序列化算法:

    @Override     public byte[] serialize(Object obj) {         try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {             HessianOutput hessianOutput = new HessianOutput(byteArrayOutputStream);             hessianOutput.writeObject(obj);              return byteArrayOutputStream.toByteArray();         } catch (Exception e) {             throw new SerializeException("Serialization failed");         }      }      @Override     public <T> T deserialize(byte[] bytes, Class<T> clazz) {          try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes)) {             HessianInput hessianInput = new HessianInput(byteArrayInputStream);             Object o = hessianInput.readObject();              return clazz.cast(o);          } catch (Exception e) {             throw new SerializeException("Deserialization failed");         }      }

       最后是ProtoStuff序列化算法:

    private static final LinkedBuffer BUFFER = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);      @Override     public byte[] serialize(Object obj) {         Class<?> clazz = obj.getClass();         Schema schema = RuntimeSchema.getSchema(clazz);         byte[] bytes;         try {             bytes = ProtostuffIOUtil.toByteArray(obj, schema, BUFFER);         } finally {             BUFFER.clear();         }         return bytes;     }      @Override     public <T> T deserialize(byte[] bytes, Class<T> clazz) {         Schema<T> schema = RuntimeSchema.getSchema(clazz);         T obj = schema.newMessage();         ProtostuffIOUtil.mergeFrom(bytes, obj, schema);         return obj;     }

        BUFFER可以避免每次序列化时重新申请缓冲区空间,提高序列化的效率。三种序列化算法的实现过程中,主要是需要对线程不安全的隐患进行处理,比如进行隔离等操作,以保证序列化操作的正确执行。

 

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!