mirror of
git://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
synced 2025-08-05 16:54:27 +00:00
rxrpc: Move call state changes from sendmsg to I/O thread
Move all the call state changes that are made in rxrpc_sendmsg() to the I/O thread. This is a step towards removing the call state lock. This requires the switch to the RXRPC_CALL_CLIENT_AWAIT_REPLY and RXRPC_CALL_SERVER_SEND_REPLY states to be done when the last packet is decanted from ->tx_sendmsg to ->tx_buffer in the I/O thread, not when it is added to ->tx_sendmsg by sendmsg(). Signed-off-by: David Howells <dhowells@redhat.com> cc: Marc Dionne <marc.dionne@auristor.com> cc: linux-afs@lists.infradead.org
This commit is contained in:
parent
d41b3f5b96
commit
2d689424b6
3 changed files with 63 additions and 60 deletions
|
@ -880,8 +880,8 @@ The kernel interface functions are as follows:
|
||||||
|
|
||||||
notify_end_rx can be NULL or it can be used to specify a function to be
|
notify_end_rx can be NULL or it can be used to specify a function to be
|
||||||
called when the call changes state to end the Tx phase. This function is
|
called when the call changes state to end the Tx phase. This function is
|
||||||
called with the call-state spinlock held to prevent any reply or final ACK
|
called with a spinlock held to prevent the last DATA packet from being
|
||||||
from being delivered first.
|
transmitted until the function returns.
|
||||||
|
|
||||||
(#) Receive data from a call::
|
(#) Receive data from a call::
|
||||||
|
|
||||||
|
|
|
@ -251,6 +251,50 @@ out:
|
||||||
_leave("");
|
_leave("");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Start transmitting the reply to a service. This cancels the need to ACK the
|
||||||
|
* request if we haven't yet done so.
|
||||||
|
*/
|
||||||
|
static void rxrpc_begin_service_reply(struct rxrpc_call *call)
|
||||||
|
{
|
||||||
|
unsigned long now;
|
||||||
|
|
||||||
|
write_lock(&call->state_lock);
|
||||||
|
|
||||||
|
if (call->state == RXRPC_CALL_SERVER_ACK_REQUEST) {
|
||||||
|
now = jiffies;
|
||||||
|
call->state = RXRPC_CALL_SERVER_SEND_REPLY;
|
||||||
|
WRITE_ONCE(call->delay_ack_at, now + MAX_JIFFY_OFFSET);
|
||||||
|
if (call->ackr_reason == RXRPC_ACK_DELAY)
|
||||||
|
call->ackr_reason = 0;
|
||||||
|
trace_rxrpc_timer(call, rxrpc_timer_init_for_send_reply, now);
|
||||||
|
}
|
||||||
|
|
||||||
|
write_unlock(&call->state_lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Close the transmission phase. After this point there is no more data to be
|
||||||
|
* transmitted in the call.
|
||||||
|
*/
|
||||||
|
static void rxrpc_close_tx_phase(struct rxrpc_call *call)
|
||||||
|
{
|
||||||
|
_debug("________awaiting reply/ACK__________");
|
||||||
|
|
||||||
|
write_lock(&call->state_lock);
|
||||||
|
switch (call->state) {
|
||||||
|
case RXRPC_CALL_CLIENT_SEND_REQUEST:
|
||||||
|
call->state = RXRPC_CALL_CLIENT_AWAIT_REPLY;
|
||||||
|
break;
|
||||||
|
case RXRPC_CALL_SERVER_SEND_REPLY:
|
||||||
|
call->state = RXRPC_CALL_SERVER_AWAIT_ACK;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
write_unlock(&call->state_lock);
|
||||||
|
}
|
||||||
|
|
||||||
static bool rxrpc_tx_window_has_space(struct rxrpc_call *call)
|
static bool rxrpc_tx_window_has_space(struct rxrpc_call *call)
|
||||||
{
|
{
|
||||||
unsigned int winsize = min_t(unsigned int, call->tx_winsize,
|
unsigned int winsize = min_t(unsigned int, call->tx_winsize,
|
||||||
|
@ -285,6 +329,9 @@ static void rxrpc_decant_prepared_tx(struct rxrpc_call *call)
|
||||||
call->tx_top = txb->seq;
|
call->tx_top = txb->seq;
|
||||||
list_add_tail(&txb->call_link, &call->tx_buffer);
|
list_add_tail(&txb->call_link, &call->tx_buffer);
|
||||||
|
|
||||||
|
if (txb->wire.flags & RXRPC_LAST_PACKET)
|
||||||
|
rxrpc_close_tx_phase(call);
|
||||||
|
|
||||||
rxrpc_transmit_one(call, txb);
|
rxrpc_transmit_one(call, txb);
|
||||||
|
|
||||||
if (!rxrpc_tx_window_has_space(call))
|
if (!rxrpc_tx_window_has_space(call))
|
||||||
|
@ -298,12 +345,11 @@ static void rxrpc_transmit_some_data(struct rxrpc_call *call)
|
||||||
case RXRPC_CALL_SERVER_ACK_REQUEST:
|
case RXRPC_CALL_SERVER_ACK_REQUEST:
|
||||||
if (list_empty(&call->tx_sendmsg))
|
if (list_empty(&call->tx_sendmsg))
|
||||||
return;
|
return;
|
||||||
|
rxrpc_begin_service_reply(call);
|
||||||
fallthrough;
|
fallthrough;
|
||||||
|
|
||||||
case RXRPC_CALL_SERVER_SEND_REPLY:
|
case RXRPC_CALL_SERVER_SEND_REPLY:
|
||||||
case RXRPC_CALL_SERVER_AWAIT_ACK:
|
|
||||||
case RXRPC_CALL_CLIENT_SEND_REQUEST:
|
case RXRPC_CALL_CLIENT_SEND_REQUEST:
|
||||||
case RXRPC_CALL_CLIENT_AWAIT_REPLY:
|
|
||||||
if (!rxrpc_tx_window_has_space(call))
|
if (!rxrpc_tx_window_has_space(call))
|
||||||
return;
|
return;
|
||||||
if (list_empty(&call->tx_sendmsg)) {
|
if (list_empty(&call->tx_sendmsg)) {
|
||||||
|
|
|
@ -189,7 +189,6 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
|
||||||
struct rxrpc_txbuf *txb,
|
struct rxrpc_txbuf *txb,
|
||||||
rxrpc_notify_end_tx_t notify_end_tx)
|
rxrpc_notify_end_tx_t notify_end_tx)
|
||||||
{
|
{
|
||||||
unsigned long now;
|
|
||||||
rxrpc_seq_t seq = txb->seq;
|
rxrpc_seq_t seq = txb->seq;
|
||||||
bool last = test_bit(RXRPC_TXBUF_LAST, &txb->flags), poke;
|
bool last = test_bit(RXRPC_TXBUF_LAST, &txb->flags), poke;
|
||||||
|
|
||||||
|
@ -212,36 +211,10 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
|
||||||
poke = list_empty(&call->tx_sendmsg);
|
poke = list_empty(&call->tx_sendmsg);
|
||||||
list_add_tail(&txb->call_link, &call->tx_sendmsg);
|
list_add_tail(&txb->call_link, &call->tx_sendmsg);
|
||||||
call->tx_prepared = seq;
|
call->tx_prepared = seq;
|
||||||
|
if (last)
|
||||||
|
rxrpc_notify_end_tx(rx, call, notify_end_tx);
|
||||||
spin_unlock(&call->tx_lock);
|
spin_unlock(&call->tx_lock);
|
||||||
|
|
||||||
if (last || call->state == RXRPC_CALL_SERVER_ACK_REQUEST) {
|
|
||||||
_debug("________awaiting reply/ACK__________");
|
|
||||||
write_lock(&call->state_lock);
|
|
||||||
switch (call->state) {
|
|
||||||
case RXRPC_CALL_CLIENT_SEND_REQUEST:
|
|
||||||
call->state = RXRPC_CALL_CLIENT_AWAIT_REPLY;
|
|
||||||
rxrpc_notify_end_tx(rx, call, notify_end_tx);
|
|
||||||
break;
|
|
||||||
case RXRPC_CALL_SERVER_ACK_REQUEST:
|
|
||||||
call->state = RXRPC_CALL_SERVER_SEND_REPLY;
|
|
||||||
now = jiffies;
|
|
||||||
WRITE_ONCE(call->delay_ack_at, now + MAX_JIFFY_OFFSET);
|
|
||||||
if (call->ackr_reason == RXRPC_ACK_DELAY)
|
|
||||||
call->ackr_reason = 0;
|
|
||||||
trace_rxrpc_timer(call, rxrpc_timer_init_for_send_reply, now);
|
|
||||||
if (!last)
|
|
||||||
break;
|
|
||||||
fallthrough;
|
|
||||||
case RXRPC_CALL_SERVER_SEND_REPLY:
|
|
||||||
call->state = RXRPC_CALL_SERVER_AWAIT_ACK;
|
|
||||||
rxrpc_notify_end_tx(rx, call, notify_end_tx);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
write_unlock(&call->state_lock);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (poke)
|
if (poke)
|
||||||
rxrpc_poke_call(call, rxrpc_call_poke_start);
|
rxrpc_poke_call(call, rxrpc_call_poke_start);
|
||||||
}
|
}
|
||||||
|
@ -280,8 +253,13 @@ reload:
|
||||||
ret = -EPROTO;
|
ret = -EPROTO;
|
||||||
if (state != RXRPC_CALL_CLIENT_SEND_REQUEST &&
|
if (state != RXRPC_CALL_CLIENT_SEND_REQUEST &&
|
||||||
state != RXRPC_CALL_SERVER_ACK_REQUEST &&
|
state != RXRPC_CALL_SERVER_ACK_REQUEST &&
|
||||||
state != RXRPC_CALL_SERVER_SEND_REPLY)
|
state != RXRPC_CALL_SERVER_SEND_REPLY) {
|
||||||
|
/* Request phase complete for this client call */
|
||||||
|
trace_rxrpc_abort(call->debug_id, rxrpc_sendmsg_late_send,
|
||||||
|
call->cid, call->call_id, call->rx_consumed,
|
||||||
|
0, -EPROTO);
|
||||||
goto maybe_error;
|
goto maybe_error;
|
||||||
|
}
|
||||||
|
|
||||||
ret = -EMSGSIZE;
|
ret = -EMSGSIZE;
|
||||||
if (call->tx_total_len != -1) {
|
if (call->tx_total_len != -1) {
|
||||||
|
@ -573,7 +551,6 @@ rxrpc_new_client_call_for_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg,
|
||||||
int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
|
int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
|
||||||
__releases(&rx->sk.sk_lock.slock)
|
__releases(&rx->sk.sk_lock.slock)
|
||||||
{
|
{
|
||||||
enum rxrpc_call_state state;
|
|
||||||
struct rxrpc_call *call;
|
struct rxrpc_call *call;
|
||||||
unsigned long now, j;
|
unsigned long now, j;
|
||||||
bool dropped_lock = false;
|
bool dropped_lock = false;
|
||||||
|
@ -672,11 +649,7 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
state = rxrpc_call_state(call);
|
if (rxrpc_call_is_complete(call)) {
|
||||||
_debug("CALL %d USR %lx ST %d on CONN %p",
|
|
||||||
call->debug_id, call->user_call_ID, state, call->conn);
|
|
||||||
|
|
||||||
if (state >= RXRPC_CALL_COMPLETE) {
|
|
||||||
/* it's too late for this call */
|
/* it's too late for this call */
|
||||||
ret = -ESHUTDOWN;
|
ret = -ESHUTDOWN;
|
||||||
} else if (p.command == RXRPC_CMD_SEND_ABORT) {
|
} else if (p.command == RXRPC_CMD_SEND_ABORT) {
|
||||||
|
@ -722,7 +695,7 @@ int rxrpc_kernel_send_data(struct socket *sock, struct rxrpc_call *call,
|
||||||
bool dropped_lock = false;
|
bool dropped_lock = false;
|
||||||
int ret;
|
int ret;
|
||||||
|
|
||||||
_enter("{%d,%s},", call->debug_id, rxrpc_call_states[call->state]);
|
_enter("{%d},", call->debug_id);
|
||||||
|
|
||||||
ASSERTCMP(msg->msg_name, ==, NULL);
|
ASSERTCMP(msg->msg_name, ==, NULL);
|
||||||
ASSERTCMP(msg->msg_control, ==, NULL);
|
ASSERTCMP(msg->msg_control, ==, NULL);
|
||||||
|
@ -732,26 +705,10 @@ int rxrpc_kernel_send_data(struct socket *sock, struct rxrpc_call *call,
|
||||||
_debug("CALL %d USR %lx ST %d on CONN %p",
|
_debug("CALL %d USR %lx ST %d on CONN %p",
|
||||||
call->debug_id, call->user_call_ID, call->state, call->conn);
|
call->debug_id, call->user_call_ID, call->state, call->conn);
|
||||||
|
|
||||||
switch (rxrpc_call_state(call)) {
|
ret = rxrpc_send_data(rxrpc_sk(sock->sk), call, msg, len,
|
||||||
case RXRPC_CALL_CLIENT_SEND_REQUEST:
|
notify_end_tx, &dropped_lock);
|
||||||
case RXRPC_CALL_SERVER_ACK_REQUEST:
|
if (ret == -ESHUTDOWN)
|
||||||
case RXRPC_CALL_SERVER_SEND_REPLY:
|
|
||||||
ret = rxrpc_send_data(rxrpc_sk(sock->sk), call, msg, len,
|
|
||||||
notify_end_tx, &dropped_lock);
|
|
||||||
break;
|
|
||||||
case RXRPC_CALL_COMPLETE:
|
|
||||||
read_lock(&call->state_lock);
|
|
||||||
ret = call->error;
|
ret = call->error;
|
||||||
read_unlock(&call->state_lock);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
/* Request phase complete for this client call */
|
|
||||||
trace_rxrpc_abort(call->debug_id, rxrpc_sendmsg_late_send,
|
|
||||||
call->cid, call->call_id, call->rx_consumed,
|
|
||||||
0, -EPROTO);
|
|
||||||
ret = -EPROTO;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!dropped_lock)
|
if (!dropped_lock)
|
||||||
mutex_unlock(&call->user_mutex);
|
mutex_unlock(&call->user_mutex);
|
||||||
|
|
Loading…
Add table
Reference in a new issue