From 47fb2f95f8e335f20b85b37e89896dd8a67d9455 Mon Sep 17 00:00:00 2001 From: contrueCT Date: Wed, 17 Jun 2026 21:33:00 +0800 Subject: [PATCH] fix(task): avoid loading huge task results --- .../org/apache/hugegraph/api/job/TaskAPI.java | 16 +++- .../hugegraph/auth/HugeGraphAuthProxy.java | 30 +++++- .../task/DistributedTaskScheduler.java | 7 +- .../org/apache/hugegraph/task/HugeTask.java | 69 +++++++++++++- .../hugegraph/task/StandardTaskScheduler.java | 95 ++++++++++++++----- .../task/TaskAndResultScheduler.java | 61 +++++++++--- .../apache/hugegraph/task/TaskScheduler.java | 21 +++- .../hugegraph/task/TaskTransaction.java | 29 ++++++ .../org/apache/hugegraph/api/TaskApiTest.java | 24 ++++- .../apache/hugegraph/core/TaskCoreTest.java | 74 +++++++++++++++ 10 files changed, 373 insertions(+), 53 deletions(-) diff --git a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/job/TaskAPI.java b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/job/TaskAPI.java index d35cc9a955..7143efb5f7 100644 --- a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/job/TaskAPI.java +++ b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/job/TaskAPI.java @@ -102,12 +102,13 @@ public Map list(@Context GraphManager manager, limit = NO_LIMIT; List idList = ids.stream().map(IdGenerator::of) .collect(Collectors.toList()); - iter = scheduler.tasks(idList); + iter = scheduler.tasks(idList, false); } else { if (status == null) { - iter = scheduler.tasks(null, limit, page); + iter = scheduler.tasks(null, limit, page, false); } else { - iter = scheduler.tasks(parseStatus(status), limit, page); + iter = scheduler.tasks(parseStatus(status), limit, page, + false); } } @@ -136,12 +137,17 @@ public Map get(@Context GraphManager manager, @Parameter(description = "The graph name") @PathParam("graph") String graph, @Parameter(description = "The task id") - @PathParam("id") long id) { + @PathParam("id") long id, + @Parameter(description = "Whether to load task result") + @DefaultValue("true") + @QueryParam("with_result") + boolean withResult) { LOG.debug("Graph [{}] get task: {}", graph, id); TaskScheduler scheduler = graph(manager, graphSpace, graph) .taskScheduler(); - return scheduler.task(IdGenerator.of(id)).asMap(); + return scheduler.task(IdGenerator.of(id), withResult) + .asMap(true, withResult); } @DELETE diff --git a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/auth/HugeGraphAuthProxy.java b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/auth/HugeGraphAuthProxy.java index cf390b886e..5b117faae1 100644 --- a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/auth/HugeGraphAuthProxy.java +++ b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/auth/HugeGraphAuthProxy.java @@ -1326,8 +1326,14 @@ public void save(HugeTask task) { @Override public HugeTask task(Id id) { + return this.task(id, true); + } + + @Override + public HugeTask task(Id id, boolean withResult) { return verifyTaskPermission(HugePermission.READ, - this.taskScheduler.task(id)); + this.taskScheduler.task(id, + withResult)); } @Override @@ -1336,18 +1342,36 @@ public Iterator> tasks(List ids) { this.taskScheduler.tasks(ids)); } + @Override + public Iterator> tasks(List ids, + boolean withResult) { + return verifyTaskPermission(HugePermission.READ, + this.taskScheduler.tasks(ids, + withResult)); + } + @Override public Iterator> tasks(TaskStatus status, long limit, String page) { Iterator> tasks = this.taskScheduler.tasks(status, - limit, page); + limit, + page); + return verifyTaskPermission(HugePermission.READ, tasks); + } + + @Override + public Iterator> tasks(TaskStatus status, + long limit, String page, + boolean withResult) { + Iterator> tasks = this.taskScheduler.tasks( + status, limit, page, withResult); return verifyTaskPermission(HugePermission.READ, tasks); } @Override public HugeTask delete(Id id, boolean force) { verifyTaskPermission(HugePermission.DELETE, - this.taskScheduler.task(id)); + this.taskScheduler.task(id, false)); return this.taskScheduler.delete(id, force); } diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java index b4bba2ea12..c9c66ee602 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java @@ -308,8 +308,9 @@ protected HugeTask deleteFromDB(Id id) { if (vertex == null) { return null; } - HugeTask result = HugeTask.fromVertex(vertex); - this.tx().removeVertex(vertex); + HugeTask result = HugeTask.fromVertex(vertex, false); + this.deleteTaskResultFromTx(id); + this.tx().removeTaskVertex(vertex); return result; }); } @@ -629,7 +630,7 @@ public void run() { // 1. start task can be from schedule() & cronSchedule() // 2. recheck the status of task, in case one same task // called by both methods at same time; - HugeTask queryTask = task(this.task.id()); + HugeTask queryTask = task(this.task.id(), false); if (queryTask != null && !TaskStatus.NEW.equals(queryTask.status())) { return; diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeTask.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeTask.java index f9e4f120f4..d5d703928a 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeTask.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeTask.java @@ -41,6 +41,7 @@ import org.apache.hugegraph.job.EphemeralJob; import org.apache.hugegraph.job.GremlinJob; import org.apache.hugegraph.job.schema.SchemaJob; +import org.apache.hugegraph.structure.HugeVertex; import org.apache.hugegraph.type.define.SerialEnum; import org.apache.hugegraph.util.Blob; import org.apache.hugegraph.util.E; @@ -653,6 +654,11 @@ public Map asMap() { } public synchronized Map asMap(boolean withDetails) { + return this.asMap(withDetails, true); + } + + public synchronized Map asMap(boolean withDetails, + boolean withResult) { E.checkState(this.type != null, "Task type can't be null"); E.checkState(this.name != null, "Task name can't be null"); @@ -689,7 +695,7 @@ public synchronized Map asMap(boolean withDetails) { if (this.input != null) { map.put(Hidden.unHide(P.INPUT), this.input); } - if (this.result != null) { + if (withResult && this.result != null) { map.put(Hidden.unHide(P.RESULT), this.result); } } @@ -697,7 +703,37 @@ public synchronized Map asMap(boolean withDetails) { return map; } + synchronized HugeTask copyWithoutResult() { + HugeTask task = new HugeTask<>(this.id, this.parent, this.callable); + task.type = this.type; + task.name = this.name; + task.dependencies = this.dependencies == null ? + null : InsertionOrderUtil.newSet(this.dependencies); + task.description = this.description; + task.context = this.context; + task.create = this.create; + task.server = this.server; + task.load = this.load; + task.status = this.status; + task.progress = this.progress; + task.update = this.update; + task.retries = this.retries; + task.input = this.input; + task.result = null; + task.scheduler = this.scheduler; + return task; + } + public static HugeTask fromVertex(Vertex vertex) { + return fromVertex(vertex, true); + } + + public static HugeTask fromVertex(Vertex vertex, + boolean withResult) { + if (!withResult && vertex instanceof HugeVertex) { + return fromHugeVertex((HugeVertex) vertex); + } + String callableName = vertex.value(P.CALLABLE); TaskCallable callable; try { @@ -710,11 +746,37 @@ public static HugeTask fromVertex(Vertex vertex) { for (Iterator> iter = vertex.properties(); iter.hasNext(); ) { VertexProperty prop = iter.next(); + if (!withResult && P.RESULT.equals(prop.key())) { + continue; + } task.property(prop.key(), prop.value()); } return task; } + private static HugeTask fromHugeVertex(HugeVertex vertex) { + String callableName = getPropertyValue(vertex, P.CALLABLE); + TaskCallable callable; + try { + callable = TaskCallable.fromClass(callableName); + } catch (Exception e) { + callable = TaskCallable.empty(e); + } + + HugeTask task = new HugeTask<>(vertex.id(), null, callable); + for (String property : P.METADATA_PROPERTIES) { + Object value = getPropertyValue(vertex, property); + if (value != null) { + task.property(property, value); + } + } + return task; + } + + private static V getPropertyValue(HugeVertex vertex, String property) { + return vertex.getPropertyValue(vertex.graph().propertyKey(property).id()); + } + private static Collector> toOrderSet() { return Collectors.toCollection(InsertionOrderUtil::newSet); } @@ -792,6 +854,11 @@ public static final class P { public static final String DEPENDENCIES = "~task_dependencies"; public static final String SERVER = "~task_server"; + private static final String[] METADATA_PROPERTIES = new String[]{ + TYPE, NAME, CALLABLE, DESCRIPTION, CONTEXT, STATUS, PROGRESS, + CREATE, UPDATE, RETRIES, DEPENDENCIES, INPUT, SERVER + }; + //public static final String PARENT = hide("parent"); //public static final String CHILDREN = hide("children"); diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java index 5f60792af1..5f1a7fab29 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java @@ -153,7 +153,7 @@ public void restoreTasks() { String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null; do { Iterator> iter; - for (iter = this.findTask(status, PAGE_SIZE, page); + for (iter = this.findTask(status, PAGE_SIZE, page, false); iter.hasNext(); ) { HugeTask task = iter.next(); if (selfServer.equals(task.server())) { @@ -323,7 +323,8 @@ protected synchronized void scheduleTasksOnMaster() { Collection serverInfos = this.serverManager().allServerInfos(); String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null; do { - Iterator> tasks = this.tasks(TaskStatus.SCHEDULING, PAGE_SIZE, page); + Iterator> tasks = this.tasks(TaskStatus.SCHEDULING, PAGE_SIZE, page, + false); while (tasks.hasNext()) { HugeTask task = tasks.next(); if (task.server() != null) { @@ -365,7 +366,8 @@ protected synchronized void scheduleTasksOnMaster() { protected void executeTasksOnWorker(Id server) { String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null; do { - Iterator> tasks = this.tasks(TaskStatus.SCHEDULED, PAGE_SIZE, page); + Iterator> tasks = this.tasks(TaskStatus.SCHEDULED, PAGE_SIZE, page, + false); while (tasks.hasNext()) { HugeTask task = tasks.next(); this.initTaskCallable(task); @@ -394,7 +396,8 @@ protected void executeTasksOnWorker(Id server) { protected void cancelTasksOnWorker(Id server) { String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null; do { - Iterator> tasks = this.tasks(TaskStatus.CANCELLING, PAGE_SIZE, page); + Iterator> tasks = this.tasks(TaskStatus.CANCELLING, PAGE_SIZE, page, + false); while (tasks.hasNext()) { HugeTask task = tasks.next(); Id taskServer = task.server(); @@ -493,25 +496,26 @@ public boolean close() { } @Override - public HugeTask task(Id id) { + public HugeTask task(Id id, boolean withResult) { E.checkArgumentNotNull(id, "Parameter task id can't be null"); @SuppressWarnings("unchecked") HugeTask task = (HugeTask) this.tasks.get(id); if (task != null) { - return task; + return withResult ? task : task.copyWithoutResult(); } - return this.findTask(id); + return this.findTask(id, withResult); } @Override - public Iterator> tasks(List ids) { + public Iterator> tasks(List ids, + boolean withResult) { List taskIdsNotInMem = new ArrayList<>(); List> taskInMem = new ArrayList<>(); for (Id id : ids) { @SuppressWarnings("unchecked") HugeTask task = (HugeTask) this.tasks.get(id); if (task != null) { - taskInMem.add(task); + taskInMem.add(withResult ? task : task.copyWithoutResult()); } else { taskIdsNotInMem.add(id); } @@ -522,27 +526,31 @@ public Iterator> tasks(List ids) { } else { iterator = new ExtendableIterator<>(taskInMem.iterator()); } - iterator.extend(this.findTasks(taskIdsNotInMem)); + iterator.extend(this.findTasks(taskIdsNotInMem, withResult)); return iterator; } @Override - public Iterator> tasks(TaskStatus status, - long limit, String page) { + public Iterator> tasks(TaskStatus status, long limit, + String page, boolean withResult) { if (status == null) { - return this.findAllTask(limit, page); + return this.findAllTask(limit, page, withResult); } - return this.findTask(status, limit, page); + return this.findTask(status, limit, page, withResult); } public HugeTask findTask(Id id) { + return this.findTask(id, true); + } + + public HugeTask findTask(Id id, boolean withResult) { HugeTask result = this.call(() -> { Iterator vertices = this.tx().queryTaskInfos(id); Vertex vertex = QueryResults.one(vertices); if (vertex == null) { return null; } - return HugeTask.fromVertex(vertex); + return HugeTask.fromVertex(vertex, withResult); }); if (result == null) { throw new NotFoundException("Can't find task with id '%s'", id); @@ -551,23 +559,40 @@ public HugeTask findTask(Id id) { } public Iterator> findTasks(List ids) { - return this.queryTask(ids); + return this.findTasks(ids, true); + } + + public Iterator> findTasks(List ids, + boolean withResult) { + return this.queryTask(ids, withResult); } public Iterator> findAllTask(long limit, String page) { - return this.queryTask(ImmutableMap.of(), limit, page); + return this.findAllTask(limit, page, true); + } + + public Iterator> findAllTask(long limit, String page, + boolean withResult) { + return this.queryTask(ImmutableMap.of(), limit, page, withResult); } public Iterator> findTask(TaskStatus status, long limit, String page) { - return this.queryTask(P.STATUS, status.code(), limit, page); + return this.findTask(status, limit, page, true); + } + + public Iterator> findTask(TaskStatus status, + long limit, String page, + boolean withResult) { + return this.queryTask(P.STATUS, status.code(), limit, page, + withResult); } @Override public HugeTask delete(Id id, boolean force) { this.checkOnMasterNode("delete"); - HugeTask task = this.task(id); + HugeTask task = this.task(id, false); /* * The following is out of date when the task running on worker node: * HugeTask task = this.tasks.get(id); @@ -592,11 +617,11 @@ public HugeTask delete(Id id, boolean force) { if (vertex == null) { return null; } - HugeTask result = HugeTask.fromVertex(vertex); + HugeTask result = HugeTask.fromVertex(vertex, false); E.checkState(force || result.completed(), "Can't delete incomplete task '%s' in status %s", id, result.status()); - this.tx().removeVertex(vertex); + this.tx().removeTaskVertex(vertex); return result; }); } @@ -674,11 +699,24 @@ public void checkRequirement(String op) { private Iterator> queryTask(String key, Object value, long limit, String page) { - return this.queryTask(ImmutableMap.of(key, value), limit, page); + return this.queryTask(key, value, limit, page, true); + } + + private Iterator> queryTask(String key, Object value, + long limit, String page, + boolean withResult) { + return this.queryTask(ImmutableMap.of(key, value), limit, page, + withResult); } private Iterator> queryTask(Map conditions, long limit, String page) { + return this.queryTask(conditions, limit, page, true); + } + + private Iterator> queryTask(Map conditions, + long limit, String page, + boolean withResult) { return this.call(() -> { ConditionQuery query; if (this.graph.backendStoreFeatures().supportsTaskAndServerVertex()) { @@ -701,18 +739,27 @@ private Iterator> queryTask(Map conditions, } Iterator vertices = this.tx().queryVertices(query); Iterator> tasks = - new MapperIterator<>(vertices, HugeTask::fromVertex); + new MapperIterator<>(vertices, vertex -> { + return HugeTask.fromVertex(vertex, withResult); + }); // Convert iterator to list to avoid across thread tx accessed return QueryResults.toList(tasks); }); } private Iterator> queryTask(List ids) { + return this.queryTask(ids, true); + } + + private Iterator> queryTask(List ids, + boolean withResult) { return this.call(() -> { Object[] idArray = ids.toArray(new Id[0]); Iterator vertices = this.tx().queryTaskInfos(idArray); Iterator> tasks = - new MapperIterator<>(vertices, HugeTask::fromVertex); + new MapperIterator<>(vertices, vertex -> { + return HugeTask.fromVertex(vertex, withResult); + }); // Convert iterator to list to avoid across thread tx accessed return QueryResults.toList(tasks); }); diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskAndResultScheduler.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskAndResultScheduler.java index 2ba3fd8a6d..07e34e2691 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskAndResultScheduler.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskAndResultScheduler.java @@ -103,23 +103,25 @@ public void save(HugeTask task) { } @Override - public HugeTask task(Id id) { + public HugeTask task(Id id, boolean withResult) { HugeTask task = this.call(() -> { Iterator vertices = this.tx().queryTaskInfos(id); Vertex vertex = QueryResults.one(vertices); if (vertex == null) { return null; } - return HugeTask.fromVertex(vertex); + return HugeTask.fromVertex(vertex, withResult); }); if (task == null) { throw new NotFoundException("Can't find task with id '%s'", id); } - HugeTaskResult taskResult = queryTaskResult(id); - if (taskResult != null) { - task.result(taskResult); + if (withResult) { + HugeTaskResult taskResult = queryTaskResult(id); + if (taskResult != null) { + task.result(taskResult); + } } return task; @@ -127,17 +129,39 @@ public HugeTask task(Id id) { @Override public Iterator> tasks(List ids) { - return this.tasksWithoutResult(ids); + return this.tasks(ids, false); + } + + @Override + public Iterator> tasks(List ids, + boolean withResult) { + if (!withResult) { + return this.tasksWithoutResult(ids); + } + return this.queryTask(ids); } @Override public Iterator> tasks(TaskStatus status, long limit, String page) { + return this.tasks(status, limit, page, false); + } + + @Override + public Iterator> tasks(TaskStatus status, long limit, + String page, boolean withResult) { + if (!withResult) { + if (status == null) { + return this.queryTaskWithoutResult(ImmutableMap.of(), limit, + page); + } + return this.queryTaskWithoutResult(HugeTask.P.STATUS, + status.code(), limit, page); + } if (status == null) { - return this.queryTaskWithoutResult(ImmutableMap.of(), limit, page); + return this.queryTask(ImmutableMap.of(), limit, page); } - return this.queryTaskWithoutResult(HugeTask.P.STATUS, status.code(), - limit, page); + return this.queryTask(HugeTask.P.STATUS, status.code(), limit, page); } protected Iterator> queryTask(String key, Object value, @@ -216,7 +240,7 @@ protected HugeTask taskWithoutResult(Id id) { if (vertex == null) { return null; } - return HugeTask.fromVertex(vertex); + return HugeTask.fromVertex(vertex, false); }); return result; @@ -227,7 +251,9 @@ protected Iterator> tasksWithoutResult(List ids) { Object[] idArray = ids.toArray(new Id[ids.size()]); Iterator vertices = this.tx().queryTaskInfos(idArray); Iterator> tasks = - new MapperIterator<>(vertices, HugeTask::fromVertex); + new MapperIterator<>(vertices, vertex -> { + return HugeTask.fromVertex(vertex, false); + }); // Convert iterator to list to avoid across thread tx accessed return QueryResults.toList(tasks); }); @@ -268,12 +294,23 @@ protected Iterator> queryTaskWithoutResult(Map vertices = this.tx().queryTaskInfos(query); Iterator> tasks = - new MapperIterator<>(vertices, HugeTask::fromVertex); + new MapperIterator<>(vertices, vertex -> { + return HugeTask.fromVertex(vertex, false); + }); // Convert iterator to list to avoid across thread tx accessed return QueryResults.toList(tasks); }); } + protected void deleteTaskResultFromTx(Id taskId) { + Iterator vertices = + this.tx().queryTaskInfos(HugeTaskResult.genId(taskId)); + HugeVertex vertex = (HugeVertex) QueryResults.one(vertices); + if (vertex != null) { + this.tx().removeTaskVertex(vertex); + } + } + protected HugeTaskResult queryTaskResult(Id taskid) { HugeTaskResult result = this.call(() -> { Iterator vertices = diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskScheduler.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskScheduler.java index af789c5230..d1cf2b2e22 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskScheduler.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskScheduler.java @@ -47,12 +47,25 @@ public interface TaskScheduler { HugeTask delete(Id id, boolean force); - HugeTask task(Id id); + default HugeTask task(Id id) { + return this.task(id, true); + } - Iterator> tasks(List ids); + HugeTask task(Id id, boolean withResult); - Iterator> tasks(TaskStatus status, - long limit, String page); + default Iterator> tasks(List ids) { + return this.tasks(ids, true); + } + + Iterator> tasks(List ids, boolean withResult); + + default Iterator> tasks(TaskStatus status, + long limit, String page) { + return this.tasks(status, limit, page, true); + } + + Iterator> tasks(TaskStatus status, long limit, + String page, boolean withResult); void init(); diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskTransaction.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskTransaction.java index 2b27019c74..9dd4a46aa0 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskTransaction.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskTransaction.java @@ -31,6 +31,7 @@ import org.apache.hugegraph.schema.PropertyKey; import org.apache.hugegraph.schema.SchemaManager; import org.apache.hugegraph.schema.VertexLabel; +import org.apache.hugegraph.structure.HugeIndex; import org.apache.hugegraph.structure.HugeVertex; import org.apache.hugegraph.type.HugeType; import org.apache.hugegraph.type.define.Cardinality; @@ -74,6 +75,34 @@ private boolean deleteIndexIfNeeded(HugeVertex oldV, HugeVertex newV) { return false; } + public void removeTaskVertex(HugeVertex vertex) { + this.checkOwnerThread(); + + this.beforeWrite(); + + this.doRemove(this.serializer.writeVertex(vertex.prepareRemoved())); + if (TASK.equals(vertex.schemaLabel().name())) { + this.updateIndex(this.indexLabel(HugeTask.P.STATUS).id(), + vertex, true); + } + this.removeLabelIndex(vertex); + + this.afterWrite(); + } + + private void removeLabelIndex(HugeVertex vertex) { + if (this.store().features().supportsQueryByLabel() || + !vertex.schemaLabel().enableLabelIndex()) { + return; + } + + HugeIndex index = new HugeIndex(this.graph(), + IndexLabel.label(vertex.type())); + index.fieldValues(vertex.schemaLabel().id()); + index.elementIds(vertex.id(), vertex.expiredTime()); + this.doEliminate(this.serializer.writeIndex(index)); + } + public void initSchema() { if (this.existVertexLabel(TASK)) { return; diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/api/TaskApiTest.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/api/TaskApiTest.java index 9ed25fd71d..b8fe63f797 100644 --- a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/api/TaskApiTest.java +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/api/TaskApiTest.java @@ -87,6 +87,24 @@ public void testGet() { Assert.assertEquals("success", status); } + @Test + public void testGetWithoutResult() { + int taskId = this.gremlinJob("1 + 2"); + + waitTaskSuccess(taskId); + + Response r = client().get(PATH, String.valueOf(taskId)); + String content = assertResponseStatus(200, r); + assertJsonContains(content, "task_result"); + + r = client().get(PATH + taskId, + ImmutableMap.of("with_result", false)); + content = assertResponseStatus(200, r); + assertJsonContains(content, "id"); + assertJsonContains(content, "task_callable"); + Assert.assertFalse(content, content.contains("task_result")); + } + @Test public void testCancel() { // create a task @@ -143,8 +161,12 @@ private int rebuild() { } private int gremlinJob() { + return this.gremlinJob("Thread.sleep(1000L)"); + } + + private int gremlinJob(String gremlin) { String body = "{" + - "\"gremlin\":\"Thread.sleep(1000L)\"," + + "\"gremlin\":\"" + gremlin + "\"," + "\"bindings\":{}," + "\"language\":\"gremlin-groovy\"," + "\"aliases\":{}}"; diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/TaskCoreTest.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/TaskCoreTest.java index 212ccc0588..5f6350bbd6 100644 --- a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/TaskCoreTest.java +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/TaskCoreTest.java @@ -20,6 +20,7 @@ import java.util.Arrays; import java.util.Iterator; import java.util.Random; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeoutException; import org.apache.hugegraph.HugeException; @@ -115,6 +116,79 @@ public void testTask() throws TimeoutException { }); } + @Test + public void testTaskWithoutResult() throws TimeoutException { + HugeGraph graph = graph(); + TaskScheduler scheduler = graph.taskScheduler(); + CountDownLatch latch = new CountDownLatch(1); + + TaskCallable callable = new TaskCallable() { + @Override + public String call() throws Exception { + latch.await(); + return "metadata-result"; + } + + @Override + protected void done() { + scheduler.save(this.task()); + } + }; + + Id id = IdGenerator.of(88889); + HugeTask task = new HugeTask<>(id, null, callable); + task.type("test"); + task.name("metadata-task"); + scheduler.schedule(task); + + try { + Whitebox.setInternalState(task, "result", "\"in-memory-result\""); + + HugeTask taskWithoutResult = scheduler.task(id, false); + Assert.assertEquals("metadata-task", taskWithoutResult.name()); + Assert.assertNull(taskWithoutResult.result()); + + Iterator> iter = scheduler.tasks(ImmutableList.of(id), + false); + Assert.assertTrue(iter.hasNext()); + taskWithoutResult = iter.next(); + Assert.assertEquals("metadata-task", taskWithoutResult.name()); + Assert.assertNull(taskWithoutResult.result()); + Assert.assertFalse(iter.hasNext()); + } finally { + Whitebox.setInternalState(task, "result", null); + latch.countDown(); + } + + scheduler.waitUntilTaskCompleted(id, 10); + + HugeTask taskWithResult = scheduler.task(id, true); + Assert.assertEquals("\"metadata-result\"", taskWithResult.result()); + + HugeTask taskWithoutResult = scheduler.task(id, false); + Assert.assertEquals("metadata-task", taskWithoutResult.name()); + Assert.assertNull(taskWithoutResult.result()); + + Iterator> iter = scheduler.tasks(ImmutableList.of(id), + false); + Assert.assertTrue(iter.hasNext()); + taskWithoutResult = iter.next(); + Assert.assertEquals("metadata-task", taskWithoutResult.name()); + Assert.assertNull(taskWithoutResult.result()); + Assert.assertFalse(iter.hasNext()); + + iter = scheduler.tasks(TaskStatus.SUCCESS, 10, null, false); + Assert.assertTrue(iter.hasNext()); + taskWithoutResult = iter.next(); + Assert.assertEquals("metadata-task", taskWithoutResult.name()); + Assert.assertNull(taskWithoutResult.result()); + + scheduler.delete(id, false); + Assert.assertThrows(NotFoundException.class, () -> { + scheduler.task(id); + }); + } + @Test public void testTaskWithFailure() throws TimeoutException { HugeGraph graph = graph();