mqant支持分布式跟踪系统了!!!
发布于 2 年前 作者 liangdas 2265 次浏览 来自 分享

什么是分布式跟踪系统

Appdash,用Go实现的分布式系统跟踪神器

Dapper,大规模分布式系统的跟踪系统

mqantserver例子:

场景(Chat多人聊天demo)

  1. 客户端通过命令(Chat/HD_JoinChat/2)加入聊天室

  2. 后端的Chat/HD_JoinChat/ 服务函数会继续Login模块的track服务

    r, e := m.RpcInvoke("Login", "track", session)
    
  3. Login的track服务又会通过rpc调用Login的Login track1服务

    time.Sleep(time.Millisecond*10)
     m.RpcInvoke("Login", "track2", session)
    
  4. track2服务会有一定的几率调用track3服务

    time.Sleep(time.Millisecond*10)
    r:=rand.Intn(100)
    if r>30{
    	m.RpcInvoke("Login", "track3", session)
    }
    

以上的场景在后端服务开发过程中非常场景,并且随着服务复杂性提高,调用的层级和深度都会提高。在大型的分布式系统中代码调试将变得非常复杂和费时。因此接入一个分布式的跟踪可以极大的提升代码调试效率,同时在线上异常收集和分析方面也有非常重要的一样,以下是mqantserve的测试场景实验结果截图

示意图

上图是一次客户端请求的后端完整运行图,你可以通过这个界面得到以下信息

  1. 本次请求后端调用路线
  2. 各个模块执行时间
  3. 自定义的参数
  4. 等等

在mqant中如何使用

mqant框架本身并没有实现信息采集和整理的模块,仅仅对外提供了信息采集的接口

需要说明的是分布式跟踪系统信息采集有一套标准的接口定义opentracing,目前几大流行的分布式跟踪系统基本都实现了这套协议来收集信息

分布式跟踪接口使在mqant v1.4.0版本中新增的

1. 选择适合的分布式跟踪系统

mqantserver选用了golang语言实现的一套开源系统Appdash作为演示使用,这套系统用于开发人员进行日常调试使用时完全足够的,如果要用于线上生成环境的话请进行必要的压力测试和稳定性测试

2. 在mqant注册信息采集器

/work/go/mqantserver/src/server/main.go

func main() {
app := mqant.CreateApp()
app.DefaultTracer(func()opentracing.Tracer {
	if app.GetSettings().Tracing.Enable{
		if collector==nil{
			collector=appdash.NewRemoteCollector("127.0.0.1:7701")
			tracer=appdashtracer.NewTracer(collector)
		}
		return tracer
	}else{
		return nil
	}
})
app.Run(true, //只有是在调试模式下才会在控制台打印日志, 非调试模式下只在日志文件中输出日志
	gate.Module(),  //这是默认网关模块,是必须的支持 TCP,websocket,MQTT协议
	login.Module(), //这是用户登录验证模块
	chat.Module(),
	tracing.Module(), //Appdash系统模块
)  //这是聊天模块

}

Appdash会注册两个端口:

1. 7701 用于信息收集  tcp
2. 7700 用于信息查询  http

3. 在gate模块中注册一个跟踪过滤接口

/work/go/mqantserver/src/server/gate/module.go

func (gate *Gate) OnInit(app module.App, settings *conf.ModuleSettings) {
	//注意这里一定要用 gate.Gate 而不是 module.BaseModule
	gate.Gate.OnInit(gate, app, settings)
	gate.Gate.SetStorageHandler(gate) //设置持久化处理器
	gate.Gate.SetTracingHandler(gate) //设置分布式跟踪系统处理器
}

/**
是否需要对本次客户端请求进行跟踪
*/
func (gate *Gate)OnRequestTracing(session gate.Session,msg *mqtt.Publish)bool{
	if session.GetUserid()==""{
		//没有登陆的用户不跟踪
		return false
	}
	//if session.GetUserid()!="liangdas"{
	//	//userId 不等于liangdas 的请求不跟踪
	//	return false
	//}
	return true
}

4. 需要跟踪的rpc接口都需要第一个参数传递gate.Session

为什么第一个参数为gate.Session才能正常收集到数据呢?

这是因为tracing会为每一次请求生成一个唯一的tracingID,而这个tracingID就在gate.Session结构体携带着。

因此我们约定尽量所有的rpc通信接口第一个参数都设置为gate.Session

例如

/work/go/mqantserver/src/server/login/module.go

func (m *Login) track(session gate.Session) (result string, err string) {
	//演示后台模块间的rpc调用
	if session.Span()!=nil{
		session.Span().SetTag("track","track测试")
	}
	time.Sleep(time.Millisecond*10)
	m.RpcInvoke("Login", "track2", session)
	return fmt.Sprintf("My is Login Module %s"), ""
}

func (m *Login) track2(session gate.Session) (result string, err string) {
	//演示后台模块间的rpc调用
	if session.Span()!=nil{
		session.Span().SetTag("track","track 第二个测试函数")
	}
	time.Sleep(time.Millisecond*10)
	r:=rand.Intn(100)
	if r>30{
		m.RpcInvoke("Login", "track3", session)
	}

	return fmt.Sprintf("My is Login Module"), ""
}

调用方 /work/go/mqantserver/src/server/chat/module.go

func (m *Chat) joinChat(session gate.Session, msg map[string]interface{}) (result map[string]interface{}, err string) {
....
r, e := m.RpcInvoke("Login", "track", session)
....

只有正确的设置完以上的步骤,mqant就可以用分布式跟踪系统收集信息了

3 回复

不错

@liangdas 各个模块是可以分布到各个物理机器中吗,那各个机器的IP是写在哪里?

分布式架构可以看这篇文章 http://www.mqant.com/topic/5987e8ebd42579452e188b62

但目前需要手动到每一个物理机上去手动启动这些进程

請問該模組現在可以使用嗎?

已经移除了,现在把跟踪ID都打日志里面了

回到顶部