Message ID | 20181001134556.33232-8-peartben@gmail.com (mailing list archive) |
---|---|
State | New, archived |
Headers | show |
Series | speed up index load through parallelization | expand |
On Mon, Oct 1, 2018 at 3:46 PM Ben Peart <peartben@gmail.com> wrote: > +/* > + * A helper function that will load the specified range of cache entries > + * from the memory mapped file and add them to the given index. > + */ > +static unsigned long load_cache_entry_block(struct index_state *istate, > + struct mem_pool *ce_mem_pool, int offset, int nr, const char *mmap, Please use unsigned long for offset (here and in the thread_data struct). We should use off_t instead, but that's out of scope. At least keep offset type consistent in here. > + unsigned long start_offset, const struct cache_entry *previous_ce) I don't think you want to pass previous_ce in. You always pass NULL anyway. And if this function is about loading a block (i.e. at block boundary) then initial previous_ce _must_ be NULL or things break horribly. > @@ -1959,20 +2007,125 @@ static void *load_index_extensions(void *_data) > > #define THREAD_COST (10000) > > +struct load_cache_entries_thread_data > +{ > + pthread_t pthread; > + struct index_state *istate; > + struct mem_pool *ce_mem_pool; > + int offset; > + const char *mmap; > + struct index_entry_offset_table *ieot; > + int ieot_offset; /* starting index into the ieot array */ If it's an index, maybe just name it ieot_index and we can get rid of the comment. > + int ieot_work; /* count of ieot entries to process */ Maybe instead of saving the whole "ieot" table here. Add struct index_entry_offset *blocks; which points to the starting block for this thread and rename that mysterious (to me) ieot_work to nr_blocks. The thread will have access from blocks[0] to blocks[nr_blocks - 1] > + unsigned long consumed; /* return # of bytes in index file processed */ > +}; > + > +/* > + * A thread proc to run the load_cache_entries() computation > + * across multiple background threads. 
> + */ > +static void *load_cache_entries_thread(void *_data) > +{ > + struct load_cache_entries_thread_data *p = _data; > + int i; > + > + /* iterate across all ieot blocks assigned to this thread */ > + for (i = p->ieot_offset; i < p->ieot_offset + p->ieot_work; i++) { > + p->consumed += load_cache_entry_block(p->istate, p->ce_mem_pool, p->offset, p->ieot->entries[i].nr, p->mmap, p->ieot->entries[i].offset, NULL); Please wrap this long line. > + p->offset += p->ieot->entries[i].nr; > + } > + return NULL; > +} > + > +static unsigned long load_cache_entries_threaded(struct index_state *istate, const char *mmap, size_t mmap_size, > + unsigned long src_offset, int nr_threads, struct index_entry_offset_table *ieot) > +{ > + int i, offset, ieot_work, ieot_offset, err; > + struct load_cache_entries_thread_data *data; > + unsigned long consumed = 0; > + int nr; > + > + /* a little sanity checking */ > + if (istate->name_hash_initialized) > + BUG("the name hash isn't thread safe"); > + > + mem_pool_init(&istate->ce_mem_pool, 0); > + data = xcalloc(nr_threads, sizeof(struct load_cache_entries_thread_data)); we normally use sizeof(*data) instead of sizeof(struct ...) > + > + /* ensure we have no more threads than we have blocks to process */ > + if (nr_threads > ieot->nr) > + nr_threads = ieot->nr; > + data = xcalloc(nr_threads, sizeof(struct load_cache_entries_thread_data)); eh.. reallocate the same "data"? > + > + offset = ieot_offset = 0; > + ieot_work = DIV_ROUND_UP(ieot->nr, nr_threads); > + for (i = 0; i < nr_threads; i++) { > + struct load_cache_entries_thread_data *p = &data[i]; > + int j; > + > + if (ieot_offset + ieot_work > ieot->nr) > + ieot_work = ieot->nr - ieot_offset; > + > + p->istate = istate; > + p->offset = offset; > + p->mmap = mmap; > + p->ieot = ieot; > + p->ieot_offset = ieot_offset; > + p->ieot_work = ieot_work; > + > + /* create a mem_pool for each thread */ > + nr = 0; Since nr is only used in this for loop. 
Declare it in this scope instead of declaring it for the whole function. > + for (j = p->ieot_offset; j < p->ieot_offset + p->ieot_work; j++) > + nr += p->ieot->entries[j].nr; > + if (istate->version == 4) { > + mem_pool_init(&p->ce_mem_pool, > + estimate_cache_size_from_compressed(nr)); > + } > + else { > + mem_pool_init(&p->ce_mem_pool, > + estimate_cache_size(mmap_size, nr)); > + } Maybe keep this mem_pool_init code inside load_cache_entries_thread(), similar to how you do it for load_cache_entries_thread(). It's mostly to keep this loop shorter to see (and understand), of course parallelizing this mem_pool_init() is just noise. > + > + err = pthread_create(&p->pthread, NULL, load_cache_entries_thread, p); > + if (err) > + die(_("unable to create load_cache_entries thread: %s"), strerror(err)); > + > + /* increment by the number of cache entries in the ieot block being processed */ > + for (j = 0; j < ieot_work; j++) > + offset += ieot->entries[ieot_offset + j].nr; I wonder if it makes things simpler if you store cache_entry _index_ in entries[] array instead of storing the number of entries. You can easily calculate nr then by doing entries[i].index - entries[i-1].index. And you can count multiple blocks the same way, without looping like this. > + ieot_offset += ieot_work; > + } > + > + for (i = 0; i < nr_threads; i++) { > + struct load_cache_entries_thread_data *p = &data[i]; > + > + err = pthread_join(p->pthread, NULL); > + if (err) > + die(_("unable to join load_cache_entries thread: %s"), strerror(err)); > + mem_pool_combine(istate->ce_mem_pool, p->ce_mem_pool); > + consumed += p->consumed; > + } > + > + free(data); > + > + return consumed; > +} > +#endif > + > /* remember to discard_cache() before reading a different cache! 
*/ > int do_read_index(struct index_state *istate, const char *path, int must_exist) > { > - int fd, i; > + int fd; > struct stat st; > unsigned long src_offset; > const struct cache_header *hdr; > const char *mmap; > size_t mmap_size; > - const struct cache_entry *previous_ce = NULL; > struct load_index_extensions p; > size_t extension_offset = 0; > #ifndef NO_PTHREADS > - int nr_threads; > + int nr_threads, cpus; > + struct index_entry_offset_table *ieot = NULL; > #endif > > if (istate->initialized) > @@ -2014,10 +2167,18 @@ int do_read_index(struct index_state *istate, const char *path, int must_exist) > p.mmap = mmap; > p.mmap_size = mmap_size; > > + src_offset = sizeof(*hdr); OK we've been doing this since forever, sizeof(struct cache_header) probably does not have extra padding on any supported platform. > @@ -2032,29 +2193,22 @@ int do_read_index(struct index_state *istate, const char *path, int must_exist) > nr_threads--; > } > } > -#endif > - > - if (istate->version == 4) { > - mem_pool_init(&istate->ce_mem_pool, > - estimate_cache_size_from_compressed(istate->cache_nr)); > - } else { > - mem_pool_init(&istate->ce_mem_pool, > - estimate_cache_size(mmap_size, istate->cache_nr)); > - } > > - src_offset = sizeof(*hdr); > - for (i = 0; i < istate->cache_nr; i++) { > - struct ondisk_cache_entry *disk_ce; > - struct cache_entry *ce; > - unsigned long consumed; > + /* > + * Locate and read the index entry offset table so that we can use it > + * to multi-thread the reading of the cache entries. > + */ > + if (extension_offset && nr_threads > 1) > + ieot = read_ieot_extension(mmap, mmap_size, extension_offset); You need to free ieot at some point. 
> > - disk_ce = (struct ondisk_cache_entry *)(mmap + src_offset); > - ce = create_from_disk(istate, disk_ce, &consumed, previous_ce); > - set_index_entry(istate, i, ce); > + if (ieot) > + src_offset += load_cache_entries_threaded(istate, mmap, mmap_size, src_offset, nr_threads, ieot); > + else > + src_offset += load_all_cache_entries(istate, mmap, mmap_size, src_offset); > +#else > + src_offset += load_all_cache_entries(istate, mmap, mmap_size, src_offset); > +#endif > > - src_offset += consumed; > - previous_ce = ce; > - } > istate->timestamp.sec = st.st_mtime; > istate->timestamp.nsec = ST_MTIME_NSEC(st); > > -- > 2.18.0.windows.1 >
On 10/1/2018 1:09 PM, Duy Nguyen wrote: > On Mon, Oct 1, 2018 at 3:46 PM Ben Peart <peartben@gmail.com> wrote: >> +/* >> + * A helper function that will load the specified range of cache entries >> + * from the memory mapped file and add them to the given index. >> + */ >> +static unsigned long load_cache_entry_block(struct index_state *istate, >> + struct mem_pool *ce_mem_pool, int offset, int nr, const char *mmap, > > Please use unsigned long for offset (here and in the thread_data > struct). We should use off_t instead, but that's out of scope. At > least keep offset type consistent in here. > Unfortunately, this code is littered with different types for size and offset. "int" is the most common but there are also off_t, size_t and some unsigned long as well. Currently all of them are at least 32 bits so until we need to have an index larger than 32 bits, we should be OK. I agree, fixing them all is outside the scope of this patch. >> + unsigned long start_offset, const struct cache_entry *previous_ce) > > I don't think you want to pass previous_ce in. You always pass NULL > anyway. And if this function is about loading a block (i.e. at block > boundary) then initial previous_ce _must_ be NULL or things break > horribly. > The function as written can load any arbitrary subset of cache entries as long as previous_ce is set correctly. I currently only use it on block boundaries but I don't see any good reason to limit its capabilities by moving what code passes the NULL in one function deeper. 
>> @@ -1959,20 +2007,125 @@ static void *load_index_extensions(void *_data) >> >> #define THREAD_COST (10000) >> >> +struct load_cache_entries_thread_data >> +{ >> + pthread_t pthread; >> + struct index_state *istate; >> + struct mem_pool *ce_mem_pool; >> + int offset; >> + const char *mmap; >> + struct index_entry_offset_table *ieot; >> + int ieot_offset; /* starting index into the ieot array */ > > If it's an index, maybe just name it ieot_index and we can get rid of > the comment. > >> + int ieot_work; /* count of ieot entries to process */ > > Maybe instead of saving the whole "ieot" table here. Add > > struct index_entry_offset *blocks; > > which points to the starting block for this thread and rename that > mysterious (to me) ieot_work to nr_blocks. The thread will have access > from blocks[0] to blocks[nr_blocks - 1] > Meh. Either way you have to figure out there are a block of entries and each thread is going to process some subset of those entries. You can do the base + offset math here or down in the calling function but it has to happen (and be understood) either way. I'll rename ieot_offset to ieot_start and ieot_work to ieot_blocks which should hopefully help make it more obvious what they do. >> + unsigned long consumed; /* return # of bytes in index file processed */ >> +}; >> + >> +/* >> + * A thread proc to run the load_cache_entries() computation >> + * across multiple background threads. >> + */ >> +static void *load_cache_entries_thread(void *_data) >> +{ >> + struct load_cache_entries_thread_data *p = _data; >> + int i; >> + >> + /* iterate across all ieot blocks assigned to this thread */ >> + for (i = p->ieot_offset; i < p->ieot_offset + p->ieot_work; i++) { >> + p->consumed += load_cache_entry_block(p->istate, p->ce_mem_pool, p->offset, p->ieot->entries[i].nr, p->mmap, p->ieot->entries[i].offset, NULL); > > Please wrap this long line. 
> >> + p->offset += p->ieot->entries[i].nr; >> + } >> + return NULL; >> +} >> + >> +static unsigned long load_cache_entries_threaded(struct index_state *istate, const char *mmap, size_t mmap_size, >> + unsigned long src_offset, int nr_threads, struct index_entry_offset_table *ieot) >> +{ >> + int i, offset, ieot_work, ieot_offset, err; >> + struct load_cache_entries_thread_data *data; >> + unsigned long consumed = 0; >> + int nr; >> + >> + /* a little sanity checking */ >> + if (istate->name_hash_initialized) >> + BUG("the name hash isn't thread safe"); >> + >> + mem_pool_init(&istate->ce_mem_pool, 0); >> + data = xcalloc(nr_threads, sizeof(struct load_cache_entries_thread_data)); > > we normally use sizeof(*data) instead of sizeof(struct ...) > >> + >> + /* ensure we have no more threads than we have blocks to process */ >> + if (nr_threads > ieot->nr) >> + nr_threads = ieot->nr; >> + data = xcalloc(nr_threads, sizeof(struct load_cache_entries_thread_data)); > > eh.. reallocate the same "data"? > Thanks, good catch - I hate leaky code. >> + >> + offset = ieot_offset = 0; >> + ieot_work = DIV_ROUND_UP(ieot->nr, nr_threads); >> + for (i = 0; i < nr_threads; i++) { >> + struct load_cache_entries_thread_data *p = &data[i]; >> + int j; >> + >> + if (ieot_offset + ieot_work > ieot->nr) >> + ieot_work = ieot->nr - ieot_offset; >> + >> + p->istate = istate; >> + p->offset = offset; >> + p->mmap = mmap; >> + p->ieot = ieot; >> + p->ieot_offset = ieot_offset; >> + p->ieot_work = ieot_work; >> + >> + /* create a mem_pool for each thread */ >> + nr = 0; > > Since nr is only used in this for loop. Declare it in this scope > instead of declaring it for the whole function. 
> >> + for (j = p->ieot_offset; j < p->ieot_offset + p->ieot_work; j++) >> + nr += p->ieot->entries[j].nr; >> + if (istate->version == 4) { >> + mem_pool_init(&p->ce_mem_pool, >> + estimate_cache_size_from_compressed(nr)); >> + } >> + else { >> + mem_pool_init(&p->ce_mem_pool, >> + estimate_cache_size(mmap_size, nr)); >> + } > > Maybe keep this mem_pool_init code inside load_cache_entries_thread(), > similar to how you do it for load_cache_entries_thread(). It's mostly > to keep this loop shorter to see (and understand), of course > parallelizing this mem_pool_init() is just noise. > I understand the desire to get that part of the thread initialization out of the main line of this function (it's a bit messy between the entry counting and version differences) but I prefer to have all the thread initialization completed before creating the thread. That allows for simpler error handling and helps minimize the state you have to pass into the thread (mmap_size in this case). >> + >> + err = pthread_create(&p->pthread, NULL, load_cache_entries_thread, p); >> + if (err) >> + die(_("unable to create load_cache_entries thread: %s"), strerror(err)); >> + >> + /* increment by the number of cache entries in the ieot block being processed */ >> + for (j = 0; j < ieot_work; j++) >> + offset += ieot->entries[ieot_offset + j].nr; > > I wonder if it makes things simpler if you store cache_entry _index_ > in entrie[] array instead of storing the number of entries. You can > easily calculate nr then by doing entries[i].index - > entries[i-1].index. And you can count multiple blocks the same way, > without looping like this. 
> >> + ieot_offset += ieot_work; >> + } >> + >> + for (i = 0; i < nr_threads; i++) { >> + struct load_cache_entries_thread_data *p = &data[i]; >> + >> + err = pthread_join(p->pthread, NULL); >> + if (err) >> + die(_("unable to join load_cache_entries thread: %s"), strerror(err)); >> + mem_pool_combine(istate->ce_mem_pool, p->ce_mem_pool); >> + consumed += p->consumed; >> + } >> + >> + free(data); >> + >> + return consumed; >> +} >> +#endif >> + >> /* remember to discard_cache() before reading a different cache! */ >> int do_read_index(struct index_state *istate, const char *path, int must_exist) >> { >> - int fd, i; >> + int fd; >> struct stat st; >> unsigned long src_offset; >> const struct cache_header *hdr; >> const char *mmap; >> size_t mmap_size; >> - const struct cache_entry *previous_ce = NULL; >> struct load_index_extensions p; >> size_t extension_offset = 0; >> #ifndef NO_PTHREADS >> - int nr_threads; >> + int nr_threads, cpus; >> + struct index_entry_offset_table *ieot = NULL; >> #endif >> >> if (istate->initialized) >> @@ -2014,10 +2167,18 @@ int do_read_index(struct index_state *istate, const char *path, int must_exist) >> p.mmap = mmap; >> p.mmap_size = mmap_size; >> >> + src_offset = sizeof(*hdr); > > OK we've been doing this since forever, sizeof(struct cache_header) > probably does not have extra padding on any supported platform. 
> >> @@ -2032,29 +2193,22 @@ int do_read_index(struct index_state *istate, const char *path, int must_exist) >> nr_threads--; >> } >> } >> -#endif >> - >> - if (istate->version == 4) { >> - mem_pool_init(&istate->ce_mem_pool, >> - estimate_cache_size_from_compressed(istate->cache_nr)); >> - } else { >> - mem_pool_init(&istate->ce_mem_pool, >> - estimate_cache_size(mmap_size, istate->cache_nr)); >> - } >> >> - src_offset = sizeof(*hdr); >> - for (i = 0; i < istate->cache_nr; i++) { >> - struct ondisk_cache_entry *disk_ce; >> - struct cache_entry *ce; >> - unsigned long consumed; >> + /* >> + * Locate and read the index entry offset table so that we can use it >> + * to multi-thread the reading of the cache entries. >> + */ >> + if (extension_offset && nr_threads > 1) >> + ieot = read_ieot_extension(mmap, mmap_size, extension_offset); > > You need to free ieot at some point. > Good catch - I hate leaky code. >> >> - disk_ce = (struct ondisk_cache_entry *)(mmap + src_offset); >> - ce = create_from_disk(istate, disk_ce, &consumed, previous_ce); >> - set_index_entry(istate, i, ce); >> + if (ieot) >> + src_offset += load_cache_entries_threaded(istate, mmap, mmap_size, src_offset, nr_threads, ieot); >> + else >> + src_offset += load_all_cache_entries(istate, mmap, mmap_size, src_offset); >> +#else >> + src_offset += load_all_cache_entries(istate, mmap, mmap_size, src_offset); >> +#endif >> >> - src_offset += consumed; >> - previous_ce = ce; >> - } >> istate->timestamp.sec = st.st_mtime; >> istate->timestamp.nsec = ST_MTIME_NSEC(st); >> >> -- >> 2.18.0.windows.1 >>
diff --git a/read-cache.c b/read-cache.c index 9557376e78..14402a0738 100644 --- a/read-cache.c +++ b/read-cache.c @@ -1720,7 +1720,8 @@ int read_index(struct index_state *istate) return read_index_from(istate, get_index_file(), get_git_dir()); } -static struct cache_entry *create_from_disk(struct index_state *istate, +static struct cache_entry *create_from_disk(struct mem_pool *ce_mem_pool, + unsigned int version, struct ondisk_cache_entry *ondisk, unsigned long *ent_size, const struct cache_entry *previous_ce) @@ -1737,7 +1738,7 @@ static struct cache_entry *create_from_disk(struct index_state *istate, * number of bytes to be stripped from the end of the previous name, * and the bytes to append to the result, to come up with its name. */ - int expand_name_field = istate->version == 4; + int expand_name_field = version == 4; /* On-disk flags are just 16 bits */ flags = get_be16(&ondisk->flags); @@ -1761,16 +1762,17 @@ static struct cache_entry *create_from_disk(struct index_state *istate, const unsigned char *cp = (const unsigned char *)name; size_t strip_len, previous_len; - previous_len = previous_ce ? 
previous_ce->ce_namelen : 0; + /* If we're at the beginning of a block, ignore the previous name */ strip_len = decode_varint(&cp); - if (previous_len < strip_len) { - if (previous_ce) + if (previous_ce) { + previous_len = previous_ce->ce_namelen; + if (previous_len < strip_len) die(_("malformed name field in the index, near path '%s'"), - previous_ce->name); - else - die(_("malformed name field in the index in the first path")); + previous_ce->name); + copy_len = previous_len - strip_len; + } else { + copy_len = 0; } - copy_len = previous_len - strip_len; name = (const char *)cp; } @@ -1780,7 +1782,7 @@ static struct cache_entry *create_from_disk(struct index_state *istate, len += copy_len; } - ce = mem_pool__ce_alloc(istate->ce_mem_pool, len); + ce = mem_pool__ce_alloc(ce_mem_pool, len); ce->ce_stat_data.sd_ctime.sec = get_be32(&ondisk->ctime.sec); ce->ce_stat_data.sd_mtime.sec = get_be32(&ondisk->mtime.sec); @@ -1950,6 +1952,52 @@ static void *load_index_extensions(void *_data) return NULL; } +/* + * A helper function that will load the specified range of cache entries + * from the memory mapped file and add them to the given index. 
+ */ +static unsigned long load_cache_entry_block(struct index_state *istate, + struct mem_pool *ce_mem_pool, int offset, int nr, const char *mmap, + unsigned long start_offset, const struct cache_entry *previous_ce) +{ + int i; + unsigned long src_offset = start_offset; + + for (i = offset; i < offset + nr; i++) { + struct ondisk_cache_entry *disk_ce; + struct cache_entry *ce; + unsigned long consumed; + + disk_ce = (struct ondisk_cache_entry *)(mmap + src_offset); + ce = create_from_disk(ce_mem_pool, istate->version, disk_ce, &consumed, previous_ce); + set_index_entry(istate, i, ce); + + src_offset += consumed; + previous_ce = ce; + } + return src_offset - start_offset; +} + +static unsigned long load_all_cache_entries(struct index_state *istate, + const char *mmap, size_t mmap_size, unsigned long src_offset) +{ + unsigned long consumed; + + if (istate->version == 4) { + mem_pool_init(&istate->ce_mem_pool, + estimate_cache_size_from_compressed(istate->cache_nr)); + } else { + mem_pool_init(&istate->ce_mem_pool, + estimate_cache_size(mmap_size, istate->cache_nr)); + } + + consumed = load_cache_entry_block(istate, istate->ce_mem_pool, + 0, istate->cache_nr, mmap, src_offset, NULL); + return consumed; +} + +#ifndef NO_PTHREADS + /* * Mostly randomly chosen maximum thread counts: we * cap the parallelism to online_cpus() threads, and we want @@ -1959,20 +2007,125 @@ static void *load_index_extensions(void *_data) #define THREAD_COST (10000) +struct load_cache_entries_thread_data +{ + pthread_t pthread; + struct index_state *istate; + struct mem_pool *ce_mem_pool; + int offset; + const char *mmap; + struct index_entry_offset_table *ieot; + int ieot_offset; /* starting index into the ieot array */ + int ieot_work; /* count of ieot entries to process */ + unsigned long consumed; /* return # of bytes in index file processed */ +}; + +/* + * A thread proc to run the load_cache_entries() computation + * across multiple background threads. 
+ */ +static void *load_cache_entries_thread(void *_data) +{ + struct load_cache_entries_thread_data *p = _data; + int i; + + /* iterate across all ieot blocks assigned to this thread */ + for (i = p->ieot_offset; i < p->ieot_offset + p->ieot_work; i++) { + p->consumed += load_cache_entry_block(p->istate, p->ce_mem_pool, p->offset, p->ieot->entries[i].nr, p->mmap, p->ieot->entries[i].offset, NULL); + p->offset += p->ieot->entries[i].nr; + } + return NULL; +} + +static unsigned long load_cache_entries_threaded(struct index_state *istate, const char *mmap, size_t mmap_size, + unsigned long src_offset, int nr_threads, struct index_entry_offset_table *ieot) +{ + int i, offset, ieot_work, ieot_offset, err; + struct load_cache_entries_thread_data *data; + unsigned long consumed = 0; + int nr; + + /* a little sanity checking */ + if (istate->name_hash_initialized) + BUG("the name hash isn't thread safe"); + + mem_pool_init(&istate->ce_mem_pool, 0); + data = xcalloc(nr_threads, sizeof(struct load_cache_entries_thread_data)); + + /* ensure we have no more threads than we have blocks to process */ + if (nr_threads > ieot->nr) + nr_threads = ieot->nr; + data = xcalloc(nr_threads, sizeof(struct load_cache_entries_thread_data)); + + offset = ieot_offset = 0; + ieot_work = DIV_ROUND_UP(ieot->nr, nr_threads); + for (i = 0; i < nr_threads; i++) { + struct load_cache_entries_thread_data *p = &data[i]; + int j; + + if (ieot_offset + ieot_work > ieot->nr) + ieot_work = ieot->nr - ieot_offset; + + p->istate = istate; + p->offset = offset; + p->mmap = mmap; + p->ieot = ieot; + p->ieot_offset = ieot_offset; + p->ieot_work = ieot_work; + + /* create a mem_pool for each thread */ + nr = 0; + for (j = p->ieot_offset; j < p->ieot_offset + p->ieot_work; j++) + nr += p->ieot->entries[j].nr; + if (istate->version == 4) { + mem_pool_init(&p->ce_mem_pool, + estimate_cache_size_from_compressed(nr)); + } + else { + mem_pool_init(&p->ce_mem_pool, + estimate_cache_size(mmap_size, nr)); + } + + err = 
pthread_create(&p->pthread, NULL, load_cache_entries_thread, p); + if (err) + die(_("unable to create load_cache_entries thread: %s"), strerror(err)); + + /* increment by the number of cache entries in the ieot block being processed */ + for (j = 0; j < ieot_work; j++) + offset += ieot->entries[ieot_offset + j].nr; + ieot_offset += ieot_work; + } + + for (i = 0; i < nr_threads; i++) { + struct load_cache_entries_thread_data *p = &data[i]; + + err = pthread_join(p->pthread, NULL); + if (err) + die(_("unable to join load_cache_entries thread: %s"), strerror(err)); + mem_pool_combine(istate->ce_mem_pool, p->ce_mem_pool); + consumed += p->consumed; + } + + free(data); + + return consumed; +} +#endif + /* remember to discard_cache() before reading a different cache! */ int do_read_index(struct index_state *istate, const char *path, int must_exist) { - int fd, i; + int fd; struct stat st; unsigned long src_offset; const struct cache_header *hdr; const char *mmap; size_t mmap_size; - const struct cache_entry *previous_ce = NULL; struct load_index_extensions p; size_t extension_offset = 0; #ifndef NO_PTHREADS - int nr_threads; + int nr_threads, cpus; + struct index_entry_offset_table *ieot = NULL; #endif if (istate->initialized) @@ -2014,10 +2167,18 @@ int do_read_index(struct index_state *istate, const char *path, int must_exist) p.mmap = mmap; p.mmap_size = mmap_size; + src_offset = sizeof(*hdr); + #ifndef NO_PTHREADS nr_threads = git_config_get_index_threads(); - if (!nr_threads) - nr_threads = online_cpus(); + + /* TODO: does creating more threads than cores help? 
*/ + if (!nr_threads) { + nr_threads = istate->cache_nr / THREAD_COST; + cpus = online_cpus(); + if (nr_threads > cpus) + nr_threads = cpus; + } if (nr_threads > 1) { extension_offset = read_eoie_extension(mmap, mmap_size); @@ -2032,29 +2193,22 @@ int do_read_index(struct index_state *istate, const char *path, int must_exist) nr_threads--; } } -#endif - - if (istate->version == 4) { - mem_pool_init(&istate->ce_mem_pool, - estimate_cache_size_from_compressed(istate->cache_nr)); - } else { - mem_pool_init(&istate->ce_mem_pool, - estimate_cache_size(mmap_size, istate->cache_nr)); - } - src_offset = sizeof(*hdr); - for (i = 0; i < istate->cache_nr; i++) { - struct ondisk_cache_entry *disk_ce; - struct cache_entry *ce; - unsigned long consumed; + /* + * Locate and read the index entry offset table so that we can use it + * to multi-thread the reading of the cache entries. + */ + if (extension_offset && nr_threads > 1) + ieot = read_ieot_extension(mmap, mmap_size, extension_offset); - disk_ce = (struct ondisk_cache_entry *)(mmap + src_offset); - ce = create_from_disk(istate, disk_ce, &consumed, previous_ce); - set_index_entry(istate, i, ce); + if (ieot) + src_offset += load_cache_entries_threaded(istate, mmap, mmap_size, src_offset, nr_threads, ieot); + else + src_offset += load_all_cache_entries(istate, mmap, mmap_size, src_offset); +#else + src_offset += load_all_cache_entries(istate, mmap, mmap_size, src_offset); +#endif - src_offset += consumed; - previous_ce = ce; - } istate->timestamp.sec = st.st_mtime; istate->timestamp.nsec = ST_MTIME_NSEC(st);