使用在python中使用grpc和共享内存,实现功能调用。

avatar
作者
猴君
阅读量:0

初衷:grpc可以很好的作为跨语言,跨机器的服务模式。但是对于大量数据的传输,进程之间还是共享内存要快一些,所以这里将grpc与共享内存混合使用。

功能实现:

        客户端:从摄像头不断的读取图片,并传输给服务端,并呈现从服务端返回的处理结果。

        服务端:读取客户端的请求,并从中读取共享内存中的图片,在图片上写入”ok“,并将结果传回给客户端。

创建文件:image_processing.proto

syntax = "proto3";  package image_processing;  import "google/protobuf/empty.proto";  // 定义一个请求消息,包含共享内存的名称 message ImageRequest {     string input_shm_name = 1;     string output_shm_name = 2; }  // 定义一个响应消息,包含处理后的图像的共享内存地址 message ImageResponse {     string output_shm_name = 1; }  // 定义清理共享内存的请求消息 message CleanupRequest {     string output_shm_name = 1; }  // 定义服务接口 service ImageProcessingService {     rpc ProcessImage (ImageRequest) returns (ImageResponse);     rpc CleanupSharedMemory (CleanupRequest) returns (google.protobuf.Empty); } 

安装python库:

pip install grpcio grpcio-tools opencv-python-headless

使用grpcio-tools生成Python代码:

python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. image_processing.proto

客户端

import grpc import cv2 import numpy as np from multiprocessing import shared_memory  import image_processing_pb2 import image_processing_pb2_grpc def main():     # 捕获图像     #img = capture_image()     vc=cv2.VideoCapture(0)          with grpc.insecure_channel('localhost:50051') as channel:         stub = image_processing_pb2_grpc.ImageProcessingServiceStub(channel)         while 1:             res,img=vc.read()             shm = shared_memory.SharedMemory(create=True, size=img.nbytes)             shm_array = np.ndarray(img.shape, dtype=img.dtype, buffer=shm.buf)             shm_array[:] = img[:]              response = stub.ProcessImage(image_processing_pb2.ImageRequest(input_shm_name=shm.name, output_shm_name=""))              # 读取处理后的图像             output_shm = shared_memory.SharedMemory(name=response.output_shm_name)             output_array = np.ndarray(img.shape, dtype=img.dtype, buffer=output_shm.buf)              # 显示处理后的图像             cv2.imshow("Processed Image", output_array)             cv2.waitKey(1)         cv2.destroyAllWindows()          # 清理共享内存         shm.close()         shm.unlink()         output_shm.close()         output_shm.unlink()          # 通知服务端清理共享内存         with grpc.insecure_channel('localhost:50051') as channel:             stub = image_processing_pb2_grpc.ImageProcessingServiceStub(channel)             stub.CleanupSharedMemory(image_processing_pb2.CleanupRequest(output_shm_name=response.output_shm_name))  if __name__ == '__main__':     main() 

服务端:

import grpc from concurrent import futures import cv2 import numpy as np from multiprocessing import shared_memory import time import threading  import image_processing_pb2 import image_processing_pb2_grpc  # 全局字典存储共享内存对象引用 shared_memory_store = {} lock = threading.Lock()  class ImageProcessingServiceServicer(image_processing_pb2_grpc.ImageProcessingServiceServicer):     def ProcessImage(self, request, context):         # 读取共享内存中的图像         shm = shared_memory.SharedMemory(name=request.input_shm_name)         img_array = np.ndarray((480, 640, 3), dtype=np.uint8, buffer=shm.buf)         img = img_array.copy()  # 复制图像以确保共享内存可以被释放          # 在图像上写“ok”         cv2.putText(img, "ok", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2, cv2.LINE_AA)          # 创建新的共享内存并将处理后的图像写入         output_shm = shared_memory.SharedMemory(create=True, size=img.nbytes)         output_array = np.ndarray(img.shape, dtype=img.dtype, buffer=output_shm.buf)         output_array[:] = img[:]          # 将共享内存对象存储在全局字典中         with lock:             shared_memory_store[output_shm.name] = output_shm          # 启动一个线程在一段时间后清理共享内存         threading.Timer(60, self.cleanup_shared_memory, args=(output_shm.name,)).start()          # 返回处理后的图像的共享内存名称         response = image_processing_pb2.ImageResponse(output_shm_name=output_shm.name)         return response      def cleanup_shared_memory(self, shm_name):         with lock:             if shm_name in shared_memory_store:                 shm = shared_memory_store.pop(shm_name)                 shm.close()                 shm.unlink()  def serve():     server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))     image_processing_pb2_grpc.add_ImageProcessingServiceServicer_to_server(ImageProcessingServiceServicer(), server)     server.add_insecure_port('[::]:50051')     server.start()     print("Server started, listening on port 50051.")     try:         while True:             time.sleep(86400)  # 运行一整天     except KeyboardInterrupt:         server.stop(0)  if __name__ == '__main__':     serve() 

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!