diff mbox series

[net,1/8] amt: use workqueue for gateway side message handling

Message ID 20220712105714.12282-2-ap420073@gmail.com (mailing list archive)
State Superseded
Delegated to: Netdev Maintainers
Headers show
Series amt: fix validation and synchronization bugs | expand

Checks

Context Check Description
netdev/tree_selection success Clearly marked for net
netdev/fixes_present success Fixes tag present in non-next series
netdev/subject_prefix success Link
netdev/cover_letter success Series has a cover letter
netdev/patch_count success Link
netdev/header_inline success No static functions without inline keyword in header files
netdev/build_32bit success Errors and warnings before: 1 this patch: 1
netdev/cc_maintainers success CCed 6 of 6 maintainers
netdev/build_clang success Errors and warnings before: 0 this patch: 0
netdev/module_param success Was 0 now: 0
netdev/verify_signedoff success Signed-off-by tag matches author and committer
netdev/check_selftest success No net selftest shell script
netdev/verify_fixes success Fixes tag looks correct
netdev/build_allmodconfig_warn success Errors and warnings before: 1 this patch: 1
netdev/checkpatch success total: 0 errors, 0 warnings, 0 checks, 296 lines checked
netdev/kdoc success Errors and warnings before: 0 this patch: 0
netdev/source_inline success Was 0 now: 0

Commit Message

Taehee Yoo July 12, 2022, 10:57 a.m. UTC
There are some synchronization issues (amt->status, amt->req_cnt, etc.)
if the interface is in gateway mode, because gateway message handlers
are processed concurrently.
This applies a work queue for processing these messages instead of
expanding the locking context.

So, the purposes of this patch are to fix existing race conditions and to
make the gateway able to validate its status more correctly.

When the AMT gateway interface is created, it tries to establish a
connection to the relay.
The establishment step looks stateless, but it should be managed well.
In order to handle messages in the gateway, it saves the current
status(i.e. AMT_STATUS_XXX).
This patch makes the gateway code work with a single thread.

Now, all messages except the multicast are triggered(received or
delay expired), and these messages will be stored in the event
queue(amt->events).
Then, the single worker processes stored messages asynchronously one
by one.
The multicast data message type will be still processed immediately.

Now, amt->lock is only needed to access the event queue(amt->events)
if an interface is the gateway mode.

Fixes: cbc21dc1cfe9 ("amt: add data plane of amt interface")
Signed-off-by: Taehee Yoo <ap420073@gmail.com>
---
 drivers/net/amt.c | 158 +++++++++++++++++++++++++++++++++++++++++-----
 include/net/amt.h |  20 ++++++
 2 files changed, 163 insertions(+), 15 deletions(-)

Comments

Jakub Kicinski July 14, 2022, 3:55 a.m. UTC | #1
On Tue, 12 Jul 2022 10:57:07 +0000 Taehee Yoo wrote:
> @@ -2392,12 +2429,14 @@ static bool amt_membership_query_handler(struct amt_dev *amt,
>  	skb->pkt_type = PACKET_MULTICAST;
>  	skb->ip_summed = CHECKSUM_NONE;
>  	len = skb->len;
> +	rcu_read_lock_bh();
>  	if (__netif_rx(skb) == NET_RX_SUCCESS) {
>  		amt_update_gw_status(amt, AMT_STATUS_RECEIVED_QUERY, true);
>  		dev_sw_netstats_rx_add(amt->dev, len);
>  	} else {
>  		amt->dev->stats.rx_dropped++;
>  	}
> +	rcu_read_unlock_bh();
>  
>  	return false;
>  }

The RCU lock addition looks potentially unrelated?

> @@ -2892,10 +3007,21 @@ static int amt_dev_stop(struct net_device *dev)
>  	struct amt_dev *amt = netdev_priv(dev);
>  	struct amt_tunnel_list *tunnel, *tmp;
>  	struct socket *sock;
> +	struct sk_buff *skb;
> +	int i;
>  
>  	cancel_delayed_work_sync(&amt->req_wq);
>  	cancel_delayed_work_sync(&amt->discovery_wq);
>  	cancel_delayed_work_sync(&amt->secret_wq);
> +	cancel_work_sync(&amt->event_wq);

Are you sure the work will not get scheduled again?
What has stopped packet Rx at this point?

> +	for (i = 0; i < AMT_MAX_EVENTS; i++) {
> +		skb = amt->events[i].skb;
> +		if (skb)
> +			kfree_skb(skb);
> +		amt->events[i].event = AMT_EVENT_NONE;
> +		amt->events[i].skb = NULL;
> +	}
> 
>  	/* shutdown */
>  	sock = rtnl_dereference(amt->sock);
> @@ -3051,6 +3177,8 @@ static int amt_newlink(struct net *net, struct net_device *dev,
>  		amt->max_tunnels = AMT_MAX_TUNNELS;
>  
>  	spin_lock_init(&amt->lock);
> +	amt->event_idx = 0;
> +	amt->nr_events = 0;

no need to init member of netdev_priv() to 0, it's zalloc'ed

>  	amt->max_groups = AMT_MAX_GROUP;
>  	amt->max_sources = AMT_MAX_SOURCE;
>  	amt->hash_buckets = AMT_HSIZE;
Paolo Abeni July 14, 2022, 8:09 a.m. UTC | #2
On Tue, 2022-07-12 at 10:57 +0000, Taehee Yoo wrote:
> There are some synchronization issues(amt->status, amt->req_cnt, etc)
> if the interface is in gateway mode because gateway message handlers
> are processed concurrently.
> This applies a work queue for processing these messages instead of
> expanding the locking context.
> 
> So, the purposes of this patch are to fix exist race conditions and to make
> gateway to be able to validate a gateway status more correctly.
> 
> When the AMT gateway interface is created, it tries to establish to relay.
> The establishment step looks stateless, but it should be managed well.
> In order to handle messages in the gateway, it saves the current
> status(i.e. AMT_STATUS_XXX).
> This patch makes gateway code to be worked with a single thread.
> 
> Now, all messages except the multicast are triggered(received or
> delay expired), and these messages will be stored in the event
> queue(amt->events).
> Then, the single worker processes stored messages asynchronously one
> by one.
> The multicast data message type will be still processed immediately.
> 
> Now, amt->lock is only needed to access the event queue(amt->events)
> if an interface is the gateway mode.
> 
> Fixes: cbc21dc1cfe9 ("amt: add data plane of amt interface")
> Signed-off-by: Taehee Yoo <ap420073@gmail.com>
> ---
>  drivers/net/amt.c | 158 +++++++++++++++++++++++++++++++++++++++++-----
>  include/net/amt.h |  20 ++++++
>  2 files changed, 163 insertions(+), 15 deletions(-)
> 
> diff --git a/drivers/net/amt.c b/drivers/net/amt.c
> index be2719a3ba70..032c2934e466 100644
> --- a/drivers/net/amt.c
> +++ b/drivers/net/amt.c
> @@ -900,6 +900,28 @@ static void amt_send_mld_gq(struct amt_dev *amt, struct amt_tunnel_list *tunnel)
>  }
>  #endif
>  
> +static bool amt_queue_events(struct amt_dev *amt, enum amt_event event,
> +			     struct sk_buff *skb)
> +{
> +	int index;
> +
> +	spin_lock_bh(&amt->lock);
> +	if (amt->nr_events >= AMT_MAX_EVENTS) {
> +		spin_unlock_bh(&amt->lock);
> +		return 1;
> +	}
> +
> +	index = (amt->event_idx + amt->nr_events) % AMT_MAX_EVENTS;
> +	amt->events[index].event = event;
> +	amt->events[index].skb = skb;
> +	amt->nr_events++;
> +	amt->event_idx %= AMT_MAX_EVENTS;
> +	queue_work(amt_wq, &amt->event_wq);
> +	spin_unlock_bh(&amt->lock);
> +
> +	return 0;
> +}
> +
>  static void amt_secret_work(struct work_struct *work)
>  {
>  	struct amt_dev *amt = container_of(to_delayed_work(work),
> @@ -913,12 +935,8 @@ static void amt_secret_work(struct work_struct *work)
>  			 msecs_to_jiffies(AMT_SECRET_TIMEOUT));
>  }
>  
> -static void amt_discovery_work(struct work_struct *work)
> +static void amt_event_send_discovery(struct amt_dev *amt)
>  {
> -	struct amt_dev *amt = container_of(to_delayed_work(work),
> -					   struct amt_dev,
> -					   discovery_wq);
> -
>  	spin_lock_bh(&amt->lock);
>  	if (amt->status > AMT_STATUS_SENT_DISCOVERY)
>  		goto out;
> @@ -933,11 +951,19 @@ static void amt_discovery_work(struct work_struct *work)
>  	spin_unlock_bh(&amt->lock);
>  }
>  
> -static void amt_req_work(struct work_struct *work)
> +static void amt_discovery_work(struct work_struct *work)
>  {
>  	struct amt_dev *amt = container_of(to_delayed_work(work),
>  					   struct amt_dev,
> -					   req_wq);
> +					   discovery_wq);
> +
> +	if (amt_queue_events(amt, AMT_EVENT_SEND_DISCOVERY, NULL))
> +		mod_delayed_work(amt_wq, &amt->discovery_wq,
> +				 msecs_to_jiffies(AMT_DISCOVERY_TIMEOUT));
> +}
> +
> +static void amt_event_send_request(struct amt_dev *amt)
> +{
>  	u32 exp;
>  
>  	spin_lock_bh(&amt->lock);
> @@ -967,6 +993,17 @@ static void amt_req_work(struct work_struct *work)
>  	spin_unlock_bh(&amt->lock);
>  }
>  
> +static void amt_req_work(struct work_struct *work)
> +{
> +	struct amt_dev *amt = container_of(to_delayed_work(work),
> +					   struct amt_dev,
> +					   req_wq);
> +
> +	if (amt_queue_events(amt, AMT_EVENT_SEND_REQUEST, NULL))
> +		mod_delayed_work(amt_wq, &amt->req_wq,
> +				 msecs_to_jiffies(100));
> +}
> +
>  static bool amt_send_membership_update(struct amt_dev *amt,
>  				       struct sk_buff *skb,
>  				       bool v6)
> @@ -2392,12 +2429,14 @@ static bool amt_membership_query_handler(struct amt_dev *amt,
>  	skb->pkt_type = PACKET_MULTICAST;
>  	skb->ip_summed = CHECKSUM_NONE;
>  	len = skb->len;
> +	rcu_read_lock_bh();

Here you only need local_bh_disable(), the RCU part is confusing as
Jakub noted, and not needed.

>  	if (__netif_rx(skb) == NET_RX_SUCCESS) {
>  		amt_update_gw_status(amt, AMT_STATUS_RECEIVED_QUERY, true);
>  		dev_sw_netstats_rx_add(amt->dev, len);
>  	} else {
>  		amt->dev->stats.rx_dropped++;
>  	}
> +	rcu_read_unlock_bh();
>  
>  	return false;
>  }
> @@ -2688,6 +2727,38 @@ static bool amt_request_handler(struct amt_dev *amt, struct sk_buff *skb)
>  	return false;
>  }
>  
> +static void amt_gw_rcv(struct amt_dev *amt, struct sk_buff *skb)
> +{
> +	int type = amt_parse_type(skb);
> +	int err = 1;
> +
> +	if (type == -1)
> +		goto drop;
> +
> +	if (amt->mode == AMT_MODE_GATEWAY) {
> +		switch (type) {
> +		case AMT_MSG_ADVERTISEMENT:
> +			err = amt_advertisement_handler(amt, skb);
> +			break;
> +		case AMT_MSG_MEMBERSHIP_QUERY:
> +			err = amt_membership_query_handler(amt, skb);
> +			if (!err)
> +				return;
> +			break;
> +		default:
> +			netdev_dbg(amt->dev, "Invalid type of Gateway\n");
> +			break;
> +		}
> +	}
> +drop:
> +	if (err) {
> +		amt->dev->stats.rx_dropped++;
> +		kfree_skb(skb);
> +	} else {
> +		consume_skb(skb);
> +	}
> +}
> +
>  static int amt_rcv(struct sock *sk, struct sk_buff *skb)
>  {
>  	struct amt_dev *amt;
> @@ -2719,8 +2790,12 @@ static int amt_rcv(struct sock *sk, struct sk_buff *skb)
>  				err = true;
>  				goto drop;
>  			}
> -			err = amt_advertisement_handler(amt, skb);
> -			break;
> +			if (amt_queue_events(amt, AMT_EVENT_RECEIVE, skb)) {
> +				netdev_dbg(amt->dev, "AMT Event queue full\n");
> +				err = true;
> +				goto drop;
> +			}
> +			goto out;
>  		case AMT_MSG_MULTICAST_DATA:
>  			if (iph->saddr != amt->remote_ip) {
>  				netdev_dbg(amt->dev, "Invalid Relay IP\n");
> @@ -2738,11 +2813,12 @@ static int amt_rcv(struct sock *sk, struct sk_buff *skb)
>  				err = true;
>  				goto drop;
>  			}
> -			err = amt_membership_query_handler(amt, skb);
> -			if (err)
> +			if (amt_queue_events(amt, AMT_EVENT_RECEIVE, skb)) {
> +				netdev_dbg(amt->dev, "AMT Event queue full\n");
> +				err = true;
>  				goto drop;
> -			else
> -				goto out;
> +			}
> +			goto out;
>  		default:
>  			err = true;
>  			netdev_dbg(amt->dev, "Invalid type of Gateway\n");
> @@ -2780,6 +2856,45 @@ static int amt_rcv(struct sock *sk, struct sk_buff *skb)
>  	return 0;
>  }
>  
> +static void amt_event_work(struct work_struct *work)
> +{
> +	struct amt_dev *amt = container_of(work, struct amt_dev, event_wq);
> +	struct sk_buff *skb;
> +	u8 event;
> +
> +	while (1) {
> +		spin_lock(&amt->lock);

This is called in process context, amd amt->lock can be acquired from
BH context, you need spin_lock_bh() here.

Lockdep should help finding this kind of issue.

> +		if (amt->nr_events == 0) {
> +			spin_unlock(&amt->lock);
> +			return;
> +		}
> +		event = amt->events[amt->event_idx].event;
> +		skb = amt->events[amt->event_idx].skb;
> +		amt->events[amt->event_idx].event = AMT_EVENT_NONE;
> +		amt->events[amt->event_idx].skb = NULL;
> +		amt->nr_events--;
> +		amt->event_idx++;
> +		amt->event_idx %= AMT_MAX_EVENTS;
> +		spin_unlock(&amt->lock);
> +
> +		switch (event) {
> +		case AMT_EVENT_RECEIVE:
> +			amt_gw_rcv(amt, skb);
> +			break;
> +		case AMT_EVENT_SEND_DISCOVERY:
> +			amt_event_send_discovery(amt);
> +			break;
> +		case AMT_EVENT_SEND_REQUEST:
> +			amt_event_send_request(amt);
> +			break;
> +		default:
> +			if (skb)
> +				kfree_skb(skb);
> +			break;
> +		}

This loops is unbound. If the socket keep adding events, it can keep
running forever. You need either to add cond_schedule() or even better
break it after a low max number of iterations - pending event will be
served when the work struct is dequeued next

> +	}
> +}
> +
>  static int amt_err_lookup(struct sock *sk, struct sk_buff *skb)
>  {
>  	struct amt_dev *amt;
> @@ -2892,10 +3007,21 @@ static int amt_dev_stop(struct net_device *dev)
>  	struct amt_dev *amt = netdev_priv(dev);
>  	struct amt_tunnel_list *tunnel, *tmp;
>  	struct socket *sock;
> +	struct sk_buff *skb;
> +	int i;
>  
>  	cancel_delayed_work_sync(&amt->req_wq);
>  	cancel_delayed_work_sync(&amt->discovery_wq);
>  	cancel_delayed_work_sync(&amt->secret_wq);
> +	cancel_work_sync(&amt->event_wq);
> +
> +	for (i = 0; i < AMT_MAX_EVENTS; i++) {
> +		skb = amt->events[i].skb;
> +		if (skb)
> +			kfree_skb(skb);
> +		amt->events[i].event = AMT_EVENT_NONE;
> +		amt->events[i].skb = NULL;
> +	}
>  
>  	/* shutdown */
>  	sock = rtnl_dereference(amt->sock);
> @@ -3051,6 +3177,8 @@ static int amt_newlink(struct net *net, struct net_device *dev,
>  		amt->max_tunnels = AMT_MAX_TUNNELS;
>  
>  	spin_lock_init(&amt->lock);
> +	amt->event_idx = 0;
> +	amt->nr_events = 0;
>  	amt->max_groups = AMT_MAX_GROUP;
>  	amt->max_sources = AMT_MAX_SOURCE;
>  	amt->hash_buckets = AMT_HSIZE;
> @@ -3146,8 +3274,8 @@ static int amt_newlink(struct net *net, struct net_device *dev,
>  	INIT_DELAYED_WORK(&amt->discovery_wq, amt_discovery_work);
>  	INIT_DELAYED_WORK(&amt->req_wq, amt_req_work);
>  	INIT_DELAYED_WORK(&amt->secret_wq, amt_secret_work);
> +	INIT_WORK(&amt->event_wq, amt_event_work);
>  	INIT_LIST_HEAD(&amt->tunnel_list);
> -
>  	return 0;
>  err:
>  	dev_put(amt->stream_dev);
> @@ -3280,7 +3408,7 @@ static int __init amt_init(void)
>  	if (err < 0)
>  		goto unregister_notifier;
>  
> -	amt_wq = alloc_workqueue("amt", WQ_UNBOUND, 1);
> +	amt_wq = alloc_workqueue("amt", WQ_UNBOUND, 0);
>  	if (!amt_wq) {
>  		err = -ENOMEM;
>  		goto rtnl_unregister;
> diff --git a/include/net/amt.h b/include/net/amt.h
> index 0e40c3d64fcf..08fc30cf2f34 100644
> --- a/include/net/amt.h
> +++ b/include/net/amt.h
> @@ -78,6 +78,15 @@ enum amt_status {
>  
>  #define AMT_STATUS_MAX (__AMT_STATUS_MAX - 1)
>  
> +/* Gateway events only */
> +enum amt_event {
> +	AMT_EVENT_NONE,
> +	AMT_EVENT_RECEIVE,
> +	AMT_EVENT_SEND_DISCOVERY,
> +	AMT_EVENT_SEND_REQUEST,
> +	__AMT_EVENT_MAX,
> +};
> +
>  struct amt_header {
>  #if defined(__LITTLE_ENDIAN_BITFIELD)
>  	u8 type:4,
> @@ -292,6 +301,12 @@ struct amt_group_node {
>  	struct hlist_head	sources[];
>  };
>  
> +#define AMT_MAX_EVENTS	16
> +struct amt_events {
> +	enum amt_event event;
> +	struct sk_buff *skb;
> +};
> +
>  struct amt_dev {
>  	struct net_device       *dev;
>  	struct net_device       *stream_dev;
> @@ -308,6 +323,7 @@ struct amt_dev {
>  	struct delayed_work     req_wq;
>  	/* Protected by RTNL */
>  	struct delayed_work     secret_wq;
> +	struct work_struct	event_wq;
>  	/* AMT status */
>  	enum amt_status		status;
>  	/* Generated key */
> @@ -345,6 +361,10 @@ struct amt_dev {
>  	/* Used only in gateway mode */
>  	u64			mac:48,
>  				reserved:16;
> +	/* AMT gateway side message handler queue */
> +	struct amt_events	events[AMT_MAX_EVENTS];
> +	u8			event_idx;
> +	u8			nr_events;
>  };
>  
>  #define AMT_TOS			0xc0
Taehee Yoo July 15, 2022, 12:34 p.m. UTC | #3
Hi Jakub,
Thank you so much for your review!

On 7/14/22 12:55, Jakub Kicinski wrote:
 > On Tue, 12 Jul 2022 10:57:07 +0000 Taehee Yoo wrote:
 >> @@ -2392,12 +2429,14 @@ static bool 
amt_membership_query_handler(struct amt_dev *amt,
 >>   	skb->pkt_type = PACKET_MULTICAST;
 >>   	skb->ip_summed = CHECKSUM_NONE;
 >>   	len = skb->len;
 >> +	rcu_read_lock_bh();
 >>   	if (__netif_rx(skb) == NET_RX_SUCCESS) {
 >>   		amt_update_gw_status(amt, AMT_STATUS_RECEIVED_QUERY, true);
 >>   		dev_sw_netstats_rx_add(amt->dev, len);
 >>   	} else {
 >>   		amt->dev->stats.rx_dropped++;
 >>   	}
 >> +	rcu_read_unlock_bh();
 >>
 >>   	return false;
 >>   }
 >
 > The RCU lock addition looks potentially unrelated?
 >

I checked that RCU is not necessary here.
I tested local_bh_disable(), and it works well. no splats appeared.
So, as Paolo suggested, I will use local_bh_disable() instead of 
rcu_read_lock_bh() in the v2.

 >> @@ -2892,10 +3007,21 @@ static int amt_dev_stop(struct net_device *dev)
 >>   	struct amt_dev *amt = netdev_priv(dev);
 >>   	struct amt_tunnel_list *tunnel, *tmp;
 >>   	struct socket *sock;
 >> +	struct sk_buff *skb;
 >> +	int i;
 >>
 >>   	cancel_delayed_work_sync(&amt->req_wq);
 >>   	cancel_delayed_work_sync(&amt->discovery_wq);
 >>   	cancel_delayed_work_sync(&amt->secret_wq);
 >> +	cancel_work_sync(&amt->event_wq);
 >
 > Are you sure the work will not get scheduled again?
 > What has stopped packet Rx at this point?

I expected that RX was already stopped then ->ndo_stop() can be called.
But as you pointed out, it isn't.
In order to ensure that no more RX concurrently, amt_rcv() should not be 
called.
So, I think cancel_delayed_work() should be called after setting null to 
the amt socket in amt_dev_stop().
code looks like:
    RCU_INIT_POINTER(amt->sock, NULL);
    synchronize_net();
    cancel_delayed_work(&amt->event_wq).

 >
 >> +	for (i = 0; i < AMT_MAX_EVENTS; i++) {
 >> +		skb = amt->events[i].skb;
 >> +		if (skb)
 >> +			kfree_skb(skb);
 >> +		amt->events[i].event = AMT_EVENT_NONE;
 >> +		amt->events[i].skb = NULL;
 >> +	}
 >>
 >>   	/* shutdown */
 >>   	sock = rtnl_dereference(amt->sock);
 >> @@ -3051,6 +3177,8 @@ static int amt_newlink(struct net *net, struct 
net_device *dev,
 >>   		amt->max_tunnels = AMT_MAX_TUNNELS;
 >>
 >>   	spin_lock_init(&amt->lock);
 >> +	amt->event_idx = 0;
 >> +	amt->nr_events = 0;
 >
 > no need to init member of netdev_priv() to 0, it's zalloc'ed
 >

Thanks a lot for pointing it out.
This is my mistake.
If an amt interface is down and then up again, it would have incorrect 
values of event_idx and nr_events.
Because there is no init for these values.
So, init for these variables in ->ndo_open() or ->ndo_stop() is needed.
Initialization in ->ndo_newlink() can't fix anything.

As a test, there is a bug certainly.
So, I will move it to ->ndo_open().

 >>   	amt->max_groups = AMT_MAX_GROUP;
 >>   	amt->max_sources = AMT_MAX_SOURCE;
 >>   	amt->hash_buckets = AMT_HSIZE;

I will send the v2 patch after some tests!

Thanks a lot for your review!
Taehee Yoo
Taehee Yoo July 15, 2022, 12:51 p.m. UTC | #4
Hi Paolo,
Thank you so much for your review!

On 7/14/22 17:09, Paolo Abeni wrote:
 > On Tue, 2022-07-12 at 10:57 +0000, Taehee Yoo wrote:
 >> There are some synchronization issues(amt->status, amt->req_cnt, etc)
 >> if the interface is in gateway mode because gateway message handlers
 >> are processed concurrently.
 >> This applies a work queue for processing these messages instead of
 >> expanding the locking context.
 >>
 >> So, the purposes of this patch are to fix exist race conditions and 
to make
 >> gateway to be able to validate a gateway status more correctly.
 >>
 >> When the AMT gateway interface is created, it tries to establish to 
relay.
 >> The establishment step looks stateless, but it should be managed well.
 >> In order to handle messages in the gateway, it saves the current
 >> status(i.e. AMT_STATUS_XXX).
 >> This patch makes gateway code to be worked with a single thread.
 >>
 >> Now, all messages except the multicast are triggered(received or
 >> delay expired), and these messages will be stored in the event
 >> queue(amt->events).
 >> Then, the single worker processes stored messages asynchronously one
 >> by one.
 >> The multicast data message type will be still processed immediately.
 >>
 >> Now, amt->lock is only needed to access the event queue(amt->events)
 >> if an interface is the gateway mode.
 >>
 >> Fixes: cbc21dc1cfe9 ("amt: add data plane of amt interface")
 >> Signed-off-by: Taehee Yoo <ap420073@gmail.com>
 >> ---
 >>   drivers/net/amt.c | 158 +++++++++++++++++++++++++++++++++++++++++-----
 >>   include/net/amt.h |  20 ++++++
 >>   2 files changed, 163 insertions(+), 15 deletions(-)
 >>
 >> diff --git a/drivers/net/amt.c b/drivers/net/amt.c
 >> index be2719a3ba70..032c2934e466 100644
 >> --- a/drivers/net/amt.c
 >> +++ b/drivers/net/amt.c
 >> @@ -900,6 +900,28 @@ static void amt_send_mld_gq(struct amt_dev 
*amt, struct amt_tunnel_list *tunnel)
 >>   }
 >>   #endif
 >>
 >> +static bool amt_queue_events(struct amt_dev *amt, enum amt_event event,
 >> +			     struct sk_buff *skb)
 >> +{
 >> +	int index;
 >> +
 >> +	spin_lock_bh(&amt->lock);
 >> +	if (amt->nr_events >= AMT_MAX_EVENTS) {
 >> +		spin_unlock_bh(&amt->lock);
 >> +		return 1;
 >> +	}
 >> +
 >> +	index = (amt->event_idx + amt->nr_events) % AMT_MAX_EVENTS;
 >> +	amt->events[index].event = event;
 >> +	amt->events[index].skb = skb;
 >> +	amt->nr_events++;
 >> +	amt->event_idx %= AMT_MAX_EVENTS;
 >> +	queue_work(amt_wq, &amt->event_wq);
 >> +	spin_unlock_bh(&amt->lock);
 >> +
 >> +	return 0;
 >> +}
 >> +
 >>   static void amt_secret_work(struct work_struct *work)
 >>   {
 >>   	struct amt_dev *amt = container_of(to_delayed_work(work),
 >> @@ -913,12 +935,8 @@ static void amt_secret_work(struct work_struct 
*work)
 >>   			 msecs_to_jiffies(AMT_SECRET_TIMEOUT));
 >>   }
 >>
 >> -static void amt_discovery_work(struct work_struct *work)
 >> +static void amt_event_send_discovery(struct amt_dev *amt)
 >>   {
 >> -	struct amt_dev *amt = container_of(to_delayed_work(work),
 >> -					   struct amt_dev,
 >> -					   discovery_wq);
 >> -
 >>   	spin_lock_bh(&amt->lock);
 >>   	if (amt->status > AMT_STATUS_SENT_DISCOVERY)
 >>   		goto out;
 >> @@ -933,11 +951,19 @@ static void amt_discovery_work(struct 
work_struct *work)
 >>   	spin_unlock_bh(&amt->lock);
 >>   }
 >>
 >> -static void amt_req_work(struct work_struct *work)
 >> +static void amt_discovery_work(struct work_struct *work)
 >>   {
 >>   	struct amt_dev *amt = container_of(to_delayed_work(work),
 >>   					   struct amt_dev,
 >> -					   req_wq);
 >> +					   discovery_wq);
 >> +
 >> +	if (amt_queue_events(amt, AMT_EVENT_SEND_DISCOVERY, NULL))
 >> +		mod_delayed_work(amt_wq, &amt->discovery_wq,
 >> +				 msecs_to_jiffies(AMT_DISCOVERY_TIMEOUT));
 >> +}
 >> +
 >> +static void amt_event_send_request(struct amt_dev *amt)
 >> +{
 >>   	u32 exp;
 >>
 >>   	spin_lock_bh(&amt->lock);
 >> @@ -967,6 +993,17 @@ static void amt_req_work(struct work_struct *work)
 >>   	spin_unlock_bh(&amt->lock);
 >>   }
 >>
 >> +static void amt_req_work(struct work_struct *work)
 >> +{
 >> +	struct amt_dev *amt = container_of(to_delayed_work(work),
 >> +					   struct amt_dev,
 >> +					   req_wq);
 >> +
 >> +	if (amt_queue_events(amt, AMT_EVENT_SEND_REQUEST, NULL))
 >> +		mod_delayed_work(amt_wq, &amt->req_wq,
 >> +				 msecs_to_jiffies(100));
 >> +}
 >> +
 >>   static bool amt_send_membership_update(struct amt_dev *amt,
 >>   				       struct sk_buff *skb,
 >>   				       bool v6)
 >> @@ -2392,12 +2429,14 @@ static bool 
amt_membership_query_handler(struct amt_dev *amt,
 >>   	skb->pkt_type = PACKET_MULTICAST;
 >>   	skb->ip_summed = CHECKSUM_NONE;
 >>   	len = skb->len;
 >> +	rcu_read_lock_bh();
 >
 > Here you only need local_bh_disable(), the RCU part is confusing as
 > Jakub noted, and not needed.

Thanks for suggesting, I will use local_bh_disable() instead of 
rcu_read_lock_bh().

 >
 >>   	if (__netif_rx(skb) == NET_RX_SUCCESS) {
 >>   		amt_update_gw_status(amt, AMT_STATUS_RECEIVED_QUERY, true);
 >>   		dev_sw_netstats_rx_add(amt->dev, len);
 >>   	} else {
 >>   		amt->dev->stats.rx_dropped++;
 >>   	}
 >> +	rcu_read_unlock_bh();
 >>
 >>   	return false;
 >>   }
 >> @@ -2688,6 +2727,38 @@ static bool amt_request_handler(struct 
amt_dev *amt, struct sk_buff *skb)
 >>   	return false;
 >>   }
 >>
 >> +static void amt_gw_rcv(struct amt_dev *amt, struct sk_buff *skb)
 >> +{
 >> +	int type = amt_parse_type(skb);
 >> +	int err = 1;
 >> +
 >> +	if (type == -1)
 >> +		goto drop;
 >> +
 >> +	if (amt->mode == AMT_MODE_GATEWAY) {
 >> +		switch (type) {
 >> +		case AMT_MSG_ADVERTISEMENT:
 >> +			err = amt_advertisement_handler(amt, skb);
 >> +			break;
 >> +		case AMT_MSG_MEMBERSHIP_QUERY:
 >> +			err = amt_membership_query_handler(amt, skb);
 >> +			if (!err)
 >> +				return;
 >> +			break;
 >> +		default:
 >> +			netdev_dbg(amt->dev, "Invalid type of Gateway\n");
 >> +			break;
 >> +		}
 >> +	}
 >> +drop:
 >> +	if (err) {
 >> +		amt->dev->stats.rx_dropped++;
 >> +		kfree_skb(skb);
 >> +	} else {
 >> +		consume_skb(skb);
 >> +	}
 >> +}
 >> +
 >>   static int amt_rcv(struct sock *sk, struct sk_buff *skb)
 >>   {
 >>   	struct amt_dev *amt;
 >> @@ -2719,8 +2790,12 @@ static int amt_rcv(struct sock *sk, struct 
sk_buff *skb)
 >>   				err = true;
 >>   				goto drop;
 >>   			}
 >> -			err = amt_advertisement_handler(amt, skb);
 >> -			break;
 >> +			if (amt_queue_events(amt, AMT_EVENT_RECEIVE, skb)) {
 >> +				netdev_dbg(amt->dev, "AMT Event queue full\n");
 >> +				err = true;
 >> +				goto drop;
 >> +			}
 >> +			goto out;
 >>   		case AMT_MSG_MULTICAST_DATA:
 >>   			if (iph->saddr != amt->remote_ip) {
 >>   				netdev_dbg(amt->dev, "Invalid Relay IP\n");
 >> @@ -2738,11 +2813,12 @@ static int amt_rcv(struct sock *sk, struct 
sk_buff *skb)
 >>   				err = true;
 >>   				goto drop;
 >>   			}
 >> -			err = amt_membership_query_handler(amt, skb);
 >> -			if (err)
 >> +			if (amt_queue_events(amt, AMT_EVENT_RECEIVE, skb)) {
 >> +				netdev_dbg(amt->dev, "AMT Event queue full\n");
 >> +				err = true;
 >>   				goto drop;
 >> -			else
 >> -				goto out;
 >> +			}
 >> +			goto out;
 >>   		default:
 >>   			err = true;
 >>   			netdev_dbg(amt->dev, "Invalid type of Gateway\n");
 >> @@ -2780,6 +2856,45 @@ static int amt_rcv(struct sock *sk, struct 
sk_buff *skb)
 >>   	return 0;
 >>   }
 >>
 >> +static void amt_event_work(struct work_struct *work)
 >> +{
 >> +	struct amt_dev *amt = container_of(work, struct amt_dev, event_wq);
 >> +	struct sk_buff *skb;
 >> +	u8 event;
 >> +
 >> +	while (1) {
 >> +		spin_lock(&amt->lock);
 >
 > This is called in process context, amd amt->lock can be acquired from
 > BH context, you need spin_lock_bh() here.
 >
 > Lockdep should help finding this kind of issue.
 >

Ah, I found that lockdep is disabled in boot time due to other bugs.
I will use spin_lock_bh() and test with the lockdep!

 >> +		if (amt->nr_events == 0) {
 >> +			spin_unlock(&amt->lock);
 >> +			return;
 >> +		}
 >> +		event = amt->events[amt->event_idx].event;
 >> +		skb = amt->events[amt->event_idx].skb;
 >> +		amt->events[amt->event_idx].event = AMT_EVENT_NONE;
 >> +		amt->events[amt->event_idx].skb = NULL;
 >> +		amt->nr_events--;
 >> +		amt->event_idx++;
 >> +		amt->event_idx %= AMT_MAX_EVENTS;
 >> +		spin_unlock(&amt->lock);
 >> +
 >> +		switch (event) {
 >> +		case AMT_EVENT_RECEIVE:
 >> +			amt_gw_rcv(amt, skb);
 >> +			break;
 >> +		case AMT_EVENT_SEND_DISCOVERY:
 >> +			amt_event_send_discovery(amt);
 >> +			break;
 >> +		case AMT_EVENT_SEND_REQUEST:
 >> +			amt_event_send_request(amt);
 >> +			break;
 >> +		default:
 >> +			if (skb)
 >> +				kfree_skb(skb);
 >> +			break;
 >> +		}
 >
 > This loops is unbound. If the socket keep adding events, it can keep
 > running forever. You need either to add cond_schedule() or even better
 > break it after a low max number of iterations - pending event will be
 > served when the work struct is dequeued next
 >

Thanks, I will use a limit variable.


 >> +	}
 >> +}
 >> +
 >>   static int amt_err_lookup(struct sock *sk, struct sk_buff *skb)
 >>   {
 >>   	struct amt_dev *amt;
 >> @@ -2892,10 +3007,21 @@ static int amt_dev_stop(struct net_device *dev)
 >>   	struct amt_dev *amt = netdev_priv(dev);
 >>   	struct amt_tunnel_list *tunnel, *tmp;
 >>   	struct socket *sock;
 >> +	struct sk_buff *skb;
 >> +	int i;
 >>
 >>   	cancel_delayed_work_sync(&amt->req_wq);
 >>   	cancel_delayed_work_sync(&amt->discovery_wq);
 >>   	cancel_delayed_work_sync(&amt->secret_wq);
 >> +	cancel_work_sync(&amt->event_wq);
 >> +
 >> +	for (i = 0; i < AMT_MAX_EVENTS; i++) {
 >> +		skb = amt->events[i].skb;
 >> +		if (skb)
 >> +			kfree_skb(skb);
 >> +		amt->events[i].event = AMT_EVENT_NONE;
 >> +		amt->events[i].skb = NULL;
 >> +	}
 >>
 >>   	/* shutdown */
 >>   	sock = rtnl_dereference(amt->sock);
 >> @@ -3051,6 +3177,8 @@ static int amt_newlink(struct net *net, struct 
net_device *dev,
 >>   		amt->max_tunnels = AMT_MAX_TUNNELS;
 >>
 >>   	spin_lock_init(&amt->lock);
 >> +	amt->event_idx = 0;
 >> +	amt->nr_events = 0;
 >>   	amt->max_groups = AMT_MAX_GROUP;
 >>   	amt->max_sources = AMT_MAX_SOURCE;
 >>   	amt->hash_buckets = AMT_HSIZE;
 >> @@ -3146,8 +3274,8 @@ static int amt_newlink(struct net *net, struct 
net_device *dev,
 >>   	INIT_DELAYED_WORK(&amt->discovery_wq, amt_discovery_work);
 >>   	INIT_DELAYED_WORK(&amt->req_wq, amt_req_work);
 >>   	INIT_DELAYED_WORK(&amt->secret_wq, amt_secret_work);
 >> +	INIT_WORK(&amt->event_wq, amt_event_work);
 >>   	INIT_LIST_HEAD(&amt->tunnel_list);
 >> -
 >>   	return 0;
 >>   err:
 >>   	dev_put(amt->stream_dev);
 >> @@ -3280,7 +3408,7 @@ static int __init amt_init(void)
 >>   	if (err < 0)
 >>   		goto unregister_notifier;
 >>
 >> -	amt_wq = alloc_workqueue("amt", WQ_UNBOUND, 1);
 >> +	amt_wq = alloc_workqueue("amt", WQ_UNBOUND, 0);
 >>   	if (!amt_wq) {
 >>   		err = -ENOMEM;
 >>   		goto rtnl_unregister;
 >> diff --git a/include/net/amt.h b/include/net/amt.h
 >> index 0e40c3d64fcf..08fc30cf2f34 100644
 >> --- a/include/net/amt.h
 >> +++ b/include/net/amt.h
 >> @@ -78,6 +78,15 @@ enum amt_status {
 >>
 >>   #define AMT_STATUS_MAX (__AMT_STATUS_MAX - 1)
 >>
 >> +/* Gateway events only */
 >> +enum amt_event {
 >> +	AMT_EVENT_NONE,
 >> +	AMT_EVENT_RECEIVE,
 >> +	AMT_EVENT_SEND_DISCOVERY,
 >> +	AMT_EVENT_SEND_REQUEST,
 >> +	__AMT_EVENT_MAX,
 >> +};
 >> +
 >>   struct amt_header {
 >>   #if defined(__LITTLE_ENDIAN_BITFIELD)
 >>   	u8 type:4,
 >> @@ -292,6 +301,12 @@ struct amt_group_node {
 >>   	struct hlist_head	sources[];
 >>   };
 >>
 >> +#define AMT_MAX_EVENTS	16
 >> +struct amt_events {
 >> +	enum amt_event event;
 >> +	struct sk_buff *skb;
 >> +};
 >> +
 >>   struct amt_dev {
 >>   	struct net_device       *dev;
 >>   	struct net_device       *stream_dev;
 >> @@ -308,6 +323,7 @@ struct amt_dev {
 >>   	struct delayed_work     req_wq;
 >>   	/* Protected by RTNL */
 >>   	struct delayed_work     secret_wq;
 >> +	struct work_struct	event_wq;
 >>   	/* AMT status */
 >>   	enum amt_status		status;
 >>   	/* Generated key */
 >> @@ -345,6 +361,10 @@ struct amt_dev {
 >>   	/* Used only in gateway mode */
 >>   	u64			mac:48,
 >>   				reserved:16;
 >> +	/* AMT gateway side message handler queue */
 >> +	struct amt_events	events[AMT_MAX_EVENTS];
 >> +	u8			event_idx;
 >> +	u8			nr_events;
 >>   };
 >>
 >>   #define AMT_TOS			0xc0
 >

I will send the v2 patch after some tests with lockdep.

Thanks a lot!
Taehee Yoo
diff mbox series

Patch

diff --git a/drivers/net/amt.c b/drivers/net/amt.c
index be2719a3ba70..032c2934e466 100644
--- a/drivers/net/amt.c
+++ b/drivers/net/amt.c
@@ -900,6 +900,28 @@  static void amt_send_mld_gq(struct amt_dev *amt, struct amt_tunnel_list *tunnel)
 }
 #endif
 
+static bool amt_queue_events(struct amt_dev *amt, enum amt_event event,
+			     struct sk_buff *skb)
+{
+	int index;
+
+	spin_lock_bh(&amt->lock);
+	if (amt->nr_events >= AMT_MAX_EVENTS) {
+		spin_unlock_bh(&amt->lock);
+		return 1;
+	}
+
+	index = (amt->event_idx + amt->nr_events) % AMT_MAX_EVENTS;
+	amt->events[index].event = event;
+	amt->events[index].skb = skb;
+	amt->nr_events++;
+	amt->event_idx %= AMT_MAX_EVENTS;
+	queue_work(amt_wq, &amt->event_wq);
+	spin_unlock_bh(&amt->lock);
+
+	return 0;
+}
+
 static void amt_secret_work(struct work_struct *work)
 {
 	struct amt_dev *amt = container_of(to_delayed_work(work),
@@ -913,12 +935,8 @@  static void amt_secret_work(struct work_struct *work)
 			 msecs_to_jiffies(AMT_SECRET_TIMEOUT));
 }
 
-static void amt_discovery_work(struct work_struct *work)
+static void amt_event_send_discovery(struct amt_dev *amt)
 {
-	struct amt_dev *amt = container_of(to_delayed_work(work),
-					   struct amt_dev,
-					   discovery_wq);
-
 	spin_lock_bh(&amt->lock);
 	if (amt->status > AMT_STATUS_SENT_DISCOVERY)
 		goto out;
@@ -933,11 +951,19 @@  static void amt_discovery_work(struct work_struct *work)
 	spin_unlock_bh(&amt->lock);
 }
 
-static void amt_req_work(struct work_struct *work)
+static void amt_discovery_work(struct work_struct *work)
 {
 	struct amt_dev *amt = container_of(to_delayed_work(work),
 					   struct amt_dev,
-					   req_wq);
+					   discovery_wq);
+
+	if (amt_queue_events(amt, AMT_EVENT_SEND_DISCOVERY, NULL))
+		mod_delayed_work(amt_wq, &amt->discovery_wq,
+				 msecs_to_jiffies(AMT_DISCOVERY_TIMEOUT));
+}
+
+static void amt_event_send_request(struct amt_dev *amt)
+{
 	u32 exp;
 
 	spin_lock_bh(&amt->lock);
@@ -967,6 +993,17 @@  static void amt_req_work(struct work_struct *work)
 	spin_unlock_bh(&amt->lock);
 }
 
+static void amt_req_work(struct work_struct *work)
+{
+	struct amt_dev *amt = container_of(to_delayed_work(work),
+					   struct amt_dev,
+					   req_wq);
+
+	if (amt_queue_events(amt, AMT_EVENT_SEND_REQUEST, NULL))
+		mod_delayed_work(amt_wq, &amt->req_wq,
+				 msecs_to_jiffies(100));
+}
+
 static bool amt_send_membership_update(struct amt_dev *amt,
 				       struct sk_buff *skb,
 				       bool v6)
@@ -2392,12 +2429,14 @@  static bool amt_membership_query_handler(struct amt_dev *amt,
 	skb->pkt_type = PACKET_MULTICAST;
 	skb->ip_summed = CHECKSUM_NONE;
 	len = skb->len;
+	rcu_read_lock_bh();
 	if (__netif_rx(skb) == NET_RX_SUCCESS) {
 		amt_update_gw_status(amt, AMT_STATUS_RECEIVED_QUERY, true);
 		dev_sw_netstats_rx_add(amt->dev, len);
 	} else {
 		amt->dev->stats.rx_dropped++;
 	}
+	rcu_read_unlock_bh();
 
 	return false;
 }
@@ -2688,6 +2727,38 @@  static bool amt_request_handler(struct amt_dev *amt, struct sk_buff *skb)
 	return false;
 }
 
+static void amt_gw_rcv(struct amt_dev *amt, struct sk_buff *skb)
+{
+	int type = amt_parse_type(skb);
+	int err = 1;
+
+	if (type == -1)
+		goto drop;
+
+	if (amt->mode == AMT_MODE_GATEWAY) {
+		switch (type) {
+		case AMT_MSG_ADVERTISEMENT:
+			err = amt_advertisement_handler(amt, skb);
+			break;
+		case AMT_MSG_MEMBERSHIP_QUERY:
+			err = amt_membership_query_handler(amt, skb);
+			if (!err)
+				return;
+			break;
+		default:
+			netdev_dbg(amt->dev, "Invalid type of Gateway\n");
+			break;
+		}
+	}
+drop:
+	if (err) {
+		amt->dev->stats.rx_dropped++;
+		kfree_skb(skb);
+	} else {
+		consume_skb(skb);
+	}
+}
+
 static int amt_rcv(struct sock *sk, struct sk_buff *skb)
 {
 	struct amt_dev *amt;
@@ -2719,8 +2790,12 @@  static int amt_rcv(struct sock *sk, struct sk_buff *skb)
 				err = true;
 				goto drop;
 			}
-			err = amt_advertisement_handler(amt, skb);
-			break;
+			if (amt_queue_events(amt, AMT_EVENT_RECEIVE, skb)) {
+				netdev_dbg(amt->dev, "AMT Event queue full\n");
+				err = true;
+				goto drop;
+			}
+			goto out;
 		case AMT_MSG_MULTICAST_DATA:
 			if (iph->saddr != amt->remote_ip) {
 				netdev_dbg(amt->dev, "Invalid Relay IP\n");
@@ -2738,11 +2813,12 @@  static int amt_rcv(struct sock *sk, struct sk_buff *skb)
 				err = true;
 				goto drop;
 			}
-			err = amt_membership_query_handler(amt, skb);
-			if (err)
+			if (amt_queue_events(amt, AMT_EVENT_RECEIVE, skb)) {
+				netdev_dbg(amt->dev, "AMT Event queue full\n");
+				err = true;
 				goto drop;
-			else
-				goto out;
+			}
+			goto out;
 		default:
 			err = true;
 			netdev_dbg(amt->dev, "Invalid type of Gateway\n");
@@ -2780,6 +2856,45 @@  static int amt_rcv(struct sock *sk, struct sk_buff *skb)
 	return 0;
 }
 
+static void amt_event_work(struct work_struct *work)
+{
+	struct amt_dev *amt = container_of(work, struct amt_dev, event_wq);
+	struct sk_buff *skb;
+	u8 event;
+	/* amt->lock is also taken from softirq (amt_queue_events() via
+	 * amt_rcv()), so the BH-disabling variant is mandatory here; plain
+	 * spin_lock() can self-deadlock if the softirq preempts this work
+	 * while the lock is held on the same CPU.
+	 */
+	while (1) {
+		spin_lock_bh(&amt->lock);
+		if (amt->nr_events == 0) {
+			spin_unlock_bh(&amt->lock);
+			return;
+		}
+		event = amt->events[amt->event_idx].event;
+		skb = amt->events[amt->event_idx].skb;
+		amt->events[amt->event_idx].event = AMT_EVENT_NONE;
+		amt->events[amt->event_idx].skb = NULL;
+		amt->nr_events--;
+		amt->event_idx++;
+		amt->event_idx %= AMT_MAX_EVENTS;
+		spin_unlock_bh(&amt->lock);
+
+		switch (event) {
+		case AMT_EVENT_RECEIVE:
+			amt_gw_rcv(amt, skb);
+			break;
+		case AMT_EVENT_SEND_DISCOVERY:
+			amt_event_send_discovery(amt);
+			break;
+		case AMT_EVENT_SEND_REQUEST:
+			amt_event_send_request(amt);
+			break;
+		default:
+			if (skb)
+				kfree_skb(skb);
+			break;
+		}
+	}
+}
+
 static int amt_err_lookup(struct sock *sk, struct sk_buff *skb)
 {
 	struct amt_dev *amt;
@@ -2892,10 +3007,21 @@  static int amt_dev_stop(struct net_device *dev)
 	struct amt_dev *amt = netdev_priv(dev);
 	struct amt_tunnel_list *tunnel, *tmp;
 	struct socket *sock;
+	struct sk_buff *skb;
+	int i;
 
 	cancel_delayed_work_sync(&amt->req_wq);
 	cancel_delayed_work_sync(&amt->discovery_wq);
 	cancel_delayed_work_sync(&amt->secret_wq);
+	cancel_work_sync(&amt->event_wq);
+
+	for (i = 0; i < AMT_MAX_EVENTS; i++) {
+		skb = amt->events[i].skb;
+		if (skb)
+			kfree_skb(skb);
+		amt->events[i].event = AMT_EVENT_NONE;
+		amt->events[i].skb = NULL;
+	}
 
 	/* shutdown */
 	sock = rtnl_dereference(amt->sock);
@@ -3051,6 +3177,8 @@  static int amt_newlink(struct net *net, struct net_device *dev,
 		amt->max_tunnels = AMT_MAX_TUNNELS;
 
 	spin_lock_init(&amt->lock);
+	amt->event_idx = 0;
+	amt->nr_events = 0;
 	amt->max_groups = AMT_MAX_GROUP;
 	amt->max_sources = AMT_MAX_SOURCE;
 	amt->hash_buckets = AMT_HSIZE;
@@ -3146,8 +3274,8 @@  static int amt_newlink(struct net *net, struct net_device *dev,
 	INIT_DELAYED_WORK(&amt->discovery_wq, amt_discovery_work);
 	INIT_DELAYED_WORK(&amt->req_wq, amt_req_work);
 	INIT_DELAYED_WORK(&amt->secret_wq, amt_secret_work);
+	INIT_WORK(&amt->event_wq, amt_event_work);
 	INIT_LIST_HEAD(&amt->tunnel_list);
-
 	return 0;
 err:
 	dev_put(amt->stream_dev);
@@ -3280,7 +3408,7 @@  static int __init amt_init(void)
 	if (err < 0)
 		goto unregister_notifier;
 
-	amt_wq = alloc_workqueue("amt", WQ_UNBOUND, 1);
+	amt_wq = alloc_workqueue("amt", WQ_UNBOUND, 0);
 	if (!amt_wq) {
 		err = -ENOMEM;
 		goto rtnl_unregister;
diff --git a/include/net/amt.h b/include/net/amt.h
index 0e40c3d64fcf..08fc30cf2f34 100644
--- a/include/net/amt.h
+++ b/include/net/amt.h
@@ -78,6 +78,15 @@  enum amt_status {
 
 #define AMT_STATUS_MAX (__AMT_STATUS_MAX - 1)
 
+/* Gateway events only */
+enum amt_event {
+	AMT_EVENT_NONE,
+	AMT_EVENT_RECEIVE,
+	AMT_EVENT_SEND_DISCOVERY,
+	AMT_EVENT_SEND_REQUEST,
+	__AMT_EVENT_MAX,
+};
+
 struct amt_header {
 #if defined(__LITTLE_ENDIAN_BITFIELD)
 	u8 type:4,
@@ -292,6 +301,12 @@  struct amt_group_node {
 	struct hlist_head	sources[];
 };
 
+#define AMT_MAX_EVENTS	16
+struct amt_events {
+	enum amt_event event;
+	struct sk_buff *skb;
+};
+
 struct amt_dev {
 	struct net_device       *dev;
 	struct net_device       *stream_dev;
@@ -308,6 +323,7 @@  struct amt_dev {
 	struct delayed_work     req_wq;
 	/* Protected by RTNL */
 	struct delayed_work     secret_wq;
+	struct work_struct	event_wq;
 	/* AMT status */
 	enum amt_status		status;
 	/* Generated key */
@@ -345,6 +361,10 @@  struct amt_dev {
 	/* Used only in gateway mode */
 	u64			mac:48,
 				reserved:16;
+	/* AMT gateway side message handler queue */
+	struct amt_events	events[AMT_MAX_EVENTS];
+	u8			event_idx;
+	u8			nr_events;
 };
 
 #define AMT_TOS			0xc0