diff mbox series

[wpan-next,v2,13/14] net: mac802154: Introduce a tx queue flushing mechanism

Message ID 20220207144804.708118-14-miquel.raynal@bootlin.com (mailing list archive)
State Superseded
Headers show
Series ieee802154: Synchronous Tx API | expand

Commit Message

Miquel Raynal Feb. 7, 2022, 2:48 p.m. UTC
Right now we are able to stop a queue but we have no indication if a
transmission is ongoing or not.

Thanks to recent additions, we can track the number of ongoing
transmissions so we know if the last transmission is over. Adding on top
of it an internal wait queue also allows us to be woken up asynchronously
when this happens. If, beforehand, we marked the queue to be held and
stopped it, we end up flushing and stopping the tx queue.

Thanks to this feature, we will soon be able to introduce a synchronous
transmit API.

Signed-off-by: Miquel Raynal <miquel.raynal@bootlin.com>
---
 include/net/cfg802154.h      |  1 +
 net/ieee802154/core.c        |  1 +
 net/mac802154/cfg.c          |  5 ++---
 net/mac802154/ieee802154_i.h |  1 +
 net/mac802154/tx.c           | 11 ++++++++++-
 net/mac802154/util.c         |  3 ++-
 6 files changed, 17 insertions(+), 5 deletions(-)

Comments

Alexander Aring Feb. 20, 2022, 11:49 p.m. UTC | #1
Hi,

On Mon, Feb 7, 2022 at 9:48 AM Miquel Raynal <miquel.raynal@bootlin.com> wrote:
>
> Right now we are able to stop a queue but we have no indication if a
> transmission is ongoing or not.
>
> Thanks to recent additions, we can track the number of ongoing
> transmissions so we know if the last transmission is over. Adding on top
> of it an internal wait queue also allows to be woken up asynchronously
> when this happens. If, beforehands, we marked the queue to be held and
> stopped it, we end up flushing and stopping the tx queue.
>
> Thanks to this feature, we will soon be able to introduce a synchronous
> transmit API.
>
> Signed-off-by: Miquel Raynal <miquel.raynal@bootlin.com>
> ---
>  include/net/cfg802154.h      |  1 +
>  net/ieee802154/core.c        |  1 +
>  net/mac802154/cfg.c          |  5 ++---
>  net/mac802154/ieee802154_i.h |  1 +
>  net/mac802154/tx.c           | 11 ++++++++++-
>  net/mac802154/util.c         |  3 ++-
>  6 files changed, 17 insertions(+), 5 deletions(-)
>
> diff --git a/include/net/cfg802154.h b/include/net/cfg802154.h
> index 043d8e4359e7..0d385a214da3 100644
> --- a/include/net/cfg802154.h
> +++ b/include/net/cfg802154.h
> @@ -217,6 +217,7 @@ struct wpan_phy {
>         /* Transmission monitoring and control */
>         atomic_t ongoing_txs;
>         atomic_t hold_txs;
> +       wait_queue_head_t sync_txq;
>
>         char priv[] __aligned(NETDEV_ALIGN);
>  };
> diff --git a/net/ieee802154/core.c b/net/ieee802154/core.c
> index de259b5170ab..0953cacafbff 100644
> --- a/net/ieee802154/core.c
> +++ b/net/ieee802154/core.c
> @@ -129,6 +129,7 @@ wpan_phy_new(const struct cfg802154_ops *ops, size_t priv_size)
>         wpan_phy_net_set(&rdev->wpan_phy, &init_net);
>
>         init_waitqueue_head(&rdev->dev_wait);
> +       init_waitqueue_head(&rdev->wpan_phy.sync_txq);
>
>         return &rdev->wpan_phy;
>  }
> diff --git a/net/mac802154/cfg.c b/net/mac802154/cfg.c
> index e8aabf215286..da94aaa32fcb 100644
> --- a/net/mac802154/cfg.c
> +++ b/net/mac802154/cfg.c
> @@ -46,8 +46,7 @@ static int ieee802154_suspend(struct wpan_phy *wpan_phy)
>         if (!local->open_count)
>                 goto suspend;
>
> -       atomic_inc(&wpan_phy->hold_txs);
> -       ieee802154_stop_queue(&local->hw);
> +       ieee802154_sync_and_stop_tx(local);
>         synchronize_net();
>
>         /* stop hardware - this must stop RX */
> @@ -73,7 +72,7 @@ static int ieee802154_resume(struct wpan_phy *wpan_phy)
>                 return ret;
>
>  wake_up:
> -       if (!atomic_dec_and_test(&wpan_phy->hold_txs))
> +       if (!atomic_read(&wpan_phy->hold_txs))
>                 ieee802154_wake_queue(&local->hw);
>         local->suspended = false;
>         return 0;
> diff --git a/net/mac802154/ieee802154_i.h b/net/mac802154/ieee802154_i.h
> index 56fcd7ef5b6f..295c9ce091e1 100644
> --- a/net/mac802154/ieee802154_i.h
> +++ b/net/mac802154/ieee802154_i.h
> @@ -122,6 +122,7 @@ extern struct ieee802154_mlme_ops mac802154_mlme_wpan;
>
>  void ieee802154_rx(struct ieee802154_local *local, struct sk_buff *skb);
>  void ieee802154_xmit_sync_worker(struct work_struct *work);
> +void ieee802154_sync_and_stop_tx(struct ieee802154_local *local);
>  netdev_tx_t
>  ieee802154_monitor_start_xmit(struct sk_buff *skb, struct net_device *dev);
>  netdev_tx_t
> diff --git a/net/mac802154/tx.c b/net/mac802154/tx.c
> index abd9a057521e..06ae2e6cea43 100644
> --- a/net/mac802154/tx.c
> +++ b/net/mac802154/tx.c
> @@ -47,7 +47,8 @@ void ieee802154_xmit_sync_worker(struct work_struct *work)
>                 ieee802154_wake_queue(&local->hw);
>
>         kfree_skb(skb);
> -       atomic_dec(&local->phy->ongoing_txs);
> +       if (!atomic_dec_and_test(&local->phy->ongoing_txs))
> +               wake_up(&local->phy->sync_txq);
>         netdev_dbg(dev, "transmission failed\n");
>  }
>
> @@ -117,6 +118,14 @@ ieee802154_hot_tx(struct ieee802154_local *local, struct sk_buff *skb)
>         return ieee802154_tx(local, skb);
>  }
>
> +void ieee802154_sync_and_stop_tx(struct ieee802154_local *local)
> +{
> +       atomic_inc(&local->phy->hold_txs);
> +       ieee802154_stop_queue(&local->hw);
> +       wait_event(local->phy->sync_txq, !atomic_read(&local->phy->ongoing_txs));
> +       atomic_dec(&local->phy->hold_txs);

In my opinion this _still_ races as I mentioned earlier. You need to
be sure that if you do ieee802154_stop_queue() that no ieee802154_tx()
or hot_tx() is running at this time. Look into the function I
mentioned earlier "?netif_tx_disable()?".

- Alex
Miquel Raynal March 3, 2022, 6:17 p.m. UTC | #2
Hi Alexander,

alex.aring@gmail.com wrote on Sun, 20 Feb 2022 18:49:06 -0500:

> Hi,
> 
> On Mon, Feb 7, 2022 at 9:48 AM Miquel Raynal <miquel.raynal@bootlin.com> wrote:
> >
> > Right now we are able to stop a queue but we have no indication if a
> > transmission is ongoing or not.
> >
> > Thanks to recent additions, we can track the number of ongoing
> > transmissions so we know if the last transmission is over. Adding on top
> > of it an internal wait queue also allows to be woken up asynchronously
> > when this happens. If, beforehands, we marked the queue to be held and
> > stopped it, we end up flushing and stopping the tx queue.
> >
> > Thanks to this feature, we will soon be able to introduce a synchronous
> > transmit API.
> >
> > Signed-off-by: Miquel Raynal <miquel.raynal@bootlin.com>
> > ---
> >  include/net/cfg802154.h      |  1 +
> >  net/ieee802154/core.c        |  1 +
> >  net/mac802154/cfg.c          |  5 ++---
> >  net/mac802154/ieee802154_i.h |  1 +
> >  net/mac802154/tx.c           | 11 ++++++++++-
> >  net/mac802154/util.c         |  3 ++-
> >  6 files changed, 17 insertions(+), 5 deletions(-)
> >
> > diff --git a/include/net/cfg802154.h b/include/net/cfg802154.h
> > index 043d8e4359e7..0d385a214da3 100644
> > --- a/include/net/cfg802154.h
> > +++ b/include/net/cfg802154.h
> > @@ -217,6 +217,7 @@ struct wpan_phy {
> >         /* Transmission monitoring and control */
> >         atomic_t ongoing_txs;
> >         atomic_t hold_txs;
> > +       wait_queue_head_t sync_txq;
> >
> >         char priv[] __aligned(NETDEV_ALIGN);
> >  };
> > diff --git a/net/ieee802154/core.c b/net/ieee802154/core.c
> > index de259b5170ab..0953cacafbff 100644
> > --- a/net/ieee802154/core.c
> > +++ b/net/ieee802154/core.c
> > @@ -129,6 +129,7 @@ wpan_phy_new(const struct cfg802154_ops *ops, size_t priv_size)
> >         wpan_phy_net_set(&rdev->wpan_phy, &init_net);
> >
> >         init_waitqueue_head(&rdev->dev_wait);
> > +       init_waitqueue_head(&rdev->wpan_phy.sync_txq);
> >
> >         return &rdev->wpan_phy;
> >  }
> > diff --git a/net/mac802154/cfg.c b/net/mac802154/cfg.c
> > index e8aabf215286..da94aaa32fcb 100644
> > --- a/net/mac802154/cfg.c
> > +++ b/net/mac802154/cfg.c
> > @@ -46,8 +46,7 @@ static int ieee802154_suspend(struct wpan_phy *wpan_phy)
> >         if (!local->open_count)
> >                 goto suspend;
> >
> > -       atomic_inc(&wpan_phy->hold_txs);
> > -       ieee802154_stop_queue(&local->hw);
> > +       ieee802154_sync_and_stop_tx(local);
> >         synchronize_net();
> >
> >         /* stop hardware - this must stop RX */
> > @@ -73,7 +72,7 @@ static int ieee802154_resume(struct wpan_phy *wpan_phy)
> >                 return ret;
> >
> >  wake_up:
> > -       if (!atomic_dec_and_test(&wpan_phy->hold_txs))
> > +       if (!atomic_read(&wpan_phy->hold_txs))
> >                 ieee802154_wake_queue(&local->hw);
> >         local->suspended = false;
> >         return 0;
> > diff --git a/net/mac802154/ieee802154_i.h b/net/mac802154/ieee802154_i.h
> > index 56fcd7ef5b6f..295c9ce091e1 100644
> > --- a/net/mac802154/ieee802154_i.h
> > +++ b/net/mac802154/ieee802154_i.h
> > @@ -122,6 +122,7 @@ extern struct ieee802154_mlme_ops mac802154_mlme_wpan;
> >
> >  void ieee802154_rx(struct ieee802154_local *local, struct sk_buff *skb);
> >  void ieee802154_xmit_sync_worker(struct work_struct *work);
> > +void ieee802154_sync_and_stop_tx(struct ieee802154_local *local);
> >  netdev_tx_t
> >  ieee802154_monitor_start_xmit(struct sk_buff *skb, struct net_device *dev);
> >  netdev_tx_t
> > diff --git a/net/mac802154/tx.c b/net/mac802154/tx.c
> > index abd9a057521e..06ae2e6cea43 100644
> > --- a/net/mac802154/tx.c
> > +++ b/net/mac802154/tx.c
> > @@ -47,7 +47,8 @@ void ieee802154_xmit_sync_worker(struct work_struct *work)
> >                 ieee802154_wake_queue(&local->hw);
> >
> >         kfree_skb(skb);
> > -       atomic_dec(&local->phy->ongoing_txs);
> > +       if (!atomic_dec_and_test(&local->phy->ongoing_txs))
> > +               wake_up(&local->phy->sync_txq);
> >         netdev_dbg(dev, "transmission failed\n");
> >  }
> >
> > @@ -117,6 +118,14 @@ ieee802154_hot_tx(struct ieee802154_local *local, struct sk_buff *skb)
> >         return ieee802154_tx(local, skb);
> >  }
> >
> > +void ieee802154_sync_and_stop_tx(struct ieee802154_local *local)
> > +{
> > +       atomic_inc(&local->phy->hold_txs);
> > +       ieee802154_stop_queue(&local->hw);
> > +       wait_event(local->phy->sync_txq, !atomic_read(&local->phy->ongoing_txs));
> > +       atomic_dec(&local->phy->hold_txs);  
> 
> In my opinion this _still_ races as I mentioned earlier. You need to
> be sure that if you do ieee802154_stop_queue() that no ieee802154_tx()
> or hot_tx() is running at this time. Look into the function I
> mentioned earlier "?netif_tx_disable()?".

I think now I get the problem, but I am having troubles understanding
the logic in netif_tx_disable(), or should I say, the idea that I
should adapt to our situation.

I understand that we should make sure the following situation does not
happen:
- ieee802154_subif_start_xmit() is called
- ieee802154_subif_start_xmit() is called again
- ieee802154_tx() gets executed once and stops the queue
- ongoing_txs gets incremented once
- the first transfer finishes and ongoing_txs gets decremented
- the tx queue is supposedly empty by the current series while
  the second transfer requested earlier has not yet been processed and
  will definitely be tried in a short moment.

I don't find a pretty solution for that. Is your suggestion to use the
netdev tx_global_lock? If yes, then, how? Because it does not appear
clear to me how we should tackle this issue.

In the mean time, I believe the first half of the series is now big
enough to be sent aside given the number of additional commits that
have popped up following your last review :)

Thanks,
Miquèl
Miquel Raynal March 4, 2022, 10:54 a.m. UTC | #3
Hi Alexander,

miquel.raynal@bootlin.com wrote on Thu, 3 Mar 2022 19:17:23 +0100:

> Hi Alexander,
> 
> alex.aring@gmail.com wrote on Sun, 20 Feb 2022 18:49:06 -0500:
> 
> > Hi,
> > 
> > On Mon, Feb 7, 2022 at 9:48 AM Miquel Raynal <miquel.raynal@bootlin.com> wrote:  
> > >
> > > Right now we are able to stop a queue but we have no indication if a
> > > transmission is ongoing or not.
> > >
> > > Thanks to recent additions, we can track the number of ongoing
> > > transmissions so we know if the last transmission is over. Adding on top
> > > of it an internal wait queue also allows to be woken up asynchronously
> > > when this happens. If, beforehands, we marked the queue to be held and
> > > stopped it, we end up flushing and stopping the tx queue.
> > >
> > > Thanks to this feature, we will soon be able to introduce a synchronous
> > > transmit API.
> > >
> > > Signed-off-by: Miquel Raynal <miquel.raynal@bootlin.com>
> > > ---
> > >  include/net/cfg802154.h      |  1 +
> > >  net/ieee802154/core.c        |  1 +
> > >  net/mac802154/cfg.c          |  5 ++---
> > >  net/mac802154/ieee802154_i.h |  1 +
> > >  net/mac802154/tx.c           | 11 ++++++++++-
> > >  net/mac802154/util.c         |  3 ++-
> > >  6 files changed, 17 insertions(+), 5 deletions(-)
> > >
> > > diff --git a/include/net/cfg802154.h b/include/net/cfg802154.h
> > > index 043d8e4359e7..0d385a214da3 100644
> > > --- a/include/net/cfg802154.h
> > > +++ b/include/net/cfg802154.h
> > > @@ -217,6 +217,7 @@ struct wpan_phy {
> > >         /* Transmission monitoring and control */
> > >         atomic_t ongoing_txs;
> > >         atomic_t hold_txs;
> > > +       wait_queue_head_t sync_txq;
> > >
> > >         char priv[] __aligned(NETDEV_ALIGN);
> > >  };
> > > diff --git a/net/ieee802154/core.c b/net/ieee802154/core.c
> > > index de259b5170ab..0953cacafbff 100644
> > > --- a/net/ieee802154/core.c
> > > +++ b/net/ieee802154/core.c
> > > @@ -129,6 +129,7 @@ wpan_phy_new(const struct cfg802154_ops *ops, size_t priv_size)
> > >         wpan_phy_net_set(&rdev->wpan_phy, &init_net);
> > >
> > >         init_waitqueue_head(&rdev->dev_wait);
> > > +       init_waitqueue_head(&rdev->wpan_phy.sync_txq);
> > >
> > >         return &rdev->wpan_phy;
> > >  }
> > > diff --git a/net/mac802154/cfg.c b/net/mac802154/cfg.c
> > > index e8aabf215286..da94aaa32fcb 100644
> > > --- a/net/mac802154/cfg.c
> > > +++ b/net/mac802154/cfg.c
> > > @@ -46,8 +46,7 @@ static int ieee802154_suspend(struct wpan_phy *wpan_phy)
> > >         if (!local->open_count)
> > >                 goto suspend;
> > >
> > > -       atomic_inc(&wpan_phy->hold_txs);
> > > -       ieee802154_stop_queue(&local->hw);
> > > +       ieee802154_sync_and_stop_tx(local);
> > >         synchronize_net();
> > >
> > >         /* stop hardware - this must stop RX */
> > > @@ -73,7 +72,7 @@ static int ieee802154_resume(struct wpan_phy *wpan_phy)
> > >                 return ret;
> > >
> > >  wake_up:
> > > -       if (!atomic_dec_and_test(&wpan_phy->hold_txs))
> > > +       if (!atomic_read(&wpan_phy->hold_txs))
> > >                 ieee802154_wake_queue(&local->hw);
> > >         local->suspended = false;
> > >         return 0;
> > > diff --git a/net/mac802154/ieee802154_i.h b/net/mac802154/ieee802154_i.h
> > > index 56fcd7ef5b6f..295c9ce091e1 100644
> > > --- a/net/mac802154/ieee802154_i.h
> > > +++ b/net/mac802154/ieee802154_i.h
> > > @@ -122,6 +122,7 @@ extern struct ieee802154_mlme_ops mac802154_mlme_wpan;
> > >
> > >  void ieee802154_rx(struct ieee802154_local *local, struct sk_buff *skb);
> > >  void ieee802154_xmit_sync_worker(struct work_struct *work);
> > > +void ieee802154_sync_and_stop_tx(struct ieee802154_local *local);
> > >  netdev_tx_t
> > >  ieee802154_monitor_start_xmit(struct sk_buff *skb, struct net_device *dev);
> > >  netdev_tx_t
> > > diff --git a/net/mac802154/tx.c b/net/mac802154/tx.c
> > > index abd9a057521e..06ae2e6cea43 100644
> > > --- a/net/mac802154/tx.c
> > > +++ b/net/mac802154/tx.c
> > > @@ -47,7 +47,8 @@ void ieee802154_xmit_sync_worker(struct work_struct *work)
> > >                 ieee802154_wake_queue(&local->hw);
> > >
> > >         kfree_skb(skb);
> > > -       atomic_dec(&local->phy->ongoing_txs);
> > > +       if (!atomic_dec_and_test(&local->phy->ongoing_txs))
> > > +               wake_up(&local->phy->sync_txq);
> > >         netdev_dbg(dev, "transmission failed\n");
> > >  }
> > >
> > > @@ -117,6 +118,14 @@ ieee802154_hot_tx(struct ieee802154_local *local, struct sk_buff *skb)
> > >         return ieee802154_tx(local, skb);
> > >  }
> > >
> > > +void ieee802154_sync_and_stop_tx(struct ieee802154_local *local)
> > > +{
> > > +       atomic_inc(&local->phy->hold_txs);
> > > +       ieee802154_stop_queue(&local->hw);
> > > +       wait_event(local->phy->sync_txq, !atomic_read(&local->phy->ongoing_txs));
> > > +       atomic_dec(&local->phy->hold_txs);    
> > 
> > In my opinion this _still_ races as I mentioned earlier. You need to
> > be sure that if you do ieee802154_stop_queue() that no ieee802154_tx()
> > or hot_tx() is running at this time. Look into the function I
> > mentioned earlier "?netif_tx_disable()?".  
> 
> I think now I get the problem, but I am having troubles understanding
> the logic in netif_tx_disable(), or should I say, the idea that I
> should adapt to our situation.
> 
> I understand that we should make sure the following situation does not
> happen:
> - ieee802154_subif_start_xmit() is called
> - ieee802154_subif_start_xmit() is called again
> - ieee802154_tx() get's executed once and stops the queue
> - ongoing_txs gets incremented once
> - the first transfer finishes and ongoing_txs gets decremented
> - the tx queue is supposedly empty by the current series while
>   the second transfer requested earlier has not yet been processed and
>   will definitely be tried in a short moment.
> 
> I don't find a pretty solution for that. Is your suggestion to use the
> netdev tx_global_lock? If yes, then, how? Because it does not appear
> clear to me how we should tackle this issue.

I had a second look at it and it appears to me that the issue was
already there and is structural. We just did not really care about it
because we didn't bother with synchronization issues.

Here is a figure to base our discussions on:

                       enable
                         ┌────────────────────────────────────────────────────────────┐
                         │                                                            │
                         ▼                                                            │
          packet     ┌────────┐   ┌────────────┐   ┌────────────┐   ┌───────┐   ┌─────┴─────┐
            ┌┐       │        │   │            │   │            │   │       │   │           │
User  ──────┴┴──────►│ Queue  ├──►│ ieee*_tx() ├──►│stop_queue()├──►│xmit() ├──►│ wait/sync │
                     │        │   │            │   │            │   │       │   │           │
                     └────────┘   └────────────┘   └─────┬──────┘   └───────┘   └───────────┘
                         ▲                               │
                         │                               │
                         │                               │
                         └───────────────────────────────┘
                      disable

I assumed that we don't have the hand on the queuing mechanism (on the
left of the 'queue' box). I looked at the core code under
net/ieee802154/ and even if incrementing a counter there would be
handy, I assumed this was not an acceptable solution.

So then we end up with the possible situation where there are two (or
more) packets that must be processed by the mac tx logic (at the right
of the 'queue' box). The problem is of course the atomicity of the
stop_queue() compared to the number of times the ieee802154_tx()
function call can be made. We can have several packets being processed,
we don't have any way to know that.

Moving the stop_queue earlier would just reduce the racy area, without
fully preventing it, so not a solution per-se.

Perhaps we could monitor the state of the queue, it would help us know
if we need to retain a packet, but I personally find this a bit crappy,
yet probably working. Here is a drafted implementation, I'm only half
convinced this is a good idea and your input is welcome here:

--- a/net/mac802154/tx.c
+++ b/net/mac802154/tx.c
@@ -77,14 +77,26 @@ ieee802154_tx(struct ieee802154_local *local, struct sk_buff *skb)
                put_unaligned_le16(crc, skb_put(skb, 2));
        }
 
+retry:
+       while (netif_queue_stopped())
+               schedule();
+
+       acquire_lock();
+       if (netif_queue_stopped()) {
+               release_lock();
+               goto retry;
+       }
+
        /* Stop the netif queue on each sub_if_data object. */
        ieee802154_stop_queue(&local->hw);
 
+       atomic_inc(&local->phy->ongoing_txs);
+       release_lock();
+
        /* Drivers should preferably implement the async callback. In some rare
         * cases they only provide a sync callback which we will use as a
         * fallback.
         */
        if (local->ops->xmit_async) {
                unsigned int len = skb->len;
 
@@ -122,8 +134,10 @@ int ieee802154_sync_and_stop_tx(struct ieee802154_local *local)
 {
        int ret;
 
+       acquire_lock();
        atomic_inc(&local->phy->hold_txs);
        ieee802154_stop_queue(&local->hw);
+       release_lock();
        wait_event(local->phy->sync_txq, !atomic_read(&local->phy->ongoing_txs));
        ret = local->tx_result;
        atomic_dec(&local->phy->hold_txs);


If we go in this direction, perhaps it's best to rework the sync API
like you already proposed: just stopping the queue and syncing the
ongoing transfers, so that after that we can use a dedicated tx path
for MLME commands, bypassing the queue-is-stopped check. This way we
avoid the risk of delivering data packets between two MLME calls.

Thanks,
Miquèl
Alexander Aring March 13, 2022, 8:43 p.m. UTC | #4
Hi,

On Fri, Mar 4, 2022 at 5:54 AM Miquel Raynal <miquel.raynal@bootlin.com> wrote:

> I had a second look at it and it appears to me that the issue was
> already there and is structural. We just did not really cared about it
> because we didn't bother with synchronization issues.
>

I am not sure if I understand correctly. We stop the queue at some
specific moment and we need to make sure that xmit_do() is not called
or can't be called anymore.

I was thinking about:

void ieee802154_disable_queue(struct ieee802154_hw *hw)
{
        struct ieee802154_local *local = hw_to_local(hw);
        struct ieee802154_sub_if_data *sdata;

        rcu_read_lock();
        list_for_each_entry_rcu(sdata, &local->interfaces, list) {
                if (!sdata->dev)
                        continue;

               netif_tx_disable(sdata->dev);
        }
        rcu_read_unlock();
}
EXPORT_SYMBOL(ieee802154_stop_queue);

From my quick view, "netif_tx_disable()" ensures — by holding
locks and other things while doing netif_tx_stop_queue() — that we can be
sure there will be no xmit_do() going on while it's called and
afterwards. It can be that there are still transmissions on the
transceiver that are on the way, but then your atomic counter and
wait_event() will come in place.
We need to be sure there will be nothing queued anymore for
transmission, which (in my opinion) tx_disable() does, from any context.

We might need to review some netif callbacks... I have in my mind for
example stop(), maybe netif_tx_stop_queue() is enough (because the
context is like netif_tx_disable(), helding similar locks, etc.) but
we might want to be sure that nothing is going on anymore by using
your wait_event() with counter.

Is there any problem which I don't see?

- Alex
Miquel Raynal March 18, 2022, 6:11 p.m. UTC | #5
Hi Alexander,

alex.aring@gmail.com wrote on Sun, 13 Mar 2022 16:43:52 -0400:

> Hi,
> 
> On Fri, Mar 4, 2022 at 5:54 AM Miquel Raynal <miquel.raynal@bootlin.com> wrote:
> 
> > I had a second look at it and it appears to me that the issue was
> > already there and is structural. We just did not really cared about it
> > because we didn't bother with synchronization issues.
> >  
> 
> I am not sure if I understand correctly. We stop the queue at some
> specific moment and we need to make sure that xmit_do() is not called
> or can't be called anymore.
> 
> I was thinking about:
> 
> void ieee802154_disable_queue(struct ieee802154_hw *hw)
> {
>         struct ieee802154_local *local = hw_to_local(hw);
>         struct ieee802154_sub_if_data *sdata;
> 
>         rcu_read_lock();
>         list_for_each_entry_rcu(sdata, &local->interfaces, list) {
>                 if (!sdata->dev)
>                         continue;
> 
>                netif_tx_disable(sdata->dev);
>         }
>         rcu_read_unlock();
> }
> EXPORT_SYMBOL(ieee802154_stop_queue);
> 
> From my quick view is that "netif_tx_disable()" ensures by holding
> locks and other things and doing netif_tx_stop_queue() it we can be
> sure there will be no xmit_do() going on while it's called and
> afterwards. It can be that there are still transmissions on the
> transceiver that are on the way, but then your atomic counter and
> wait_event() will come in place.

I went for a deeper investigation to understand how the net core
was calling our callbacks. And it appeared to go through
dev_hard_start_xmit() and come from __dev_queue_xmit(). This means
the ieee802154 callback could only be called once at a time
because it is protected by the network device transmit lock
(netif_tx_lock()). Which makes the logic safe and not racy as I
initially thought. This was the missing piece in my mental model I
believe.

> We need to be sure there will be nothing queued anymore for
> transmission what (in my opinion) tx_disable() does. from any context.
>
> We might need to review some netif callbacks... I have in my mind for
> example stop(), maybe netif_tx_stop_queue() is enough (because the
> context is like netif_tx_disable(), helding similar locks, etc.) but
> we might want to be sure that nothing is going on anymore by using
> your wait_event() with counter.

I don't see a real reason anymore to use the tx_disable() call. Is
there any reason this could be needed that I don't have in mind? Right
now the only thing that I see is that it could delay a little bit the
moment where we actually stop the queue because we would be waiting for
the lock to be released after the skb has been offloaded to hardware.
Perhaps maybe we would let another frame to be transmitted before we
actually get the lock.

> Is there any problem which I don't see?

One question however, as I understand, if userspace tries to send more
packets, I believe the "if (!stopped)" condition will be false and the
xmit call will simply be skipped, ending with a -ENETDOWN error [1]. Is
it what we want? I initially thought we could actually queue packets and
wait for the queue to be re-enabled again, but it does not look easy.

[1] https://elixir.bootlin.com/linux/latest/source/net/core/dev.c#L4249

Thanks,
Miquèl
Alexander Aring March 27, 2022, 4:45 p.m. UTC | #6
Hi,

On Fri, Mar 18, 2022 at 2:11 PM Miquel Raynal <miquel.raynal@bootlin.com> wrote:
>
> Hi Alexander,
>
> alex.aring@gmail.com wrote on Sun, 13 Mar 2022 16:43:52 -0400:
>
> > Hi,
> >
> > On Fri, Mar 4, 2022 at 5:54 AM Miquel Raynal <miquel.raynal@bootlin.com> wrote:
> >
> > > I had a second look at it and it appears to me that the issue was
> > > already there and is structural. We just did not really cared about it
> > > because we didn't bother with synchronization issues.
> > >
> >
> > I am not sure if I understand correctly. We stop the queue at some
> > specific moment and we need to make sure that xmit_do() is not called
> > or can't be called anymore.
> >
> > I was thinking about:
> >
> > void ieee802154_disable_queue(struct ieee802154_hw *hw)
> > {
> >         struct ieee802154_local *local = hw_to_local(hw);
> >         struct ieee802154_sub_if_data *sdata;
> >
> >         rcu_read_lock();
> >         list_for_each_entry_rcu(sdata, &local->interfaces, list) {
> >                 if (!sdata->dev)
> >                         continue;
> >
> >                netif_tx_disable(sdata->dev);
> >         }
> >         rcu_read_unlock();
> > }
> > EXPORT_SYMBOL(ieee802154_stop_queue);
> >
> > From my quick view is that "netif_tx_disable()" ensures by holding
> > locks and other things and doing netif_tx_stop_queue() it we can be
> > sure there will be no xmit_do() going on while it's called and
> > afterwards. It can be that there are still transmissions on the
> > transceiver that are on the way, but then your atomic counter and
> > wait_event() will come in place.
>
> I went for a deeper investigation to understand how the net core
> was calling our callbacks. And it appeared to go through
> dev_hard_start_xmit() and come from __dev_queue_xmit(). This means
> the ieee802154 callback could only be called once at a time
> because it is protected by the network device transmit lock
> (netif_tx_lock()). Which makes the logic safe and not racy as I
> initially thought. This was the missing peace in my mental model I
> believe.
>

You forget here that you want to stop all transmission and to wait
that all transmissions are done from a sleepable context.

> > We need to be sure there will be nothing queued anymore for
> > transmission what (in my opinion) tx_disable() does. from any context.
> >
> > We might need to review some netif callbacks... I have in my mind for
> > example stop(), maybe netif_tx_stop_queue() is enough (because the
> > context is like netif_tx_disable(), helding similar locks, etc.) but
> > we might want to be sure that nothing is going on anymore by using
> > your wait_event() with counter.
>
> I don't see a real reason anymore to use the tx_disable() call. Is
> there any reason this could be needed that I don't have in mind? Right
> now the only thing that I see is that it could delay a little bit the
> moment where we actually stop the queue because we would be waiting for
> the lock to be released after the skb has been offloaded to hardware.
> Perhaps maybe we would let another frame to be transmitted before we
> actually get the lock.
>
> > Is there any problem which I don't see?
>
> One question however, as I understand, if userspace tries to send more
> packets, I believe the "if (!stopped)" condition will be false and the
> xmit call will simply be skipped, ending with a -ENETDOWN error [1]. Is
> it what we want? I initially thought we could actually queue patches and
> wait for the queue to be re-enabled again, but it does not look easy.
>

The problem is here that netif_tx_stop_queue() will only set some
flags and this will be checked there. [0]
Now you want to do from a sleepable context:

1. stop queue (net core functionality check [0])
2. wait until all ongoing transmissions are done (softmac layer atomic
counter handled in xmit_do())

Example situation for the race:

cpu0:
 - _already_ checked if the queue is stopped [0]; it was not the case,
BUT not incremented the atomic counter yet to signal that a
transmission is going on  (which is done later in xmit_do)

While cpu0 is in the above mentioned state cpu1 is in the following state:

- mlme message will transmitted
- stop the queue by setting flag [0] (note the check in cpu0 already
happened and a transmission is on the way)
- make a wait_event($ONGOING_TRANSMISSION_COUNTER) which will not wait
- (it's zero, and it does not wait, because cpu0 didn't increment
the ongoing transmission counter yet)

---

This will end in that both cpu0 and cpu1 start transmissions... but
this is only "we completed the spi transfer to the transceiver" the
framebuffer is written and transmission is started. That the
transceiver actually transmits the frame is completely handled on the
transceiver side, on the Linux side we only need to care about that we
don't overwrite the framebuffer while a transmission is going on. This
can happen here, e.g. cpu0 writes the framebuffer first, then cpu1
will overwrite the framebuffer because we start another transmission
(writing framebuffer) while the transceiver still reads the
framebuffer for the cpu0 transmission. In short it will break.

If we want to start transmissions from any sleepable context we cannot
use "netif_tx_stop_queue()" because it does not guarantee that no
xmit_do() is still going on; "netif_tx_disable()" will do it because
it holds the xmit_lock while setting the flag, so we don't run
into the above problem.

Is this more clear? I think it was never clear what I really meant by
this race, I hope the above example helped. Also "netif_tx_disable()"
was my first hit to find what we need, but maybe
there exist better options?
To be sure, I mean we need "netif_tx_disable()" only for any sleepable
context e.g. we trigger mlme transmission and take control of all
transmissions and be sure nothing is going on anymore, then we need to
have still the wait_event(). After this is done we can be sure no
transmission is going on and we can take over until we are done (mlme
sync tx handling) and can call wake_queue() again.

- Alex

[0] https://elixir.bootlin.com/linux/v5.17/source/net/core/dev.c#L4114
Miquel Raynal March 29, 2022, 4:29 p.m. UTC | #7
Hi Alexander,

alex.aring@gmail.com wrote on Sun, 27 Mar 2022 12:45:20 -0400:

> Hi,
> 
> On Fri, Mar 18, 2022 at 2:11 PM Miquel Raynal <miquel.raynal@bootlin.com> wrote:
> >
> > Hi Alexander,
> >
> > alex.aring@gmail.com wrote on Sun, 13 Mar 2022 16:43:52 -0400:
> >  
> > > Hi,
> > >
> > > On Fri, Mar 4, 2022 at 5:54 AM Miquel Raynal <miquel.raynal@bootlin.com> wrote:
> > >  
> > > > I had a second look at it and it appears to me that the issue was
> > > > already there and is structural. We just did not really cared about it
> > > > because we didn't bother with synchronization issues.
> > > >  
> > >
> > > I am not sure if I understand correctly. We stop the queue at some
> > > specific moment and we need to make sure that xmit_do() is not called
> > > or can't be called anymore.
> > >
> > > I was thinking about:
> > >
> > > void ieee802154_disable_queue(struct ieee802154_hw *hw)
> > > {
> > >         struct ieee802154_local *local = hw_to_local(hw);
> > >         struct ieee802154_sub_if_data *sdata;
> > >
> > >         rcu_read_lock();
> > >         list_for_each_entry_rcu(sdata, &local->interfaces, list) {
> > >                 if (!sdata->dev)
> > >                         continue;
> > >
> > >                netif_tx_disable(sdata->dev);
> > >         }
> > >         rcu_read_unlock();
> > > }
> > > EXPORT_SYMBOL(ieee802154_stop_queue);
> > >
> > > From my quick view is that "netif_tx_disable()" ensures by holding
> > > locks and other things and doing netif_tx_stop_queue() it we can be
> > > sure there will be no xmit_do() going on while it's called and
> > > afterwards. It can be that there are still transmissions on the
> > > transceiver that are on the way, but then your atomic counter and
> > > wait_event() will come in place.  
> >
> > I went for a deeper investigation to understand how the net core
> > was calling our callbacks. And it appeared to go through
> > dev_hard_start_xmit() and come from __dev_queue_xmit(). This means
> > the ieee802154 callback could only be called once at a time
> > because it is protected by the network device transmit lock
> > (netif_tx_lock()). Which makes the logic safe and not racy as I
initially thought. This was the missing piece in my mental model I
> > believe.
> >  
> 
> You forget here that you want to stop all transmission and to wait
> that all transmissions are done from a sleepable context.
> 
> > > We need to be sure there will be nothing queued anymore for
> > > transmission what (in my opinion) tx_disable() does. from any context.
> > >
> > > We might need to review some netif callbacks... I have in my mind for
> > > example stop(), maybe netif_tx_stop_queue() is enough (because the
> > > context is like netif_tx_disable(), helding similar locks, etc.) but
> > > we might want to be sure that nothing is going on anymore by using
> > > your wait_event() with counter.  
> >
> > I don't see a real reason anymore to use the tx_disable() call. Is
> > there any reason this could be needed that I don't have in mind? Right
> > now the only thing that I see is that it could delay a little bit the
> > moment where we actually stop the queue because we would be waiting for
> > the lock to be released after the skb has been offloaded to hardware.
> > Perhaps maybe we would let another frame to be transmitted before we
> > actually get the lock.
> >  
> > > Is there any problem which I don't see?  
> >
> > One question however, as I understand, if userspace tries to send more
> > packets, I believe the "if (!stopped)" condition will be false and the
> > xmit call will simply be skipped, ending with a -ENETDOWN error [1]. Is
> it what we want? I initially thought we could actually queue packets and
> > wait for the queue to be re-enabled again, but it does not look easy.
> >  
> 
> The problem is here that netif_tx_stop_queue() will only set some
> flags and this will be checked there. [0]
> Now you want to do from a sleepable context:
> 
> 1. stop queue (net core functionality check [0])
> 2. wait until all ongoing transmissions are done (softmac layer atomic
> counter handled in xmit_do())
> 
> Example situation for the race:
> 
> cpu0:
>  - checked _already_ the if queue is stopped [0] it was not the case
> BUT not incremented the atomic counter yet to signal that a
> transmission is going on  (which is done later in xmit_do)
> 
> While cpu0 is in the above mentioned state cpu1 is in the following state:
> 
> - an mlme message will be transmitted
> - stop the queue by setting flag [0] (note the check in cpu0 already
> happened and a transmission is on the way)
> - make a wait_event($ONGOING_TRANSMISSION_COUNTER) which will not wait
> - (the counter is zero and the wait does not block, because cpu0 hasn't
> incremented the ongoing transmission counter yet)
> 
> ---
> 
> This will end in that both cpu0 and cpu1 start transmissions... but
> this is only "we completed the spi transfer to the transceiver" the
> framebuffer is written and transmission is started. That the
> transceiver actually transmits the frame is completely handled on the
> transceiver side, on the Linux side we only need to care about that we
> don't overwrite the framebuffer while a transmission is going on. This
> can happen here, e.g. cpu0 writes the framebuffer first, then cpu1
> will overwrite the framebuffer because we start another transmission
> (writing framebuffer) while the transceiver still reads the
> framebuffer for the cpu0 transmission. In short it will break.
> 
> If we want to start transmissions from any sleepable context we cannot
> use "netif_tx_stop_queue()" because it does not guarantee that no
> xmit_do() is still going on; "netif_tx_disable()" will do it because
> it holds the xmit_lock while setting the flag, so we don't run
> into the above problem.
> 
> Is this more clear?

Crystal clear. Thanks for taking the time to explain it. I am now
convinced of the usefulness of calling netif_tx_disable() (and create
our own ieee802154 helper to call it).

> I think it was never clear what I really meant by
> this race, I hope the above example helped. Also "netif_tx_disable()"
> was my first hit to find what we need, but maybe
> there exist better options?
> To be sure, I mean we need "netif_tx_disable()" only for any sleepable
> context e.g. we trigger mlme transmission and take control of all
> transmissions and be sure nothing is going on anymore, then we need to
> have still the wait_event(). After this is done we can be sure no
> transmission is going on and we can take over until we are done (mlme
> sync tx handling) and can call wake_queue() again.
> 
> - Alex
> 
> [0] https://elixir.bootlin.com/linux/v5.17/source/net/core/dev.c#L4114


Thanks,
Miquèl
diff mbox series

Patch

diff --git a/include/net/cfg802154.h b/include/net/cfg802154.h
index 043d8e4359e7..0d385a214da3 100644
--- a/include/net/cfg802154.h
+++ b/include/net/cfg802154.h
@@ -217,6 +217,7 @@  struct wpan_phy {
 	/* Transmission monitoring and control */
 	atomic_t ongoing_txs;
 	atomic_t hold_txs;
+	wait_queue_head_t sync_txq;
 
 	char priv[] __aligned(NETDEV_ALIGN);
 };
diff --git a/net/ieee802154/core.c b/net/ieee802154/core.c
index de259b5170ab..0953cacafbff 100644
--- a/net/ieee802154/core.c
+++ b/net/ieee802154/core.c
@@ -129,6 +129,7 @@  wpan_phy_new(const struct cfg802154_ops *ops, size_t priv_size)
 	wpan_phy_net_set(&rdev->wpan_phy, &init_net);
 
 	init_waitqueue_head(&rdev->dev_wait);
+	init_waitqueue_head(&rdev->wpan_phy.sync_txq);
 
 	return &rdev->wpan_phy;
 }
diff --git a/net/mac802154/cfg.c b/net/mac802154/cfg.c
index e8aabf215286..da94aaa32fcb 100644
--- a/net/mac802154/cfg.c
+++ b/net/mac802154/cfg.c
@@ -46,8 +46,7 @@  static int ieee802154_suspend(struct wpan_phy *wpan_phy)
 	if (!local->open_count)
 		goto suspend;
 
-	atomic_inc(&wpan_phy->hold_txs);
-	ieee802154_stop_queue(&local->hw);
+	ieee802154_sync_and_stop_tx(local);
 	synchronize_net();
 
 	/* stop hardware - this must stop RX */
@@ -73,7 +72,7 @@  static int ieee802154_resume(struct wpan_phy *wpan_phy)
 		return ret;
 
 wake_up:
-	if (!atomic_dec_and_test(&wpan_phy->hold_txs))
+	if (!atomic_read(&wpan_phy->hold_txs))
 		ieee802154_wake_queue(&local->hw);
 	local->suspended = false;
 	return 0;
diff --git a/net/mac802154/ieee802154_i.h b/net/mac802154/ieee802154_i.h
index 56fcd7ef5b6f..295c9ce091e1 100644
--- a/net/mac802154/ieee802154_i.h
+++ b/net/mac802154/ieee802154_i.h
@@ -122,6 +122,7 @@  extern struct ieee802154_mlme_ops mac802154_mlme_wpan;
 
 void ieee802154_rx(struct ieee802154_local *local, struct sk_buff *skb);
 void ieee802154_xmit_sync_worker(struct work_struct *work);
+void ieee802154_sync_and_stop_tx(struct ieee802154_local *local);
 netdev_tx_t
 ieee802154_monitor_start_xmit(struct sk_buff *skb, struct net_device *dev);
 netdev_tx_t
diff --git a/net/mac802154/tx.c b/net/mac802154/tx.c
index abd9a057521e..06ae2e6cea43 100644
--- a/net/mac802154/tx.c
+++ b/net/mac802154/tx.c
@@ -47,7 +47,8 @@  void ieee802154_xmit_sync_worker(struct work_struct *work)
 		ieee802154_wake_queue(&local->hw);
 
 	kfree_skb(skb);
-	atomic_dec(&local->phy->ongoing_txs);
+	if (!atomic_dec_and_test(&local->phy->ongoing_txs))
+		wake_up(&local->phy->sync_txq);
 	netdev_dbg(dev, "transmission failed\n");
 }
 
@@ -117,6 +118,14 @@  ieee802154_hot_tx(struct ieee802154_local *local, struct sk_buff *skb)
 	return ieee802154_tx(local, skb);
 }
 
+void ieee802154_sync_and_stop_tx(struct ieee802154_local *local)
+{
+	atomic_inc(&local->phy->hold_txs);
+	ieee802154_stop_queue(&local->hw);
+	wait_event(local->phy->sync_txq, !atomic_read(&local->phy->ongoing_txs));
+	atomic_dec(&local->phy->hold_txs);
+}
+
 netdev_tx_t
 ieee802154_monitor_start_xmit(struct sk_buff *skb, struct net_device *dev)
 {
diff --git a/net/mac802154/util.c b/net/mac802154/util.c
index cc572c12a8f9..e666ac7a16bd 100644
--- a/net/mac802154/util.c
+++ b/net/mac802154/util.c
@@ -93,7 +93,8 @@  static void ieee802154_xmit_end(struct ieee802154_hw *hw, bool ifs_handling,
 	if (!mac802154_queue_is_stopped(local))
 		ieee802154_wakeup_after_xmit_done(hw, ifs_handling, skb_len);
 
-	atomic_dec(&hw->phy->ongoing_txs);
+	if (!atomic_dec_and_test(&hw->phy->ongoing_txs))
+		wake_up(&hw->phy->sync_txq);
 }
 
 void ieee802154_xmit_complete(struct ieee802154_hw *hw, struct sk_buff *skb,