阅读量:4
在C#中,可以通过使用RabbitMQ.Client库来连接和操作RabbitMQ消息队列。以下是一个简单的示例,展示了如何封装一个基本的RabbitMQ连接池方法:
using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; public class RabbitMQPool { private readonly string _connectionString; private IConnection _connection; private IModel _channel; public RabbitMQPool(string connectionString) { _connectionString = connectionString; CreateConnection(); } public void CreateConnection() { var factory = new ConnectionFactory { Uri = new Uri(_connectionString) }; _connection = factory.CreateConnection(); _channel = _connection.CreateModel(); } public void Publish(string exchange, string routingKey, byte[] message) { _channel.BasicPublish(exchange, routingKey, null, message); } public void Consume(string queue, Action<byte[]> callback) { var consumer = new EventingBasicConsumer(_channel); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); callback(body); }; _channel.BasicConsume(queue, true, consumer); } public void CloseConnection() { _channel?.Close(); _connection?.Close(); } }
使用示例:
var connectionString = "amqp://guest:guest@localhost:5672"; var pool = new RabbitMQPool(connectionString); // 发布消息 var exchange = "myExchange"; var routingKey = "myRoutingKey"; var message = Encoding.UTF8.GetBytes("Hello RabbitMQ"); pool.Publish(exchange, routingKey, message); // 消费消息 var queue = "myQueue"; pool.Consume(queue, (body) => { var receivedMessage = Encoding.UTF8.GetString(body); Console.WriteLine(receivedMessage); }); // 关闭连接 pool.CloseConnection();
注意:在实际使用中,你可能需要根据自己的需求进一步完善和优化连接池的功能,例如添加连接池大小限制、连接复用等。上述示例仅提供了一个基本的封装框架,你可以根据自己的实际需求进行调整和扩展。