SimpleServer送受信部分をマルチスレッド対応し、複数のクライアントからの
要求を処理できるサーバを作成してみましょう
前回までに学習したCThraedJob, CMySyncOblectを使用することでマルチスレッドの
実装を簡単にできます。
SimpleServer(UTF8対応)のプロジェクトをコピーし、SimpleServerMに名前変更します。
CThreadJob, CMySyncObjectを追加します
stdThread.h, stdThraed.cppを更新します
次のクラスを追加します。
このクラスにDoRecv()の機能を実装します。
クラス名:CSendRecvThread
ファイル名:SendRecvThread.cpp、SendRecvThread.h
基底クラス:CThreadJob
CSendRecvThreadにメインスレッドから渡す必要のある情報はacceptの結果得られる
ソケットです。これを含む構造体ConnectionInfoRecを定義します。
さらにDoWorkのWhileループから抜けて実質このスレッドが不要になっているかどうかを
保持する変数とその状態を取得する関数を追加しています。
【SendRecvThread.h】 #pragma once #include “ThreadJob.h” class CMySyncObject; // このクラスの使用することを宣言 typedef struct { SOCKET fdClient; // 接続済みソケット(acceptの結果) CMySyncObject *pCMySyncObject; // 同期オブジェクト } ConnectionInfoRec; class CSendRecvThread : public CThreadJob { public: CSendRecvThread(ConnectionInfoRec *pConInfo); // パラメータをコンストラクタで渡す ~CSendRecvThread(); // 基底クラスの関数をオーバーライドする // C++11で明示的にoverrideを書くことが出来るようになりました // 基底クラスの当該関数にvirtualが書いていないとエラーを出してくれます UINT DoWork() override; // DoRecvで実施している内容を記述 BOOL IsZombie(); // このスレッドはゾンビ状態(不要となっている)か private: ConnectionInfoRec *m_pConInfo;// コンストラクタで渡されるパラメータを格納 // このスレッド実行中領域が確保されていること BOOL m_fIamZombie; // ゾンビ状態かどうかを保持 };
DoWork()にDoRecv()で行っている処理を記述しています。
whileループから抜けて実質的にこのスレッドが不要になったときには
ゾンビ状態になったことを示す変数をTRUEにします。
【SendRecvThread.cpp】 #include “SendRecvThread.h” #include “MySyncObject.h” // CMySyncObjectを使うため //////////////////////////////////////////////// // function // コンストラクタ // parameter // ConnectionInfoRec *pConInfo [in]機能に必要な情報 // return // なし //////////////////////////////////////////////// CSendRecvThread::CSendRecvThread(ConnectionInfoRec *pConInfo) { m_pConInfo = pConInfo; m_fIamZombie = FALSE; // ★実質このスレッドが不要になっていることを示す変数 } //////////////////////////////////////////////// // function // デストラクタ // parameter // なし // return // なし //////////////////////////////////////////////// CSendRecvThread::~CSendRecvThread() { } #define RCVBUFSIZE (1024) // 一回に読む最大受信サイズ //////////////////////////////////////////////// // function // 機能を記述した関数(DoRecvに記述してあった内容を記述する) // parameter // なし // return // 0:正常 -1:エラー発生 //////////////////////////////////////////////// DWORD CSendRecvThread::DoWork() { BOOL fRet = TRUE; char szRecvBuffer[RCVBUFSIZE + 1]; // 受信バッファ int iSize; fd_set rfds; struct timeval tv; fprintf(stderr, “DoWork()\n”); while (!m_fStopFlag) // ★終了条件を追加 { tv.tv_sec = 0; tv.tv_usec = 10 * 1000; // 10msec FD_ZERO(&rfds); FD_SET(m_pConInfo->fdClient, &rfds); select(FD_SETSIZE, &rfds, NULL, NULL, &tv); // タイムアウトまでSleepと同等 // ★キー入力チェックは不要なので削除 if (FD_ISSET(m_pConInfo->fdClient, &rfds)) { memset(szRecvBuffer, 0, sizeof(szRecvBuffer)); if ((iSize = recv(m_pConInfo->fdClient, szRecvBuffer, RCVBUFSIZE, 0)) <= 0) { m_pConInfo->pCMySyncObject->Lock(); // ★共通リソースにアクセスには同期処理 if (iSize == 0) fprintf(stderr, “Disconnected %d\n”, WSAGetLastError()); else fprintf(stderr, “Err:recv %d\n”, WSAGetLastError()); m_pConInfo->pCMySyncObject->UnLock(); // ★共通リソースにアクセスには同期処理 fRet = FALSE; break; } else { #if 0 fprintf(stderr, “%s\n”, szRecvBuffer); #else // UTF-8で受信したので、SJISに変換して表示する int iSize = 0; LPBYTE pbDest = NULL; ConvUtf8toSJis((LPBYTE)szRecvBuffer, NULL, &iSize); pbDest = (LPBYTE)calloc(iSize, sizeof(BYTE)); ConvUtf8toSJis((LPBYTE)szRecvBuffer, pbDest, &iSize); m_pConInfo->pCMySyncObject->Lock(); // ★共通リソースにアクセスには同期処理 fprintf(stderr, “%s\n”, pbDest); m_pConInfo->pCMySyncObject->UnLock(); // ★共通リソースにアクセスには同期処理 SAFE_FREE(pbDest) #endif } } } m_pConInfo->pCMySyncObject->Lock(); // ★共通リソースにアクセスには同期処理 m_fIamZombie = TRUE; // ★ゾンビフラグを立てる m_pConInfo->pCMySyncObject->UnLock(); // ★共通リソースにアクセスには同期処理 return((fRet == TRUE) ? 0 : -1); // ★正常時とエラー発生時で返値を変更 } //////////////////////////////////////////////// // function // このスレッドはゾンビ状態か // 別スレッドから参照される(★新規追加) // parameter // なし // return // 0:正常 -1:エラー発生 //////////////////////////////////////////////// BOOL CSendRecvThread::IsZombie() { BOOL fRet; m_pConInfo->pCMySyncObject->Lock(); // 共通リソースにアクセスには同期処理 fRet = m_fIamZombie; m_pConInfo->pCMySyncObject->UnLock(); // 共通リソースにアクセスには同期処理 return(fRet); }
メインスレッド(SimpleServer.cpp)では以下の変更を行います。
1.最大接続数を決める(同時に何台のクライアントに対応するか
2.最大接続数分のCSendRecvThreadとConnentionInfoRecを準備する
3.DoRecvは送受信スレッドに移動したので削除
もちろん関連の変数(m_fdClientなど)も削除
4.acceptを実施したら、最大接続数の範囲であれば送受信スレッド
(CSendRecvThread)を開始する
最大接続数に達していたら、切断する
5.ゾンビ状態のスレッドは破棄する
次の関数には変更がありません。
BOOL InitSocketLib()
BOOL UninitSocketLib()
BOOL CreateAndBindSocket(WORD wPort)
BOOL DoListen()
BOOL DestroySocket(SOCKET &fd)
不要になったので削除した関数は次のものです。
BOOL DoRecv()
変更点には★をつけています。
【SimpleServer.cpp】 #include “stdThread.h” #include <conio.h> // キー入力検査のため #include “SendRecvThread.h” // ★送受信スレッドを使うため #include “MySyncObject.h” // ★同期オブジェクトを使うため #ifdef _MSC_VER // Windowsのとき #define DISABLE_C4996 __pragma(warning(push)) __pragma(warning(disable:4996)) #define ENABLE_C4996 __pragma(warning(pop)) #else // Linuxのとき #define DISABLE_C4996 #define ENABLE_C4996 #endif #define MAXPENDING (5) // 接続バックログ数(保留中の接続のキューの最大長) #define RCVBUFSIZE (1024) // 一回に読む最大受信サイズ #define MAX_SOCKET_NUM (20) // 接続待用ソケットの最大数 #define MAX_CONNECTION_NUM (10) // ★最大受け入れ接続数 // 関数の宣言 BOOL InitSocketLib(); // WinSockDLLの初期化 BOOL UninitSocketLib(); // WinSockDLLの終了 BOOL CreateAndBindSocket(WORD wPort); // 接続待ち用ソケットの作成と名前付け BOOL DestroySocket(SOCKET &fd); // 切断とソケットの破棄 BOOL DoListen(); // ソケットを接続待ちにする BOOL DoAccept(); // 接続の受容(接続済みソケットを作成する) void Stop(); // すべてのソケットを破棄する BOOL CheckKey(); // キー入力の検査 BOOL KillZombei(); // ★ゾンビ状態のスレッドを破棄する // 変数の宣言 SOCKET m_fdServer[MAX_SOCKET_NUM]; // 待機用ソケット int m_iSockCount; // 待機中ソケットの数 CSendRecvThread *m_pCSendRecvThread[MAX_CONNECTION_NUM] = { NULL }; // ★ ConnectionInfoRec *m_pConInfo[MAX_CONNECTION_NUM] = { NULL }; // ★ CMySyncObject *m_pCMySyncObject = NULL; // ★ int main(int argc, char *argv[]) { int iRet = -1; int ii; m_pCMySyncObject = new CMySyncObject(); // ★同期オブジェクトの作成 m_pCMySyncObject->Initialize(); // 起動パラメータチェック if (argc != 2) { fprintf(stderr, “Usage: %s <Server Port>\n”, argv[0]); goto L_END; } // ソケットライブラリの初期化 if (InitSocketLib() == FALSE) goto L_END; // 変数の初期化(★DoRecv()で使っていた変数については不要なので削除) for (ii = 0; ii < MAX_SOCKET_NUM; ++ii) m_fdServer[ii] = INVALID_SOCKET; // 接続待ちソケットの作成と名前付け // bind済みのソケットがm_fdServer[]に格納される if (CreateAndBindSocket((WORD)atol(argv[1])) == FALSE) goto L_END; // 接続待ち状態にする // m_fdServer[]に格納されているソケットが接続待ちになる if (DoListen() == FALSE) goto L_END; // 接続要求に対して接続済みソケットを作成する(★DoRecv()の呼び出しはもちろん削除) // 送受信スレッドの作成をゾンビスレッドの破棄を行う if (DoAccept() == FALSE) goto L_END; iRet = 0; L_END: // 切断とすべてのソケットの破棄 Stop(); // ソケットライブラリの開放 UninitSocketLib(); m_pCMySyncObject->Uninitialize(); // ★同期オブジェクトの破棄 SAFE_DELETE(m_pCMySyncObject) return(iRet); } // 必要とするWINSOCK.DLL(WSOCK32.DLL のバージョン) #define VERSION_RECESTED MAKEWORD(2, 0) //////////////////////////////////////////////// // function // ソケットライブラリのロード // parameter // なし // retun // なし //////////////////////////////////////////////// BOOL InitSocketLib() { WORD wVersionRequested; // バージョン情報格納エリア WSADATA wsaData; // WinSock 情報格納エリア BOOL fRet; wVersionRequested = VERSION_RECESTED; // 初期化処理 fRet = (WSAStartup(wVersionRequested, &wsaData) == 0) ? TRUE : FALSE; return(fRet); } //////////////////////////////////////////////// // function // ソケットライブラリの開放 // parameter // なし // retun // なし //////////////////////////////////////////////// BOOL UninitSocketLib() { BOOL fRet; fRet = (WSACleanup() == 0) ? TRUE : FALSE; // 解放処理 return(fRet); } //////////////////////////////////////////////// // function // 待機用ソケットの作成とbindの実施 // parameter // WORD wPort [in]ポート番号 // return // TRUE/FALSE //////////////////////////////////////////////// BOOL CreateAndBindSocket(WORD wPort) { BOOL fRet = TRUE; struct addrinfo hints, *pres = NULL, *pTemp = NULL; char szPort[NI_MAXSERV]; int on; fprintf(stderr, “CreateAndBindSocket()\n”); m_iSockCount = 0; // ストリーム型で待機可能なアドレス情報の条件 memset(&hints, 0, sizeof(hints)); hints.ai_socktype = SOCK_STREAM; hints.ai_flags = AI_PASSIVE; DISABLE_C4996 sprintf(szPort, “%d”, wPort); ENABLE_C4996 if (getaddrinfo(NULL, szPort, &hints, &pres) == 0) { // 順番にpresの中に格納されている情報を使用する pTemp = pres; while ((pTemp != NULL) && (m_iSockCount < MAX_SOCKET_NUM)) { // Socketの作成, bindの実行 // ソケット作成 m_fdServer[m_iSockCount] = socket(pTemp->ai_family, pTemp->ai_socktype, pTemp->ai_protocol); if (m_fdServer[m_iSockCount] == INVALID_SOCKET) goto L_NEXT; fprintf(stderr, “%d %d %d %d\n”, (int)m_fdServer[m_iSockCount], pTemp->ai_family, AF_INET, AF_INET6); #ifdef IPV6_V6ONLY // IPv6ソケットでIPv4射影アドレスを使用しないように設定 if (pTemp->ai_family == AF_INET6) { on = 1; setsockopt(m_fdServer[m_iSockCount], IPPROTO_IPV6, IPV6_V6ONLY, (char *)&on, sizeof(on)); } #endif // クローズ直後に際bindできない状態の解消 on = 1; setsockopt(m_fdServer[m_iSockCount], SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on)); // bindの実行 if (bind(m_fdServer[m_iSockCount], pTemp->ai_addr, (int)pTemp->ai_addrlen) == SOCKET_ERROR) { // bindに失敗したら、そのソケットを破棄して次のソケットの処理を行う DispErrorMsg(” Err:bind”); DestroySocket(m_fdServer[m_iSockCount]); goto L_NEXT; } ++m_iSockCount; L_NEXT: pTemp = pTemp->ai_next; // 次の情報を処理する } freeaddrinfo(pres); } if (m_iSockCount == 0) // ひとつもbindできなかったときは失敗を返す fRet = FALSE; return(fRet); } //////////////////////////////////////////////// // function // 待機用ソケットを接続待ち状態にする // parameter // なし // return // TRUE/FALSE //////////////////////////////////////////////// BOOL DoListen() { BOOL fRet = FALSE; int ii; fprintf(stderr, “DoListen()\n”); for (ii = 0; ii < m_iSockCount; ++ii) { if (listen(m_fdServer[ii], MAXPENDING) == SOCKET_ERROR) continue; fRet = TRUE; } return(fRet); } //////////////////////////////////////////////// // function // 接続要求の受け入れ(接続済みソケットの作成) // 送受信スレッドの管理 // parameter // なし // return // TRUE/FALSE //////////////////////////////////////////////// BOOL DoAccept() { BOOL fRet = FALSE; sockaddr_storage ClntAddr; // 接続クライアントのアドレス情報 socklen_t iClntLen; // 接続クライアントのアドレス情報のサイズ char szHostAddr[NI_MAXHOST]; int ii; fd_set rfds; struct timeval tv; int jj; // ★最大接続数まで送受信スレッドを動かす SOCKET fdClient; // ★そのためにacceptの結果を記憶する fprintf(stderr, “DoAccept()\n”); while (1) { KillZombei(); // ★ゾンビ状態のスレッドは破棄する tv.tv_sec = 0; tv.tv_usec = 10 * 1000; // 10msec FD_ZERO(&rfds); for (ii = 0; ii < m_iSockCount; ++ii) { FD_SET(m_fdServer[ii], &rfds); // bind成功したm_fdServer[ii]が調査対象 } select(FD_SETSIZE, &rfds, NULL, NULL, &tv); // 接続要求は読み込みで検査する // キー入力で中断 if(CheckKey()) { fprintf(stderr, ” OK:Abort by key\n”); break; } for (ii = 0; ii < m_iSockCount; ++ii) { if (FD_ISSET(m_fdServer[ii], &rfds)) // 接続要求があったときacceptを実施 { iClntLen = sizeof(ClntAddr); // 接続先アドレス情報を格納する構造体のサイズ // ★受容したときのソケットを記憶し、送受信スレッドを作るか切断するかに使用 if ((fdClient = accept(m_fdServer[ii], (sockaddr *)&ClntAddr, &iClntLen)) == INVALID_SOCKET) { // 失敗時次の接続要求を処理する DispErrorMsg(“Err:accept”); continue; } else { // 成功時fdClientには接続済みのソケットが格納されている // 相手の情報から、IPアドレスを調べる if (getnameinfo((struct sockaddr *) &ClntAddr, (socklen_t)iClntLen, szHostAddr, sizeof(szHostAddr), NULL, 0, NI_NUMERICHOST) == 0) { fprintf(stderr, “%s\n”, szHostAddr); } // ★接続数に余裕があればCSendRecvThreadのインスタンスを作成し実行 // なければ、切断する fRet = FALSE; for (jj = 0; jj < MAX_CONNECTION_NUM; ++jj) { if (m_pCSendRecvThread[jj] == NULL) { m_pConInfo[jj] = (ConnectionInfoRec *)calloc(1, sizeof(ConnectionInfoRec)); m_pConInfo[jj]->pCMySyncObject = m_pCMySyncObject; m_pConInfo[jj]->fdClient = fdClient; m_pCSendRecvThread[jj] = new CSendRecvThread(m_pConInfo[jj]); m_pCSendRecvThread[jj]->Begin(); fRet = TRUE; break; } } if (fRet == FALSE) DestroySocket(fdClient); } } } } return(fRet); } //////////////////////////////////////////////// // function // TCPソケットの破棄 // parameter // SOCKET &fd[in/out]破棄するソケット // return // TRUE/FALSE //////////////////////////////////////////////// BOOL DestroySocket(SOCKET &fd) { if (fd != INVALID_SOCKET) { fprintf(stderr, “DestroySocket()\n”); shutdown(fd, SD_BOTH); // 受信も送信も停止 closesocket(fd); fd = INVALID_SOCKET; } return(TRUE); } //////////////////////////////////////////////// // function // 切断、ソケットの破棄、受信スレッドの破棄 // parameter // なし // return // なし //////////////////////////////////////////////// void Stop() { int ii; fprintf(stderr, “Stop()\n”); // ★受信スレッドの破棄, 接続済みソケットの破棄 for (ii = 0; ii < MAX_CONNECTION_NUM; ++ii) { if (m_pCSendRecvThread[ii] != NULL) { m_pCSendRecvThread[ii]->End(); m_pCSendRecvThread[ii]->WaitForEnd(); SAFE_DELETE(m_pCSendRecvThread[ii]); DestroySocket(m_pConInfo[ii]->fdClient); SAFE_FREE(m_pConInfo[ii]) } } // 接続待ちソケットの破棄 for (ii = 0; ii < m_iSockCount; ++ii) DestroySocket(m_fdServer[ii]); } //////////////////////////////////////////////// // function // キー入力チェック // parameter // なし // return // TRUE:入力あり //////////////////////////////////////////////// BOOL CheckKey() { BOOL fRet = FALSE; if (_kbhit()) { _getch(); fRet = TRUE; } return(fRet); } //////////////////////////////////////////////// // function // ★切断済みのCSendRecvThreadを破棄する // parameter // なし // return // TRUE:破棄した/FALSE:破棄すべきものがなかった //////////////////////////////////////////////// BOOL KillZombei() { BOOL fRet = FALSE; int ii; for (ii = 0; ii < MAX_CONNECTION_NUM; ++ii) { if (m_pCSendRecvThread[ii] != NULL) { if (m_pCSendRecvThread[ii]->IsZombie() == TRUE) { m_pCSendRecvThread[ii]->End(); m_pCSendRecvThread[ii]->WaitForEnd(); SAFE_DELETE(m_pCSendRecvThread[ii]); DestroySocket(m_pConInfo[ii]->fdClient); SAFE_FREE(m_pConInfo[ii]) fRet = TRUE; } } } return(fRet); }
これで完成です作成済みのSimpleClient(Win, Linux)を使って、同時複数接続が
できていることを確認してみてください。
次は、Linux版のSimpleServerですが、手順はWindows版と同じです。
ということで、次回作成しようと思いましたが
その前に、SimpleClient Windows版を作ることにしましょう。