Skip to content
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
ec126c8
Use the JDK's built-in support for Unix Domain Sockets on Java 16+
mcculls Jan 14, 2025
87425f1
First draft of timeout test.
sarahchen6 Jan 29, 2025
1634bcd
Add server to test.
sarahchen6 Jan 30, 2025
683e7c1
Fix build.gradle file.
sarahchen6 Jan 31, 2025
662602a
Add debugging statements.
sarahchen6 Jan 31, 2025
64da205
Second draft of timeout test.
sarahchen6 Jan 31, 2025
953f3e8
Update getInputStream to use a selector.
sarahchen6 Feb 6, 2025
d07fc19
Clean code.
sarahchen6 Feb 6, 2025
2aec0e7
Adjust dd-java-agent build.gradle.
sarahchen6 Feb 10, 2025
ea9a237
Revert dd-java-agent build.gradle change
mcculls Feb 12, 2025
e475abd
Revert another build.gradle change.
sarahchen6 Feb 18, 2025
38b98db
Try specifying setSrcDirs.
sarahchen6 Feb 18, 2025
5ae79f9
Try changing compatibility version.
sarahchen6 Feb 18, 2025
e80ce7e
Revert previous two changes.
sarahchen6 Feb 18, 2025
6c188d9
Avoid implementation dependency for Java17.
sarahchen6 Feb 20, 2025
dda0997
Make gradle dependency more specific and add testImplementation to so…
sarahchen6 Feb 20, 2025
412e4d3
Add synchronization to ensure that server starts before client connects.
sarahchen6 Feb 20, 2025
381f841
Try this...
sarahchen6 Feb 27, 2025
beba316
Add print statements.
sarahchen6 Mar 4, 2025
305d9af
Add catch statement.
sarahchen6 Mar 4, 2025
8ff0eab
Refactor getInputStream and getOutputStream.
sarahchen6 Mar 5, 2025
0251e37
Address PR comments.
sarahchen6 Mar 11, 2025
114d230
Add test for when timeout is 0.
sarahchen6 Mar 11, 2025
37ca729
Add config option.
sarahchen6 Mar 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions gradle/java_no_deps.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,12 @@ if (project.hasProperty('minJavaVersionForTests') && project.getProperty('minJav
}

dependencies {
compileOnly files(project.sourceSets."main_$name".compileClasspath)
implementation files(project.sourceSets."main_$name".output)
if ("${project.projectDir}".endsWith("socket-utils")) {
Comment thread
sarahchen6 marked this conversation as resolved.
compileOnly files(project.sourceSets."main_$name".output)
} else {
compileOnly files(project.sourceSets."main_$name".compileClasspath)
implementation files(project.sourceSets."main_$name".output)
}
}

jar {
Expand Down
25 changes: 24 additions & 1 deletion utils/socket-utils/build.gradle
Original file line number Diff line number Diff line change
@@ -1,8 +1,31 @@
ext {
minJavaVersionForTests = JavaVersion.VERSION_17
}

apply from: "$rootDir/gradle/java.gradle"
apply plugin: "idea"

[compileMain_java17Java, compileTestJava].each {
it.configure {
setJavaVersion(it, 17)
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
}
}

dependencies {
implementation libs.slf4j
implementation project(':internal-api')
implementation libs.jnr.unixsocket
testImplementation files(sourceSets.main_java17.output)
}

forbiddenApisMain_java17 {
failOnMissingClasses = false
}

implementation group: 'com.github.jnr', name: 'jnr-unixsocket', version: libs.versions.jnr.unixsocket.get()
idea {
module {
jdkName = '17'
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static java.util.concurrent.TimeUnit.MINUTES;

import datadog.trace.api.Config;
import datadog.trace.api.Platform;
import datadog.trace.relocate.api.RatelimitedLogger;
import java.io.File;
import java.io.IOException;
Expand All @@ -24,6 +25,8 @@
public final class UnixDomainSocketFactory extends SocketFactory {
private static final Logger log = LoggerFactory.getLogger(UnixDomainSocketFactory.class);

private static final boolean JDK_SUPPORTS_UDS = Platform.isJavaVersionAtLeast(16);

private final RatelimitedLogger rlLog = new RatelimitedLogger(log, 5, MINUTES);

private final File path;
Expand All @@ -35,8 +38,14 @@ public UnixDomainSocketFactory(final File path) {
@Override
public Socket createSocket() throws IOException {
try {
final UnixSocketChannel channel = UnixSocketChannel.open();
return new TunnelingUnixSocket(path, channel);
if (JDK_SUPPORTS_UDS) {
try {
return new TunnelingJdkSocket(path.toPath());
} catch (Throwable ignore) {
// fall back to jnr-unixsocket library
}
}
return new TunnelingUnixSocket(path, UnixSocketChannel.open());
} catch (Throwable e) {
if (Config.get().isAgentConfiguredUsingDefault()) {
// fall back to port if we previously auto-discovered this socket file
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
package datadog.common.socket;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.UnixDomainSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.Set;

/**
* Subtype UNIX socket for a higher-fidelity impersonation of TCP sockets. This is named "tunneling"
* because it assumes the ultimate destination has a hostname and port.
*
* <p>Bsed on {@link TunnelingUnixSocket}; adapted to use the built-in UDS support added in Java 16.
*/
final class TunnelingJdkSocket extends Socket {
private final SocketAddress unixSocketAddress;
private InetSocketAddress inetSocketAddress;

private SocketChannel unixSocketChannel;

private int timeout;
private boolean shutIn;
private boolean shutOut;
private boolean closed;

TunnelingJdkSocket(final Path path) {
this.unixSocketAddress = UnixDomainSocketAddress.of(path);
}
Comment thread
sarahchen6 marked this conversation as resolved.

TunnelingJdkSocket(final Path path, final InetSocketAddress address) {
this(path);
inetSocketAddress = address;
}

@Override
public boolean isConnected() {
return null != unixSocketChannel;
}

@Override
public boolean isInputShutdown() {
return shutIn;
}

@Override
public boolean isOutputShutdown() {
return shutOut;
}

@Override
public boolean isClosed() {
return closed;
}

@Override
public synchronized void setSoTimeout(int timeout) throws SocketException {
if (isClosed()) {
throw new SocketException("Socket is closed");
}
if (timeout < 0) {
throw new IllegalArgumentException("Socket timeout can't be negative");
}
this.timeout = timeout;
}

@Override
public synchronized int getSoTimeout() throws SocketException {
if (isClosed()) {
throw new SocketException("Socket is closed");
}
return timeout;
}

@Override
public void connect(final SocketAddress endpoint) throws IOException {
if (isClosed()) {
throw new SocketException("Socket is closed");
}
if (isConnected()) {
throw new SocketException("Socket is already connected");
}
inetSocketAddress = (InetSocketAddress) endpoint;
unixSocketChannel = SocketChannel.open(unixSocketAddress);
}

@Override
public void connect(final SocketAddress endpoint, final int timeout) throws IOException {
Comment thread
sarahchen6 marked this conversation as resolved.
if (isClosed()) {
throw new SocketException("Socket is closed");
}
if (isConnected()) {
throw new SocketException("Socket is already connected");
}
inetSocketAddress = (InetSocketAddress) endpoint;
unixSocketChannel = SocketChannel.open(unixSocketAddress);
}

@Override
public SocketChannel getChannel() {
return unixSocketChannel;
}

@Override
public InputStream getInputStream() throws IOException {
if (isClosed()) {
throw new SocketException("Socket is closed");
}
if (!isConnected()) {
throw new SocketException("Socket is not connected");
}
if (isInputShutdown()) {
throw new SocketException("Socket input is shutdown");
}

return new InputStream() {
private final ByteBuffer buffer = ByteBuffer.allocate(256);
Comment thread
sarahchen6 marked this conversation as resolved.
Outdated
private final Selector selector = Selector.open();

{
unixSocketChannel.configureBlocking(false);
unixSocketChannel.register(selector, SelectionKey.OP_READ);
}

@Override
public int read() throws IOException {
byte[] nextByte = new byte[1];
return (read(nextByte, 0, 1) == -1) ? -1 : (nextByte[0] & 0xFF);
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
buffer.clear();

int readyChannels = selector.select(timeout);
Comment thread
sarahchen6 marked this conversation as resolved.
if (readyChannels == 0) {
System.out.println("Timeout (" + timeout + "ms) while waiting for data.");
return 0;
}

Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
keyIterator.remove();
if (key.isReadable()) {
int r = unixSocketChannel.read(buffer);
if (r == -1) {
return -1;
}
buffer.flip();
len = Math.min(r, len);
buffer.get(b, off, len);
return len;
}
}
return 0;
}

@Override
public void close() throws IOException {
selector.close();
}
};
}

@Override
public OutputStream getOutputStream() throws IOException {
if (isClosed()) {
throw new SocketException("Socket is closed");
}
if (!isConnected()) {
throw new SocketException("Socket is not connected");
}
if (isInputShutdown()) {
throw new SocketException("Socket output is shutdown");
}

return new OutputStream() {
@Override
public void write(int b) throws IOException {
byte[] array = ByteBuffer.allocate(4).putInt(b).array();
write(array, 0, 4);
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
ByteBuffer buffer = ByteBuffer.wrap(b, off, len);

while (buffer.hasRemaining()) {
unixSocketChannel.write(buffer);
}
}
};
}

@Override
public void shutdownInput() throws IOException {
if (isClosed()) {
throw new SocketException("Socket is closed");
}
if (!isConnected()) {
throw new SocketException("Socket is not connected");
}
if (isInputShutdown()) {
throw new SocketException("Socket input is already shutdown");
}
unixSocketChannel.shutdownInput();
shutIn = true;
}

@Override
public void shutdownOutput() throws IOException {
if (isClosed()) {
throw new SocketException("Socket is closed");
}
if (!isConnected()) {
throw new SocketException("Socket is not connected");
}
if (isOutputShutdown()) {
throw new SocketException("Socket output is already shutdown");
}
unixSocketChannel.shutdownOutput();
shutOut = true;
}

@Override
public InetAddress getInetAddress() {
return inetSocketAddress.getAddress();
}

@Override
public void close() throws IOException {
if (isClosed()) {
return;
}
if (null != unixSocketChannel) {
unixSocketChannel.close();
}
closed = true;
}
}
Loading