Skip to content

Commit 2451291

Browse files
committed
thread HFileContext using trace Context
1 parent 4e723dd commit 2451291

7 files changed

Lines changed: 189 additions & 155 deletions

File tree

hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
*/
1818
package org.apache.hadoop.hbase.io.encoding;
1919

20-
import io.opentelemetry.api.common.Attributes;
21-
import io.opentelemetry.api.common.AttributesBuilder;
2220
import java.io.DataInputStream;
2321
import java.io.IOException;
2422
import java.io.InputStream;
@@ -32,7 +30,6 @@
3230
import org.apache.hadoop.hbase.io.crypto.Decryptor;
3331
import org.apache.hadoop.hbase.io.crypto.Encryption;
3432
import org.apache.hadoop.hbase.io.hfile.HFileContext;
35-
import org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilder;
3633
import org.apache.hadoop.hbase.io.util.BlockIOUtils;
3734
import org.apache.hadoop.hbase.nio.ByteBuff;
3835
import org.apache.hadoop.hbase.util.Bytes;
@@ -48,14 +45,11 @@
4845
public class HFileBlockDefaultDecodingContext implements HFileBlockDecodingContext {
4946
private final Configuration conf;
5047
private final HFileContext fileContext;
51-
private final AttributesBuilder attributesBuilder;
5248
private TagCompressionContext tagCompressionContext;
5349

5450
public HFileBlockDefaultDecodingContext(Configuration conf, HFileContext fileContext) {
5551
this.conf = conf;
5652
this.fileContext = fileContext;
57-
this.attributesBuilder =
58-
new HFileContextAttributesBuilder(Attributes.builder(), fileContext).build();
5953
}
6054

6155
@Override
@@ -108,7 +102,7 @@ public void prepareDecoding(int onDiskSizeWithoutHeader, int uncompressedSizeWit
108102
try (InputStream is =
109103
compression.createDecompressionStream(dataInputStream, decompressor, 0)) {
110104
BlockIOUtils.readFullyWithHeapBuffer(is, blockBufferWithoutHeader,
111-
uncompressedSizeWithoutHeader, attributesBuilder);
105+
uncompressedSizeWithoutHeader);
112106
}
113107
} finally {
114108
if (decompressor != null) {
@@ -117,7 +111,7 @@ public void prepareDecoding(int onDiskSizeWithoutHeader, int uncompressedSizeWit
117111
}
118112
} else {
119113
BlockIOUtils.readFullyWithHeapBuffer(dataInputStream, blockBufferWithoutHeader,
120-
onDiskSizeWithoutHeader, attributesBuilder);
114+
onDiskSizeWithoutHeader);
121115
}
122116
} finally {
123117
byteBuffInputStream.close();

hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/trace/HFileContextAttributesBuilder.java renamed to hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/trace/HFileContextAttributesBuilderConsumer.java

Lines changed: 39 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,45 +22,69 @@
2222
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DATA_BLOCK_ENCODING_KEY;
2323
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.ENCRYPTION_CIPHER_KEY;
2424
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.HFILE_NAME_KEY;
25+
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.READ_TYPE_KEY;
2526

2627
import io.opentelemetry.api.common.AttributesBuilder;
28+
import io.opentelemetry.context.Context;
29+
import io.opentelemetry.context.ContextKey;
2730
import java.util.Objects;
28-
import java.util.function.Supplier;
31+
import java.util.function.Consumer;
2932
import org.apache.hadoop.hbase.io.hfile.HFileContext;
33+
import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.ReadType;
3034
import org.apache.hadoop.hbase.util.ChecksumType;
3135
import org.apache.yetus.audience.InterfaceAudience;
3236

3337
/**
34-
* Populate fields on an {@link AttributesBuilder} based on an {@link HFileContext}.
38+
* <p>
39+
* Populate fields on an {@link AttributesBuilder} based on an {@link HFileContext}. Passed around
40+
* inside an active {@link Context}, indexed under {@link #CONTEXT_KEY}. The class is designed such
41+
* that calls to the {@link #accept(AttributesBuilder)} method are idempotent with regards to the
42+
* instance of this class.
43+
* </p>
44+
* <p>
45+
* The true and truly ridiculous class name should be something more like
46+
* {@code HFileContext_ContextAttributes_AttributesBuilder_Consumer}.
47+
* </p>
3548
*/
3649
@InterfaceAudience.Private
37-
public class HFileContextAttributesBuilder implements Supplier<AttributesBuilder> {
50+
public class HFileContextAttributesBuilderConsumer implements Consumer<AttributesBuilder> {
51+
52+
/**
53+
* Used to extract attributes pertaining to the {@link HFileContext} that scopes the active
54+
* {@link Context}.
55+
*/
56+
public static final ContextKey<Consumer<AttributesBuilder>> CONTEXT_KEY =
57+
ContextKey.named("db.hbase.io.hfile.context_attributes");
3858

39-
private final AttributesBuilder builder;
4059
private final HFileContext hFileContext;
4160

4261
private boolean skipChecksum = false;
62+
private ReadType readType = null;
4363

44-
public HFileContextAttributesBuilder(final AttributesBuilder builder,
45-
final HFileContext hFileContext) {
46-
this.builder = Objects.requireNonNull(builder);
64+
public HFileContextAttributesBuilderConsumer(final HFileContext hFileContext) {
4765
this.hFileContext = Objects.requireNonNull(hFileContext);
4866
}
4967

5068
/**
5169
* Specify that the {@link ChecksumType} should not be included in the attributes.
5270
*/
53-
public HFileContextAttributesBuilder setSkipChecksum(final boolean skipChecksum) {
71+
public HFileContextAttributesBuilderConsumer setSkipChecksum(final boolean skipChecksum) {
5472
this.skipChecksum = skipChecksum;
5573
return this;
5674
}
5775

58-
@Override
59-
public AttributesBuilder get() {
60-
return build();
76+
/**
77+
* Specify the {@link ReadType} involved in this IO operation.
78+
*/
79+
public HFileContextAttributesBuilderConsumer setReadType(final ReadType readType) {
80+
// TODO: this is not a part of the HFileBlock, it's the context of the operation. Should track this
81+
// detail elsewhere.
82+
this.readType = readType;
83+
return this;
6184
}
6285

63-
public AttributesBuilder build() {
86+
@Override
87+
public void accept(AttributesBuilder builder) {
6488
if (hFileContext.getHFileName() != null) {
6589
builder.put(HFILE_NAME_KEY, hFileContext.getHFileName());
6690
}
@@ -79,7 +103,8 @@ public AttributesBuilder build() {
79103
if (!skipChecksum && hFileContext.getChecksumType() != null) {
80104
builder.put(CHECKSUM_KEY, hFileContext.getChecksumType().getName());
81105
}
82-
83-
return builder;
106+
if (readType != null) {
107+
builder.put(READ_TYPE_KEY, readType.name());
108+
}
84109
}
85110
}

hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java

Lines changed: 53 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,16 @@
2323
import io.opentelemetry.api.common.Attributes;
2424
import io.opentelemetry.api.common.AttributesBuilder;
2525
import io.opentelemetry.api.trace.Span;
26+
import io.opentelemetry.context.Context;
2627
import java.io.IOException;
2728
import java.io.InputStream;
2829
import java.lang.reflect.InvocationTargetException;
2930
import java.lang.reflect.Method;
3031
import java.nio.ByteBuffer;
32+
import java.util.Optional;
3133
import org.apache.hadoop.fs.ByteBufferReadable;
3234
import org.apache.hadoop.fs.FSDataInputStream;
35+
import org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer;
3336
import org.apache.hadoop.hbase.nio.ByteBuff;
3437
import org.apache.hadoop.io.IOUtils;
3538
import org.apache.yetus.audience.InterfaceAudience;
@@ -81,17 +84,16 @@ public static boolean isByteBufferReadable(FSDataInputStream is) {
8184
* @param length bytes to read.
8285
* @throws IOException exception to throw if any error happen
8386
*/
84-
public static void readFully(ByteBuff buf, FSDataInputStream dis, int length,
85-
AttributesBuilder attributesBuilder) throws IOException {
87+
public static void readFully(ByteBuff buf, FSDataInputStream dis, int length) throws IOException {
8688
final Span span = Span.current();
87-
final AttributesBuilder copy = Attributes.builder().putAll(attributesBuilder.build());
89+
final AttributesBuilder attributesBuilder = builderFromContext(Context.current());
8890
if (!isByteBufferReadable(dis)) {
8991
// If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to
9092
// the destination ByteBuff.
9193
byte[] heapBuf = new byte[length];
9294
IOUtils.readFully(dis, heapBuf, 0, length);
93-
annotateHeapBytesRead(copy, length);
94-
span.addEvent("BlockIOUtils.readFully", copy.build());
95+
annotateHeapBytesRead(attributesBuilder, length);
96+
span.addEvent("BlockIOUtils.readFully", attributesBuilder.build());
9597
copyToByteBuff(heapBuf, 0, length, buf);
9698
return;
9799
}
@@ -123,8 +125,8 @@ public static void readFully(ByteBuff buf, FSDataInputStream dis, int length,
123125
}
124126
}
125127
} finally {
126-
annotateBytesRead(copy, directBytesRead, heapBytesRead);
127-
span.addEvent("BlockIOUtils.readFully", copy.build());
128+
annotateBytesRead(attributesBuilder, directBytesRead, heapBytesRead);
129+
span.addEvent("BlockIOUtils.readFully", attributesBuilder.build());
128130
}
129131
}
130132

@@ -136,13 +138,11 @@ public static void readFully(ByteBuff buf, FSDataInputStream dis, int length,
136138
* @param length to read
137139
* @throws IOException if any io error encountered.
138140
*/
139-
public static void readFullyWithHeapBuffer(InputStream in, ByteBuff out, int length,
140-
AttributesBuilder attributesBuilder) throws IOException {
141+
public static void readFullyWithHeapBuffer(InputStream in, ByteBuff out, int length)
142+
throws IOException {
141143
if (length < 0) {
142144
throw new IllegalArgumentException("Length must not be negative: " + length);
143145
}
144-
final Span span = Span.current();
145-
final AttributesBuilder copy = Attributes.builder().putAll(attributesBuilder.build());
146146
int heapBytesRead = 0;
147147
int remain = length, count;
148148
byte[] buffer = new byte[1024];
@@ -158,8 +158,10 @@ public static void readFullyWithHeapBuffer(InputStream in, ByteBuff out, int len
158158
heapBytesRead += count;
159159
}
160160
} finally {
161-
annotateHeapBytesRead(copy, heapBytesRead);
162-
span.addEvent("BlockIOUtils.readFullyWithHeapBuffer", copy.build());
161+
final Span span = Span.current();
162+
final AttributesBuilder attributesBuilder = builderFromContext(Context.current());
163+
annotateHeapBytesRead(attributesBuilder, heapBytesRead);
164+
span.addEvent("BlockIOUtils.readFullyWithHeapBuffer", attributesBuilder.build());
163165
}
164166
}
165167

@@ -177,9 +179,7 @@ public static void readFullyWithHeapBuffer(InputStream in, ByteBuff out, int len
177179
* @throws IOException if failed to read the necessary bytes
178180
*/
179181
private static boolean readWithExtraOnHeap(InputStream in, byte[] buf, int bufOffset,
180-
int necessaryLen, int extraLen, AttributesBuilder attributesBuilder) throws IOException {
181-
final Span span = Span.current();
182-
final AttributesBuilder copy = Attributes.builder().putAll(attributesBuilder.build());
182+
int necessaryLen, int extraLen) throws IOException {
183183
int heapBytesRead = 0;
184184
int bytesRemaining = necessaryLen + extraLen;
185185
try {
@@ -199,8 +199,10 @@ private static boolean readWithExtraOnHeap(InputStream in, byte[] buf, int bufOf
199199
heapBytesRead += ret;
200200
}
201201
} finally {
202-
annotateHeapBytesRead(copy, heapBytesRead);
203-
span.addEvent("BlockIOUtils.readWithExtra", copy.build());
202+
final Span span = Span.current();
203+
final AttributesBuilder attributesBuilder = builderFromContext(Context.current());
204+
annotateHeapBytesRead(attributesBuilder, heapBytesRead);
205+
span.addEvent("BlockIOUtils.readWithExtra", attributesBuilder.build());
204206
}
205207
return bytesRemaining <= 0;
206208
}
@@ -217,17 +219,15 @@ private static boolean readWithExtraOnHeap(InputStream in, byte[] buf, int bufOf
217219
* @throws IOException if failed to read the necessary bytes.
218220
*/
219221
public static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int necessaryLen,
220-
int extraLen, AttributesBuilder attributesBuilder) throws IOException {
222+
int extraLen) throws IOException {
221223
if (!isByteBufferReadable(dis)) {
222224
// If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to
223225
// the destination ByteBuff.
224226
byte[] heapBuf = new byte[necessaryLen + extraLen];
225-
boolean ret = readWithExtraOnHeap(dis, heapBuf, 0, necessaryLen, extraLen, attributesBuilder);
227+
boolean ret = readWithExtraOnHeap(dis, heapBuf, 0, necessaryLen, extraLen);
226228
copyToByteBuff(heapBuf, 0, heapBuf.length, buf);
227229
return ret;
228230
}
229-
final Span span = Span.current();
230-
final AttributesBuilder copy = Attributes.builder().putAll(attributesBuilder.build());
231231
int directBytesRead = 0, heapBytesRead = 0;
232232
ByteBuffer[] buffers = buf.nioByteBuffers();
233233
int bytesRead = 0;
@@ -259,8 +259,10 @@ public static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int nec
259259
}
260260
}
261261
} finally {
262-
annotateBytesRead(copy, directBytesRead, heapBytesRead);
263-
span.addEvent("BlockIOUtils.readWithExtra", copy.build());
262+
final Span span = Span.current();
263+
final AttributesBuilder attributesBuilder = builderFromContext(Context.current());
264+
annotateBytesRead(attributesBuilder, directBytesRead, heapBytesRead);
265+
span.addEvent("BlockIOUtils.readWithExtra", attributesBuilder.build());
264266
}
265267
return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
266268
}
@@ -281,8 +283,8 @@ public static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int nec
281283
* @throws IOException if failed to read the necessary bytes
282284
*/
283285
public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
284-
int necessaryLen, int extraLen, AttributesBuilder attributesBuilder) throws IOException {
285-
return preadWithExtra(buff, dis, position, necessaryLen, extraLen, false, attributesBuilder);
286+
int necessaryLen, int extraLen) throws IOException {
287+
return preadWithExtra(buff, dis, position, necessaryLen, extraLen, false);
286288
}
287289

288290
/**
@@ -302,24 +304,18 @@ public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long
302304
* @throws IOException if failed to read the necessary bytes
303305
*/
304306
public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
305-
int necessaryLen, int extraLen, boolean readAllBytes, AttributesBuilder attributesBuilder)
306-
throws IOException {
307+
int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
307308
boolean preadbytebuffer = dis.hasCapability("in:preadbytebuffer");
308309

309310
if (preadbytebuffer) {
310-
return preadWithExtraDirectly(buff, dis, position, necessaryLen, extraLen, readAllBytes,
311-
attributesBuilder);
311+
return preadWithExtraDirectly(buff, dis, position, necessaryLen, extraLen, readAllBytes);
312312
} else {
313-
return preadWithExtraOnHeap(buff, dis, position, necessaryLen, extraLen, readAllBytes,
314-
attributesBuilder);
313+
return preadWithExtraOnHeap(buff, dis, position, necessaryLen, extraLen, readAllBytes);
315314
}
316315
}
317316

318317
private static boolean preadWithExtraOnHeap(ByteBuff buff, FSDataInputStream dis, long position,
319-
int necessaryLen, int extraLen, boolean readAllBytes, AttributesBuilder attributesBuilder)
320-
throws IOException {
321-
final Span span = Span.current();
322-
final AttributesBuilder copy = Attributes.builder().putAll(attributesBuilder.build());
318+
int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
323319
int remain = necessaryLen + extraLen;
324320
byte[] buf = new byte[remain];
325321
int bytesRead = 0;
@@ -336,18 +332,17 @@ private static boolean preadWithExtraOnHeap(ByteBuff buff, FSDataInputStream dis
336332
remain -= ret;
337333
}
338334
} finally {
339-
annotateHeapBytesRead(copy, bytesRead);
340-
span.addEvent("BlockIOUtils.preadWithExtra", copy.build());
335+
final Span span = Span.current();
336+
final AttributesBuilder attributesBuilder = builderFromContext(Context.current());
337+
annotateHeapBytesRead(attributesBuilder, bytesRead);
338+
span.addEvent("BlockIOUtils.preadWithExtra", attributesBuilder.build());
341339
}
342340
copyToByteBuff(buf, 0, bytesRead, buff);
343341
return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
344342
}
345343

346344
private static boolean preadWithExtraDirectly(ByteBuff buff, FSDataInputStream dis, long position,
347-
int necessaryLen, int extraLen, boolean readAllBytes, AttributesBuilder attributesBuilder)
348-
throws IOException {
349-
final Span span = Span.current();
350-
final AttributesBuilder copy = Attributes.builder().putAll(attributesBuilder.build());
345+
int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
351346
int directBytesRead = 0, heapBytesRead = 0;
352347
int remain = necessaryLen + extraLen, bytesRead = 0, idx = 0;
353348
ByteBuffer[] buffers = buff.nioByteBuffers();
@@ -391,8 +386,10 @@ private static boolean preadWithExtraDirectly(ByteBuff buff, FSDataInputStream d
391386
}
392387
}
393388
} finally {
394-
annotateBytesRead(copy, directBytesRead, heapBytesRead);
395-
span.addEvent("BlockIOUtils.preadWithExtra", copy.build());
389+
final Span span = Span.current();
390+
final AttributesBuilder attributesBuilder = builderFromContext(Context.current());
391+
annotateBytesRead(attributesBuilder, directBytesRead, heapBytesRead);
392+
span.addEvent("BlockIOUtils.preadWithExtra", attributesBuilder.build());
396393
}
397394

398395
return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
@@ -421,6 +418,18 @@ private static int copyToByteBuff(byte[] buf, int offset, int len, ByteBuff out)
421418
return len;
422419
}
423420

421+
/**
422+
* Construct a fresh {@link AttributesBuilder} from the provided {@link Context}, populated with
423+
* relevant attributes populated by {@link HFileContextAttributesBuilderConsumer#CONTEXT_KEY}.
424+
*/
425+
private static AttributesBuilder builderFromContext(Context context) {
426+
final AttributesBuilder attributesBuilder = Attributes.builder();
427+
Optional.ofNullable(context)
428+
.map(val -> val.get(HFileContextAttributesBuilderConsumer.CONTEXT_KEY))
429+
.ifPresent(c -> c.accept(attributesBuilder));
430+
return attributesBuilder;
431+
}
432+
424433
/**
425434
* Conditionally annotate {@code span} with the appropriate attribute when value is non-zero.
426435
*/

hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public final class HBaseSemanticAttributes {
112112
* filesystem data.
113113
*/
114114
public enum ReadType {
115-
// TODO: promote this to the FSReader#readBlockData API.
115+
// TODO: promote this to the FSReader#readBlockData API. Or somehow instead use Scan.ReadType.
116116
POSITIONAL_READ,
117117
SEEK_PLUS_READ,
118118
}

0 commit comments

Comments
 (0)