セマフォ
プロセス間の同期制御
プロセス間での排他制御の方法としてミューテックスを説明しましたが、これに近いものにセマフォがあります。
ミューテックスは、複数のスレッドで共有するリソースへの同時アクセスを防ぐためのものです。
セマフォは、複数のスレッドから共有リソースへの同時アクセス数を制限します。
ここでは「スレッド」と説明していますが、ミューテックスと同様にプロセス間でも使用できます。
また「共有リソース」は複数のスレッドやプロセスからアクセス可能なデータ、という意味です。
例えば複数のプリンタ(リソース)に接続する複数のパソコンがある状態を考えます。
どれかひとつでも「空き」状態のプリンタがあれば印刷が可能です。
空きが無い場合はひとつ以上のプリンタが空くまで印刷を待機する必要があります。
この「使用可能な(残りの)リソースの数」を管理するのがセマフォの機能です。
管理するリソースの個数をセマフォカウンタ(セマフォ値)といいます。
スレッドがリソースの使用を要求すると、セマフォはカウンタをひとつ減らします。
リソースの使用を終えたことを通知すると、カウンタをひとつ増やします。
リソース使用の要求はWaitForSingleObject関数などの待機関数を使用し、カウンタが0の状態でリソース使用を要求すると、他でリソースが解放されるまで待機状態になります。
セマフォの機能は数を管理するだけで、「複数のリソースのうち、どのリソースが空き状態か」などは管理しません。
この処理はプログラマが実装する必要があります。
ミューテックスには所有権の概念がありますが、セマフォにはありません。
ミューテックスは所有権を持つスレッドがその所有権を解放する必要がありますが、セマフォはそういった制限はありません。
カウンタを増やすだけのスレッドや、カウンタを減らすだけのスレッドを作ることもできます。
なお、リソースの最大数が「1」のセマフォをバイナリセマフォといいます。
(「0」か「1」のふたつの状態のみを持つ=binary(二値、二進数)、という意味)
これはミューテックスと同じ使い方が可能です。
リソースの最大数が「2」以上のセマフォをカウンティングセマフォといいます。
排他制御と同期制御
スレッドやプロセスは、何もしなければそれぞれ無関係に動作します。
それでは困る場合に排他制御や同期制御を行い、それぞれの実行単位の処理のタイミングを調整します。
排他制御は共有リソースを特定のスレッドに占有させることを言います。
その目的は複数のスレッドからのアクセスによるデータの不整合を防止することです。
同期制御はスレッドの処理タイミングを制御することを言います。
排他制御も他のスレッドを待たせる制御を行うので広い意味では同期制御の一種と言えます。
例えばスレッドAの処理を行うためにスレッドBの処理が必要な場合に「Bが必要なデータを生成し終わったことを確認してからAを開始」という風にタイミングを制御することです。
ただし「スレッドBの起動→スレッドBの終了→スレッドAの起動」という処理ではマルチスレッドの意味がないので、処理タイミングが重なっても問題ない箇所は非同期的に処理を行います。
セマフォ関数
セマフォカウンタに関係する箇所以外はミューテックス関数とほぼ同じです。
セマフォカウンタを減らす(リソース使用を要求する)場合はWaitForSingleObject関数などの待機関数を使用します。
CreateSemaphore関数
セマフォオブジェクトはCreateSemaphore
関数で作成します。
- HANDLE CreateSemaphoreW(
LPSECURITY_ATTRIBUTES lpSemaphoreAttributes,
LONG lInitialCount,
LONG lMaximumCount,
LPCWSTR lpName
); - セマフォオブジェクトを作成する。
または既存のセマフォオブジェクトを開く。
- lpSemaphoreAttributes
-
セマフォオブジェクトのセキュリティ記述子です。
NULL
を指定するとハンドルの子プロセスへの継承は禁止されます。
通常はNULLで構いません。 - lInitialCount
-
セマフォカウントの初期値を指定します。
0以上、かつ次の引数lMaximumCount
以下の値を指定します。 - lMaximumCount
-
セマフォカウントの最大値を指定します。
値は1以上である必要があります。 - lpName
-
セマフォオブジェクトの名前を指定します。
この名前のセマフォオブジェクトがシステムに存在しない場合は作成され、システムに登録されます。
すでに存在する場合はSEMAPHORE_ALL_ACCESS
というアクセス権を要求します。
この場合、引数lInitialCount
とlMaximumCount
はすでに設定済みであるため無視されます。
引数lpSemaphoreAttributes
のbInheritHandle
メンバにより、ハンドルの子プロセスへの継承を設定することは可能ですが、セキュリティ記述子は無視されます。名前は大文字小文字を区別します。
長さは最大でMAX_PATH
まで指定可能です。
既存のミューテックス、イベント、待機可能タイマー、ジョブ、ファイルマッピングオブジェクトの名前と重複すると関数は失敗します。
NULL
を指定すると名前なしセマフォを作成します。 - 戻り値
-
関数が成功した場合はセマフォオブジェクトのハンドルを返します。
指定した名前のセマフォオブジェクトが既に存在する場合も関数は成功しますが、GetLastError
関数はERROR_ALREADY_EXISTS
を返します。
関数が失敗した場合はNULL
を返します。
セマフォオブジェクトは、不要になったらCloseHandle関数でハンドルを閉じる必要があります。
セマフォオブジェクトは複数のスレッド/プロセスから参照することができ、CloseHandle関数によってひとつも参照するスレッド/プロセスがなくなった場合にシステムから削除されます。
(明示的にCloseHandle関数を呼ばなくてもアプリケーションが終了した時にハンドルは解放されます)
OpenSemaphore関数
システム上に存在するセマフォオブジェクトのハンドルを取得するにはOpenSemaphore
関数を使用します。
- HANDLE OpenSemaphoreW(
DWORD dwDesiredAccess,
BOOL bInheritHandle,
LPCWSTR lpName
); - 既存のセマフォオブジェクトlpNameのハンドルを取得する。
失敗した場合はNULLを返す。
- dwDesiredAccess
-
セマフォオブジェクトに要求するアクセス権の指定です。
セマフォオブジェクトは同期のためのSYNCHRONIZE
フラグと、セマフォカウンタを変更するためのSEMAPHORE_MODIFY_STATE
フラグの両方を指定します。
セキュリティを変更する場合はSEMAPHORE_ALL_ACCESS
フラグを指定します。
このフラグはSYNCHRONIZE
フラグやSEMAPHORE_MODIFY_STATE
フラグを含む全ての権限を要求します。//通常の範疇での使用の場合 HANDLE hSemaphore = OpenSemaphore( SYNCHRONIZE | SEMAPHORE_MODIFY_STATE, FALSE, semaphoreName);
- bInheritHandle
-
取得したハンドルを子プロセスに継承できるか否かを
TRUE
/FALSE
で指定します。 - lpName
-
開くセマフォ名を指定します。
名前なしセマフォは取得できません。 - 戻り値
-
成功した場合はセマフォオブジェクトのハンドルです。
失敗した場合はNULL
を返し、GetLastError
関数はERROR_FILE_NOT_FOUND
を返します。
ReleaseSemaphore関数
セマフォオブジェクトのカウンタを増やすにはReleaseSemaphore
関数を使用します。
- BOOL ReleaseSemaphore(
HANDLE hSemaphore,
LONG lReleaseCount,
LPLONG lpPreviousCount
); - セマフォオブジェクトハンドルhSemaphoreのセマフォカウンタをlReleaseCountの値だけ増やし、関数実行前のカウンタをlpPreviousCountに格納する。
成功した場合は0以外を、失敗した場合は0を返す。
- hSemaphore
-
セマフォオジェクトのハンドルを指定します。
- lReleaseCount
-
セマフォオブジェクトのカウンタに加える値を指定します。
0以上の値である必要があります。
セマフォカウンタの最大値を超える場合は関数は失敗します。 - lpPreviousCount
-
この関数の実行の前に設定されていたセマフォカウンタの値がlong型変数のポインタに格納されます。
必要ない場合はNULL
を指定できます。 - 戻り値
-
関数が成功した場合は0以外を、失敗した場合は0を返します。
サンプルコード
セマフォを利用して、スレッドの作成数に上限を設けるサンプルコードです。
#include <windows.h>
#include <strsafe.h>
#include <process.h>
//ウィンドウの生成等は省略
#define WM_THREADSTART (WM_APP + 1)
#define WM_THREADEND (WM_APP + 2)
#define BUFFERSIZE 64
#define IDC_BUTTON1 100
//スレッドパラメーター
typedef struct {
HWND hWnd; //メインスレッドのウィンドウハンドル
HANDLE hSemaphore; //セマフォオブジェクトハンドル
} ThreadParam;
//スレッド開始関数
DWORD WINAPI ThreadProc(LPVOID lpParameter)
{
ThreadParam *tp = (ThreadParam*)lpParameter;
//メインスレッドにスレッド開始を通知
PostMessage(tp->hWnd, WM_THREADSTART, 0, 0);
//何か処理...
Sleep(1000);
//セマフォカウンタをひとつ増やす
ReleaseSemaphore(tp->hSemaphore, 1, NULL);
//メインスレッドにスレッド終了を通知
PostMessage(tp->hWnd, WM_THREADEND, 0, 0);
return 0;
}
//ウィンドウプロシージャ
LRESULT CALLBACK WndProc(HWND hWnd, UINT message, WPARAM wParam, LPARAM lParam)
{
static HWND hStatic;
static ThreadParam tp; //スレッドパラメーター
static int threadCount; //現在のスレッド数
HANDLE hSemaphore; //セマフォオブジェクトハンドル
HANDLE hThread; //スレッドハンドル
WCHAR buf[BUFFERSIZE];
DWORD dwWait;
switch (message)
{
case WM_CREATE: //ウィンドウの作成
//初期値3、最大値3のセマフォを作成
hSemaphore = CreateSemaphore(NULL, 3, 3, NULL);
if (!hSemaphore) {
MessageBox(NULL, L"セマフォオブジェクトの生成に失敗しました", L"エラー", MB_OK);
return -1;
}
tp.hWnd = hWnd;
tp.hSemaphore = hSemaphore;
CreateWindow(L"BUTTON", L"新スレッド開始",
WS_CHILD | WS_VISIBLE,
10, 10,
120, 25,
hWnd, (HMENU)IDC_BUTTON1, hInst, NULL);
hStatic = CreateWindow(L"STATIC", L"スレッド数: 0",
WS_CHILD | WS_VISIBLE,
10, 40,
120, 25,
hWnd, (HMENU)-1, hInst, NULL);
break;
case WM_COMMAND: //コントロールの操作
switch (LOWORD(wParam))
{
case IDC_BUTTON1:
//セマフォカウンタをひとつ減らす
//(待機時間0)
dwWait = WaitForSingleObject(tp.hSemaphore, 0);
if (dwWait == WAIT_TIMEOUT || dwWait == WAIT_FAILED) {
//セマフォカウンタが0なら何もしない
//(解放を待たない)
break;
}
//スレッドの作成
hThread = (HANDLE)_beginthreadex(
NULL, 0,
(LPTHREAD_START_ROUTINE)ThreadProc,
&tp, 0, NULL);
if (hThread) {
CloseHandle(hThread);
}
break;
}
break;
case WM_THREADSTART: //スレッドが正常に開始された
StringCchPrintf(buf, BUFFERSIZE, L"スレッド数: %d", ++threadCount);
SetWindowText(hStatic, buf);
break;
case WM_THREADEND: //スレッドの終了
StringCchPrintf(buf, BUFFERSIZE, L"スレッド数: %d", --threadCount);
SetWindowText(hStatic, buf);
break;
case WM_DESTROY: //ウィンドウの破棄
if(tp.hSemaphore)
CloseHandle(tp.hSemaphore);
PostQuitMessage(0);
break;
default:
return DefWindowProc(hWnd, message, wParam, lParam);
}
return 0;
}
ボタンをクリックするとスレッドが作成され、現在作成されているスレッドの数が表示されます。
スレッド数はセマフォ作成時に指定した上限を超えることはありません。
ボタンクリック時に待機関数を使用してリソース使用を要求し(カウンタがひとつ減る)、取得できなければ(カウンタが既に0)、スレッドを作成せず処理を終了します。
カウンタを増やす処理は異なるスレッドで行われていることに注目してください。
ミューテックスの場合はミューテックスを所有したスレッドで解放する必要がありますが、セマフォの場合はそういった制限はありません。
必要ならば待機関数ですでに起動しているスレッドの終了を待つことも可能ですが、メインスレッドで待機するとウィンドウがフリーズするので注意してください。
サンプルコード2(生産者/消費者問題)
もうひとつ「生産者/消費者問題」のサンプルコードを掲載しておきます。
これは以下のようなルールでデータをやり取りするものです。
- データを生成する「生産者(Producer)」とデータを使用する「消費者(Consumer)」が存在する
- データはキューに格納され、キューの最大サイズには制限がある(N個のキュー)
- キューが空の場合、消費者はデータが生成されるまで待機する
- キューが満杯の場合、生産者はデータが使用されるまで待機する
「キュー」とはデータを格納する配列のようなものと考えてください。
メッセージキューのキューと同じで、データは先に入れたものから順番に取り出されます。
(先入れ先出し)
より詳細な説明や動作の概念はウィキペディアのセマフォ - Wikipedia#例: 生産者/消費者問題のページが分かりやすいです。
今回は「データ」にはランダムに生成した文字列を使用することにします。
やや長いコードですが、半分くらいは動作を視覚的に表示するためのものです。
#define _CRT_RAND_S
#include <windows.h>
#include <windowsx.h>
#include <strsafe.h>
#include <process.h>
//ウィンドウの生成等は省略
//スレッド→メインスレッド用メッセージ
#define WM_INCPRODUCER (WM_APP + 1)
#define WM_DECPRODUCER (WM_APP + 2)
#define WM_INCCONSUMER (WM_APP + 3)
#define WM_DECCONSUMER (WM_APP + 4)
//リソース数
#define SEMAPHORE_BUFFERCOUNT 10
//リソース文字列の長さ(NULL文字含む)
#define SEMAPHORE_BUFFERSIZE 9
//生産者の最大数
#define PRODUCERCOUNT 3
//消費者の最大数
#define CONSUMERCOUNT 6
//スレッドパラメーター
typedef struct {
HANDLE hMutex; //ミューテックス
HANDLE hSemEmpty; //セマフォ:生産カウント(バッファ空き数)
HANDLE hSemFull; //セマフォ:消費カウント(バッファ埋まり数)
size_t nextIn; //次に生産物を入れるインデックス
size_t nextOut; //次に生産物を取り出すインデックス
//操作されるリソース
WCHAR buffer[SEMAPHORE_BUFFERCOUNT][SEMAPHORE_BUFFERSIZE];
//ここから下は画面表示やコントロール操作に必要なもの
HANDLE hSemProducer; //セマフォ:生産者数
HANDLE hSemConsumer; //セマフォ:消費者数
HWND hWnd; //メインウィンドウハンドル
HWND hEditPro; //ログ表示用
HWND hEditCon; //ログ表示用
} ThreadParam;
//rand_s関数のラッパー関数
unsigned int Random()
{
static char init;
if (init == 0) {
SYSTEMTIME systemTime;
FILETIME fileTime;
GetSystemTime(&systemTime);
SystemTimeToFileTime(&systemTime, &fileTime);
srand((unsigned)fileTime.dwLowDateTime);
init = 1;
}
unsigned int r;
return rand_s(&r) == 0 ? r : 0;
}
/// <summary>
/// ランダムな文字列を生成する
/// </summary>
/// <param name="buf">文字列を格納するバッファ</param>
/// <param name="bufSize">バッファサイズ</param>
/// <returns>成功:TRUE/失敗:FALSE</returns>
BOOL GenerateRandomString(WCHAR* buf, size_t bufSize)
{
static const WCHAR* charList = L"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
static const size_t charListSize = 64;
if (buf == NULL || bufSize == 0)
return FALSE;
for (size_t i = 0; i < bufSize - 1; i++) {
*(buf + i) = *(charList + (Random() % charListSize));
}
*(buf + bufSize - 1) = L'\0';
return TRUE;
}
/// <summary>
/// 数値の桁数を取得する(9桁まで)
/// (画面表示用)
/// </summary>
/// <param name="number">数値</param>
/// <returns>桁数</returns>
char GetDigit(size_t number)
{
size_t digit = 0;
while (number != 0) {
number /= 10;
++digit;
}
if (digit > 9)
digit = 9;
return digit;
}
/// <summary>
/// ログ用に整形済みの文字列を得る(改行込み)
/// (画面表示用)
/// </summary>
/// <param name="dst">コピー先の文字列バッファ</param>
/// <param name="dstLength">文字列バッファの文字数</param>
/// <param name="digit">キューの桁数</param>
/// <param name="index">行番号</param>
/// <param name="text">文字列</param>
/// <returns></returns>
BOOL FormatToLogString(WCHAR *dst, size_t dstLength, unsigned char digit, size_t index, const WCHAR *text)
{
if (!dst || dstLength == 0)
return FALSE;
if (digit == 0)
digit = 1;
else if (digit > 9)
digit = 9;
WCHAR lineFormat[] = L"%0*u|%s\r\n";
lineFormat[2] = (WCHAR)(digit + 48); //数値をASCIIコードに変換
StringCchPrintf(dst, dstLength, lineFormat, index, *text ? text : L"-");
return TRUE;
}
/// <summary>
/// エディットコントロールの先頭に文字列を挿入する
/// (画面表示用)
/// </summary>
/// <param name="hEdit">エディットコントロールハンドル</param>
/// <param name="text">挿入する文字列</param>
/// <param name="maxLineCount">行数の制限 0を指定で制限なし</param>
void PrependSingleLine(HWND hEdit, const WCHAR* text, size_t maxLineCount)
{
if (!hEdit || !text || maxLineCount == 0)
return;
if (maxLineCount == 0)
maxLineCount = UINT_MAX;
SendMessage(hEdit, EM_SETSEL, 0, 0);
SendMessage(hEdit, EM_REPLACESEL, FALSE, (LPARAM)text);
size_t lineCount = (size_t)SendMessage(hEdit, EM_GETLINECOUNT, 0, 0);
if (lineCount > maxLineCount)
{
int textLength = GetWindowTextLength(hEdit) + 1;
WCHAR* buf = malloc(sizeof(WCHAR) * textLength);
if (!buf) {
return;
}
GetWindowText(hEdit, buf, textLength);
*(buf + textLength - 1) = L'\0';
LRESULT index = SendMessage(hEdit, EM_LINEINDEX, maxLineCount, 0);
//指定行数の末尾の改行文字をNULL文字に置き換え
//改行文字は\r\nの2文字
*(buf + index - 2) = L'\0';
SetWindowText(hEdit, buf);
free(buf);
}
}
/// <summary>
/// 複数の文字列(ダブルポインタ)の内容を桁付きでエディットコントロールに出力する
/// (画面表示用)
/// </summary>
/// <param name="hEdit">エディットコントロールハンドル</param>
/// <param name="src">複数の文字列(ダブルポインタ)</param>
/// <param name="srcLength">文字列の数</param>
/// <param name="stringLength">文字列の長さ</param>
void OutputStrings(HWND hEdit, const WCHAR** src, size_t srcLength, size_t stringLength)
{
if (!hEdit || !src || srcLength == 0 || stringLength == 0)
return;
char digit = GetDigit(srcLength);
//一行の長さ
//桁数 + 区切り文字 + 文字列の長さ + 改行文字(\r\n)
const size_t lineLength = digit + 1 + stringLength + 2;
WCHAR* line = malloc(sizeof(WCHAR) * lineLength);
if (!line) {
return;
}
SendMessage(hEdit, WM_SETREDRAW, FALSE, 0); //更新停止
SetWindowText(hEdit, L"");
for (size_t n = 0; n < srcLength; n++) {
FormatToLogString(line, lineLength, digit, n, src[n]);
SendMessage(hEdit, EM_SETSEL, -1, -1);
SendMessage(hEdit, EM_REPLACESEL, FALSE, (LPARAM)line);
}
SendMessage(hEdit, WM_SETREDRAW, TRUE, 0); //更新再開
free(line);
}
//生産者スレッド
DWORD WINAPI Thread_Producer(LPVOID lpParameter)
{
MSG msg;
PeekMessage(&msg, NULL, 0, 0, PM_NOREMOVE);
ThreadParam *tp = (ThreadParam*)lpParameter;
DWORD dwThreadID = GetCurrentThreadId();
SendMessage(tp->hWnd, WM_INCPRODUCER, (WPARAM)dwThreadID, 0);
DWORD dwWait;
//メッセージループ
while (1)
{
if (PeekMessage(&msg, NULL, 0, 0, PM_REMOVE)) {
if (msg.message == WM_QUIT)
break;
}
//リソースが消費されるまで待機
dwWait = WaitForSingleObject(tp->hSemEmpty, 500);
if (dwWait == WAIT_TIMEOUT || dwWait == WAIT_FAILED) {
//メッセージを取得するために
//500ミリ秒ごとにループ先頭に戻る
continue;
}
//リソースをロック
WaitForSingleObject(tp->hMutex, INFINITE);
//リソースにコピー
if (!GenerateRandomString(tp->buffer[tp->nextIn], SEMAPHORE_BUFFERSIZE)) {
//失敗した場合
ReleaseMutex(tp->hMutex);
msg.wParam = -1;
break;
}
tp->nextIn++;
//最後のインデックスに到達したら0に戻す
tp->nextIn %= SEMAPHORE_BUFFERCOUNT;
{
//全リソースの状態をエディットコントロールに表示
WCHAR* b[SEMAPHORE_BUFFERCOUNT];
for (int n = 0; n < SEMAPHORE_BUFFERCOUNT; n++)
b[n] = (WCHAR*)&tp->buffer[n];
OutputStrings(tp->hEditPro, b, SEMAPHORE_BUFFERCOUNT, SEMAPHORE_BUFFERSIZE);
}
//ロック解除
ReleaseMutex(tp->hMutex);
//消費カウントをひとつ減らす
ReleaseSemaphore(tp->hSemFull, 1, NULL);
//生産ひとつあたり1秒かかるとする
Sleep(1000);
}
//生産者カウントをひとつ減らす
ReleaseSemaphore(tp->hSemProducer, 1, NULL);
//free(item);
SendMessage(tp->hWnd, WM_DECPRODUCER, (WPARAM)dwThreadID, 0);
return (DWORD)msg.wParam;
}
//消費者スレッド
DWORD WINAPI Thread_Consumer(LPVOID lpParameter)
{
MSG msg;
PeekMessage(&msg, NULL, 0, 0, PM_NOREMOVE);
ThreadParam* tp = (ThreadParam*)lpParameter;
DWORD dwThreadID = GetCurrentThreadId();
SendMessage(tp->hWnd, WM_INCCONSUMER, (WPARAM)dwThreadID, 0);
DWORD dwWait;
while (1)
{
if (PeekMessage(&msg, NULL, 0, 0, PM_REMOVE)) {
if (msg.message == WM_QUIT)
break;
}
//リソースにひとつでもアイテムが来るまで待機
dwWait = WaitForSingleObject(tp->hSemFull, 500);
if (dwWait == WAIT_TIMEOUT || dwWait == WAIT_FAILED) {
//メッセージを取得するために
//500ミリ秒ごとにループ先頭に戻る
continue;
}
//リソースをロック
WaitForSingleObject(tp->hMutex, INFINITE);
//リソースを使用する
WCHAR* item = tp->buffer[tp->nextOut];
{
//使用したリソースをログに表示
char digit = GetDigit(SEMAPHORE_BUFFERCOUNT);
const size_t lineLength = digit + 1 + SEMAPHORE_BUFFERSIZE + 2;
WCHAR* line = malloc(sizeof(WCHAR) * lineLength);
if (line) {
FormatToLogString(line, lineLength, digit, tp->nextOut, item);
PrependSingleLine(tp->hEditCon, line, SEMAPHORE_BUFFERCOUNT);
free(line);
}
}
//使用したリソースを空にする
tp->buffer[tp->nextOut++][0] = L'\0';
//最後のインデックスに到達したら0に戻す
tp->nextOut %= SEMAPHORE_BUFFERCOUNT;
{
//全リソースの状態をエディットコントロールに表示
//CopyAndPrependSingleLine(tp->hEditPro, tp->hEditCon, out, SEMAPHORE_BUFFERCOUNT);
WCHAR* b[SEMAPHORE_BUFFERCOUNT];
for (int n = 0; n < SEMAPHORE_BUFFERCOUNT; n++)
b[n] = (WCHAR*)&tp->buffer[n];
OutputStrings(tp->hEditPro, b, SEMAPHORE_BUFFERCOUNT, SEMAPHORE_BUFFERSIZE);
}
//ロック解除
ReleaseMutex(tp->hMutex);
//生産カウントをひとつ減らす
ReleaseSemaphore(tp->hSemEmpty, 1, NULL);
//消費ひとつあたり0.75秒かかるとする
Sleep(750);
}
//消費者カウントをひとつ減らす
ReleaseSemaphore(tp->hSemConsumer, 1, NULL);
SendMessage(tp->hWnd, WM_DECCONSUMER, (WPARAM)dwThreadID, 0);
return (DWORD)msg.wParam;
}
#define BUFFERSIZE 16
#define IDC_BTN_INCPRO 100
#define IDC_BTN_DECPRO 101
#define IDC_BTN_INCCUS 102
#define IDC_BTN_DECCUS 103
#define IDC_EDT_PRO 200
#define IDC_EDT_CUS 201
LRESULT CALLBACK WndProc(HWND hWnd, UINT message, WPARAM wParam, LPARAM lParam)
{
static ThreadParam tp;
static int producerCount, consumerCount;
static DWORD dwProducerIDs[PRODUCERCOUNT];
static DWORD dwConsumerIDs[CONSUMERCOUNT];
static HWND hStatic_Producer, hStatic_Consumer;
HANDLE hThread;
WCHAR buf[BUFFERSIZE];
DWORD dwWait;
switch (message)
{
case WM_CREATE: //ウィンドウの作成
tp.hMutex = CreateMutex(NULL, FALSE, NULL);
//生産カウント
tp.hSemEmpty = CreateSemaphore(NULL, SEMAPHORE_BUFFERCOUNT, SEMAPHORE_BUFFERCOUNT, NULL);
//消費カウント(初期値が0であることに注意)
tp.hSemFull = CreateSemaphore(NULL, 0, SEMAPHORE_BUFFERCOUNT, NULL);
//生産者数
tp.hSemProducer = CreateSemaphore(NULL, PRODUCERCOUNT, PRODUCERCOUNT, NULL);
//消費者数
tp.hSemConsumer = CreateSemaphore(NULL, CONSUMERCOUNT, CONSUMERCOUNT, NULL);
tp.hWnd = hWnd;
CreateWindow(L"BUTTON", L"生産者+",
WS_CHILD | WS_VISIBLE,
10, 10,
80, 25,
hWnd, (HMENU)IDC_BTN_INCPRO, hInst, NULL);
CreateWindow(L"BUTTON", L"生産者-",
WS_CHILD | WS_VISIBLE,
90, 10,
80, 25,
hWnd, (HMENU)IDC_BTN_DECPRO, hInst, NULL);
CreateWindow(L"BUTTON", L"消費者+",
WS_CHILD | WS_VISIBLE,
180, 10,
80, 25,
hWnd, (HMENU)IDC_BTN_INCCUS, hInst, NULL);
CreateWindow(L"BUTTON", L"消費者-",
WS_CHILD | WS_VISIBLE,
260, 10,
80, 25,
hWnd, (HMENU)IDC_BTN_DECCUS, hInst, NULL);
hStatic_Producer = CreateWindow(L"STATIC", L"生産者数: 0",
WS_CHILD | WS_VISIBLE,
10, 35,
160, 25,
hWnd, (HMENU)-1, hInst, NULL);
hStatic_Consumer = CreateWindow(L"STATIC", L"消費者数: 0",
WS_CHILD | WS_VISIBLE,
180, 35,
160, 25,
hWnd, (HMENU)-1, hInst, NULL);
tp.hEditPro = CreateWindowEx(
WS_EX_CLIENTEDGE,
L"EDIT", 0,
WS_CHILD | WS_VISIBLE |
ES_MULTILINE | WS_VSCROLL,
10, 70, 80, 300,
hWnd, (HMENU)IDC_EDT_PRO, hInst, NULL);
tp.hEditCon = CreateWindowEx(
WS_EX_CLIENTEDGE,
L"EDIT", 0,
WS_CHILD | WS_VISIBLE |
ES_MULTILINE | WS_VSCROLL,
180, 70, 80, 300,
hWnd, (HMENU)IDC_EDT_CUS, hInst, NULL);
break;
case WM_SIZE: //ウィンドウサイズ変更
SetWindowPos(tp.hEditPro, 0,
10, 70, 160, GET_Y_LPARAM(lParam) - 80,
SWP_NOMOVE | SWP_NOZORDER);
SetWindowPos(tp.hEditCon, 0,
180, 70, 160, GET_Y_LPARAM(lParam) - 80,
SWP_NOMOVE | SWP_NOZORDER);
break;
case WM_COMMAND: //コントロールの操作
switch (LOWORD(wParam))
{
case IDC_BTN_INCPRO: //生産者+
dwWait = WaitForSingleObject(tp.hSemProducer, 0);
if (dwWait == WAIT_TIMEOUT || dwWait == WAIT_FAILED) {
break;
}
hThread = (HANDLE)_beginthreadex(
NULL, 0,
(LPTHREAD_START_ROUTINE)Thread_Producer,
&tp,
0, NULL);
if (hThread) {
CloseHandle(hThread);
}
break;
case IDC_BTN_DECPRO: //生産者-
if (producerCount == 0)
break;
PostThreadMessage(dwProducerIDs[producerCount - 1], WM_QUIT, 0, 0);
break;
case IDC_BTN_INCCUS: //消費者+
dwWait = WaitForSingleObject(tp.hSemConsumer, 0);
if (dwWait == WAIT_TIMEOUT || dwWait == WAIT_FAILED) {
break;
}
hThread = (HANDLE)_beginthreadex(
NULL, 0,
(LPTHREAD_START_ROUTINE)Thread_Consumer,
&tp,
0, NULL);
if (hThread) {
CloseHandle(hThread);
}
break;
case IDC_BTN_DECCUS: //消費者-
if (producerCount == 0)
break;
PostThreadMessage(dwConsumerIDs[consumerCount - 1], WM_QUIT, 0, 0);
}
break;
//スレッドからのメッセージ
case WM_INCPRODUCER:
dwProducerIDs[producerCount] = (DWORD)wParam;
StringCchPrintf(buf, BUFFERSIZE, L"生産者数: %d", ++producerCount);
SetWindowText(hStatic_Producer, buf);
break;
case WM_DECPRODUCER:
StringCchPrintf(buf, BUFFERSIZE, L"生産者数: %d", --producerCount);
SetWindowText(hStatic_Producer, buf);
break;
case WM_INCCONSUMER:
dwConsumerIDs[consumerCount] = (DWORD)wParam;
StringCchPrintf(buf, BUFFERSIZE, L"消費者数: %d", ++consumerCount);
SetWindowText(hStatic_Consumer, buf);
break;
case WM_DECCONSUMER:
StringCchPrintf(buf, BUFFERSIZE, L"消費者数: %d", --consumerCount);
SetWindowText(hStatic_Consumer, buf);
break;
case WM_DESTROY: //ウィンドウ破棄
if (tp.hMutex) CloseHandle(tp.hMutex);
if (tp.hSemEmpty) CloseHandle(tp.hSemEmpty);
if (tp.hSemFull) CloseHandle(tp.hSemFull);
if (tp.hSemProducer) CloseHandle(tp.hSemProducer);
if (tp.hSemConsumer) CloseHandle(tp.hSemConsumer);
PostQuitMessage(0);
break;
default:
return DefWindowProc(hWnd, message, wParam, lParam);
}
return 0;
}
生産者スレッドを作成すると文字列が順番にキューに格納されていきます。
消費者スレッドを作成すると作成された順に文字列に取り出します。
現在のリソース数を管理するためにカウンティングセマフォを使用しますが、「リソースに空きがあるか否か」と「リソースが満杯か否か」は同じように見えてひとつのセマフォでは管理できません。
セマフォは「現在のセマフォ値が0か否か」でしか待機を判定ができないためです。
そのため、生産者スレッドを待機させる用と消費者スレッドを待機させる用のふたつのセマフォが必要になります。
カウンティングセマフォは複数のスレッドから同時にリソースにアクセスできるので、それらのスレッド間では排他制御はできません。
排他制御のためにもうひとつバイナリセマフォやミューテックス、クリティカルセクションオブジェクトなどを使用する必要があります。
サンプルコードではミューテックスを使用しています。