Skip to content

Commit 4fa20d1

Browse files
committed
Pause loading if too many operations are in flight
1 parent 43a0280 commit 4fa20d1

1 file changed

Lines changed: 34 additions & 21 deletions

File tree

hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadCommonCrawl.java

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -152,24 +152,27 @@ public class IntegrationTestLoadCommonCrawl extends IntegrationTestBase {
152152

153153
private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestLoadCommonCrawl.class);
154154

155-
protected static String TABLE_NAME_KEY = "IntegrationTestLoadCommonCrawl.table";
156-
protected static String DEFAULT_TABLE_NAME = "IntegrationTestLoadCommonCrawl";
157-
158-
protected static String INCREMENTS_NAME_KEY = "IntegrationTestLoadCommonCrawl.increments";
159-
protected static boolean DEFAULT_INCREMENTS = false;
160-
161-
protected static byte[] CONTENT_FAMILY_NAME = Bytes.toBytes("c");
162-
protected static byte[] INFO_FAMILY_NAME = Bytes.toBytes("i");
163-
protected static byte[] URL_FAMILY_NAME = Bytes.toBytes("u");
164-
protected static byte[] SEP = Bytes.toBytes(":");
165-
protected static byte[] CONTENT_QUALIFIER = HConstants.EMPTY_BYTE_ARRAY;
166-
protected static byte[] CONTENT_LENGTH_QUALIFIER = Bytes.toBytes("l");
167-
protected static byte[] CONTENT_TYPE_QUALIFIER = Bytes.toBytes("t");
168-
protected static byte[] CRC_QUALIFIER = Bytes.toBytes("c");
169-
protected static byte[] DATE_QUALIFIER = Bytes.toBytes("d");
170-
protected static byte[] IP_ADDRESS_QUALIFIER = Bytes.toBytes("a");
171-
protected static byte[] TARGET_URI_QUALIFIER = Bytes.toBytes("u");
172-
protected static byte[] REF_QUALIFIER = Bytes.toBytes("ref");
155+
static final String TABLE_NAME_KEY = "IntegrationTestLoadCommonCrawl.table";
156+
static final String DEFAULT_TABLE_NAME = "IntegrationTestLoadCommonCrawl";
157+
158+
static final String INCREMENTS_NAME_KEY = "IntegrationTestLoadCommonCrawl.increments";
159+
static final boolean DEFAULT_INCREMENTS = false;
160+
161+
static final int MAX_INFLIGHT = 1000;
162+
static final int INFLIGHT_PAUSE_MS = 100;
163+
164+
static final byte[] CONTENT_FAMILY_NAME = Bytes.toBytes("c");
165+
static final byte[] INFO_FAMILY_NAME = Bytes.toBytes("i");
166+
static final byte[] URL_FAMILY_NAME = Bytes.toBytes("u");
167+
static final byte[] SEP = Bytes.toBytes(":");
168+
static final byte[] CONTENT_QUALIFIER = HConstants.EMPTY_BYTE_ARRAY;
169+
static final byte[] CONTENT_LENGTH_QUALIFIER = Bytes.toBytes("l");
170+
static final byte[] CONTENT_TYPE_QUALIFIER = Bytes.toBytes("t");
171+
static final byte[] CRC_QUALIFIER = Bytes.toBytes("c");
172+
static final byte[] DATE_QUALIFIER = Bytes.toBytes("d");
173+
static final byte[] IP_ADDRESS_QUALIFIER = Bytes.toBytes("a");
174+
static final byte[] TARGET_URI_QUALIFIER = Bytes.toBytes("u");
175+
static final byte[] REF_QUALIFIER = Bytes.toBytes("ref");
173176

174177
public static enum Counts {
175178
REFERENCED,
@@ -612,7 +615,7 @@ protected void cleanup(final Context context) throws IOException, InterruptedExc
612615

613616
while (inflight.get() != 0) {
614617
LOG.info("Operations in flight, waiting");
615-
Thread.sleep(1000);
618+
Thread.sleep(INFLIGHT_PAUSE_MS);
616619
}
617620

618621
// Shut down the executor
@@ -673,7 +676,12 @@ protected void map(final LongWritable key, final WARCWritable value, final Conte
673676
if (ipAddr != null) {
674677
put.addColumn(INFO_FAMILY_NAME, IP_ADDRESS_QUALIFIER, ts, Bytes.toBytes(ipAddr));
675678
}
676-
inflight.incrementAndGet();
679+
long pending = inflight.incrementAndGet();
680+
while (pending > MAX_INFLIGHT) {
681+
LOG.info("Too many operations in flight, waiting");
682+
Thread.sleep(INFLIGHT_PAUSE_MS);
683+
pending = inflight.get();
684+
}
677685
final long putStartTime = System.currentTimeMillis();
678686
final CompletableFuture<Void> putFuture = table.put(put);
679687
putFuture.thenRun(() -> {
@@ -716,7 +724,12 @@ protected void map(final LongWritable key, final WARCWritable value, final Conte
716724
final Increment increment = new Increment(urlRowKey);
717725
increment.setTimestamp(ts);
718726
increment.addColumn(URL_FAMILY_NAME, refQual, 1);
719-
inflight.incrementAndGet();
727+
pending = inflight.incrementAndGet();
728+
while (pending > MAX_INFLIGHT) {
729+
LOG.info("Too many operations in flight, waiting");
730+
Thread.sleep(INFLIGHT_PAUSE_MS);
731+
pending = inflight.get();
732+
}
720733
final long incrStartTime = System.currentTimeMillis();
721734
final CompletableFuture<Result> incrFuture = table.increment(increment);
722735
incrFuture.thenRun(() -> {

0 commit comments

Comments
 (0)