熊猫TV的礼物系统使用了golang的 mongo库 mgo,中间踩了一些坑,总结下避免大家再踩坑
golang的mgo库说明里是说明了开启连接复用的,但观察实验发现,这并没有根本实现连接的控制,连接复用其实仅在当前操作 (session.Close 之前 )生效,最终还是需要程序员自行去限制连接才行。
废话不多说,开始上代码
GlobalMgoSession, err := mgo.Dial(host) func (m *MongoBaseDao) Get(tablename string, id string, result interface{}) interface{} { session := GlobalMgoSession.Clone() defer session.Close() collection := session.DB(globalMgoDbName).C(tablename) err := collection.FindId(bson.ObjectIdHex(id)).One(result) if err != nil { logkit.Logger.Error("mongo_base method:Get " + err.Error()) } return result }
golang main入口启动时,我们会创建一个全局session,然后每次使用时clone session的信息和连接,用于本次请求,使用后调用session.Close() 释放连接。
// Clone works just like Copy, but also reuses the same socket as the original // session, in case it had already reserved one due to its consistency // guarantees. This behavior ensures that writes performed in the old session // are necessarily observed when using the new session, as long as it was a // strong or monotonic session. That said, it also means that long operations // may cause other goroutines using the original session to wait. func (s *Session) Clone() *Session { s.m.Lock() scopy := copySession(s, true) s.m.Unlock() return scopy } // Close terminates the session. It's a runtime error to use a session // after it has been closed. func (s *Session) Close() { s.m.Lock() if s.cluster_ != nil { debugf("Closing session %p", s) s.unsetSocket() //释放当前线程占用的socket 置为nil s.cluster_.Release() s.cluster_ = nil } s.m.Unlock() }
Clone的方法注释里说明会重用原始session的socket连接,但是并发请求一大,其他协程来不及释放连接,当前协程会怎么办?
func (s *Session) acquireSocket(slaveOk bool) (*mongoSocket, error) { // Read-only lock to check for previously reserved socket. s.m.RLock() // If there is a slave socket reserved and its use is acceptable, take it as long // as there isn't a master socket which would be preferred by the read preference mode. if s.slaveSocket != nil && s.slaveOk && slaveOk && (s.masterSocket == nil || s.consistency != PrimaryPreferred && s.consistency != Monotonic) { socket := s.slaveSocket socket.Acquire() s.m.RUnlock() logkit.Logger.Info("sgp_test 1 acquireSocket slave is ok!") return socket, nil } if s.masterSocket != nil { socket := s.masterSocket socket.Acquire() s.m.RUnlock() logkit.Logger.Info("sgp_test 1 acquireSocket master is ok!") return socket, nil } s.m.RUnlock() // No go. We may have to request a new socket and change the session, // so try again but with an exclusive lock now. s.m.Lock() defer s.m.Unlock() if s.slaveSocket != nil && s.slaveOk && slaveOk && (s.masterSocket == nil || s.consistency != PrimaryPreferred && s.consistency != Monotonic) { s.slaveSocket.Acquire() logkit.Logger.Info("sgp_test 2 acquireSocket slave is ok!") return s.slaveSocket, nil } if s.masterSocket != nil { s.masterSocket.Acquire() logkit.Logger.Info("sgp_test 2 acquireSocket master is ok!") return s.masterSocket, nil } // Still not good. We need a new socket. sock, err := s.cluster().AcquireSocket(s.consistency, slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit) ...... logkit.Logger.Info("sgp_test 3 acquireSocket cluster AcquireSocket is ok!") return sock, nil }
在源码中加debug,结果日志说明一切:
Mar 25 09:46:40 dev02.com[12607]: [info] sgp_test 1 acquireSocket master is ok! Mar 25 09:46:40 dev02.com[12607]: [info] sgp_test 1 acquireSocket master is ok! Mar 25 09:46:41 dev02.com[12607]: [info] sgp_test 1 acquireSocket slave is ok! Mar 25 09:46:41 dev02.com[12607]: [info] sgp_test 3 acquireSocket cluster AcquireSocket is ok! Mar 25 09:46:41 dev02.com[12607]: [info] sgp_test 3 acquireSocket cluster AcquireSocket is ok! Mar 25 09:46:41 dev02.com[12607]: [info] sgp_test 3 acquireSocket cluster AcquireSocket is ok!
不断的创建连接 AcquireSocket
$ netstat -nat|grep -i 27017|wc -l
400
如果每个session 不调用close,会达到恐怖的4096,并堵死其他请求,所以clone或copy session时一定要defer close掉
启用maxPoolLimit 参数则会限制总连接大小,连接到限制则当前协程会sleep等待 直到可以创建连接,高并发时锁有问题,会导致多创建几个连接
src/gopkg.in/mgo.v2/cluster.go s, abended, err := server.AcquireSocket(poolLimit, socketTimeout) if err == errPoolLimit { if !warnedLimit { warnedLimit = true logkit.Logger.Error("sgp_test WARNING: Per-server connection limit reached. " + err.Error()) log("WARNING: Per-server connection limit reached.") } time.Sleep(100 * time.Millisecond) continue } session.go: // SetPoolLimit sets the maximum number of sockets in use in a single server // before this session will block waiting for a socket to be available. // The default limit is 4096. // // This limit must be set to cover more than any expected workload of the // application. It is a bad practice and an unsupported use case to use the // database driver to define the concurrency limit of an application. Prevent // such concurrency "at the door" instead, by properly restricting the amount // of used resources and number of goroutines before they are created. func (s *Session) SetPoolLimit(limit int) { s.m.Lock() s.poolLimit = limit s.m.Unlock() }
连接池设置方法:
1、配置中 增加
[host]:[port]?maxPoolSize=10
2、代码中 :
dao.GlobalMgoSession.SetPoolLimit(10)
再做压测:
$ netstat -nat|grep -i 27017|wc -l
15
结论:
每次clone session之后,操作结束时如果调用 session.Close 则会unset Socket ,置nil, 所以socket复用,仅在当前session范围内生效,所以非全局的session无法共用,每个协程请求到来都会创建socket连接,直到达到最大值4096,而mongo的连接数上限一般也就是1万,也就是一个端口你只能启动一个进程保证连接不被撑爆,过多的连接数客户端效率不高,server端更会耗费内存和CPU,所以需要启用自定义连接池 , 启用连接池也需要注意如果有pooMaxLimit个协程执行过长或者死循环不释放socket连接,也会悲剧。
mgo并没有从底层实现socket的预先创建和整个生命周期的连接池复用,需要自行优化。
有疑问加站长微信联系(非本文作者)