8267990: Revisit some uses of synchronized in the HttpClient API

Reviewed-by: chegar
This commit is contained in:
Daniel Fuchs 2021-06-01 17:10:38 +00:00
parent 36dc268abe
commit 9d8ad2ed62
13 changed files with 99 additions and 43 deletions

@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -167,7 +167,7 @@ class Http1AsyncReceiver {
private final ConcurrentLinkedDeque<ByteBuffer> queue
= new ConcurrentLinkedDeque<>();
private final SequentialScheduler scheduler =
SequentialScheduler.synchronizedScheduler(this::flush);
SequentialScheduler.lockingScheduler(this::flush);
final MinimalFuture<Void> whenFinished;
private final Executor executor;
private final Http1TubeSubscriber subscriber = new Http1TubeSubscriber();

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -628,7 +628,7 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
final Http1WriteSubscription subscription = new Http1WriteSubscription();
final Demand demand = new Demand();
final SequentialScheduler writeScheduler =
SequentialScheduler.synchronizedScheduler(new WriteTask());
SequentialScheduler.lockingScheduler(new WriteTask());
@Override
public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -1292,7 +1292,7 @@ class Http2Connection {
private final ConcurrentLinkedQueue<ByteBuffer> queue
= new ConcurrentLinkedQueue<>();
private final SequentialScheduler scheduler =
SequentialScheduler.synchronizedScheduler(this::processQueue);
SequentialScheduler.lockingScheduler(this::processQueue);
private final HttpClientImpl client;
Http2TubeSubscriber(HttpClientImpl client) {

@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -165,7 +165,7 @@ public final class LineSubscriberAdapter<S extends Subscriber<? super String>,R>
newline = separator;
upstream = Objects.requireNonNull(subscriber);
cf = Objects.requireNonNull(completion);
scheduler = SequentialScheduler.synchronizedScheduler(this::loop);
scheduler = SequentialScheduler.lockingScheduler(this::loop);
}
@Override

@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, 2020, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2016, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -583,7 +583,7 @@ public final class RequestPublishers {
AggregateSubscription(List<BodyPublisher> bodies, Flow.Subscriber<? super ByteBuffer> subscriber) {
this.bodies = new ConcurrentLinkedQueue<>(bodies);
this.subscriber = subscriber;
this.scheduler = SequentialScheduler.synchronizedScheduler(this::run);
this.scheduler = SequentialScheduler.lockingScheduler(this::run);
}
@Override

@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -36,6 +36,8 @@ import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import jdk.internal.net.http.common.BufferSupplier;
@ -163,16 +165,22 @@ final class SocketTube implements FlowTube {
*/
private static class SocketFlowTask implements RestartableTask {
final Runnable task;
private final Object monitor = new Object();
private final Lock lock = new ReentrantLock();
SocketFlowTask(Runnable task) {
this.task = task;
}
@Override
public final void run(DeferredCompleter taskCompleter) {
try {
// non contentious synchronized for visibility.
synchronized(monitor) {
// The logics of the sequential scheduler should ensure that
// the restartable task is running in only one thread at
// a given time: there should never be contention.
boolean locked = lock.tryLock();
assert locked : "contention detected in SequentialScheduler";
try {
task.run();
} finally {
if (locked) lock.unlock();
}
} finally {
taskCompleter.complete();

@ -101,7 +101,7 @@ class Stream<T> extends ExchangeImpl<T> {
final ConcurrentLinkedQueue<Http2Frame> inputQ = new ConcurrentLinkedQueue<>();
final SequentialScheduler sched =
SequentialScheduler.synchronizedScheduler(this::schedule);
SequentialScheduler.lockingScheduler(this::schedule);
final SubscriptionBase userSubscription =
new SubscriptionBase(sched, this::cancel, this::onSubscriptionError);
@ -840,7 +840,7 @@ class Stream<T> extends ExchangeImpl<T> {
this.contentLength = contentLen;
this.remainingContentLength = contentLen;
this.sendScheduler =
SequentialScheduler.synchronizedScheduler(this::trySend);
SequentialScheduler.lockingScheduler(this::trySend);
}
@Override

@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -271,7 +271,7 @@ public class SSLFlowDelegate {
Reader() {
super();
scheduler = SequentialScheduler.synchronizedScheduler(
scheduler = SequentialScheduler.lockingScheduler(
new ReaderDownstreamPusher());
this.readBuf = ByteBuffer.allocate(1024);
readBuf.limit(0); // keep in read mode
@ -777,10 +777,10 @@ public class SSLFlowDelegate {
try {
if (debugw.on())
debugw.log("processData, writeList remaining:"
+ Utils.remaining(writeList) + ", hsTriggered:"
+ Utils.synchronizedRemaining(writeList) + ", hsTriggered:"
+ hsTriggered() + ", needWrap:" + needWrap());
while (Utils.remaining(writeList) > 0 || hsTriggered() || needWrap()) {
while (Utils.synchronizedRemaining(writeList) > 0 || hsTriggered() || needWrap()) {
ByteBuffer[] outbufs = writeList.toArray(Utils.EMPTY_BB_ARRAY);
EngineResult result = wrapBuffers(outbufs);
if (debugw.on())
@ -823,7 +823,7 @@ public class SSLFlowDelegate {
}
}
}
if (completing && Utils.remaining(writeList) == 0) {
if (completing && Utils.synchronizedRemaining(writeList) == 0) {
if (!completed) {
completed = true;
writeList.clear();

@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2016, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -27,6 +27,8 @@ package jdk.internal.net.http.common;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static java.util.Objects.requireNonNull;
@ -177,6 +179,36 @@ public final class SequentialScheduler {
}
}
/**
* A task that runs its main loop within a block protected by a lock to provide
* memory visibility between runs. Since the main loop can't run concurrently,
* the lock shouldn't be contended and no deadlock should ever be possible.
*/
public static final class LockingRestartableTask
extends CompleteRestartableTask {
private final Runnable mainLoop;
private final Lock lock = new ReentrantLock();
public LockingRestartableTask(Runnable mainLoop) {
this.mainLoop = mainLoop;
}
@Override
protected void run() {
// The logics of the sequential scheduler should ensure that
// the restartable task is running in only one thread at
// a given time: there should never be contention.
boolean locked = lock.tryLock();
assert locked : "contention detected in SequentialScheduler";
try {
mainLoop.run();
} finally {
if (locked) lock.unlock();
}
}
}
private static final int OFFLOAD = 1;
private static final int AGAIN = 2;
private static final int BEGIN = 4;
@ -359,4 +391,20 @@ public final class SequentialScheduler {
public static SequentialScheduler synchronizedScheduler(Runnable mainLoop) {
return new SequentialScheduler(new SynchronizedRestartableTask(mainLoop));
}
/**
* Returns a new {@code SequentialScheduler} that executes the provided
* {@code mainLoop} from within a {@link LockingRestartableTask}.
*
* @apiNote This is equivalent to calling
* {@code new SequentialScheduler(new LockingRestartableTask(mainLoop))}
* The main loop must not perform any blocking operation.
*
* @param mainLoop The main loop of the new sequential scheduler
* @return a new {@code SequentialScheduler} that executes the provided
* {@code mainLoop} from within a {@link LockingRestartableTask}.
*/
public static SequentialScheduler lockingScheduler(Runnable mainLoop) {
return new SequentialScheduler(new LockingRestartableTask(mainLoop));
}
}

@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -96,7 +96,7 @@ public abstract class SubscriberWrapper
errorCommon(t);
});
this.pushScheduler =
SequentialScheduler.synchronizedScheduler(new DownstreamPusher());
SequentialScheduler.lockingScheduler(new DownstreamPusher());
this.downstreamSubscription = new SubscriptionBase(pushScheduler,
this::downstreamCompletion);
}

@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -655,33 +655,33 @@ public final class Utils {
}
public static boolean hasRemaining(List<ByteBuffer> bufs) {
synchronized (bufs) {
for (ByteBuffer buf : bufs) {
if (buf.hasRemaining())
return true;
}
for (ByteBuffer buf : bufs) {
if (buf.hasRemaining())
return true;
}
return false;
}
public static long remaining(List<ByteBuffer> bufs) {
long remain = 0;
synchronized (bufs) {
for (ByteBuffer buf : bufs) {
remain += buf.remaining();
}
for (ByteBuffer buf : bufs) {
remain += buf.remaining();
}
return remain;
}
public static long synchronizedRemaining(List<ByteBuffer> bufs) {
synchronized (bufs) {
return remaining(bufs);
}
}
public static int remaining(List<ByteBuffer> bufs, int max) {
long remain = 0;
synchronized (bufs) {
for (ByteBuffer buf : bufs) {
remain += buf.remaining();
if (remain > max) {
throw new IllegalArgumentException("too many bytes");
}
for (ByteBuffer buf : bufs) {
remain += buf.remaining();
if (remain > max) {
throw new IllegalArgumentException("too many bytes");
}
}
return (int) remain;

@ -86,7 +86,7 @@ import static org.testng.Assert.assertTrue;
* @library /test/lib http2/server
* @build Http2TestServer LineBodyHandlerTest HttpServerAdapters
* @build jdk.test.lib.net.SimpleSSLContext
* @run testng/othervm LineBodyHandlerTest
* @run testng/othervm -XX:+UnlockDiagnosticVMOptions -XX:DiagnoseSyncOnValueBasedClasses=1 LineBodyHandlerTest
*/
public class LineBodyHandlerTest implements HttpServerAdapters {

@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -161,7 +161,7 @@ public class SSLEchoTubeTest extends AbstractSSLTubeTest {
final ConcurrentLinkedQueue<Consumer<Flow.Subscriber<? super List<ByteBuffer>>>> queue
= new ConcurrentLinkedQueue<>();
AtomicReference<Flow.Subscriber<? super List<ByteBuffer>>> subscriberRef = new AtomicReference<>();
SequentialScheduler scheduler = SequentialScheduler.synchronizedScheduler(this::loop);
SequentialScheduler scheduler = SequentialScheduler.lockingScheduler(this::loop);
AtomicReference<Throwable> errorRef = new AtomicReference<>();
private volatile boolean finished;
private volatile boolean completed;