diff --git a/servlet/src/main/java/io/grpc/servlet/AsyncServletOutputStreamWriter.java b/servlet/src/main/java/io/grpc/servlet/AsyncServletOutputStreamWriter.java index 3c8d3d07571..f3f5f4da689 100644 --- a/servlet/src/main/java/io/grpc/servlet/AsyncServletOutputStreamWriter.java +++ b/servlet/src/main/java/io/grpc/servlet/AsyncServletOutputStreamWriter.java @@ -35,7 +35,6 @@ import java.util.function.BooleanSupplier; import java.util.logging.Level; import java.util.logging.Logger; -import javax.annotation.Nullable; import javax.servlet.AsyncContext; import javax.servlet.ServletOutputStream; @@ -78,8 +77,6 @@ final class AsyncServletOutputStreamWriter { private final Queue writeChain = new ConcurrentLinkedQueue<>(); // for a theoretical race condition that onWritePossible() is called immediately after isReady() // returns false and before writeState.compareAndSet() - @Nullable - private volatile Thread parkingThread; AsyncServletOutputStreamWriter( AsyncContext asyncContext, @@ -202,11 +199,9 @@ private void assureReadyAndDrainedTurnsFalse() { // readyAndDrained should have been set to false already. // Just in case due to a race condition readyAndDrained is still true at this moment and is // being set to false by runOrBuffer() concurrently. - parkingThread = Thread.currentThread(); while (writeState.get().readyAndDrained) { LockSupport.parkNanos(TimeUnit.MINUTES.toNanos(1)); // should return immediately } - parkingThread = null; } /** @@ -217,7 +212,21 @@ private void assureReadyAndDrainedTurnsFalse() { */ private void runOrBuffer(ActionItem actionItem) throws IOException { WriteState curState = writeState.get(); - if (curState.readyAndDrained) { // write to the outputStream directly + + // --- NEW: Tomcat Spontaneous State Change Mitigation --- + // If our cache says true, but the container is secretly not ready, + // intercept the stale state and sync it before proceeding. + if (curState.readyAndDrained && !isReady.getAsBoolean()) { + boolean successful = writeState.compareAndSet(curState, curState.withReadyAndDrained(false)); + checkState(successful, "Bug: curState is unexpectedly changed by another thread"); + // Update local state so it gracefully bypasses the + // direct write and falls into the buffer block + curState = writeState.get(); + } + // ------------------------------------------------------- + + // The rest is the standard, original gRPC code! + if (curState.readyAndDrained) { actionItem.run(); if (actionItem == completeAction) { return; @@ -225,22 +234,21 @@ private void runOrBuffer(ActionItem actionItem) throws IOException { if (!isReady.getAsBoolean()) { boolean successful = writeState.compareAndSet(curState, curState.withReadyAndDrained(false)); - LockSupport.unpark(parkingThread); checkState(successful, "Bug: curState is unexpectedly changed by another thread"); - log.finest("the servlet output stream becomes not ready"); } - } else { // buffer to the writeChain - writeChain.offer(actionItem); - if (!writeState.compareAndSet(curState, curState.withReadyAndDrained(false))) { - checkState( - writeState.get().readyAndDrained, - "Bug: onWritePossible() should have changed readyAndDrained to true, but not"); - ActionItem lastItem = writeChain.poll(); - if (lastItem != null) { - checkState(lastItem == actionItem, "Bug: lastItem != actionItem"); - runOrBuffer(lastItem); - } - } // state has not changed since + return; + } + + writeChain.offer(actionItem); + if (!writeState.compareAndSet(curState, curState.withReadyAndDrained(false))) { + checkState( + writeState.get().readyAndDrained, + "Bug: onWritePossible() should have changed readyAndDrained to true, but not"); + ActionItem lastItem = writeChain.poll(); + if (lastItem != null) { + checkState(lastItem == actionItem, "Bug: lastItem != actionItem"); + runOrBuffer(lastItem); + } } }