2323import java .util .function .Function ;
2424import java .util .function .Predicate ;
2525import java .util .function .UnaryOperator ;
26+ import java .util .stream .Collectors ;
2627import java .util .stream .Stream ;
2728
2829import org .slf4j .Logger ;
@@ -111,7 +112,6 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator<
111112 res .ifPresentOrElse (
112113 r -> {
113114 R latestResource = (R ) r .getResource ().orElseThrow ();
114-
115115 // as previous resource version we use the one from successful update, since
116116 // we process new event here only if that is more recent then the event from our update.
117117 // Note that this is equivalent with the scenario when an informer watch connection
@@ -222,11 +222,6 @@ public Optional<R> getCachedValue(ResourceID resourceID) {
222222 return get (resourceID );
223223 }
224224
225- @ Override
226- public Stream <R > list (String namespace , Predicate <R > predicate ) {
227- return manager ().list (namespace , predicate );
228- }
229-
230225 void setTemporalResourceCache (TemporaryResourceCache <R > temporaryResourceCache ) {
231226 this .temporaryResourceCache = temporaryResourceCache ;
232227 }
@@ -239,19 +234,134 @@ public void addIndexers(Map<String, Function<R, List<String>>> indexers) {
239234 this .indexers .putAll (indexers );
240235 }
241236
237+ @ Override
238+ public Stream <R > list (String namespace , Predicate <R > predicate ) {
239+ return manager ().list (namespace , predicate );
240+ }
241+
242+ @ Override
243+ public Stream <R > list (Predicate <R > predicate ) {
244+ return cache .list (predicate );
245+ }
246+
242247 @ Override
243248 public List <R > byIndex (String indexName , String indexKey ) {
244249 return manager ().byIndex (indexName , indexKey );
245250 }
246251
247- @ Override
248- public Stream <ResourceID > keys () {
249- return cache .keys ();
252+ public Stream <R > byIndexStream (String indexName , String indexKey ) {
253+ return manager ().byIndexStream (indexName , indexKey );
254+ }
255+
256+ /**
257+ * Like {@link #list(String, Predicate)} but for read-cache-after-write consistency. This is
258+ * useful when resources are updated using {@link
259+ * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}.
260+ */
261+ public Stream <R > listWithStrongConsistency (String namespace , Predicate <R > predicate ) {
262+ return mergeWithWithTempCacheResources (
263+ manager ().list (namespace , predicate ), namespace , predicate );
264+ }
265+
266+ /**
267+ * Like {@link #list(Predicate)} but for read-cache-after-write consistency. This is useful when
268+ * resources are updated using {@link
269+ * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}.
270+ */
271+ public Stream <R > listWithStrongConsistency (Predicate <R > predicate ) {
272+ return mergeWithWithTempCacheResources (cache .list (predicate ), null , predicate );
273+ }
274+
275+ /**
276+ * Like {@link #byIndexStream(String, String)} but for read-cache-after-write consistency. This is
277+ * useful when resources are updated using {@link
278+ * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}.
279+ */
280+ public Stream <R > byIndexStreamWithStrongConsistency (String indexName , String indexKey ) {
281+ return mergeWithWithTempCacheResources (
282+ manager ().byIndexStream (indexName , indexKey ), indexName , indexKey );
283+ }
284+
285+ private Stream <R > mergeWithWithTempCacheResources (
286+ Stream <R > stream , String indexName , String indexKey ) {
287+ return mergeWithWithTempCacheResources (stream , null , null , indexName , indexKey );
288+ }
289+
290+ private Stream <R > mergeWithWithTempCacheResources (
291+ Stream <R > stream , String namespace , Predicate <R > predicate ) {
292+ return mergeWithWithTempCacheResources (stream , namespace , predicate , null , null );
293+ }
294+
295+ private Stream <R > mergeWithWithTempCacheResources (
296+ Stream <R > stream ,
297+ String namespace ,
298+ Predicate <R > predicate ,
299+ String indexName ,
300+ String indexKey ) {
301+ if (!comparableResourceVersions || temporaryResourceCache .isEmpty ()) {
302+ return stream ;
303+ }
304+ var allTempResources = temporaryResourceCache .getResources ();
305+ Map <ResourceID , R > tempResources ;
306+ if (namespace == null && predicate == null ) {
307+ tempResources = new HashMap <>(allTempResources );
308+ } else {
309+ // filtering the temp cache according the user input (predicate, namespace)
310+ tempResources =
311+ allTempResources .entrySet ().stream ()
312+ .filter (
313+ e -> {
314+ if (namespace != null ) {
315+ var res =
316+ e .getKey ().getNamespace ().map (ns -> ns .equals (namespace )).orElse (false );
317+ if (!res ) return false ;
318+ }
319+ if (predicate != null ) {
320+ return predicate .test (e .getValue ());
321+ }
322+ return true ;
323+ })
324+ .collect (Collectors .toMap (Map .Entry ::getKey , Map .Entry ::getValue ));
325+ }
326+ if (tempResources .isEmpty ()) {
327+ return stream ;
328+ }
329+ var upToDateList =
330+ stream
331+ .map (
332+ r -> {
333+ var resourceID = ResourceID .fromResource (r );
334+ // removing the id from the related temp resources
335+ // this is important so we can detect ghost resources:
336+ // all that remains is ghost resource
337+ var tempResource = tempResources .remove (resourceID );
338+ // using the latest version
339+ if (tempResource != null
340+ && ReconcilerUtilsInternal .compareResourceVersions (tempResource , r ) > 0 ) {
341+ return tempResource ;
342+ }
343+ return r ;
344+ })
345+ .toList ();
346+ Stream <R > tempResourceStream ;
347+ // ghost resource handling
348+ if (indexName != null && indexKey != null ) {
349+ var indexer = indexers .get (indexName );
350+ if (indexer == null ) {
351+ throw new IllegalArgumentException ("Indexer not found for: " + indexName );
352+ }
353+ // we check if the ghost resource is part of the index
354+ tempResourceStream =
355+ tempResources .values ().stream ().filter (r -> indexer .apply (r ).contains (indexKey ));
356+ } else {
357+ tempResourceStream = tempResources .values ().stream ();
358+ }
359+ return Stream .concat (tempResourceStream , upToDateList .stream ());
250360 }
251361
252362 @ Override
253- public Stream <R > list ( Predicate < R > predicate ) {
254- return cache .list ( predicate );
363+ public Stream <ResourceID > keys ( ) {
364+ return cache .keys ( );
255365 }
256366
257367 @ Override
0 commit comments