@@ -216,10 +216,22 @@ protected static class RunResult implements Comparable<RunResult> {
216216 public RunResult (long duration , Histogram hist ) {
217217 this .duration = duration ;
218218 this .hist = hist ;
219+ numbOfReplyOverThreshold = 0 ;
220+ numOfReplyFromReplica = 0 ;
221+ }
222+
223+ public RunResult (long duration , long numbOfReplyOverThreshold , long numOfReplyFromReplica ,
224+ Histogram hist ) {
225+ this .duration = duration ;
226+ this .hist = hist ;
227+ this .numbOfReplyOverThreshold = numbOfReplyOverThreshold ;
228+ this .numOfReplyFromReplica = numOfReplyFromReplica ;
219229 }
220230
221231 public final long duration ;
222232 public final Histogram hist ;
233+ public final long numbOfReplyOverThreshold ;
234+ public final long numOfReplyFromReplica ;
223235
224236 @ Override
225237 public String toString () {
@@ -482,6 +494,10 @@ public void setStatus(final String msg) throws IOException {
482494 });
483495 LOG .info ("Finished " + Thread .currentThread ().getName () + " in " + run .duration +
484496 "ms over " + threadOpts .perClientRunRows + " rows" );
497+ if (opts .latencyThreshold > 0 ) {
498+ LOG .info ("Number of replies over latency threshold " + opts .latencyThreshold +
499+ "(ms) is " + run .numbOfReplyOverThreshold );
500+ }
485501 return run ;
486502 }
487503 });
@@ -502,10 +518,12 @@ public void setStatus(final String msg) throws IOException {
502518 long total = 0 ;
503519 float avgLatency = 0 ;
504520 float avgTPS = 0 ;
521+ long replicaWins = 0 ;
505522 for (RunResult result : results ) {
506523 total += result .duration ;
507524 avgLatency += result .hist .getSnapshot ().getMean ();
508525 avgTPS += opts .perClientRunRows * 1.0f / result .duration ;
526+ replicaWins += result .numOfReplyFromReplica ;
509527 }
510528 avgTPS *= 1000 ; // ms to second
511529 avgLatency = avgLatency / results .length ;
@@ -515,12 +533,15 @@ public void setStatus(final String msg) throws IOException {
515533 + "\t Avg: " + (total / results .length ) + "ms" );
516534 LOG .info ("[ Avg latency (us)]\t " + Math .round (avgLatency ));
517535 LOG .info ("[ Avg TPS/QPS]\t " + Math .round (avgTPS ) + "\t row per second" );
536+ if (opts .replicas > 1 ) {
537+ LOG .info ("[results from replica regions] " + replicaWins );
538+ }
539+
518540 for (int i = 0 ; i < opts .connCount ; i ++) {
519541 cons [i ].close ();
520542 asyncCons [i ].close ();
521543 }
522544
523-
524545 return results ;
525546 }
526547
@@ -696,6 +717,7 @@ static class TestOptions {
696717 int columns = 1 ;
697718 int families = 1 ;
698719 int caching = 30 ;
720+ int latencyThreshold = 0 ; // in millsecond
699721 boolean addColumns = true ;
700722 MemoryCompactionPolicy inMemoryCompaction =
701723 MemoryCompactionPolicy .valueOf (
@@ -731,6 +753,7 @@ public TestOptions(TestOptions that) {
731753 this .useTags = that .useTags ;
732754 this .noOfTags = that .noOfTags ;
733755 this .reportLatency = that .reportLatency ;
756+ this .latencyThreshold = that .latencyThreshold ;
734757 this .multiGet = that .multiGet ;
735758 this .multiPut = that .multiPut ;
736759 this .inMemoryCF = that .inMemoryCF ;
@@ -1120,6 +1143,7 @@ private static long nextRandomSeed() {
11201143
11211144 private String testName ;
11221145 private Histogram latencyHistogram ;
1146+ private Histogram replicaLatencyHistogram ;
11231147 private Histogram valueSizeHistogram ;
11241148 private Histogram rpcCallsHistogram ;
11251149 private Histogram remoteRpcCallsHistogram ;
@@ -1128,6 +1152,8 @@ private static long nextRandomSeed() {
11281152 private Histogram bytesInResultsHistogram ;
11291153 private Histogram bytesInRemoteResultsHistogram ;
11301154 private RandomDistribution .Zipf zipf ;
1155+ private long numOfReplyOverLatencyThreshold = 0 ;
1156+ private long numOfReplyFromReplica = 0 ;
11311157
11321158 /**
11331159 * Note that all subclasses of this class must provide a public constructor
@@ -1165,13 +1191,28 @@ int getValueLength(final Random r) {
11651191 }
11661192
11671193 void updateValueSize (final Result [] rs ) throws IOException {
1168- if (rs == null || !isRandomValueSize ()) return ;
1169- for (Result r : rs ) updateValueSize (r );
1194+ updateValueSize (rs , 0 );
1195+ }
1196+
1197+ void updateValueSize (final Result [] rs , final long latency ) throws IOException {
1198+ if (rs == null || (latency == 0 )) return ;
1199+ for (Result r : rs ) updateValueSize (r , latency );
11701200 }
11711201
11721202 void updateValueSize (final Result r ) throws IOException {
1173- if (r == null || !isRandomValueSize ()) return ;
1203+ updateValueSize (r , 0 );
1204+ }
1205+
1206+ void updateValueSize (final Result r , final long latency ) throws IOException {
1207+ if (r == null || (latency == 0 )) return ;
11741208 int size = 0 ;
1209+ // update replicaHistogram
1210+ if (r .isStale ()) {
1211+ replicaLatencyHistogram .update (latency / 1000 );
1212+ numOfReplyFromReplica ++;
1213+ }
1214+ if (!isRandomValueSize ()) return ;
1215+
11751216 for (CellScanner scanner = r .cellScanner (); scanner .advance ();) {
11761217 size += scanner .current ().getValueLength ();
11771218 }
@@ -1235,6 +1276,10 @@ public Histogram getLatencyHistogram() {
12351276 void testSetup () throws IOException {
12361277 // test metrics
12371278 latencyHistogram = YammerHistogramUtils .newHistogram (new UniformReservoir (1024 * 500 ));
1279+ // If it is a replica test, set up histogram for replica.
1280+ if (opts .replicas > 1 ) {
1281+ replicaLatencyHistogram = YammerHistogramUtils .newHistogram (new UniformReservoir (1024 * 500 ));
1282+ }
12381283 valueSizeHistogram = YammerHistogramUtils .newHistogram (new UniformReservoir (1024 * 500 ));
12391284 // scan metrics
12401285 rpcCallsHistogram = YammerHistogramUtils .newHistogram (new UniformReservoir (1024 * 500 ));
@@ -1258,6 +1303,10 @@ void testTakedown() throws IOException {
12581303 status .setStatus ("Test : " + testName + ", Thread : " + Thread .currentThread ().getName ());
12591304 status .setStatus ("Latency (us) : " + YammerHistogramUtils .getHistogramReport (
12601305 latencyHistogram ));
1306+ if (opts .replicas > 1 ) {
1307+ status .setStatus ("Latency (us) from Replica Regions: " +
1308+ YammerHistogramUtils .getHistogramReport (replicaLatencyHistogram ));
1309+ }
12611310 status .setStatus ("Num measures (latency) : " + latencyHistogram .getCount ());
12621311 status .setStatus (YammerHistogramUtils .getPrettyHistogramReport (latencyHistogram ));
12631312 if (valueSizeHistogram .getCount () > 0 ) {
@@ -1339,15 +1388,19 @@ void testTimed() throws IOException, InterruptedException {
13391388 long startTime = System .nanoTime ();
13401389 boolean requestSent = false ;
13411390 try (TraceScope scope = TraceUtil .createTrace ("test row" );){
1342- requestSent = testRow (i );
1391+ requestSent = testRow (i , startTime );
13431392 }
13441393 if ( (i - startRow ) > opts .measureAfter ) {
13451394 // If multiget or multiput is enabled, say set to 10, testRow() returns immediately
13461395 // first 9 times and sends the actual get request in the 10th iteration.
13471396 // We should only set latency when actual request is sent because otherwise
13481397 // it turns out to be 0.
13491398 if (requestSent ) {
1350- latencyHistogram .update ((System .nanoTime () - startTime ) / 1000 );
1399+ long latency = (System .nanoTime () - startTime ) / 1000 ;
1400+ latencyHistogram .update (latency );
1401+ if ((opts .latencyThreshold > 0 ) && (latency / 1000 >= opts .latencyThreshold )) {
1402+ numOfReplyOverLatencyThreshold ++;
1403+ }
13511404 }
13521405 if (status != null && i > 0 && (i % getReportingPeriod ()) == 0 ) {
13531406 status .setStatus (generateStatus (startRow , i , lastRow ));
@@ -1379,7 +1432,7 @@ public String getShortValueSizeReport() {
13791432 * False if not, multiGet and multiPut e.g., the rows are sent
13801433 * to server only if enough gets/puts are gathered.
13811434 */
1382- abstract boolean testRow (final int i ) throws IOException , InterruptedException ;
1435+ abstract boolean testRow (final int i , final long startTime ) throws IOException , InterruptedException ;
13831436 }
13841437
13851438 static abstract class Test extends TestBase {
@@ -1450,7 +1503,7 @@ static class AsyncRandomReadTest extends AsyncTableTest {
14501503 }
14511504
14521505 @ Override
1453- boolean testRow (final int i ) throws IOException , InterruptedException {
1506+ boolean testRow (final int i , final long startTime ) throws IOException , InterruptedException {
14541507 if (opts .randomSleep > 0 ) {
14551508 Thread .sleep (rd .nextInt (opts .randomSleep ));
14561509 }
@@ -1559,7 +1612,7 @@ void testTakedown() throws IOException {
15591612 }
15601613
15611614 @ Override
1562- boolean testRow (final int i ) throws IOException {
1615+ boolean testRow (final int i , final long startTime ) throws IOException {
15631616 if (this .testScanner == null ) {
15641617 Scan scan =
15651618 new Scan ().withStartRow (format (opts .startRow )).setCaching (opts .caching )
@@ -1593,7 +1646,7 @@ static class AsyncSequentialReadTest extends AsyncTableTest {
15931646 }
15941647
15951648 @ Override
1596- boolean testRow (final int i ) throws IOException , InterruptedException {
1649+ boolean testRow (final int i , final long startTime ) throws IOException , InterruptedException {
15971650 Get get = new Get (format (i ));
15981651 for (int family = 0 ; family < opts .families ; family ++) {
15991652 byte [] familyName = Bytes .toBytes (FAMILY_NAME_BASE + family );
@@ -1635,7 +1688,7 @@ protected byte[] generateRow(final int i) {
16351688
16361689 @ Override
16371690 @ SuppressWarnings ("ReturnValueIgnored" )
1638- boolean testRow (final int i ) throws IOException , InterruptedException {
1691+ boolean testRow (final int i , final long startTime ) throws IOException , InterruptedException {
16391692 byte [] row = generateRow (i );
16401693 Put put = new Put (row );
16411694 for (int family = 0 ; family < opts .families ; family ++) {
@@ -1710,7 +1763,7 @@ static class RandomSeekScanTest extends TableTest {
17101763 }
17111764
17121765 @ Override
1713- boolean testRow (final int i ) throws IOException {
1766+ boolean testRow (final int i , final long startTime ) throws IOException {
17141767 Scan scan = new Scan ().withStartRow (getRandomRow (this .rand , opts .totalRows ))
17151768 .setCaching (opts .caching ).setCacheBlocks (opts .cacheBlocks )
17161769 .setAsyncPrefetch (opts .asyncPrefetch ).setReadType (opts .scanReadType )
@@ -1758,7 +1811,7 @@ static abstract class RandomScanWithRangeTest extends TableTest {
17581811 }
17591812
17601813 @ Override
1761- boolean testRow (final int i ) throws IOException {
1814+ boolean testRow (final int i , final long startTime ) throws IOException {
17621815 Pair <byte [], byte []> startAndStopRow = getStartAndStopRow ();
17631816 Scan scan = new Scan ().withStartRow (startAndStopRow .getFirst ())
17641817 .withStopRow (startAndStopRow .getSecond ()).setCaching (opts .caching )
@@ -1861,6 +1914,7 @@ static class RandomReadTest extends TableTest {
18611914 private final Consistency consistency ;
18621915 private ArrayList <Get > gets ;
18631916 private Random rd = new Random ();
1917+ private long numOfReplyFromReplica = 0 ;
18641918
18651919 RandomReadTest (Connection con , TestOptions options , Status status ) {
18661920 super (con , options , status );
@@ -1872,7 +1926,7 @@ static class RandomReadTest extends TableTest {
18721926 }
18731927
18741928 @ Override
1875- boolean testRow (final int i ) throws IOException , InterruptedException {
1929+ boolean testRow (final int i , final long startTime ) throws IOException , InterruptedException {
18761930 if (opts .randomSleep > 0 ) {
18771931 Thread .sleep (rd .nextInt (opts .randomSleep ));
18781932 }
@@ -1897,13 +1951,24 @@ boolean testRow(final int i) throws IOException, InterruptedException {
18971951 this .gets .add (get );
18981952 if (this .gets .size () == opts .multiGet ) {
18991953 Result [] rs = this .table .get (this .gets );
1900- updateValueSize (rs );
1954+ if (opts .replicas > 1 ) {
1955+ long latency = System .nanoTime () - startTime ;
1956+ updateValueSize (rs , latency );
1957+ } else {
1958+ updateValueSize (rs );
1959+ }
19011960 this .gets .clear ();
19021961 } else {
19031962 return false ;
19041963 }
19051964 } else {
1906- updateValueSize (this .table .get (get ));
1965+ if (opts .replicas > 1 ) {
1966+ Result r = this .table .get (get );
1967+ long latency = System .nanoTime () - startTime ;
1968+ updateValueSize (r , latency );
1969+ } else {
1970+ updateValueSize (this .table .get (get ));
1971+ }
19071972 }
19081973 return true ;
19091974 }
@@ -1954,7 +2019,7 @@ void testTakedown() throws IOException {
19542019
19552020
19562021 @ Override
1957- boolean testRow (final int i ) throws IOException {
2022+ boolean testRow (final int i , final long startTime ) throws IOException {
19582023 if (this .testScanner == null ) {
19592024 Scan scan = new Scan ().withStartRow (format (opts .startRow )).setCaching (opts .caching )
19602025 .setCacheBlocks (opts .cacheBlocks ).setAsyncPrefetch (opts .asyncPrefetch )
@@ -2017,7 +2082,7 @@ static class IncrementTest extends CASTableTest {
20172082 }
20182083
20192084 @ Override
2020- boolean testRow (final int i ) throws IOException {
2085+ boolean testRow (final int i , final long startTime ) throws IOException {
20212086 Increment increment = new Increment (format (i ));
20222087 // unlike checkAndXXX tests, which make most sense to do on a single value,
20232088 // if multiple families are specified for an increment test we assume it is
@@ -2037,7 +2102,7 @@ static class AppendTest extends CASTableTest {
20372102 }
20382103
20392104 @ Override
2040- boolean testRow (final int i ) throws IOException {
2105+ boolean testRow (final int i , final long startTime ) throws IOException {
20412106 byte [] bytes = format (i );
20422107 Append append = new Append (bytes );
20432108 // unlike checkAndXXX tests, which make most sense to do on a single value,
@@ -2058,7 +2123,7 @@ static class CheckAndMutateTest extends CASTableTest {
20582123 }
20592124
20602125 @ Override
2061- boolean testRow (final int i ) throws IOException {
2126+ boolean testRow (final int i , final long startTime ) throws IOException {
20622127 final byte [] bytes = format (i );
20632128 // checkAndXXX tests operate on only a single value
20642129 // Put a known value so when we go to check it, it is there.
@@ -2079,7 +2144,7 @@ static class CheckAndPutTest extends CASTableTest {
20792144 }
20802145
20812146 @ Override
2082- boolean testRow (final int i ) throws IOException {
2147+ boolean testRow (final int i , final long startTime ) throws IOException {
20832148 final byte [] bytes = format (i );
20842149 // checkAndXXX tests operate on only a single value
20852150 // Put a known value so when we go to check it, it is there.
@@ -2098,7 +2163,7 @@ static class CheckAndDeleteTest extends CASTableTest {
20982163 }
20992164
21002165 @ Override
2101- boolean testRow (final int i ) throws IOException {
2166+ boolean testRow (final int i , final long startTime ) throws IOException {
21022167 final byte [] bytes = format (i );
21032168 // checkAndXXX tests operate on only a single value
21042169 // Put a known value so when we go to check it, it is there.
@@ -2119,7 +2184,7 @@ static class SequentialReadTest extends TableTest {
21192184 }
21202185
21212186 @ Override
2122- boolean testRow (final int i ) throws IOException {
2187+ boolean testRow (final int i , final long startTime ) throws IOException {
21232188 Get get = new Get (format (i ));
21242189 for (int family = 0 ; family < opts .families ; family ++) {
21252190 byte [] familyName = Bytes .toBytes (FAMILY_NAME_BASE + family );
@@ -2157,7 +2222,7 @@ protected byte[] generateRow(final int i) {
21572222 }
21582223
21592224 @ Override
2160- boolean testRow (final int i ) throws IOException {
2225+ boolean testRow (final int i , final long startTime ) throws IOException {
21612226 byte [] row = generateRow (i );
21622227 Put put = new Put (row );
21632228 for (int family = 0 ; family < opts .families ; family ++) {
@@ -2214,7 +2279,7 @@ static class FilteredScanTest extends TableTest {
22142279 }
22152280
22162281 @ Override
2217- boolean testRow (int i ) throws IOException {
2282+ boolean testRow (int i , final long startTime ) throws IOException {
22182283 byte [] value = generateData (this .rand , getValueLength (this .rand ));
22192284 Scan scan = constructScan (value );
22202285 ResultScanner scanner = null ;
@@ -2358,7 +2423,8 @@ static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration
23582423 " (" + calculateMbps ((int )(opts .perClientRunRows * opts .sampleRate ), totalElapsedTime ,
23592424 getAverageValueLength (opts ), opts .families , opts .columns ) + ")" );
23602425
2361- return new RunResult (totalElapsedTime , t .getLatencyHistogram ());
2426+ return new RunResult (totalElapsedTime , t .numOfReplyOverLatencyThreshold ,
2427+ t .numOfReplyFromReplica , t .getLatencyHistogram ());
23622428 }
23632429
23642430 private static int getAverageValueLength (final TestOptions opts ) {
@@ -2424,6 +2490,8 @@ protected static void printUsage(final String shortName, final String message) {
24242490 System .err .println (" traceRate Enable HTrace spans. Initiate tracing every N rows. " +
24252491 "Default: 0" );
24262492 System .err .println (" latency Set to report operation latencies. Default: False" );
2493+ System .err .println (" latencyThreshold Set to report number of operations with latency " +
2494+ "over lantencyThreshold, unit in millisecond, default 0" );
24272495 System .err .println (" measureAfter Start to measure the latency once 'measureAfter'" +
24282496 " rows have been treated. Default: 0" );
24292497 System .err .println (" valueSize Pass value size to use: Default: "
@@ -2621,6 +2689,12 @@ static TestOptions parseOpts(Queue<String> args) {
26212689 continue ;
26222690 }
26232691
2692+ final String latencyThreshold = "--latencyThreshold=" ;
2693+ if (cmd .startsWith (latencyThreshold )) {
2694+ opts .latencyThreshold = Integer .parseInt (cmd .substring (latencyThreshold .length ()));
2695+ continue ;
2696+ }
2697+
26242698 final String latency = "--latency" ;
26252699 if (cmd .startsWith (latency )) {
26262700 opts .reportLatency = true ;
0 commit comments