阅读量:0
初衷:grpc可以很好的作为跨语言,跨机器的服务模式。但是对于大量数据的传输,进程之间还是共享内存要快一些,所以这里将grpc与共享内存混合使用。
功能实现:
客户端:从摄像头不断的读取图片,并传输给服务端,并呈现从服务端返回的处理结果。
服务端:读取客户端的请求,并从中读取共享内存中的图片,在图片上写入”ok“,并将结果传回给客户端。
创建文件:image_processing.proto
syntax = "proto3"; package image_processing; import "google/protobuf/empty.proto"; // 定义一个请求消息,包含共享内存的名称 message ImageRequest { string input_shm_name = 1; string output_shm_name = 2; } // 定义一个响应消息,包含处理后的图像的共享内存地址 message ImageResponse { string output_shm_name = 1; } // 定义清理共享内存的请求消息 message CleanupRequest { string output_shm_name = 1; } // 定义服务接口 service ImageProcessingService { rpc ProcessImage (ImageRequest) returns (ImageResponse); rpc CleanupSharedMemory (CleanupRequest) returns (google.protobuf.Empty); }
安装python库:
pip install grpcio grpcio-tools opencv-python-headless
使用grpcio-tools
生成Python代码:
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. image_processing.proto
客户端
import grpc import cv2 import numpy as np from multiprocessing import shared_memory import image_processing_pb2 import image_processing_pb2_grpc def main(): # 捕获图像 #img = capture_image() vc=cv2.VideoCapture(0) with grpc.insecure_channel('localhost:50051') as channel: stub = image_processing_pb2_grpc.ImageProcessingServiceStub(channel) while 1: res,img=vc.read() shm = shared_memory.SharedMemory(create=True, size=img.nbytes) shm_array = np.ndarray(img.shape, dtype=img.dtype, buffer=shm.buf) shm_array[:] = img[:] response = stub.ProcessImage(image_processing_pb2.ImageRequest(input_shm_name=shm.name, output_shm_name="")) # 读取处理后的图像 output_shm = shared_memory.SharedMemory(name=response.output_shm_name) output_array = np.ndarray(img.shape, dtype=img.dtype, buffer=output_shm.buf) # 显示处理后的图像 cv2.imshow("Processed Image", output_array) cv2.waitKey(1) cv2.destroyAllWindows() # 清理共享内存 shm.close() shm.unlink() output_shm.close() output_shm.unlink() # 通知服务端清理共享内存 with grpc.insecure_channel('localhost:50051') as channel: stub = image_processing_pb2_grpc.ImageProcessingServiceStub(channel) stub.CleanupSharedMemory(image_processing_pb2.CleanupRequest(output_shm_name=response.output_shm_name)) if __name__ == '__main__': main()
服务端:
import grpc from concurrent import futures import cv2 import numpy as np from multiprocessing import shared_memory import time import threading import image_processing_pb2 import image_processing_pb2_grpc # 全局字典存储共享内存对象引用 shared_memory_store = {} lock = threading.Lock() class ImageProcessingServiceServicer(image_processing_pb2_grpc.ImageProcessingServiceServicer): def ProcessImage(self, request, context): # 读取共享内存中的图像 shm = shared_memory.SharedMemory(name=request.input_shm_name) img_array = np.ndarray((480, 640, 3), dtype=np.uint8, buffer=shm.buf) img = img_array.copy() # 复制图像以确保共享内存可以被释放 # 在图像上写“ok” cv2.putText(img, "ok", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2, cv2.LINE_AA) # 创建新的共享内存并将处理后的图像写入 output_shm = shared_memory.SharedMemory(create=True, size=img.nbytes) output_array = np.ndarray(img.shape, dtype=img.dtype, buffer=output_shm.buf) output_array[:] = img[:] # 将共享内存对象存储在全局字典中 with lock: shared_memory_store[output_shm.name] = output_shm # 启动一个线程在一段时间后清理共享内存 threading.Timer(60, self.cleanup_shared_memory, args=(output_shm.name,)).start() # 返回处理后的图像的共享内存名称 response = image_processing_pb2.ImageResponse(output_shm_name=output_shm.name) return response def cleanup_shared_memory(self, shm_name): with lock: if shm_name in shared_memory_store: shm = shared_memory_store.pop(shm_name) shm.close() shm.unlink() def serve(): server = grpc.server(futures.ThreadPoolExecutor(max_workers=1)) image_processing_pb2_grpc.add_ImageProcessingServiceServicer_to_server(ImageProcessingServiceServicer(), server) server.add_insecure_port('[::]:50051') server.start() print("Server started, listening on port 50051.") try: while True: time.sleep(86400) # 运行一整天 except KeyboardInterrupt: server.stop(0) if __name__ == '__main__': serve()