前言
在单机模式下,相对简单,认为所有的客户都在同一台服务器上,姑且认为1台服务器可以同时支持1万用户在线,在更多用户同时在线时,则需要集群来实现负载均衡。
集群的算法需要解决故障处理以及动态添加的问题,同时需要考虑如何在集群节点间路由数据。
1. 负载均衡算法
这里使用一致性哈希环来实现。
添加节点
每个设备都有唯一的名字(key),同时按照重复因子的个数添加到环中;每个名字以及重复的序号做hash算法得到一个哈希数,所以环上每个节点都有一个数值;
查询
在查询时候,提供了某个字符串,计算哈希,如果哈希比某个环上节点的数值大,就选这个节点,如果都找不到,就选第一个,所以称之为环而不是列表;
故障移除
当某个服务器故障退出时候,需要从环上删除节点,那么此服务节点上的用户再次登录时,会选到相邻的服务节点上;而正常服务节点上的用户不会受到影响;
1.1 redis服务发现
一般GO服务发现使用etcd比较多,但是为了少安装几个服务,可以考虑使用redis实现;
注册服务
每个服务上线时候,都在“IMSERVER"键值中设置节点名字的field和value,value中包含更新时间;
每个服务定期更新field中的活动时间戳,证明工作正常;
轮询可以根据更新时间确定哪些节点存活,如果某些节点失效了,需要重新计算哈希值;
如果为了使用超时机制来实现自动检测,那么可以再额外添加了一个键值,名字就是服务的节点名最后后缀,这样当某个服务节点的键值超时就可以即时发现;
1.2 etcd服务发现
在一个分布式系统中,服务注册与发现是一个关键的组件,用于在集群中动态地注册服务和发现其他服务。Etcd 提供了一种方便的方式来实现服务注册与发现,通常的做法包括以下几个步骤:
- 服务注册:
- 当一个服务启动时,它会将自己的地址信息(如 IP 地址和端口号)以及其他相关信息注册到 Etcd 中。
- 通常情况下,服务会将自己注册为一个可识别的名称,比如服务的名称、版本号等,以便其他服务能够识别和访问。
- 服务发现:
- 当一个服务需要与其他服务通信时,它会向 Etcd 发送一个服务发现的请求,查询特定服务名称对应的地址信息。
- Etcd 将返回一个或多个注册了该名称的服务的地址信息,然后服务可以使用这些信息来进行通信。
在 Go 中,可以使用 Etcd 的 Go 客户端库(如 etcd/clientv3
)来实现服务注册与发现。以下是一个简单的示例,演示了如何使用 Etcd 实现服务注册与发现:
package main import ( "context" "fmt" "log" "time" "go.etcd.io/etcd/clientv3" ) func main() { // 连接 Etcd 客户端 cli, err := clientv3.New(clientv3.Config{ Endpoints: []string{"http://localhost:2379"}, // Etcd 服务地址 DialTimeout: 5 * time.Second, }) if err != nil { log.Fatal(err) } defer cli.Close() // 注册服务 if err := registerService(cli, "service1", "127.0.0.1:8080"); err != nil { log.Fatal(err) } // 发现服务 addrs, err := discoverService(cli, "service1") if err != nil { log.Fatal(err) } fmt.Println("Service1 addresses:", addrs) } // 注册服务 func registerService(cli *clientv3.Client, serviceName, serviceAddr string) error { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // 将服务地址信息注册到 Etcd 中 _, err := cli.Put(ctx, "/services/"+serviceName+"/"+serviceAddr, serviceAddr) return err } // 发现服务 func discoverService(cli *clientv3.Client, serviceName string) ([]string, error) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // 查询特定服务名称对应的所有地址信息 resp, err := cli.Get(ctx, "/services/"+serviceName+"/", clientv3.WithPrefix()) if err != nil { return nil, err } // 解析地址信息 var addrs []string for _, kv := range resp.Kvs { addrs = append(addrs, string(kv.Value)) } return addrs, nil }
1.2 重复因子
添加重复因子是为了增加哈希环上节点的数量,从而更好地实现负载均衡。在哈希环中,服务器节点的数量越多,分布越均匀,可以提高负载均衡的效果。如果只有少量的服务器节点,哈希环上的节点分布可能不够均匀,导致负载分配不平衡。
通过添加重复因子,可以让每个服务器节点在哈希环上出现多次,增加了节点的数量,从而更好地平衡了负载。例如,如果只有3台服务器,但设置了重复因子为10,则在哈希环上每个服务器节点会出现10次,相当于虚拟出了30个节点,增加了负载均衡的可能性。
在服务器数量为3到10台之间,可以考虑将重复因子设置为3或4。这样可以在哈希环上为每个服务器节点创建3到4个虚拟节点,以增加哈希环上节点的数量,提高负载均衡的效果。
例如,如果有5台服务器,设置重复因子为3,则在哈希环上会创建15个虚拟节点,增加了节点的数量,有助于更均匀地分配负载。而如果服务器数量较少,设置更大的重复因子可能会导致哈希环上节点过多,增加计算开销;而设置更小的重复因子可能会降低负载均衡的效果。
综上所述,设置重复因子为3或4可能是一个合适的选择,可以在保证负载均衡的同时,避免哈希环节点数量过多带来的性能损耗。然而,具体的选择还应根据实际情况和性能要求进行调整和评估。
2. redis检测故障实现
可以设计一个基于定时任务的机制来实现:
服务节点定时更新超时时间:每个服务节点定期(比如每隔 5 秒)更新自己的超时时间,将当前时间加上超时阈值,并将新的超时时间写入 Redis 中。这样可以确保每个服务节点的超时时间保持最新。
其他节点定时轮询超时情况:其他节点定期(比如每隔 10 秒)轮询所有节点的超时情况。对于每个节点,获取其超时时间,如果发现某个节点超时了,就触发重新计算哈希环的操作。
重新计算哈希环:一旦发现某个节点超时,触发重新计算哈希环的操作。这意味着需要重新收集所有服务节点的信息,并更新哈希环,确保哈希环中的每个节点都是当前活动的节点。
定时任务实现:使用 Go 中的定时任务库(如
time.Timer
或time.Tick
)来实现定时任务。每个节点维护一个定时器,在定时器触发时执行相应的操作(更新超时时间或者轮询超时情况)。
2.1 轮询查询
package main import ( "context" "fmt" "time" "github.com/go-redis/redis/v8" ) func main() { // 创建 Redis 客户端 rdb := redis.NewClient(&redis.Options{ Addr: "localhost:6379", Password: "", // 设置 Redis 密码 DB: 0, // 使用默认数据库 }) // 模拟每个服务节点更新超时时间的定时任务 go func() { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: // 更新超时时间 updateTimeout(rdb) } } }() // 模拟其他节点轮询超时情况的定时任务 go func() { ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: // 轮询超时情况并重新计算哈希环 checkTimeoutAndRebalance(rdb) } } }() // 主程序保持运行 select {} } func updateTimeout(rdb *redis.Client) { // 获取当前时间 now := time.Now() // 更新超时时间 rdb.Set(context.Background(), "timeout", now.Add(5*time.Second).Format(time.RFC3339), 0) fmt.Println("Updated timeout to", now.Add(5*time.Second)) } func checkTimeoutAndRebalance(rdb *redis.Client) { // 获取当前时间 now := time.Now() // 获取超时时间 timeoutStr, err := rdb.Get(context.Background(), "timeout").Result() if err != nil { fmt.Println("Error:", err) return } timeout, err := time.Parse(time.RFC3339, timeoutStr) if err != nil { fmt.Println("Error:", err) return } // 检查超时情况 if now.After(timeout) { fmt.Println("Node timeout detected. Rebalancing...") // 重新计算哈希环 rebalanceHashRing() } } func rebalanceHashRing() { // 实现哈希环的重新计算逻辑 fmt.Println("Rebalancing hash ring...") }
2.2 订阅超时事件
当订阅了 __keyevent@0__:expired
模式后,每当有键值过期时,Redis 就会发送一个消息给订阅者。这个消息会包含过期的键名(key name),因此可以在接收到消息时获取到哪个键值超时了。
检测超时键值时候,查看是否是已知的服务器名字,如果某个服务器下线了,则需要重新调整;
以下是如何使用 Go 语言监听键值超时事件:
package main import ( "context" "fmt" "time" "github.com/go-redis/redis/v8" ) func main() { // 创建 Redis 客户端 rdb := redis.NewClient(&redis.Options{ Addr: "localhost:6379", Password: "", // 设置 Redis 密码 DB: 0, // 使用默认数据库 }) // 订阅键值过期事件 pubsub := rdb.Subscribe(context.Background(), "__keyevent@0__:expired") defer pubsub.Close() // 在单独的协程中处理订阅消息 go func() { for { // 接收消息 msg, err := pubsub.Receive(context.Background()) if err != nil { fmt.Println("Error:", err) return } // 处理消息 switch msg := msg.(type) { case *redis.Message: // 获取超时的键名 key := msg.Payload fmt.Println("Key expired:", key) // 进行相应的处理逻辑 default: fmt.Println("Received unknown message type") } } }() // 主程序保持运行 select {} }
3. etcd故障检测实现
要检测某个服务节点是否下线,可以通过以下几种方式实现:
- 心跳检测:服务节点定期发送心跳信号到 Etcd,表明自己仍然活跃。如果某个节点长时间没有发送心跳信号,可以认为该节点已经下线。
- 定时检测:定时从 Etcd 中查询特定服务的节点信息,如果发现某个节点在最新的查询中不存在,说明该节点已下线。
- Watch 监听:通过 Etcd 的 Watch 功能,监视特定服务的节点信息。当有节点下线时,Watch 将会收到相应的通知,从而实现及时检测。
下面是一个简单的示例代码,演示了如何使用 Etcd 的 Watch 功能来监视特定服务的节点信息,并实时检测节点的上线和下线:
package main import ( "context" "log" "time" "go.etcd.io/etcd/clientv3" ) func main() { // 连接 Etcd 客户端 cli, err := clientv3.New(clientv3.Config{ Endpoints: []string{"http://localhost:2379"}, // Etcd 服务地址 DialTimeout: 5 * time.Second, }) if err != nil { log.Fatal(err) } defer cli.Close() // Watch 特定服务节点的变化 watchServiceNodes(cli, "service1") } // Watch 特定服务节点的变化 func watchServiceNodes(cli *clientv3.Client, serviceName string) { watchChan := cli.Watch(context.Background(), "/services/"+serviceName+"/", clientv3.WithPrefix()) for { select { case <-watchChan: // 节点发生变化,处理节点上线和下线的情况 checkServiceNodes(cli, serviceName) } } } // 检查服务节点状态 func checkServiceNodes(cli *clientv3.Client, serviceName string) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // 查询特定服务名称对应的所有地址信息 resp, err := cli.Get(ctx, "/services/"+serviceName+"/", clientv3.WithPrefix()) if err != nil { log.Println("Error checking service nodes:", err) return } // 检查节点是否下线 // 遍历之前保存的节点信息,与当前 Etcd 中的节点信息进行对比 // 如果某个节点之前存在,但在当前查询中不存在,则认为该节点下线 for _, kv := range resp.Kvs { nodeAddr := string(kv.Value) log.Println("Service node:", nodeAddr) } log.Println("Service node check complete.") }
4. 消息跨服务器路由
4.1 一对一消息路由
消息目的地用户如果不再本地服务器,则使用算法计算当前应该在哪个服务器上,使用kafka将数据转发到目标服务器,等待目标服务器去处理;
屏蔽检测
在转发消息前,入库前,首先需要检测对方是否屏蔽了自己;在集群模式下,用户登录后,在redis中更新自己的状态,同时pulish 自己的状态消息。
如果有多个订阅者同时订阅了同一个频道,那么每个订阅者都能收到发布到该频道的消息。
Redis 的 Pub/Sub(发布/订阅)模式允许多个订阅者同时订阅同一个频道,并且每个订阅者都能接收到频道中发布的消息。
当消息发布到频道时,Redis 会将该消息发送给所有订阅了该频道的客户端,即使有多个订阅者也是如此。每个订阅者都会收到相同的消息,并且消息的顺序与发布的顺序一致。
这种设计使得 Pub/Sub 模式非常适合实现广播、实时通知等场景,因为每个订阅者都能及时地收到发布的消息,无需额外的请求或轮询。
但是:Redis 的 Pub/Sub 模式下,消息不会被缓存。当消息发布到频道时,它会立即发送给所有订阅了该频道的客户端,而不会保存在 Redis 中。这意味着如果有订阅者在消息发布之后才订阅频道,它们将无法接收到之前发布的消息。
补充:用户的状态信息同时需要保存到redis中,便于错过消息的服务器加载;
三级缓存: 服务器也不是缓存所有用户状态信息,只有通信中需要路由的用户才会将状态加载到内存;可以设置30分钟超时,如果不使用则清理掉缓存;这样就是一个数据库,redis,内存的三级缓存模式;
相关发布消息的示例:
package main import ( "context" "fmt" "log" "time" "github.com/go-redis/redis/v8" ) func newRedisClient() *redis.Client { return redis.NewClient(&redis.Options{ Addr: "localhost:6379", // Redis 服务地址 Password: "", // Redis 访问密码 DB: 0, // 选择的数据库 }) } func publishMessage(channel, message string) error { ctx := context.Background() client := newRedisClient() // 发布消息到指定频道 if err := client.Publish(ctx, channel, message).Err(); err != nil { return err } // 关闭 Redis 客户端连接 if err := client.Close(); err != nil { log.Println("Error closing Redis client:", err) } return nil } func main() { channel := "user_actions" // 频道名称 message := "User login" // 消息内容 if err := publishMessage(channel, message); err != nil { log.Fatal("Error publishing message:", err) } else { fmt.Println("Message published successfully!") } }
这样就可以通过调用 publishMessage
函数来发布消息到指定的频道了。在 main
函数中,指定了要发布的频道名称和消息内容,然后调用 publishMessage
函数将消息发布到指定的频道中。
订阅部分示例代码:
package main import ( "context" "fmt" "log" "os" "os/signal" "sync" "time" "github.com/go-redis/redis/v8" ) func main() { // 创建 Redis 客户端 rdb := redis.NewClient(&redis.Options{ Addr: "localhost:6379", // Redis 服务器地址 Password: "", // 连接密码,若无密码则留空 DB: 0, // 选择数据库,默认为 0 }) // 创建一个新的上下文,用于控制订阅的生命周期 ctx, cancel := context.WithCancel(context.Background()) defer cancel() // 创建一个 WaitGroup,用于等待订阅结束 var wg sync.WaitGroup // 订阅频道 sub := rdb.Subscribe(ctx, "channel1") // 使用 goroutine 处理订阅消息 go func() { defer wg.Done() for { select { case <-ctx.Done(): return default: msg, err := sub.ReceiveMessage(ctx) if err != nil { log.Println("Error receiving message:", err) continue } fmt.Println("Received message:", msg.Payload) } } }() fmt.Println("Subscribed to channel1. Waiting for messages...") // 等待程序退出信号 stop := make(chan os.Signal, 1) signal.Notify(stop, os.Interrupt) // 等待订阅结束或接收到退出信号 wg.Add(1) select { case <-stop: fmt.Println("Received interrupt signal. Exiting...") cancel() // 取消订阅 case <-ctx.Done(): } // 等待订阅处理结束 wg.Wait() }
故障情况
如果目标服务器出现故障,暂时没有检测到,消息执行了错误的路由,但当用户再次登录,更换服务器后,若干秒后也能正确路由;
用户重新登录时候,会加载离线数据;
4.2 群聊消息路由
我们需要三个表:
群内成员表:用于查看群状态信息以及群成员信息;
群用户在各个服务器上分布状态:转发前需要查表,查看需要转发到哪些服务器;
群内当前服务器在线成表:接收到转发的消息后,服务器根据此表来确定需要转发到哪些用户;理想情况下,所有的用户都是一直在线的;
tinode那种模式是直接用tcp连接来做集群,这样的模式在实际用的时候会比较复杂,n个节点的服务器每个节点都需要维护n-1个连接,结构复杂;如果有BUG,难以调试发现问题;我更喜欢松耦合的结构,使用kafka给每个服务节点一个队列,从里面收取自己需要转发的消息即可;