8322829: Refactor nioBlocker to avoid blocking while holding Thread's interrupt lock

Reviewed-by: jpai
This commit is contained in:
Alan Bateman 2024-01-09 07:05:27 +00:00
parent 07fce8eff2
commit 7286f5291d
6 changed files with 118 additions and 54 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 1994, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 1994, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -2374,7 +2374,7 @@ public final class System {
return klass.getEnumConstantsShared();
}
public void blockedOn(Interruptible b) {
Thread.blockedOn(b);
Thread.currentThread().blockedOn(b);
}
public void registerShutdownHook(int slot, boolean registerShutdownInProgress, Runnable hook) {
Shutdown.add(slot, registerShutdownInProgress, hook);

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 1994, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 1994, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -345,15 +345,20 @@ public class Thread implements Runnable {
* operation, if any. The blocker's interrupt method should be invoked
* after setting this thread's interrupt status.
*/
volatile Interruptible nioBlocker;
private Interruptible nioBlocker;
Interruptible nioBlocker() {
//assert Thread.holdsLock(interruptLock);
return nioBlocker;
}
/* Set the blocker field; invoked via jdk.internal.access.SharedSecrets
* from java.nio code
*/
static void blockedOn(Interruptible b) {
Thread me = Thread.currentThread();
synchronized (me.interruptLock) {
me.nioBlocker = b;
void blockedOn(Interruptible b) {
//assert Thread.currentThread() == this;
synchronized (interruptLock) {
nioBlocker = b;
}
}
@ -1699,15 +1704,19 @@ public class Thread implements Runnable {
checkAccess();
// thread may be blocked in an I/O operation
Interruptible blocker;
synchronized (interruptLock) {
Interruptible b = nioBlocker;
if (b != null) {
blocker = nioBlocker;
if (blocker != null) {
interrupted = true;
interrupt0(); // inform VM of interrupt
b.interrupt(this);
return;
blocker.interrupt(this);
}
}
if (blocker != null) {
blocker.postInterrupt();
return;
}
}
interrupted = true;
interrupt0(); // inform VM of interrupt

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -852,18 +852,32 @@ final class VirtualThread extends BaseVirtualThread {
return true;
}
@Override
void blockedOn(Interruptible b) {
notifyJvmtiDisableSuspend(true);
try {
super.blockedOn(b);
} finally {
notifyJvmtiDisableSuspend(false);
}
}
@Override
@SuppressWarnings("removal")
public void interrupt() {
if (Thread.currentThread() != this) {
checkAccess();
// if current thread is a virtual thread then prevent it from being
// suspended when entering or holding interruptLock
Interruptible blocker;
notifyJvmtiDisableSuspend(true);
try {
synchronized (interruptLock) {
interrupted = true;
Interruptible b = nioBlocker;
if (b != null) {
b.interrupt(this);
blocker = nioBlocker();
if (blocker != null) {
blocker.interrupt(this);
}
// interrupt carrier thread if mounted
@ -873,6 +887,11 @@ final class VirtualThread extends BaseVirtualThread {
} finally {
notifyJvmtiDisableSuspend(false);
}
// notify blocker after releasing interruptLock
if (blocker != null) {
blocker.postInterrupt();
}
} else {
interrupted = true;
carrierThread.setInterrupt();

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2000, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2000, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -32,9 +32,9 @@ import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.InterruptibleChannel;
import jdk.internal.access.SharedSecrets;
import jdk.internal.misc.Unsafe;
import sun.nio.ch.Interruptible;
/**
* Base implementation class for interruptible channels.
*
@ -89,10 +89,26 @@ public abstract class AbstractInterruptibleChannel
private final Object closeLock = new Object();
private volatile boolean closed;
// invoked if a Thread is interrupted when blocked in an I/O op
private final Interruptible interruptor;
/**
* Initializes a new instance of this class.
*/
protected AbstractInterruptibleChannel() { }
protected AbstractInterruptibleChannel() {
this.interruptor = new Interruptible() {
@Override
public void interrupt(Thread target) {
AbstractInterruptibleChannel.this.trySetTarget(target);
}
@Override
public void postInterrupt() {
try {
AbstractInterruptibleChannel.this.close();
} catch (IOException x) { }
}
};
}
/**
* Closes this channel.
@ -139,8 +155,15 @@ public abstract class AbstractInterruptibleChannel
// -- Interruption machinery --
private Interruptible interruptor;
private volatile Thread interrupted;
private static final Unsafe U = Unsafe.getUnsafe();
private static final long INTERRUPTED_TARGET =
U.objectFieldOffset(AbstractInterruptibleChannel.class, "interruptedTarget");
private volatile Object interruptedTarget; // Thread or placeholder object
private void trySetTarget(Thread target) {
// can't use VarHandle here as CAS may park on first usage
U.compareAndSetReference(this, INTERRUPTED_TARGET, null, target);
}
/**
* Marks the beginning of an I/O operation that might block indefinitely.
@ -151,24 +174,12 @@ public abstract class AbstractInterruptibleChannel
* closing and interruption for this channel. </p>
*/
protected final void begin() {
if (interruptor == null) {
interruptor = new Interruptible() {
public void interrupt(Thread target) {
synchronized (closeLock) {
if (closed)
return;
closed = true;
interrupted = target;
try {
AbstractInterruptibleChannel.this.implCloseChannel();
} catch (IOException x) { }
}
}};
}
blockedOn(interruptor);
Thread me = Thread.currentThread();
if (me.isInterrupted())
if (me.isInterrupted()) {
interruptor.interrupt(me);
interruptor.postInterrupt();
}
}
/**
@ -194,10 +205,14 @@ public abstract class AbstractInterruptibleChannel
throws AsynchronousCloseException
{
blockedOn(null);
Thread interrupted = this.interrupted;
if (interrupted != null && interrupted == Thread.currentThread()) {
this.interrupted = null;
throw new ClosedByInterruptException();
Object interruptedTarget = this.interruptedTarget;
if (interruptedTarget != null) {
interruptor.postInterrupt();
if (interruptedTarget == Thread.currentThread()) {
// replace with dummy object to avoid retaining reference to this thread
this.interruptedTarget = new Object();
throw new ClosedByInterruptException();
}
}
if (!completed && closed)
throw new AsynchronousCloseException();

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2000, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2000, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -89,6 +89,9 @@ public abstract class AbstractSelector
// cancelled-key, not used by the JDK Selector implementations
private final Set<SelectionKey> cancelledKeys;
// invoked if a Thread is interrupted when blocked on a selection op
private final Interruptible interruptor;
/**
* Initializes a new instance of this class.
*
@ -103,6 +106,15 @@ public abstract class AbstractSelector
} else {
this.cancelledKeys = new HashSet<>();
}
this.interruptor = new Interruptible() {
@Override
public void interrupt(Thread ignore) {
}
@Override
public void postInterrupt() {
AbstractSelector.this.wakeup();
}
};
}
void cancel(SelectionKey k) { // package-private
@ -209,8 +221,6 @@ public abstract class AbstractSelector
// -- Interruption machinery --
private Interruptible interruptor = null;
/**
* Marks the beginning of an I/O operation that might block indefinitely.
*
@ -225,16 +235,11 @@ public abstract class AbstractSelector
* blocked in an I/O operation upon the selector. </p>
*/
protected final void begin() {
if (interruptor == null) {
interruptor = new Interruptible() {
public void interrupt(Thread ignore) {
AbstractSelector.this.wakeup();
}};
}
AbstractInterruptibleChannel.blockedOn(interruptor);
Thread me = Thread.currentThread();
if (me.isInterrupted())
interruptor.interrupt(me);
if (me.isInterrupted()) {
interruptor.postInterrupt();
}
}
/**
@ -248,5 +253,4 @@ public abstract class AbstractSelector
protected final void end() {
AbstractInterruptibleChannel.blockedOn(null);
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2000, 2010, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2000, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -31,6 +31,23 @@ package sun.nio.ch;
public interface Interruptible {
public void interrupt(Thread t);
/**
* Invoked by Thread.interrupt when the given Thread is interrupted. Thread.interrupt
* invokes this method while holding the given Thread's interrupt lock. This method
* is also invoked by AbstractInterruptibleChannel when beginning an I/O operation
* with the current thread's interrupt status set. This method must not block.
*/
void interrupt(Thread target);
/**
* Invoked by Thread.interrupt after releasing the Thread's interrupt lock.
* It may also be invoked by AbstractInterruptibleChannel or AbstractSelector when
* beginning an I/O operation with the current thread's interrupt status set, or at
* the end of an I/O operation when any thread doing I/O on the channel (or selector)
* has been interrupted. This method closes the channel (or wakes up the Selector) to
* ensure that AsynchronousCloseException or ClosedByInterruptException is thrown.
* This method is required to be idempotent.
*/
void postInterrupt();
}