diff mbox

libceph: apply new_state before new_up_client on incrementals

Message ID 1468963597-10667-1-git-send-email-idryomov@gmail.com (mailing list archive)
State New, archived
Headers show

Commit Message

Ilya Dryomov July 19, 2016, 9:26 p.m. UTC
Currently, osd_weight and osd_state fields are updated in the encoding
order.  This is wrong, because an incremental map may look like e.g.

    new_up_client: { osd=6, addr=... } # set osd_state and addr
    new_state: { osd=6, xorstate=EXISTS } # clear osd_state

Suppose osd6's current osd_state is EXISTS (i.e. osd6 is down).  After
applying new_up_client, osd_state is changed to EXISTS | UP.  Carrying
on with the new_state update, we flip EXISTS and leave osd6 in a weird
"!EXISTS but UP" state.  A non-existent OSD is considered down by the
mapping code

2087    for (i = 0; i < pg->pg_temp.len; i++) {
2088            if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) {
2089                    if (ceph_can_shift_osds(pi))
2090                            continue;
2091
2092                    temp->osds[temp->size++] = CRUSH_ITEM_NONE;

and so requests get directed to the second OSD in the set instead of
the first, resulting in OSD-side errors like:

[WRN] : client.4239 192.168.122.21:0/2444980242 misdirected client.4239.1:2827 pg 2.5df899f2 to osd.4 not [1,4,6] in e680/680

and hung rbds on the client:

[  493.566367] rbd: rbd0: write 400000 at 11cc00000 (0)
[  493.566805] rbd: rbd0:   result -6 xferred 400000
[  493.567011] blk_update_request: I/O error, dev rbd0, sector 9330688

The fix is to decouple application from the decoding and:
- apply new_weight first
- apply new_state before new_up_client
- twiddle osd_state flags if marking in
- clear out some of the state if osd is destroyed

Fixes: http://tracker.ceph.com/issues/14901

Cc: stable@vger.kernel.org # 3.15+: 6dd74e44dc1d: libceph: set 'exists' flag for newly up osd
Cc: stable@vger.kernel.org # 3.15+
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
---
 net/ceph/osdmap.c | 156 +++++++++++++++++++++++++++++++++++++++---------------
 1 file changed, 113 insertions(+), 43 deletions(-)

Comments

Josh Durgin July 21, 2016, 12:04 a.m. UTC | #1
On 07/19/2016 02:26 PM, Ilya Dryomov wrote:
> Currently, osd_weight and osd_state fields are updated in the encoding
> order.  This is wrong, because an incremental map may look like e.g.
>
>      new_up_client: { osd=6, addr=... } # set osd_state and addr
>      new_state: { osd=6, xorstate=EXISTS } # clear osd_state
>
> Suppose osd6's current osd_state is EXISTS (i.e. osd6 is down).  After
> applying new_up_client, osd_state is changed to EXISTS | UP.  Carrying
> on with the new_state update, we flip EXISTS and leave osd6 in a weird
> "!EXISTS but UP" state.  A non-existent OSD is considered down by the
> mapping code
>
> 2087    for (i = 0; i < pg->pg_temp.len; i++) {
> 2088            if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) {
> 2089                    if (ceph_can_shift_osds(pi))
> 2090                            continue;
> 2091
> 2092                    temp->osds[temp->size++] = CRUSH_ITEM_NONE;
>
> and so requests get directed to the second OSD in the set instead of
> the first, resulting in OSD-side errors like:
>
> [WRN] : client.4239 192.168.122.21:0/2444980242 misdirected client.4239.1:2827 pg 2.5df899f2 to osd.4 not [1,4,6] in e680/680
>
> and hung rbds on the client:
>
> [  493.566367] rbd: rbd0: write 400000 at 11cc00000 (0)
> [  493.566805] rbd: rbd0:   result -6 xferred 400000
> [  493.567011] blk_update_request: I/O error, dev rbd0, sector 9330688
>
> The fix is to decouple application from the decoding and:
> - apply new_weight first
> - apply new_state before new_up_client
> - twiddle osd_state flags if marking in
> - clear out some of the state if osd is destroyed
>
> Fixes: http://tracker.ceph.com/issues/14901
>
> Cc: stable@vger.kernel.org # 3.15+: 6dd74e44dc1d: libceph: set 'exists' flag for newly up osd
> Cc: stable@vger.kernel.org # 3.15+
> Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
> ---
>   net/ceph/osdmap.c | 156 +++++++++++++++++++++++++++++++++++++++---------------
>   1 file changed, 113 insertions(+), 43 deletions(-)

This matches userspace's incremental handling now. Looks good.

Reviewed-by: Josh Durgin <jdurgin@redhat.com>

>
> diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
> index 03062bb763b3..7e480bf75bcf 100644
> --- a/net/ceph/osdmap.c
> +++ b/net/ceph/osdmap.c
> @@ -1261,6 +1261,115 @@ struct ceph_osdmap *ceph_osdmap_decode(void **p, void *end)
>   }
>
>   /*
> + * Encoding order is (new_up_client, new_state, new_weight).  Need to
> + * apply in the (new_weight, new_state, new_up_client) order, because
> + * an incremental map may look like e.g.
> + *
> + *     new_up_client: { osd=6, addr=... } # set osd_state and addr
> + *     new_state: { osd=6, xorstate=EXISTS } # clear osd_state
> + */
> +static int decode_new_up_state_weight(void **p, void *end,
> +				      struct ceph_osdmap *map)
> +{
> +	void *new_up_client;
> +	void *new_state;
> +	void *new_weight_end;
> +	u32 len;
> +
> +	new_up_client = *p;
> +	ceph_decode_32_safe(p, end, len, e_inval);
> +	len *= sizeof(u32) + sizeof(struct ceph_entity_addr);
> +	ceph_decode_need(p, end, len, e_inval);
> +	*p += len;
> +
> +	new_state = *p;
> +	ceph_decode_32_safe(p, end, len, e_inval);
> +	len *= sizeof(u32) + sizeof(u8);
> +	ceph_decode_need(p, end, len, e_inval);
> +	*p += len;
> +
> +	/* new_weight */
> +	ceph_decode_32_safe(p, end, len, e_inval);
> +	while (len--) {
> +		s32 osd;
> +		u32 w;
> +
> +		ceph_decode_need(p, end, 2*sizeof(u32), e_inval);
> +		osd = ceph_decode_32(p);
> +		w = ceph_decode_32(p);
> +		BUG_ON(osd >= map->max_osd);
> +		pr_info("osd%d weight 0x%x %s\n", osd, w,
> +		     w == CEPH_OSD_IN ? "(in)" :
> +		     (w == CEPH_OSD_OUT ? "(out)" : ""));
> +		map->osd_weight[osd] = w;
> +
> +		/*
> +		 * If we are marking in, set the EXISTS, and clear the
> +		 * AUTOOUT and NEW bits.
> +		 */
> +		if (w) {
> +			map->osd_state[osd] |= CEPH_OSD_EXISTS;
> +			map->osd_state[osd] &= ~(CEPH_OSD_AUTOOUT |
> +						 CEPH_OSD_NEW);
> +		}
> +	}
> +	new_weight_end = *p;
> +
> +	/* new_state (up/down) */
> +	*p = new_state;
> +	len = ceph_decode_32(p);
> +	while (len--) {
> +		s32 osd;
> +		u8 xorstate;
> +		int ret;
> +
> +		osd = ceph_decode_32(p);
> +		xorstate = ceph_decode_8(p);
> +		if (xorstate == 0)
> +			xorstate = CEPH_OSD_UP;
> +		BUG_ON(osd >= map->max_osd);
> +		if ((map->osd_state[osd] & CEPH_OSD_UP) &&
> +		    (xorstate & CEPH_OSD_UP))
> +			pr_info("osd%d down\n", osd);
> +		if ((map->osd_state[osd] & CEPH_OSD_EXISTS) &&
> +		    (xorstate & CEPH_OSD_EXISTS)) {
> +			pr_info("osd%d does not exist\n", osd);
> +			map->osd_weight[osd] = CEPH_OSD_IN;
> +			ret = set_primary_affinity(map, osd,
> +						   CEPH_OSD_DEFAULT_PRIMARY_AFFINITY);
> +			if (ret)
> +				return ret;
> +			memset(map->osd_addr + osd, 0, sizeof(*map->osd_addr));
> +			map->osd_state[osd] = 0;
> +		} else {
> +			map->osd_state[osd] ^= xorstate;
> +		}
> +	}
> +
> +	/* new_up_client */
> +	*p = new_up_client;
> +	len = ceph_decode_32(p);
> +	while (len--) {
> +		s32 osd;
> +		struct ceph_entity_addr addr;
> +
> +		osd = ceph_decode_32(p);
> +		ceph_decode_copy(p, &addr, sizeof(addr));
> +		ceph_decode_addr(&addr);
> +		BUG_ON(osd >= map->max_osd);
> +		pr_info("osd%d up\n", osd);
> +		map->osd_state[osd] |= CEPH_OSD_EXISTS | CEPH_OSD_UP;
> +		map->osd_addr[osd] = addr;
> +	}
> +
> +	*p = new_weight_end;
> +	return 0;
> +
> +e_inval:
> +	return -EINVAL;
> +}
> +
> +/*
>    * decode and apply an incremental map update.
>    */
>   struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
> @@ -1358,49 +1467,10 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
>   			__remove_pg_pool(&map->pg_pools, pi);
>   	}
>
> -	/* new_up */
> -	ceph_decode_32_safe(p, end, len, e_inval);
> -	while (len--) {
> -		u32 osd;
> -		struct ceph_entity_addr addr;
> -		ceph_decode_32_safe(p, end, osd, e_inval);
> -		ceph_decode_copy_safe(p, end, &addr, sizeof(addr), e_inval);
> -		ceph_decode_addr(&addr);
> -		pr_info("osd%d up\n", osd);
> -		BUG_ON(osd >= map->max_osd);
> -		map->osd_state[osd] |= CEPH_OSD_UP | CEPH_OSD_EXISTS;
> -		map->osd_addr[osd] = addr;
> -	}
> -
> -	/* new_state */
> -	ceph_decode_32_safe(p, end, len, e_inval);
> -	while (len--) {
> -		u32 osd;
> -		u8 xorstate;
> -		ceph_decode_32_safe(p, end, osd, e_inval);
> -		xorstate = **(u8 **)p;
> -		(*p)++;  /* clean flag */
> -		if (xorstate == 0)
> -			xorstate = CEPH_OSD_UP;
> -		if (xorstate & CEPH_OSD_UP)
> -			pr_info("osd%d down\n", osd);
> -		if (osd < map->max_osd)
> -			map->osd_state[osd] ^= xorstate;
> -	}
> -
> -	/* new_weight */
> -	ceph_decode_32_safe(p, end, len, e_inval);
> -	while (len--) {
> -		u32 osd, off;
> -		ceph_decode_need(p, end, sizeof(u32)*2, e_inval);
> -		osd = ceph_decode_32(p);
> -		off = ceph_decode_32(p);
> -		pr_info("osd%d weight 0x%x %s\n", osd, off,
> -		     off == CEPH_OSD_IN ? "(in)" :
> -		     (off == CEPH_OSD_OUT ? "(out)" : ""));
> -		if (osd < map->max_osd)
> -			map->osd_weight[osd] = off;
> -	}
> +	/* new_up_client, new_state, new_weight */
> +	err = decode_new_up_state_weight(p, end, map);
> +	if (err)
> +		goto bad;
>
>   	/* new_pg_temp */
>   	err = decode_new_pg_temp(p, end, map);
>

--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Ilya Dryomov July 21, 2016, 3:27 p.m. UTC | #2
On Thu, Jul 21, 2016 at 2:04 AM, Josh Durgin <jdurgin@redhat.com> wrote:
> On 07/19/2016 02:26 PM, Ilya Dryomov wrote:
>>
>> Currently, osd_weight and osd_state fields are updated in the encoding
>> order.  This is wrong, because an incremental map may look like e.g.
>>
>>      new_up_client: { osd=6, addr=... } # set osd_state and addr
>>      new_state: { osd=6, xorstate=EXISTS } # clear osd_state
>>
>> Suppose osd6's current osd_state is EXISTS (i.e. osd6 is down).  After
>> applying new_up_client, osd_state is changed to EXISTS | UP.  Carrying
>> on with the new_state update, we flip EXISTS and leave osd6 in a weird
>> "!EXISTS but UP" state.  A non-existent OSD is considered down by the
>> mapping code
>>
>> 2087    for (i = 0; i < pg->pg_temp.len; i++) {
>> 2088            if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) {
>> 2089                    if (ceph_can_shift_osds(pi))
>> 2090                            continue;
>> 2091
>> 2092                    temp->osds[temp->size++] = CRUSH_ITEM_NONE;
>>
>> and so requests get directed to the second OSD in the set instead of
>> the first, resulting in OSD-side errors like:
>>
>> [WRN] : client.4239 192.168.122.21:0/2444980242 misdirected
>> client.4239.1:2827 pg 2.5df899f2 to osd.4 not [1,4,6] in e680/680
>>
>> and hung rbds on the client:
>>
>> [  493.566367] rbd: rbd0: write 400000 at 11cc00000 (0)
>> [  493.566805] rbd: rbd0:   result -6 xferred 400000
>> [  493.567011] blk_update_request: I/O error, dev rbd0, sector 9330688
>>
>> The fix is to decouple application from the decoding and:
>> - apply new_weight first
>> - apply new_state before new_up_client
>> - twiddle osd_state flags if marking in
>> - clear out some of the state if osd is destroyed
>>
>> Fixes: http://tracker.ceph.com/issues/14901
>>
>> Cc: stable@vger.kernel.org # 3.15+: 6dd74e44dc1d: libceph: set 'exists'
>> flag for newly up osd
>> Cc: stable@vger.kernel.org # 3.15+
>> Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
>> ---
>>   net/ceph/osdmap.c | 156
>> +++++++++++++++++++++++++++++++++++++++---------------
>>   1 file changed, 113 insertions(+), 43 deletions(-)
>
>
> This matches userspace's incremental handling now. Looks good.

Technically, no.  new_primary_affinity is decoded along with new_weight
in userspace and crushmap is applied at the very end, after weights and
states are applied.  AFAICT crushmap is applied last only because of
the userspace-only adjust_osd_weights(), which needs to have the new
weights.

As for primary-affinity, the following

    new_up_client: { osd=X, addr=... }
    new_state: { osd=X, xorstate=EXISTS }
    new_primary_affinity: { osd=X, aff=Y }

would be misinterpreted - the kernel client would stay on aff=Y instead
of aff=1 (reset), but I don't see a (obvious, at least) way to get such
a map, and, as I wanted this to be as small as possible for backporting,
I punted on that.  Let me know if you see otherwise.

Thanks,

                Ilya
--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Josh Durgin July 21, 2016, 7:22 p.m. UTC | #3
On 07/21/2016 08:27 AM, Ilya Dryomov wrote:
> On Thu, Jul 21, 2016 at 2:04 AM, Josh Durgin <jdurgin@redhat.com> wrote:
>> On 07/19/2016 02:26 PM, Ilya Dryomov wrote:
>>>
>>> Currently, osd_weight and osd_state fields are updated in the encoding
>>> order.  This is wrong, because an incremental map may look like e.g.
>>>
>>>       new_up_client: { osd=6, addr=... } # set osd_state and addr
>>>       new_state: { osd=6, xorstate=EXISTS } # clear osd_state
>>>
>>> Suppose osd6's current osd_state is EXISTS (i.e. osd6 is down).  After
>>> applying new_up_client, osd_state is changed to EXISTS | UP.  Carrying
>>> on with the new_state update, we flip EXISTS and leave osd6 in a weird
>>> "!EXISTS but UP" state.  A non-existent OSD is considered down by the
>>> mapping code
>>>
>>> 2087    for (i = 0; i < pg->pg_temp.len; i++) {
>>> 2088            if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) {
>>> 2089                    if (ceph_can_shift_osds(pi))
>>> 2090                            continue;
>>> 2091
>>> 2092                    temp->osds[temp->size++] = CRUSH_ITEM_NONE;
>>>
>>> and so requests get directed to the second OSD in the set instead of
>>> the first, resulting in OSD-side errors like:
>>>
>>> [WRN] : client.4239 192.168.122.21:0/2444980242 misdirected
>>> client.4239.1:2827 pg 2.5df899f2 to osd.4 not [1,4,6] in e680/680
>>>
>>> and hung rbds on the client:
>>>
>>> [  493.566367] rbd: rbd0: write 400000 at 11cc00000 (0)
>>> [  493.566805] rbd: rbd0:   result -6 xferred 400000
>>> [  493.567011] blk_update_request: I/O error, dev rbd0, sector 9330688
>>>
>>> The fix is to decouple application from the decoding and:
>>> - apply new_weight first
>>> - apply new_state before new_up_client
>>> - twiddle osd_state flags if marking in
>>> - clear out some of the state if osd is destroyed
>>>
>>> Fixes: http://tracker.ceph.com/issues/14901
>>>
>>> Cc: stable@vger.kernel.org # 3.15+: 6dd74e44dc1d: libceph: set 'exists'
>>> flag for newly up osd
>>> Cc: stable@vger.kernel.org # 3.15+
>>> Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
>>> ---
>>>    net/ceph/osdmap.c | 156
>>> +++++++++++++++++++++++++++++++++++++++---------------
>>>    1 file changed, 113 insertions(+), 43 deletions(-)
>>
>>
>> This matches userspace's incremental handling now. Looks good.
>
> Technically, no.  new_primary_affinity is decoded along with new_weight
> in userspace and crushmap is applied at the very end, after weights and
> states are applied.  AFAICT crushmap is applied last only because of
> the userspace-only adjust_osd_weights(), which needs to have the new
> weights.
>
> As for primary-affinity, the following
>
>      new_up_client: { osd=X, addr=... }
>      new_state: { osd=X, xorstate=EXISTS }
>      new_primary_affinity: { osd=X, aff=Y }
>
> would be misinterpreted - the kernel client would stay on aff=Y instead
> of aff=1 (reset), but I don't see a (obvious, at least) way to get such
> a map, and, as I wanted this to be as small as possible for backporting,
> I punted on that.  Let me know if you see otherwise.

Yeah, I also see no way for this to happen, and it isn't likely to be
needed in the future either. So keeping this update small sounds good
to me.

Josh

--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
diff mbox

Patch

diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 03062bb763b3..7e480bf75bcf 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -1261,6 +1261,115 @@  struct ceph_osdmap *ceph_osdmap_decode(void **p, void *end)
 }
 
 /*
+ * Encoding order is (new_up_client, new_state, new_weight).  Need to
+ * apply in the (new_weight, new_state, new_up_client) order, because
+ * an incremental map may look like e.g.
+ *
+ *     new_up_client: { osd=6, addr=... } # set osd_state and addr
+ *     new_state: { osd=6, xorstate=EXISTS } # clear osd_state
+ */
+static int decode_new_up_state_weight(void **p, void *end,
+				      struct ceph_osdmap *map)
+{
+	void *new_up_client;
+	void *new_state;
+	void *new_weight_end;
+	u32 len;
+
+	new_up_client = *p;
+	ceph_decode_32_safe(p, end, len, e_inval);
+	len *= sizeof(u32) + sizeof(struct ceph_entity_addr);
+	ceph_decode_need(p, end, len, e_inval);
+	*p += len;
+
+	new_state = *p;
+	ceph_decode_32_safe(p, end, len, e_inval);
+	len *= sizeof(u32) + sizeof(u8);
+	ceph_decode_need(p, end, len, e_inval);
+	*p += len;
+
+	/* new_weight */
+	ceph_decode_32_safe(p, end, len, e_inval);
+	while (len--) {
+		s32 osd;
+		u32 w;
+
+		ceph_decode_need(p, end, 2*sizeof(u32), e_inval);
+		osd = ceph_decode_32(p);
+		w = ceph_decode_32(p);
+		BUG_ON(osd >= map->max_osd);
+		pr_info("osd%d weight 0x%x %s\n", osd, w,
+		     w == CEPH_OSD_IN ? "(in)" :
+		     (w == CEPH_OSD_OUT ? "(out)" : ""));
+		map->osd_weight[osd] = w;
+
+		/*
+		 * If we are marking in, set the EXISTS, and clear the
+		 * AUTOOUT and NEW bits.
+		 */
+		if (w) {
+			map->osd_state[osd] |= CEPH_OSD_EXISTS;
+			map->osd_state[osd] &= ~(CEPH_OSD_AUTOOUT |
+						 CEPH_OSD_NEW);
+		}
+	}
+	new_weight_end = *p;
+
+	/* new_state (up/down) */
+	*p = new_state;
+	len = ceph_decode_32(p);
+	while (len--) {
+		s32 osd;
+		u8 xorstate;
+		int ret;
+
+		osd = ceph_decode_32(p);
+		xorstate = ceph_decode_8(p);
+		if (xorstate == 0)
+			xorstate = CEPH_OSD_UP;
+		BUG_ON(osd >= map->max_osd);
+		if ((map->osd_state[osd] & CEPH_OSD_UP) &&
+		    (xorstate & CEPH_OSD_UP))
+			pr_info("osd%d down\n", osd);
+		if ((map->osd_state[osd] & CEPH_OSD_EXISTS) &&
+		    (xorstate & CEPH_OSD_EXISTS)) {
+			pr_info("osd%d does not exist\n", osd);
+			map->osd_weight[osd] = CEPH_OSD_IN;
+			ret = set_primary_affinity(map, osd,
+						   CEPH_OSD_DEFAULT_PRIMARY_AFFINITY);
+			if (ret)
+				return ret;
+			memset(map->osd_addr + osd, 0, sizeof(*map->osd_addr));
+			map->osd_state[osd] = 0;
+		} else {
+			map->osd_state[osd] ^= xorstate;
+		}
+	}
+
+	/* new_up_client */
+	*p = new_up_client;
+	len = ceph_decode_32(p);
+	while (len--) {
+		s32 osd;
+		struct ceph_entity_addr addr;
+
+		osd = ceph_decode_32(p);
+		ceph_decode_copy(p, &addr, sizeof(addr));
+		ceph_decode_addr(&addr);
+		BUG_ON(osd >= map->max_osd);
+		pr_info("osd%d up\n", osd);
+		map->osd_state[osd] |= CEPH_OSD_EXISTS | CEPH_OSD_UP;
+		map->osd_addr[osd] = addr;
+	}
+
+	*p = new_weight_end;
+	return 0;
+
+e_inval:
+	return -EINVAL;
+}
+
+/*
  * decode and apply an incremental map update.
  */
 struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
@@ -1358,49 +1467,10 @@  struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
 			__remove_pg_pool(&map->pg_pools, pi);
 	}
 
-	/* new_up */
-	ceph_decode_32_safe(p, end, len, e_inval);
-	while (len--) {
-		u32 osd;
-		struct ceph_entity_addr addr;
-		ceph_decode_32_safe(p, end, osd, e_inval);
-		ceph_decode_copy_safe(p, end, &addr, sizeof(addr), e_inval);
-		ceph_decode_addr(&addr);
-		pr_info("osd%d up\n", osd);
-		BUG_ON(osd >= map->max_osd);
-		map->osd_state[osd] |= CEPH_OSD_UP | CEPH_OSD_EXISTS;
-		map->osd_addr[osd] = addr;
-	}
-
-	/* new_state */
-	ceph_decode_32_safe(p, end, len, e_inval);
-	while (len--) {
-		u32 osd;
-		u8 xorstate;
-		ceph_decode_32_safe(p, end, osd, e_inval);
-		xorstate = **(u8 **)p;
-		(*p)++;  /* clean flag */
-		if (xorstate == 0)
-			xorstate = CEPH_OSD_UP;
-		if (xorstate & CEPH_OSD_UP)
-			pr_info("osd%d down\n", osd);
-		if (osd < map->max_osd)
-			map->osd_state[osd] ^= xorstate;
-	}
-
-	/* new_weight */
-	ceph_decode_32_safe(p, end, len, e_inval);
-	while (len--) {
-		u32 osd, off;
-		ceph_decode_need(p, end, sizeof(u32)*2, e_inval);
-		osd = ceph_decode_32(p);
-		off = ceph_decode_32(p);
-		pr_info("osd%d weight 0x%x %s\n", osd, off,
-		     off == CEPH_OSD_IN ? "(in)" :
-		     (off == CEPH_OSD_OUT ? "(out)" : ""));
-		if (osd < map->max_osd)
-			map->osd_weight[osd] = off;
-	}
+	/* new_up_client, new_state, new_weight */
+	err = decode_new_up_state_weight(p, end, map);
+	if (err)
+		goto bad;
 
 	/* new_pg_temp */
 	err = decode_new_pg_temp(p, end, map);