Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,7 @@ protected TCompressedElasticFramedTransport(
protected void readFrame() throws TTransportException {
underlying.readAll(i32buf, 0, 4);
int size = TFramedTransport.decodeFrameSize(i32buf);

if (size < 0) {
close();
throw new TTransportException(
TTransportException.CORRUPTED_DATA, "Read a negative frame size (" + size + ")!");
}

checkFrameSize(size);
readBuffer.fill(underlying, size);
RpcStat.readCompressedBytes.addAndGet(size);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,29 +174,71 @@ public int read(byte[] buf, int off, int len) throws TTransportException {
protected void readFrame() throws TTransportException {
underlying.readAll(i32buf, 0, 4);
int size = TFramedTransport.decodeFrameSize(i32buf);
checkFrameSize(size);
readBuffer.fill(underlying, size);
}

if (size < 0) {
close();
throw new TTransportException(
TTransportException.CORRUPTED_DATA, "Read a negative frame size (" + size + ")!");
protected void checkFrameSize(int size) throws TTransportException {
final int HTTP_GET_SIGNATURE = 0x47455420; // "GET "
final int HTTP_POST_SIGNATURE = 0x504F5354; // "POST"
final int TLS_MIN_VERSION = 0x160300;
final int TLS_MAX_VERSION = 0x160303;
final int TLS_LENGTH_HIGH_MAX = 0x02;

FrameError error = null;
if (size == HTTP_GET_SIGNATURE || size == HTTP_POST_SIGNATURE) {
error = FrameError.HTTP_REQUEST;
} else {
int high24 = size >>> 8;
if (high24 >= TLS_MIN_VERSION
&& high24 <= TLS_MAX_VERSION
&& (i32buf[3] & 0xFF) <= TLS_LENGTH_HIGH_MAX) {
error = FrameError.TLS_REQUEST;
} else if (size < 0) {
error = FrameError.NEGATIVE_FRAME_SIZE;
} else if (size > thriftMaxFrameSize) {
error = FrameError.FRAME_SIZE_EXCEEDED;
}
}

if (size > thriftMaxFrameSize) {
close();
if (size == 1195725856L || size == 1347375956L) {
// if someone sends HTTP GET/POST to this port, the size will be read as the following
throw new TTransportException(
TTransportException.CORRUPTED_DATA,
"Singular frame size ("
+ size
+ ") detected, you may be sending HTTP GET/POST requests to the Thrift-RPC port, please confirm that you are using the right port");
} else {
throw new TTransportException(
TTransportException.CORRUPTED_DATA,
"Frame size (" + size + ") larger than protect max size (" + thriftMaxFrameSize + ")!");
}
if (error == null) {
return;
}

SocketAddress remoteAddress = null;
if (underlying instanceof TSocket) {
remoteAddress = ((TSocket) underlying).getSocket().getRemoteSocketAddress();
}
String remoteInfo = (remoteAddress == null) ? "" : " from " + remoteAddress;
close();

error.throwException(size, remoteInfo, thriftMaxFrameSize);
}

private enum FrameError {
HTTP_REQUEST(
"Singular frame size (%d) detected, you may be sending HTTP GET/POST%s "
+ "requests to the Thrift-RPC port, please confirm that you are using the right port"),
TLS_REQUEST(
"Singular frame size (%d) detected, you may be sending TLS ClientHello "
+ "requests%s to the Non-SSL Thrift-RPC port, please confirm that you are using "
+ "the right configuration"),
NEGATIVE_FRAME_SIZE("Read a negative frame size (%d)%s!"),
FRAME_SIZE_EXCEEDED("Frame size (%d) larger than protect max size (%d)%s!");

private final String messageFormat;

FrameError(String messageFormat) {
this.messageFormat = messageFormat;
}

void throwException(int size, String remoteInfo, int maxSize) throws TTransportException {
String message =
(this == FRAME_SIZE_EXCEEDED)
? String.format(messageFormat, size, maxSize, remoteInfo)
: String.format(messageFormat, size, remoteInfo);
throw new TTransportException(TTransportException.CORRUPTED_DATA, message);
}
readBuffer.fill(underlying, size);
}

protected void checkWriteFrameSize(int size) throws TTransportException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -868,7 +868,7 @@ public class IoTDBConfig {
private float udfCollectorMemoryBudgetInMB = (float) (1.0 / 3 * udfMemoryBudgetInMB);

/** Unit: byte */
private int thriftMaxFrameSize = 536870912;
private int thriftMaxFrameSize = getDefaultThriftMaxFrameSize();

private int thriftDefaultBufferSize = RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY;

Expand Down Expand Up @@ -2784,10 +2784,16 @@ public int getThriftMaxFrameSize() {
}

public void setThriftMaxFrameSize(int thriftMaxFrameSize) {
this.thriftMaxFrameSize = thriftMaxFrameSize;
this.thriftMaxFrameSize =
thriftMaxFrameSize <= 0 ? getDefaultThriftMaxFrameSize() : thriftMaxFrameSize;
BaseRpcTransportFactory.setThriftMaxFrameSize(this.thriftMaxFrameSize);
}

private static int getDefaultThriftMaxFrameSize() {
return Math.min(
64 * 1024 * 1024, (int) Math.min(Runtime.getRuntime().maxMemory() / 64, Integer.MAX_VALUE));
}

public int getThriftDefaultBufferSize() {
return thriftDefaultBufferSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -861,10 +861,6 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException
properties.getProperty(
"dn_thrift_max_frame_size", String.valueOf(conf.getThriftMaxFrameSize()))));

if (conf.getThriftMaxFrameSize() < IoTDBConstant.LEFT_SIZE_IN_REQUEST * 2) {
conf.setThriftMaxFrameSize(IoTDBConstant.LEFT_SIZE_IN_REQUEST * 2);
}

conf.setThriftDefaultBufferSize(
Integer.parseInt(
properties.getProperty(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,10 +509,10 @@ dn_rpc_min_concurrent_client_num=1
# Datatype: int
dn_rpc_max_concurrent_client_num=1000

# thrift max frame size, 512MB by default
# thrift max frame size in bytes. When set to 0, use min(64MB, datanode heap memory / 64)
# effectiveMode: restart
# Datatype: int
dn_thrift_max_frame_size=536870912
dn_thrift_max_frame_size=0

# thrift init buffer size
# effectiveMode: restart
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,6 @@ private IoTDBConstant() {}
public static final String MQTT_MAX_MESSAGE_SIZE = "mqtt_max_message_size";

// thrift
public static final int LEFT_SIZE_IN_REQUEST = 4 * 1024 * 1024;
public static final int DEFAULT_FETCH_SIZE = 5000;
public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 0;

Expand Down
Loading