Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,13 @@ public Map<String, Object> list(@Context GraphManager manager,
limit = NO_LIMIT;
List<Id> idList = ids.stream().map(IdGenerator::of)
.collect(Collectors.toList());
iter = scheduler.tasks(idList);
iter = scheduler.tasks(idList, false);
} else {
if (status == null) {
iter = scheduler.tasks(null, limit, page);
iter = scheduler.tasks(null, limit, page, false);
} else {
iter = scheduler.tasks(parseStatus(status), limit, page);
iter = scheduler.tasks(parseStatus(status), limit, page,
false);
}
}

Expand Down Expand Up @@ -136,12 +137,17 @@ public Map<String, Object> get(@Context GraphManager manager,
@Parameter(description = "The graph name")
@PathParam("graph") String graph,
@Parameter(description = "The task id")
@PathParam("id") long id) {
@PathParam("id") long id,
@Parameter(description = "Whether to load task result")
@DefaultValue("true")
@QueryParam("with_result")
boolean withResult) {
LOG.debug("Graph [{}] get task: {}", graph, id);

TaskScheduler scheduler = graph(manager, graphSpace, graph)
.taskScheduler();
return scheduler.task(IdGenerator.of(id)).asMap();
return scheduler.task(IdGenerator.of(id), withResult)
.asMap(true, withResult);
}

@DELETE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1326,8 +1326,14 @@ public <V> void save(HugeTask<V> task) {

@Override
public <V> HugeTask<V> task(Id id) {
return this.task(id, true);
}

@Override
public <V> HugeTask<V> task(Id id, boolean withResult) {
return verifyTaskPermission(HugePermission.READ,
this.taskScheduler.task(id));
this.taskScheduler.task(id,
withResult));
}

@Override
Expand All @@ -1336,18 +1342,36 @@ public <V> Iterator<HugeTask<V>> tasks(List<Id> ids) {
this.taskScheduler.tasks(ids));
}

@Override
public <V> Iterator<HugeTask<V>> tasks(List<Id> ids,
boolean withResult) {
return verifyTaskPermission(HugePermission.READ,
this.taskScheduler.tasks(ids,
withResult));
}

@Override
public <V> Iterator<HugeTask<V>> tasks(TaskStatus status,
long limit, String page) {
Iterator<HugeTask<V>> tasks = this.taskScheduler.tasks(status,
limit, page);
limit,
page);
return verifyTaskPermission(HugePermission.READ, tasks);
}

@Override
public <V> Iterator<HugeTask<V>> tasks(TaskStatus status,
long limit, String page,
boolean withResult) {
Iterator<HugeTask<V>> tasks = this.taskScheduler.tasks(
status, limit, page, withResult);
return verifyTaskPermission(HugePermission.READ, tasks);
}

@Override
public <V> HugeTask<V> delete(Id id, boolean force) {
verifyTaskPermission(HugePermission.DELETE,
this.taskScheduler.task(id));
this.taskScheduler.task(id, false));
return this.taskScheduler.delete(id, force);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,9 @@ protected <V> HugeTask<V> deleteFromDB(Id id) {
if (vertex == null) {
return null;
}
HugeTask<V> result = HugeTask.fromVertex(vertex);
this.tx().removeVertex(vertex);
HugeTask<V> result = HugeTask.fromVertex(vertex, false);
this.deleteTaskResultFromTx(id);
this.tx().removeTaskVertex(vertex);
return result;
});
}
Expand Down Expand Up @@ -629,7 +630,7 @@ public void run() {
// 1. start task can be from schedule() & cronSchedule()
// 2. recheck the status of task, in case one same task
// called by both methods at same time;
HugeTask<Object> queryTask = task(this.task.id());
HugeTask<Object> queryTask = task(this.task.id(), false);
if (queryTask != null &&
!TaskStatus.NEW.equals(queryTask.status())) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hugegraph.job.EphemeralJob;
import org.apache.hugegraph.job.GremlinJob;
import org.apache.hugegraph.job.schema.SchemaJob;
import org.apache.hugegraph.structure.HugeVertex;
import org.apache.hugegraph.type.define.SerialEnum;
import org.apache.hugegraph.util.Blob;
import org.apache.hugegraph.util.E;
Expand Down Expand Up @@ -653,6 +654,11 @@ public Map<String, Object> asMap() {
}

public synchronized Map<String, Object> asMap(boolean withDetails) {
return this.asMap(withDetails, true);
}

public synchronized Map<String, Object> asMap(boolean withDetails,
boolean withResult) {
E.checkState(this.type != null, "Task type can't be null");
E.checkState(this.name != null, "Task name can't be null");

Expand Down Expand Up @@ -689,15 +695,45 @@ public synchronized Map<String, Object> asMap(boolean withDetails) {
if (this.input != null) {
map.put(Hidden.unHide(P.INPUT), this.input);
}
if (this.result != null) {
if (withResult && this.result != null) {
map.put(Hidden.unHide(P.RESULT), this.result);
}
}

return map;
}

synchronized HugeTask<V> copyWithoutResult() {
HugeTask<V> task = new HugeTask<>(this.id, this.parent, this.callable);
task.type = this.type;
task.name = this.name;
task.dependencies = this.dependencies == null ?
null : InsertionOrderUtil.newSet(this.dependencies);
task.description = this.description;
task.context = this.context;
task.create = this.create;
task.server = this.server;
task.load = this.load;
task.status = this.status;
task.progress = this.progress;
task.update = this.update;
task.retries = this.retries;
task.input = this.input;
task.result = null;
task.scheduler = this.scheduler;
return task;
}

public static <V> HugeTask<V> fromVertex(Vertex vertex) {
return fromVertex(vertex, true);
}

public static <V> HugeTask<V> fromVertex(Vertex vertex,
boolean withResult) {
if (!withResult && vertex instanceof HugeVertex) {
return fromHugeVertex((HugeVertex) vertex);
}

String callableName = vertex.value(P.CALLABLE);
TaskCallable<V> callable;
try {
Expand All @@ -710,11 +746,37 @@ public static <V> HugeTask<V> fromVertex(Vertex vertex) {
for (Iterator<VertexProperty<Object>> iter = vertex.properties();
iter.hasNext(); ) {
VertexProperty<Object> prop = iter.next();
if (!withResult && P.RESULT.equals(prop.key())) {
continue;
}
task.property(prop.key(), prop.value());
}
return task;
}

private static <V> HugeTask<V> fromHugeVertex(HugeVertex vertex) {
String callableName = getPropertyValue(vertex, P.CALLABLE);
TaskCallable<V> callable;
try {
callable = TaskCallable.fromClass(callableName);
} catch (Exception e) {
callable = TaskCallable.empty(e);
}

HugeTask<V> task = new HugeTask<>(vertex.id(), null, callable);
for (String property : P.METADATA_PROPERTIES) {
Object value = getPropertyValue(vertex, property);
if (value != null) {
task.property(property, value);
}
}
return task;
}

private static <V> V getPropertyValue(HugeVertex vertex, String property) {
return vertex.getPropertyValue(vertex.graph().propertyKey(property).id());
}

private static <V> Collector<V, ?, Set<V>> toOrderSet() {
return Collectors.toCollection(InsertionOrderUtil::newSet);
}
Expand Down Expand Up @@ -792,6 +854,11 @@ public static final class P {
public static final String DEPENDENCIES = "~task_dependencies";
public static final String SERVER = "~task_server";

private static final String[] METADATA_PROPERTIES = new String[]{
TYPE, NAME, CALLABLE, DESCRIPTION, CONTEXT, STATUS, PROGRESS,
CREATE, UPDATE, RETRIES, DEPENDENCIES, INPUT, SERVER
};

//public static final String PARENT = hide("parent");
//public static final String CHILDREN = hide("children");

Expand Down
Loading
Loading