クライアントのリングバッファ対応が終わったのでSimpleServerTransFile(Linux)の
各バッファをリングバッファに置き換えます。
手順は以下の通りです。
1.送信用リングバッファへの対応
これは、クライアントで実施したものと同じですね。
1.1.送信データ格納用エリアm_pSendDataをリングバッファm_pCRingBuffSendに変更します。
1.2.送信データのセットSetSendData(char *pcData, int iSize)内でリングバッファへの
書き込みWrite(LPBYTE pbBuf, long lSize)を使用するように変更します。
1.3.送信データ送信データの取得GetSendData(char **ppcData)内ではリングバッファからの
読み込みRead(LPBYTE pbBuf, long lSize)を使用するように変更します
2.受信用リングバッファへの対応
これもクライアントで実施した内容と同じですね。
2.1.受信データ格納用エリアm_pCRingBuffRecvを追加します。
2.2.受信用リングバッファに空きがあるときに受信処理を行いバッファに格納します。
受信データが受信バッファ(szRecvBuffer)に格納されたら、すぐにリングバッファに
書きこみます。
2.3.受信データの解析は、受信リングバッファに格納されているデータに対して実施します。
AnalyzeDataRecvでは格納されているサイズを調べてヘッダ分あれば、仮読みしてコマンド
を調べRecvMessagePacketに渡します。
RecvMessagePacketでは、データサイズを調べサイズ分受信をしていたら読み込み処理を
行います。
(*)AnalyzeDataRecv、RecvMessagePacketの返値を次のように変更します。
0:パケットが揃っていない
1:パケットを受信したので処理を行った
-1:エラーが発生した
3.チャット配信用リングバッファへの対応
これはクライアントにありません。
3.1.チャット配信データ格納用エリアm_pCRingBuffForDistributeChatを追加します。
3.2.チャット配信データ格納用リングバッファに空きがあるときにに格納します。
BOOL CSendRecvThread::SetDataForDistributeChat(char *pcData, int iSize)
3.3.チャット配信データ格納用リングバッファデータがあるときには、チャットメッセージ
パケットの形で取り出し、シリアライズが正しく行われるように注意します。
int CSendRecvThread::GetDataForDistributeChat(char **ppcData)
クライアントと同様にリングバッファのクラスCRingBuff(RingBuff.h、RingBuff.cpp)を
プロジェクトに追加します。
SendRecvThread.hを見てみましょう。
変更箇所に★と変更項目番号を記述しました。
【SendRecvThead.h】 #pragma once #include “ThreadJob.h” #include “define.h” // ★2.3.RecvMessagePacketの引数を変更するため class CMySyncObject; // このクラスの使用することを宣言 class CRingBuff; // ★CRingBuffを使用するため #define SEND_BUFF_SIZE (1024 * 64) // ★1.1.送信リングバッファのサイズ #define SENDBUFSIZE (1024 + sizeof(HeaderRec)) // ★1.1.一度に送信するサイズの最大値(共通ヘッダ分を加えてあります) #define RECV_BUFF_SIZE (1024 * 64) // ★1.2.受信リングバッファのサイズ #define RCVBUFSIZE (1024 * 2) // ★1.2.一度に読む最大受信サイズ typedef struct { SOCKET fdClient; // 接続済みソケット(acceptの結果) CMySyncObject *pCMySyncObject; // 同期オブジェクト } ConnectionInfoRec; class CSendRecvThread : public CThreadJob { public: CSendRecvThread(ConnectionInfoRec *pConInfo); // パラメータをコンストラクタで渡す ~CSendRecvThread(); // 基底クラスの関数をオーバーライドする // C++11で明示的にoverrideを書くことが出来るようになりました // 基底クラスの当該関数にvirtualが書いていないとエラーを出してくれます UINT DoWork() override; // DoRecvで実施している内容を記述 BOOL SetSendData(char *pcData, int iSize); // 送信データの設定 int GetDataForDistributeChat(char **ppcData); // チャット分配用データの取得 BOOL IsZombie(); // このスレッドはゾンビ状態か private: ConnectionInfoRec *m_pConInfo; // コンストラクタで渡されるパラメータを格納 // このスレッド実行中領域が確保されていること BOOL m_fIamZombie; // ゾンビ状態かどうかを保持 CRingBuff *m_pCRingBuffSend; // ★1.1.送信データ格納用エリア CRingBuff *m_pCRingBuffRecv; // ★2.1.受信データ格納用エリア CRingBuff *m_pCRingBuffForDistributeChat; // ★3.1.チャットメッセージをクライアントに配信するためのエリア int GetSendData(char **ppcData); // 送信データの取得 BOOL SetDataForDistributeChat(char *pcData, int iSize); // チャットメッセージをクライアントに分配するために格納する int AnalyzeDataRecv(); // ★2.3.引数と返値を変更 int RecvMessagePacket(HeaderRec *pHeader); // ★2.3.引数と返値を変更 BOOL SendMessagePacket(char *pcData, int iSize); };
SendRecvThread.cppを見てみましょう。
変更箇所に★と変更項目番号を記述しました。
【SendRecvThead.cpp】 #include “SendRecvThread.h” #include “MySyncObject.h” // CMySyncObjectを使うため #include “RingBuff.h” // ★CRingBuffを使うため //============================================== // function // コンストラクタ // parameter // ConnectionInfoRec *pConInfo [in]機能に必要な情報 // return // なし //============================================== CSendRecvThread::CSendRecvThread(ConnectionInfoRec *pConInfo) { m_pConInfo = pConInfo; m_fIamZombie = FALSE; m_pCRingBuffSend = new CRingBuff(SEND_BUFF_SIZE); // ★1.1.送信リングバッファの構築 m_pCRingBuffRecv = new CRingBuff(RECV_BUFF_SIZE); // ★2.1.受信リングバッファの構築 m_pCRingBuffForDistributeChat = new CRingBuff(RECV_BUFF_SIZE); // ★3.1.チャット配信用リングバッファの構築 } //============================================== // function // デストラクタ // parameter // なし // return // なし //============================================== CSendRecvThread::~CSendRecvThread() { SAFE_DELETE(m_pCRingBuffSend) // ★1.1.送信リングバッファの破棄 SAFE_DELETE(m_pCRingBuffRecv) // ★2.1.受信リングバッファの破棄 SAFE_DELETE(m_pCRingBuffForDistributeChat) // ★3.1.チャット配信用リングバッファの破棄 } //============================================== // function // 機能を記述した関数 // parameter // なし // return // 0:正常 -1:エラー発生 //============================================== UINT CSendRecvThread::DoWork() { BOOL fRet = TRUE; char szRecvBuffer[RCVBUFSIZE]; // 受信バッファ int iRecvSize; pollfd fds[1] = { 0 }; char *pcData = NULL; // 未送信データ int iSendSize = 0; // 未送信データサイズ fprintf(stderr, “DoWork()\n”); fds[0].fd = m_pConInfo->fdClient; while (!m_fStopFlag) { iSendSize = m_pCRingBuffSend->GetReadableSize(); // ★1.3.送信データがあるかリングバッファを調べる fds[0].events = POLLIN | POLLRDHUP; // 受信と相手側からの切断イベントを設定 // 未送信データがあるときだけ送信可能検査 if (iSendSize > 0) fds[0].events |= POLLOUT; // 書き込み可能を追加 poll(fds, 1, 10); if (fds[0].revents & POLLRDHUP) // 相手側からの切断 { fprintf(stderr, “Disconnected pollrdhup\n”); fRet = FALSE; break; } if (fds[0].revents & POLLERR) // エラー発生 { m_pConInfo->pCMySyncObject->Lock(); DispErrorMsg(“Err:DoWork”); m_pConInfo->pCMySyncObject->UnLock(); fRet = FALSE; break; } if (fds[0].revents & POLLIN) // 受信データイベント { // ★2.2.受信リングバッファに空きがあれば受信する // ここで受信しなければ、次回のrevents検査で受信が行われる iRecvSize = min(m_pCRingBuffRecv->GetWriteableSize(), RCVBUFSIZE); if (iRecvSize > 0) { if ((iRecvSize = recv(m_pConInfo->fdClient, szRecvBuffer, iRecvSize, 0)) <= 0) { m_pConInfo->pCMySyncObject->Lock(); if (iRecvSize == 0) DispErrorMsg(“Disconnected recv”); else DispErrorMsg(“Err:recv”); m_pConInfo->pCMySyncObject->UnLock(); fRet = FALSE; break; } else { // ★2.2.取得したデータすぐに受信リングバッファに書きこむ // 受信リングバッファに書き込むのはこのスレッドだけなので、すべて書き込めるはず m_pCRingBuffRecv->Write((LPBYTE)szRecvBuffer, iRecvSize); } } } // ★2.3.受信リングバッファに格納されているデータの解析を行う // 複数のパケットが格納されている可能性があるので、reventsの結果とは無関係に // 解析を行うようにする if (AnalyzeDataRecv() == -1) { m_pConInfo->pCMySyncObject->Lock(); DispErrorMsg(“Err:Packet format”); m_pConInfo->pCMySyncObject->UnLock(); fRet = FALSE; break; } if (fds[0].revents & POLLOUT) // 送信可能ならsend実施 { // ★1.3.送信したいデータの取得(PATH_MTUより小さくなるように取得する) iSendSize = GetSendData(&pcData); if (send(m_pConInfo->fdClient, pcData, iSendSize, 0) != iSendSize) { DispErrorMsg(“Err:send”); fRet = FALSE; break; } SAFE_FREE(pcData) } } m_pConInfo->pCMySyncObject->Lock(); m_fIamZombie = TRUE; m_pConInfo->pCMySyncObject->UnLock(); SAFE_FREE(pcData) return((fRet == TRUE) ? 0 : -1); } //============================================== // function // 送信データの設定 // 送信できない時、エラーとして切断をするのが良いか // は迷うところ // parameter // char *pcData [in]送信データ // int iSize [in]データ長 // return // TRUE/FALSE //============================================== BOOL CSendRecvThread::SetSendData(char *pcData, int iSize) { BOOL fRet = FALSE; // ★1.2.送信リングバッファに空きがないときは書き込まない fRet = m_pCRingBuffSend->Write((LPBYTE)pcData, iSize); return(fRet); } //============================================== // function // 送信データの取得 // parameter // char **ppcData [in/out]送信データ // return // データ長 //============================================== int CSendRecvThread::GetSendData(char **ppcData) { int iSize = 0; // ★1.3.送信データがあるかリングバッファのデータサイズを調べる if ((iSize = m_pCRingBuffSend->GetReadableSize()) > 0) { // 送信サイズをPATH_MTUより小さくしておく(1024+共通ヘッダなら大丈夫) iSize = min(iSize, SENDBUFSIZE); *ppcData = (char *)calloc(iSize, sizeof(char)); iSize = m_pCRingBuffSend->Read((LPBYTE)*ppcData, iSize); } return(iSize); } //============================================== // function // チャットメッセージをクライアントに分配するために格納する // parameter // char *pcData [in]チャットメッセージパケット // int iSize [in]パケット長 // return // TRUE/FALSE //============================================== BOOL CSendRecvThread::SetDataForDistributeChat(char *pcData, int iSize) { BOOL fRet = FALSE; // ★3.2.格納用リングバッファに空きがないときは書き込まない fRet = m_pCRingBuffForDistributeChat->Write((LPBYTE)pcData, iSize); return(fRet); } //============================================== // function // ★3.3.チャット分配用データの取得 // パケット単位で取り出す // 今回は1パケット分 // parameter // char **ppcData [in/out]チャット転送用データ // return // データ長 //============================================== int CSendRecvThread::GetDataForDistributeChat(char **ppcData) { int iSize = 0, iStoreSize, iPacketSize; HeaderRec Header; if ((iStoreSize = m_pCRingBuffForDistributeChat->GetReadableSize()) >= sizeof(HeaderRec)) { // ヘッダ部を仮り読み込みする m_pCRingBuffForDistributeChat->ReadWithoutUpdateHeadPoint((LPBYTE)&Header, sizeof(HeaderRec)); iPacketSize = ntohs(Header.wDataLen) + sizeof(HeaderRec); if (iStoreSize >= iPacketSize) { *ppcData = (char *)calloc(iPacketSize, sizeof(char)); iSize = m_pCRingBuffForDistributeChat->Read((LPBYTE)*ppcData, iPacketSize); } } return(iSize); } //============================================== // function // このスレッドはゾンビ状態か // 別スレッドから参照される // parameter // なし // return // 0:正常 -1:エラー発生 //============================================== BOOL CSendRecvThread::IsZombie() { BOOL fRet; m_pConInfo->pCMySyncObject->Lock(); fRet = m_fIamZombie; m_pConInfo->pCMySyncObject->UnLock(); return(fRet); } //============================================== // function // 受信データの解析 // 受信リングバッファに格納されているデータを調べる // parameter // なし // retun ★2.3. // 0:パケットが揃っていない // 1:パケットを受信したので処理を行った // -1:エラーが発生した //============================================== BOOL CSendRecvThread::AnalyzeDataRecv() { int iRet = 0; HeaderRec Header; WORD wCmd; // ★2.3.データサイズを調べる int iSize = m_pCRingBuffRecv->GetReadableSize(); if (iSize < sizeof(HeaderRec)) // ★2.3.ヘッダサイズに満たないときは何もしない goto L_END; // ★2.3.ヘッダ部を借り読み込みする m_pCRingBuffRecv->ReadWithoutUpdateHeadPoint((LPBYTE)&Header, sizeof(HeaderRec)); // ヘッダ部の解析 if (memcmp(Header.bMagicData, MAGIC_STRING, strlen(MAGIC_STRING)) != 0) { iRet = -1; // 識別子が違うのでエラー goto L_END; } wCmd = ntohs(Header.wCommand); fprintf(stderr, “CMD:%d\n”, wCmd); switch (wCmd) { case CMD_MSG_DATA: iRet = RecvMessagePacket(&Header); break; default: // 知らないコマンド iRet = -1; break; } L_END: return(iRet); } //============================================== // function // メッセージコマンドの受信 // parameter // HeaderRec *pHeader [in]仮読みしたヘッダ // return ★2.3. // 0:パケットが揃っていない // 1:パケットを受信したので処理を行った // -1:エラーが発生した //============================================== BOOL CSendRecvThread::RecvMessagePacket(HeaderRec *pHeader) { int iRet = 0; MsgDataRec *pMsgData; int iMsgSize, iSize; char *pszMsg = NULL; iMsgSize = ntohs(pHeader->wDataLen); // データが足りないときは何もしない iSize = m_pCRingBuffRecv->GetReadableSize(); if (iSize < iMsgSize + sizeof(HeaderRec)) goto L_END; // パケット全体を受信しているので読み込みを実施する pMsgData = (MsgDataRec *)calloc(iMsgSize + sizeof(HeaderRec), sizeof(BYTE)); m_pCRingBuffRecv->Read((LPBYTE)pMsgData, iMsgSize + sizeof(HeaderRec)); // NULLターミネート分を追加して確保 pszMsg = (char *)calloc(iMsgSize + 1, sizeof(char)); memcpy(pszMsg, pMsgData->bMsgData, iMsgSize); // チャット送信のためにメッセージデータをパケット化してセット SendMessagePacket(pszMsg, iMsgSize); m_pConInfo->pCMySyncObject->Lock(); fprintf(stderr, “Msg:recv %s\n”, pszMsg); m_pConInfo->pCMySyncObject->UnLock(); SAFE_FREE(pMsgData) SAFE_FREE(pszMsg) iRet = 1; L_END: return(iRet); } //============================================== // function // メッセージパケットの送信 // チャットメッセージをクライアントに転送するためのエリアに // 格納する // parameter // char *pcData [in]UTF-8データ(NULLターミネートなし) // int iSize [in]データサイズ // retun // なし //============================================== BOOL CSendRecvThread::SendMessagePacket(char *pcData, int iSize) { MsgDataRec *pMsgData = NULL; int iPacketSize = sizeof(HeaderRec) + iSize; // パケットサイズ BOOL fRet = FALSE; pMsgData = (MsgDataRec *)calloc(iPacketSize, sizeof(BYTE)); // パケット全体のエリアを確保 memcpy(pMsgData->header.bMagicData, MAGIC_STRING, strlen(MAGIC_STRING)); pMsgData->header.wCommand = htons(CMD_MSG_DATA); pMsgData->header.wDataLen = htons(iSize); memcpy(pMsgData->bMsgData, pcData, iSize); // ★3.2.チャット送信のためにメッセージパケットデータをセット fRet = SetDataForDistributeChat((char *)pMsgData, iPacketSize); SAFE_FREE(pMsgData) return(fRet); }
これで、リングバッファに対応したSimpleServerTransFile(Linux)が
完成しました。
Winodwes版への対応も同様にできます(SimpleServerTransFile(Winodws))。