代码拉取完成,页面将自动刷新
package worker
import (
"errors"
"reflect"
"time"
"gitee.com/evanscat/worker/task"
)
var (
// ErrBackendNotConfigured is returned by the result accessors in this file
// when the result they are called on has a nil backend.
ErrBackendNotConfigured = errors.New("Result backend not configured")
// ErrTimeoutReached is returned by the GetWithTimeout methods when the
// timeout elapses before a result or an error becomes available.
ErrTimeoutReached = errors.New("Timeout reached")
)
// AsyncResult represents a task result. It ties a task signature to the most
// recently fetched state of that task and to the backend the state is read
// from.
type AsyncResult struct {
// Signature is the task being tracked; its UUID is the key used when
// asking the backend for state.
Signature *task.Signature
// taskState holds the latest state obtained from the backend (refreshed
// by GetState until the task is completed).
taskState *task.TaskState
// backend is the result store that is queried for state. It may be nil,
// in which case Touch reports ErrBackendNotConfigured.
backend Backend
}
// ChordAsyncResult represents a result of a chord: a group of tasks plus a
// callback task whose result is the result of the chord as a whole.
type ChordAsyncResult struct {
// groupAsyncResults holds one AsyncResult per task in the chord's group.
groupAsyncResults []*AsyncResult
// chordAsyncResult tracks the chord callback task.
chordAsyncResult *AsyncResult
// backend is the result store shared by all of the above; nil means
// results cannot be fetched (ErrBackendNotConfigured).
backend Backend
}
// ChainAsyncResult represents a result of a chain of tasks.
type ChainAsyncResult struct {
// asyncResults holds one AsyncResult per task, in the same order as the
// task slice passed to NewChainAsyncResult; the last entry carries the
// result of the chain.
asyncResults []*AsyncResult
// backend is the result store shared by every task in the chain; nil
// means results cannot be fetched (ErrBackendNotConfigured).
backend Backend
}
// NewAsyncResult builds an AsyncResult for the given signature. The result
// starts out with a freshly allocated, empty task state and reads its state
// from the supplied backend.
func NewAsyncResult(signature *task.Signature, backend Backend) *AsyncResult {
	result := new(AsyncResult)
	result.Signature = signature
	result.taskState = new(task.TaskState)
	result.backend = backend
	return result
}
// NewChordAsyncResult builds a ChordAsyncResult: one AsyncResult for every
// task in groupTasks (in order) plus one for chordCallback, all reading from
// the same backend.
func NewChordAsyncResult(groupTasks []*task.Signature, chordCallback *task.Signature, backend Backend) *ChordAsyncResult {
	groupResults := make([]*AsyncResult, 0, len(groupTasks))
	for _, signature := range groupTasks {
		groupResults = append(groupResults, NewAsyncResult(signature, backend))
	}
	callbackResult := NewAsyncResult(chordCallback, backend)

	chord := new(ChordAsyncResult)
	chord.groupAsyncResults = groupResults
	chord.chordAsyncResult = callbackResult
	chord.backend = backend
	return chord
}
// NewChainAsyncResult builds a ChainAsyncResult holding one AsyncResult per
// task, in the order the tasks are given, all reading from the same backend.
func NewChainAsyncResult(tasks []*task.Signature, backend Backend) *ChainAsyncResult {
	chainResults := make([]*AsyncResult, len(tasks))
	for i := range tasks {
		chainResults[i] = NewAsyncResult(tasks[i], backend)
	}

	chain := new(ChainAsyncResult)
	chain.asyncResults = chainResults
	chain.backend = backend
	return chain
}
// Touch refreshes the task state once and reports what is known right now,
// without waiting.
//
// It returns ErrBackendNotConfigured when no backend is set, an error built
// from the stored error text when the task failed, the reflected results
// when the task succeeded, and (nil, nil) when neither is the case.
func (asyncResult *AsyncResult) Touch() ([]reflect.Value, error) {
	if asyncResult.backend == nil {
		return nil, ErrBackendNotConfigured
	}

	state := asyncResult.GetState()

	// With an AMQP backend the stored state is purged once the task is completed.
	if asyncResult.backend.IsAMQP() && state.IsCompleted() {
		asyncResult.backend.PurgeState(state.TaskUUID)
	}

	switch {
	case state.IsFailure():
		return nil, errors.New(state.Error)
	case state.IsSuccess():
		return task.ReflectTaskResults(state.Results)
	default:
		return nil, nil
	}
}
// Get blocks until the task yields results or an error, polling with Touch
// and sleeping for sleepDuration between polls. There is no upper bound on
// how long it waits.
func (asyncResult *AsyncResult) Get(sleepDuration time.Duration) ([]reflect.Value, error) {
	for {
		results, err := asyncResult.Touch()
		if results != nil || err != nil {
			return results, err
		}
		// Nothing to report yet; wait before polling again.
		time.Sleep(sleepDuration)
	}
}
// GetWithTimeout blocks until the task yields results or an error, or until
// timeoutDuration has elapsed, polling with Touch and sleeping for
// sleepDuration between polls.
//
// It returns ErrTimeoutReached when the timeout fires first. The timeout is
// only checked between polls, so the call may overrun timeoutDuration by up
// to one poll plus one sleepDuration.
func (asyncResult *AsyncResult) GetWithTimeout(timeoutDuration, sleepDuration time.Duration) ([]reflect.Value, error) {
	timeout := time.NewTimer(timeoutDuration)
	// Release the timer on every return path instead of leaving it pending.
	defer timeout.Stop()

	for {
		// Non-blocking check: bail out if the deadline has already passed.
		select {
		case <-timeout.C:
			return nil, ErrTimeoutReached
		default:
		}

		results, err := asyncResult.Touch()
		if results != nil || err != nil {
			return results, err
		}
		time.Sleep(sleepDuration)
	}
}
// GetState returns the latest known task state.
//
// Once the stored state is completed it is returned as is and the backend is
// no longer consulted. Otherwise the backend is asked for the state of the
// signature's UUID; on success the stored state is replaced, on error the
// previously stored state is kept and returned. When no backend is
// configured the stored state is returned unchanged rather than panicking on
// the nil backend.
func (asyncResult *AsyncResult) GetState() *task.TaskState {
	if asyncResult.backend == nil || asyncResult.taskState.IsCompleted() {
		return asyncResult.taskState
	}

	// A failed lookup is deliberately not reported here: callers poll, and
	// the previous state stays valid until a newer one can be fetched.
	if taskState, err := asyncResult.backend.GetState(asyncResult.Signature.UUID); err == nil {
		asyncResult.taskState = taskState
	}
	return asyncResult.taskState
}
// Get waits for every task of the chain in order (synchronous blocking call)
// and returns the results of the last one. It returns
// ErrBackendNotConfigured when no backend is set and stops at the first task
// that reports an error. An empty chain yields (nil, nil).
func (chainAsyncResult *ChainAsyncResult) Get(sleepDuration time.Duration) ([]reflect.Value, error) {
	if chainAsyncResult.backend == nil {
		return nil, ErrBackendNotConfigured
	}

	var lastResults []reflect.Value
	for i := range chainAsyncResult.asyncResults {
		results, err := chainAsyncResult.asyncResults[i].Get(sleepDuration)
		if err != nil {
			return nil, err
		}
		lastResults = results
	}
	return lastResults, nil
}
// Get waits for every task of the chord's group in order, then waits for the
// chord callback and returns its results (synchronous blocking call). It
// returns ErrBackendNotConfigured when no backend is set and stops at the
// first group task that reports an error.
func (chordAsyncResult *ChordAsyncResult) Get(sleepDuration time.Duration) ([]reflect.Value, error) {
	if chordAsyncResult.backend == nil {
		return nil, ErrBackendNotConfigured
	}

	for _, groupResult := range chordAsyncResult.groupAsyncResults {
		if _, err := groupResult.Get(sleepDuration); err != nil {
			return nil, err
		}
	}

	callback := chordAsyncResult.chordAsyncResult
	return callback.Get(sleepDuration)
}
// GetWithTimeout returns the results of the last task of the chain, waiting
// at most roughly timeoutDuration (synchronous blocking call).
//
// On every poll each task of the chain is touched in order; the first error
// reported by any task is returned to the caller. The call returns the last
// task's results as soon as they are available, ErrTimeoutReached when the
// timeout fires first, and ErrBackendNotConfigured when no backend is set.
// An empty chain yields (nil, nil), the same as Get. The timeout is only
// checked between polls, so the call may overrun it by up to one poll plus
// one sleepDuration.
func (chainAsyncResult *ChainAsyncResult) GetWithTimeout(timeoutDuration, sleepDuration time.Duration) ([]reflect.Value, error) {
	if chainAsyncResult.backend == nil {
		return nil, ErrBackendNotConfigured
	}
	// Nothing to wait for; also avoids indexing the last element of an
	// empty slice.
	if len(chainAsyncResult.asyncResults) == 0 {
		return nil, nil
	}

	timeout := time.NewTimer(timeoutDuration)
	// Release the timer on every return path instead of leaving it pending.
	defer timeout.Stop()

	for {
		// Non-blocking check: bail out if the deadline has already passed.
		select {
		case <-timeout.C:
			return nil, ErrTimeoutReached
		default:
		}

		// Touch every task so a failure anywhere in the chain surfaces
		// immediately; after the loop, results holds the last task's output.
		var results []reflect.Value
		for _, asyncResult := range chainAsyncResult.asyncResults {
			var err error
			results, err = asyncResult.Touch()
			if err != nil {
				return nil, err
			}
		}
		if results != nil {
			return results, nil
		}
		time.Sleep(sleepDuration)
	}
}
// GetWithTimeout returns the result of the chord callback, waiting at most
// roughly timeoutDuration (synchronous blocking call).
//
// On every poll each task of the group is touched in order and then the
// callback is touched; the first error reported by any of them is returned
// to the caller. The call returns the callback's results as soon as they are
// available, ErrTimeoutReached when the timeout fires first, and
// ErrBackendNotConfigured when no backend is set. The timeout is only
// checked between polls, so the call may overrun it by up to one poll plus
// one sleepDuration.
func (chordAsyncResult *ChordAsyncResult) GetWithTimeout(timeoutDuration, sleepDuration time.Duration) ([]reflect.Value, error) {
	if chordAsyncResult.backend == nil {
		return nil, ErrBackendNotConfigured
	}

	timeout := time.NewTimer(timeoutDuration)
	// Release the timer on every return path instead of leaving it pending.
	defer timeout.Stop()

	for {
		// Non-blocking check: bail out if the deadline has already passed.
		select {
		case <-timeout.C:
			return nil, ErrTimeoutReached
		default:
		}

		// A failed group task means the chord cannot complete; report it.
		for _, asyncResult := range chordAsyncResult.groupAsyncResults {
			if _, err := asyncResult.Touch(); err != nil {
				return nil, err
			}
		}

		results, err := chordAsyncResult.chordAsyncResult.Touch()
		if err != nil {
			// Propagate the callback's failure instead of hiding it.
			return nil, err
		}
		if results != nil {
			return results, nil
		}
		time.Sleep(sleepDuration)
	}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。