阅读量:1
客户端与服务端的在交互过程中需要互相发送交换数据,这就需要对传输对象进行序列化与反序列化操作,此过程涉及到序列化算法的使用。本RPC框架支持多种序列化算法,通过SPI机制进行扩展。
比如客户端在向服务端发送数据时,首先是客户端的序列化操作:
Serializer serializer = ExtensionLoader.getExtensionLoader(Serializer.class) .getExtension(codecName); bodyBytes = serializer.serialize(rpcMessage.getData());
服务端接收到数据后要进行反序列化操作:
Serializer serializer = ExtensionLoader.getExtensionLoader(Serializer.class) .getExtension(codecName); if (messageType == RpcConstants.REQUEST_TYPE) { RpcRequest tmpValue = serializer.deserialize(bs, RpcRequest.class); rpcMessage.setData(tmpValue); } else { RpcResponse tmpValue = serializer.deserialize(bs, RpcResponse.class); rpcMessage.setData(tmpValue); }
本次主要支持三种序列化算法,分别是Kryo、Hessian、ProtoStuff,首先是Kryo序列化算法:
private final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(() -> { Kryo kryo = new Kryo(); // 注册类是一种优化手段,可以提高序列化和反序列化的效率 kryo.register(RpcResponse.class); kryo.register(RpcRequest.class); return kryo; }); @Override public byte[] serialize(Object obj) { try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); Output output = new Output(byteArrayOutputStream)) { Kryo kryo = kryoThreadLocal.get(); // Object->byte:将对象序列化为byte数组 kryo.writeObject(output, obj); kryoThreadLocal.remove(); return output.toBytes(); } catch (Exception e) { throw new SerializeException("Serialization failed"); } } @Override public <T> T deserialize(byte[] bytes, Class<T> clazz) { try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes); Input input = new Input(byteArrayInputStream)) { Kryo kryo = kryoThreadLocal.get(); // byte->Object:从byte数组中反序列化出对对象 Object o = kryo.readObject(input, clazz); kryoThreadLocal.remove(); return clazz.cast(o); } catch (Exception e) { throw new SerializeException("Deserialization failed"); } }
Kryo序列化算法性能较好,但是是线程不安全的,因此需要进行线程隔离,通过ThreadLocal为每个线程维护一个Kryo对象,使用完毕后就remove掉,之后需要使用时再次在threadLocal中初始化一个Kryo对象,这也是withInitial懒加载的过程。
接下来是Hessian序列化算法:
@Override public byte[] serialize(Object obj) { try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) { HessianOutput hessianOutput = new HessianOutput(byteArrayOutputStream); hessianOutput.writeObject(obj); return byteArrayOutputStream.toByteArray(); } catch (Exception e) { throw new SerializeException("Serialization failed"); } } @Override public <T> T deserialize(byte[] bytes, Class<T> clazz) { try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes)) { HessianInput hessianInput = new HessianInput(byteArrayInputStream); Object o = hessianInput.readObject(); return clazz.cast(o); } catch (Exception e) { throw new SerializeException("Deserialization failed"); } }
最后是ProtoStuff序列化算法:
private static final LinkedBuffer BUFFER = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); @Override public byte[] serialize(Object obj) { Class<?> clazz = obj.getClass(); Schema schema = RuntimeSchema.getSchema(clazz); byte[] bytes; try { bytes = ProtostuffIOUtil.toByteArray(obj, schema, BUFFER); } finally { BUFFER.clear(); } return bytes; } @Override public <T> T deserialize(byte[] bytes, Class<T> clazz) { Schema<T> schema = RuntimeSchema.getSchema(clazz); T obj = schema.newMessage(); ProtostuffIOUtil.mergeFrom(bytes, obj, schema); return obj; }
BUFFER可以避免每次序列化时重新申请缓冲区空间,提高序列化的效率。三种序列化算法的实现过程中,主要是需要对线程不安全的隐患进行处理,比如进行隔离等操作,以保证序列化操作的正确执行。