From a80660dab6bc57d44008e706ac9504e848a6a65e Mon Sep 17 00:00:00 2001 From: Fernando Moraes Date: Wed, 16 Jun 2021 22:53:44 -0300 Subject: [PATCH 1/2] Adding cluster support --- .eslintrc.json | 1 + jest.config.js | 4 +- src/cacheable.ts | 36 ++- src/tests/cacheable.spec.ts | 518 ++++++++++++++++++++++-------------- 4 files changed, 342 insertions(+), 217 deletions(-) diff --git a/.eslintrc.json b/.eslintrc.json index c1690fe..06ea014 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -18,6 +18,7 @@ "prettier/@typescript-eslint" ], "rules": { + "no-shadow": "off", "import/prefer-default-export": "off", "class-methods-use-this": "off", "import-helpers/order-imports": [ diff --git a/jest.config.js b/jest.config.js index c1c9641..e4a77de 100644 --- a/jest.config.js +++ b/jest.config.js @@ -68,7 +68,7 @@ module.exports = { // moduleNameMapper: {}, // An array of regexp pattern strings, matched against all module paths before considered 'visible' to the module loader - // modulePathIgnorePatterns: [], + modulePathIgnorePatterns: ['dist'], // Activates notifications for test results // notify: false, @@ -175,5 +175,5 @@ module.exports = { // Whether to use watchman for file crawling // watchman: true, - testTimeout: 30000, + testTimeout: 60000, }; diff --git a/src/cacheable.ts b/src/cacheable.ts index 3fcddd4..6c8f6ef 100644 --- a/src/cacheable.ts +++ b/src/cacheable.ts @@ -1,7 +1,9 @@ -import { Redis } from 'ioredis'; +import { Cluster, Redis } from 'ioredis'; import winston, { Logger } from 'winston'; -type RedisFactory = () => Redis; +type RedisAbstract = Redis | Cluster; + +type RedisFactory = () => RedisAbstract; type ConstructorOptions = { logger?: Logger; @@ -16,7 +18,7 @@ type CallOptions = { type Result = string | unknown; export class PromiseCacheable { - private redis: Redis; + private redis: RedisAbstract; private logger: Logger; @@ -39,14 +41,26 @@ export class PromiseCacheable { const safeSuccessCallback = async (result: Result) => { const resultAsString = typeof result === 'string' ? result : JSON.stringify(result); + + // console.log('processou'); try { + await Promise.all([ + this.redis.set( + key, + resultAsString, + 'PX', + options.cacheTimeout + ), + this.redis.publish(notifyKey, resultAsString), + this.redis.del(lockKey), + ]); // atomic remove lock and setting cache - await this.redis - .multi() - .set(key, resultAsString, 'PX', options.cacheTimeout) - .publish(notifyKey, resultAsString) - .del(lockKey) - .exec(); + // await this.redis + // .multi() + // .set(key, resultAsString, 'PX', options.cacheTimeout) + // .publish(notifyKey, resultAsString) + // .del(lockKey) + // .exec(); } catch (e) { this.logger.error(`problems on success callback operations`, e); } @@ -83,7 +97,7 @@ export class PromiseCacheable { this.safeDisconnectRedis(this.redis); } - private safeDisconnectRedis(redis: Redis) { + private async safeDisconnectRedis(redis: RedisAbstract) { try { redis.disconnect(); } catch (e) { @@ -110,7 +124,7 @@ export class PromiseCacheable { } private safeSubscribe( - subscriberConnection: Redis, + subscriberConnection: RedisAbstract, key: string, notifyKey: string, timeout: number diff --git a/src/tests/cacheable.spec.ts b/src/tests/cacheable.spec.ts index 10c7cd1..d743abf 100644 --- a/src/tests/cacheable.spec.ts +++ b/src/tests/cacheable.spec.ts @@ -1,17 +1,103 @@ -import Redis from 'ioredis'; +import Redis, { Cluster } from 'ioredis'; import allSettled from 'promise.allsettled'; -import { GenericContainer, StartedTestContainer } from 'testcontainers'; +import { + GenericContainer, + Network, + StartedNetwork, + StartedTestContainer, + Wait, +} from 'testcontainers'; import { v4 as uuidv4 } from 'uuid'; import { PromiseCacheable } from '..'; -let container: StartedTestContainer; +class Docker { + private cluster: StartedTestContainer | undefined; -const defaultRedisFactory = () => - new Redis({ - host: container.getHost(), - port: container.getMappedPort(6379), - }); + private standalone: StartedTestContainer | undefined; + + private network: StartedNetwork | undefined; + + private clusterPorts = [7000, 7001, 7002]; + + async start() { + this.network = await new Network().start(); + + [this.cluster, this.standalone] = await Promise.all([ + new GenericContainer('grokzen/redis-cluster:latest') + .withExposedPorts(...this.clusterPorts) + .withNetworkMode(this.network.getName()) + .withEnv('MASTERS', '3') + .withEnv('SLAVES_PER_MASTER', '0') + .withWaitStrategy( + Wait.forLogMessage('Ready to accept connections') + ) + .start(), + new GenericContainer('redis:6').withExposedPorts(6379).start(), + ]); + } + + async stop() { + await this.cluster?.stop(); + return Promise.all([this.network?.stop(), this.standalone?.stop()]); + } + + getStandaloneConnection() { + const container = this.standalone as StartedTestContainer; + return { + host: container.getHost(), + port: container.getMappedPort(6379), + }; + } + + getClusterConnection() { + const container = this.cluster as StartedTestContainer; + const network = this.network as StartedNetwork; + + const networkIpAddress = container.getIpAddress(network.getName()); + + const dockerHost = container.getHost() || ''; + const hosts = this.clusterPorts.map((port) => { + return { + host: dockerHost, + port: container.getMappedPort(port), + }; + }); + + const natMap = this.clusterPorts.reduce( + (map: Record, port) => { + const hostPort = container.getMappedPort(port); + const internalAddress = `${networkIpAddress}:${port}`; + // eslint-disable-next-line no-param-reassign + map[internalAddress] = { host: dockerHost, port: hostPort }; + return map; + }, + {} + ); + + return { natMap, hosts }; + } +} + +const docker = new Docker(); + +const serversType: Record = { + standalone: { + clusterMode: false, + }, + cluster: { + clusterMode: true, + }, +}; + +const createRedisFactory = (serverType: string) => { + const { clusterMode } = serversType[serverType]; + if (clusterMode) { + const { hosts, natMap } = docker.getClusterConnection(); + return () => new Cluster(hosts, { natMap }); + } + return () => new Redis(docker.getStandaloneConnection()); +}; const defaultExecuteOptions = { cacheTimeout: 10 * 1000, @@ -19,228 +105,248 @@ const defaultExecuteOptions = { }; beforeAll(async (done) => { - container = await new GenericContainer('redis:6') - .withExposedPorts(6379) - .start(); + await docker.start(); done(); }); afterAll(async (done) => { - await container.stop(); + await docker.stop(); done(); }); -test('should return processed value', async () => { - const cacheable = new PromiseCacheable(defaultRedisFactory); - const result = await cacheable.call( - { ...defaultExecuteOptions, key: uuidv4() }, - () => 'value' - ); +test.each(Object.keys(serversType))( + '%p: should return processed value', + async (serverType) => { + const cacheable = new PromiseCacheable(createRedisFactory(serverType)); - expect(result).toEqual('value'); + const result = await cacheable.call( + { ...defaultExecuteOptions, key: uuidv4() }, + () => 'value' + ); - cacheable.close(); -}); - -test('should not process key if it has already been processed', async () => { - const cacheable = new PromiseCacheable(defaultRedisFactory); - const executeOptions = { ...defaultExecuteOptions, key: uuidv4() }; + expect(result).toEqual('value'); - const result = await cacheable.call(executeOptions, () => 'value'); - - let twice = false; + cacheable.close(); + } +); - const result2 = await cacheable.call(executeOptions, () => { - twice = true; - return 'error'; - }); +test.each(Object.keys(serversType))( + '%p: should not process key if it has already been processed', + async (serverType) => { + const cacheable = new PromiseCacheable(createRedisFactory(serverType)); + const executeOptions = { ...defaultExecuteOptions, key: uuidv4() }; - expect(twice).toBeFalsy(); - expect(result2).toEqual(result); + const result = await cacheable.call(executeOptions, () => 'value'); - cacheable.close(); -}); + let twice = false; -test('should wait for the process', async () => { - const cacheable = new PromiseCacheable(defaultRedisFactory); - const executeOptions = { ...defaultExecuteOptions, key: uuidv4() }; + const result2 = await cacheable.call(executeOptions, () => { + twice = true; + return 'error'; + }); - const promise: Promise = new Promise((resolve) => { - setTimeout(() => resolve('slow value'), 2 * 1000); - }); + expect(twice).toBeFalsy(); + expect(result2).toEqual(result); - const result = await cacheable.call(executeOptions, () => promise); + cacheable.close(); + } +); - expect(result).toEqual('slow value'); - expect(promise).resolves.toBeDefined(); +test.each(Object.keys(serversType))( + '%p: should wait for the process', + async (serverType) => { + const cacheable = new PromiseCacheable(createRedisFactory(serverType)); + const executeOptions = { ...defaultExecuteOptions, key: uuidv4() }; - cacheable.close(); -}); + const promise: Promise = new Promise((resolve) => { + setTimeout(() => resolve('slow value'), 2 * 1000); + }); -test('should not process the same key twice', async () => { - const cacheable = new PromiseCacheable(defaultRedisFactory); - const executeOptions = { ...defaultExecuteOptions, key: uuidv4() }; + const result = await cacheable.call(executeOptions, () => promise); - let resultProcessed = false; - let result2Processed = false; - let result3Processed = false; + expect(result).toEqual('slow value'); + expect(promise).resolves.toBeDefined(); - const resultPromise = cacheable.call(executeOptions, () => { - resultProcessed = true; - return new Promise((resolve) => { - setTimeout(() => resolve('slow value1'), 3 * 1000); + cacheable.close(); + } +); + +test.each(Object.keys(serversType))( + '%p: should not process the same key twice', + async (serverType) => { + const cacheable = new PromiseCacheable(createRedisFactory(serverType)); + const executeOptions = { ...defaultExecuteOptions, key: uuidv4() }; + + let resultProcessed = false; + let result2Processed = false; + let result3Processed = false; + + const resultPromise = cacheable.call(executeOptions, () => { + resultProcessed = true; + return new Promise((resolve) => { + setTimeout(() => resolve('slow value1'), 3 * 1000); + }); }); - }); - const resultPromise2 = cacheable.call(executeOptions, () => { - result2Processed = true; - return new Promise((resolve) => { - setTimeout(() => resolve('slow value2'), 3 * 1000); + const resultPromise2 = cacheable.call(executeOptions, () => { + result2Processed = true; + return new Promise((resolve) => { + setTimeout(() => resolve('slow value2'), 3 * 1000); + }); }); - }); - const resultPromise3 = cacheable.call(executeOptions, () => { - result3Processed = true; - return new Promise((resolve) => { - setTimeout(() => resolve('slow value3'), 3 * 1000); + const resultPromise3 = cacheable.call(executeOptions, () => { + result3Processed = true; + return new Promise((resolve) => { + setTimeout(() => resolve('slow value3'), 3 * 1000); + }); }); - }); - - const results = await Promise.all([ - resultPromise, - resultPromise2, - resultPromise3, - ]); - expect( - [resultProcessed, result2Processed, result3Processed].filter( - (processed) => processed - ).length - ).toEqual(1); - - if (resultProcessed) { - expect(results.every((result) => result === 'slow value1')); - } else if (result2Processed) { - expect(results.every((result) => result === 'slow value2')); - } else { - expect(results.every((result) => result === 'slow value3')); + const results = await Promise.all([ + resultPromise, + resultPromise2, + resultPromise3, + ]); + + expect( + [resultProcessed, result2Processed, result3Processed].filter( + (processed) => processed + ).length + ).toEqual(1); + + if (resultProcessed) { + expect(results.every((result) => result === 'slow value1')); + } else if (result2Processed) { + expect(results.every((result) => result === 'slow value2')); + } else { + expect(results.every((result) => result === 'slow value3')); + } + + cacheable.close(); } +); + +test.each(Object.keys(serversType))( + '%p: should process other calls in case of wait timeout happens on inflight process', + async (serverType) => { + const cacheable = new PromiseCacheable(createRedisFactory(serverType)); + const executeOptions = { + ...defaultExecuteOptions, + key: uuidv4(), + waitTimeout: 1 * 1000, + }; + + const resultPromise = cacheable.call( + executeOptions, + () => + new Promise((resolve) => { + setTimeout(() => resolve('slow value'), 4 * 1000); + }) + ); + + let twice = false; + const resultPromise2 = cacheable.call(executeOptions, () => { + twice = true; + return 'result2'; + }); - cacheable.close(); -}); - -test('should process other calls in case of wait timeout happens on inflight process', async () => { - const cacheable = new PromiseCacheable(defaultRedisFactory); - const executeOptions = { - ...defaultExecuteOptions, - key: uuidv4(), - waitTimeout: 1 * 1000, - }; + let twice2 = false; + const resultPromise3 = cacheable.call(executeOptions, () => { + twice2 = true; + return 'result3'; + }); - const resultPromise = cacheable.call( - executeOptions, - () => - new Promise((resolve) => { - setTimeout(() => resolve('slow value'), 4 * 1000); - }) - ); - - let twice = false; - const resultPromise2 = cacheable.call(executeOptions, () => { - twice = true; - return 'result2'; - }); - - let twice2 = false; - const resultPromise3 = cacheable.call(executeOptions, () => { - twice2 = true; - return 'result3'; - }); - - const [result, result2, result3] = await Promise.all([ - resultPromise, - resultPromise2, - resultPromise3, - ]); + const [result, result2, result3] = await Promise.all([ + resultPromise, + resultPromise2, + resultPromise3, + ]); - expect(result).toEqual('slow value'); + expect(result).toEqual('slow value'); - expect(twice).toBeTruthy(); - expect(result2).toEqual('result2'); + expect(twice).toBeTruthy(); + expect(result2).toEqual('result2'); - expect(twice2).toBeTruthy(); - expect(result3).toEqual('result3'); + expect(twice2).toBeTruthy(); + expect(result3).toEqual('result3'); - cacheable.close(); -}); + cacheable.close(); + } +); + +test.each(Object.keys(serversType))( + '%p: should process other calls in case the wait timeout happens on inflight error', + async (serverType) => { + const cacheable = new PromiseCacheable(createRedisFactory(serverType)); + const executeOptions = { + ...defaultExecuteOptions, + key: uuidv4(), + }; + + const resultPromise = cacheable.call( + executeOptions, + () => + new Promise((resolve, reject) => { + setTimeout(() => reject(new Error()), 2 * 1000); + }) + ); + + let twice = false; + const resultPromise2 = cacheable.call(executeOptions, () => { + twice = true; + return 'result2'; + }); -test('should process other calls in case the wait timeout happens on inflight error', async () => { - const cacheable = new PromiseCacheable(defaultRedisFactory); - const executeOptions = { - ...defaultExecuteOptions, - key: uuidv4(), - }; + let twice2 = false; + const resultPromise3 = cacheable.call(executeOptions, () => { + twice2 = true; + return 'result3'; + }); - const resultPromise = cacheable.call( - executeOptions, - () => - new Promise((resolve, reject) => { - setTimeout(() => reject(new Error()), 2 * 1000); - }) - ); - - let twice = false; - const resultPromise2 = cacheable.call(executeOptions, () => { - twice = true; - return 'result2'; - }); - - let twice2 = false; - const resultPromise3 = cacheable.call(executeOptions, () => { - twice2 = true; - return 'result3'; - }); - - const [, result2, result3] = await allSettled.call(Promise, [ - resultPromise, - resultPromise2, - resultPromise3, - ]); + const [, result2, result3] = await allSettled.call(Promise, [ + resultPromise, + resultPromise2, + resultPromise3, + ]); - expect(twice).toBeTruthy(); - expect((result2 as { value: string }).value).toEqual('result2'); + expect(twice).toBeTruthy(); + expect((result2 as { value: string }).value).toEqual('result2'); - expect(twice2).toBeTruthy(); - expect((result3 as { value: string }).value).toEqual('result3'); + expect(twice2).toBeTruthy(); + expect((result3 as { value: string }).value).toEqual('result3'); - cacheable.close(); -}); + cacheable.close(); + } +); -test('should respect the cache timeout', async () => { - const cacheable = new PromiseCacheable(defaultRedisFactory); - const executeOptions = { - ...defaultExecuteOptions, - key: uuidv4(), - cacheTimeout: 2 * 1000, - }; +test.each(Object.keys(serversType))( + '%p: should respect the cache timeout', + async (serverType) => { + const cacheable = new PromiseCacheable(createRedisFactory(serverType)); + const executeOptions = { + ...defaultExecuteOptions, + key: uuidv4(), + cacheTimeout: 2 * 1000, + }; - const result = await cacheable.call(executeOptions, () => 'value'); - const result2 = await cacheable.call(executeOptions, () => 'result2'); + const result = await cacheable.call(executeOptions, () => 'value'); + const result2 = await cacheable.call(executeOptions, () => 'result2'); - expect(result2).toEqual(result); + expect(result2).toEqual(result); - await new Promise((resolve) => { - setTimeout(resolve, executeOptions.cacheTimeout + 1 * 1000); - }); + await new Promise((resolve) => { + setTimeout(resolve, executeOptions.cacheTimeout + 1 * 1000); + }); - const result3 = await cacheable.call(executeOptions, () => 'result3'); + const result3 = await cacheable.call(executeOptions, () => 'result3'); - expect(result3).toEqual('result3'); + expect(result3).toEqual('result3'); - cacheable.close(); -}); + cacheable.close(); + } +); -test('should ignore cache if redis is out', async () => { +test('standalone: should ignore cache if redis is out', async () => { const redisFactory = () => new Redis({ port: 0, @@ -255,6 +361,7 @@ test('should ignore cache if redis is out', async () => { }; const result = await cacheable.call(executeOptions, () => 'result'); + const result2 = await cacheable.call(executeOptions, () => 'result2'); const result3 = await cacheable.call(executeOptions, () => 'result3'); @@ -275,31 +382,34 @@ test('should ignore cache if redis is out', async () => { cacheable.close(); }); -test('should support object values', async () => { - const cacheable = new PromiseCacheable(defaultRedisFactory); +test.each(Object.keys(serversType))( + '%p: should support object values', + async (serverType) => { + const cacheable = new PromiseCacheable(createRedisFactory(serverType)); - const executeOptions = { - ...defaultExecuteOptions, - key: uuidv4(), - }; + const executeOptions = { + ...defaultExecuteOptions, + key: uuidv4(), + }; - const processedValue = { - id: 'id', - name: 'name', - }; + const processedValue = { + id: 'id', + name: 'name', + }; - const result = JSON.parse( - await cacheable.call(executeOptions, () => processedValue) - ); + const result = JSON.parse( + await cacheable.call(executeOptions, () => processedValue) + ); - const result2 = JSON.parse( - await cacheable.call(executeOptions, () => processedValue) - ); + const result2 = JSON.parse( + await cacheable.call(executeOptions, () => processedValue) + ); - expect(result).toStrictEqual(processedValue); + expect(result).toStrictEqual(processedValue); - // returned by redis cache - expect(result2).toStrictEqual(processedValue); + // returned by redis cache + expect(result2).toStrictEqual(processedValue); - cacheable.close(); -}); + cacheable.close(); + } +); From 82bb2a571ae9cbcf641edf2fa7cf0e51a674e9d8 Mon Sep 17 00:00:00 2001 From: Fernando Moraes Date: Wed, 16 Jun 2021 23:08:31 -0300 Subject: [PATCH 2/2] Updating success callback logic --- src/cacheable.ts | 32 +++++++++++++------------------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/src/cacheable.ts b/src/cacheable.ts index 6c8f6ef..ad51acc 100644 --- a/src/cacheable.ts +++ b/src/cacheable.ts @@ -41,26 +41,20 @@ export class PromiseCacheable { const safeSuccessCallback = async (result: Result) => { const resultAsString = typeof result === 'string' ? result : JSON.stringify(result); - - // console.log('processou'); try { - await Promise.all([ - this.redis.set( - key, - resultAsString, - 'PX', - options.cacheTimeout - ), - this.redis.publish(notifyKey, resultAsString), - this.redis.del(lockKey), - ]); - // atomic remove lock and setting cache - // await this.redis - // .multi() - // .set(key, resultAsString, 'PX', options.cacheTimeout) - // .publish(notifyKey, resultAsString) - // .del(lockKey) - // .exec(); + // try first set on cache, less problems if something goes wrong. + // it doesn't use transaction to be possible to work with cluster. + await this.redis.set( + key, + resultAsString, + 'PX', + options.cacheTimeout + ); + await this.redis + .pipeline() + .publish(notifyKey, resultAsString) + .del(lockKey) + .exec(); } catch (e) { this.logger.error(`problems on success callback operations`, e); }