diff mbox series

[1/4] qat: fix misunderstood -EBUSY return code

Message ID 20200601160418.171851200@debian-a64.vm (mailing list archive)
State Changes Requested
Delegated to: Herbert Xu
Headers show
Series [1/4] qat: fix misunderstood -EBUSY return code | expand

Commit Message

Mikulas Patocka June 1, 2020, 4:03 p.m. UTC
Using dm-crypt with QAT result in deadlocks.

If a crypto routine returns -EBUSY, it is expected that the crypto driver
have already queued the request and the crypto API user should assume that
this request was processed, but it should stop sending more requests. When
an -EBUSY request is processed, the crypto driver calls the callback with
the error code -EINPROGRESS - this means that the request is still being
processed (i.e. the user should wait for another callback), but the user
can start sending more requests now.

The QAT driver misunderstood this logic, it return -EBUSY when the queue
was full and didn't queue the request - the request was lost and it
resulted in a deadlock.

This patch fixes busy state handling - if the queue is at least 15/16
full, we return -EBUSY to signal to the user that no more requests should
be sent. We remember that we returned -EBUSY (the variable backed_off) and
if we finish the request, we return -EINPROGRESS to indicate that the user
can send more requests.

Signed-off-by: Mikulas Patocka <mpatocka@redhat.com>
Cc: stable@vger.kernel.org

Comments

Cabiddu, Giovanni June 2, 2020, 10:05 p.m. UTC | #1
Hi Mikulas,

thanks for your patch. See below.

On Mon, Jun 01, 2020 at 06:03:33PM +0200, Mikulas Patocka wrote:
> Using dm-crypt with QAT result in deadlocks.
> 
> If a crypto routine returns -EBUSY, it is expected that the crypto driver
> have already queued the request and the crypto API user should assume that
> this request was processed, but it should stop sending more requests. When
> an -EBUSY request is processed, the crypto driver calls the callback with
> the error code -EINPROGRESS - this means that the request is still being
> processed (i.e. the user should wait for another callback), but the user
> can start sending more requests now.
> 
> The QAT driver misunderstood this logic, it return -EBUSY when the queue
> was full and didn't queue the request - the request was lost and it
> resulted in a deadlock.
> 
> This patch fixes busy state handling - if the queue is at least 15/16
> full, we return -EBUSY to signal to the user that no more requests should
> be sent. We remember that we returned -EBUSY (the variable backed_off) and
> if we finish the request, we return -EINPROGRESS to indicate that the user
> can send more requests.
> 
> Signed-off-by: Mikulas Patocka <mpatocka@redhat.com>
> Cc: stable@vger.kernel.org
> 
> Index: linux-2.6/drivers/crypto/qat/qat_common/qat_algs.c
> ===================================================================
> --- linux-2.6.orig/drivers/crypto/qat/qat_common/qat_algs.c
> +++ linux-2.6/drivers/crypto/qat/qat_common/qat_algs.c
> @@ -826,6 +826,9 @@ static void qat_aead_alg_callback(struct
>  	qat_alg_free_bufl(inst, qat_req);
>  	if (unlikely(qat_res != ICP_QAT_FW_COMN_STATUS_FLAG_OK))
>  		res = -EBADMSG;
> +
> +	if (qat_req->backed_off)
> +		areq->base.complete(&areq->base, -EINPROGRESS);
>  	areq->base.complete(&areq->base, res);
>  }
>  
> @@ -847,6 +850,8 @@ static void qat_skcipher_alg_callback(st
>  	dma_free_coherent(dev, AES_BLOCK_SIZE, qat_req->iv,
>  			  qat_req->iv_paddr);
>  
> +	if (qat_req->backed_off)
> +		sreq->base.complete(&sreq->base, -EINPROGRESS);
>  	sreq->base.complete(&sreq->base, res);
>  }
>  
> @@ -869,7 +874,7 @@ static int qat_alg_aead_dec(struct aead_
>  	struct icp_qat_fw_la_auth_req_params *auth_param;
>  	struct icp_qat_fw_la_bulk_req *msg;
>  	int digst_size = crypto_aead_authsize(aead_tfm);
> -	int ret, ctr = 0;
> +	int ret, backed_off;
>  
>  	ret = qat_alg_sgl_to_bufl(ctx->inst, areq->src, areq->dst, qat_req);
>  	if (unlikely(ret))
> @@ -890,15 +895,16 @@ static int qat_alg_aead_dec(struct aead_
>  	auth_param = (void *)((uint8_t *)cipher_param + sizeof(*cipher_param));
>  	auth_param->auth_off = 0;
>  	auth_param->auth_len = areq->assoclen + cipher_param->cipher_length;
> -	do {
> -		ret = adf_send_message(ctx->inst->sym_tx, (uint32_t *)msg);
> -	} while (ret == -EAGAIN && ctr++ < 10);
>  
> +	qat_req->backed_off = backed_off = adf_should_back_off(ctx->inst->sym_tx);
> +again:
> +	ret = adf_send_message(ctx->inst->sym_tx, (uint32_t *)msg);
>  	if (ret == -EAGAIN) {
> -		qat_alg_free_bufl(ctx->inst, qat_req);
> -		return -EBUSY;
> +		qat_req->backed_off = backed_off = 1;
> +		cpu_relax();
> +		goto again;
>  	}
I am a bit concerned about this potential infinite loop.
If an error occurred on the device and the queue is full, we will be
stuck here forever.
Should we just retry a number of times and then fail?
Or, should we just move to the crypto-engine?

> -	return -EINPROGRESS;
> +	return backed_off ? -EBUSY : -EINPROGRESS;
>  }
>  
>  static int qat_alg_aead_enc(struct aead_request *areq)
> @@ -911,7 +917,7 @@ static int qat_alg_aead_enc(struct aead_
>  	struct icp_qat_fw_la_auth_req_params *auth_param;
>  	struct icp_qat_fw_la_bulk_req *msg;
>  	uint8_t *iv = areq->iv;
> -	int ret, ctr = 0;
> +	int ret, backed_off;
>  
>  	ret = qat_alg_sgl_to_bufl(ctx->inst, areq->src, areq->dst, qat_req);
>  	if (unlikely(ret))
> @@ -935,15 +941,15 @@ static int qat_alg_aead_enc(struct aead_
>  	auth_param->auth_off = 0;
>  	auth_param->auth_len = areq->assoclen + areq->cryptlen;
>  
> -	do {
> -		ret = adf_send_message(ctx->inst->sym_tx, (uint32_t *)msg);
> -	} while (ret == -EAGAIN && ctr++ < 10);
> -
> +	qat_req->backed_off = backed_off = adf_should_back_off(ctx->inst->sym_tx);
checkpatch: line over 80 characters - same in every place
adf_should_back_off is used.

> +again:
> +	ret = adf_send_message(ctx->inst->sym_tx, (uint32_t *)msg);
>  	if (ret == -EAGAIN) {
> -		qat_alg_free_bufl(ctx->inst, qat_req);
> -		return -EBUSY;
> +		qat_req->backed_off = backed_off = 1;
> +		cpu_relax();
> +		goto again;
>  	}
> -	return -EINPROGRESS;
> +	return backed_off ? -EBUSY : -EINPROGRESS;
>  }
>  
>  static int qat_alg_skcipher_rekey(struct qat_alg_skcipher_ctx *ctx,
> @@ -1051,7 +1057,7 @@ static int qat_alg_skcipher_encrypt(stru
>  	struct icp_qat_fw_la_cipher_req_params *cipher_param;
>  	struct icp_qat_fw_la_bulk_req *msg;
>  	struct device *dev = &GET_DEV(ctx->inst->accel_dev);
> -	int ret, ctr = 0;
> +	int ret, backed_off;
>  
>  	if (req->cryptlen == 0)
>  		return 0;
> @@ -1081,17 +1087,16 @@ static int qat_alg_skcipher_encrypt(stru
>  	cipher_param->cipher_offset = 0;
>  	cipher_param->u.s.cipher_IV_ptr = qat_req->iv_paddr;
>  	memcpy(qat_req->iv, req->iv, AES_BLOCK_SIZE);
> -	do {
> -		ret = adf_send_message(ctx->inst->sym_tx, (uint32_t *)msg);
> -	} while (ret == -EAGAIN && ctr++ < 10);
>  
> +	qat_req->backed_off = backed_off = adf_should_back_off(ctx->inst->sym_tx);
> +again:
> +	ret = adf_send_message(ctx->inst->sym_tx, (uint32_t *)msg);
>  	if (ret == -EAGAIN) {
> -		qat_alg_free_bufl(ctx->inst, qat_req);
> -		dma_free_coherent(dev, AES_BLOCK_SIZE, qat_req->iv,
> -				  qat_req->iv_paddr);
> -		return -EBUSY;
> +		qat_req->backed_off = backed_off = 1;
> +		cpu_relax();
> +		goto again;
>  	}
> -	return -EINPROGRESS;
> +	return backed_off ? -EBUSY : -EINPROGRESS;
>  }
>  
>  static int qat_alg_skcipher_blk_encrypt(struct skcipher_request *req)
> @@ -1111,7 +1116,7 @@ static int qat_alg_skcipher_decrypt(stru
>  	struct icp_qat_fw_la_cipher_req_params *cipher_param;
>  	struct icp_qat_fw_la_bulk_req *msg;
>  	struct device *dev = &GET_DEV(ctx->inst->accel_dev);
> -	int ret, ctr = 0;
> +	int ret, backed_off;
>  
>  	if (req->cryptlen == 0)
>  		return 0;
> @@ -1141,17 +1146,16 @@ static int qat_alg_skcipher_decrypt(stru
>  	cipher_param->cipher_offset = 0;
>  	cipher_param->u.s.cipher_IV_ptr = qat_req->iv_paddr;
>  	memcpy(qat_req->iv, req->iv, AES_BLOCK_SIZE);
> -	do {
> -		ret = adf_send_message(ctx->inst->sym_tx, (uint32_t *)msg);
> -	} while (ret == -EAGAIN && ctr++ < 10);
>  
> +	qat_req->backed_off = backed_off = adf_should_back_off(ctx->inst->sym_tx);
> +again:
> +	ret = adf_send_message(ctx->inst->sym_tx, (uint32_t *)msg);
>  	if (ret == -EAGAIN) {
> -		qat_alg_free_bufl(ctx->inst, qat_req);
> -		dma_free_coherent(dev, AES_BLOCK_SIZE, qat_req->iv,
> -				  qat_req->iv_paddr);
> -		return -EBUSY;
> +		qat_req->backed_off = backed_off = 1;
> +		cpu_relax();
> +		goto again;
>  	}
> -	return -EINPROGRESS;
> +	return backed_off ? -EBUSY : -EINPROGRESS;
>  }
>  
>  static int qat_alg_skcipher_blk_decrypt(struct skcipher_request *req)
> Index: linux-2.6/drivers/crypto/qat/qat_common/adf_transport.c
> ===================================================================
> --- linux-2.6.orig/drivers/crypto/qat/qat_common/adf_transport.c
> +++ linux-2.6/drivers/crypto/qat/qat_common/adf_transport.c
> @@ -114,10 +114,19 @@ static void adf_disable_ring_irq(struct
>  	WRITE_CSR_INT_COL_EN(bank->csr_addr, bank->bank_number, bank->irq_mask);
>  }
>  
> +bool adf_should_back_off(struct adf_etr_ring_data *ring)
> +{
> +	return atomic_read(ring->inflights) > ADF_MAX_INFLIGHTS(ring->ring_size, ring->msg_size) * 15 / 16;
How did you came up with 15/16?
checkpatch: WARNING: line over 80 characters

> +}
> +
>  int adf_send_message(struct adf_etr_ring_data *ring, uint32_t *msg)
>  {
> -	if (atomic_add_return(1, ring->inflights) >
> -	    ADF_MAX_INFLIGHTS(ring->ring_size, ring->msg_size)) {
> +	int limit = ADF_MAX_INFLIGHTS(ring->ring_size, ring->msg_size);
> +
> +	if (atomic_read(ring->inflights) >= limit)
> +		return -EAGAIN;
Can this be removed and leave only the condition below?
Am I missing something here?
> +
> +	if (atomic_add_return(1, ring->inflights) > limit) {
>  		atomic_dec(ring->inflights);
>  		return -EAGAIN;
>  	}
> Index: linux-2.6/drivers/crypto/qat/qat_common/adf_transport.h
> ===================================================================
> --- linux-2.6.orig/drivers/crypto/qat/qat_common/adf_transport.h
> +++ linux-2.6/drivers/crypto/qat/qat_common/adf_transport.h
> @@ -58,6 +58,7 @@ int adf_create_ring(struct adf_accel_dev
>  		    const char *ring_name, adf_callback_fn callback,
>  		    int poll_mode, struct adf_etr_ring_data **ring_ptr);
>  
> +bool adf_should_back_off(struct adf_etr_ring_data *ring);
>  int adf_send_message(struct adf_etr_ring_data *ring, uint32_t *msg);
>  void adf_remove_ring(struct adf_etr_ring_data *ring);
>  #endif
> Index: linux-2.6/drivers/crypto/qat/qat_common/qat_crypto.h
> ===================================================================
> --- linux-2.6.orig/drivers/crypto/qat/qat_common/qat_crypto.h
> +++ linux-2.6/drivers/crypto/qat/qat_common/qat_crypto.h
> @@ -90,6 +90,7 @@ struct qat_crypto_request {
>  		   struct qat_crypto_request *req);
>  	void *iv;
>  	dma_addr_t iv_paddr;
> +	int backed_off;
>  };
>  
>  #endif
> 
Regards,
Mikulas Patocka June 3, 2020, 8:31 a.m. UTC | #2
On Tue, 2 Jun 2020, Giovanni Cabiddu wrote:

> Hi Mikulas,
> 
> thanks for your patch. See below.
> 
> > +	qat_req->backed_off = backed_off = adf_should_back_off(ctx->inst->sym_tx);
> > +again:
> > +	ret = adf_send_message(ctx->inst->sym_tx, (uint32_t *)msg);
> >  	if (ret == -EAGAIN) {
> > -		qat_alg_free_bufl(ctx->inst, qat_req);
> > -		return -EBUSY;
> > +		qat_req->backed_off = backed_off = 1;
> > +		cpu_relax();
> > +		goto again;
> >  	}
> I am a bit concerned about this potential infinite loop.
> If an error occurred on the device and the queue is full, we will be
> stuck here forever.
> Should we just retry a number of times and then fail?

It's better to get stuck in an infinite loop than to cause random I/O 
errors. The infinite loop requires reboot, but it doesn't damage data on 
disks.

The proper solution would be to add the request to a queue and process the 
queue when some other request ended - but it would need substantial 
rewrite of the driver. Do you want to rewrite it using a queue?

> Or, should we just move to the crypto-engine?

What do you mean by the crypto-engine?

> > -	do {
> > -		ret = adf_send_message(ctx->inst->sym_tx, (uint32_t *)msg);
> > -	} while (ret == -EAGAIN && ctr++ < 10);
> > -
> > +	qat_req->backed_off = backed_off = adf_should_back_off(ctx->inst->sym_tx);
> checkpatch: line over 80 characters - same in every place
> adf_should_back_off is used.

Recently, Linus announced that we can have larger lines than 80 bytes.
See bdc48fa11e46f867ea4d75fa59ee87a7f48be144

> >  static int qat_alg_skcipher_blk_decrypt(struct skcipher_request *req)
> > Index: linux-2.6/drivers/crypto/qat/qat_common/adf_transport.c
> > ===================================================================
> > --- linux-2.6.orig/drivers/crypto/qat/qat_common/adf_transport.c
> > +++ linux-2.6/drivers/crypto/qat/qat_common/adf_transport.c
> > @@ -114,10 +114,19 @@ static void adf_disable_ring_irq(struct
> >  	WRITE_CSR_INT_COL_EN(bank->csr_addr, bank->bank_number, bank->irq_mask);
> >  }
> >  
> > +bool adf_should_back_off(struct adf_etr_ring_data *ring)
> > +{
> > +	return atomic_read(ring->inflights) > ADF_MAX_INFLIGHTS(ring->ring_size, ring->msg_size) * 15 / 16;
> How did you came up with 15/16?

I want the sender to back off before the queue is full, to avoid 
busy-waiting. There may be more concurrent senders, so we want to back off 
at some point before the queue is full.

> checkpatch: WARNING: line over 80 characters
> 
> > +}
> > +
> >  int adf_send_message(struct adf_etr_ring_data *ring, uint32_t *msg)
> >  {
> > -	if (atomic_add_return(1, ring->inflights) >
> > -	    ADF_MAX_INFLIGHTS(ring->ring_size, ring->msg_size)) {
> > +	int limit = ADF_MAX_INFLIGHTS(ring->ring_size, ring->msg_size);
> > +
> > +	if (atomic_read(ring->inflights) >= limit)
> > +		return -EAGAIN;

> Can this be removed and leave only the condition below?
> Am I missing something here?

atomic_read is light, atomic_add_return is heavy. We may be busy-waiting 
here, so I want to use the light instruction. Spinlocks do the same - when 
they are spinning, they use just a light "read" instruction and when the 
"read" instruction indicates that the spinlock is free, they execute the 
read-modify-write instruction to actually acquire the lock.

> > +
> > +	if (atomic_add_return(1, ring->inflights) > limit) {
> >  		atomic_dec(ring->inflights);
> >  		return -EAGAIN;
> >  	}

Mikulas
Herbert Xu June 3, 2020, 12:25 p.m. UTC | #3
On Wed, Jun 03, 2020 at 04:31:54AM -0400, Mikulas Patocka wrote:
>
> > Should we just retry a number of times and then fail?
> 
> It's better to get stuck in an infinite loop than to cause random I/O 
> errors. The infinite loop requires reboot, but it doesn't damage data on 
> disks.
> 
> The proper solution would be to add the request to a queue and process the 
> queue when some other request ended - but it would need substantial 
> rewrite of the driver. Do you want to rewrite it using a queue?
> 
> > Or, should we just move to the crypto-engine?
> 
> What do you mean by the crypto-engine?

crypto-engine is the generic queueing mechanism that any crypto
driver can use to implement the queueing.

Thanks,
Cabiddu, Giovanni June 3, 2020, 4:55 p.m. UTC | #4
Hi Mikulas,

On Wed, Jun 03, 2020 at 04:31:54AM -0400, Mikulas Patocka wrote:
> On Tue, 2 Jun 2020, Giovanni Cabiddu wrote:
> > Hi Mikulas,
> > 
> > thanks for your patch. See below.
> > 
> > > +	qat_req->backed_off = backed_off = adf_should_back_off(ctx->inst->sym_tx);
> > > +again:
> > > +	ret = adf_send_message(ctx->inst->sym_tx, (uint32_t *)msg);
> > >  	if (ret == -EAGAIN) {
> > > -		qat_alg_free_bufl(ctx->inst, qat_req);
> > > -		return -EBUSY;
> > > +		qat_req->backed_off = backed_off = 1;
> > > +		cpu_relax();
> > > +		goto again;
> > >  	}
> > I am a bit concerned about this potential infinite loop.
> > If an error occurred on the device and the queue is full, we will be
> > stuck here forever.
> > Should we just retry a number of times and then fail?
> 
> It's better to get stuck in an infinite loop than to cause random I/O 
> errors. The infinite loop requires reboot, but it doesn't damage data on 
> disks.
Fair.

> 
> The proper solution would be to add the request to a queue and process the 
> queue when some other request ended
This is tricky. We explored a solution that was enqueuing to a sw queue
when the hw queue was full and then re-submitting in the callback.
Didn't work due to response ordering.

> - but it would need substantial 
> rewrite of the driver. Do you want to rewrite it using a queue?
We are looking at using the crypto-engine for this. However, since that
patch is not ready, we can use your solution for the time being.
I asked our validation team to run our regression suite on your patch
set.

> > Or, should we just move to the crypto-engine?
> What do you mean by the crypto-engine?
Herbert answered already this question :-)
https://www.kernel.org/doc/html/latest/crypto/crypto_engine.html

> > > -	do {
> > > -		ret = adf_send_message(ctx->inst->sym_tx, (uint32_t *)msg);
> > > -	} while (ret == -EAGAIN && ctr++ < 10);
> > > -
> > > +	qat_req->backed_off = backed_off = adf_should_back_off(ctx->inst->sym_tx);
> > checkpatch: line over 80 characters - same in every place
> > adf_should_back_off is used.
> 
> Recently, Linus announced that we can have larger lines than 80 bytes.
> See bdc48fa11e46f867ea4d75fa59ee87a7f48be144
From bdc48fa11 I see that "80 columns is certainly still _preferred_".
80 is still my preference.
I can fix this and send a v2.

> 
> > >  static int qat_alg_skcipher_blk_decrypt(struct skcipher_request *req)
> > > Index: linux-2.6/drivers/crypto/qat/qat_common/adf_transport.c
> > > ===================================================================
> > > --- linux-2.6.orig/drivers/crypto/qat/qat_common/adf_transport.c
> > > +++ linux-2.6/drivers/crypto/qat/qat_common/adf_transport.c
> > > @@ -114,10 +114,19 @@ static void adf_disable_ring_irq(struct
> > >  	WRITE_CSR_INT_COL_EN(bank->csr_addr, bank->bank_number, bank->irq_mask);
> > >  }
> > >  
> > > +bool adf_should_back_off(struct adf_etr_ring_data *ring)
> > > +{
> > > +	return atomic_read(ring->inflights) > ADF_MAX_INFLIGHTS(ring->ring_size, ring->msg_size) * 15 / 16;
> > How did you came up with 15/16?
> 
> I want the sender to back off before the queue is full, to avoid 
> busy-waiting. There may be more concurrent senders, so we want to back off 
> at some point before the queue is full.
Yes, I understood this. My question was about the actual number.
93% of the depth of the queue.

> > checkpatch: WARNING: line over 80 characters
> > 
> > > +}
> > > +
> > >  int adf_send_message(struct adf_etr_ring_data *ring, uint32_t *msg)
> > >  {
> > > -	if (atomic_add_return(1, ring->inflights) >
> > > -	    ADF_MAX_INFLIGHTS(ring->ring_size, ring->msg_size)) {
> > > +	int limit = ADF_MAX_INFLIGHTS(ring->ring_size, ring->msg_size);
> > > +
> > > +	if (atomic_read(ring->inflights) >= limit)
> > > +		return -EAGAIN;
> 
> > Can this be removed and leave only the condition below?
> > Am I missing something here?
> 
> atomic_read is light, atomic_add_return is heavy. We may be busy-waiting 
> here, so I want to use the light instruction. Spinlocks do the same - when 
> they are spinning, they use just a light "read" instruction and when the 
> "read" instruction indicates that the spinlock is free, they execute the 
> read-modify-write instruction to actually acquire the lock.
Ok makes sense. Thanks.

Regards,
Mikulas Patocka June 3, 2020, 7:54 p.m. UTC | #5
On Wed, 3 Jun 2020, Giovanni Cabiddu wrote:

> > > > +bool adf_should_back_off(struct adf_etr_ring_data *ring)
> > > > +{
> > > > +	return atomic_read(ring->inflights) > ADF_MAX_INFLIGHTS(ring->ring_size, ring->msg_size) * 15 / 16;
> > > How did you came up with 15/16?
> > 
> > I want the sender to back off before the queue is full, to avoid 
> > busy-waiting. There may be more concurrent senders, so we want to back off 
> > at some point before the queue is full.
> Yes, I understood this. My question was about the actual number.
> 93% of the depth of the queue.

I just guessed the value. If you have some benchmark, you can try 
different values, to test if they perform better.

Mikulas
diff mbox series

Patch

Index: linux-2.6/drivers/crypto/qat/qat_common/qat_algs.c
===================================================================
--- linux-2.6.orig/drivers/crypto/qat/qat_common/qat_algs.c
+++ linux-2.6/drivers/crypto/qat/qat_common/qat_algs.c
@@ -826,6 +826,9 @@  static void qat_aead_alg_callback(struct
 	qat_alg_free_bufl(inst, qat_req);
 	if (unlikely(qat_res != ICP_QAT_FW_COMN_STATUS_FLAG_OK))
 		res = -EBADMSG;
+
+	if (qat_req->backed_off)
+		areq->base.complete(&areq->base, -EINPROGRESS);
 	areq->base.complete(&areq->base, res);
 }
 
@@ -847,6 +850,8 @@  static void qat_skcipher_alg_callback(st
 	dma_free_coherent(dev, AES_BLOCK_SIZE, qat_req->iv,
 			  qat_req->iv_paddr);
 
+	if (qat_req->backed_off)
+		sreq->base.complete(&sreq->base, -EINPROGRESS);
 	sreq->base.complete(&sreq->base, res);
 }
 
@@ -869,7 +874,7 @@  static int qat_alg_aead_dec(struct aead_
 	struct icp_qat_fw_la_auth_req_params *auth_param;
 	struct icp_qat_fw_la_bulk_req *msg;
 	int digst_size = crypto_aead_authsize(aead_tfm);
-	int ret, ctr = 0;
+	int ret, backed_off;
 
 	ret = qat_alg_sgl_to_bufl(ctx->inst, areq->src, areq->dst, qat_req);
 	if (unlikely(ret))
@@ -890,15 +895,16 @@  static int qat_alg_aead_dec(struct aead_
 	auth_param = (void *)((uint8_t *)cipher_param + sizeof(*cipher_param));
 	auth_param->auth_off = 0;
 	auth_param->auth_len = areq->assoclen + cipher_param->cipher_length;
-	do {
-		ret = adf_send_message(ctx->inst->sym_tx, (uint32_t *)msg);
-	} while (ret == -EAGAIN && ctr++ < 10);
 
+	qat_req->backed_off = backed_off = adf_should_back_off(ctx->inst->sym_tx);
+again:
+	ret = adf_send_message(ctx->inst->sym_tx, (uint32_t *)msg);
 	if (ret == -EAGAIN) {
-		qat_alg_free_bufl(ctx->inst, qat_req);
-		return -EBUSY;
+		qat_req->backed_off = backed_off = 1;
+		cpu_relax();
+		goto again;
 	}
-	return -EINPROGRESS;
+	return backed_off ? -EBUSY : -EINPROGRESS;
 }
 
 static int qat_alg_aead_enc(struct aead_request *areq)
@@ -911,7 +917,7 @@  static int qat_alg_aead_enc(struct aead_
 	struct icp_qat_fw_la_auth_req_params *auth_param;
 	struct icp_qat_fw_la_bulk_req *msg;
 	uint8_t *iv = areq->iv;
-	int ret, ctr = 0;
+	int ret, backed_off;
 
 	ret = qat_alg_sgl_to_bufl(ctx->inst, areq->src, areq->dst, qat_req);
 	if (unlikely(ret))
@@ -935,15 +941,15 @@  static int qat_alg_aead_enc(struct aead_
 	auth_param->auth_off = 0;
 	auth_param->auth_len = areq->assoclen + areq->cryptlen;
 
-	do {
-		ret = adf_send_message(ctx->inst->sym_tx, (uint32_t *)msg);
-	} while (ret == -EAGAIN && ctr++ < 10);
-
+	qat_req->backed_off = backed_off = adf_should_back_off(ctx->inst->sym_tx);
+again:
+	ret = adf_send_message(ctx->inst->sym_tx, (uint32_t *)msg);
 	if (ret == -EAGAIN) {
-		qat_alg_free_bufl(ctx->inst, qat_req);
-		return -EBUSY;
+		qat_req->backed_off = backed_off = 1;
+		cpu_relax();
+		goto again;
 	}
-	return -EINPROGRESS;
+	return backed_off ? -EBUSY : -EINPROGRESS;
 }
 
 static int qat_alg_skcipher_rekey(struct qat_alg_skcipher_ctx *ctx,
@@ -1051,7 +1057,7 @@  static int qat_alg_skcipher_encrypt(stru
 	struct icp_qat_fw_la_cipher_req_params *cipher_param;
 	struct icp_qat_fw_la_bulk_req *msg;
 	struct device *dev = &GET_DEV(ctx->inst->accel_dev);
-	int ret, ctr = 0;
+	int ret, backed_off;
 
 	if (req->cryptlen == 0)
 		return 0;
@@ -1081,17 +1087,16 @@  static int qat_alg_skcipher_encrypt(stru
 	cipher_param->cipher_offset = 0;
 	cipher_param->u.s.cipher_IV_ptr = qat_req->iv_paddr;
 	memcpy(qat_req->iv, req->iv, AES_BLOCK_SIZE);
-	do {
-		ret = adf_send_message(ctx->inst->sym_tx, (uint32_t *)msg);
-	} while (ret == -EAGAIN && ctr++ < 10);
 
+	qat_req->backed_off = backed_off = adf_should_back_off(ctx->inst->sym_tx);
+again:
+	ret = adf_send_message(ctx->inst->sym_tx, (uint32_t *)msg);
 	if (ret == -EAGAIN) {
-		qat_alg_free_bufl(ctx->inst, qat_req);
-		dma_free_coherent(dev, AES_BLOCK_SIZE, qat_req->iv,
-				  qat_req->iv_paddr);
-		return -EBUSY;
+		qat_req->backed_off = backed_off = 1;
+		cpu_relax();
+		goto again;
 	}
-	return -EINPROGRESS;
+	return backed_off ? -EBUSY : -EINPROGRESS;
 }
 
 static int qat_alg_skcipher_blk_encrypt(struct skcipher_request *req)
@@ -1111,7 +1116,7 @@  static int qat_alg_skcipher_decrypt(stru
 	struct icp_qat_fw_la_cipher_req_params *cipher_param;
 	struct icp_qat_fw_la_bulk_req *msg;
 	struct device *dev = &GET_DEV(ctx->inst->accel_dev);
-	int ret, ctr = 0;
+	int ret, backed_off;
 
 	if (req->cryptlen == 0)
 		return 0;
@@ -1141,17 +1146,16 @@  static int qat_alg_skcipher_decrypt(stru
 	cipher_param->cipher_offset = 0;
 	cipher_param->u.s.cipher_IV_ptr = qat_req->iv_paddr;
 	memcpy(qat_req->iv, req->iv, AES_BLOCK_SIZE);
-	do {
-		ret = adf_send_message(ctx->inst->sym_tx, (uint32_t *)msg);
-	} while (ret == -EAGAIN && ctr++ < 10);
 
+	qat_req->backed_off = backed_off = adf_should_back_off(ctx->inst->sym_tx);
+again:
+	ret = adf_send_message(ctx->inst->sym_tx, (uint32_t *)msg);
 	if (ret == -EAGAIN) {
-		qat_alg_free_bufl(ctx->inst, qat_req);
-		dma_free_coherent(dev, AES_BLOCK_SIZE, qat_req->iv,
-				  qat_req->iv_paddr);
-		return -EBUSY;
+		qat_req->backed_off = backed_off = 1;
+		cpu_relax();
+		goto again;
 	}
-	return -EINPROGRESS;
+	return backed_off ? -EBUSY : -EINPROGRESS;
 }
 
 static int qat_alg_skcipher_blk_decrypt(struct skcipher_request *req)
Index: linux-2.6/drivers/crypto/qat/qat_common/adf_transport.c
===================================================================
--- linux-2.6.orig/drivers/crypto/qat/qat_common/adf_transport.c
+++ linux-2.6/drivers/crypto/qat/qat_common/adf_transport.c
@@ -114,10 +114,19 @@  static void adf_disable_ring_irq(struct
 	WRITE_CSR_INT_COL_EN(bank->csr_addr, bank->bank_number, bank->irq_mask);
 }
 
+bool adf_should_back_off(struct adf_etr_ring_data *ring)
+{
+	return atomic_read(ring->inflights) > ADF_MAX_INFLIGHTS(ring->ring_size, ring->msg_size) * 15 / 16;
+}
+
 int adf_send_message(struct adf_etr_ring_data *ring, uint32_t *msg)
 {
-	if (atomic_add_return(1, ring->inflights) >
-	    ADF_MAX_INFLIGHTS(ring->ring_size, ring->msg_size)) {
+	int limit = ADF_MAX_INFLIGHTS(ring->ring_size, ring->msg_size);
+
+	if (atomic_read(ring->inflights) >= limit)
+		return -EAGAIN;
+
+	if (atomic_add_return(1, ring->inflights) > limit) {
 		atomic_dec(ring->inflights);
 		return -EAGAIN;
 	}
Index: linux-2.6/drivers/crypto/qat/qat_common/adf_transport.h
===================================================================
--- linux-2.6.orig/drivers/crypto/qat/qat_common/adf_transport.h
+++ linux-2.6/drivers/crypto/qat/qat_common/adf_transport.h
@@ -58,6 +58,7 @@  int adf_create_ring(struct adf_accel_dev
 		    const char *ring_name, adf_callback_fn callback,
 		    int poll_mode, struct adf_etr_ring_data **ring_ptr);
 
+bool adf_should_back_off(struct adf_etr_ring_data *ring);
 int adf_send_message(struct adf_etr_ring_data *ring, uint32_t *msg);
 void adf_remove_ring(struct adf_etr_ring_data *ring);
 #endif
Index: linux-2.6/drivers/crypto/qat/qat_common/qat_crypto.h
===================================================================
--- linux-2.6.orig/drivers/crypto/qat/qat_common/qat_crypto.h
+++ linux-2.6/drivers/crypto/qat/qat_common/qat_crypto.h
@@ -90,6 +90,7 @@  struct qat_crypto_request {
 		   struct qat_crypto_request *req);
 	void *iv;
 	dma_addr_t iv_paddr;
+	int backed_off;
 };
 
 #endif