IM服务集群与跨服务器消息路由策略

avatar
作者
猴君
阅读量:1

前言

在单机模式下,相对简单,认为所有的客户都在同一台服务器上,姑且认为1台服务器可以同时支持1万用户在线,在更多用户同时在线时,则需要集群来实现负载均衡。

集群的算法需要解决故障处理以及动态添加的问题,同时需要考虑如何在集群节点间路由数据。

1. 负载均衡算法

这里使用一致性哈希环来实现。

添加节点

每个设备都有唯一的名字(key),同时按照重复因子的个数添加到环中;每个名字以及重复的序号做hash算法得到一个哈希数,所以环上每个节点都有一个数值;

查询

在查询时候,提供了某个字符串,计算哈希,如果哈希比某个环上节点的数值大,就选这个节点,如果都找不到,就选第一个,所以称之为环而不是列表;

故障移除

当某个服务器故障退出时候,需要从环上删除节点,那么此服务节点上的用户再次登录时,会选到相邻的服务节点上;而正常服务节点上的用户不会受到影响;

1.1 redis服务发现

一般GO服务发现使用etcd比较多,但是为了少安装几个服务,可以考虑使用redis实现;

注册服务

每个服务上线时候,都在“IMSERVER"键值中设置节点名字的field和value,value中包含更新时间;

每个服务定期更新field中的活动时间戳,证明工作正常;

轮询可以根据更新时间确定哪些节点存活,如果某些节点失效了,需要重新计算哈希值;

如果为了使用超时机制来实现自动检测,那么可以再额外添加了一个键值,名字就是服务的节点名最后后缀,这样当某个服务节点的键值超时就可以即时发现;

1.2 etcd服务发现

在一个分布式系统中,服务注册与发现是一个关键的组件,用于在集群中动态地注册服务和发现其他服务。Etcd 提供了一种方便的方式来实现服务注册与发现,通常的做法包括以下几个步骤:

  1. 服务注册
    • 当一个服务启动时,它会将自己的地址信息(如 IP 地址和端口号)以及其他相关信息注册到 Etcd 中。
    • 通常情况下,服务会将自己注册为一个可识别的名称,比如服务的名称、版本号等,以便其他服务能够识别和访问。
  2. 服务发现
    • 当一个服务需要与其他服务通信时,它会向 Etcd 发送一个服务发现的请求,查询特定服务名称对应的地址信息。
    • Etcd 将返回一个或多个注册了该名称的服务的地址信息,然后服务可以使用这些信息来进行通信。

在 Go 中,可以使用 Etcd 的 Go 客户端库(如 etcd/clientv3)来实现服务注册与发现。以下是一个简单的示例,演示了如何使用 Etcd 实现服务注册与发现:

package main  import ( 	"context" 	"fmt" 	"log" 	"time"  	"go.etcd.io/etcd/clientv3" )  func main() { 	// 连接 Etcd 客户端 	cli, err := clientv3.New(clientv3.Config{ 		Endpoints:   []string{"http://localhost:2379"}, // Etcd 服务地址 		DialTimeout: 5 * time.Second, 	}) 	if err != nil { 		log.Fatal(err) 	} 	defer cli.Close()  	// 注册服务 	if err := registerService(cli, "service1", "127.0.0.1:8080"); err != nil { 		log.Fatal(err) 	}  	// 发现服务 	addrs, err := discoverService(cli, "service1") 	if err != nil { 		log.Fatal(err) 	} 	fmt.Println("Service1 addresses:", addrs) }  // 注册服务 func registerService(cli *clientv3.Client, serviceName, serviceAddr string) error { 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 	defer cancel()  	// 将服务地址信息注册到 Etcd 中 	_, err := cli.Put(ctx, "/services/"+serviceName+"/"+serviceAddr, serviceAddr) 	return err }  // 发现服务 func discoverService(cli *clientv3.Client, serviceName string) ([]string, error) { 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 	defer cancel()  	// 查询特定服务名称对应的所有地址信息 	resp, err := cli.Get(ctx, "/services/"+serviceName+"/", clientv3.WithPrefix()) 	if err != nil { 		return nil, err 	}  	// 解析地址信息 	var addrs []string 	for _, kv := range resp.Kvs { 		addrs = append(addrs, string(kv.Value)) 	} 	return addrs, nil }  

1.2 重复因子

添加重复因子是为了增加哈希环上节点的数量,从而更好地实现负载均衡。在哈希环中,服务器节点的数量越多,分布越均匀,可以提高负载均衡的效果。如果只有少量的服务器节点,哈希环上的节点分布可能不够均匀,导致负载分配不平衡。

通过添加重复因子,可以让每个服务器节点在哈希环上出现多次,增加了节点的数量,从而更好地平衡了负载。例如,如果只有3台服务器,但设置了重复因子为10,则在哈希环上每个服务器节点会出现10次,相当于虚拟出了30个节点,增加了负载均衡的可能性。

在服务器数量为3到10台之间,可以考虑将重复因子设置为3或4。这样可以在哈希环上为每个服务器节点创建3到4个虚拟节点,以增加哈希环上节点的数量,提高负载均衡的效果。

例如,如果有5台服务器,设置重复因子为3,则在哈希环上会创建15个虚拟节点,增加了节点的数量,有助于更均匀地分配负载。而如果服务器数量较少,设置更大的重复因子可能会导致哈希环上节点过多,增加计算开销;而设置更小的重复因子可能会降低负载均衡的效果。

综上所述,设置重复因子为3或4可能是一个合适的选择,可以在保证负载均衡的同时,避免哈希环节点数量过多带来的性能损耗。然而,具体的选择还应根据实际情况和性能要求进行调整和评估。

2. redis检测故障实现

可以设计一个基于定时任务的机制来实现:

  1. 服务节点定时更新超时时间:每个服务节点定期(比如每隔 5 秒)更新自己的超时时间,将当前时间加上超时阈值,并将新的超时时间写入 Redis 中。这样可以确保每个服务节点的超时时间保持最新。

  2. 其他节点定时轮询超时情况:其他节点定期(比如每隔 10 秒)轮询所有节点的超时情况。对于每个节点,获取其超时时间,如果发现某个节点超时了,就触发重新计算哈希环的操作。

  3. 重新计算哈希环:一旦发现某个节点超时,触发重新计算哈希环的操作。这意味着需要重新收集所有服务节点的信息,并更新哈希环,确保哈希环中的每个节点都是当前活动的节点。

  4. 定时任务实现:使用 Go 中的定时任务库(如 time.Timertime.Tick)来实现定时任务。每个节点维护一个定时器,在定时器触发时执行相应的操作(更新超时时间或者轮询超时情况)。

2.1 轮询查询

package main  import ( 	"context" 	"fmt" 	"time"  	"github.com/go-redis/redis/v8" )  func main() { 	// 创建 Redis 客户端 	rdb := redis.NewClient(&redis.Options{ 		Addr:     "localhost:6379", 		Password: "", // 设置 Redis 密码 		DB:       0,  // 使用默认数据库 	})  	// 模拟每个服务节点更新超时时间的定时任务 	go func() { 		ticker := time.NewTicker(5 * time.Second) 		defer ticker.Stop()  		for { 			select { 			case <-ticker.C: 				// 更新超时时间 				updateTimeout(rdb) 			} 		} 	}()  	// 模拟其他节点轮询超时情况的定时任务 	go func() { 		ticker := time.NewTicker(10 * time.Second) 		defer ticker.Stop()  		for { 			select { 			case <-ticker.C: 				// 轮询超时情况并重新计算哈希环 				checkTimeoutAndRebalance(rdb) 			} 		} 	}()  	// 主程序保持运行 	select {} }  func updateTimeout(rdb *redis.Client) { 	// 获取当前时间 	now := time.Now() 	// 更新超时时间 	rdb.Set(context.Background(), "timeout", now.Add(5*time.Second).Format(time.RFC3339), 0) 	fmt.Println("Updated timeout to", now.Add(5*time.Second)) }  func checkTimeoutAndRebalance(rdb *redis.Client) { 	// 获取当前时间 	now := time.Now() 	// 获取超时时间 	timeoutStr, err := rdb.Get(context.Background(), "timeout").Result() 	if err != nil { 		fmt.Println("Error:", err) 		return 	}  	timeout, err := time.Parse(time.RFC3339, timeoutStr) 	if err != nil { 		fmt.Println("Error:", err) 		return 	}  	// 检查超时情况 	if now.After(timeout) { 		fmt.Println("Node timeout detected. Rebalancing...") 		// 重新计算哈希环 		rebalanceHashRing() 	} }  func rebalanceHashRing() { 	// 实现哈希环的重新计算逻辑 	fmt.Println("Rebalancing hash ring...") }     

2.2 订阅超时事件

当订阅了 __keyevent@0__:expired 模式后,每当有键值过期时,Redis 就会发送一个消息给订阅者。这个消息会包含过期的键名(key name),因此可以在接收到消息时获取到哪个键值超时了。

检测超时键值时候,查看是否是已知的服务器名字,如果某个服务器下线了,则需要重新调整;

以下是如何使用 Go 语言监听键值超时事件:

package main  import ( 	"context" 	"fmt" 	"time"  	"github.com/go-redis/redis/v8" )  func main() { 	// 创建 Redis 客户端 	rdb := redis.NewClient(&redis.Options{ 		Addr:     "localhost:6379", 		Password: "", // 设置 Redis 密码 		DB:       0,  // 使用默认数据库 	})  	// 订阅键值过期事件 	pubsub := rdb.Subscribe(context.Background(), "__keyevent@0__:expired") 	defer pubsub.Close()  	// 在单独的协程中处理订阅消息 	go func() { 		for { 			// 接收消息 			msg, err := pubsub.Receive(context.Background()) 			if err != nil { 				fmt.Println("Error:", err) 				return 			}  			// 处理消息 			switch msg := msg.(type) { 			case *redis.Message: 				// 获取超时的键名 				key := msg.Payload 				fmt.Println("Key expired:", key) 				// 进行相应的处理逻辑 			default: 				fmt.Println("Received unknown message type") 			} 		} 	}()  	// 主程序保持运行 	select {} }  

3. etcd故障检测实现

要检测某个服务节点是否下线,可以通过以下几种方式实现:

  1. 心跳检测:服务节点定期发送心跳信号到 Etcd,表明自己仍然活跃。如果某个节点长时间没有发送心跳信号,可以认为该节点已经下线。
  2. 定时检测:定时从 Etcd 中查询特定服务的节点信息,如果发现某个节点在最新的查询中不存在,说明该节点已下线。
  3. Watch 监听:通过 Etcd 的 Watch 功能,监视特定服务的节点信息。当有节点下线时,Watch 将会收到相应的通知,从而实现及时检测。

下面是一个简单的示例代码,演示了如何使用 Etcd 的 Watch 功能来监视特定服务的节点信息,并实时检测节点的上线和下线:

package main  import ( 	"context" 	"log" 	"time"  	"go.etcd.io/etcd/clientv3" )  func main() { 	// 连接 Etcd 客户端 	cli, err := clientv3.New(clientv3.Config{ 		Endpoints:   []string{"http://localhost:2379"}, // Etcd 服务地址 		DialTimeout: 5 * time.Second, 	}) 	if err != nil { 		log.Fatal(err) 	} 	defer cli.Close()  	// Watch 特定服务节点的变化 	watchServiceNodes(cli, "service1") }  // Watch 特定服务节点的变化 func watchServiceNodes(cli *clientv3.Client, serviceName string) { 	watchChan := cli.Watch(context.Background(), "/services/"+serviceName+"/", clientv3.WithPrefix())  	for { 		select { 		case <-watchChan: 			// 节点发生变化,处理节点上线和下线的情况 			checkServiceNodes(cli, serviceName) 		} 	} }  // 检查服务节点状态 func checkServiceNodes(cli *clientv3.Client, serviceName string) { 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 	defer cancel()  	// 查询特定服务名称对应的所有地址信息 	resp, err := cli.Get(ctx, "/services/"+serviceName+"/", clientv3.WithPrefix()) 	if err != nil { 		log.Println("Error checking service nodes:", err) 		return 	}  	// 检查节点是否下线 	// 遍历之前保存的节点信息,与当前 Etcd 中的节点信息进行对比 	// 如果某个节点之前存在,但在当前查询中不存在,则认为该节点下线 	for _, kv := range resp.Kvs { 		nodeAddr := string(kv.Value) 		log.Println("Service node:", nodeAddr) 	}  	log.Println("Service node check complete.") }  

4. 消息跨服务器路由

4.1 一对一消息路由

消息目的地用户如果不再本地服务器,则使用算法计算当前应该在哪个服务器上,使用kafka将数据转发到目标服务器,等待目标服务器去处理;

屏蔽检测

在转发消息前,入库前,首先需要检测对方是否屏蔽了自己;在集群模式下,用户登录后,在redis中更新自己的状态,同时pulish 自己的状态消息。

如果有多个订阅者同时订阅了同一个频道,那么每个订阅者都能收到发布到该频道的消息。

Redis 的 Pub/Sub(发布/订阅)模式允许多个订阅者同时订阅同一个频道,并且每个订阅者都能接收到频道中发布的消息。

当消息发布到频道时,Redis 会将该消息发送给所有订阅了该频道的客户端,即使有多个订阅者也是如此。每个订阅者都会收到相同的消息,并且消息的顺序与发布的顺序一致。

这种设计使得 Pub/Sub 模式非常适合实现广播、实时通知等场景,因为每个订阅者都能及时地收到发布的消息,无需额外的请求或轮询。

但是:Redis 的 Pub/Sub 模式下,消息不会被缓存。当消息发布到频道时,它会立即发送给所有订阅了该频道的客户端,而不会保存在 Redis 中。这意味着如果有订阅者在消息发布之后才订阅频道,它们将无法接收到之前发布的消息。

补充:用户的状态信息同时需要保存到redis中,便于错过消息的服务器加载;

三级缓存: 服务器也不是缓存所有用户状态信息,只有通信中需要路由的用户才会将状态加载到内存;可以设置30分钟超时,如果不使用则清理掉缓存;这样就是一个数据库,redis,内存的三级缓存模式;

相关发布消息的示例:

package main  import ( 	"context" 	"fmt" 	"log" 	"time"  	"github.com/go-redis/redis/v8" )  func newRedisClient() *redis.Client { 	return redis.NewClient(&redis.Options{ 		Addr:     "localhost:6379", // Redis 服务地址 		Password: "",                // Redis 访问密码 		DB:       0,                 // 选择的数据库 	}) }  func publishMessage(channel, message string) error { 	ctx := context.Background() 	client := newRedisClient()  	// 发布消息到指定频道 	if err := client.Publish(ctx, channel, message).Err(); err != nil { 		return err 	}  	// 关闭 Redis 客户端连接 	if err := client.Close(); err != nil { 		log.Println("Error closing Redis client:", err) 	}  	return nil }  func main() { 	channel := "user_actions" // 频道名称 	message := "User login"   // 消息内容  	if err := publishMessage(channel, message); err != nil { 		log.Fatal("Error publishing message:", err) 	} else { 		fmt.Println("Message published successfully!") 	} }  

这样就可以通过调用 publishMessage 函数来发布消息到指定的频道了。在 main 函数中,指定了要发布的频道名称和消息内容,然后调用 publishMessage 函数将消息发布到指定的频道中。

订阅部分示例代码:

package main  import ( 	"context" 	"fmt" 	"log" 	"os" 	"os/signal" 	"sync" 	"time"  	"github.com/go-redis/redis/v8" )  func main() { 	// 创建 Redis 客户端 	rdb := redis.NewClient(&redis.Options{ 		Addr:     "localhost:6379", // Redis 服务器地址 		Password: "",               // 连接密码,若无密码则留空 		DB:       0,                // 选择数据库,默认为 0 	})  	// 创建一个新的上下文,用于控制订阅的生命周期 	ctx, cancel := context.WithCancel(context.Background()) 	defer cancel()  	// 创建一个 WaitGroup,用于等待订阅结束 	var wg sync.WaitGroup  	// 订阅频道 	sub := rdb.Subscribe(ctx, "channel1")  	// 使用 goroutine 处理订阅消息 	go func() { 		defer wg.Done() 		for { 			select { 			case <-ctx.Done(): 				return 			default: 				msg, err := sub.ReceiveMessage(ctx) 				if err != nil { 					log.Println("Error receiving message:", err) 					continue 				} 				fmt.Println("Received message:", msg.Payload) 			} 		} 	}()  	fmt.Println("Subscribed to channel1. Waiting for messages...")  	// 等待程序退出信号 	stop := make(chan os.Signal, 1) 	signal.Notify(stop, os.Interrupt)  	// 等待订阅结束或接收到退出信号 	wg.Add(1) 	select { 	case <-stop: 		fmt.Println("Received interrupt signal. Exiting...") 		cancel() // 取消订阅 	case <-ctx.Done(): 	}  	// 等待订阅处理结束 	wg.Wait() }  

故障情况

如果目标服务器出现故障,暂时没有检测到,消息执行了错误的路由,但当用户再次登录,更换服务器后,若干秒后也能正确路由;

用户重新登录时候,会加载离线数据;

4.2 群聊消息路由

我们需要三个表:

群内成员表:用于查看群状态信息以及群成员信息;

群用户在各个服务器上分布状态:转发前需要查表,查看需要转发到哪些服务器;

群内当前服务器在线成表:接收到转发的消息后,服务器根据此表来确定需要转发到哪些用户;理想情况下,所有的用户都是一直在线的;

tinode那种模式是直接用tcp连接来做集群,这样的模式在实际用的时候会比较复杂,n个节点的服务器每个节点都需要维护n-1个连接,结构复杂;如果有BUG,难以调试发现问题;我更喜欢松耦合的结构,使用kafka给每个服务节点一个队列,从里面收取自己需要转发的消息即可;

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!