Echoもどきシステムでクライアント、サーバともに送受信機能を実装しました。
送信機能については、送信バッファ(後編ではリングバッファに変更予定)を
使用しています。
今回は、受信バッファも使用することにして、他の送受信スレッド(CSendRecvThreadの
別のインスタンス)に、受信したデータを渡す仕組みを実装し、Chatもどきシステムを
作成してみましょう。
なお、クライアント側はEchoもどきシステムのクライアント(SimpleClientMEcho)を
そのまま使用します。
Chatもどきシステム(TCP IPv4 IPv6)の仕様は、以下の通りです。
Clientから送信した文字列(UTF-8)をServerは送信元以外の接続している
すべてのClientに送信します。
Clientはサーバから送り返された文字列を表示します。
SimpleServerMEchoのプロジェクトをコピーし、SimpleServerMChatに名前変更します。
CSendRecvThreadに受信バッファm_pRecvData, m_iRecvDataSizeと受信バッファを操作する
関数GetRecvData, SetRecvDataを追加します。
これらは実装済みのSetSendData, GetSendDataの受信データ版です。
受信したデータをSetRecvDataで受信データとしてセットします。
受信バッファに格納されたデータを読み出すのは、ここではメインスレッド(DoAccept)で
行うことにします。
【SendRecvThread.h Windows版とLinux版同じです】 #pragma once #include “ThreadJob.h” class CMySyncObject; // このクラスの使用することを宣言 typedef struct { SOCKET fdClient; // 接続済みソケット(acceptの結果) CMySyncObject *pCMySyncObject; // 同期オブジェクト } ConnectionInfoRec; class CSendRecvThread : public CThreadJob { public: CSendRecvThread(ConnectionInfoRec *pConInfo); // パラメータをコンストラクタで渡す ~CSendRecvThread(); // 基底クラスの関数をオーバーライドする // C++11で明示的にoverrideを書くことが出来るようになりました // 基底クラスの当該関数にvirtualが書いていないとエラーを出してくれます UINT DoWork() override; BOOL SetSendData(char *pcData, int iSize); // 送信データの設定 int GetRecvData(char **ppcData); // ★受信データの取得(他から参照するのでpublic) BOOL IsZombie(); // このスレッドはゾンビ状態か private: ConnectionInfoRec *m_pConInfo; // コンストラクタで渡されるパラメータを格納 // このスレッド実行中領域が確保されていること BOOL m_fIamZombie; // ゾンビ状態かどうかを保持 char *m_pSendData; // 送信データ格納用エリア int m_iSendDataSize; // 送信データ格納用エリアのデータサイズ char *m_pRecvData; // ★受信データ格納用エリア int m_iRecvDataSize; // ★受信データ格納用エリアのデータサイズ int GetSendData(char **ppcData); // 送信データの取得 BOOL SetRecvData(char *pcData, int iSize); // ★受信データの設定(他から参照しないのでprivate) };
Echoもどきでは受信データを同じスレッド(同じCSendRecvThreadのインスタンス)の
送信バッファに格納していましたが、今回は別のスレッド(別のCSendRecvThreadの
インスタンス)に渡す必要があるので、いったん受信バッファに格納するようにします。
ここが、Echoもどきと異なる点です。
そのための、受信バッファの初期化、操作するための関数を実装します。
【SendRecvThread.cpp Windows】 #include “SendRecvThread.h” #include “MySyncObject.h” // CMySyncObjectを使うため //============================================== // function // コンストラクタ // parameter // ConnectionInfoRec *pConInfo [in]機能に必要な情報 // return // なし //============================================== CSendRecvThread::CSendRecvThread(ConnectionInfoRec *pConInfo) { m_pConInfo = pConInfo; m_fIamZombie = FALSE; m_pSendData = NULL; // 送信したいデータを格納するエリア m_iSendDataSize = 0; // 送信したいデータのサイズ m_pRecvData = NULL; // ★受信データを格納するエリア m_iRecvDataSize = 0; // ★受信データのサイズ } //============================================== // function // デストラクタ // parameter // なし // return // なし //============================================== CSendRecvThread::~CSendRecvThread() { SAFE_FREE(m_pSendData) // 送信したいデータを格納するエリアを開放します m_iSendDataSize = 0; SAFE_FREE(m_pRecvData) // ★受信データを格納するエリアを開放します m_iRecvDataSize = 0; } #define RCVBUFSIZE (1024) // 一回に読む最大受信サイズ //============================================== // function // 機能を記述した関数(DoRecv, DoSendの内容を記述する) // parameter // なし // return // 0:正常 -1:エラー発生 //============================================== UINT CSendRecvThread::DoWork() { BOOL fRet = TRUE; char szRecvBuffer[RCVBUFSIZE + 1]; // 受信バッファ int iRecvSize; fd_set rfds, wfds; struct timeval tv; char *pcData = NULL; // 未送信データ int iSendSize = 0; // 未送信データサイズ fprintf(stderr, “DoWork()\n”); while (!m_fStopFlag) { // 未送信のデータがなければ送信したいデータがあるか調べる if (iSendSize == 0) iSendSize = GetSendData(&pcData); tv.tv_sec = 0; tv.tv_usec = 10 * 1000; // 10msec FD_ZERO(&rfds); FD_ZERO(&wfds); FD_SET(m_pConInfo->fdClient, &rfds); // 未送信データがあるときだけ送信可能検査用fd_setにセットする if (iSendSize > 0) FD_SET(m_pConInfo->fdClient, &wfds); select(FD_SETSIZE, &rfds, NULL, NULL, &tv);// タイムアウトまでSleepと同等 // 受信処理(DoRecv) if (FD_ISSET(m_pConInfo->fdClient, &rfds)) { memset(szRecvBuffer, 0, sizeof(szRecvBuffer)); if ((iRecvSize = recv(m_pConInfo->fdClient, szRecvBuffer, RCVBUFSIZE, 0)) <= 0) { m_pConInfo->pCMySyncObject->Lock(); if (iRecvSize == 0) DispErrorMsg(“Disconnected”); else DispErrorMsg(“Err:recv”); m_pConInfo->pCMySyncObject->UnLock(); fRet = FALSE; break; } else { #if 0 fprintf(stderr, “%s\n”, szRecvBuffer); #else // ★受信データ(UTF-8)を受信バッファに格納する SetRecvData(szRecvBuffer, iRecvSize); // UTF-8で受信したので、SJISに変換して表示する int iSize = 0; LPBYTE pbDest = NULL; ConvUtf8toSJis((LPBYTE)szRecvBuffer, NULL, &iSize); pbDest = (LPBYTE)calloc(iSize, sizeof(BYTE)); ConvUtf8toSJis((LPBYTE)szRecvBuffer, pbDest, &iSize); m_pConInfo->pCMySyncObject->Lock(); fprintf(stderr, “%s\n”, pbDest); m_pConInfo->pCMySyncObject->UnLock(); SAFE_FREE(pbDest) #endif } } // 送信処理(DoSend) if (FD_ISSET(m_pConInfo->fdClient, &wfds)) // 送信可能ならsend実施 { if (send(m_pConInfo->fdClient, pcData, iSendSize, 0) != iSendSize) { DispErrorMsg(“Err:send”); fRet = FALSE; break; } SAFE_FREE(pcData) // 未送信データなしにセット iSendSize = 0; } } m_pConInfo->pCMySyncObject->Lock(); m_fIamZombie = TRUE; m_pConInfo->pCMySyncObject->UnLock(); SAFE_FREE(pcData) // 未送信データなしにセット iSendSize = 0; return((fRet == TRUE) ? 0 : -1); } //============================================== // function // 送信データの設定 // parameter // char *pcData [in]送信データ // int iSize [in]データ長 // return // TRUE/FALSE //============================================== BOOL CSendRecvThread::SetSendData(char *pcData, int iSize) { BOOL fRet = FALSE; m_pConInfo->pCMySyncObject->Lock(); // 前回のデータを読みだしていないときはエラーにする if (m_iSendDataSize == 0) { m_pSendData = (char *)calloc(iSize, sizeof(char)); memcpy(m_pSendData, pcData, iSize); m_iSendDataSize = iSize; fRet = TRUE; } m_pConInfo->pCMySyncObject->UnLock(); return(fRet); } //============================================== // function // 送信データの取得 // parameter // char **ppcData [in/out]送信データ // return // データ長 //============================================== int CSendRecvThread::GetSendData(char **ppcData) { int iSize = 0; m_pConInfo->pCMySyncObject->Lock(); if (m_iSendDataSize > 0) { *ppcData = (char *)calloc(m_iSendDataSize, sizeof(char)); memcpy(*ppcData, m_pSendData, m_iSendDataSize); iSize = m_iSendDataSize; SAFE_FREE(m_pSendData) // 読み出しを行ったことをセット m_iSendDataSize = 0; } m_pConInfo->pCMySyncObject->UnLock(); return(iSize); } //============================================== // function // ★受信データの設定 // parameter // char *pcData [in]受信データ // int iSize [in]データ長 // return // TRUE/FALSE //============================================== BOOL CSendRecvThread::SetRecvData(char *pcData, int iSize) { BOOL fRet = FALSE; m_pConInfo->pCMySyncObject->Lock(); // 前回のデータを読みだしていないときはエラーにする if (m_iRecvDataSize == 0) { m_pRecvData = (char *)calloc(iSize, sizeof(char)); memcpy(m_pRecvData, pcData, iSize); m_iRecvDataSize = iSize; fRet = TRUE; } m_pConInfo->pCMySyncObject->UnLock(); return(fRet); } //============================================== // function // ★受信データの取得 // parameter // char **ppcData [in/out]受信データ // return // データ長 //============================================== int CSendRecvThread::GetRecvData(char **ppcData) { int iSize = 0; m_pConInfo->pCMySyncObject->Lock(); if (m_iRecvDataSize > 0) { *ppcData = (char *)calloc(m_iRecvDataSize, sizeof(char)); memcpy(*ppcData, m_pRecvData, m_iRecvDataSize); iSize = m_iRecvDataSize; SAFE_FREE(m_pRecvData) // 読み出しを行ったことをセット m_iRecvDataSize = 0; } m_pConInfo->pCMySyncObject->UnLock(); return(iSize); } //============================================== // function // このスレッドはゾンビ状態か // 別スレッドから参照される // parameter // なし // return // 0:正常 -1:エラー発生 //============================================== BOOL CSendRecvThread::IsZombie() { BOOL fRet; m_pConInfo->pCMySyncObject->Lock(); fRet = m_fIamZombie; m_pConInfo->pCMySyncObject->UnLock(); return(fRet); }
Linux版のSendRecvThread.cppの変更点もWindows版と全く同じです。
selectとpollの違いがあるので、念のためソースを記述しておきます。
【SendRecvThraed.cpp Linux】 #include “SendRecvThread.h” #include “MySyncObject.h” // CMySyncObjectを使うため //============================================== // function // コンストラクタ // parameter // ConnectionInfoRec *pConInfo [in]機能に必要な情報 // return // なし //============================================== CSendRecvThread::CSendRecvThread(ConnectionInfoRec *pConInfo) { m_pConInfo = pConInfo; m_fIamZombie = FALSE; m_pSendData = NULL; // 送信したいデータを格納するエリア m_iSendDataSize = 0; // 送信したいデータのサイズ m_pRecvData = NULL; // ★受信データを格納するエリア m_iRecvDataSize = 0; // ★受信データのサイズ } //============================================== // function // デストラクタ // parameter // なし // return // なし //============================================== CSendRecvThread::~CSendRecvThread() { SAFE_FREE(m_pSendData) // 送信したいデータを格納するエリアを開放します m_iSendDataSize = 0; SAFE_FREE(m_pRecvData) // ★受信データを格納するエリアを開放します m_iRecvDataSize = 0; } #define RCVBUFSIZE (1024) // 一回に読む最大受信サイズ //============================================== // function // 機能を記述した関数 // parameter // なし // return // 0:正常 -1:エラー発生 //============================================== UINT CSendRecvThread::DoWork() { BOOL fRet = TRUE; char szRecvBuffer[RCVBUFSIZE + 1]; // 受信バッファ int iRecvSize; pollfd fds[1] = { 0 }; char *pcData = NULL; // 未送信データ int iSendSize = 0; // 未送信データサイズ fprintf(stderr, “DoWork()\n”); fds[0].fd = m_pConInfo->fdClient; while (!m_fStopFlag) { // 未送信のデータがなければ送信したいデータがあるか調べる if (iSendSize == 0) iSendSize = GetSendData(&pcData); fds[0].events = POLLIN | POLLRDHUP; // 受信と相手側からの切断イベントを設定 if (iSendSize > 0) fds[0].events |= POLLOUT; // 書き込み可能を追加 poll(fds, 1, 10); if (fds[0].revents & POLLRDHUP) // 相手側からの切断 { fprintf(stderr, “Disconnected pollrdhup\n”); fRet = FALSE; break; } else if (fds[0].revents & POLLERR) // エラー発生 { m_pConInfo->pCMySyncObject->Lock(); DispErrorMsg(“Err:DoWork”); m_pConInfo->pCMySyncObject->UnLock(); fRet = FALSE; break; } else if (fds[0].revents & POLLIN) // 受信データイベント { memset(szRecvBuffer, 0, sizeof(szRecvBuffer)); if ((iRecvSize = recv(m_pConInfo->fdClient, szRecvBuffer, RCVBUFSIZE, 0)) <= 0) { m_pConInfo->pCMySyncObject->Lock(); if (iRecvSize == 0) DispErrorMsg(“Disconnected recv”); else DispErrorMsg(“Err:recv”); m_pConInfo->pCMySyncObject->UnLock(); fRet = FALSE; break; } else { // ★受信データを受信バッファに格納する SetRecvData(szRecvBuffer, iRecvSize); m_pConInfo->pCMySyncObject->Lock(); fprintf(stderr, “%s\n”, szRecvBuffer); m_pConInfo->pCMySyncObject->UnLock(); } } else if (fds[0].revents & POLLOUT) // 送信可能ならsend実施 { if (send(m_pConInfo->fdClient, pcData, iSendSize, 0) != iSendSize) { DispErrorMsg(“Err:send”); fRet = FALSE; break; } SAFE_FREE(pcData) // 未送信データなしにセット iSendSize = 0; } } m_pConInfo->pCMySyncObject->Lock(); m_fIamZombie = TRUE; m_pConInfo->pCMySyncObject->UnLock(); SAFE_FREE(pcData) // 未送信データなしにセット iSendSize = 0; return((fRet == TRUE) ? 0 : -1); } //============================================== // function // 送信データの設定 // parameter // char *pcData [in]送信データ // int iSize [in]データ長 // return // TRUE/FALSE //============================================== BOOL CSendRecvThread::SetSendData(char *pcData, int iSize) { BOOL fRet = FALSE; m_pConInfo->pCMySyncObject->Lock(); // 前回のデータを読みだしていないときはエラーにする if (m_iSendDataSize == 0) { m_pSendData = (char *)calloc(iSize, sizeof(char)); memcpy(m_pSendData, pcData, iSize); m_iSendDataSize = iSize; fRet = TRUE; } m_pConInfo->pCMySyncObject->UnLock(); return(fRet); } //============================================== // function // 送信データの取得 // parameter // char **ppcData [in/out]送信データ // return // データ長 //============================================== int CSendRecvThread::GetSendData(char **ppcData) { int iSize = 0; m_pConInfo->pCMySyncObject->Lock(); if (m_iSendDataSize > 0) { *ppcData = (char *)calloc(m_iSendDataSize, sizeof(char)); memcpy(*ppcData, m_pSendData, m_iSendDataSize); iSize = m_iSendDataSize; SAFE_FREE(m_pSendData) // 読み出しを行ったことをセット m_iSendDataSize = 0; } m_pConInfo->pCMySyncObject->UnLock(); return(iSize); } //============================================== // function // ★受信データの設定 // parameter // char *pcData [in]受信データ // int iSize [in]データ長 // return // TRUE/FALSE //============================================== BOOL CSendRecvThread::SetRecvData(char *pcData, int iSize) { BOOL fRet = FALSE; m_pConInfo->pCMySyncObject->Lock(); // 前回のデータを読みだしていないときはエラーにする if (m_iRecvDataSize == 0) { m_pRecvData = (char *)calloc(iSize, sizeof(char)); memcpy(m_pRecvData, pcData, iSize); m_iRecvDataSize = iSize; fRet = TRUE; } m_pConInfo->pCMySyncObject->UnLock(); return(fRet); } //============================================== // function // ★受信データの取得 // parameter // char **ppcData [in/out]受信データ // return // データ長 //============================================== int CSendRecvThread::GetRecvData(char **ppcData) { int iSize = 0; m_pConInfo->pCMySyncObject->Lock(); if (m_iRecvDataSize > 0) { *ppcData = (char *)calloc(m_iRecvDataSize, sizeof(char)); memcpy(*ppcData, m_pRecvData, m_iRecvDataSize); iSize = m_iRecvDataSize; SAFE_FREE(m_pRecvData) // 読み出しを行ったことをセット m_iRecvDataSize = 0; } m_pConInfo->pCMySyncObject->UnLock(); return(iSize); } //============================================== // function // このスレッドはゾンビ状態か // 別スレッドから参照される // parameter // なし // return // 0:正常 -1:エラー発生 //============================================== BOOL CSendRecvThread::IsZombie() { BOOL fRet; m_pConInfo->pCMySyncObject->Lock(); fRet = m_fIamZombie; m_pConInfo->pCMySyncObject->UnLock(); return(fRet); }
受信したデータを読みだして、接続している他のクライアントへ送信する
つまりCSendRecvThreadの他のインスタンスの送信バッファに格納すれば完成です。
すべてのCSendRecvThreadのインスタンスを知っている制御用のスレッドを作っても
良いのですが、今回はメインスレッドのDoAccept関数を使うことにします。
メインスレッドはすべてのCSendRecvThreadのインスタンスを知っています。
DoAcceptはそれなりに短い間隔で繰り返し処理を行っていますのでここで、受信データの
存在確認と受信データの配信(送信バッファへの格納)処理を行う関数を呼び出すことに
します。
この関数はGetRecvDataAndSetSendDataとして実装しています。
Windows版とLinux版で変更点がおなじですので、変更箇所のみ以下に示します。
【SimpleServer.cpp Windows, Linux 変更箇所】 // ★関数の宣言に追加します BOOL GetRecvDataAndSetSendData(); // 受信データがあれば、他のクライアントに送信する //============================================== // function // 接続要求の受け入れ(接続済みソケットの作成) // 送受信スレッドの管理 // ★実行中の送受信スレッドに受信データがあれば他の実行中の送受信スレッド // の送信バッファにデータを格納する関数を呼ぶ // parameter // なし // return // TRUE/FALSE //============================================== BOOL DoAccept() { BOOL fRet = FALSE; sockaddr_storage ClntAddr; // 接続クライアントのアドレス情報 socklen_t iClntLen; // 接続クライアントのアドレス情報のサイズ char szHostAddr[NI_MAXHOST]; int ii; fd_set rfds; struct timeval tv; int jj; // 最大接続数まで送受信スレッドを動かす SOCKET fdClient; // そのためにacceptの結果を記憶する fprintf(stderr, “DoAccept()\n”); while (1) { KillZombei(); // ゾンビ状態のスレッドは破棄する GetRecvDataAndSetSendData(); // ★受信データのチェックと他のクライアントへの送信 tv.tv_sec = 0; tv.tv_usec = 10 * 1000; // 10msec FD_ZERO(&rfds); for (ii = 0; ii < m_iSockCount; ++ii) { FD_SET(m_fdServer[ii], &rfds); // bind成功したm_fdServer[ii]が調査対象 } select(FD_SETSIZE, &rfds, NULL, NULL, &tv);// 接続要求は読み込みで検査する // キー入力で中断 if(CheckKey()) { fprintf(stderr, ” OK:Abort by key\n”); break; } for (ii = 0; ii < m_iSockCount; ++ii) { if (FD_ISSET(m_fdServer[ii], &rfds)) // 接続要求があったときacceptを実施 { iClntLen = sizeof(ClntAddr); // 接続先アドレス情報を格納する構造体のサイズ // 受容したときのソケットを記憶し、送受信スレッドを作るか切断するかに使用 if ((fdClient = accept(m_fdServer[ii], (sockaddr *)&ClntAddr, &iClntLen)) == INVALID_SOCKET) { // 失敗時次の接続要求を処理する DispErrorMsg(“Err:accept”); continue; } else { // 成功時m_fdClientには接続済みのソケットが格納されている // 相手の情報から、IPアドレスを調べる if (getnameinfo((struct sockaddr *) &ClntAddr, (socklen_t)iClntLen, szHostAddr, sizeof(szHostAddr), NULL, 0, NI_NUMERICHOST) == 0) { fprintf(stderr, “%s\n”, szHostAddr); } // 接続数に余裕があればCSendRecvThreadのインスタンスを作成し実行 // なければ、切断する fRet = FALSE; for (jj = 0; jj < MAX_CONNECTION_NUM; ++jj) { if (m_pCSendRecvThread[jj] == NULL) { m_pConInfo[jj] = (ConnectionInfoRec *)calloc(1, sizeof(ConnectionInfoRec)); m_pConInfo[jj]->pCMySyncObject = m_pCMySyncObject; m_pConInfo[jj]->fdClient = fdClient; m_pCSendRecvThread[jj] = new CSendRecvThread(m_pConInfo[jj]); m_pCSendRecvThread[jj]->Begin(); fRet = TRUE; break; } } if (fRet == FALSE) DestroySocket(fdClient); } } } } return(fRet); } //============================================== // function // ★ゾンビ状態でないCSendRecvThreadに受信データがあれば // ゾンビ状態でない受信したClient以外のすべての // CSendRecvThreadに送信データとしてセットする // parameter // なし // return // TRUE:セットすべきデータがあった/FALSE:なかった //============================================== BOOL GetRecvDataAndSetSendData() { BOOL fRet = FALSE; int ii, iRecvSize = 0, iRecvIndex = -1; char *pcRecvData = NULL; for (ii = 0; ii < MAX_CONNECTION_NUM; ++ii) { if (m_pCSendRecvThread[ii] != NULL) // 実行中か { // 実行状態は有効か if (m_pCSendRecvThread[ii]->IsZombie() == FALSE) { // 受信データが受信バッファにあるか iRecvSize = m_pCSendRecvThread[ii]->GetRecvData(&pcRecvData); if (iRecvSize > 0) { fRet = TRUE; iRecvIndex = ii; break; } } } } // 受信データがある送受信スレッド以外に送信(iRecvIndexでない有効なスレッド) if (fRet == TRUE) { for (ii = 0; ii < MAX_CONNECTION_NUM; ++ii) { if ((m_pCSendRecvThread[ii] != NULL) && (ii != iRecvIndex)) { if (m_pCSendRecvThread[ii]->IsZombie() == FALSE) { m_pCSendRecvThread[ii]->SetSendData(pcRecvData, iRecvSize); } } } } SAFE_FREE(pcRecvData) return(fRet); }
プロジェクトSimpleServerMChat for Windows
プロジェクトSimpleServerPollMChat for Linux
前編は、これで終了ですがいかがでしたでしょうか。
ソケットによるTCP/IPのプログラミングを、IPv4, IPv6という視点だけではなく
Windows, Linuxという視点からも考えてきました。
IPv4, IPv6の両方の環境で動作するプログラムは、必須のことだと思います。
Linux用のプログラミングは、Android, iOS, MacOSでも使えるので、結構用途が
広がると思います。
もちろん、実用的なプログラムということでマルチスレッド、同期オブジェクトにも
対応しました。
まとめのChatもどきシステムを見られて次のようなことが気になられた方もおられると
思います。
・同期オブジェクトは送信バッファと受信バッファで別にしべきではないか。
・送信、受信バッファはリングバッファにすべきではないか
・TCP/IPはバイトストリームなのでアプリケーションパケットのような仕組みが
必要なのではないか
・UDPについては全く触れられてないではないか
などなど
もっともなことばかりです。
これらについては、後編でやっていくつもりでいますので、よろしくお願いいたします。