threaded delta search: refine work allocation
With this, each thread get repeatedly assigned the next available chunk of objects to process until the whole list is done. The idea is to have reasonably small chunks so that all CPUs remain busy with a minimum number of threads for as long as there is data to process. Signed-off-by: Nicolas Pitre <nico@cam.org> Signed-off-by: Junio C Hamano <gitster@pobox.com>
This commit is contained in:
parent
8ecce684a3
commit
c2a33679a7
@ -1582,27 +1582,42 @@ struct thread_params {
|
|||||||
unsigned *processed;
|
unsigned *processed;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static pthread_mutex_t data_request = PTHREAD_MUTEX_INITIALIZER;
|
||||||
|
static pthread_mutex_t data_ready = PTHREAD_MUTEX_INITIALIZER;
|
||||||
|
static pthread_mutex_t data_provider = PTHREAD_MUTEX_INITIALIZER;
|
||||||
|
static struct thread_params *data_requester;
|
||||||
|
|
||||||
static void *threaded_find_deltas(void *arg)
|
static void *threaded_find_deltas(void *arg)
|
||||||
{
|
{
|
||||||
struct thread_params *p = arg;
|
struct thread_params *me = arg;
|
||||||
if (p->list_size)
|
|
||||||
find_deltas(p->list, p->list_size,
|
for (;;) {
|
||||||
p->window, p->depth, p->processed);
|
pthread_mutex_lock(&data_request);
|
||||||
|
data_requester = me;
|
||||||
|
pthread_mutex_unlock(&data_provider);
|
||||||
|
pthread_mutex_lock(&data_ready);
|
||||||
|
|
||||||
|
if (!me->list_size)
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
||||||
|
find_deltas(me->list, me->list_size,
|
||||||
|
me->window, me->depth, me->processed);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#define NR_THREADS 8
|
#define NR_THREADS 4
|
||||||
|
|
||||||
static void ll_find_deltas(struct object_entry **list, unsigned list_size,
|
static void ll_find_deltas(struct object_entry **list, unsigned list_size,
|
||||||
int window, int depth, unsigned *processed)
|
int window, int depth, unsigned *processed)
|
||||||
{
|
{
|
||||||
struct thread_params p[NR_THREADS];
|
struct thread_params p[NR_THREADS];
|
||||||
int i, ret;
|
int i, ret;
|
||||||
|
unsigned chunk_size;
|
||||||
|
|
||||||
|
pthread_mutex_lock(&data_provider);
|
||||||
|
pthread_mutex_lock(&data_ready);
|
||||||
|
|
||||||
for (i = 0; i < NR_THREADS; i++) {
|
for (i = 0; i < NR_THREADS; i++) {
|
||||||
unsigned sublist_size = list_size / (NR_THREADS - i);
|
|
||||||
p[i].list = list;
|
|
||||||
p[i].list_size = sublist_size;
|
|
||||||
p[i].window = window;
|
p[i].window = window;
|
||||||
p[i].depth = depth;
|
p[i].depth = depth;
|
||||||
p[i].processed = processed;
|
p[i].processed = processed;
|
||||||
@ -1610,13 +1625,29 @@ static void ll_find_deltas(struct object_entry **list, unsigned list_size,
|
|||||||
threaded_find_deltas, &p[i]);
|
threaded_find_deltas, &p[i]);
|
||||||
if (ret)
|
if (ret)
|
||||||
die("unable to create thread: %s", strerror(ret));
|
die("unable to create thread: %s", strerror(ret));
|
||||||
list += sublist_size;
|
|
||||||
list_size -= sublist_size;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (i = 0; i < NR_THREADS; i++) {
|
/* this should be auto-tuned somehow */
|
||||||
pthread_join(p[i].thread, NULL);
|
chunk_size = window * 1000;
|
||||||
|
|
||||||
|
do {
|
||||||
|
unsigned sublist_size = chunk_size;
|
||||||
|
if (sublist_size > list_size)
|
||||||
|
sublist_size = list_size;
|
||||||
|
|
||||||
|
pthread_mutex_lock(&data_provider);
|
||||||
|
data_requester->list = list;
|
||||||
|
data_requester->list_size = sublist_size;
|
||||||
|
pthread_mutex_unlock(&data_ready);
|
||||||
|
|
||||||
|
list += sublist_size;
|
||||||
|
list_size -= sublist_size;
|
||||||
|
if (!sublist_size) {
|
||||||
|
pthread_join(data_requester->thread, NULL);
|
||||||
|
i--;
|
||||||
}
|
}
|
||||||
|
pthread_mutex_unlock(&data_request);
|
||||||
|
} while (i);
|
||||||
}
|
}
|
||||||
|
|
||||||
#else
|
#else
|
||||||
|
Loading…
Reference in New Issue
Block a user