diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/ARQConstants.java b/jena-arq/src/main/java/org/apache/jena/sparql/ARQConstants.java index dc6adb68442..ba302c93f0f 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/ARQConstants.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/ARQConstants.java @@ -326,6 +326,13 @@ public class ARQConstants public static final Symbol registryExtensions = SystemARQ.allocSymbol("registryExtensions") ; - public static void init() {} + /** The query dispatcher registry key */ + public static final Symbol registryQueryDispatchers = + SystemARQ.allocSymbol("registryQueryDispatchers") ; + + /** The update dispatcher registry key */ + public static final Symbol registryUpdateDispatchers = + SystemARQ.allocSymbol("registryUpdateDispatchers") ; + public static void init() {} } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/Timeouts.java b/jena-arq/src/main/java/org/apache/jena/sparql/engine/Timeouts.java index d43e7bc9272..d06ba680abf 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/engine/Timeouts.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/Timeouts.java @@ -236,13 +236,23 @@ public static String toString(Timeout timeout) { return result; } - // Set times from context if not set directly. e..g Context provides default values. - // Contrast with SPARQLQueryProcessor where the context is limiting values of the protocol parameter. + /** + * Update unset values in the builder with values from the context. + * + * Set times from context if not set directly, i.e. context provides default values. + * Contrast with SPARQLQueryProcessor where the context is limiting values of the protocol parameter. + */ public static void applyDefaultQueryTimeoutFromContext(TimeoutBuilderImpl builder, Context cxt) { Timeout queryTimeout = extractQueryTimeout(cxt); applyDefaultTimeout(builder, queryTimeout); } + /** Update unset values in the builder with values from the context. */ + public static void applyDefaultUpdateTimeoutFromContext(TimeoutBuilderImpl builder, Context cxt) { + Timeout queryTimeout = extractUpdateTimeout(cxt); + applyDefaultTimeout(builder, queryTimeout); + } + /** Returns milliseconds if the given time unit is null. */ private static TimeUnit nullToMillis(TimeUnit unit) { return unit != null ? unit : TimeUnit.MILLISECONDS; diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/dispatch/ChainingQueryDispatcher.java b/jena-arq/src/main/java/org/apache/jena/sparql/engine/dispatch/ChainingQueryDispatcher.java new file mode 100644 index 00000000000..07e7a4110fd --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/dispatch/ChainingQueryDispatcher.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.engine.dispatch; + +import org.apache.jena.query.Query; +import org.apache.jena.sparql.core.DatasetGraph; +import org.apache.jena.sparql.exec.QueryExec; +import org.apache.jena.sparql.util.Context; + +/** + * A query dispatcher is responsible for taking a query and + * preparing it for the execution against a dataset. + * The result is a {@linkplain QueryExec} instance. + * + * Query dispatchers form a chain, and a {@link ChainingQueryDispatcher} acts as a link in such a chain. + * A ChainingQueryDispatcher instance can choose to process a query by itself or to delegate processing to the + * remainder of the chain. + * + * @see QueryDispatcherRegistry + */ +public interface ChainingQueryDispatcher { + QueryExec create(Query query, DatasetGraph dsg, Context context, QueryDispatcher chain); +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/dispatch/ChainingUpdateDispatcher.java b/jena-arq/src/main/java/org/apache/jena/sparql/engine/dispatch/ChainingUpdateDispatcher.java new file mode 100644 index 00000000000..ea5116a48eb --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/dispatch/ChainingUpdateDispatcher.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.engine.dispatch; + +import org.apache.jena.sparql.core.DatasetGraph; +import org.apache.jena.sparql.exec.UpdateExec; +import org.apache.jena.sparql.util.Context; +import org.apache.jena.update.UpdateRequest; + +/** + * An update dispatcher is responsible for taking an update request + * preparing it for the execution against a dataset. + * The result is a {@linkplain UpdateExec} instance. + * + * Update dispatchers form a chain, and a {@link ChainingUpdateDispatcher} acts as a link in such a chain. + * A ChainingUpdateDispatcher instance can choose to process an update request by itself or to delegate processing to the + * remainder of the chain. + * + * @see UpdateDispatcherRegistry + */ +public interface ChainingUpdateDispatcher { + UpdateExec create(UpdateRequest updateRequest, DatasetGraph dsg, Context context, UpdateDispatcher chain); +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/dispatch/QueryDispatcher.java b/jena-arq/src/main/java/org/apache/jena/sparql/engine/dispatch/QueryDispatcher.java new file mode 100644 index 00000000000..77c86abedb3 --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/dispatch/QueryDispatcher.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.engine.dispatch; + +import org.apache.jena.query.Query; +import org.apache.jena.sparql.core.DatasetGraph; +import org.apache.jena.sparql.exec.QueryExec; +import org.apache.jena.sparql.util.Context; + +public interface QueryDispatcher { + QueryExec create(Query query, DatasetGraph dsg, Context context); +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/dispatch/QueryDispatcherOverRegistry.java b/jena-arq/src/main/java/org/apache/jena/sparql/engine/dispatch/QueryDispatcherOverRegistry.java new file mode 100644 index 00000000000..8d95d4ac44d --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/dispatch/QueryDispatcherOverRegistry.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.engine.dispatch; + +import java.util.List; + +import org.apache.jena.query.Query; +import org.apache.jena.query.QueryException; +import org.apache.jena.sparql.core.DatasetGraph; +import org.apache.jena.sparql.exec.QueryExec; +import org.apache.jena.sparql.util.Context; + +/** + * {@link QueryDispatcher} that invokes all dispatchers registered with {@link QueryDispatcherRegistry} in a chain. + */ +public class QueryDispatcherOverRegistry + implements QueryDispatcher +{ + protected QueryDispatcherRegistry registry; + + /** Position in the chain */ + protected int pos; + + public QueryDispatcherOverRegistry(QueryDispatcherRegistry registry) { + this(registry, 0); + } + + public QueryDispatcherOverRegistry(QueryDispatcherRegistry registry, int pos) { + super(); + this.registry = registry; + this.pos = pos; + } + + protected ChainingQueryDispatcher getDispatcher() { + List queryDispatchers = registry.dispatchers(); + int n = queryDispatchers.size(); + if (pos >= n) { + throw new QueryException("No more elements in query dispatcher chain (pos=" + pos + ", chain size=" + n + ")"); + } + ChainingQueryDispatcher dispatcher = queryDispatchers.get(pos); + return dispatcher; + } + + @Override + public QueryExec create(Query query, DatasetGraph dsg, Context context) { + ChainingQueryDispatcher dispatcher = getDispatcher(); + QueryDispatcher next = new QueryDispatcherOverRegistry(registry, pos + 1); + QueryExec result = dispatcher.create(query, dsg, context, next); + return result; + } +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/dispatch/QueryDispatcherRegistry.java b/jena-arq/src/main/java/org/apache/jena/sparql/engine/dispatch/QueryDispatcherRegistry.java new file mode 100644 index 00000000000..ce013014f4b --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/dispatch/QueryDispatcherRegistry.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.engine.dispatch; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.jena.query.Query; +import org.apache.jena.sparql.ARQConstants; +import org.apache.jena.sparql.core.DatasetGraph; +import org.apache.jena.sparql.exec.ChainingQueryDispatcherMain; +import org.apache.jena.sparql.exec.QueryExec; +import org.apache.jena.sparql.exec.QueryExecBuilder; +import org.apache.jena.sparql.util.Context; + +/** + * Registry of {@link ChainingQueryDispatcher} instances. + * Allows for plugging into the {@link QueryExecBuilder} creation process + * based on dataset and context. + * + * @see ChainingQueryDispatcher + * @since 6.1.0 + */ +public class QueryDispatcherRegistry { + List dispatchers = Collections.synchronizedList(new ArrayList<>()); + + // Singleton + private static QueryDispatcherRegistry registry; + static { init(); } + + static public QueryDispatcherRegistry get() { + return registry; + } + + /** If there is a QueryDispatcherRegistry in the context then return it otherwise yield the global instance */ + static public QueryDispatcherRegistry chooseRegistry(Context context) { + QueryDispatcherRegistry result = get(context); + if (result == null) { + result = get(); + } + return result; + } + + /** Get the QueryDispatcherRegistry from the context or null if there is none. + * Returns null if the context is null. */ + static public QueryDispatcherRegistry get(Context context) { + QueryDispatcherRegistry result = context == null + ? null + : context.get(ARQConstants.registryQueryDispachers); + return result; + } + + static public void set(Context context, QueryDispatcherRegistry registry) { + context.set(ARQConstants.registryQueryDispachers, registry); + } + + public QueryDispatcherRegistry copy() { + QueryDispatcherRegistry result = new QueryDispatcherRegistry(); + result.dispatchers.addAll(dispatchers); + return result; + } + + /** Create a copy of the registry from the context or return a new instance */ + public static QueryDispatcherRegistry copyFrom(Context context) { + QueryDispatcherRegistry tmp = get(context); + QueryDispatcherRegistry result = tmp != null + ? tmp.copy() + : new QueryDispatcherRegistry(); + return result; + } + + public QueryDispatcherRegistry() { } + + private static void init() { + registry = new QueryDispatcherRegistry(); + + registry.add(ChainingQueryDispatcherMain.get()); + } + + // ----- Query ----- + + /** Add a ChainingQueryDispatcher to the default registry. */ + public static void addDispatcher(ChainingQueryDispatcher f) { get().add(f); } + + /** Add a ChainingQueryDispatcher. */ + public void add(ChainingQueryDispatcher f) { + // Add to low end so that newer factories are tried first + dispatchers.add(0, f); + } + + /** Remove a ChainingQueryDispatcher from the default registry. */ + public static void removeDispatcher(ChainingQueryDispatcher f) { get().remove(f); } + + /** Remove a ChainingQueryDispatcher. */ + public void remove(ChainingQueryDispatcher f) { dispatchers.remove(f); } + + /** Allow careful manipulation of the dispatchers list */ + public List dispatchers() { return dispatchers; } + + /** Check whether a ChainingQueryDispatcher is registered in the default registry. */ + public static boolean containsDispatcher(ChainingQueryDispatcher f) { return get().contains(f); } + + /** Check whether a ChainingQueryDispatcher is already registered. */ + public boolean contains(ChainingQueryDispatcher f) { return dispatchers.contains(f); } + + public static QueryExec create(Query query, DatasetGraph dsg, Context context) { + QueryDispatcher dispatcher = new QueryDispatcherOverRegistry(registry); + QueryExec qe = dispatcher.create(query, dsg, context); + return qe; + } +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/dispatch/UpdateDispatcher.java b/jena-arq/src/main/java/org/apache/jena/sparql/engine/dispatch/UpdateDispatcher.java new file mode 100644 index 00000000000..e7ad3768c21 --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/dispatch/UpdateDispatcher.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.engine.dispatch; + +import org.apache.jena.sparql.core.DatasetGraph; +import org.apache.jena.sparql.exec.UpdateExec; +import org.apache.jena.sparql.util.Context; +import org.apache.jena.update.UpdateRequest; + +public interface UpdateDispatcher { + UpdateExec create(UpdateRequest updateRequest, DatasetGraph dsg, Context context); +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/dispatch/UpdateDispatcherOverRegistry.java b/jena-arq/src/main/java/org/apache/jena/sparql/engine/dispatch/UpdateDispatcherOverRegistry.java new file mode 100644 index 00000000000..a03d58d8d53 --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/dispatch/UpdateDispatcherOverRegistry.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.engine.dispatch; + +import java.util.List; + +import org.apache.jena.query.QueryException; +import org.apache.jena.sparql.core.DatasetGraph; +import org.apache.jena.sparql.exec.UpdateExec; +import org.apache.jena.sparql.util.Context; +import org.apache.jena.update.UpdateRequest; + +/** + * {@link UpdateDispatcher} that invokes all dispatchers registered with {@link UpdateDispatcherRegistry} in a chain. + */ +public class UpdateDispatcherOverRegistry + implements UpdateDispatcher +{ + protected UpdateDispatcherRegistry registry; + + /** Position in the chain */ + protected int pos; + + public UpdateDispatcherOverRegistry(UpdateDispatcherRegistry registry) { + this(registry, 0); + } + + public UpdateDispatcherOverRegistry(UpdateDispatcherRegistry registry, int pos) { + super(); + this.registry = registry; + this.pos = pos; + } + + protected ChainingUpdateDispatcher getDispatcher() { + List updateDispatchers = registry.dispatchers(); + int n = updateDispatchers.size(); + if (pos >= n) { + throw new QueryException("No more elements in query dispatcher chain (pos=" + pos + ", chain size=" + n + ")"); + } + ChainingUpdateDispatcher dispatcher = updateDispatchers.get(pos); + return dispatcher; + } + + @Override + public UpdateExec create(UpdateRequest updateRequest, DatasetGraph dsg, Context context) { + ChainingUpdateDispatcher dispatcher = getDispatcher(); + UpdateDispatcher next = new UpdateDispatcherOverRegistry(registry, pos + 1); + UpdateExec result = dispatcher.create(updateRequest, dsg, context, next); + return result; + } +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/dispatch/UpdateDispatcherRegistry.java b/jena-arq/src/main/java/org/apache/jena/sparql/engine/dispatch/UpdateDispatcherRegistry.java new file mode 100644 index 00000000000..a57277b660e --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/dispatch/UpdateDispatcherRegistry.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.engine.dispatch; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.jena.sparql.ARQConstants; +import org.apache.jena.sparql.core.DatasetGraph; +import org.apache.jena.sparql.exec.ChainingUpdateDispatcherMain; +import org.apache.jena.sparql.exec.UpdateExec; +import org.apache.jena.sparql.util.Context; +import org.apache.jena.update.UpdateRequest; + +/** + * Registry of {@link ChainingUpdateDispatcher} instances. + * Allows for plugging into the {@link UpdateExec} creation process + * based on dataset and context. + * + * @see ChainingUpdateDispatcher + * @since 6.1.0 + */ +public class UpdateDispatcherRegistry { + List dispatchers = Collections.synchronizedList(new ArrayList<>()); + + // Singleton + private static UpdateDispatcherRegistry registry; + static { init(); } + + static public UpdateDispatcherRegistry get() { + return registry; + } + + /** If there is a UpdateDispatcherRegistry in the context then return it otherwise yield the global instance */ + static public UpdateDispatcherRegistry chooseRegistry(Context context) { + UpdateDispatcherRegistry result = get(context); + if (result == null) { + result = get(); + } + return result; + } + + /** Get the UpdateDispatcherRegistry from the context or null if there is none. + * Returns null if the context is null. */ + static public UpdateDispatcherRegistry get(Context context) { + UpdateDispatcherRegistry result = context == null + ? null + : context.get(ARQConstants.registryUpdateDispatchers); + return result; + } + + static public void set(Context context, UpdateDispatcherRegistry registry) { + context.set(ARQConstants.registryUpdateDispatchers, registry); + } + + public UpdateDispatcherRegistry copy() { + UpdateDispatcherRegistry result = new UpdateDispatcherRegistry(); + result.dispatchers.addAll(dispatchers); + return result; + } + + /** Create a copy of the registry from the context or return a new instance */ + public static UpdateDispatcherRegistry copyFrom(Context context) { + UpdateDispatcherRegistry tmp = get(context); + UpdateDispatcherRegistry result = tmp != null + ? tmp.copy() + : new UpdateDispatcherRegistry(); + return result; + } + + public UpdateDispatcherRegistry() { } + + private static void init() { + registry = new UpdateDispatcherRegistry(); + registry.add(ChainingUpdateDispatcherMain.get()); + } + + /** Add an ChainingUpdateDispatcher to the default registry. */ + public static void addDispatcher(ChainingUpdateDispatcher f) { get().add(f); } + + /** Add an ChainingUpdateDispatcher. */ + public void add(ChainingUpdateDispatcher f) { + // Add to low end so that newer factories are tried first + dispatchers.add(0, f); + } + + /** Remove an ChainingUpdateDispatcher from the default registry. */ + public static void removeDispatcher(ChainingUpdateDispatcher f) { get().remove(f); } + + /** Remove an ChainingUpdateDispatcher. */ + public void remove(ChainingUpdateDispatcher f) { dispatchers.remove(f); } + + /** Allow careful manipulation of the factories list */ + public List dispatchers() { return dispatchers; } + + /** Check whether an ChainingUpdateDispatcher is already registered in the default registry */ + public static boolean containsDispatcher(ChainingUpdateDispatcher f) { return get().contains(f); } + + /** Check whether an ChainingUpdateDispatcher is already registered. */ + public boolean contains(ChainingUpdateDispatcher f) { return dispatchers.contains(f); } + + public static UpdateExec create(UpdateRequest updateRequest, DatasetGraph dsg, Context context) { + UpdateDispatcher Dispatcher = new UpdateDispatcherOverRegistry(registry); + UpdateExec ue = Dispatcher.create(updateRequest, dsg, context); + return ue; + } +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/ChainingQueryDispatcherMain.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/ChainingQueryDispatcherMain.java new file mode 100644 index 00000000000..b693f69e0eb --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/ChainingQueryDispatcherMain.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.exec; + +import org.apache.jena.atlas.logging.Log; +import org.apache.jena.query.Query; +import org.apache.jena.sparql.core.DatasetGraph; +import org.apache.jena.sparql.engine.QueryEngineFactory; +import org.apache.jena.sparql.engine.QueryEngineRegistry; +import org.apache.jena.sparql.engine.Timeouts; +import org.apache.jena.sparql.engine.Timeouts.Timeout; +import org.apache.jena.sparql.engine.Timeouts.TimeoutBuilderImpl; +import org.apache.jena.sparql.engine.binding.Binding; +import org.apache.jena.sparql.engine.dispatch.ChainingQueryDispatcher; +import org.apache.jena.sparql.engine.dispatch.QueryDispatcher; +import org.apache.jena.sparql.util.Context; + +public class ChainingQueryDispatcherMain + implements ChainingQueryDispatcher +{ + private static final ChainingQueryDispatcherMain INSTANCE = new ChainingQueryDispatcherMain(); + public static ChainingQueryDispatcher get() { + return INSTANCE; + } + + private ChainingQueryDispatcherMain() {} + + @Override + public QueryExec create(Query queryActual, DatasetGraph dataset, Context cxt, QueryDispatcher chain) { + QueryEngineFactory qeFactory = QueryEngineRegistry.findFactory(queryActual, dataset, cxt); + if ( qeFactory == null ) { + Log.warn(QueryExecDatasetBuilder.class, "Failed to find a QueryEngineFactory"); + return null; + } + + TimeoutBuilderImpl timeoutBuilder = new TimeoutBuilderImpl(); + Timeouts.applyDefaultQueryTimeoutFromContext(timeoutBuilder, cxt); + Timeout timeout = timeoutBuilder.build(); + + String queryStringActual = null; + Binding initialBinding = null; + QueryExec qExec = new QueryExecDataset(queryActual, queryStringActual, dataset, cxt, qeFactory, + timeout, initialBinding); + return qExec; + } +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/ChainingUpdateDispatcherMain.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/ChainingUpdateDispatcherMain.java new file mode 100644 index 00000000000..7968e44d25f --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/ChainingUpdateDispatcherMain.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.sparql.exec; + +import org.apache.jena.sparql.core.DatasetGraph; +import org.apache.jena.sparql.engine.Timeouts; +import org.apache.jena.sparql.engine.Timeouts.Timeout; +import org.apache.jena.sparql.engine.binding.Binding; +import org.apache.jena.sparql.engine.dispatch.ChainingUpdateDispatcher; +import org.apache.jena.sparql.engine.dispatch.UpdateDispatcher; +import org.apache.jena.sparql.modify.UpdateEngineFactory; +import org.apache.jena.sparql.modify.UpdateEngineRegistry; +import org.apache.jena.sparql.util.Context; +import org.apache.jena.update.UpdateException; +import org.apache.jena.update.UpdateRequest; + +public class ChainingUpdateDispatcherMain + implements ChainingUpdateDispatcher +{ + private static final ChainingUpdateDispatcherMain INSTANCE = new ChainingUpdateDispatcherMain(); + public static ChainingUpdateDispatcher get() { + return INSTANCE; + } + + private ChainingUpdateDispatcherMain() {} + + @Override + public UpdateExec create(UpdateRequest updateRequest, DatasetGraph dataset, Context cxt, UpdateDispatcher chain) { + UpdateRequest actualUpdate = updateRequest; + + UpdateEngineFactory f = UpdateEngineRegistry.get().find(dataset, cxt); + if ( f == null ) + throw new UpdateException("Failed to find an UpdateEngine"); + + Timeout timeout = Timeouts.extractUpdateTimeout(cxt); + Binding initialBinding = null; + UpdateExec uExec = new UpdateExecDataset(actualUpdate, dataset, initialBinding, cxt, f, timeout); + return uExec; + } +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDatasetBuilder.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDatasetBuilder.java index 94746d0567a..a779f051e7a 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDatasetBuilder.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDatasetBuilder.java @@ -26,7 +26,6 @@ import java.util.Objects; import java.util.concurrent.TimeUnit; -import org.apache.jena.atlas.logging.Log; import org.apache.jena.graph.Graph; import org.apache.jena.graph.Node; import org.apache.jena.query.*; @@ -34,12 +33,11 @@ import org.apache.jena.sparql.core.DatasetGraph; import org.apache.jena.sparql.core.DatasetGraphFactory; import org.apache.jena.sparql.core.Var; -import org.apache.jena.sparql.engine.QueryEngineFactory; -import org.apache.jena.sparql.engine.QueryEngineRegistry; -import org.apache.jena.sparql.engine.binding.Binding; import org.apache.jena.sparql.engine.Timeouts; import org.apache.jena.sparql.engine.Timeouts.Timeout; import org.apache.jena.sparql.engine.Timeouts.TimeoutBuilderImpl; +import org.apache.jena.sparql.engine.binding.Binding; +import org.apache.jena.sparql.engine.dispatch.QueryDispatcherRegistry; import org.apache.jena.sparql.syntax.syntaxtransform.QueryTransformOps; import org.apache.jena.sparql.util.Context; import org.apache.jena.sparql.util.ContextAccumulator; @@ -189,12 +187,6 @@ public QueryExec build() { query.ensureResultVars(); Context cxt = getContext(); - QueryEngineFactory qeFactory = QueryEngineRegistry.findFactory(query, dataset, cxt); - if ( qeFactory == null ) { - Log.warn(QueryExecDatasetBuilder.class, "Failed to find a QueryEngineFactory"); - return null; - } - // Initial bindings / parameterized query Query queryActual = query; String queryStringActual = queryString; @@ -204,17 +196,22 @@ public QueryExec build() { queryStringActual = null; } - Timeouts.applyDefaultQueryTimeoutFromContext(this.timeoutBuilder, cxt); - if ( dataset != null ) cxt.set(ARQConstants.sysCurrentDataset, DatasetFactory.wrap(dataset)); if ( queryActual != null ) cxt.set(ARQConstants.sysCurrentQuery, queryActual); + // Place the effective timeout into the context + Timeouts.applyDefaultQueryTimeoutFromContext(this.timeoutBuilder, cxt); Timeout timeout = timeoutBuilder.build(); + Timeouts.setQueryTimeout(cxt, timeout); + + if ( dataset != null ) + cxt.set(ARQConstants.sysCurrentDataset, DatasetFactory.wrap(dataset)); + if ( queryActual != null ) + cxt.set(ARQConstants.sysCurrentQuery, queryActual); - QueryExec qExec = new QueryExecDataset(queryActual, queryStringActual, dataset, cxt, qeFactory, - timeout, initialBinding); + QueryExec qExec = QueryDispatcherRegistry.create(queryActual, dataset, cxt); return qExec; } } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecDatasetBuilder.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecDatasetBuilder.java index 99731723ab6..4784ff91bde 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecDatasetBuilder.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/UpdateExecDatasetBuilder.java @@ -30,9 +30,11 @@ import org.apache.jena.query.ARQ; import org.apache.jena.sparql.core.DatasetGraph; import org.apache.jena.sparql.core.Var; -import org.apache.jena.sparql.engine.binding.Binding; +import org.apache.jena.sparql.engine.Timeouts; import org.apache.jena.sparql.engine.Timeouts.Timeout; import org.apache.jena.sparql.engine.Timeouts.TimeoutBuilderImpl; +import org.apache.jena.sparql.engine.binding.Binding; +import org.apache.jena.sparql.engine.dispatch.UpdateDispatcherRegistry; import org.apache.jena.sparql.modify.UpdateEngineFactory; import org.apache.jena.sparql.modify.UpdateEngineRegistry; import org.apache.jena.sparql.syntax.syntaxtransform.UpdateTransformOps; @@ -89,7 +91,7 @@ public UpdateExecDatasetBuilder update(String updateRequestString) { /** Hint has no effect on update execs over datasets. */ @Override - public UpdateExecBuilder parseCheck(boolean parseCheck) { + public UpdateExecDatasetBuilder parseCheck(boolean parseCheck) { return this; } @@ -166,9 +168,11 @@ public UpdateExec build() { if ( f == null ) throw new UpdateException("Failed to find an UpdateEngine"); + Timeouts.applyDefaultUpdateTimeoutFromContext(timeoutBuilder, cxt); Timeout timeout = timeoutBuilder.build(); + Timeouts.setUpdateTimeout(cxt, timeout); - UpdateExec uExec = new UpdateExecDataset(actualUpdate, dataset, initialBinding, cxt, f, timeout); + UpdateExec uExec = UpdateDispatcherRegistry.create(actualUpdate, dataset, cxt); return uExec; } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/ChainingQueryDispatcherExecTracker.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/ChainingQueryDispatcherExecTracker.java new file mode 100644 index 00000000000..543b06c7fd9 --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/ChainingQueryDispatcherExecTracker.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.exec.tracker.core; + +import java.util.Optional; + +import org.apache.jena.query.Query; +import org.apache.jena.query.QueryType; +import org.apache.jena.sparql.core.DatasetGraph; +import org.apache.jena.sparql.engine.dispatch.ChainingQueryDispatcher; +import org.apache.jena.sparql.engine.dispatch.QueryDispatcher; +import org.apache.jena.sparql.exec.QueryExec; +import org.apache.jena.sparql.util.Context; + +public class ChainingQueryDispatcherExecTracker + implements ChainingQueryDispatcher +{ + @Override + public QueryExec create(Query query, DatasetGraph dataset, Context context, QueryDispatcher chain) { + QueryExec delegateExec = chain.create(query, dataset, context); + + String queryStr = Optional.ofNullable(delegateExec.getQuery()).map(Object::toString) + .orElse(delegateExec.getQueryString()); + ThrowableTracker throwableTracker = new ThrowableTrackerFirst(); + + return new QueryExecTracked<>(delegateExec, throwableTracker) { + @Override + public void beforeExec(QueryType queryType) { + System.out.println("Query execution started: " + queryStr); + super.beforeExec(queryType); + } + @Override + public void afterExec() { + Optional t = this.getThrowableTracker().getFirstThrowable(); + if (t.isPresent()) { + System.out.println("Query execution completed WITH EXCEPTION: " + queryStr + " " + t.get()); + } else { + System.out.println("Query execution completed successfully: " + queryStr); + } + } + }; + } +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/ChainingUpdateExecBuilderFactoryExecTracker.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/ChainingUpdateExecBuilderFactoryExecTracker.java new file mode 100644 index 00000000000..859a201fb87 --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/ChainingUpdateExecBuilderFactoryExecTracker.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.exec.tracker.core; + +import java.util.Optional; + +import org.apache.jena.sparql.core.DatasetGraph; +import org.apache.jena.sparql.engine.dispatch.ChainingUpdateDispatcher; +import org.apache.jena.sparql.engine.dispatch.UpdateDispatcher; +import org.apache.jena.sparql.exec.UpdateExec; +import org.apache.jena.sparql.util.Context; +import org.apache.jena.update.UpdateRequest; + +public class ChainingUpdateExecBuilderFactoryExecTracker + implements ChainingUpdateDispatcher +{ + @Override + public UpdateExec create(UpdateRequest updateRequest, DatasetGraph dataset, Context context, UpdateDispatcher chain) { + UpdateExec delegateExec = chain.create(updateRequest, dataset, context); + + String updateStr = Optional.ofNullable(delegateExec.getUpdateRequest()).map(Object::toString) + .orElse(delegateExec.getUpdateRequestString()); + return new UpdateExecWrapper<>(delegateExec) { + @Override + public void execute() { + System.out.println("Update started: " + updateStr); + try { + super.execute(); + } catch (Exception e) { + System.out.println("Update completed WITH EXCEPTION: " + updateStr + " " + e); + throw e; + } + System.out.println("Update completed successfully: " + updateStr); + } + }; + } +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/ExampleExecTrackerCore.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/ExampleExecTrackerCore.java new file mode 100644 index 00000000000..d4577383241 --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/ExampleExecTrackerCore.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.exec.tracker.core; + +import org.apache.jena.query.ReadWrite; +import org.apache.jena.sparql.core.DatasetGraph; +import org.apache.jena.sparql.core.DatasetGraphFactory; +import org.apache.jena.sparql.exec.QueryExec; +import org.apache.jena.sparql.exec.UpdateExec; +import org.apache.jena.sparql.expr.NodeValue; +import org.apache.jena.sparql.function.FunctionBase0; +import org.apache.jena.sparql.function.FunctionRegistry; +import org.apache.jena.sys.JenaSystem; +import org.apache.jena.system.AutoTxn; +import org.apache.jena.system.Txn; + +public class ExampleExecTrackerCore { + + static { JenaSystem.init(); } + + public static class FunctionFail extends FunctionBase0 { + @Override + public NodeValue exec() { + throw new RuntimeException("Simulated failure"); + } + } + + public static void main(String[] args) { + // Already registered in InitExecTracker: + // - QueryDispatcherRegistry.addDispatcher(new ChainingQueryDispatcherExecTracker()); + // - UpdateDispatcherRegistry.addDispatcher(new ChainingUpdateExecBuilderFactoryExecTracker()); + + FunctionRegistry.get().put("urn:fail", FunctionFail.class); + + DatasetGraph dsg = DatasetGraphFactory.create(); + // DatasetGraph dsg = TDB2Factory.createDataset().asDatasetGraph(); + try (AutoTxn txn = Txn.autoTxn(dsg, ReadWrite.WRITE)) { + QueryExec.newBuilder() + .dataset(dsg) + .query("SELECT * { ?s ?p ?o }") + .table(); + + UpdateExec.newBuilder() + .dataset(dsg) + .update("PREFIX eg: INSERT DATA { eg:s eg:p eg:o }") + .execute(); + + // Will fail: + try { + UpdateExec.newBuilder() + .dataset(DatasetGraphFactory.empty()) + .update("PREFIX eg: INSERT DATA { eg:s eg:p eg:o }") + .execute(); + } catch (Exception e) { + // Ignore + } + + // Will fail: + try { + QueryExec.newBuilder() + .dataset(dsg) + .query("SELECT (() AS ?x) { }") + .table(); + } catch (Exception e) { + // Ignore + } + + txn.commit(); + } + } +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/InitExecTrackerCore.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/InitExecTrackerCore.java new file mode 100644 index 00000000000..1967a469f5a --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/InitExecTrackerCore.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.exec.tracker.core; + +import org.apache.jena.sparql.engine.dispatch.QueryDispatcherRegistry; +import org.apache.jena.sparql.engine.dispatch.UpdateDispatcherRegistry; +import org.apache.jena.sys.JenaSubsystemLifecycle ; + +public class InitExecTrackerCore implements JenaSubsystemLifecycle { + @Override + public void start() { + QueryDispatcherRegistry.addDispatcher(new ChainingQueryDispatcherExecTracker()); + UpdateDispatcherRegistry.addDispatcher(new ChainingUpdateExecBuilderFactoryExecTracker()); + } + + @Override + public void stop() {} + + @Override + public int level() { + return 40 ; + } +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/IteratorTracked.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/IteratorTracked.java new file mode 100644 index 00000000000..8928b8577cc --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/IteratorTracked.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.exec.tracker.core; + +import java.util.Iterator; +import java.util.Objects; +import java.util.function.BooleanSupplier; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import org.apache.jena.atlas.iterator.IteratorWrapper; + +/** + * Iterator wrapper that forwards an encountered exception + * to a configured destination. + */ +public class IteratorTracked + extends IteratorWrapper +{ + protected ThrowableTracker tracker; + + public IteratorTracked(Iterator iterator, ThrowableTracker tracker) { + super(iterator); + this.tracker = Objects.requireNonNull(tracker); + } + + @Override + public boolean hasNext() { + return trackBoolean(tracker, get()::hasNext); + } + + @Override + public T next() { + return track(tracker, get()::next); + } + + @Override + public void forEachRemaining(Consumer action) { + trackForEachRemaining(tracker, get(), action); + } + + public static boolean trackBoolean(ThrowableTracker tracker, BooleanSupplier action) { + try { + boolean result = action.getAsBoolean(); + return result; + } catch (Throwable t) { + tracker.report(t); + t.addSuppressed(new RuntimeException("Error during hasNext.")); + throw t; + } + } + + public static T track(ThrowableTracker tracker, Supplier action) { + try { + T result = action.get(); + return result; + } catch (Throwable t) { + tracker.report(t); + t.addSuppressed(new RuntimeException("Error during hasNext.")); + throw t; + } + } + + public static void trackForEachRemaining(ThrowableTracker tracker, Iterator it, Consumer action) { + try { + it.forEachRemaining(action); + } catch (Throwable t) { + tracker.report(t); + t.addSuppressed(new RuntimeException("Error during forEachRemaining.")); + throw t; + } + } +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/QueryExecModWrapper.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/QueryExecModWrapper.java new file mode 100644 index 00000000000..737e13d70e1 --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/QueryExecModWrapper.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.exec.tracker.core; + +import java.util.concurrent.TimeUnit; + +import org.apache.jena.sparql.exec.QueryExec; +import org.apache.jena.sparql.exec.QueryExecMod; +import org.apache.jena.sparql.util.Context; + +public class QueryExecModWrapper + implements QueryExecMod +{ + protected T delegate; + + public QueryExecModWrapper(T delegate) { + super(); + this.delegate = delegate; + } + + protected T getDelegate() { + return delegate; + } + + @SuppressWarnings("unchecked") + public X self() { + return (X)this; + } + + @Override + public X timeout(long timeout) { + getDelegate().timeout(timeout, TimeUnit.MILLISECONDS); + return self(); + } + + @Override + public X timeout(long timeout, TimeUnit timeoutUnits) { + getDelegate().timeout(timeout, TimeUnit.MILLISECONDS); + return self(); + } + + @Override + public X initialTimeout(long timeout, TimeUnit timeUnit) { + getDelegate().initialTimeout(timeout, timeUnit); + return self(); + } + + @Override + public X overallTimeout(long timeout, TimeUnit timeUnit) { + getDelegate().overallTimeout(timeout, timeUnit); + return self(); + } + + @Override + public Context getContext() { + return getDelegate().getContext(); + } + + @Override + public QueryExec build() { + return getDelegate().build(); + } +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/QueryExecTracked.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/QueryExecTracked.java new file mode 100644 index 00000000000..a2aff66ad6f --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/QueryExecTracked.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.exec.tracker.core; + +import java.util.Iterator; +import java.util.function.Supplier; + +import org.apache.jena.atlas.json.JsonArray; +import org.apache.jena.atlas.json.JsonObject; +import org.apache.jena.graph.Graph; +import org.apache.jena.graph.Triple; +import org.apache.jena.query.Query; +import org.apache.jena.query.QueryType; +import org.apache.jena.sparql.core.DatasetGraph; +import org.apache.jena.sparql.core.Quad; +import org.apache.jena.sparql.exec.QueryExec; +import org.apache.jena.sparql.exec.RowSet; +import org.apache.jena.sparql.util.Context; + +/** + * Wrapper for QueryExec that tracks any encountered exception. + * This is accomplished by wrapping RowSets and Iterators of the underlying QueryExec. + */ +public abstract class QueryExecTracked implements QueryExec +{ + private T delegate; + private ThrowableTracker throwableTracker; + private QueryType queryExecType = null; + + public QueryExecTracked(T delegate, ThrowableTracker tracker) { + super(); + this.delegate = delegate; + this.throwableTracker = tracker; + } + + protected T getDelegate() { + return delegate; + } + + protected ThrowableTracker getThrowableTracker() { + return throwableTracker; + } + + /** + * The query type requested for execution. + * For example, calling select() sets this type to {@link QueryType#SELECT}. + */ + public QueryType getQueryExecType() { + return queryExecType; + } + + @Override + public Context getContext() { + return getDelegate().getContext(); + } + + @Override + public Query getQuery() { + return getDelegate().getQuery(); + } + + @Override + public String getQueryString() { + return getDelegate().getQueryString(); + } + + @Override + public void close() { + try { + getDelegate().close(); + } finally { + afterExec(); + } + } + + @Override + public boolean isClosed() { + return getDelegate().isClosed(); + } + + @Override + public void abort() { + getDelegate().abort(); + } + + public void beforeExec(QueryType queryType) { + this.queryExecType = queryType; + } + + /** Note that afterExec is run only on close. */ + public void afterExec() { + } + + @Override + public RowSet select() { + return compute(QueryType.SELECT, () -> wrapRowSet(getDelegate().select())); + } + + @Override + public Graph construct() { + return compute(QueryType.CONSTRUCT, () -> getDelegate().construct()); + } + + @Override + public Graph construct(Graph graph) { + return compute(QueryType.CONSTRUCT, () -> getDelegate().construct(graph)); + } + + @Override + public Graph describe() { + return compute(QueryType.DESCRIBE, () -> getDelegate().describe()); + } + + @Override + public Graph describe(Graph graph) { + return compute(QueryType.DESCRIBE, () -> getDelegate().describe(graph)); + } + + @Override + public boolean ask() { + return compute(QueryType.ASK, () -> getDelegate().ask()); + } + + @Override + public Iterator constructTriples() { + return compute(QueryType.CONSTRUCT, () -> wrapIterator(getDelegate().constructTriples())); + } + + @Override + public Iterator describeTriples() { + return compute(QueryType.CONSTRUCT, () -> wrapIterator(getDelegate().describeTriples())); + } + + @Override + public Iterator constructQuads() { + return compute(QueryType.CONSTRUCT, () -> wrapIterator(getDelegate().constructQuads())); + } + + @Override + public DatasetGraph constructDataset() { + return compute(QueryType.CONSTRUCT, () -> getDelegate().constructDataset()); + } + + @Override + public DatasetGraph constructDataset(DatasetGraph dataset) { + return compute(QueryType.CONSTRUCT, () -> getDelegate().constructDataset(dataset)); + } + + @Override + public JsonArray execJson() { + return compute(QueryType.CONSTRUCT_JSON, () -> getDelegate().execJson()); + } + + @Override + public Iterator execJsonItems() { + return compute(QueryType.CONSTRUCT_JSON, () -> wrapIterator(getDelegate().execJsonItems())); + } + + @Override + public DatasetGraph getDataset() { + return getDelegate().getDataset(); + } + + protected RowSet wrapRowSet(RowSet base) { + return new RowSetTracked(base, getThrowableTracker()); + } + + protected Iterator wrapIterator(Iterator base) { + return new IteratorTracked<>(base, getThrowableTracker()); + } + + protected X compute(QueryType queryType, Supplier supplier) { + beforeExec(queryType); + try { + X result = supplier.get(); + return result; + } catch(Throwable e) { + e.addSuppressed(new RuntimeException("Error during select().")); + throwableTracker.report(e); + throw e; + } + // afterExec is called during close() + } +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/RowSetTracked.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/RowSetTracked.java new file mode 100644 index 00000000000..67d69cb6e21 --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/RowSetTracked.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.exec.tracker.core; + +import java.util.List; +import java.util.Objects; +import java.util.function.Consumer; + +import org.apache.jena.riot.rowset.RowSetWrapper; +import org.apache.jena.sparql.core.Var; +import org.apache.jena.sparql.engine.binding.Binding; +import org.apache.jena.sparql.exec.RowSet; + +/** RowSetWrapper that tracks any encountered exceptions in the provided tracker. */ +public class RowSetTracked + extends RowSetWrapper +{ + protected ThrowableTracker tracker; + + public RowSetTracked(RowSet other, ThrowableTracker tracker) { + super(other); + this.tracker = Objects.requireNonNull(tracker); + } + + public ThrowableTracker getTracker() { + return tracker; + } + + @Override + public boolean hasNext() { + return IteratorTracked.trackBoolean(tracker, get()::hasNext); + } + + @Override + public Binding next() { + return IteratorTracked.track(tracker, get()::next); + } + + @Override + public List getResultVars() { + return IteratorTracked.track(tracker, get()::getResultVars); + } + + @Override + public void forEachRemaining(Consumer action) { + IteratorTracked.trackForEachRemaining(tracker, get(), action); + } +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/ThrowableTracker.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/ThrowableTracker.java new file mode 100644 index 00000000000..4c4414149d1 --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/ThrowableTracker.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.exec.tracker.core; + +import java.util.Iterator; +import java.util.Optional; + +public interface ThrowableTracker { + void report(Throwable throwable); + Iterator getThrowables(); + + default Optional getFirstThrowable() { + Iterator it = getThrowables(); + Throwable throwable = it.hasNext() ? it.next() : null; + return Optional.ofNullable(throwable); + } +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/ThrowableTrackerFirst.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/ThrowableTrackerFirst.java new file mode 100644 index 00000000000..d51be846780 --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/ThrowableTrackerFirst.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.exec.tracker.core; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +/** Throwable tracker that only stores the first encountered exception. */ +public class ThrowableTrackerFirst + implements ThrowableTracker +{ + protected Throwable throwable = null; + + @Override + public void report(Throwable throwable) { + if (this.throwable == null) { + this.throwable = throwable; + } + // Ignore any throwables after the first + } + + @Override + public Iterator getThrowables() { + return throwable == null ? Collections.emptyIterator() : List.of(throwable).iterator(); + } +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/UpdateExecWrapper.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/UpdateExecWrapper.java new file mode 100644 index 00000000000..d626fad71b8 --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/UpdateExecWrapper.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.exec.tracker.core; + +import org.apache.jena.sparql.exec.UpdateExec; + +public class UpdateExecWrapper + extends UpdateProcessorWrapper + implements UpdateExec +{ + public UpdateExecWrapper(T delegate) { + super(delegate); + } +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/UpdateProcessorWrapper.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/UpdateProcessorWrapper.java new file mode 100644 index 00000000000..405d3fd25a0 --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/core/UpdateProcessorWrapper.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.exec.tracker.core; + +import org.apache.jena.update.UpdateProcessor; +import org.apache.jena.update.UpdateRequest; + +public class UpdateProcessorWrapper + implements UpdateProcessor +{ + private T delegate; + + public UpdateProcessorWrapper(T delegate) { + super(); + this.delegate = delegate; + } + + protected T getDelegate() { + return delegate; + } + + @Override + public UpdateRequest getUpdateRequest() { + return getDelegate().getUpdateRequest(); + } + + @Override + public String getUpdateRequestString() { + return getDelegate().getUpdateRequestString(); + } + + @Override + public void execute() { + getDelegate().execute(); + } +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/BasicTaskExec.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/BasicTaskExec.java new file mode 100644 index 00000000000..5548c90fdc3 --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/BasicTaskExec.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.exec.tracker.system; + +import org.apache.jena.sparql.engine.iterator.Abortable; + +/** + * Interface for tasks that can execute once - i.e. not periodic ones. + * Combines task information with cancellation. + */ +public interface BasicTaskExec + extends BasicTaskInfo, Abortable // XXX Abortable is in iterator package - not ideal. +{ +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/BasicTaskInfo.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/BasicTaskInfo.java new file mode 100644 index 00000000000..440ba1c88f4 --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/BasicTaskInfo.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.exec.tracker.system; + +import java.time.Instant; +import java.util.Optional; + +public interface BasicTaskInfo { + /** The state of the task. */ + TaskState getTaskState(); + + /** Time stamp for when the task object was created. */ + Instant getCreationTime(); + + /** Time stamp for when the task was started. Returns -1 if was not started yet.*/ + Optional getStartTime(); + + /** + * Time stamp for when the task completed. Returns -1 if it has not finished yet. + */ + Optional getFinishTime(); + + /** Time stamp for when the task was cancelled. Returns -1 if not aborted. */ + Optional getAbortTime(); + + /** + * Return a description suitable for presentation to users. + * This might be a less technical description than what is returned by toString(). + */ + String getLabel(); + + String getStatusMessage(); + + /** + * If this method returns a non-null result then the task is considered to be failing or to have failed. + * A non-null result does not imply that the task has already reached TERMINATED state. + */ + Optional getThrowable(); + + /** Whether abort has been called. */ + default boolean isAborting() { + return getAbortTime().isPresent(); + } + + default boolean isTerminated() { + TaskState state = getTaskState(); + return TaskState.TERMINATED.equals(state); + } +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/ChainingQueryDispatcherExecTrackerSystem.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/ChainingQueryDispatcherExecTrackerSystem.java new file mode 100644 index 00000000000..2010584459c --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/ChainingQueryDispatcherExecTrackerSystem.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.exec.tracker.system; + +import org.apache.jena.query.Query; +import org.apache.jena.sparql.core.DatasetGraph; +import org.apache.jena.sparql.engine.dispatch.ChainingQueryDispatcher; +import org.apache.jena.sparql.engine.dispatch.QueryDispatcher; +import org.apache.jena.sparql.exec.QueryExec; +import org.apache.jena.sparql.util.Context; + +public class ChainingQueryDispatcherExecTrackerSystem + implements ChainingQueryDispatcher +{ + @Override + public QueryExec create(Query query, DatasetGraph dsg, Context context, QueryDispatcher chain) { + QueryExec delegate = chain.create(query, dsg, context); + QueryExec result = TaskEventBroker.track(context, delegate); + + // Remove event broker from dispatcher context as to avoid tracking possible nested executions. + TaskEventBroker.remove(context); + return result; + } +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/ChainingUpdateDispatcherExecTrackerSystem.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/ChainingUpdateDispatcherExecTrackerSystem.java new file mode 100644 index 00000000000..aeee182da71 --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/ChainingUpdateDispatcherExecTrackerSystem.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.exec.tracker.system; + +import org.apache.jena.sparql.core.DatasetGraph; +import org.apache.jena.sparql.engine.dispatch.ChainingUpdateDispatcher; +import org.apache.jena.sparql.engine.dispatch.UpdateDispatcher; +import org.apache.jena.sparql.exec.UpdateExec; +import org.apache.jena.sparql.util.Context; +import org.apache.jena.update.UpdateRequest; + +public class ChainingUpdateDispatcherExecTrackerSystem + implements ChainingUpdateDispatcher +{ + @Override + public UpdateExec create(UpdateRequest updateRequest, DatasetGraph dsg, Context context, UpdateDispatcher chain) { + UpdateExec delegate = chain.create(updateRequest, dsg, context); + UpdateExec result = TaskEventBroker.track(context, delegate); + + // Remove event broker from dispatcher context as to avoid tracking possible nested executions. + TaskEventBroker.remove(context); + return result; + } +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/ExampleExecTrackerSystem.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/ExampleExecTrackerSystem.java new file mode 100644 index 00000000000..4e1e45b1e9c --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/ExampleExecTrackerSystem.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.exec.tracker.system; + +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; + +import org.apache.jena.query.ARQ; +import org.apache.jena.query.ReadWrite; +import org.apache.jena.sparql.core.DatasetGraph; +import org.apache.jena.sparql.core.DatasetGraphFactory; +import org.apache.jena.sparql.engine.dispatch.QueryDispatcherRegistry; +import org.apache.jena.sparql.engine.dispatch.UpdateDispatcherRegistry; +import org.apache.jena.sparql.exec.QueryExec; +import org.apache.jena.sparql.exec.UpdateExec; +import org.apache.jena.sparql.expr.NodeValue; +import org.apache.jena.sparql.function.FunctionBase0; +import org.apache.jena.sparql.function.FunctionRegistry; +import org.apache.jena.sys.JenaSystem; +import org.apache.jena.system.AutoTxn; +import org.apache.jena.system.Txn; + +public class ExampleExecTrackerSystem { + + static { JenaSystem.init(); } + + public static class FunctionFail extends FunctionBase0 { + @Override + public NodeValue exec() { + throw new RuntimeException("Simulated failure"); + } + } + + public static void main(String[] args) { + // Already registered in InitExecTrackerSystem: + QueryDispatcherRegistry.addDispatcher(new ChainingQueryDispatcherExecTrackerSystem()); + UpdateDispatcherRegistry.addDispatcher(new ChainingUpdateDispatcherExecTrackerSystem()); + + TaskEventBroker broker = TaskEventBroker.getOrCreate(ARQ.getContext()); + TaskEventHistory history = TaskEventHistory.getOrCreate(ARQ.getContext()); + + // Make the history listen to the event broker. + history.connect(broker); + + FunctionRegistry.get().put("urn:fail", FunctionFail.class); + + DatasetGraph dsg = DatasetGraphFactory.create(); + // DatasetGraph dsg = TDB2Factory.createDataset().asDatasetGraph(); + try (AutoTxn txn = Txn.autoTxn(dsg, ReadWrite.WRITE)) { + QueryExec.newBuilder() + .dataset(dsg) + .query("SELECT * { ?s ?p ?o }") + .table(); + + UpdateExec.newBuilder() + .dataset(dsg) + .update("PREFIX eg: INSERT DATA { eg:s eg:p eg:o }") + .execute(); + + // Will fail: + try { + UpdateExec.newBuilder() + .dataset(DatasetGraphFactory.empty()) + .update("PREFIX eg: INSERT DATA { eg:s eg:p eg:o }") + .execute(); + } catch (Exception e) { + // Ignore + } + + // Will fail: + try { + QueryExec.newBuilder() + .dataset(dsg) + .query("SELECT (() AS ?x) { }") + .table(); + } catch (Exception e) { + // Ignore + } + + txn.commit(); + } + + for (var e : history.getHistory()) { + BasicTaskExec taskExec = e.getValue(); + String label = taskExec.getLabel().replaceAll("\n", "↵"); + + System.out.println("----------------------"); + System.out.println("Label: " + label); + System.out.println("Created: " + taskExec.getCreationTime()); + System.out.println("Started: " + taskExec.getStartTime().map(ExampleExecTrackerSystem::formatInstant).orElse("n/a")); + System.out.println("Aborted: " + taskExec.getAbortTime().map(ExampleExecTrackerSystem::formatInstant).orElse("n/a")); + System.out.println("Finished: " + taskExec.getFinishTime().map(ExampleExecTrackerSystem::formatInstant).orElse("n/a")); + System.out.println("Error: " + taskExec.getThrowable().map(Throwable::getMessage).orElse("n/a")); + } + } + + private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS z"); + + private static String formatInstant(Instant instant) { + ZoneId zone = ZoneId.of("UTC"); + ZonedDateTime zdt = instant.atZone(zone); + String str = formatter.format(zdt); + return str; + } +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/InitExecTrackerSystem.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/InitExecTrackerSystem.java new file mode 100644 index 00000000000..07a7185917e --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/InitExecTrackerSystem.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.exec.tracker.system; + +import org.apache.jena.sparql.engine.dispatch.QueryDispatcherRegistry; +import org.apache.jena.sparql.engine.dispatch.UpdateDispatcherRegistry; +import org.apache.jena.sys.JenaSubsystemLifecycle ; + +public class InitExecTrackerSystem implements JenaSubsystemLifecycle { + @Override + public void start() { + QueryDispatcherRegistry.addDispatcher(new ChainingQueryDispatcherExecTrackerSystem()); + UpdateDispatcherRegistry.addDispatcher(new ChainingUpdateDispatcherExecTrackerSystem()); + } + + @Override + public void stop() {} + + @Override + public int level() { + return 40 ; + } +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/QueryExecTask.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/QueryExecTask.java new file mode 100644 index 00000000000..53738cdd9c2 --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/QueryExecTask.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.exec.tracker.system; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; + +import org.apache.jena.query.Query; +import org.apache.jena.query.QueryType; +import org.apache.jena.sparql.exec.QueryExec; +import org.apache.jena.sparql.exec.tracker.core.ThrowableTrackerFirst; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Wraps a QueryExec and tracks its execution and any error in its machinery. + * This includes obtained RowSets and Iterators. + */ +public class QueryExecTask + extends QueryExecTaskBase // + implements BasicTaskExec +{ + private static final Logger logger = LoggerFactory.getLogger(QueryExecTask.class); + + protected TaskListener listener; + protected Instant creationTime; + protected Instant startTime = null; + protected Instant abortTime = null; + protected Instant finishTime = null; + protected TaskState currentState = TaskState.CREATED; + + /** + * Note: The constructor does not notify the listener with the creation event. + * This has to be done externally, such as using {@link #create(QueryExec, TaskListener)}. + */ + protected QueryExecTask(QueryExec delegate, Instant creationTime, TaskListener listener) { + super(delegate, new ThrowableTrackerFirst()); + this.listener = listener; + this.creationTime = creationTime; + } + + /** Wrap a QueryExec and notify the listener with the creation event using the current time. */ + public static QueryExecTask create(QueryExec delegate, TaskListener listener) { + Instant creationTime = Instant.now(); + return create(delegate, creationTime, listener); + } + + /** Wrap a QueryExec and notify the listener with the creation event using the given time. */ + public static QueryExecTask create(QueryExec delegate, Instant creationTime, TaskListener listener) { + Objects.requireNonNull(delegate); + Objects.requireNonNull(listener); + QueryExecTask result = new QueryExecTask(delegate, creationTime, listener); + listener.onStateChange(result); + return result; + } + + @Override + public TaskState getTaskState() { + return currentState; + } + + @Override + public Instant getCreationTime() { + return creationTime; + } + + @Override + public Optional getAbortTime() { + return Optional.ofNullable(abortTime); + } + + @Override + public Optional getStartTime() { + return Optional.ofNullable(startTime); + } + + @Override + public Optional getFinishTime() { + return Optional.ofNullable(finishTime); + } + + @Override + public Optional getThrowable() { + return getThrowableTracker().getFirstThrowable(); + } + + @Override + public String getLabel() { + Query query = getQuery(); + String result; + if (query == null) { + result = getQueryString(); + if (result == null) { + result = "Unknown query"; + } + } else { + result = query.toString(); + } + return result; + } + + @Override + public void abort() { + if (!isAborting()) { + this.abortTime = Instant.now(); + super.abort(); + } + } + + @Override + public void beforeExec(QueryType queryType) { + if (!TaskState.CREATED.equals(currentState)) { + throw new IllegalStateException("Already started."); + } + + startTime = Instant.now(); + transition(TaskState.STARTING, () -> {}); + transition(TaskState.RUNNING, () -> super.beforeExec(queryType)); + } + + @Override + public void afterExec() { + try { + transition(TaskState.TERMINATING, () -> {}); + } finally { + try { + super.afterExec(); + } finally { + updateFinishTime(); + advertiseStateChange(TaskState.TERMINATED); + } + } + } + + protected void updateFinishTime() { + if (finishTime == null) { + finishTime = Instant.now(); + } + } + + /** Update state notifies all listeners of the change. */ + protected void advertiseStateChange(TaskState newState) { + Objects.requireNonNull(newState); + if (currentState == null || newState.ordinal() > currentState.ordinal()) { + // State oldState = currentState; + currentState = newState; + if (listener != null) { + try { + listener.onStateChange(this); + } catch (Throwable e) { + logger.warn("Exception raised in listener.", e); + } + } + } + } + + /** + * Run the given action. + * + * On success, transitions to the specified target state. + * + * On failure, transitions to {@link TaskState#TERMINATING} and re-throws the encountered exception. + * This should cause a subsequent call to close() which transitions to {@link TaskState#TERMINATED}. + */ + protected void transition(TaskState targetState, Runnable action) { + try { + action.run(); + advertiseStateChange(targetState); + } catch (Throwable throwable) { + throwable.addSuppressed(new RuntimeException("Failure transitioning from " + currentState + " to " + targetState + ".", throwable)); + getThrowableTracker().report(throwable); + advertiseStateChange(TaskState.TERMINATING); + throw throwable; + } + } + + @Override + public String getStatusMessage() { + return ""; + } + + @Override + public String toString() { + return "QueryExecTask [startTime=" + getStartTime() + + ", finishTime=" + getFinishTime() + ", getThrowable=" + getThrowable() + ", queryExecType=" + getQueryExecType() + // Queries excluded because they make the string less readable. + // + ", query=" + getQuery() + ", queryString=" + getQueryString() + + ", delegate=" + getDelegate() + "]"; + } +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/QueryExecTaskBase.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/QueryExecTaskBase.java new file mode 100644 index 00000000000..47bf408dd68 --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/QueryExecTaskBase.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.exec.tracker.system; + +import java.util.Iterator; +import java.util.function.Supplier; + +import org.apache.jena.atlas.json.JsonArray; +import org.apache.jena.atlas.json.JsonObject; +import org.apache.jena.graph.Graph; +import org.apache.jena.graph.Triple; +import org.apache.jena.query.Query; +import org.apache.jena.query.QueryType; +import org.apache.jena.sparql.core.DatasetGraph; +import org.apache.jena.sparql.core.Quad; +import org.apache.jena.sparql.exec.QueryExec; +import org.apache.jena.sparql.exec.RowSet; +import org.apache.jena.sparql.exec.tracker.core.IteratorTracked; +import org.apache.jena.sparql.exec.tracker.core.RowSetTracked; +import org.apache.jena.sparql.exec.tracker.core.ThrowableTracker; +import org.apache.jena.sparql.util.Context; + +/** + * Wrapper for QueryExec that tracks any encountered exception. + * This is accomplished by wrapping RowSets and Iterators of the underlying QueryExec. + */ +public abstract class QueryExecTaskBase implements QueryExec +{ + private T delegate; + private ThrowableTracker throwableTracker; + private QueryType queryExecType = null; + + public QueryExecTaskBase(T delegate, ThrowableTracker tracker) { + super(); + this.delegate = delegate; + this.throwableTracker = tracker; + } + + protected T getDelegate() { + return delegate; + } + + protected ThrowableTracker getThrowableTracker() { + return throwableTracker; + } + + /** + * The query type requested for execution. + * For example, calling select() sets this type to {@link QueryType#SELECT}. + */ + public QueryType getQueryExecType() { + return queryExecType; + } + + @Override + public Context getContext() { + return getDelegate().getContext(); + } + + @Override + public Query getQuery() { + return getDelegate().getQuery(); + } + + @Override + public String getQueryString() { + return getDelegate().getQueryString(); + } + + @Override + public void close() { + try { + getDelegate().close(); + } finally { + afterExec(); + } + } + + @Override + public boolean isClosed() { + return getDelegate().isClosed(); + } + + @Override + public void abort() { + getDelegate().abort(); + } + + public void beforeExec(QueryType queryType) { + this.queryExecType = queryType; + } + + /** Note that afterExec is run only on close. */ + public void afterExec() { + } + + @Override + public RowSet select() { + return compute(QueryType.SELECT, () -> wrapRowSet(getDelegate().select())); + } + + @Override + public Graph construct() { + return compute(QueryType.CONSTRUCT, () -> getDelegate().construct()); + } + + @Override + public Graph construct(Graph graph) { + return compute(QueryType.CONSTRUCT, () -> getDelegate().construct(graph)); + } + + @Override + public Graph describe() { + return compute(QueryType.DESCRIBE, () -> getDelegate().describe()); + } + + @Override + public Graph describe(Graph graph) { + return compute(QueryType.DESCRIBE, () -> getDelegate().describe(graph)); + } + + @Override + public boolean ask() { + return compute(QueryType.ASK, () -> getDelegate().ask()); + } + + @Override + public Iterator constructTriples() { + return compute(QueryType.CONSTRUCT, () -> wrapIterator(getDelegate().constructTriples())); + } + + @Override + public Iterator describeTriples() { + return compute(QueryType.CONSTRUCT, () -> wrapIterator(getDelegate().describeTriples())); + } + + @Override + public Iterator constructQuads() { + return compute(QueryType.CONSTRUCT, () -> wrapIterator(getDelegate().constructQuads())); + } + + @Override + public DatasetGraph constructDataset() { + return compute(QueryType.CONSTRUCT, () -> getDelegate().constructDataset()); + } + + @Override + public DatasetGraph constructDataset(DatasetGraph dataset) { + return compute(QueryType.CONSTRUCT, () -> getDelegate().constructDataset(dataset)); + } + + @Override + public JsonArray execJson() { + return compute(QueryType.CONSTRUCT_JSON, () -> getDelegate().execJson()); + } + + @Override + public Iterator execJsonItems() { + return compute(QueryType.CONSTRUCT_JSON, () -> wrapIterator(getDelegate().execJsonItems())); + } + + @Override + public DatasetGraph getDataset() { + return getDelegate().getDataset(); + } + + protected RowSet wrapRowSet(RowSet base) { + return new RowSetTracked(base, getThrowableTracker()); + } + + protected Iterator wrapIterator(Iterator base) { + return new IteratorTracked<>(base, getThrowableTracker()); + } + + protected X compute(QueryType queryType, Supplier supplier) { + beforeExec(queryType); + try { + X result = supplier.get(); + return result; + } catch(Throwable e) { + e.addSuppressed(new RuntimeException("Error during select().")); + throwableTracker.report(e); + throw e; + } + // afterExec is called during close() + } +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/TaskEventBroker.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/TaskEventBroker.java new file mode 100644 index 00000000000..989f83b9ba1 --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/TaskEventBroker.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.exec.tracker.system; + +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.jena.sparql.SystemARQ; +import org.apache.jena.sparql.core.DatasetGraph; +import org.apache.jena.sparql.exec.QueryExec; +import org.apache.jena.sparql.exec.UpdateExec; +import org.apache.jena.sparql.util.Context; +import org.apache.jena.sparql.util.Symbol; + +/** + * A broker that is both sink and source for task events. + * + * A broker can connect to other ones + * using {@link #connect(TaskEventBroker)} and disconnect from + * them all using {@link #disconnectFromAll()}. + */ +public class TaskEventBroker + extends TaskEventSource + implements TaskListener +{ + private Map, Runnable> upstreamRegistrations = new ConcurrentHashMap<>(); + + public Runnable connect(TaskEventBroker upstream) { + Runnable unregisterFromBase = upstream.addListener(BasicTaskExec.class, this); + Runnable unregisterFromThis = upstreamRegistrations.computeIfAbsent(upstream, u -> { + return () -> { + unregisterFromBase.run(); + upstreamRegistrations.remove(upstream); + }; + }); + return unregisterFromThis; + } + + @Override + public void onStateChange(BasicTaskExec task) { + advertiseStateChange(task); + } + + public void disconnectFromAll() { + upstreamRegistrations.values().forEach(Runnable::run); + } + + public static QueryExec track(QueryExec queryExec) { + Context cxt = queryExec.getContext(); + return track(cxt, queryExec); + } + + /** + * If there is a taskTracker in the context then return a {@link QueryExecTask}. + * Otherwise return the provided query exec. + */ + public static QueryExec track(Context cxt, QueryExec queryExec) { + TaskEventBroker registry = get(cxt); + QueryExec result = (registry == null) + ? queryExec + : QueryExecTask.create(queryExec, registry); + return result; + } + + public static UpdateExec track(UpdateExec updateExec) { + Context cxt = updateExec.getContext(); + return track(cxt, updateExec); + } + + /** + * If there is a taskTracker in the context then return a {@link QueryExecTask}. + * Otherwise return the provided query exec. + */ + public static UpdateExec track(Context cxt, UpdateExec updateExec) { + TaskEventBroker registry = get(cxt); + return track(registry, updateExec); + } + + public static UpdateExec track(TaskEventBroker tracker, UpdateExec updateExec) { + UpdateExec result = (tracker == null) + ? updateExec + : UpdateExecTask.create(updateExec, tracker); + return result; + } + + // ----- ARQ Integration ----- + + public static final Symbol symTaskEventBroker = SystemARQ.allocSymbol("taskEventBroker"); + + public static TaskEventBroker get(DatasetGraph dsg) { + return dsg == null ? null : get(dsg.getContext()); + } + + public static TaskEventBroker get(Context context) { + return context == null ? null : context.get(symTaskEventBroker); + } + + public static void remove(Context context) { + if (context != null) { + context.remove(symTaskEventBroker); + } + } + + /** Get an existing TaskEventBroker or atomically create a new one. */ + public static TaskEventBroker getOrCreate(Context context) { + TaskEventBroker result = context.computeIfAbsent(symTaskEventBroker, sym -> new TaskEventBroker()); + return result; + } + + /** Get an existing TaskEventBroker or fail with a {@link NoSuchElementException}. */ + public static TaskEventBroker require(Context context) { + TaskEventBroker result = get(context); + if (result == null) { + throw new NoSuchElementException("No task event broker in context."); + } + return result; + } +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/TaskEventHistory.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/TaskEventHistory.java new file mode 100644 index 00000000000..c2ab95d2290 --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/TaskEventHistory.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.exec.tracker.system; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.jena.sparql.SystemARQ; +import org.apache.jena.sparql.util.Context; +import org.apache.jena.sparql.util.Symbol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Broker (sink + source) that stores the last n events. */ +public class TaskEventHistory + extends TaskEventBroker +{ + private static final Logger logger = LoggerFactory.getLogger(TaskEventHistory.class); + + // Relabel tasks by a sequential ids. + // XXX Id allocation could be factored out in a central place. Fuseki allocates IDs too. + protected AtomicLong nextSerial = new AtomicLong(); + protected Map taskIdToSerial = new ConcurrentHashMap<>(); + protected ConcurrentNavigableMap serialToTask = new ConcurrentSkipListMap<>(); + + protected int maxHistorySize = 1000; + + // History is indexed by serial id. + protected ConcurrentLinkedDeque> history = new ConcurrentLinkedDeque<>(); + + public BasicTaskExec getByTaskId(long taskId) { + Long serial = taskIdToSerial.get(taskId); + BasicTaskExec result = getTaskBySerialId(serial); + return result; + } + + public BasicTaskExec getTaskBySerialId(long serialId) { + return serialToTask.get(serialId); + } + + public ConcurrentNavigableMap getActiveTasks() { + return serialToTask; + } + + public ConcurrentLinkedDeque> getHistory() { + return history; + } + + public void setMaxHistorySize(int maxHistorySize) { + this.maxHistorySize = maxHistorySize; + trimHistory(); + } + + @Override + public void onStateChange(BasicTaskExec task) { + switch(task.getTaskState()) { + case STARTING: put(task); break; + case TERMINATED: remove(task); break; + default: break; + } + } + + public long getId(BasicTaskExec task) { + long id = System.identityHashCode(task); + return id; + } + + public Long getSerialId(long taskId) { + return taskIdToSerial.get(taskId); + } + + public void put(BasicTaskExec newTask) { + long taskId = getId(newTask); + boolean[] accepted = {false}; + taskIdToSerial.compute(taskId, (_taskId, oldSerial) -> { + if (oldSerial != null) { + BasicTaskExec oldTask = serialToTask.get(oldSerial); + if (oldTask != newTask) { + // Distinct tasks with the same id - should never happen. + logger.warn("Rejected task tracking because of a hash clash."); + } else { + logger.warn("Task was already added."); + } + return oldSerial; + } + + accepted[0] = true; + long r = nextSerial.incrementAndGet(); + serialToTask.put(r, newTask); + return r; + }); + if (accepted[0]) { + advertiseStateChange(newTask); + } + } + + protected void trimHistory() { + if (history.size() > maxHistorySize) { + Iterator> it = history.descendingIterator(); + while (history.size() >= maxHistorySize && it.hasNext()) { + // Note: No need to clean up taskIdToSerial here. + // Before items are added to the history, their serial id mapping is removed. + it.next(); + it.remove(); + } + } + } + + public boolean remove(BasicTaskExec task) { + long searchTaskId = getId(task); + + // Advertise state change before removing the task entry! + Long foundTaskId = taskIdToSerial.get(searchTaskId); + if (foundTaskId != null) { + advertiseStateChange(task); + } + + Long[] foundSerial = {null}; + taskIdToSerial.compute(searchTaskId, (_taskId, serial) -> { + if (serial != null) { + serialToTask.compute(serial, (s, oldTask) -> { + if (oldTask == task) { + foundSerial[0] = s; + return null; + } + return oldTask; + }); + return foundSerial[0] != null ? null : serial; + } + return serial; + }); + + boolean result = foundSerial[0] != null; + if (result) { + history.addFirst(Map.entry(foundSerial[0], task)); + trimHistory(); + } + return result; + } + + public void clear() { + history.clear(); + } + + @Override + public String toString() { + return "Active: " + serialToTask.size() + ", History: " + history.size() + "/" + maxHistorySize; + } + + // --- ARQ Integration --- + + public static final Symbol symTaskEventHistory = SystemARQ.allocSymbol("taskEventHistory"); + + public static TaskEventHistory get(Context context) { + return context == null ? null : context.get(symTaskEventHistory); + } + + public static TaskEventHistory getOrCreate(Context context) { + TaskEventHistory result = context.computeIfAbsent(symTaskEventHistory, sym -> new TaskEventHistory()); + return result; + } + + public static TaskEventHistory require(Context context) { + TaskEventHistory result = get(context); + if (result == null) { + throw new NoSuchElementException("No TaskEventHistory registered in context"); + } + return result; + } +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/TaskEventSource.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/TaskEventSource.java new file mode 100644 index 00000000000..6c8ee18cf3d --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/TaskEventSource.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.exec.tracker.system; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TaskEventSource { + private static final Logger logger = LoggerFactory.getLogger(TaskEventSource.class); + + // LinkedHashMap to retain listener order. + protected Map, TaskListener> listenersByType = + Collections.synchronizedMap(new LinkedHashMap<>()); + + public Runnable addListener(Class clz, TaskListener listener) { + listenersByType.compute(listener, (k, v) -> { + if (v != null) { + throw new RuntimeException("Listener already registered"); + } + return new TaskListenerTypeAdapter<>(clz, listener); + }); + return () -> listenersByType.remove(listener); + } + + protected void advertiseStateChange(BasicTaskExec task) { + for (TaskListener listener : listenersByType.values()) { + try { + listener.onStateChange(task); + } catch (Throwable t) { + logger.warn("Failure while notifying listener.", t); + } + } + } + + class TaskListenerTypeAdapter + implements TaskListener + { + protected Class clz; + protected TaskListener delegate; + + public TaskListenerTypeAdapter(Class clz, TaskListener delegate) { + super(); + this.clz = clz; + this.delegate = delegate; + } + + @Override + public void onStateChange(BasicTaskExec task) { + if (clz.isInstance(task)) { + Y obj = clz.cast(task); + delegate.onStateChange(obj); + } + } + } +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/TaskListener.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/TaskListener.java new file mode 100644 index 00000000000..5c1ae1f9737 --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/TaskListener.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.exec.tracker.system; + +public interface TaskListener { + void onStateChange(T task); +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/TaskListenerByState.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/TaskListenerByState.java new file mode 100644 index 00000000000..0f9e3453f76 --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/TaskListenerByState.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.exec.tracker.system; + +public interface TaskListenerByState + extends TaskListener +{ + @Override + public default void onStateChange(T task) { + switch (task.getTaskState()) { + case CREATED: onCreated(task); break; + case TERMINATED: onTerminated(task); break; + default: + // Log warning? + break; + } + } + + public void onCreated(T task); + public void onTerminated(T task); +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/TaskState.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/TaskState.java new file mode 100644 index 00000000000..70a1315bbbf --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/TaskState.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.exec.tracker.system; + +public enum TaskState { + /** Task object has been created. */ + CREATED, + + /** A method semantically akin to beforeRun() or init() has been called. */ + STARTING, + + /** A method semantically akin to run() has been called. This implies that the beforeRun() method has completed. */ + RUNNING, + + /** A method semantically akin to afterRun() or close() has been called but not completed yet. */ + TERMINATING, + + /** A method semantically akin to afterRun() or close() has completed. */ + TERMINATED +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/UpdateExecBase.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/UpdateExecBase.java new file mode 100644 index 00000000000..ed5b3b88cc9 --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/UpdateExecBase.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.exec.tracker.system; + +import org.apache.jena.sparql.exec.UpdateExec; +import org.apache.jena.update.UpdateRequest; + +public abstract class UpdateExecBase + implements UpdateExec +{ + protected String updateRequestString; + protected UpdateRequest updateRequest; + + public UpdateExecBase(UpdateRequest updateRequest, String updateRequestString) { + super(); + // this.datasetGraph = datasetGraph; + this.updateRequest = updateRequest; + this.updateRequestString = updateRequestString; + } + +// public DatasetGraph getDatasetGraph() { +// return datasetGraph; +// } + + @Override + public UpdateRequest getUpdateRequest() { + return updateRequest; + } + + @Override + public String getUpdateRequestString() { + return updateRequestString; + } +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/UpdateExecTask.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/UpdateExecTask.java new file mode 100644 index 00000000000..fc72c81c901 --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/UpdateExecTask.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.exec.tracker.system; + +import java.time.Instant; +import java.util.Objects; +import java.util.Optional; + +import org.apache.jena.sparql.exec.UpdateExec; +import org.apache.jena.update.UpdateRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Wrapper for UpdateExec that tracks {@link BasicTaskInfo} information. */ +public class UpdateExecTask + extends UpdateProcessorWrapper + implements UpdateExec, BasicTaskExec +{ + private static final Logger logger = LoggerFactory.getLogger(UpdateExecTask.class); + + protected TaskListener listener; + + protected final Instant creationTime; + protected Instant startTime = null; + protected Instant abortTime = null; + protected Instant finishTime = null; + protected Throwable throwable = null; + protected TaskState currentState = TaskState.CREATED; + + protected UpdateExecTask(UpdateExec delegate, Instant creationTime, TaskListener listener) { + super(delegate); + this.listener = listener; + this.creationTime = creationTime; + } + + /** Wrap a UpdateExec and notify the listener with the creation event using the current time. */ + public static UpdateExecTask create(UpdateExec delegate, TaskListener listener) { + Instant creationTime = Instant.now(); + return create(delegate, creationTime, listener); + } + + /** Wrap a QueryExec and notify the listener with the creation event using the given time. */ + public static UpdateExecTask create(UpdateExec delegate, Instant creationTime, TaskListener listener) { + Objects.requireNonNull(delegate); + Objects.requireNonNull(listener); + UpdateExecTask result = new UpdateExecTask(delegate, creationTime, listener); + listener.onStateChange(result); + return result; + } + + @Override + public TaskState getTaskState() { + return currentState; + } + + @Override + public Instant getCreationTime() { + return creationTime; + } + + @Override + public Optional getStartTime() { + return Optional.ofNullable(startTime); + } + + @Override + public Optional getAbortTime() { + return Optional.ofNullable(abortTime); + } + + @Override + public Optional getFinishTime() { + return Optional.ofNullable(finishTime); + } + + @Override + public Optional getThrowable() { + return Optional.ofNullable(throwable); + } + + @Override + public String getLabel() { + UpdateRequest updateRequest = getUpdateRequest(); + String result; + if (updateRequest == null) { + result = getUpdateRequestString(); + if (result == null) { + result = "Unknown query"; + } + } else { + result = updateRequest.toString(); + } + return result; + } + + @Override + public void abort() { // This method body is needed because of noop and delegating default methods. + if (!isAborting()) { + this.abortTime = Instant.now(); + getDelegate().abort(); + } + } + + protected void updateThrowable(Throwable throwable) { + if (this.throwable == null) { + this.throwable = throwable; + } + } + + @Override + public void execute() { + Throwable throwable = null; + beforeExec(); + try { + super.execute(); + } catch (Throwable t) { + t.addSuppressed(new RuntimeException("Error during update execution")); + throwable = t; + } finally { + afterExec(throwable); + } + } + + protected void beforeExec() { + this.startTime = Instant.now(); + updateState(TaskState.STARTING); + transition(TaskState.RUNNING, () -> {}); + } + + protected void afterExec(Throwable throwable) { + this.throwable = throwable; + try { + updateState(TaskState.TERMINATING); + } finally { + this.finishTime = Instant.now(); + updateState(TaskState.TERMINATED); + } + } + + protected void updateState(TaskState newState) { + Objects.requireNonNull(newState); + if (currentState == null || newState.ordinal() > currentState.ordinal()) { + // State oldState = currentState; + currentState = newState; + if (listener != null) { + try { + listener.onStateChange(this); + } catch (Throwable e) { + logger.warn("Exception raised in listener.", e); + } + } + } + } + + protected void transition(TaskState targetState, Runnable action) { + try { + action.run(); + updateState(targetState); + } catch (Throwable throwable) { + throwable.addSuppressed(new RuntimeException("Failure transitioning from " + currentState + " to " + targetState + ".", throwable)); + updateThrowable(throwable); + updateState(TaskState.TERMINATING); + throw throwable; + } + } + + @Override + public String getStatusMessage() { + return ""; + } + + @Override + public String toString() { + return "UpdateExecTracked [startTime=" + getStartTime() + + ", finishTime=" + getFinishTime() + ", throwable=" + throwable + + ", delegate=" + getDelegate() + "]"; + } +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/UpdateProcessorWrapper.java b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/UpdateProcessorWrapper.java new file mode 100644 index 00000000000..df3c1343cac --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/exec/tracker/system/UpdateProcessorWrapper.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.jena.sparql.exec.tracker.system; + +import org.apache.jena.update.UpdateProcessor; +import org.apache.jena.update.UpdateRequest; + +public class UpdateProcessorWrapper + implements UpdateProcessor +{ + private T delegate; + + public UpdateProcessorWrapper(T delegate) { + super(); + this.delegate = delegate; + } + + protected T getDelegate() { + return delegate; + } + + @Override + public UpdateRequest getUpdateRequest() { + return getDelegate().getUpdateRequest(); + } + + @Override + public String getUpdateRequestString() { + return getDelegate().getUpdateRequestString(); + } + + @Override + public void execute() { + getDelegate().execute(); + } +} diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateProcessorBase.java b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateProcessorBase.java index 3ff59cc2bbf..d957eb26f81 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateProcessorBase.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/modify/UpdateProcessorBase.java @@ -62,6 +62,11 @@ public UpdateProcessorBase(UpdateRequest request, } } + @Override + public UpdateRequest getUpdateRequest() { + return request; + } + @Override public void execute() { UpdateEngine uProc = factory.create(datasetGraph, context); diff --git a/jena-arq/src/main/resources/META-INF/services/org.apache.jena.sys.JenaSubsystemLifecycle b/jena-arq/src/main/resources/META-INF/services/org.apache.jena.sys.JenaSubsystemLifecycle index 492a2de3b34..ee00abaa4f1 100644 --- a/jena-arq/src/main/resources/META-INF/services/org.apache.jena.sys.JenaSubsystemLifecycle +++ b/jena-arq/src/main/resources/META-INF/services/org.apache.jena.sys.JenaSubsystemLifecycle @@ -1,3 +1,6 @@ org.apache.jena.riot.system.InitRIOT org.apache.jena.sparql.system.InitARQ org.apache.jena.rdfs.sys.InitRDFS + +org.apache.jena.sparql.exec.tracker.core.InitExecTrackerCore +org.apache.jena.sparql.exec.tracker.system.InitExecTrackerSystem