Merge branch 'np/delta'

* np/delta:
  builtin-pack-objects.c: avoid bogus gcc warnings
  threaded delta search: proper locking for cache accounting
  threaded delta search: add pack.threads config variable
  fix threaded delta search locking
  threaded delta search: specify number of threads at run time
  threaded delta search: better chunck split point
  threaded delta search: refine work allocation
  basic threaded delta search
  rearrange delta search progress reporting
  localize window memory usage accounting
  straighten the list of objects to deltify
This commit is contained in:
Junio C Hamano 2007-09-14 22:33:28 -07:00
commit d225ae59c9
4 changed files with 256 additions and 59 deletions

View File

@ -630,9 +630,17 @@ pack.deltaCacheSize::
A value of 0 means no limit. Defaults to 0. A value of 0 means no limit. Defaults to 0.
pack.deltaCacheLimit:: pack.deltaCacheLimit::
The maxium size of a delta, that is cached in The maximum size of a delta, that is cached in
gitlink:git-pack-objects[1]. Defaults to 1000. gitlink:git-pack-objects[1]. Defaults to 1000.
pack.threads::
Specifies the number of threads to spawn when searching for best
delta matches. This requires that gitlink:git-pack-objects[1]
be compiled with pthreads otherwise this option is ignored with a
warning. This is meant to reduce packing time on multiprocessor
machines. The required amount of memory for the delta search window
is however multiplied by the number of threads.
pull.octopus:: pull.octopus::
The default merge strategy to use when pulling multiple branches The default merge strategy to use when pulling multiple branches
at once. at once.

View File

@ -169,6 +169,14 @@ base-name::
length, this option typically shrinks the resulting length, this option typically shrinks the resulting
packfile by 3-5 per-cent. packfile by 3-5 per-cent.
--threads=<n>::
Specifies the number of threads to spawn when searching for best
delta matches. This requires that pack-objects be compiled with
pthreads otherwise this option is ignored with a warning.
This is meant to reduce packing time on multiprocessor machines.
The required amount of memory for the delta search window is
however multiplied by the number of threads.
--index-version=<version>[,<offset>]:: --index-version=<version>[,<offset>]::
This is intended to be used by the test suite only. It allows This is intended to be used by the test suite only. It allows
to force the version for the generated pack index, and to force to force the version for the generated pack index, and to force

View File

@ -124,6 +124,9 @@ all::
# If not set it defaults to the bare 'wish'. If it is set to the empty # If not set it defaults to the bare 'wish'. If it is set to the empty
# string then NO_TCLTK will be forced (this is used by configure script). # string then NO_TCLTK will be forced (this is used by configure script).
# #
# Define THREADED_DELTA_SEARCH if you have pthreads and wish to exploit
# parallel delta searching when packing objects.
#
GIT-VERSION-FILE: .FORCE-GIT-VERSION-FILE GIT-VERSION-FILE: .FORCE-GIT-VERSION-FILE
@$(SHELL_PATH) ./GIT-VERSION-GEN @$(SHELL_PATH) ./GIT-VERSION-GEN
@ -675,6 +678,11 @@ ifdef NO_MEMMEM
COMPAT_OBJS += compat/memmem.o COMPAT_OBJS += compat/memmem.o
endif endif
ifdef THREADED_DELTA_SEARCH
BASIC_CFLAGS += -DTHREADED_DELTA_SEARCH
EXTLIBS += -lpthread
endif
ifeq ($(TCLTK_PATH),) ifeq ($(TCLTK_PATH),)
NO_TCLTK=NoThanks NO_TCLTK=NoThanks
endif endif

View File

@ -15,12 +15,16 @@
#include "list-objects.h" #include "list-objects.h"
#include "progress.h" #include "progress.h"
#ifdef THREADED_DELTA_SEARCH
#include <pthread.h>
#endif
static const char pack_usage[] = "\ static const char pack_usage[] = "\
git-pack-objects [{ -q | --progress | --all-progress }] \n\ git-pack-objects [{ -q | --progress | --all-progress }] \n\
[--max-pack-size=N] [--local] [--incremental] \n\ [--max-pack-size=N] [--local] [--incremental] \n\
[--window=N] [--window-memory=N] [--depth=N] \n\ [--window=N] [--window-memory=N] [--depth=N] \n\
[--no-reuse-delta] [--no-reuse-object] [--delta-base-offset] \n\ [--no-reuse-delta] [--no-reuse-object] [--delta-base-offset] \n\
[--non-empty] [--revs [--unpacked | --all]*] [--reflog] \n\ [--threads=N] [--non-empty] [--revs [--unpacked | --all]*] [--reflog] \n\
[--stdout | base-name] [<ref-list | <object-list]"; [--stdout | base-name] [<ref-list | <object-list]";
struct object_entry { struct object_entry {
@ -68,6 +72,7 @@ static int progress = 1;
static int window = 10; static int window = 10;
static uint32_t pack_size_limit; static uint32_t pack_size_limit;
static int depth = 50; static int depth = 50;
static int delta_search_threads = 1;
static int pack_to_stdout; static int pack_to_stdout;
static int num_preferred_base; static int num_preferred_base;
static struct progress progress_state; static struct progress progress_state;
@ -78,7 +83,6 @@ static unsigned long delta_cache_size = 0;
static unsigned long max_delta_cache_size = 0; static unsigned long max_delta_cache_size = 0;
static unsigned long cache_max_small_delta_size = 1000; static unsigned long cache_max_small_delta_size = 1000;
static unsigned long window_memory_usage = 0;
static unsigned long window_memory_limit = 0; static unsigned long window_memory_limit = 0;
/* /*
@ -1291,6 +1295,31 @@ static int delta_cacheable(unsigned long src_size, unsigned long trg_size,
return 0; return 0;
} }
#ifdef THREADED_DELTA_SEARCH
static pthread_mutex_t read_mutex = PTHREAD_MUTEX_INITIALIZER;
#define read_lock() pthread_mutex_lock(&read_mutex)
#define read_unlock() pthread_mutex_unlock(&read_mutex)
static pthread_mutex_t cache_mutex = PTHREAD_MUTEX_INITIALIZER;
#define cache_lock() pthread_mutex_lock(&cache_mutex)
#define cache_unlock() pthread_mutex_unlock(&cache_mutex)
static pthread_mutex_t progress_mutex = PTHREAD_MUTEX_INITIALIZER;
#define progress_lock() pthread_mutex_lock(&progress_mutex)
#define progress_unlock() pthread_mutex_unlock(&progress_mutex)
#else
#define read_lock() (void)0
#define read_unlock() (void)0
#define cache_lock() (void)0
#define cache_unlock() (void)0
#define progress_lock() (void)0
#define progress_unlock() (void)0
#endif
/* /*
* We search for deltas _backwards_ in a list sorted by type and * We search for deltas _backwards_ in a list sorted by type and
* by size, so that we see progressively smaller and smaller files. * by size, so that we see progressively smaller and smaller files.
@ -1300,7 +1329,7 @@ static int delta_cacheable(unsigned long src_size, unsigned long trg_size,
* one. * one.
*/ */
static int try_delta(struct unpacked *trg, struct unpacked *src, static int try_delta(struct unpacked *trg, struct unpacked *src,
unsigned max_depth) unsigned max_depth, unsigned long *mem_usage)
{ {
struct object_entry *trg_entry = trg->entry; struct object_entry *trg_entry = trg->entry;
struct object_entry *src_entry = src->entry; struct object_entry *src_entry = src->entry;
@ -1313,12 +1342,6 @@ static int try_delta(struct unpacked *trg, struct unpacked *src,
if (trg_entry->type != src_entry->type) if (trg_entry->type != src_entry->type)
return -1; return -1;
/* We do not compute delta to *create* objects we are not
* going to pack.
*/
if (trg_entry->preferred_base)
return -1;
/* /*
* We do not bother to try a delta that we discarded * We do not bother to try a delta that we discarded
* on an earlier try, but only when reusing delta data. * on an earlier try, but only when reusing delta data.
@ -1355,24 +1378,28 @@ static int try_delta(struct unpacked *trg, struct unpacked *src,
/* Load data if not already done */ /* Load data if not already done */
if (!trg->data) { if (!trg->data) {
read_lock();
trg->data = read_sha1_file(trg_entry->idx.sha1, &type, &sz); trg->data = read_sha1_file(trg_entry->idx.sha1, &type, &sz);
read_unlock();
if (!trg->data) if (!trg->data)
die("object %s cannot be read", die("object %s cannot be read",
sha1_to_hex(trg_entry->idx.sha1)); sha1_to_hex(trg_entry->idx.sha1));
if (sz != trg_size) if (sz != trg_size)
die("object %s inconsistent object length (%lu vs %lu)", die("object %s inconsistent object length (%lu vs %lu)",
sha1_to_hex(trg_entry->idx.sha1), sz, trg_size); sha1_to_hex(trg_entry->idx.sha1), sz, trg_size);
window_memory_usage += sz; *mem_usage += sz;
} }
if (!src->data) { if (!src->data) {
read_lock();
src->data = read_sha1_file(src_entry->idx.sha1, &type, &sz); src->data = read_sha1_file(src_entry->idx.sha1, &type, &sz);
read_unlock();
if (!src->data) if (!src->data)
die("object %s cannot be read", die("object %s cannot be read",
sha1_to_hex(src_entry->idx.sha1)); sha1_to_hex(src_entry->idx.sha1));
if (sz != src_size) if (sz != src_size)
die("object %s inconsistent object length (%lu vs %lu)", die("object %s inconsistent object length (%lu vs %lu)",
sha1_to_hex(src_entry->idx.sha1), sz, src_size); sha1_to_hex(src_entry->idx.sha1), sz, src_size);
window_memory_usage += sz; *mem_usage += sz;
} }
if (!src->index) { if (!src->index) {
src->index = create_delta_index(src->data, src_size); src->index = create_delta_index(src->data, src_size);
@ -1382,7 +1409,7 @@ static int try_delta(struct unpacked *trg, struct unpacked *src,
warning("suboptimal pack - out of memory"); warning("suboptimal pack - out of memory");
return 0; return 0;
} }
window_memory_usage += sizeof_delta_index(src->index); *mem_usage += sizeof_delta_index(src->index);
} }
delta_buf = create_delta(src->index, trg->data, trg_size, &delta_size, max_size); delta_buf = create_delta(src->index, trg->data, trg_size, &delta_size, max_size);
@ -1402,17 +1429,27 @@ static int try_delta(struct unpacked *trg, struct unpacked *src,
trg_entry->delta_size = delta_size; trg_entry->delta_size = delta_size;
trg->depth = src->depth + 1; trg->depth = src->depth + 1;
/*
* Handle memory allocation outside of the cache
* accounting lock. Compiler will optimize the strangeness
* away when THREADED_DELTA_SEARCH is not defined.
*/
if (trg_entry->delta_data)
free(trg_entry->delta_data);
cache_lock();
if (trg_entry->delta_data) { if (trg_entry->delta_data) {
delta_cache_size -= trg_entry->delta_size; delta_cache_size -= trg_entry->delta_size;
free(trg_entry->delta_data);
trg_entry->delta_data = NULL; trg_entry->delta_data = NULL;
} }
if (delta_cacheable(src_size, trg_size, delta_size)) { if (delta_cacheable(src_size, trg_size, delta_size)) {
trg_entry->delta_data = xrealloc(delta_buf, delta_size);
delta_cache_size += trg_entry->delta_size; delta_cache_size += trg_entry->delta_size;
} else cache_unlock();
trg_entry->delta_data = xrealloc(delta_buf, delta_size);
} else {
cache_unlock();
free(delta_buf); free(delta_buf);
}
return 1; return 1;
} }
@ -1429,68 +1466,60 @@ static unsigned int check_delta_limit(struct object_entry *me, unsigned int n)
return m; return m;
} }
static void free_unpacked(struct unpacked *n) static unsigned long free_unpacked(struct unpacked *n)
{ {
window_memory_usage -= sizeof_delta_index(n->index); unsigned long freed_mem = sizeof_delta_index(n->index);
free_delta_index(n->index); free_delta_index(n->index);
n->index = NULL; n->index = NULL;
if (n->data) { if (n->data) {
freed_mem += n->entry->size;
free(n->data); free(n->data);
n->data = NULL; n->data = NULL;
window_memory_usage -= n->entry->size;
} }
n->entry = NULL; n->entry = NULL;
n->depth = 0; n->depth = 0;
return freed_mem;
} }
static void find_deltas(struct object_entry **list, int window, int depth) static void find_deltas(struct object_entry **list, unsigned list_size,
int window, int depth, unsigned *processed)
{ {
uint32_t i = nr_objects, idx = 0, count = 0, processed = 0; uint32_t i = list_size, idx = 0, count = 0;
unsigned int array_size = window * sizeof(struct unpacked); unsigned int array_size = window * sizeof(struct unpacked);
struct unpacked *array; struct unpacked *array;
int max_depth; unsigned long mem_usage = 0;
if (!nr_objects)
return;
array = xmalloc(array_size); array = xmalloc(array_size);
memset(array, 0, array_size); memset(array, 0, array_size);
if (progress)
start_progress(&progress_state, "Deltifying %u objects...", "", nr_result);
do { do {
struct object_entry *entry = list[--i]; struct object_entry *entry = list[--i];
struct unpacked *n = array + idx; struct unpacked *n = array + idx;
int j, best_base = -1; int j, max_depth, best_base = -1;
if (!entry->preferred_base) mem_usage -= free_unpacked(n);
processed++;
if (progress)
display_progress(&progress_state, processed);
if (entry->delta)
/* This happens if we decided to reuse existing
* delta from a pack. "!no_reuse_delta &&" is implied.
*/
continue;
if (entry->size < 50)
continue;
if (entry->no_try_delta)
continue;
free_unpacked(n);
n->entry = entry; n->entry = entry;
while (window_memory_limit && while (window_memory_limit &&
window_memory_usage > window_memory_limit && mem_usage > window_memory_limit &&
count > 1) { count > 1) {
uint32_t tail = (idx + window - count) % window; uint32_t tail = (idx + window - count) % window;
free_unpacked(array + tail); mem_usage -= free_unpacked(array + tail);
count--; count--;
} }
/* We do not compute delta to *create* objects we are not
* going to pack.
*/
if (entry->preferred_base)
goto next;
progress_lock();
(*processed)++;
if (progress)
display_progress(&progress_state, *processed);
progress_unlock();
/* /*
* If the current object is at pack edge, take the depth the * If the current object is at pack edge, take the depth the
* objects that depend on the current object into account * objects that depend on the current object into account
@ -1513,7 +1542,7 @@ static void find_deltas(struct object_entry **list, int window, int depth)
m = array + other_idx; m = array + other_idx;
if (!m->entry) if (!m->entry)
break; break;
ret = try_delta(n, m, max_depth); ret = try_delta(n, m, max_depth, &mem_usage);
if (ret < 0) if (ret < 0)
break; break;
else if (ret > 0) else if (ret > 0)
@ -1552,9 +1581,6 @@ static void find_deltas(struct object_entry **list, int window, int depth)
idx = 0; idx = 0;
} while (i > 0); } while (i > 0);
if (progress)
stop_progress(&progress_state);
for (i = 0; i < window; ++i) { for (i = 0; i < window; ++i) {
free_delta_index(array[i].index); free_delta_index(array[i].index);
free(array[i].data); free(array[i].data);
@ -1562,21 +1588,145 @@ static void find_deltas(struct object_entry **list, int window, int depth)
free(array); free(array);
} }
#ifdef THREADED_DELTA_SEARCH
struct thread_params {
pthread_t thread;
struct object_entry **list;
unsigned list_size;
int window;
int depth;
unsigned *processed;
};
static pthread_mutex_t data_request = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t data_ready = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t data_provider = PTHREAD_MUTEX_INITIALIZER;
static struct thread_params *data_requester;
static void *threaded_find_deltas(void *arg)
{
struct thread_params *me = arg;
for (;;) {
pthread_mutex_lock(&data_request);
data_requester = me;
pthread_mutex_unlock(&data_provider);
pthread_mutex_lock(&data_ready);
pthread_mutex_unlock(&data_request);
if (!me->list_size)
return NULL;
find_deltas(me->list, me->list_size,
me->window, me->depth, me->processed);
}
}
static void ll_find_deltas(struct object_entry **list, unsigned list_size,
int window, int depth, unsigned *processed)
{
struct thread_params *target, p[delta_search_threads];
int i, ret;
unsigned chunk_size;
if (delta_search_threads <= 1) {
find_deltas(list, list_size, window, depth, processed);
return;
}
pthread_mutex_lock(&data_provider);
pthread_mutex_lock(&data_ready);
for (i = 0; i < delta_search_threads; i++) {
p[i].window = window;
p[i].depth = depth;
p[i].processed = processed;
ret = pthread_create(&p[i].thread, NULL,
threaded_find_deltas, &p[i]);
if (ret)
die("unable to create thread: %s", strerror(ret));
}
/* this should be auto-tuned somehow */
chunk_size = window * 1000;
do {
unsigned sublist_size = chunk_size;
if (sublist_size > list_size)
sublist_size = list_size;
/* try to split chunks on "path" boundaries */
while (sublist_size < list_size && list[sublist_size]->hash &&
list[sublist_size]->hash == list[sublist_size-1]->hash)
sublist_size++;
pthread_mutex_lock(&data_provider);
target = data_requester;
target->list = list;
target->list_size = sublist_size;
pthread_mutex_unlock(&data_ready);
list += sublist_size;
list_size -= sublist_size;
if (!sublist_size) {
pthread_join(target->thread, NULL);
i--;
}
} while (i);
}
#else
#define ll_find_deltas find_deltas
#endif
static void prepare_pack(int window, int depth) static void prepare_pack(int window, int depth)
{ {
struct object_entry **delta_list; struct object_entry **delta_list;
uint32_t i; uint32_t i, n, nr_deltas;
get_object_details(); get_object_details();
if (!window || !depth) if (!nr_objects || !window || !depth)
return; return;
delta_list = xmalloc(nr_objects * sizeof(*delta_list)); delta_list = xmalloc(nr_objects * sizeof(*delta_list));
for (i = 0; i < nr_objects; i++) nr_deltas = n = 0;
delta_list[i] = objects + i;
qsort(delta_list, nr_objects, sizeof(*delta_list), type_size_sort); for (i = 0; i < nr_objects; i++) {
find_deltas(delta_list, window+1, depth); struct object_entry *entry = objects + i;
if (entry->delta)
/* This happens if we decided to reuse existing
* delta from a pack. "!no_reuse_delta &&" is implied.
*/
continue;
if (entry->size < 50)
continue;
if (entry->no_try_delta)
continue;
if (!entry->preferred_base)
nr_deltas++;
delta_list[n++] = entry;
}
if (nr_deltas) {
unsigned nr_done = 0;
if (progress)
start_progress(&progress_state,
"Deltifying %u objects...", "",
nr_deltas);
qsort(delta_list, n, sizeof(*delta_list), type_size_sort);
ll_find_deltas(delta_list, n, window+1, depth, &nr_done);
if (progress)
stop_progress(&progress_state);
if (nr_done != nr_deltas)
die("inconsistency with delta count");
}
free(delta_list); free(delta_list);
} }
@ -1612,6 +1762,17 @@ static int git_pack_config(const char *k, const char *v)
cache_max_small_delta_size = git_config_int(k, v); cache_max_small_delta_size = git_config_int(k, v);
return 0; return 0;
} }
if (!strcmp(k, "pack.threads")) {
delta_search_threads = git_config_int(k, v);
if (delta_search_threads < 1)
die("invalid number of threads specified (%d)",
delta_search_threads);
#ifndef THREADED_DELTA_SEARCH
if (delta_search_threads > 1)
warning("no threads support, ignoring %s", k);
#endif
return 0;
}
return git_default_config(k, v); return git_default_config(k, v);
} }
@ -1771,6 +1932,18 @@ int cmd_pack_objects(int argc, const char **argv, const char *prefix)
usage(pack_usage); usage(pack_usage);
continue; continue;
} }
if (!prefixcmp(arg, "--threads=")) {
char *end;
delta_search_threads = strtoul(arg+10, &end, 0);
if (!arg[10] || *end || delta_search_threads < 1)
usage(pack_usage);
#ifndef THREADED_DELTA_SEARCH
if (delta_search_threads > 1)
warning("no threads support, "
"ignoring %s", arg);
#endif
continue;
}
if (!prefixcmp(arg, "--depth=")) { if (!prefixcmp(arg, "--depth=")) {
char *end; char *end;
depth = strtoul(arg+8, &end, 0); depth = strtoul(arg+8, &end, 0);