[12/22] ext4: add htree lock implementation
diff mbox series

Message ID 1563758631-29550-13-git-send-email-jsimmons@infradead.org
State New
Headers show
Series
  • ldiskfs patches against 5.2-rc2+
Related show

Commit Message

James Simmons July 22, 2019, 1:23 a.m. UTC
Used for parallel lookup

Signed-off-by: James Simmons <jsimmons@infradead.org>
---
 fs/ext4/Makefile     |   4 +-
 fs/ext4/ext4.h       |  80 +++++
 fs/ext4/htree_lock.c | 891 +++++++++++++++++++++++++++++++++++++++++++++++++++
 fs/ext4/htree_lock.h | 187 +++++++++++
 fs/ext4/namei.c      | 461 +++++++++++++++++++++++---
 fs/ext4/super.c      |   1 +
 6 files changed, 1583 insertions(+), 41 deletions(-)
 create mode 100644 fs/ext4/htree_lock.c
 create mode 100644 fs/ext4/htree_lock.h

Comments

NeilBrown July 22, 2019, 5:10 a.m. UTC | #1
On Sun, Jul 21 2019, James Simmons wrote:

> Used for parallel lookup

This needs more time for review than I have just now ...
I suspect the full EX,PW,PR,CW,CR locking is overkill, but
I won't know until I read the code (unlikely to happen this year).

Also, we need parallel lookups at the VFS level - I do have code for
that.

NeilBrown

Patch
diff mbox series

diff --git a/fs/ext4/Makefile b/fs/ext4/Makefile
index 8fdfcd3..0d0a28f 100644
--- a/fs/ext4/Makefile
+++ b/fs/ext4/Makefile
@@ -6,8 +6,8 @@ 
 obj-$(CONFIG_EXT4_FS) += ext4.o
 
 ext4-y	:= balloc.o bitmap.o block_validity.o dir.o ext4_jbd2.o extents.o \
-		extents_status.o file.o fsmap.o fsync.o hash.o ialloc.o \
-		indirect.o inline.o inode.o ioctl.o mballoc.o migrate.o \
+		extents_status.o file.o fsmap.o fsync.o hash.o htree_lock.o \
+		ialloc.o indirect.o inline.o inode.o ioctl.o mballoc.o migrate.o \
 		mmp.o move_extent.o namei.o page-io.o readpage.o resize.o \
 		super.o symlink.o sysfs.o xattr.o xattr_trusted.o xattr_user.o
 
diff --git a/fs/ext4/ext4.h b/fs/ext4/ext4.h
index 80601a9..bf74c7c 100644
--- a/fs/ext4/ext4.h
+++ b/fs/ext4/ext4.h
@@ -44,6 +44,8 @@ 
 
 #include <linux/compiler.h>
 
+#include "htree_lock.h"
+
 /*
  * The fourth extended filesystem constants/structures
  */
@@ -936,6 +938,9 @@  struct ext4_inode_info {
 	__u32	i_dtime;
 	ext4_fsblk_t	i_file_acl;
 
+	/* following fields for parallel directory operations -bzzz */
+	struct semaphore i_append_sem;
+
 	/*
 	 * i_block_group is the number of the block group which contains
 	 * this file's inode.  Constant across the lifetime of the inode,
@@ -2145,6 +2150,72 @@  struct dx_hash_info
  */
 #define HASH_NB_ALWAYS		1
 
+/* assume name-hash is protected by upper layer */
+#define EXT4_HTREE_LOCK_HASH	0
+
+enum ext4_pdo_lk_types {
+#if EXT4_HTREE_LOCK_HASH
+	EXT4_LK_HASH,
+#endif
+	EXT4_LK_DX,		/* index block */
+	EXT4_LK_DE,		/* directory entry block */
+	EXT4_LK_SPIN,		/* spinlock */
+	EXT4_LK_MAX,
+};
+
+/* read-only bit */
+#define EXT4_LB_RO(b)		(1 << (b))
+/* read + write, high bits for writer */
+#define EXT4_LB_RW(b)		((1 << (b)) | (1 << (EXT4_LK_MAX + (b))))
+
+enum ext4_pdo_lock_bits {
+	/* DX lock bits */
+	EXT4_LB_DX_RO		= EXT4_LB_RO(EXT4_LK_DX),
+	EXT4_LB_DX		= EXT4_LB_RW(EXT4_LK_DX),
+	/* DE lock bits */
+	EXT4_LB_DE_RO		= EXT4_LB_RO(EXT4_LK_DE),
+	EXT4_LB_DE		= EXT4_LB_RW(EXT4_LK_DE),
+	/* DX spinlock bits */
+	EXT4_LB_SPIN_RO		= EXT4_LB_RO(EXT4_LK_SPIN),
+	EXT4_LB_SPIN		= EXT4_LB_RW(EXT4_LK_SPIN),
+	/* accurate searching */
+	EXT4_LB_EXACT		= EXT4_LB_RO(EXT4_LK_MAX << 1),
+};
+
+enum ext4_pdo_lock_opc {
+	/* external */
+	EXT4_HLOCK_READDIR	= (EXT4_LB_DE_RO | EXT4_LB_DX_RO),
+	EXT4_HLOCK_LOOKUP	= (EXT4_LB_DE_RO | EXT4_LB_SPIN_RO |
+				   EXT4_LB_EXACT),
+	EXT4_HLOCK_DEL		= (EXT4_LB_DE | EXT4_LB_SPIN_RO |
+				   EXT4_LB_EXACT),
+	EXT4_HLOCK_ADD		= (EXT4_LB_DE | EXT4_LB_SPIN_RO),
+
+	/* internal */
+	EXT4_HLOCK_LOOKUP_SAFE	= (EXT4_LB_DE_RO | EXT4_LB_DX_RO |
+				   EXT4_LB_EXACT),
+	EXT4_HLOCK_DEL_SAFE	= (EXT4_LB_DE | EXT4_LB_DX_RO | EXT4_LB_EXACT),
+	EXT4_HLOCK_SPLIT	= (EXT4_LB_DE | EXT4_LB_DX | EXT4_LB_SPIN),
+};
+
+extern struct htree_lock_head *ext4_htree_lock_head_alloc(unsigned hbits);
+#define ext4_htree_lock_head_free(lhead)	htree_lock_head_free(lhead)
+
+extern struct htree_lock *ext4_htree_lock_alloc(void);
+#define ext4_htree_lock_free(lck)		htree_lock_free(lck)
+
+extern void ext4_htree_lock(struct htree_lock *lck,
+			    struct htree_lock_head *lhead,
+			    struct inode *dir, unsigned flags);
+#define ext4_htree_unlock(lck)			htree_unlock(lck)
+
+extern struct buffer_head *ext4_find_entry_locked(struct inode *dir,
+					const struct qstr *d_name,
+					struct ext4_dir_entry_2 **res_dir,
+					int *inlined, struct htree_lock *lck);
+extern int ext4_add_entry_locked(handle_t *handle, struct dentry *dentry,
+				 struct inode *inode, struct htree_lock *lck);
+
 struct ext4_filename {
 	const struct qstr *usr_fname;
 	struct fscrypt_str disk_name;
@@ -2479,8 +2550,17 @@  void ext4_insert_dentry(struct inode *inode,
 			struct ext4_filename *fname, void *data);
 static inline void ext4_update_dx_flag(struct inode *inode)
 {
+	/* Disable it for ldiskfs, because going from a DX directory to
+	 * a non-DX directory while it is in use will completely break
+	 * the htree-locking.
+	 * If we really want to support this operation in the future,
+	 * we need to exclusively lock the directory at here which will
+	 * increase complexity of code
+	 */
+#if 0
 	if (!ext4_has_feature_dir_index(inode->i_sb))
 		ext4_clear_inode_flag(inode, EXT4_INODE_INDEX);
+#endif
 }
 static const unsigned char ext4_filetype_table[] = {
 	DT_UNKNOWN, DT_REG, DT_DIR, DT_CHR, DT_BLK, DT_FIFO, DT_SOCK, DT_LNK
diff --git a/fs/ext4/htree_lock.c b/fs/ext4/htree_lock.c
new file mode 100644
index 0000000..9c1e8f0
--- /dev/null
+++ b/fs/ext4/htree_lock.c
@@ -0,0 +1,891 @@ 
+/*
+ * fs/ext4/htree_lock.c
+ *
+ * Copyright (c) 2011, 2012, Intel Corporation.
+ *
+ * Author: Liang Zhen <liang@whamcloud.com>
+ */
+#include <linux/jbd2.h>
+#include <linux/hash.h>
+#include <linux/module.h>
+#include "htree_lock.h"
+
+enum {
+	HTREE_LOCK_BIT_EX	= (1 << HTREE_LOCK_EX),
+	HTREE_LOCK_BIT_PW	= (1 << HTREE_LOCK_PW),
+	HTREE_LOCK_BIT_PR	= (1 << HTREE_LOCK_PR),
+	HTREE_LOCK_BIT_CW	= (1 << HTREE_LOCK_CW),
+	HTREE_LOCK_BIT_CR	= (1 << HTREE_LOCK_CR),
+};
+
+enum {
+	HTREE_LOCK_COMPAT_EX	= 0,
+	HTREE_LOCK_COMPAT_PW	= HTREE_LOCK_COMPAT_EX | HTREE_LOCK_BIT_CR,
+	HTREE_LOCK_COMPAT_PR	= HTREE_LOCK_COMPAT_PW | HTREE_LOCK_BIT_PR,
+	HTREE_LOCK_COMPAT_CW	= HTREE_LOCK_COMPAT_PW | HTREE_LOCK_BIT_CW,
+	HTREE_LOCK_COMPAT_CR	= HTREE_LOCK_COMPAT_CW | HTREE_LOCK_BIT_PR |
+				  HTREE_LOCK_BIT_PW,
+};
+
+static int htree_lock_compat[] = {
+	[HTREE_LOCK_EX]		HTREE_LOCK_COMPAT_EX,
+	[HTREE_LOCK_PW]		HTREE_LOCK_COMPAT_PW,
+	[HTREE_LOCK_PR]		HTREE_LOCK_COMPAT_PR,
+	[HTREE_LOCK_CW]		HTREE_LOCK_COMPAT_CW,
+	[HTREE_LOCK_CR]		HTREE_LOCK_COMPAT_CR,
+};
+
+/* max allowed htree-lock depth.
+ * We only need depth=3 for ext4 although user can have higher value. */
+#define HTREE_LOCK_DEP_MAX	16
+
+#ifdef HTREE_LOCK_DEBUG
+
+static char *hl_name[] = {
+	[HTREE_LOCK_EX]		"EX",
+	[HTREE_LOCK_PW]		"PW",
+	[HTREE_LOCK_PR]		"PR",
+	[HTREE_LOCK_CW]		"CW",
+	[HTREE_LOCK_CR]		"CR",
+};
+
+/* lock stats */
+struct htree_lock_node_stats {
+	unsigned long long	blocked[HTREE_LOCK_MAX];
+	unsigned long long	granted[HTREE_LOCK_MAX];
+	unsigned long long	retried[HTREE_LOCK_MAX];
+	unsigned long long	events;
+};
+
+struct htree_lock_stats {
+	struct htree_lock_node_stats	nodes[HTREE_LOCK_DEP_MAX];
+	unsigned long long	granted[HTREE_LOCK_MAX];
+	unsigned long long	blocked[HTREE_LOCK_MAX];
+};
+
+static struct htree_lock_stats hl_stats;
+
+void htree_lock_stat_reset(void)
+{
+	memset(&hl_stats, 0, sizeof(hl_stats));
+}
+
+void htree_lock_stat_print(int depth)
+{
+	int     i;
+	int	j;
+
+	printk(KERN_DEBUG "HTREE LOCK STATS:\n");
+	for (i = 0; i < HTREE_LOCK_MAX; i++) {
+		printk(KERN_DEBUG "[%s]: G [%10llu], B [%10llu]\n",
+		       hl_name[i], hl_stats.granted[i], hl_stats.blocked[i]);
+	}
+	for (i = 0; i < depth; i++) {
+		printk(KERN_DEBUG "HTREE CHILD [%d] STATS:\n", i);
+		for (j = 0; j < HTREE_LOCK_MAX; j++) {
+			printk(KERN_DEBUG
+				"[%s]: G [%10llu], B [%10llu], R [%10llu]\n",
+				hl_name[j], hl_stats.nodes[i].granted[j],
+				hl_stats.nodes[i].blocked[j],
+				hl_stats.nodes[i].retried[j]);
+		}
+	}
+}
+
+#define lk_grant_inc(m)       do { hl_stats.granted[m]++; } while (0)
+#define lk_block_inc(m)       do { hl_stats.blocked[m]++; } while (0)
+#define ln_grant_inc(d, m)    do { hl_stats.nodes[d].granted[m]++; } while (0)
+#define ln_block_inc(d, m)    do { hl_stats.nodes[d].blocked[m]++; } while (0)
+#define ln_retry_inc(d, m)    do { hl_stats.nodes[d].retried[m]++; } while (0)
+#define ln_event_inc(d)       do { hl_stats.nodes[d].events++; } while (0)
+
+#else /* !DEBUG */
+
+void htree_lock_stat_reset(void) {}
+void htree_lock_stat_print(int depth) {}
+
+#define lk_grant_inc(m)	      do {} while (0)
+#define lk_block_inc(m)	      do {} while (0)
+#define ln_grant_inc(d, m)    do {} while (0)
+#define ln_block_inc(d, m)    do {} while (0)
+#define ln_retry_inc(d, m)    do {} while (0)
+#define ln_event_inc(d)	      do {} while (0)
+
+#endif /* DEBUG */
+
+EXPORT_SYMBOL(htree_lock_stat_reset);
+EXPORT_SYMBOL(htree_lock_stat_print);
+
+#define HTREE_DEP_ROOT		  (-1)
+
+#define htree_spin_lock(lhead, dep)				\
+	bit_spin_lock((dep) + 1, &(lhead)->lh_lock)
+#define htree_spin_unlock(lhead, dep)				\
+	bit_spin_unlock((dep) + 1, &(lhead)->lh_lock)
+
+#define htree_key_event_ignore(child, ln)			\
+	(!((child)->lc_events & (1 << (ln)->ln_mode)))
+
+static int
+htree_key_list_empty(struct htree_lock_node *ln)
+{
+	return list_empty(&ln->ln_major_list) && list_empty(&ln->ln_minor_list);
+}
+
+static void
+htree_key_list_del_init(struct htree_lock_node *ln)
+{
+	struct htree_lock_node *tmp = NULL;
+
+	if (!list_empty(&ln->ln_minor_list)) {
+		tmp = list_entry(ln->ln_minor_list.next,
+				 struct htree_lock_node, ln_minor_list);
+		list_del_init(&ln->ln_minor_list);
+	}
+
+	if (list_empty(&ln->ln_major_list))
+		return;
+
+	if (tmp == NULL) { /* not on minor key list */
+		list_del_init(&ln->ln_major_list);
+	} else {
+		BUG_ON(!list_empty(&tmp->ln_major_list));
+		list_replace_init(&ln->ln_major_list, &tmp->ln_major_list);
+	}
+}
+
+static void
+htree_key_list_replace_init(struct htree_lock_node *old,
+			    struct htree_lock_node *new)
+{
+	if (!list_empty(&old->ln_major_list))
+		list_replace_init(&old->ln_major_list, &new->ln_major_list);
+
+	if (!list_empty(&old->ln_minor_list))
+		list_replace_init(&old->ln_minor_list, &new->ln_minor_list);
+}
+
+static void
+htree_key_event_enqueue(struct htree_lock_child *child,
+			struct htree_lock_node *ln, int dep, void *event)
+{
+	struct htree_lock_node *tmp;
+
+	/* NB: ALWAYS called holding lhead::lh_lock(dep) */
+	BUG_ON(ln->ln_mode == HTREE_LOCK_NL);
+	if (event == NULL || htree_key_event_ignore(child, ln))
+		return;
+
+	/* shouldn't be a very long list */
+	list_for_each_entry(tmp, &ln->ln_alive_list, ln_alive_list) {
+		if (tmp->ln_mode == HTREE_LOCK_NL) {
+			ln_event_inc(dep);
+			if (child->lc_callback != NULL)
+				child->lc_callback(tmp->ln_ev_target, event);
+		}
+	}
+}
+
+static int
+htree_node_lock_enqueue(struct htree_lock *newlk, struct htree_lock *curlk,
+			unsigned dep, int wait, void *event)
+{
+	struct htree_lock_child *child = &newlk->lk_head->lh_children[dep];
+	struct htree_lock_node *newln = &newlk->lk_nodes[dep];
+	struct htree_lock_node *curln = &curlk->lk_nodes[dep];
+
+	/* NB: ALWAYS called holding lhead::lh_lock(dep) */
+	/* NB: we only expect PR/PW lock mode at here, only these two modes are
+	 * allowed for htree_node_lock(asserted in htree_node_lock_internal),
+	 * NL is only used for listener, user can't directly require NL mode */
+	if ((curln->ln_mode == HTREE_LOCK_NL) ||
+	    (curln->ln_mode != HTREE_LOCK_PW &&
+	     newln->ln_mode != HTREE_LOCK_PW)) {
+		/* no conflict, attach it on granted list of @curlk */
+		if (curln->ln_mode != HTREE_LOCK_NL) {
+			list_add(&newln->ln_granted_list,
+				 &curln->ln_granted_list);
+		} else {
+			/* replace key owner */
+			htree_key_list_replace_init(curln, newln);
+		}
+
+		list_add(&newln->ln_alive_list, &curln->ln_alive_list);
+		htree_key_event_enqueue(child, newln, dep, event);
+		ln_grant_inc(dep, newln->ln_mode);
+		return 1; /* still hold lh_lock */
+	}
+
+	if (!wait) { /* can't grant and don't want to wait */
+		ln_retry_inc(dep, newln->ln_mode);
+		newln->ln_mode = HTREE_LOCK_INVAL;
+		return -1; /* don't wait and just return -1 */
+	}
+
+	newlk->lk_task = current;
+	set_current_state(TASK_UNINTERRUPTIBLE);
+	/* conflict, attach it on blocked list of curlk */
+	list_add_tail(&newln->ln_blocked_list, &curln->ln_blocked_list);
+	list_add(&newln->ln_alive_list, &curln->ln_alive_list);
+	ln_block_inc(dep, newln->ln_mode);
+
+	htree_spin_unlock(newlk->lk_head, dep);
+	/* wait to be given the lock */
+	if (newlk->lk_task != NULL)
+		schedule();
+	/* granted, no doubt, wake up will set me RUNNING */
+	if (event == NULL || htree_key_event_ignore(child, newln))
+		return 0; /* granted without lh_lock */
+
+	htree_spin_lock(newlk->lk_head, dep);
+	htree_key_event_enqueue(child, newln, dep, event);
+	return 1; /* still hold lh_lock */
+}
+
+/*
+ * get PR/PW access to particular tree-node according to @dep and @key,
+ * it will return -1 if @wait is false and can't immediately grant this lock.
+ * All listeners(HTREE_LOCK_NL) on @dep and with the same @key will get
+ * @event if it's not NULL.
+ * NB: ALWAYS called holding lhead::lh_lock
+ */
+static int
+htree_node_lock_internal(struct htree_lock_head *lhead, struct htree_lock *lck,
+			 htree_lock_mode_t mode, u32 key, unsigned dep,
+			 int wait, void *event)
+{
+	LIST_HEAD(list);
+	struct htree_lock	*tmp;
+	struct htree_lock	*tmp2;
+	u16			major;
+	u16			minor;
+	u8			reverse;
+	u8			ma_bits;
+	u8			mi_bits;
+
+	BUG_ON(mode != HTREE_LOCK_PW && mode != HTREE_LOCK_PR);
+	BUG_ON(htree_node_is_granted(lck, dep));
+
+	key = hash_long(key, lhead->lh_hbits);
+
+	mi_bits = lhead->lh_hbits >> 1;
+	ma_bits = lhead->lh_hbits - mi_bits;
+
+	lck->lk_nodes[dep].ln_major_key = major = key & ((1U << ma_bits) - 1);
+	lck->lk_nodes[dep].ln_minor_key = minor = key >> ma_bits;
+	lck->lk_nodes[dep].ln_mode = mode;
+
+	/*
+	 * The major key list is an ordered list, so searches are started
+	 * at the end of the list that is numerically closer to major_key,
+	 * so at most half of the list will be walked (for well-distributed
+	 * keys). The list traversal aborts early if the expected key
+	 * location is passed.
+	 */
+	reverse = (major >= (1 << (ma_bits - 1)));
+
+	if (reverse) {
+		list_for_each_entry_reverse(tmp,
+					&lhead->lh_children[dep].lc_list,
+					lk_nodes[dep].ln_major_list) {
+			if (tmp->lk_nodes[dep].ln_major_key == major) {
+				goto search_minor;
+
+			} else if (tmp->lk_nodes[dep].ln_major_key < major) {
+				/* attach _after_ @tmp */
+				list_add(&lck->lk_nodes[dep].ln_major_list,
+					 &tmp->lk_nodes[dep].ln_major_list);
+				goto out_grant_major;
+			}
+		}
+
+		list_add(&lck->lk_nodes[dep].ln_major_list,
+			 &lhead->lh_children[dep].lc_list);
+		goto out_grant_major;
+
+	} else {
+		list_for_each_entry(tmp, &lhead->lh_children[dep].lc_list,
+				    lk_nodes[dep].ln_major_list) {
+			if (tmp->lk_nodes[dep].ln_major_key == major) {
+				goto search_minor;
+
+			} else if (tmp->lk_nodes[dep].ln_major_key > major) {
+				/* insert _before_ @tmp */
+				list_add_tail(&lck->lk_nodes[dep].ln_major_list,
+					&tmp->lk_nodes[dep].ln_major_list);
+				goto out_grant_major;
+			}
+		}
+
+		list_add_tail(&lck->lk_nodes[dep].ln_major_list,
+			      &lhead->lh_children[dep].lc_list);
+		goto out_grant_major;
+	}
+
+ search_minor:
+	/*
+	 * NB: minor_key list doesn't have a "head", @list is just a
+	 * temporary stub for helping list searching, make sure it's removed
+	 * after searching.
+	 * minor_key list is an ordered list too.
+	 */
+	list_add_tail(&list, &tmp->lk_nodes[dep].ln_minor_list);
+
+	reverse = (minor >= (1 << (mi_bits - 1)));
+
+	if (reverse) {
+		list_for_each_entry_reverse(tmp2, &list,
+					    lk_nodes[dep].ln_minor_list) {
+			if (tmp2->lk_nodes[dep].ln_minor_key == minor) {
+				goto out_enqueue;
+
+			} else if (tmp2->lk_nodes[dep].ln_minor_key < minor) {
+				/* attach _after_ @tmp2 */
+				list_add(&lck->lk_nodes[dep].ln_minor_list,
+					 &tmp2->lk_nodes[dep].ln_minor_list);
+				goto out_grant_minor;
+			}
+		}
+
+		list_add(&lck->lk_nodes[dep].ln_minor_list, &list);
+
+	} else {
+		list_for_each_entry(tmp2, &list,
+				    lk_nodes[dep].ln_minor_list) {
+			if (tmp2->lk_nodes[dep].ln_minor_key == minor) {
+				goto out_enqueue;
+
+			} else if (tmp2->lk_nodes[dep].ln_minor_key > minor) {
+				/* insert _before_ @tmp2 */
+				list_add_tail(&lck->lk_nodes[dep].ln_minor_list,
+					&tmp2->lk_nodes[dep].ln_minor_list);
+				goto out_grant_minor;
+			}
+		}
+
+		list_add_tail(&lck->lk_nodes[dep].ln_minor_list, &list);
+	}
+
+ out_grant_minor:
+	if (list.next == &lck->lk_nodes[dep].ln_minor_list) {
+		/* new lock @lck is the first one on minor_key list, which
+		 * means it has the smallest minor_key and it should
+		 * replace @tmp as minor_key owner */
+		list_replace_init(&tmp->lk_nodes[dep].ln_major_list,
+				  &lck->lk_nodes[dep].ln_major_list);
+	}
+	/* remove the temporary head */
+	list_del(&list);
+
+ out_grant_major:
+	ln_grant_inc(dep, lck->lk_nodes[dep].ln_mode);
+	return 1; /* granted with holding lh_lock */
+
+ out_enqueue:
+	list_del(&list); /* remove temprary head */
+	return htree_node_lock_enqueue(lck, tmp2, dep, wait, event);
+}
+
+/*
+ * release the key of @lck at level @dep, and grant any blocked locks.
+ * caller will still listen on @key if @event is not NULL, which means
+ * caller can see a event (by event_cb) while granting any lock with
+ * the same key at level @dep.
+ * NB: ALWAYS called holding lhead::lh_lock
+ * NB: listener will not block anyone because listening mode is HTREE_LOCK_NL
+ */
+static void
+htree_node_unlock_internal(struct htree_lock_head *lhead,
+			   struct htree_lock *curlk, unsigned dep, void *event)
+{
+	struct htree_lock_node	*curln = &curlk->lk_nodes[dep];
+	struct htree_lock	*grtlk = NULL;
+	struct htree_lock_node	*grtln;
+	struct htree_lock	*poslk;
+	struct htree_lock	*tmplk;
+
+	if (!htree_node_is_granted(curlk, dep))
+		return;
+
+	if (!list_empty(&curln->ln_granted_list)) {
+		/* there is another granted lock */
+		grtlk = list_entry(curln->ln_granted_list.next,
+				   struct htree_lock,
+				   lk_nodes[dep].ln_granted_list);
+		list_del_init(&curln->ln_granted_list);
+	}
+
+	if (grtlk == NULL && !list_empty(&curln->ln_blocked_list)) {
+		/*
+		 * @curlk is the only granted lock, so we confirmed:
+		 * a) curln is key owner (attached on major/minor_list),
+		 *    so if there is any blocked lock, it should be attached
+		 *    on curln->ln_blocked_list
+		 * b) we always can grant the first blocked lock
+		 */
+		grtlk = list_entry(curln->ln_blocked_list.next,
+				   struct htree_lock,
+				   lk_nodes[dep].ln_blocked_list);
+		BUG_ON(grtlk->lk_task == NULL);
+		wake_up_process(grtlk->lk_task);
+	}
+
+	if (event != NULL &&
+	    lhead->lh_children[dep].lc_events != HTREE_EVENT_DISABLE) {
+		curln->ln_ev_target = event;
+		curln->ln_mode = HTREE_LOCK_NL; /* listen! */
+	} else {
+		curln->ln_mode = HTREE_LOCK_INVAL;
+	}
+
+	if (grtlk == NULL) { /* I must be the only one locking this key */
+		struct htree_lock_node *tmpln;
+
+		BUG_ON(htree_key_list_empty(curln));
+
+		if (curln->ln_mode == HTREE_LOCK_NL) /* listening */
+			return;
+
+		/* not listening */
+		if (list_empty(&curln->ln_alive_list)) { /* no more listener */
+			htree_key_list_del_init(curln);
+			return;
+		}
+
+		tmpln = list_entry(curln->ln_alive_list.next,
+				   struct htree_lock_node, ln_alive_list);
+
+		BUG_ON(tmpln->ln_mode != HTREE_LOCK_NL);
+
+		htree_key_list_replace_init(curln, tmpln);
+		list_del_init(&curln->ln_alive_list);
+
+		return;
+	}
+
+	/* have a granted lock */
+	grtln = &grtlk->lk_nodes[dep];
+	if (!list_empty(&curln->ln_blocked_list)) {
+		/* only key owner can be on both lists */
+		BUG_ON(htree_key_list_empty(curln));
+
+		if (list_empty(&grtln->ln_blocked_list)) {
+			list_add(&grtln->ln_blocked_list,
+				 &curln->ln_blocked_list);
+		}
+		list_del_init(&curln->ln_blocked_list);
+	}
+	/*
+	 * NB: this is the tricky part:
+	 * We have only two modes for child-lock (PR and PW), also,
+	 * only owner of the key (attached on major/minor_list) can be on
+	 * both blocked_list and granted_list, so @grtlk must be one
+	 * of these two cases:
+	 *
+	 * a) @grtlk is taken from granted_list, which means we've granted
+	 *    more than one lock so @grtlk has to be PR, the first blocked
+	 *    lock must be PW and we can't grant it at all.
+	 *    So even @grtlk is not owner of the key (empty blocked_list),
+	 *    we don't care because we can't grant any lock.
+	 * b) we just grant a new lock which is taken from head of blocked
+	 *    list, and it should be the first granted lock, and it should
+	 *    be the first one linked on blocked_list.
+	 *
+	 * Either way, we can get correct result by iterating blocked_list
+	 * of @grtlk, and don't have to bother on how to find out
+	 * owner of current key.
+	 */
+	list_for_each_entry_safe(poslk, tmplk, &grtln->ln_blocked_list,
+				 lk_nodes[dep].ln_blocked_list) {
+		if (grtlk->lk_nodes[dep].ln_mode == HTREE_LOCK_PW ||
+		    poslk->lk_nodes[dep].ln_mode == HTREE_LOCK_PW)
+			break;
+		/* grant all readers */
+		list_del_init(&poslk->lk_nodes[dep].ln_blocked_list);
+		list_add(&poslk->lk_nodes[dep].ln_granted_list,
+			 &grtln->ln_granted_list);
+
+		BUG_ON(poslk->lk_task == NULL);
+		wake_up_process(poslk->lk_task);
+	}
+
+	/* if @curln is the owner of this key, replace it with @grtln */
+	if (!htree_key_list_empty(curln))
+		htree_key_list_replace_init(curln, grtln);
+
+	if (curln->ln_mode == HTREE_LOCK_INVAL)
+		list_del_init(&curln->ln_alive_list);
+}
+
+/*
+ * it's just wrapper of htree_node_lock_internal, it returns 1 on granted
+ * and 0 only if @wait is false and can't grant it immediately
+ */
+int
+htree_node_lock_try(struct htree_lock *lck, htree_lock_mode_t mode,
+		    u32 key, unsigned dep, int wait, void *event)
+{
+	struct htree_lock_head *lhead = lck->lk_head;
+	int rc;
+
+	BUG_ON(dep >= lck->lk_depth);
+	BUG_ON(lck->lk_mode == HTREE_LOCK_INVAL);
+
+	htree_spin_lock(lhead, dep);
+	rc = htree_node_lock_internal(lhead, lck, mode, key, dep, wait, event);
+	if (rc != 0)
+		htree_spin_unlock(lhead, dep);
+	return rc >= 0;
+}
+EXPORT_SYMBOL(htree_node_lock_try);
+
+/* it's wrapper of htree_node_unlock_internal */
+void
+htree_node_unlock(struct htree_lock *lck, unsigned dep, void *event)
+{
+	struct htree_lock_head *lhead = lck->lk_head;
+
+	BUG_ON(dep >= lck->lk_depth);
+	BUG_ON(lck->lk_mode == HTREE_LOCK_INVAL);
+
+	htree_spin_lock(lhead, dep);
+	htree_node_unlock_internal(lhead, lck, dep, event);
+	htree_spin_unlock(lhead, dep);
+}
+EXPORT_SYMBOL(htree_node_unlock);
+
+/* stop listening on child-lock level @dep */
+void
+htree_node_stop_listen(struct htree_lock *lck, unsigned dep)
+{
+	struct htree_lock_node *ln = &lck->lk_nodes[dep];
+	struct htree_lock_node *tmp;
+
+	BUG_ON(htree_node_is_granted(lck, dep));
+	BUG_ON(!list_empty(&ln->ln_blocked_list));
+	BUG_ON(!list_empty(&ln->ln_granted_list));
+
+	if (!htree_node_is_listening(lck, dep))
+		return;
+
+	htree_spin_lock(lck->lk_head, dep);
+	ln->ln_mode = HTREE_LOCK_INVAL;
+	ln->ln_ev_target = NULL;
+
+	if (htree_key_list_empty(ln)) { /* not owner */
+		list_del_init(&ln->ln_alive_list);
+		goto out;
+	}
+
+	/* I'm the owner... */
+	if (list_empty(&ln->ln_alive_list)) { /* no more listener */
+		htree_key_list_del_init(ln);
+		goto out;
+	}
+
+	tmp = list_entry(ln->ln_alive_list.next,
+			 struct htree_lock_node, ln_alive_list);
+
+	BUG_ON(tmp->ln_mode != HTREE_LOCK_NL);
+	htree_key_list_replace_init(ln, tmp);
+	list_del_init(&ln->ln_alive_list);
+ out:
+	htree_spin_unlock(lck->lk_head, dep);
+}
+EXPORT_SYMBOL(htree_node_stop_listen);
+
+/* release all child-locks if we have any */
+static void
+htree_node_release_all(struct htree_lock *lck)
+{
+	int	i;
+
+	for (i = 0; i < lck->lk_depth; i++) {
+		if (htree_node_is_granted(lck, i))
+			htree_node_unlock(lck, i, NULL);
+		else if (htree_node_is_listening(lck, i))
+			htree_node_stop_listen(lck, i);
+	}
+}
+
+/*
+ * obtain htree lock, it could be blocked inside if there's conflict
+ * with any granted or blocked lock and @wait is true.
+ * NB: ALWAYS called holding lhead::lh_lock
+ */
+static int
+htree_lock_internal(struct htree_lock *lck, int wait)
+{
+	struct htree_lock_head *lhead = lck->lk_head;
+	int	granted = 0;
+	int	blocked = 0;
+	int	i;
+
+	for (i = 0; i < HTREE_LOCK_MAX; i++) {
+		if (lhead->lh_ngranted[i] != 0)
+			granted |= 1 << i;
+		if (lhead->lh_nblocked[i] != 0)
+			blocked |= 1 << i;
+	}
+	if ((htree_lock_compat[lck->lk_mode] & granted) != granted ||
+	    (htree_lock_compat[lck->lk_mode] & blocked) != blocked) {
+		/* will block current lock even it just conflicts with any
+		 * other blocked lock, so lock like EX wouldn't starve */
+		if (!wait)
+			return -1;
+		lhead->lh_nblocked[lck->lk_mode]++;
+		lk_block_inc(lck->lk_mode);
+
+		lck->lk_task = current;
+		list_add_tail(&lck->lk_blocked_list, &lhead->lh_blocked_list);
+
+retry:
+		set_current_state(TASK_UNINTERRUPTIBLE);
+		htree_spin_unlock(lhead, HTREE_DEP_ROOT);
+		/* wait to be given the lock */
+		if (lck->lk_task != NULL)
+			schedule();
+		/* granted, no doubt. wake up will set me RUNNING.
+		 * Since thread would be waken up accidentally,
+		 * so we need check lock whether granted or not again. */
+		if (!list_empty(&lck->lk_blocked_list)) {
+			htree_spin_lock(lhead, HTREE_DEP_ROOT);
+			if (list_empty(&lck->lk_blocked_list)) {
+				htree_spin_unlock(lhead, HTREE_DEP_ROOT);
+				return 0;
+			}
+			goto retry;
+		}
+		return 0; /* without lh_lock */
+	}
+	lhead->lh_ngranted[lck->lk_mode]++;
+	lk_grant_inc(lck->lk_mode);
+	return 1;
+}
+
+/* release htree lock. NB: ALWAYS called holding lhead::lh_lock */
+static void
+htree_unlock_internal(struct htree_lock *lck)
+{
+	struct htree_lock_head *lhead = lck->lk_head;
+	struct htree_lock *tmp;
+	struct htree_lock *tmp2;
+	int granted = 0;
+	int i;
+
+	BUG_ON(lhead->lh_ngranted[lck->lk_mode] == 0);
+
+	lhead->lh_ngranted[lck->lk_mode]--;
+	lck->lk_mode = HTREE_LOCK_INVAL;
+
+	for (i = 0; i < HTREE_LOCK_MAX; i++) {
+		if (lhead->lh_ngranted[i] != 0)
+			granted |= 1 << i;
+	}
+	list_for_each_entry_safe(tmp, tmp2,
+				 &lhead->lh_blocked_list, lk_blocked_list) {
+		/* conflict with any granted lock? */
+		if ((htree_lock_compat[tmp->lk_mode] & granted) != granted)
+			break;
+
+		list_del_init(&tmp->lk_blocked_list);
+
+		BUG_ON(lhead->lh_nblocked[tmp->lk_mode] == 0);
+
+		lhead->lh_nblocked[tmp->lk_mode]--;
+		lhead->lh_ngranted[tmp->lk_mode]++;
+		granted |= 1 << tmp->lk_mode;
+
+		BUG_ON(tmp->lk_task == NULL);
+		wake_up_process(tmp->lk_task);
+	}
+}
+
+/* it's wrapper of htree_lock_internal and exported interface.
+ * It always return 1 with granted lock if @wait is true, it can return 0
+ * if @wait is false and locking request can't be granted immediately */
+int
+htree_lock_try(struct htree_lock *lck, struct htree_lock_head *lhead,
+	       htree_lock_mode_t mode, int wait)
+{
+	int	rc;
+
+	BUG_ON(lck->lk_depth > lhead->lh_depth);
+	BUG_ON(lck->lk_head != NULL);
+	BUG_ON(lck->lk_task != NULL);
+
+	lck->lk_head = lhead;
+	lck->lk_mode = mode;
+
+	htree_spin_lock(lhead, HTREE_DEP_ROOT);
+	rc = htree_lock_internal(lck, wait);
+	if (rc != 0)
+		htree_spin_unlock(lhead, HTREE_DEP_ROOT);
+	return rc >= 0;
+}
+EXPORT_SYMBOL(htree_lock_try);
+
+/* it's wrapper of htree_unlock_internal and exported interface.
+ * It will release all htree_node_locks and htree_lock */
+void
+htree_unlock(struct htree_lock *lck)
+{
+	BUG_ON(lck->lk_head == NULL);
+	BUG_ON(lck->lk_mode == HTREE_LOCK_INVAL);
+
+	htree_node_release_all(lck);
+
+	htree_spin_lock(lck->lk_head, HTREE_DEP_ROOT);
+	htree_unlock_internal(lck);
+	htree_spin_unlock(lck->lk_head, HTREE_DEP_ROOT);
+	lck->lk_head = NULL;
+	lck->lk_task = NULL;
+}
+EXPORT_SYMBOL(htree_unlock);
+
+/* change lock mode */
+void
+htree_change_mode(struct htree_lock *lck, htree_lock_mode_t mode)
+{
+	BUG_ON(lck->lk_mode == HTREE_LOCK_INVAL);
+	lck->lk_mode = mode;
+}
+EXPORT_SYMBOL(htree_change_mode);
+
+/* release htree lock, and lock it again with new mode.
+ * This function will first release all htree_node_locks and htree_lock,
+ * then try to gain htree_lock with new @mode.
+ * It always return 1 with granted lock if @wait is true, it can return 0
+ * if @wait is false and locking request can't be granted immediately */
+int
+htree_change_lock_try(struct htree_lock *lck, htree_lock_mode_t mode, int wait)
+{
+	struct htree_lock_head *lhead = lck->lk_head;
+	int rc;
+
+	BUG_ON(lhead == NULL);
+	BUG_ON(lck->lk_mode == mode);
+	BUG_ON(lck->lk_mode == HTREE_LOCK_INVAL || mode == HTREE_LOCK_INVAL);
+
+	htree_node_release_all(lck);
+
+	htree_spin_lock(lhead, HTREE_DEP_ROOT);
+	htree_unlock_internal(lck);
+	lck->lk_mode = mode;
+	rc = htree_lock_internal(lck, wait);
+	if (rc != 0)
+		htree_spin_unlock(lhead, HTREE_DEP_ROOT);
+	return rc >= 0;
+}
+EXPORT_SYMBOL(htree_change_lock_try);
+
+/* create a htree_lock head with @depth levels (number of child-locks),
+ * it is a per resoruce structure */
+struct htree_lock_head *
+htree_lock_head_alloc(unsigned depth, unsigned hbits, unsigned priv)
+{
+	struct htree_lock_head *lhead;
+	int  i;
+
+	if (depth > HTREE_LOCK_DEP_MAX) {
+		printk(KERN_ERR "%d is larger than max htree_lock depth %d\n",
+			depth, HTREE_LOCK_DEP_MAX);
+		return NULL;
+	}
+
+	lhead = kzalloc(offsetof(struct htree_lock_head,
+				 lh_children[depth]) + priv, GFP_NOFS);
+	if (lhead == NULL)
+		return NULL;
+
+	if (hbits < HTREE_HBITS_MIN)
+		lhead->lh_hbits = HTREE_HBITS_MIN;
+	else if (hbits > HTREE_HBITS_MAX)
+		lhead->lh_hbits = HTREE_HBITS_MAX;
+
+	lhead->lh_lock = 0;
+	lhead->lh_depth = depth;
+	INIT_LIST_HEAD(&lhead->lh_blocked_list);
+	if (priv > 0) {
+		lhead->lh_private = (void *)lhead +
+			offsetof(struct htree_lock_head, lh_children[depth]);
+	}
+
+	for (i = 0; i < depth; i++) {
+		INIT_LIST_HEAD(&lhead->lh_children[i].lc_list);
+		lhead->lh_children[i].lc_events = HTREE_EVENT_DISABLE;
+	}
+	return lhead;
+}
+EXPORT_SYMBOL(htree_lock_head_alloc);
+
+/* free the htree_lock head */
+void
+htree_lock_head_free(struct htree_lock_head *lhead)
+{
+	int     i;
+
+	BUG_ON(!list_empty(&lhead->lh_blocked_list));
+	for (i = 0; i < lhead->lh_depth; i++)
+		BUG_ON(!list_empty(&lhead->lh_children[i].lc_list));
+	kfree(lhead);
+}
+EXPORT_SYMBOL(htree_lock_head_free);
+
+/* register event callback for @events of child-lock at level @dep */
+void
+htree_lock_event_attach(struct htree_lock_head *lhead, unsigned dep,
+			unsigned events, htree_event_cb_t callback)
+{
+	BUG_ON(lhead->lh_depth <= dep);
+	lhead->lh_children[dep].lc_events = events;
+	lhead->lh_children[dep].lc_callback = callback;
+}
+EXPORT_SYMBOL(htree_lock_event_attach);
+
+/* allocate a htree_lock, which is per-thread structure, @pbytes is some
+ * extra-bytes as private data for caller */
+struct htree_lock *
+htree_lock_alloc(unsigned depth, unsigned pbytes)
+{
+	struct htree_lock *lck;
+	int i = offsetof(struct htree_lock, lk_nodes[depth]);
+
+	if (depth > HTREE_LOCK_DEP_MAX) {
+		printk(KERN_ERR "%d is larger than max htree_lock depth %d\n",
+			depth, HTREE_LOCK_DEP_MAX);
+		return NULL;
+	}
+	lck = kzalloc(i + pbytes, GFP_NOFS);
+	if (lck == NULL)
+		return NULL;
+
+	if (pbytes != 0)
+		lck->lk_private = (void *)lck + i;
+	lck->lk_mode = HTREE_LOCK_INVAL;
+	lck->lk_depth = depth;
+	INIT_LIST_HEAD(&lck->lk_blocked_list);
+
+	for (i = 0; i < depth; i++) {
+		struct htree_lock_node *node = &lck->lk_nodes[i];
+
+		node->ln_mode = HTREE_LOCK_INVAL;
+		INIT_LIST_HEAD(&node->ln_major_list);
+		INIT_LIST_HEAD(&node->ln_minor_list);
+		INIT_LIST_HEAD(&node->ln_alive_list);
+		INIT_LIST_HEAD(&node->ln_blocked_list);
+		INIT_LIST_HEAD(&node->ln_granted_list);
+	}
+
+	return lck;
+}
+EXPORT_SYMBOL(htree_lock_alloc);
+
+/* free htree_lock node */
+void
+htree_lock_free(struct htree_lock *lck)
+{
+	BUG_ON(lck->lk_mode != HTREE_LOCK_INVAL);
+	kfree(lck);
+}
+EXPORT_SYMBOL(htree_lock_free);
diff --git a/fs/ext4/htree_lock.h b/fs/ext4/htree_lock.h
new file mode 100644
index 0000000..9dc7788
--- /dev/null
+++ b/fs/ext4/htree_lock.h
@@ -0,0 +1,187 @@ 
+/*
+ * include/linux/htree_lock.h
+ *
+ * Copyright (c) 2011, 2012, Intel Corporation.
+ *
+ * Author: Liang Zhen <liang@whamcloud.com>
+ */
+
+/*
+ * htree lock
+ *
+ * htree_lock is an advanced lock, it can support five lock modes (concept is
+ * taken from DLM) and it's a sleeping lock.
+ *
+ * most common use case is:
+ * - create a htree_lock_head for data
+ * - each thread (contender) creates it's own htree_lock
+ * - contender needs to call htree_lock(lock_node, mode) to protect data and
+ *   call htree_unlock to release lock
+ *
+ * Also, there is advanced use-case which is more complex, user can have
+ * PW/PR lock on particular key, it's mostly used while user holding shared
+ * lock on the htree (CW, CR)
+ *
+ * htree_lock(lock_node, HTREE_LOCK_CR); lock the htree with CR
+ * htree_node_lock(lock_node, HTREE_LOCK_PR, key...); lock @key with PR
+ * ...
+ * htree_node_unlock(lock_node);; unlock the key
+ *
+ * Another tip is, we can have N-levels of this kind of keys, all we need to
+ * do is specifying N-levels while creating htree_lock_head, then we can
+ * lock/unlock a specific level by:
+ * htree_node_lock(lock_node, mode1, key1, level1...);
+ * do something;
+ * htree_node_lock(lock_node, mode1, key2, level2...);
+ * do something;
+ * htree_node_unlock(lock_node, level2);
+ * htree_node_unlock(lock_node, level1);
+ *
+ * NB: for multi-level, should be careful about locking order to avoid deadlock
+ */
+
+#ifndef _LINUX_HTREE_LOCK_H
+#define _LINUX_HTREE_LOCK_H
+
+#include <linux/list.h>
+#include <linux/spinlock.h>
+#include <linux/sched.h>
+
+/*
+ * Lock Modes
+ * more details can be found here:
+ * http://en.wikipedia.org/wiki/Distributed_lock_manager
+ */
+typedef enum {
+	HTREE_LOCK_EX	= 0, /* exclusive lock: incompatible with all others */
+	HTREE_LOCK_PW,	     /* protected write: allows only CR users */
+	HTREE_LOCK_PR,	     /* protected read: allow PR, CR users */
+	HTREE_LOCK_CW,	     /* concurrent write: allow CR, CW users */
+	HTREE_LOCK_CR,	     /* concurrent read: allow all but EX users */
+	HTREE_LOCK_MAX,	     /* number of lock modes */
+} htree_lock_mode_t;
+
+#define HTREE_LOCK_NL		HTREE_LOCK_MAX
+#define HTREE_LOCK_INVAL	0xdead10c
+
+enum {
+	HTREE_HBITS_MIN		= 2,
+	HTREE_HBITS_DEF		= 14,
+	HTREE_HBITS_MAX		= 32,
+};
+
+enum {
+	HTREE_EVENT_DISABLE	= (0),
+	HTREE_EVENT_RD		= (1 << HTREE_LOCK_PR),
+	HTREE_EVENT_WR		= (1 << HTREE_LOCK_PW),
+	HTREE_EVENT_RDWR	= (HTREE_EVENT_RD | HTREE_EVENT_WR),
+};
+
+struct htree_lock;
+
+typedef void (*htree_event_cb_t)(void *target, void *event);
+
+struct htree_lock_child {
+	struct list_head	lc_list;	/* granted list */
+	htree_event_cb_t	lc_callback;	/* event callback */
+	unsigned		lc_events;	/* event types */
+};
+
+struct htree_lock_head {
+	unsigned long		lh_lock;	/* bits lock */
+	/* blocked lock list (htree_lock) */
+	struct list_head	lh_blocked_list;
+	/* # key levels */
+	u16			lh_depth;
+	/* hash bits for key and limit number of locks */
+	u16			lh_hbits;
+	/* counters for blocked locks */
+	u16			lh_nblocked[HTREE_LOCK_MAX];
+	/* counters for granted locks */
+	u16			lh_ngranted[HTREE_LOCK_MAX];
+	/* private data */
+	void			*lh_private;
+	/* array of children locks */
+	struct htree_lock_child	lh_children[0];
+};
+
+/* htree_lock_node_t is child-lock for a specific key (ln_value) */
+struct htree_lock_node {
+	htree_lock_mode_t	ln_mode;
+	/* major hash key */
+	u16			ln_major_key;
+	/* minor hash key */
+	u16			ln_minor_key;
+	struct list_head	ln_major_list;
+	struct list_head	ln_minor_list;
+	/* alive list, all locks (granted, blocked, listening) are on it */
+	struct list_head	ln_alive_list;
+	/* blocked list */
+	struct list_head	ln_blocked_list;
+	/* granted list */
+	struct list_head	ln_granted_list;
+	void			*ln_ev_target;
+};
+
+struct htree_lock {
+	struct task_struct	*lk_task;
+	struct htree_lock_head	*lk_head;
+	void			*lk_private;
+	unsigned		lk_depth;
+	htree_lock_mode_t	lk_mode;
+	struct list_head	lk_blocked_list;
+	struct htree_lock_node	lk_nodes[0];
+};
+
+/* create a lock head, which stands for a resource */
+struct htree_lock_head *htree_lock_head_alloc(unsigned depth,
+					      unsigned hbits, unsigned priv);
+/* free a lock head */
+void htree_lock_head_free(struct htree_lock_head *lhead);
+/* register event callback for child lock at level @depth */
+void htree_lock_event_attach(struct htree_lock_head *lhead, unsigned depth,
+			     unsigned events, htree_event_cb_t callback);
+/* create a lock handle, which stands for a thread */
+struct htree_lock *htree_lock_alloc(unsigned depth, unsigned pbytes);
+/* free a lock handle */
+void htree_lock_free(struct htree_lock *lck);
+/* lock htree, when @wait is true, 0 is returned if the lock can't
+ * be granted immediately */
+int htree_lock_try(struct htree_lock *lck, struct htree_lock_head *lhead,
+		   htree_lock_mode_t mode, int wait);
+/* unlock htree */
+void htree_unlock(struct htree_lock *lck);
+/* unlock and relock htree with @new_mode */
+int htree_change_lock_try(struct htree_lock *lck,
+			  htree_lock_mode_t new_mode, int wait);
+void htree_change_mode(struct htree_lock *lck, htree_lock_mode_t mode);
+/* require child lock (key) of htree at level @dep, @event will be sent to all
+ * listeners on this @key while lock being granted */
+int htree_node_lock_try(struct htree_lock *lck, htree_lock_mode_t mode,
+			u32 key, unsigned dep, int wait, void *event);
+/* release child lock at level @dep, this lock will listen on it's key
+ * if @event isn't NULL, event_cb will be called against @lck while granting
+ * any other lock at level @dep with the same key */
+void htree_node_unlock(struct htree_lock *lck, unsigned dep, void *event);
+/* stop listening on child lock at level @dep */
+void htree_node_stop_listen(struct htree_lock *lck, unsigned dep);
+/* for debug */
+void htree_lock_stat_print(int depth);
+void htree_lock_stat_reset(void);
+
+#define htree_lock(lck, lh, mode)	htree_lock_try(lck, lh, mode, 1)
+#define htree_change_lock(lck, mode)	htree_change_lock_try(lck, mode, 1)
+
+#define htree_lock_mode(lck)		((lck)->lk_mode)
+
+#define htree_node_lock(lck, mode, key, dep)	\
+	htree_node_lock_try(lck, mode, key, dep, 1, NULL)
+/* this is only safe in thread context of lock owner */
+#define htree_node_is_granted(lck, dep)		\
+	((lck)->lk_nodes[dep].ln_mode != HTREE_LOCK_INVAL && \
+	 (lck)->lk_nodes[dep].ln_mode != HTREE_LOCK_NL)
+/* this is only safe in thread context of lock owner */
+#define htree_node_is_listening(lck, dep)	\
+	((lck)->lk_nodes[dep].ln_mode == HTREE_LOCK_NL)
+
+#endif
diff --git a/fs/ext4/namei.c b/fs/ext4/namei.c
index 9cb86e4..0153c4d 100644
--- a/fs/ext4/namei.c
+++ b/fs/ext4/namei.c
@@ -54,6 +54,7 @@  struct buffer_head *ext4_append(handle_t *handle,
 				struct inode *inode,
 				ext4_lblk_t *block)
 {
+	struct ext4_inode_info *ei = EXT4_I(inode);
 	struct buffer_head *bh;
 	int err;
 
@@ -62,15 +63,24 @@  struct buffer_head *ext4_append(handle_t *handle,
 		      EXT4_SB(inode->i_sb)->s_max_dir_size_kb)))
 		return ERR_PTR(-ENOSPC);
 
+	/* with parallel dir operations all appends
+	 * have to be serialized -bzzz
+	 */
+	down(&ei->i_append_sem);
+
 	*block = inode->i_size >> inode->i_sb->s_blocksize_bits;
 
 	bh = ext4_bread(handle, inode, *block, EXT4_GET_BLOCKS_CREATE);
-	if (IS_ERR(bh))
+	if (IS_ERR(bh)) {
+		up(&ei->i_append_sem);
 		return bh;
+	}
+
 	inode->i_size += inode->i_sb->s_blocksize;
 	EXT4_I(inode)->i_disksize = inode->i_size;
 	BUFFER_TRACE(bh, "get_write_access");
 	err = ext4_journal_get_write_access(handle, bh);
+	up(&ei->i_append_sem);
 	if (err) {
 		brelse(bh);
 		ext4_std_error(inode->i_sb, err);
@@ -250,7 +260,8 @@  static unsigned int inline dx_root_limit(struct inode *dir,
 static struct dx_frame *dx_probe(struct ext4_filename *fname,
 				 struct inode *dir,
 				 struct dx_hash_info *hinfo,
-				 struct dx_frame *frame);
+				 struct dx_frame *frame,
+				 struct htree_lock *lck);
 static void dx_release(struct dx_frame *frames);
 static int dx_make_map(struct inode *dir, struct ext4_dir_entry_2 *de,
 		       unsigned blocksize, struct dx_hash_info *hinfo,
@@ -264,12 +275,13 @@  static void dx_insert_block(struct dx_frame *frame,
 static int ext4_htree_next_block(struct inode *dir, __u32 hash,
 				 struct dx_frame *frame,
 				 struct dx_frame *frames,
-				 __u32 *start_hash);
+				 __u32 *start_hash, struct htree_lock *lck);
 static struct buffer_head * ext4_dx_find_entry(struct inode *dir,
 		struct ext4_filename *fname,
-		struct ext4_dir_entry_2 **res_dir);
+		struct ext4_dir_entry_2 **res_dir, struct htree_lock *lck);
 static int ext4_dx_add_entry(handle_t *handle, struct ext4_filename *fname,
-			     struct inode *dir, struct inode *inode);
+			     struct inode *dir, struct inode *inode,
+			     struct htree_lock *lck);
 
 /* checksumming functions */
 void initialize_dirent_tail(struct ext4_dir_entry_tail *t,
@@ -734,6 +746,226 @@  struct stats dx_show_entries(struct dx_hash_info *hinfo, struct inode *dir,
 }
 #endif /* DX_DEBUG */
 
+/* private data for htree_lock */
+struct ext4_dir_lock_data {
+	unsigned int		ld_flags;	/* bits-map for lock types */
+	unsigned int		ld_count;	/* # entries of the last DX block */
+	struct dx_entry		ld_at_entry;	/* copy of leaf dx_entry */
+	struct dx_entry		*ld_at;		/* position of leaf dx_entry */
+};
+
+#define ext4_htree_lock_data(l)	((struct ext4_dir_lock_data *)(l)->lk_private)
+#define ext4_find_entry(dir, name, dirent, inline) \
+			ext4_find_entry_locked(dir, name, dirent, inline, NULL)
+#define ext4_add_entry(handle, dentry, inode) \
+		       ext4_add_entry_locked(handle, dentry, inode, NULL)
+
+/* NB: ext4_lblk_t is 32 bits so we use high bits to identify invalid blk */
+#define EXT4_HTREE_NODE_CHANGED		(0xcafeULL << 32)
+
+static void ext4_htree_event_cb(void *target, void *event)
+{
+	u64 *block = (u64 *)target;
+
+	if (*block == dx_get_block((struct dx_entry *)event))
+		*block = EXT4_HTREE_NODE_CHANGED;
+}
+
+struct htree_lock_head *ext4_htree_lock_head_alloc(unsigned hbits)
+{
+	struct htree_lock_head *lhead;
+
+	lhead = htree_lock_head_alloc(EXT4_LK_MAX, hbits, 0);
+	if (lhead) {
+		htree_lock_event_attach(lhead, EXT4_LK_SPIN, HTREE_EVENT_WR,
+					ext4_htree_event_cb);
+	}
+	return lhead;
+}
+EXPORT_SYMBOL(ext4_htree_lock_head_alloc);
+
+struct htree_lock *ext4_htree_lock_alloc(void)
+{
+	return htree_lock_alloc(EXT4_LK_MAX,
+				sizeof(struct ext4_dir_lock_data));
+}
+EXPORT_SYMBOL(ext4_htree_lock_alloc);
+
+static htree_lock_mode_t ext4_htree_mode(unsigned flags)
+{
+	switch (flags) {
+	default: /* 0 or unknown flags require EX lock */
+		return HTREE_LOCK_EX;
+	case EXT4_HLOCK_READDIR:
+		return HTREE_LOCK_PR;
+	case EXT4_HLOCK_LOOKUP:
+		return HTREE_LOCK_CR;
+	case EXT4_HLOCK_DEL:
+	case EXT4_HLOCK_ADD:
+		return HTREE_LOCK_CW;
+	}
+}
+
+/* return PR for read-only operations, otherwise return EX */
+static inline htree_lock_mode_t ext4_htree_safe_mode(unsigned flags)
+{
+	int writer = (flags & EXT4_LB_DE) == EXT4_LB_DE;
+
+	/* 0 requires EX lock */
+	return (flags == 0 || writer) ? HTREE_LOCK_EX : HTREE_LOCK_PR;
+}
+
+static int ext4_htree_safe_locked(struct htree_lock *lck)
+{
+	int writer;
+
+	if (!lck || lck->lk_mode == HTREE_LOCK_EX)
+		return 1;
+
+	writer = (ext4_htree_lock_data(lck)->ld_flags & EXT4_LB_DE) ==
+		  EXT4_LB_DE;
+	if (writer) /* all readers & writers are excluded? */
+		return lck->lk_mode == HTREE_LOCK_EX;
+
+	/* all writers are excluded? */
+	return lck->lk_mode == HTREE_LOCK_PR ||
+	       lck->lk_mode == HTREE_LOCK_PW ||
+	       lck->lk_mode == HTREE_LOCK_EX;
+}
+
+/* relock htree_lock with EX mode if it's change operation, otherwise
+ * relock it with PR mode. It's noop if PDO is disabled.
+ */
+static void ext4_htree_safe_relock(struct htree_lock *lck)
+{
+	if (!ext4_htree_safe_locked(lck)) {
+		unsigned int flags = ext4_htree_lock_data(lck)->ld_flags;
+
+		htree_change_lock(lck, ext4_htree_safe_mode(flags));
+	}
+}
+
+void ext4_htree_lock(struct htree_lock *lck, struct htree_lock_head *lhead,
+		     struct inode *dir, unsigned flags)
+{
+	htree_lock_mode_t mode = is_dx(dir) ? ext4_htree_mode(flags) :
+					      ext4_htree_safe_mode(flags);
+
+	ext4_htree_lock_data(lck)->ld_flags = flags;
+	htree_lock(lck, lhead, mode);
+	if (!is_dx(dir))
+		ext4_htree_safe_relock(lck); /* make sure it's safe locked */
+}
+EXPORT_SYMBOL(ext4_htree_lock);
+
+static int ext4_htree_node_lock(struct htree_lock *lck, struct dx_entry *at,
+				unsigned int lmask, int wait, void *ev)
+{
+	u32 key = (at == NULL) ? 0 : dx_get_block(at);
+	u32 mode;
+
+	/* NOOP if htree is well protected or caller doesn't require the lock */
+	if (ext4_htree_safe_locked(lck) ||
+	    !(ext4_htree_lock_data(lck)->ld_flags & lmask))
+		return 1;
+
+	mode = (ext4_htree_lock_data(lck)->ld_flags & lmask) == lmask ?
+		HTREE_LOCK_PW : HTREE_LOCK_PR;
+	while (1) {
+		if (htree_node_lock_try(lck, mode, key, ffz(~lmask), wait, ev))
+			return 1;
+		if (!(lmask & EXT4_LB_SPIN)) /* not a spinlock */
+			return 0;
+		cpu_relax(); /* spin until granted */
+	}
+}
+
+static int ext4_htree_node_locked(struct htree_lock *lck, unsigned lmask)
+{
+	return ext4_htree_safe_locked(lck) ||
+	       htree_node_is_granted(lck, ffz(~lmask));
+}
+
+static void ext4_htree_node_unlock(struct htree_lock *lck,
+				   unsigned int lmask, void *buf)
+{
+	/* NB: it's safe to call mutiple times or even it's not locked */
+	if (!ext4_htree_safe_locked(lck) &&
+	    htree_node_is_granted(lck, ffz(~lmask)))
+		htree_node_unlock(lck, ffz(~lmask), buf);
+}
+
+#define ext4_htree_dx_lock(lck, key)		\
+	ext4_htree_node_lock(lck, key, EXT4_LB_DX, 1, NULL)
+#define ext4_htree_dx_lock_try(lck, key)	\
+	ext4_htree_node_lock(lck, key, EXT4_LB_DX, 0, NULL)
+#define ext4_htree_dx_unlock(lck)		\
+	ext4_htree_node_unlock(lck, EXT4_LB_DX, NULL)
+#define ext4_htree_dx_locked(lck)		\
+	ext4_htree_node_locked(lck, EXT4_LB_DX)
+
+static void ext4_htree_dx_need_lock(struct htree_lock *lck)
+{
+	struct ext4_dir_lock_data *ld;
+
+	if (ext4_htree_safe_locked(lck))
+		return;
+
+	ld = ext4_htree_lock_data(lck);
+	switch (ld->ld_flags) {
+	default:
+		return;
+	case EXT4_HLOCK_LOOKUP:
+		ld->ld_flags = EXT4_HLOCK_LOOKUP_SAFE;
+		return;
+	case EXT4_HLOCK_DEL:
+		ld->ld_flags = EXT4_HLOCK_DEL_SAFE;
+		return;
+	case EXT4_HLOCK_ADD:
+		ld->ld_flags = EXT4_HLOCK_SPLIT;
+		return;
+	}
+}
+
+#define ext4_htree_de_lock(lck, key)		\
+	ext4_htree_node_lock(lck, key, EXT4_LB_DE, 1, NULL)
+#define ext4_htree_de_unlock(lck)		\
+	ext4_htree_node_unlock(lck, EXT4_LB_DE, NULL)
+
+#define ext4_htree_spin_lock(lck, key, event)	\
+	ext4_htree_node_lock(lck, key, EXT4_LB_SPIN, 0, event)
+#define ext4_htree_spin_unlock(lck)		\
+	ext4_htree_node_unlock(lck, EXT4_LB_SPIN, NULL)
+#define ext4_htree_spin_unlock_listen(lck, p)	\
+	ext4_htree_node_unlock(lck, EXT4_LB_SPIN, p)
+
+static void ext4_htree_spin_stop_listen(struct htree_lock *lck)
+{
+	if (!ext4_htree_safe_locked(lck) &&
+	    htree_node_is_listening(lck, ffz(~EXT4_LB_SPIN)))
+		htree_node_stop_listen(lck, ffz(~EXT4_LB_SPIN));
+}
+
+enum {
+	DX_HASH_COL_IGNORE,	/* ignore collision while probing frames */
+	DX_HASH_COL_YES,	/* there is collision and it does matter */
+	DX_HASH_COL_NO,	/* there is no collision */
+};
+
+static int dx_probe_hash_collision(struct htree_lock *lck,
+				   struct dx_entry *entries,
+				   struct dx_entry *at, u32 hash)
+{
+	if (!(lck && ext4_htree_lock_data(lck)->ld_flags & EXT4_LB_EXACT)) {
+		return DX_HASH_COL_IGNORE; /* don't care about collision */
+	} else if (at == entries + dx_get_count(entries) - 1) {
+		return DX_HASH_COL_IGNORE; /* not in any leaf of this DX */
+	} else { /* hash collision? */
+		return ((dx_get_hash(at + 1) & ~1) == hash) ?
+			DX_HASH_COL_YES : DX_HASH_COL_NO;
+	}
+}
+
 /*
  * Probe for a directory leaf block to search.
  *
@@ -745,10 +977,11 @@  struct stats dx_show_entries(struct dx_hash_info *hinfo, struct inode *dir,
  */
 static struct dx_frame *
 dx_probe(struct ext4_filename *fname, struct inode *dir,
-	 struct dx_hash_info *hinfo, struct dx_frame *frame_in)
+	 struct dx_hash_info *hinfo, struct dx_frame *frame_in,
+	 struct htree_lock *lck)
 {
 	unsigned count, indirect;
-	struct dx_entry *at, *entries, *p, *q, *m;
+	struct dx_entry *at, *entries, *p, *q, *m, *dx = NULL;
 	struct dx_root_info *info;
 	struct dx_frame *frame = frame_in;
 	struct dx_frame *ret_err = ERR_PTR(ERR_BAD_DX_DIR);
@@ -811,6 +1044,13 @@  struct stats dx_show_entries(struct dx_hash_info *hinfo, struct inode *dir,
 
 	dxtrace(printk("Look up %x", hash));
 	while (1) {
+		if (indirect == 0) { /* the last index level */
+			/* NB: ext4_htree_dx_lock() could be noop if
+			 * DX-lock flag is not set for current operation
+			 */
+			ext4_htree_dx_lock(lck, dx);
+			ext4_htree_spin_lock(lck, dx, NULL);
+		}
 		count = dx_get_count(entries);
 		if (!count || count > dx_get_limit(entries)) {
 			ext4_warning_inode(dir,
@@ -851,8 +1091,75 @@  struct stats dx_show_entries(struct dx_hash_info *hinfo, struct inode *dir,
 			       dx_get_block(at)));
 		frame->entries = entries;
 		frame->at = at;
-		if (!indirect--)
+
+		if (indirect == 0) { /* the last index level */
+			struct ext4_dir_lock_data *ld;
+			u64 myblock;
+
+			/* By default we only lock DE-block, however, we will
+			 * also lock the last level DX-block if:
+			 * a) there is hash collision
+			 *    we will set DX-lock flag (a few lines below)
+			 *    and redo to lock DX-block
+			 *    see detail in dx_probe_hash_collision()
+			 * b) it's a retry from splitting
+			 *    we need to lock the last level DX-block so nobody
+			 *    else can split any leaf blocks under the same
+			 *    DX-block, see detail in ext4_dx_add_entry()
+			 */
+			if (ext4_htree_dx_locked(lck)) {
+				/* DX-block is locked, just lock DE-block
+				 * and return
+				 */
+				ext4_htree_spin_unlock(lck);
+				if (!ext4_htree_safe_locked(lck))
+					ext4_htree_de_lock(lck, frame->at);
+				return frame;
+			}
+			/* it's pdirop and no DX lock */
+			if (dx_probe_hash_collision(lck, entries, at, hash) ==
+			    DX_HASH_COL_YES) {
+				/* found hash collision, set DX-lock flag
+				 * and retry to abtain DX-lock
+				 */
+				ext4_htree_spin_unlock(lck);
+				ext4_htree_dx_need_lock(lck);
+				continue;
+			}
+			ld = ext4_htree_lock_data(lck);
+			/* because I don't lock DX, so @at can't be trusted
+			 * after I release spinlock so I have to save it
+			 */
+			ld->ld_at = at;
+			ld->ld_at_entry = *at;
+			ld->ld_count = dx_get_count(entries);
+
+			frame->at = &ld->ld_at_entry;
+			myblock = dx_get_block(at);
+
+			/* NB: ordering locking */
+			ext4_htree_spin_unlock_listen(lck, &myblock);
+			/* other thread can split this DE-block because:
+			 * a) I don't have lock for the DE-block yet
+			 * b) I released spinlock on DX-block
+			 * if it happened I can detect it by listening
+			 * splitting event on this DE-block
+			 */
+			ext4_htree_de_lock(lck, frame->at);
+			ext4_htree_spin_stop_listen(lck);
+
+			if (myblock == EXT4_HTREE_NODE_CHANGED) {
+				/* someone split this DE-block before
+				 * I locked it, I need to retry and lock
+				 * valid DE-block
+				 */
+				ext4_htree_de_unlock(lck);
+				continue;
+			}
 			return frame;
+		}
+		dx = at;
+		indirect--;
 		frame++;
 		frame->bh = ext4_read_dirblock(dir, dx_get_block(at), INDEX);
 		if (IS_ERR(frame->bh)) {
@@ -921,7 +1228,7 @@  static void dx_release(struct dx_frame *frames)
 static int ext4_htree_next_block(struct inode *dir, __u32 hash,
 				 struct dx_frame *frame,
 				 struct dx_frame *frames,
-				 __u32 *start_hash)
+				 u32 *start_hash, struct htree_lock *lck)
 {
 	struct dx_frame *p;
 	struct buffer_head *bh;
@@ -936,12 +1243,23 @@  static int ext4_htree_next_block(struct inode *dir, __u32 hash,
 	 * this loop, num_frames indicates the number of interior
 	 * nodes need to be read.
 	 */
+	ext4_htree_de_unlock(lck);
 	while (1) {
-		if (++(p->at) < p->entries + dx_get_count(p->entries))
-			break;
+		if (num_frames > 0 || ext4_htree_dx_locked(lck)) {
+			/* num_frames > 0 :
+			 *   DX block
+			 * ext4_htree_dx_locked:
+			 *   frame->at is reliable pointer returned by dx_probe,
+			 *   otherwise dx_probe already knew no collision
+			 */
+			if (++(p->at) < p->entries + dx_get_count(p->entries))
+				break;
+		}
 		if (p == frames)
 			return 0;
 		num_frames++;
+		if (num_frames == 1)
+			ext4_htree_dx_unlock(lck);
 		p--;
 	}
 
@@ -964,6 +1282,14 @@  static int ext4_htree_next_block(struct inode *dir, __u32 hash,
 	 * block so no check is necessary
 	 */
 	while (num_frames--) {
+		if (num_frames == 0) {
+			/* it's not always necessary, we just don't want to
+			 * detect hash collision again
+			 */
+			ext4_htree_dx_need_lock(lck);
+			ext4_htree_dx_lock(lck, p->at);
+		}
+
 		bh = ext4_read_dirblock(dir, dx_get_block(p->at), INDEX);
 		if (IS_ERR(bh))
 			return PTR_ERR(bh);
@@ -972,10 +1298,10 @@  static int ext4_htree_next_block(struct inode *dir, __u32 hash,
 		p->bh = bh;
 		p->at = p->entries = ((struct dx_node *) bh->b_data)->entries;
 	}
+	ext4_htree_de_lock(lck, p->at);
 	return 1;
 }
 
-
 /*
  * This function fills a red-black tree with information from a
  * directory block.  It returns the number directory entries loaded
@@ -1119,7 +1445,8 @@  int ext4_htree_fill_tree(struct file *dir_file, __u32 start_hash,
 	}
 	hinfo.hash = start_hash;
 	hinfo.minor_hash = 0;
-	frame = dx_probe(NULL, dir, &hinfo, frames);
+	/* assume it's PR locked */
+	frame = dx_probe(NULL, dir, &hinfo, frames, NULL);
 	if (IS_ERR(frame))
 		return PTR_ERR(frame);
 
@@ -1162,7 +1489,7 @@  int ext4_htree_fill_tree(struct file *dir_file, __u32 start_hash,
 		count += ret;
 		hashval = ~0;
 		ret = ext4_htree_next_block(dir, HASH_NB_ALWAYS,
-					    frame, frames, &hashval);
+					    frame, frames, &hashval, NULL);
 		*next_hash = hashval;
 		if (ret < 0) {
 			err = ret;
@@ -1400,7 +1727,7 @@  static int is_dx_internal_node(struct inode *dir, ext4_lblk_t block,
 static struct buffer_head *__ext4_find_entry(struct inode *dir,
 					     struct ext4_filename *fname,
 					     struct ext4_dir_entry_2 **res_dir,
-					     int *inlined)
+					     int *inlined, struct htree_lock *lck)
 {
 	struct super_block *sb;
 	struct buffer_head *bh_use[NAMEI_RA_SIZE];
@@ -1442,7 +1769,7 @@  static struct buffer_head *__ext4_find_entry(struct inode *dir,
 		goto restart;
 	}
 	if (is_dx(dir)) {
-		ret = ext4_dx_find_entry(dir, fname, res_dir);
+		ret = ext4_dx_find_entry(dir, fname, res_dir, lck);
 		/*
 		 * On success, or if the error was file not found,
 		 * return.  Otherwise, fall back to doing a search the
@@ -1452,6 +1779,7 @@  static struct buffer_head *__ext4_find_entry(struct inode *dir,
 			goto cleanup_and_exit;
 		dxtrace(printk(KERN_DEBUG "ext4_find_entry: dx failed, "
 			       "falling back\n"));
+		ext4_htree_safe_relock(lck);
 		ret = NULL;
 	}
 	nblocks = dir->i_size >> EXT4_BLOCK_SIZE_BITS(sb);
@@ -1540,10 +1868,10 @@  static struct buffer_head *__ext4_find_entry(struct inode *dir,
 	return ret;
 }
 
-static struct buffer_head *ext4_find_entry(struct inode *dir,
+struct buffer_head *ext4_find_entry_locked(struct inode *dir,
 					   const struct qstr *d_name,
 					   struct ext4_dir_entry_2 **res_dir,
-					   int *inlined)
+					   int *inlined, struct htree_lock *lck)
 {
 	int err;
 	struct ext4_filename fname;
@@ -1555,11 +1883,12 @@  static struct buffer_head *ext4_find_entry(struct inode *dir,
 	if (err)
 		return ERR_PTR(err);
 
-	bh = __ext4_find_entry(dir, &fname, res_dir, inlined);
+	bh = __ext4_find_entry(dir, &fname, res_dir, inlined, lck);
 
 	ext4_fname_free_filename(&fname);
 	return bh;
 }
+EXPORT_SYMBOL(ext4_find_entry_locked);
 
 static struct buffer_head *ext4_lookup_entry(struct inode *dir,
 					     struct dentry *dentry,
@@ -1575,7 +1904,7 @@  static struct buffer_head *ext4_lookup_entry(struct inode *dir,
 	if (err)
 		return ERR_PTR(err);
 
-	bh = __ext4_find_entry(dir, &fname, res_dir, NULL);
+	bh = __ext4_find_entry(dir, &fname, res_dir, NULL, NULL);
 
 	ext4_fname_free_filename(&fname);
 	return bh;
@@ -1583,7 +1912,8 @@  static struct buffer_head *ext4_lookup_entry(struct inode *dir,
 
 static struct buffer_head * ext4_dx_find_entry(struct inode *dir,
 			struct ext4_filename *fname,
-			struct ext4_dir_entry_2 **res_dir)
+			struct ext4_dir_entry_2 **res_dir,
+			struct htree_lock *lck)
 {
 	struct super_block * sb = dir->i_sb;
 	struct dx_frame frames[EXT4_HTREE_LEVEL], *frame;
@@ -1594,7 +1924,7 @@  static struct buffer_head * ext4_dx_find_entry(struct inode *dir,
 #ifdef CONFIG_FS_ENCRYPTION
 	*res_dir = NULL;
 #endif
-	frame = dx_probe(fname, dir, NULL, frames);
+	frame = dx_probe(fname, dir, NULL, frames, lck);
 	if (IS_ERR(frame))
 		return (struct buffer_head *) frame;
 	do {
@@ -1616,7 +1946,7 @@  static struct buffer_head * ext4_dx_find_entry(struct inode *dir,
 
 		/* Check to see if we should continue to search */
 		retval = ext4_htree_next_block(dir, fname->hinfo.hash, frame,
-					       frames, NULL);
+					       frames, NULL, lck);
 		if (retval < 0) {
 			ext4_warning_inode(dir,
 				"error %d reading directory index block",
@@ -1799,8 +2129,9 @@  static struct ext4_dir_entry_2* dx_pack_dirents(char *base, unsigned blocksize)
  * Returns pointer to de in block into which the new entry will be inserted.
  */
 static struct ext4_dir_entry_2 *do_split(handle_t *handle, struct inode *dir,
-			struct buffer_head **bh,struct dx_frame *frame,
-			struct dx_hash_info *hinfo)
+			struct buffer_head **bh, struct dx_frame *frames,
+			struct dx_frame *frame, struct dx_hash_info *hinfo,
+			struct htree_lock *lck)
 {
 	unsigned blocksize = dir->i_sb->s_blocksize;
 	unsigned count, continued;
@@ -1862,8 +2193,15 @@  static struct ext4_dir_entry_2 *do_split(handle_t *handle, struct inode *dir,
 					hash2, split, count-split));
 
 	/* Fancy dance to stay within two buffers */
-	de2 = dx_move_dirents(data1, data2, map + split, count - split,
-			      blocksize);
+	if (hinfo->hash < hash2) {
+		de2 = dx_move_dirents(data1, data2, map + split,
+				      count - split, blocksize);
+	} else {
+		/* make sure we will add entry to the same block which
+		 * we have already locked
+		 */
+		de2 = dx_move_dirents(data1, data2, map, split, blocksize);
+	}
 	de = dx_pack_dirents(data1, blocksize);
 	de->rec_len = ext4_rec_len_to_disk(data1 + (blocksize - csum_size) -
 					   (char *) de,
@@ -1884,12 +2222,20 @@  static struct ext4_dir_entry_2 *do_split(handle_t *handle, struct inode *dir,
 	dxtrace(dx_show_leaf(dir, hinfo, (struct ext4_dir_entry_2 *) data2,
 			blocksize, 1));
 
-	/* Which block gets the new entry? */
-	if (hinfo->hash >= hash2) {
-		swap(*bh, bh2);
-		de = de2;
+	ext4_htree_spin_lock(lck, frame > frames ? (frame - 1)->at : NULL,
+			     frame->at); /* notify block is being split */
+	if (hinfo->hash < hash2) {
+		dx_insert_block(frame, hash2 + continued, newblock);
+	} else {
+		/* switch block number */
+		dx_insert_block(frame, hash2 + continued,
+				dx_get_block(frame->at));
+		dx_set_block(frame->at, newblock);
+		(frame->at)++;
 	}
-	dx_insert_block(frame, hash2 + continued, newblock);
+	ext4_htree_spin_unlock(lck);
+	ext4_htree_dx_unlock(lck);
+
 	err = ext4_handle_dirty_dirent_node(handle, dir, bh2);
 	if (err)
 		goto journal_error;
@@ -2167,7 +2513,7 @@  static int make_indexed_dir(handle_t *handle, struct ext4_filename *fname,
 	if (retval)
 		goto out_frames;	
 
-	de = do_split(handle,dir, &bh2, frame, &fname->hinfo);
+	de = do_split(handle, dir, &bh2, frames, frame, &fname->hinfo, NULL);
 	if (IS_ERR(de)) {
 		retval = PTR_ERR(de);
 		goto out_frames;
@@ -2276,8 +2622,8 @@  static int ext4_update_dotdot(handle_t *handle, struct dentry *dentry,
  * may not sleep between calling this and putting something into
  * the entry, as someone else might have used it while you slept.
  */
-static int ext4_add_entry(handle_t *handle, struct dentry *dentry,
-			  struct inode *inode)
+int ext4_add_entry_locked(handle_t *handle, struct dentry *dentry,
+			  struct inode *inode, struct htree_lock *lck)
 {
 	struct inode *dir = d_inode(dentry->d_parent);
 	struct buffer_head *bh = NULL;
@@ -2326,9 +2672,10 @@  static int ext4_add_entry(handle_t *handle, struct dentry *dentry,
 		if (dentry->d_name.len == 2 &&
 		    memcmp(dentry->d_name.name, "..", 2) == 0)
 			return ext4_update_dotdot(handle, dentry, inode);
-		retval = ext4_dx_add_entry(handle, &fname, dir, inode);
+		retval = ext4_dx_add_entry(handle, &fname, dir, inode, lck);
 		if (!retval || (retval != ERR_BAD_DX_DIR))
 			goto out;
+		ext4_htree_safe_relock(lck);
 		ext4_clear_inode_flag(dir, EXT4_INODE_INDEX);
 		dx_fallback++;
 		ext4_mark_inode_dirty(handle, dir);
@@ -2378,12 +2725,14 @@  static int ext4_add_entry(handle_t *handle, struct dentry *dentry,
 		ext4_set_inode_state(inode, EXT4_STATE_NEWENTRY);
 	return retval;
 }
+EXPORT_SYMBOL(ext4_add_entry_locked);
 
 /*
  * Returns 0 for success, or a negative error value
  */
 static int ext4_dx_add_entry(handle_t *handle, struct ext4_filename *fname,
-			     struct inode *dir, struct inode *inode)
+			     struct inode *dir, struct inode *inode,
+			     struct htree_lock *lck)
 {
 	struct dx_frame frames[EXT4_HTREE_LEVEL], *frame;
 	struct dx_entry *entries, *at;
@@ -2395,7 +2744,7 @@  static int ext4_dx_add_entry(handle_t *handle, struct ext4_filename *fname,
 
 again:
 	restart = 0;
-	frame = dx_probe(fname, dir, NULL, frames);
+	frame = dx_probe(fname, dir, NULL, frames, lck);
 	if (IS_ERR(frame))
 		return PTR_ERR(frame);
 	entries = frame->entries;
@@ -2430,6 +2779,12 @@  static int ext4_dx_add_entry(handle_t *handle, struct ext4_filename *fname,
 		struct dx_node *node2;
 		struct buffer_head *bh2;
 
+		if (!ext4_htree_safe_locked(lck)) { /* retry with EX lock */
+			ext4_htree_safe_relock(lck);
+			restart = 1;
+			goto cleanup;
+		}
+
 		while (frame > frames) {
 			if (dx_get_count((frame - 1)->entries) <
 			    dx_get_limit((frame - 1)->entries)) {
@@ -2533,8 +2888,34 @@  static int ext4_dx_add_entry(handle_t *handle, struct ext4_filename *fname,
 			restart = 1;
 			goto journal_error;
 		}
+	} else if (!ext4_htree_dx_locked(lck)) {
+		struct ext4_dir_lock_data *ld = ext4_htree_lock_data(lck);
+
+		/* not well protected, require DX lock */
+		ext4_htree_dx_need_lock(lck);
+		at = frame > frames ? (frame - 1)->at : NULL;
+
+		/* NB: no risk of deadlock because it's just a try.
+		 *
+		 * NB: we check ld_count for twice, the first time before
+		 * having DX lock, the second time after holding DX lock.
+		 *
+		 * NB: We never free blocks for directory so far, which
+		 * means value returned by dx_get_count() should equal to
+		 * ld->ld_count if nobody split any DE-block under @at,
+		 * and ld->ld_at still points to valid dx_entry.
+		 */
+		if ((ld->ld_count != dx_get_count(entries)) ||
+		    !ext4_htree_dx_lock_try(lck, at) ||
+		   (ld->ld_count != dx_get_count(entries))) {
+			restart = 1;
+			goto cleanup;
+		}
+		/* OK, I've got DX lock and nothing changed */
+		frame->at = ld->ld_at;
 	}
-	de = do_split(handle, dir, &bh, frame, &fname->hinfo);
+
+	de = do_split(handle, dir, &bh, frames, frame, &fname->hinfo, lck);
 	if (IS_ERR(de)) {
 		err = PTR_ERR(de);
 		goto cleanup;
@@ -2545,6 +2926,8 @@  static int ext4_dx_add_entry(handle_t *handle, struct ext4_filename *fname,
 journal_error:
 	ext4_std_error(dir->i_sb, err); /* this is a no-op if err == 0 */
 cleanup:
+	ext4_htree_dx_unlock(lck);
+	ext4_htree_de_unlock(lck);
 	brelse(bh);
 	dx_release(frames);
 	/* @restart is true means htree-path has been changed, we need to
diff --git a/fs/ext4/super.c b/fs/ext4/super.c
index 564eb35..07242d7 100644
--- a/fs/ext4/super.c
+++ b/fs/ext4/super.c
@@ -1077,6 +1077,7 @@  static struct inode *ext4_alloc_inode(struct super_block *sb)
 		return NULL;
 
 	inode_set_iversion(&ei->vfs_inode, 1);
+	sema_init(&ei->i_append_sem, 1);
 	spin_lock_init(&ei->i_raw_lock);
 	INIT_LIST_HEAD(&ei->i_prealloc_list);
 	spin_lock_init(&ei->i_prealloc_lock);