threadpool.h
#pragma once
/* Signature of a task callback. The single argument is the opaque pointer
 * that was handed to async_pool_run together with the callback. */
typedef void(*task_func)(void*);
/* Initialises the pool's mutex and condition variable and starts the worker
 * threads. Returns thrd_success on success. */
int async_pool_init();
/* Submits a task to the pool so that f gets called with arg.
 * The pool stores arg as a raw pointer and does not copy what it points to. */
void async_pool_run(task_func f, void* arg);
/* Requests the workers to stop and joins them. Tasks that are still pending
 * are executed before the workers exit. Calling it again after the pool has
 * been stopped returns immediately. */
void async_pool_reset();
threadpool.c
#include "stdafx.h"
#include "threadpool.h"
#include "tinythread.h"
#include <assert.h>
/* Number of worker threads created by async_pool_init and joined by
 * async_pool_reset. */
#define POOL_SIZE 2
/* One pending unit of work: a callback and the argument to invoke it with. */
struct Task
{
/* Opaque pointer passed to f when the task is executed. */
void* arg;
/* Callback to invoke. */
task_func f;
};
/* Guards s_stop, s_taskssize and s_tasks. */
static mtx_t s_queue_mutex;
/* Handles of the worker threads. */
static thrd_t s_threads[POOL_SIZE];
/* Set to true by async_pool_reset; workers exit once it is set and no tasks
 * remain. */
static bool s_stop = false;
/* Broadcast when a task is added or when a stop is requested. */
static cnd_t s_condition;
/* Number of pending tasks currently stored in s_tasks. */
static size_t s_taskssize = 0;
/* Fixed-capacity storage for pending tasks. Entries [0, s_taskssize) are
 * valid; workers take the entry at the highest index first, so the storage
 * behaves as a stack (last in, first out). */
static Task s_tasks[100];
/*
 * Worker thread entry point.
 *
 * Sleeps on s_condition until a task is pending or a stop has been requested,
 * then pops the most recently added task (the task array is used as a stack)
 * and runs it with the mutex released. The loop ends once a stop has been
 * requested and no tasks remain, so pending work is drained before exit.
 *
 * The thread argument is not used. Always returns 0.
 */
int tfunc(void* unused)
{
    (void)unused;

    for (;;)
    {
        mtx_lock(&s_queue_mutex);
        while (!s_stop && s_taskssize == 0)
        {
            cnd_wait(&s_condition, &s_queue_mutex);
        }

        if (s_stop && s_taskssize == 0)
        {
            mtx_unlock(&s_queue_mutex);
            break;
        }

        /*
         * Copy the callback and its argument out of the array while the lock
         * is still held. As soon as the mutex is released async_pool_run may
         * reuse this slot for a new task, so keeping a pointer into s_tasks
         * and reading through it after the unlock would be a data race.
         */
        s_taskssize--;
        void (*run)(void*) = s_tasks[s_taskssize].f;
        void* run_arg = s_tasks[s_taskssize].arg;
        mtx_unlock(&s_queue_mutex);

        (*run)(run_arg);
    }
    return 0;
}
/*
 * Initialises the pool: creates the queue mutex and the condition variable
 * and starts POOL_SIZE worker threads running tfunc.
 *
 * Returns thrd_success when everything was created. Otherwise returns the
 * status code of the first call that failed:
 *  - if the mutex cannot be created nothing else is attempted;
 *  - if the condition variable cannot be created the mutex is destroyed again;
 *  - if a worker cannot be started no further workers are created. Workers
 *    that were already started keep running, and the remaining entries of
 *    s_threads are not valid thread handles.
 */
int async_pool_init()
{
    int r = mtx_init(&s_queue_mutex, mtx_plain);
    if (r != thrd_success)
    {
        return r;
    }

    r = cnd_init(&s_condition);
    if (r != thrd_success)
    {
        mtx_destroy(&s_queue_mutex);
        return r;
    }

    for (int i = 0; i < POOL_SIZE; i++)
    {
        r = thrd_create(&s_threads[i], &tfunc, 0);
        if (r != thrd_success)
        {
            /* Report the failure instead of pretending the pool is complete. */
            return r;
        }
    }
    return thrd_success;
}
/*
 * Submits a task to the pool.
 *
 * f   - callback to invoke.
 * arg - opaque pointer passed to f; stored as-is, not copied.
 *
 * The task is stored in the fixed-size pending-task array and the workers are
 * woken up. If the array is already full the task is executed synchronously
 * on the calling thread instead, so that no task is dropped and the array is
 * never written out of bounds.
 */
void async_pool_run(task_func f, void* arg)
{
    const size_t capacity = sizeof(s_tasks) / sizeof(s_tasks[0]);
    bool queued = false;

    mtx_lock(&s_queue_mutex);
    if (s_taskssize < capacity)
    {
        s_tasks[s_taskssize].arg = arg;
        s_tasks[s_taskssize].f = f;
        s_taskssize++;
        queued = true;
    }
    mtx_unlock(&s_queue_mutex);

    if (queued)
    {
        cnd_broadcast(&s_condition);
    }
    else
    {
        /* No free slot: run the task here, outside the lock. */
        (*f)(arg);
    }
}
/*
 * Stops the pool and waits for the worker threads to finish.
 *
 * The stop flag is raised under the queue mutex. If it was already raised by
 * an earlier call there is nothing left to do. Otherwise every worker is
 * woken up and joined; join results and thread exit codes are ignored.
 */
void async_pool_reset()
{
    mtx_lock(&s_queue_mutex);
    const bool already_stopped = s_stop;
    s_stop = true;
    mtx_unlock(&s_queue_mutex);

    if (!already_stopped)
    {
        /* Wake sleeping workers so they can observe the stop flag. */
        cnd_broadcast(&s_condition);

        size_t idx = 0;
        while (idx < POOL_SIZE)
        {
            int exit_code;
            (void)thrd_join(s_threads[idx], &exit_code);
            idx++;
        }
    }
}