本文主要描述QFS客户端请求从发出至结束的整个生命周期,并罗列此周期内涉及的数据结构。本文以客户端的Mkdir请求举例说明。

请求构造 客户端请求的源头是类KfsClient,此类定义了客户端与MetaServer和ChunkServer的操作。如Mkdir操作的源头便是: int KfsClient::Mkdir(const char pathname, kfsMode_t mode) { return mImpl->Mkdir(pathname, mode); } KfsClient的操作最终交由内部成员mImpl来实现,这是类KfsClientImpl,被包含在类KfsClient中。负责KfsClient所有的接口实现。 int KfsClientImpl::Mkdir(const char pathname, kfsMode_t mode) { QCStMutexLocker l(mMutex); kfsFileId_t parentFid; string dirname; string path; int res = GetPathComponents( pathname, &parentFid, dirname, &path, kInvalidateSubCountsFlag, kEnforceLastDirFlag); if (res < 0) { return res; } MkdirOp op(0, parentFid, dirname.c_str(), Permissions( mUseOsUserAndGroupFlag ? mEUser : kKfsUserNone, mUseOsUserAndGroupFlag ? mEGroup : kKfsGroupNone, mode != kKfsModeUndef ? (mode & ~mUMask) : mode ), NextIdempotentOpId() ); DoMetaOpWithRetry(&op); } 这里的关键是构造了一个MkdirOp,并最终将该操作发往更底层。而对象MkdirOp则是继承自KfsIdempotentOp并继承自KfsOp。MkdirOp对象中包含了要发往MetaServer端的创建目录请求所需要的全部参数。这里就不细细列举了。 #### 请求分发 在上面,请求被构造出来,接下来就是要被分发下去。 void KfsClientImpl::DoMetaOpWithRetry(KfsOp op) { InitUserAndGroupMode(); ExecuteMeta(op); } void KfsClientImpl::ExecuteMeta(KfsOp& op) { if (mMetaServer) { …… } else { StartProtocolWorker(); mProtocolWorker->ExecuteMeta(op); } } 请求的分发稍显简单:研究过代码发现if分支已经基本不用,一般情况下都是进入else分支内。这里又涉及到一个类:KfsProtocolWorker,专门用来进行客户端协议处理。StartProtocolWorker用来初始化并启动客户端协议处理机,这个我们略过不表了,后面有机会再来专门分析,只需要知道该协议处理机内会启动一个专门线程来处理客户端请求。 至于请求的分发,直接调用了ExecuteMeta来处理上面的请求。 KfsProtocolWorker::ExecuteMeta( KfsOp& inOp) { const int64_t theRet = mImpl.Execute( kRequestTypeMetaOp, 1, 1, 0, &inOp, 0, 0, 0 ); if (theRet < 0 && 0 <= inOp.status) { inOp.status = (int)theRet; } } KfsProtocolWorker同样也内嵌了一个mImpl,类型是 KfsProtocolWorker::Impl。请求分发最终是进入了该对象的Execute: int64_t Execute( RequestType inRequestType, FileInstance inFileInstance, FileId inFileId, const Request::Params inParamsPtr, void inBufferPtr, int inSize, int inMaxPending, int64_t inOffset) { /* * jeff.ding: meta操作(如Mkdir)都是同步的 / if (IsSync(inRequestType)) { SyncRequest& theReq = GetSyncRequest( inRequestType, inFileInstance, inFileId, inParamsPtr, inBufferPtr, inSize, inMaxPending, inOffset ); const int64_t theRet = theReq.Execute(this); PutSyncRequest(theReq); return theRet; } …… } 其内部实现是会在MkdirOp的基础上构造一个SyncRequest(对于元数据操作),然后直接调用其Execute: /* * 对于同步请求,插入调用者(KfsProtocolWorker::Impl)队列以后就陷入等待 * 直到请求完成,本线程被唤醒 / int64_t Execute(Impl& inWorker) { mWaitingFlag = true; inWorker.Enqueue(this); QCStMutexLocker theLock(mMutex); while (mWaitingFlag && mCond.Wait(mMutex)) {} return mRetStatus; } 可以看到,对于一个同步请求,其最终调用KfsProtocolWorker::Impl::Enqueue而被插入请求队列中,同时调用者陷入等待直到请求处理完成被唤醒。 int64_t Enqueue( Request& inRequest) { …… { QCStMutexLocker theLock(mMutex); … inRequest.mState = Request::kStateInFlight; WorkQueue::PushBack(mWorkQueue, inRequest); } /* * 向pipe中写入一个字节内容,NetManager::MainLoop会被唤醒 * 由于将ITimeout的超时时间设置为0,因此会立即进入 / mNetManager.Wakeup(); return 0; } 这里就比较有趣了,首先将请求(inRequest)插入至请求队列(mWorkQueue)尾部,然后调用了mNetManager.wakeup。这里又多出来了一个类NetManager。这是QFS网络框架的核心,处理网络连接、定时器等,以后会专门抽时间剖析下该对象的内部机制。 言归正传,我们前面说过,KfsProtocolWorker::Impl中会启动一个后台线程专门处理客户端请求以及网络连接上的数据,该线程的运行函数便是NetManager::MainLoop,该方法是一个无限循环,会一直等待这事件的到来,这里的事件包括:网络链接上有数据可读写、超时被触发等,这里调用的 mNetManager.Wakeup便是将MainLoop从等待中唤醒。 #### 请求处理 NetManager被唤醒后,会处理网络连接、超时的定时任务等,由于此时尚未有网络链接,于是便看看有没有定时的超时任务。而恰好,KfsProtocolWorker::Impl又注册了一个超时任务,这里又涉及到了一个对象ITimeout,而且,这个超时任务的超时时间是0,也就是意味着:每次检查该任务都会超时。于是该超时任务被立即处理: virtual void Timeout() { Request theWorkQueue[1]; { QCStMutexLocker theLock(mMutex); theWorkQueue[0] = mWorkQueue[0]; WorkQueue::Init(mWorkQueue); …… } bool theShutdownFlag = false; Request* theReqPtr; while ((theReqPtr = WorkQueue::PopFront(theWorkQueue))) { Request& theReq = theReqPtr; QCRTASSERT(theReq.mState == Request::kStateInFlight); …… if (theReq.mRequestType == kRequestTypeMetaOp) { MetaRequest(theReq); continue; } …… } } 到这里就真像大白了:请求到这里才被真正地得到处理。对于元数据操作请求,处理如下: void MetaRequest(Request& inRequest) { KfsOp const theOpPtr = reinterpret_cast<KfsOp>(inRequest.mBufferPtr); if (! mMetaServer.Enqueue( theOpPtr, static_cast<SyncRequest>(&inRequest))) { theOpPtr->status = kErrParameters; theOpPtr->statusMsg = “failed to enqueue op”; } } 这里又涉及到类:MetaServer:专门负责与MetaServer请求的交互处理。而该类则只是KfsNetClient的别名而已。而KfsNetClient也有自己的实现,是类KfsNetClient::Impl。 bool KfsNetClient::Enqueue( KfsOp* inOpPtr, OpOwner* inOwnerPtr, IOBuffer* inBufferPtr /* = 0 /, int inExtraTimeout / = 0 /) { Impl::StRef theRef(mImpl); return mImpl.Enqueue(inOpPtr, inOwnerPtr, inBufferPtr, inExtraTimeout); } bool Enqueue( KfsOp inOpPtr, OpOwner* inOwnerPtr, IOBuffer* inBufferPtr, int inExtraTimeout) { const time_t theNow = Now(); …… const bool theOkFlag = EnqueueSelf( inOpPtr, inOwnerPtr, inBufferPtr, 0, inExtraTimeout); if (theOkFlag) { EnsureConnected(0, inOpPtr); } return theOkFlag; } bool EnqueueSelf( KfsOp* inOpPtr, OpOwner* inOwnerPtr, IOBuffer* inBufferPtr, int inRetryCount, int inExtraTimeout) { if (! inOpPtr) { return false; } mIdleTimeoutFlag = false; SetMaxWaitTime(inOpPtr, inExtraTimeout); inOpPtr->seq = mNextSeqNum++; const bool theResetTimerFlag = mPendingOpQueue.empty(); pair<OpQueue::iterator, bool> const theRes = mPendingOpQueue.insert(make_pair( inOpPtr->seq, OpQueueEntry(inOpPtr, inOwnerPtr, inBufferPtr, inExtraTimeout) )); if (! theRes.second || ! IsConnected() || IsAuthInFlight()) { return theRes.second; } / * jeff.ding: Request()会将数据写入网络连接中 / Request(mOutstandingOpPtr ? mOutstandingOpPtr : theRes.first->second, theResetTimerFlag || mOutstandingOpPtr, inRetryCount); return theRes.second; } 这里会将传入的请求inOpPtr构造一个OpQueueEntry插入mPendingOpQueue内,然后将该队列中的第一个请求调用Request数据写入NetConnection的写入缓冲区内。 至此,一个请求内容最终算是通过网络发送给目的端(如Mkdir请求最终被发往了MetaServer)。 #### 接收响应 当请求在目的端被处理完成并通过网络返回响应后,客户端通过NetManager从网络连接(NetConnection)上获得响应数据(此详细过程不表)。最终会进入KfsNetClient::HandleResponse: void HandleResponse( IOBuffer& inBuffer) { for (; 😉 { …… mRetryCount = 0; HandleOp(mCurOpIt); } } void HandleOp( OpQueue::iterator inIt, bool inCanceledFlag = false) { …… OpQueueEntry theOpEntry = inIt->second; // 从pending队列中移除 mPendingOpQueue.erase(inIt); // 调用OpDone方法,并继续发送下一个请求 theOpEntry.OpDone(inCanceledFlag); if (! mOutstandingOpPtr && theScheduleNextOpFlag && thePrevRefCount <= GetRefCount() && ! mPendingOpQueue.empty() && IsConnected()) { mOutstandingOpPtr = &(mPendingOpQueue.begin()->second); const bool kResetTimerFlag = true; Request(mOutstandingOpPtr, kResetTimerFlag, mOutstandingOpPtr->mRetryCount); } } void OpDone( bool inCanceledFlag) { KfsOp const theOpPtr = mOpPtr; OpOwner* const theOwnerPtr = mOwnerPtr; IOBuffer* const theBufferPtr = mBufferPtr; Clear(); if (theOwnerPtr) { if (theOpPtr) { // 对于SyncRequest,其OpDone实现是函数:SyncRequest::OpDone theOwnerPtr->OpDone(theOpPtr, inCanceledFlag, theBufferPtr); } } else { delete theOpPtr; delete theBufferPtr; } } // SyncRequest的OpDone方法 virtual void OpDone( KfsOp* inOpPtr, bool inCanceledFlag, IOBuffer* inBufferPtr) { QCRTASSERT(inOpPtr && ! inBufferPtr && inOpPtr == mBufferPtr); if (inCanceledFlag && inOpPtr->status == 0) { inOpPtr->status = -ECANCELED; inOpPtr->statusMsg = “canceled”; } // 调用了KfsProtocolWorker::Done Impl::Done(this, 0); } static void Done( Request& inRequest, int64_t inStatus) { if (inRequest.mState == Request::kStateDone) { return; } QCRTASSERT(inRequest.mState == Request::kStateInFlight); inRequest.mState = Request::kStateDone; inRequest.mStatus = inStatus; / * 这里会唤醒inRequest的等待者(即发起请求的线程) / inRequest.Done(inStatus); } / * SyncRequest完成后会进入这里的Done / virtual void Done( int64_t inStatus) { QCStMutexLocker theLock(mMutex); mRetStatus = inStatus; mWaitingFlag = false; / 对于sync请求,请求enqueue进入处理队列后便调用mCond.Wait()陷入等待 * 直到请求完成后被mCond.Notify()唤醒继续处理 */ mCond.Notify(); } 到这里,我们算是真正地梳理了客户端请求完整地生命周期,为接下来的分析工作打下了比较扎实的基础。 #### 总结 再来回顾下客户端请求的创建到完成的整个生命周期: > * 创建:创建阶段主要由类KfsClient以及KfsClientImpl完成,创建的对象由KfsOp及其具体子类代表; > * 派发:请求派发主要由类KfsProtocolWorker以及KfsProtocolWorker::Impl提供的接口实现。在该阶段,创建阶段构造的KfsOp对象被进一步包装为Request,而根据请求的不同,Request又衍生出子类SyncRequest以及AsyncRequest。Request被放入KfsProtocolWorker内部状态机的请求队列并同时唤醒KfsProtocolWorker的状态机进行下一步处理; > – 处理:KfsProtocolWorker内部自带的状态机是由NetManager实现,融合了超时事件管理、网络连接管理等功能,KfsProtocolWorker利用其超时事件管理机制注册了超时事件(超时事件为0),一旦状态机被唤醒,该超时事件必然触发,在注册的超时回调函数中处理请求队列中的Request。对于SyncRequest,会将请求通过KfsNetClient进行处理。KfsNetClient将该Request插入自身维护的PendingQueue中并将该PendingQueue头部的请求数据写入NetConnection的发送缓冲区中,最终请求被发往目的端。同时,对于SyncRequest,请求的发起者会等待请求完成的通知。 > – 接受响应:接受响应通过NetManager统一处理,NetManager侦测到NetConnection上有数据可读,会调用读事件回调函数,进而读出数据、解包、封装响应并查找到该响应对于的请求(在PendingQueue上),进而会层层回调最终触发Request::Done(),对于SyncRequest,在这里会唤醒等待响应的调用者,至此,请求最终处理完成。