8225583: Examine the HttpResponse.BodySubscribers for null handling
Reviewed-by: dfuchs, prappo
This commit is contained in:
parent
21ab801a22
commit
af2a46b094
src/java.net.http/share/classes
java/net/http
jdk/internal/net/http
@ -1253,7 +1253,7 @@ public interface HttpResponse<T> {
|
||||
/**
|
||||
* Returns a {@code BodySubscriber} which buffers data before delivering
|
||||
* it to the given downstream subscriber. The subscriber guarantees to
|
||||
* deliver {@code buffersize} bytes of data to each invocation of the
|
||||
* deliver {@code bufferSize} bytes of data to each invocation of the
|
||||
* downstream's {@link BodySubscriber#onNext(Object) onNext} method,
|
||||
* except for the final invocation, just before
|
||||
* {@link BodySubscriber#onComplete() onComplete} is invoked. The final
|
||||
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
|
||||
* Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
@ -40,6 +40,7 @@ import java.util.concurrent.ConcurrentLinkedDeque;
|
||||
import java.util.concurrent.Flow;
|
||||
import java.util.concurrent.Flow.Subscriber;
|
||||
import java.util.concurrent.Flow.Subscription;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Function;
|
||||
@ -56,6 +57,7 @@ public final class LineSubscriberAdapter<S extends Subscriber<? super String>,R>
|
||||
private final Function<? super S, ? extends R> finisher;
|
||||
private final Charset charset;
|
||||
private final String eol;
|
||||
private final AtomicBoolean subscribed = new AtomicBoolean();
|
||||
private volatile LineSubscription downstream;
|
||||
|
||||
private LineSubscriberAdapter(S subscriber,
|
||||
@ -72,6 +74,12 @@ public final class LineSubscriberAdapter<S extends Subscriber<? super String>,R>
|
||||
|
||||
@Override
|
||||
public void onSubscribe(Subscription subscription) {
|
||||
Objects.requireNonNull(subscription);
|
||||
if (!subscribed.compareAndSet(false, true)) {
|
||||
subscription.cancel();
|
||||
return;
|
||||
}
|
||||
|
||||
downstream = LineSubscription.create(subscription,
|
||||
charset,
|
||||
eol,
|
||||
@ -82,6 +90,7 @@ public final class LineSubscriberAdapter<S extends Subscriber<? super String>,R>
|
||||
|
||||
@Override
|
||||
public void onNext(List<ByteBuffer> item) {
|
||||
Objects.requireNonNull(item);
|
||||
try {
|
||||
downstream.submit(item);
|
||||
} catch (Throwable t) {
|
||||
@ -91,6 +100,7 @@ public final class LineSubscriberAdapter<S extends Subscriber<? super String>,R>
|
||||
|
||||
@Override
|
||||
public void onError(Throwable throwable) {
|
||||
Objects.requireNonNull(throwable);
|
||||
try {
|
||||
downstream.signalError(throwable);
|
||||
} finally {
|
||||
|
@ -125,6 +125,7 @@ public class ResponseSubscribers {
|
||||
|
||||
@Override
|
||||
public void onSubscribe(Flow.Subscription subscription) {
|
||||
Objects.requireNonNull(subscription);
|
||||
if (!subscribed.compareAndSet(false, true)) {
|
||||
subscription.cancel();
|
||||
} else {
|
||||
@ -135,6 +136,7 @@ public class ResponseSubscribers {
|
||||
|
||||
@Override
|
||||
public void onNext(List<ByteBuffer> items) {
|
||||
Objects.requireNonNull(items);
|
||||
for (ByteBuffer item : items) {
|
||||
byte[] buf = new byte[item.remaining()];
|
||||
item.get(buf);
|
||||
@ -145,6 +147,7 @@ public class ResponseSubscribers {
|
||||
|
||||
@Override
|
||||
public void onError(Throwable throwable) {
|
||||
Objects.requireNonNull(throwable);
|
||||
result.completeExceptionally(throwable);
|
||||
}
|
||||
|
||||
@ -172,6 +175,7 @@ public class ResponseSubscribers {
|
||||
private final FilePermission[] filePermissions;
|
||||
private final CompletableFuture<Path> result = new MinimalFuture<>();
|
||||
|
||||
private final AtomicBoolean subscribed = new AtomicBoolean();
|
||||
private volatile Flow.Subscription subscription;
|
||||
private volatile FileChannel out;
|
||||
|
||||
@ -211,6 +215,12 @@ public class ResponseSubscribers {
|
||||
|
||||
@Override
|
||||
public void onSubscribe(Flow.Subscription subscription) {
|
||||
Objects.requireNonNull(subscription);
|
||||
if (!subscribed.compareAndSet(false, true)) {
|
||||
subscription.cancel();
|
||||
return;
|
||||
}
|
||||
|
||||
this.subscription = subscription;
|
||||
if (System.getSecurityManager() == null) {
|
||||
try {
|
||||
@ -470,6 +480,7 @@ public class ResponseSubscribers {
|
||||
|
||||
@Override
|
||||
public void onSubscribe(Flow.Subscription s) {
|
||||
Objects.requireNonNull(s);
|
||||
try {
|
||||
if (!subscribed.compareAndSet(false, true)) {
|
||||
s.cancel();
|
||||
@ -600,6 +611,7 @@ public class ResponseSubscribers {
|
||||
|
||||
@Override
|
||||
public void onSubscribe(Flow.Subscription subscription) {
|
||||
Objects.requireNonNull(subscription);
|
||||
if (!subscribed.compareAndSet(false, true)) {
|
||||
subscription.cancel();
|
||||
} else {
|
||||
@ -614,6 +626,7 @@ public class ResponseSubscribers {
|
||||
|
||||
@Override
|
||||
public void onError(Throwable throwable) {
|
||||
Objects.requireNonNull(throwable);
|
||||
cf.completeExceptionally(throwable);
|
||||
}
|
||||
|
||||
@ -907,13 +920,21 @@ public class ResponseSubscribers {
|
||||
}
|
||||
}
|
||||
|
||||
private final AtomicBoolean subscribed = new AtomicBoolean();
|
||||
|
||||
@Override
|
||||
public void onSubscribe(Flow.Subscription subscription) {
|
||||
subscriptionCF.complete(subscription);
|
||||
Objects.requireNonNull(subscription);
|
||||
if (!subscribed.compareAndSet(false, true)) {
|
||||
subscription.cancel();
|
||||
} else {
|
||||
subscriptionCF.complete(subscription);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(List<ByteBuffer> item) {
|
||||
Objects.requireNonNull(item);
|
||||
try {
|
||||
// cannot be called before onSubscribe()
|
||||
assert subscriptionCF.isDone();
|
||||
@ -941,6 +962,7 @@ public class ResponseSubscribers {
|
||||
// onError can be called before request(1), and therefore can
|
||||
// be called before subscriberRef is set.
|
||||
signalError(throwable);
|
||||
Objects.requireNonNull(throwable);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
Loading…
x
Reference in New Issue
Block a user