diff mbox series

[v1,5/9] hw/core: stream: Add an end-of-packet flag

Message ID 20200430162439.2659-6-edgar.iglesias@gmail.com (mailing list archive)
State New, archived
Headers show
Series hw/core: stream: Add end-of-packet flag | expand

Commit Message

Edgar E. Iglesias April 30, 2020, 4:24 p.m. UTC
From: "Edgar E. Iglesias" <edgar.iglesias@xilinx.com>

Some stream clients produce a continuous, unbounded flow of data,
while others send their data in packets. Stream interfaces
usually have a way to signal the end of a packet or the
last beat of a transfer.

Add an end-of-packet flag to the push interface and update the existing
callers and push implementations to pass or accept it.

Signed-off-by: Edgar E. Iglesias <edgar.iglesias@xilinx.com>
---
 include/hw/stream.h     |  5 +++--
 hw/core/stream.c        |  4 ++--
 hw/dma/xilinx_axidma.c  | 10 ++++++----
 hw/net/xilinx_axienet.c | 14 ++++++++++----
 hw/ssi/xilinx_spips.c   |  2 +-
 5 files changed, 22 insertions(+), 13 deletions(-)

Comments

Francisco Iglesias May 5, 2020, 2:20 p.m. UTC | #1
On [2020 Apr 30] Thu 18:24:35, Edgar E. Iglesias wrote:
> From: "Edgar E. Iglesias" <edgar.iglesias@xilinx.com>
> 
> Some stream clients stream an endless stream of data while
> other clients stream data in packets. Stream interfaces
> usually have a way to signal the end of a packet or the
> last beat of a transfer.
> 
> This adds an end-of-packet flag to the push interface.
> 
> Signed-off-by: Edgar E. Iglesias <edgar.iglesias@xilinx.com>

Reviewed-by: Francisco Iglesias <frasse.iglesias@gmail.com>

> ---
>  include/hw/stream.h     |  5 +++--
>  hw/core/stream.c        |  4 ++--
>  hw/dma/xilinx_axidma.c  | 10 ++++++----
>  hw/net/xilinx_axienet.c | 14 ++++++++++----
>  hw/ssi/xilinx_spips.c   |  2 +-
>  5 files changed, 22 insertions(+), 13 deletions(-)
> 
> diff --git a/include/hw/stream.h b/include/hw/stream.h
> index d02f62ca89..ed09e83683 100644
> --- a/include/hw/stream.h
> +++ b/include/hw/stream.h
> @@ -39,12 +39,13 @@ typedef struct StreamSlaveClass {
>       * @obj: Stream slave to push to
>       * @buf: Data to write
>       * @len: Maximum number of bytes to write
> +     * @eop: End of packet flag
>       */
> -    size_t (*push)(StreamSlave *obj, unsigned char *buf, size_t len);
> +    size_t (*push)(StreamSlave *obj, unsigned char *buf, size_t len, bool eop);
>  } StreamSlaveClass;
>  
>  size_t
> -stream_push(StreamSlave *sink, uint8_t *buf, size_t len);
> +stream_push(StreamSlave *sink, uint8_t *buf, size_t len, bool eop);
>  
>  bool
>  stream_can_push(StreamSlave *sink, StreamCanPushNotifyFn notify,
> diff --git a/hw/core/stream.c b/hw/core/stream.c
> index 39b1e595cd..a65ad1208d 100644
> --- a/hw/core/stream.c
> +++ b/hw/core/stream.c
> @@ -3,11 +3,11 @@
>  #include "qemu/module.h"
>  
>  size_t
> -stream_push(StreamSlave *sink, uint8_t *buf, size_t len)
> +stream_push(StreamSlave *sink, uint8_t *buf, size_t len, bool eop)
>  {
>      StreamSlaveClass *k =  STREAM_SLAVE_GET_CLASS(sink);
>  
> -    return k->push(sink, buf, len);
> +    return k->push(sink, buf, len, eop);
>  }
>  
>  bool
> diff --git a/hw/dma/xilinx_axidma.c b/hw/dma/xilinx_axidma.c
> index 4540051448..a770e12c96 100644
> --- a/hw/dma/xilinx_axidma.c
> +++ b/hw/dma/xilinx_axidma.c
> @@ -283,7 +283,7 @@ static void stream_process_mem2s(struct Stream *s, StreamSlave *tx_data_dev,
>  
>          if (stream_desc_sof(&s->desc)) {
>              s->pos = 0;
> -            stream_push(tx_control_dev, s->desc.app, sizeof(s->desc.app));
> +            stream_push(tx_control_dev, s->desc.app, sizeof(s->desc.app), true);
>          }
>  
>          txlen = s->desc.control & SDESC_CTRL_LEN_MASK;
> @@ -298,7 +298,7 @@ static void stream_process_mem2s(struct Stream *s, StreamSlave *tx_data_dev,
>          s->pos += txlen;
>  
>          if (stream_desc_eof(&s->desc)) {
> -            stream_push(tx_data_dev, s->txbuf, s->pos);
> +            stream_push(tx_data_dev, s->txbuf, s->pos, true);
>              s->pos = 0;
>              stream_complete(s);
>          }
> @@ -384,7 +384,7 @@ static void xilinx_axidma_reset(DeviceState *dev)
>  
>  static size_t
>  xilinx_axidma_control_stream_push(StreamSlave *obj, unsigned char *buf,
> -                                  size_t len)
> +                                  size_t len, bool eop)
>  {
>      XilinxAXIDMAStreamSlave *cs = XILINX_AXI_DMA_CONTROL_STREAM(obj);
>      struct Stream *s = &cs->dma->streams[1];
> @@ -416,12 +416,14 @@ xilinx_axidma_data_stream_can_push(StreamSlave *obj,
>  }
>  
>  static size_t
> -xilinx_axidma_data_stream_push(StreamSlave *obj, unsigned char *buf, size_t len)
> +xilinx_axidma_data_stream_push(StreamSlave *obj, unsigned char *buf, size_t len,
> +                               bool eop)
>  {
>      XilinxAXIDMAStreamSlave *ds = XILINX_AXI_DMA_DATA_STREAM(obj);
>      struct Stream *s = &ds->dma->streams[1];
>      size_t ret;
>  
> +    assert(eop);
>      ret = stream_process_s2mem(s, buf, len);
>      stream_update_irq(s);
>      return ret;
> diff --git a/hw/net/xilinx_axienet.c b/hw/net/xilinx_axienet.c
> index c8dfcda3ee..bd48305577 100644
> --- a/hw/net/xilinx_axienet.c
> +++ b/hw/net/xilinx_axienet.c
> @@ -697,14 +697,14 @@ static void axienet_eth_rx_notify(void *opaque)
>                                             axienet_eth_rx_notify, s)) {
>          size_t ret = stream_push(s->tx_control_dev,
>                                   (void *)s->rxapp + CONTROL_PAYLOAD_SIZE
> -                                 - s->rxappsize, s->rxappsize);
> +                                 - s->rxappsize, s->rxappsize, true);
>          s->rxappsize -= ret;
>      }
>  
>      while (s->rxsize && stream_can_push(s->tx_data_dev,
>                                          axienet_eth_rx_notify, s)) {
>          size_t ret = stream_push(s->tx_data_dev, (void *)s->rxmem + s->rxpos,
> -                                 s->rxsize);
> +                                 s->rxsize, true);
>          s->rxsize -= ret;
>          s->rxpos += ret;
>          if (!s->rxsize) {
> @@ -874,12 +874,14 @@ static ssize_t eth_rx(NetClientState *nc, const uint8_t *buf, size_t size)
>  }
>  
>  static size_t
> -xilinx_axienet_control_stream_push(StreamSlave *obj, uint8_t *buf, size_t len)
> +xilinx_axienet_control_stream_push(StreamSlave *obj, uint8_t *buf, size_t len,
> +                                   bool eop)
>  {
>      int i;
>      XilinxAXIEnetStreamSlave *cs = XILINX_AXI_ENET_CONTROL_STREAM(obj);
>      XilinxAXIEnet *s = cs->enet;
>  
> +    assert(eop);
>      if (len != CONTROL_PAYLOAD_SIZE) {
>          hw_error("AXI Enet requires %d byte control stream payload\n",
>                   (int)CONTROL_PAYLOAD_SIZE);
> @@ -894,11 +896,15 @@ xilinx_axienet_control_stream_push(StreamSlave *obj, uint8_t *buf, size_t len)
>  }
>  
>  static size_t
> -xilinx_axienet_data_stream_push(StreamSlave *obj, uint8_t *buf, size_t size)
> +xilinx_axienet_data_stream_push(StreamSlave *obj, uint8_t *buf, size_t size,
> +                                bool eop)
>  {
>      XilinxAXIEnetStreamSlave *ds = XILINX_AXI_ENET_DATA_STREAM(obj);
>      XilinxAXIEnet *s = ds->enet;
>  
> +    /* We don't support fragmented packets yet.  */
> +    assert(eop);
> +
>      /* TX enable ?  */
>      if (!(s->tc & TC_TX)) {
>          return size;
> diff --git a/hw/ssi/xilinx_spips.c b/hw/ssi/xilinx_spips.c
> index c57850a505..4cfce882ab 100644
> --- a/hw/ssi/xilinx_spips.c
> +++ b/hw/ssi/xilinx_spips.c
> @@ -868,7 +868,7 @@ static void xlnx_zynqmp_qspips_notify(void *opaque)
>  
>          memcpy(rq->dma_buf, rxd, num);
>  
> -        ret = stream_push(rq->dma, rq->dma_buf, num);
> +        ret = stream_push(rq->dma, rq->dma_buf, num, false);
>          assert(ret == num);
>          xlnx_zynqmp_qspips_check_flush(rq);
>      }
> -- 
> 2.20.1
>
Alistair Francis May 5, 2020, 4:35 p.m. UTC | #2
On Thu, Apr 30, 2020 at 9:26 AM Edgar E. Iglesias
<edgar.iglesias@gmail.com> wrote:
>
> From: "Edgar E. Iglesias" <edgar.iglesias@xilinx.com>
>
> Some stream clients stream an endless stream of data while
> other clients stream data in packets. Stream interfaces
> usually have a way to signal the end of a packet or the
> last beat of a transfer.
>
> This adds an end-of-packet flag to the push interface.
>
> Signed-off-by: Edgar E. Iglesias <edgar.iglesias@xilinx.com>

Reviewed-by: Alistair Francis <alistair.francis@wdc.com>

Alistair

> ---
>  include/hw/stream.h     |  5 +++--
>  hw/core/stream.c        |  4 ++--
>  hw/dma/xilinx_axidma.c  | 10 ++++++----
>  hw/net/xilinx_axienet.c | 14 ++++++++++----
>  hw/ssi/xilinx_spips.c   |  2 +-
>  5 files changed, 22 insertions(+), 13 deletions(-)
>
> diff --git a/include/hw/stream.h b/include/hw/stream.h
> index d02f62ca89..ed09e83683 100644
> --- a/include/hw/stream.h
> +++ b/include/hw/stream.h
> @@ -39,12 +39,13 @@ typedef struct StreamSlaveClass {
>       * @obj: Stream slave to push to
>       * @buf: Data to write
>       * @len: Maximum number of bytes to write
> +     * @eop: End of packet flag
>       */
> -    size_t (*push)(StreamSlave *obj, unsigned char *buf, size_t len);
> +    size_t (*push)(StreamSlave *obj, unsigned char *buf, size_t len, bool eop);
>  } StreamSlaveClass;
>
>  size_t
> -stream_push(StreamSlave *sink, uint8_t *buf, size_t len);
> +stream_push(StreamSlave *sink, uint8_t *buf, size_t len, bool eop);
>
>  bool
>  stream_can_push(StreamSlave *sink, StreamCanPushNotifyFn notify,
> diff --git a/hw/core/stream.c b/hw/core/stream.c
> index 39b1e595cd..a65ad1208d 100644
> --- a/hw/core/stream.c
> +++ b/hw/core/stream.c
> @@ -3,11 +3,11 @@
>  #include "qemu/module.h"
>
>  size_t
> -stream_push(StreamSlave *sink, uint8_t *buf, size_t len)
> +stream_push(StreamSlave *sink, uint8_t *buf, size_t len, bool eop)
>  {
>      StreamSlaveClass *k =  STREAM_SLAVE_GET_CLASS(sink);
>
> -    return k->push(sink, buf, len);
> +    return k->push(sink, buf, len, eop);
>  }
>
>  bool
> diff --git a/hw/dma/xilinx_axidma.c b/hw/dma/xilinx_axidma.c
> index 4540051448..a770e12c96 100644
> --- a/hw/dma/xilinx_axidma.c
> +++ b/hw/dma/xilinx_axidma.c
> @@ -283,7 +283,7 @@ static void stream_process_mem2s(struct Stream *s, StreamSlave *tx_data_dev,
>
>          if (stream_desc_sof(&s->desc)) {
>              s->pos = 0;
> -            stream_push(tx_control_dev, s->desc.app, sizeof(s->desc.app));
> +            stream_push(tx_control_dev, s->desc.app, sizeof(s->desc.app), true);
>          }
>
>          txlen = s->desc.control & SDESC_CTRL_LEN_MASK;
> @@ -298,7 +298,7 @@ static void stream_process_mem2s(struct Stream *s, StreamSlave *tx_data_dev,
>          s->pos += txlen;
>
>          if (stream_desc_eof(&s->desc)) {
> -            stream_push(tx_data_dev, s->txbuf, s->pos);
> +            stream_push(tx_data_dev, s->txbuf, s->pos, true);
>              s->pos = 0;
>              stream_complete(s);
>          }
> @@ -384,7 +384,7 @@ static void xilinx_axidma_reset(DeviceState *dev)
>
>  static size_t
>  xilinx_axidma_control_stream_push(StreamSlave *obj, unsigned char *buf,
> -                                  size_t len)
> +                                  size_t len, bool eop)
>  {
>      XilinxAXIDMAStreamSlave *cs = XILINX_AXI_DMA_CONTROL_STREAM(obj);
>      struct Stream *s = &cs->dma->streams[1];
> @@ -416,12 +416,14 @@ xilinx_axidma_data_stream_can_push(StreamSlave *obj,
>  }
>
>  static size_t
> -xilinx_axidma_data_stream_push(StreamSlave *obj, unsigned char *buf, size_t len)
> +xilinx_axidma_data_stream_push(StreamSlave *obj, unsigned char *buf, size_t len,
> +                               bool eop)
>  {
>      XilinxAXIDMAStreamSlave *ds = XILINX_AXI_DMA_DATA_STREAM(obj);
>      struct Stream *s = &ds->dma->streams[1];
>      size_t ret;
>
> +    assert(eop);
>      ret = stream_process_s2mem(s, buf, len);
>      stream_update_irq(s);
>      return ret;
> diff --git a/hw/net/xilinx_axienet.c b/hw/net/xilinx_axienet.c
> index c8dfcda3ee..bd48305577 100644
> --- a/hw/net/xilinx_axienet.c
> +++ b/hw/net/xilinx_axienet.c
> @@ -697,14 +697,14 @@ static void axienet_eth_rx_notify(void *opaque)
>                                             axienet_eth_rx_notify, s)) {
>          size_t ret = stream_push(s->tx_control_dev,
>                                   (void *)s->rxapp + CONTROL_PAYLOAD_SIZE
> -                                 - s->rxappsize, s->rxappsize);
> +                                 - s->rxappsize, s->rxappsize, true);
>          s->rxappsize -= ret;
>      }
>
>      while (s->rxsize && stream_can_push(s->tx_data_dev,
>                                          axienet_eth_rx_notify, s)) {
>          size_t ret = stream_push(s->tx_data_dev, (void *)s->rxmem + s->rxpos,
> -                                 s->rxsize);
> +                                 s->rxsize, true);
>          s->rxsize -= ret;
>          s->rxpos += ret;
>          if (!s->rxsize) {
> @@ -874,12 +874,14 @@ static ssize_t eth_rx(NetClientState *nc, const uint8_t *buf, size_t size)
>  }
>
>  static size_t
> -xilinx_axienet_control_stream_push(StreamSlave *obj, uint8_t *buf, size_t len)
> +xilinx_axienet_control_stream_push(StreamSlave *obj, uint8_t *buf, size_t len,
> +                                   bool eop)
>  {
>      int i;
>      XilinxAXIEnetStreamSlave *cs = XILINX_AXI_ENET_CONTROL_STREAM(obj);
>      XilinxAXIEnet *s = cs->enet;
>
> +    assert(eop);
>      if (len != CONTROL_PAYLOAD_SIZE) {
>          hw_error("AXI Enet requires %d byte control stream payload\n",
>                   (int)CONTROL_PAYLOAD_SIZE);
> @@ -894,11 +896,15 @@ xilinx_axienet_control_stream_push(StreamSlave *obj, uint8_t *buf, size_t len)
>  }
>
>  static size_t
> -xilinx_axienet_data_stream_push(StreamSlave *obj, uint8_t *buf, size_t size)
> +xilinx_axienet_data_stream_push(StreamSlave *obj, uint8_t *buf, size_t size,
> +                                bool eop)
>  {
>      XilinxAXIEnetStreamSlave *ds = XILINX_AXI_ENET_DATA_STREAM(obj);
>      XilinxAXIEnet *s = ds->enet;
>
> +    /* We don't support fragmented packets yet.  */
> +    assert(eop);
> +
>      /* TX enable ?  */
>      if (!(s->tc & TC_TX)) {
>          return size;
> diff --git a/hw/ssi/xilinx_spips.c b/hw/ssi/xilinx_spips.c
> index c57850a505..4cfce882ab 100644
> --- a/hw/ssi/xilinx_spips.c
> +++ b/hw/ssi/xilinx_spips.c
> @@ -868,7 +868,7 @@ static void xlnx_zynqmp_qspips_notify(void *opaque)
>
>          memcpy(rq->dma_buf, rxd, num);
>
> -        ret = stream_push(rq->dma, rq->dma_buf, num);
> +        ret = stream_push(rq->dma, rq->dma_buf, num, false);
>          assert(ret == num);
>          xlnx_zynqmp_qspips_check_flush(rq);
>      }
> --
> 2.20.1
>
>
diff mbox series

Patch

diff --git a/include/hw/stream.h b/include/hw/stream.h
index d02f62ca89..ed09e83683 100644
--- a/include/hw/stream.h
+++ b/include/hw/stream.h
@@ -39,12 +39,13 @@  typedef struct StreamSlaveClass {
      * @obj: Stream slave to push to
      * @buf: Data to write
      * @len: Maximum number of bytes to write
+     * @eop: End of packet flag
      */
-    size_t (*push)(StreamSlave *obj, unsigned char *buf, size_t len);
+    size_t (*push)(StreamSlave *obj, unsigned char *buf, size_t len, bool eop);
 } StreamSlaveClass;
 
 size_t
-stream_push(StreamSlave *sink, uint8_t *buf, size_t len);
+stream_push(StreamSlave *sink, uint8_t *buf, size_t len, bool eop);
 
 bool
 stream_can_push(StreamSlave *sink, StreamCanPushNotifyFn notify,
diff --git a/hw/core/stream.c b/hw/core/stream.c
index 39b1e595cd..a65ad1208d 100644
--- a/hw/core/stream.c
+++ b/hw/core/stream.c
@@ -3,11 +3,11 @@ 
 #include "qemu/module.h"
 
 size_t
-stream_push(StreamSlave *sink, uint8_t *buf, size_t len)
+stream_push(StreamSlave *sink, uint8_t *buf, size_t len, bool eop)
 {
     StreamSlaveClass *k =  STREAM_SLAVE_GET_CLASS(sink);
 
-    return k->push(sink, buf, len);
+    return k->push(sink, buf, len, eop);
 }
 
 bool
diff --git a/hw/dma/xilinx_axidma.c b/hw/dma/xilinx_axidma.c
index 4540051448..a770e12c96 100644
--- a/hw/dma/xilinx_axidma.c
+++ b/hw/dma/xilinx_axidma.c
@@ -283,7 +283,7 @@  static void stream_process_mem2s(struct Stream *s, StreamSlave *tx_data_dev,
 
         if (stream_desc_sof(&s->desc)) {
             s->pos = 0;
-            stream_push(tx_control_dev, s->desc.app, sizeof(s->desc.app));
+            stream_push(tx_control_dev, s->desc.app, sizeof(s->desc.app), true);
         }
 
         txlen = s->desc.control & SDESC_CTRL_LEN_MASK;
@@ -298,7 +298,7 @@  static void stream_process_mem2s(struct Stream *s, StreamSlave *tx_data_dev,
         s->pos += txlen;
 
         if (stream_desc_eof(&s->desc)) {
-            stream_push(tx_data_dev, s->txbuf, s->pos);
+            stream_push(tx_data_dev, s->txbuf, s->pos, true);
             s->pos = 0;
             stream_complete(s);
         }
@@ -384,7 +384,7 @@  static void xilinx_axidma_reset(DeviceState *dev)
 
 static size_t
 xilinx_axidma_control_stream_push(StreamSlave *obj, unsigned char *buf,
-                                  size_t len)
+                                  size_t len, bool eop)
 {
     XilinxAXIDMAStreamSlave *cs = XILINX_AXI_DMA_CONTROL_STREAM(obj);
     struct Stream *s = &cs->dma->streams[1];
@@ -416,12 +416,14 @@  xilinx_axidma_data_stream_can_push(StreamSlave *obj,
 }
 
 static size_t
-xilinx_axidma_data_stream_push(StreamSlave *obj, unsigned char *buf, size_t len)
+xilinx_axidma_data_stream_push(StreamSlave *obj, unsigned char *buf, size_t len,
+                               bool eop)
 {
     XilinxAXIDMAStreamSlave *ds = XILINX_AXI_DMA_DATA_STREAM(obj);
     struct Stream *s = &ds->dma->streams[1];
     size_t ret;
 
+    assert(eop);
     ret = stream_process_s2mem(s, buf, len);
     stream_update_irq(s);
     return ret;
diff --git a/hw/net/xilinx_axienet.c b/hw/net/xilinx_axienet.c
index c8dfcda3ee..bd48305577 100644
--- a/hw/net/xilinx_axienet.c
+++ b/hw/net/xilinx_axienet.c
@@ -697,14 +697,14 @@  static void axienet_eth_rx_notify(void *opaque)
                                            axienet_eth_rx_notify, s)) {
         size_t ret = stream_push(s->tx_control_dev,
                                  (void *)s->rxapp + CONTROL_PAYLOAD_SIZE
-                                 - s->rxappsize, s->rxappsize);
+                                 - s->rxappsize, s->rxappsize, true);
         s->rxappsize -= ret;
     }
 
     while (s->rxsize && stream_can_push(s->tx_data_dev,
                                         axienet_eth_rx_notify, s)) {
         size_t ret = stream_push(s->tx_data_dev, (void *)s->rxmem + s->rxpos,
-                                 s->rxsize);
+                                 s->rxsize, true);
         s->rxsize -= ret;
         s->rxpos += ret;
         if (!s->rxsize) {
@@ -874,12 +874,14 @@  static ssize_t eth_rx(NetClientState *nc, const uint8_t *buf, size_t size)
 }
 
 static size_t
-xilinx_axienet_control_stream_push(StreamSlave *obj, uint8_t *buf, size_t len)
+xilinx_axienet_control_stream_push(StreamSlave *obj, uint8_t *buf, size_t len,
+                                   bool eop)
 {
     int i;
     XilinxAXIEnetStreamSlave *cs = XILINX_AXI_ENET_CONTROL_STREAM(obj);
     XilinxAXIEnet *s = cs->enet;
 
+    assert(eop);
     if (len != CONTROL_PAYLOAD_SIZE) {
         hw_error("AXI Enet requires %d byte control stream payload\n",
                  (int)CONTROL_PAYLOAD_SIZE);
@@ -894,11 +896,15 @@  xilinx_axienet_control_stream_push(StreamSlave *obj, uint8_t *buf, size_t len)
 }
 
 static size_t
-xilinx_axienet_data_stream_push(StreamSlave *obj, uint8_t *buf, size_t size)
+xilinx_axienet_data_stream_push(StreamSlave *obj, uint8_t *buf, size_t size,
+                                bool eop)
 {
     XilinxAXIEnetStreamSlave *ds = XILINX_AXI_ENET_DATA_STREAM(obj);
     XilinxAXIEnet *s = ds->enet;
 
+    /* We don't support fragmented packets yet.  */
+    assert(eop);
+
     /* TX enable ?  */
     if (!(s->tc & TC_TX)) {
         return size;
diff --git a/hw/ssi/xilinx_spips.c b/hw/ssi/xilinx_spips.c
index c57850a505..4cfce882ab 100644
--- a/hw/ssi/xilinx_spips.c
+++ b/hw/ssi/xilinx_spips.c
@@ -868,7 +868,7 @@  static void xlnx_zynqmp_qspips_notify(void *opaque)
 
         memcpy(rq->dma_buf, rxd, num);
 
-        ret = stream_push(rq->dma, rq->dma_buf, num);
+        ret = stream_push(rq->dma, rq->dma_buf, num, false);
         assert(ret == num);
         xlnx_zynqmp_qspips_check_flush(rq);
     }