1 Star 0 Fork 0

/flow-engine

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
engine.go 18.00 KB
一键复制 编辑 原始数据 按行查看 历史
陈昞翱 提交于 2021-08-21 15:43 . save
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614
package flow_engine
import (
"fmt"
"gitee.com/whit/flow-engine/errs"
"gitee.com/whit/flow-engine/model"
jsonpatch "github.com/evanphx/json-patch/v5"
janitor "github.com/json-iterator/go"
"github.com/pkg/errors"
"github.com/robfig/cron/v3"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/stores/sqlx"
"github.com/valyala/fastjson"
"strconv"
"strings"
"time"
)
const (
EvTaskCreated Event = iota // 任务被创建
EvTaskPassed // 任务通过
EvTaskFailed // 任务未通过
)
var json = janitor.ConfigCompatibleWithStandardLibrary
// Event 事件信号
type Event int
// TaskListener 任务监听器, 在任务被创建和完成时触发
type TaskListener func(event Event, taskId int64, engine *FlowEngine)
// ProcInstListener 流程实例监听器,流程实例更新时被触发
type ProcInstListener func(inst *model.ProcInst, engine *FlowEngine)
// GetUserByRoleCallback 根据角色id查询用户的回调
type GetUserByRoleCallback func(roleId string) (*User, error)
// GetLeaderByUserIdCallback 查询用户的leader
// level 表示距离用户所在组织的距离,0表示直接主管,1表示上一级主管,以此类推
type GetLeaderByUserIdCallback func(userId string, level int) (*User, error)
// FormMergeCallback 表单合并回调(会签节点结束时会调用,由用户决定不同业务表单地合并规则)
// taskKey 任务key,在定义节点的时候录入的
type FormMergeCallback func(taskKey string, forms []string) (string, error)
type User struct {
Id string
Name string
}
// FlowEngine 流程引擎
type FlowEngine struct {
c *cron.Cron // 定时任务
conf Config
conn sqlx.SqlConn
procDefModel model.ProcDefModel
procInstModel model.ProcInstModel
procExecModel model.ProcExecModel
procTaskModel model.ProcTaskModel
taskListener TaskListener
procInstListener ProcInstListener // 流程实例监听器,流程实例更新时被触发
getUserByRole GetUserByRoleCallback
getLeaderByUserId GetLeaderByUserIdCallback
formMerge FormMergeCallback
}
type Config struct {
Mysql struct {
Datasource string
}
}
// NewFlowEngine 新建一个流程引擎
func NewFlowEngine(cfg *Config,
taskListener TaskListener,
procInstListener ProcInstListener,
findUserByRoleCallback GetUserByRoleCallback,
findLeaderByUserIdCallback GetLeaderByUserIdCallback,
formMergeCallback FormMergeCallback,
) *FlowEngine {
// 根据配置连接数据库
conn := sqlx.NewMysql(cfg.Mysql.Datasource)
engine := &FlowEngine{
conn: conn,
taskListener: taskListener,
procInstListener: procInstListener,
getUserByRole: findUserByRoleCallback,
getLeaderByUserId: findLeaderByUserIdCallback,
formMerge: formMergeCallback,
procDefModel: model.NewProcDefModel(conn),
procInstModel: model.NewProcInstModel(conn),
procExecModel: model.NewProcExecModel(conn),
procTaskModel: model.NewProcTaskModel(conn),
}
engine.Start()
return engine
}
// AddProcDef 添加流程定义
func (m *FlowEngine) AddProcDef(procDef *model.ProcDef) (id int64, err error) {
if err := checkProcDefConf(procDef.Content); err != nil {
return 0, err
}
result, err := m.procDefModel.Insert(*procDef)
if err != nil {
return 0, err
}
id, err = result.LastInsertId()
if err != nil {
return 0, err
}
return id, nil
}
// UpdateProcDef 更新流程定义
func (m *FlowEngine) UpdateProcDef(procDef *model.ProcDef) error {
if err := checkProcDefConf(procDef.Content); err != nil {
return err
}
return m.procDefModel.Update(*procDef)
}
// DelProcDef 删除流程定义
func (m *FlowEngine) DelProcDef(id int64) error {
return m.procDefModel.Delete(id)
}
// FindProcDef 查询流程定义
func (m *FlowEngine) FindProcDef(id int64) (*model.ProcDef, error) {
def, err := m.procDefModel.FindOne(id)
if err != nil {
if err == model.ErrNotFound {
return nil, nil
}
return nil, err
}
return def, nil
}
func (m *FlowEngine) FindProcDefListByGroup(group string, page ...model.Page) {
//var group string
//if page != nil && len(page) > 0 {
//
//} else {
//
//}
panic("implement me")
}
// FindProcDefByGroupKey 用 group 和 key 查询一个流程定义
func (m *FlowEngine) FindProcDefByGroupKey(group string, key string) (*model.ProcDef, error) {
panic("must implement")
}
// StartProcess 开启一个流程
// 正常情况下返回流程实例id
// 本函数执行过程中会选择性调用新建引擎对象时传入的 GetUserByRoleCallback 和 GetLeaderByUserIdCallback 两个函数
// 如果 GetUserByRoleCallback 或 GetLeaderByUserIdCallback 返回err,则本函数返回值中的 err等于 GetUserByRoleCallback 或 GetLeaderByUserIdCallback 返回err
// 如果 GetUserByRoleCallback 和 GetLeaderByUserIdCallback 的两个返回值都是 nil 时,会返回 errs.ErrCanNotGetUserByRole 或 errs.ErrCanNotGetLeader 的错误
// 如果返回 errs.ErrInvalidForm 表示,表单中缺少流程定义中的条件字段
func (m *FlowEngine) StartProcess(userId string, procDefId int64, form string) (id int64, err error) {
// 查询对应的流程定义
procDef, err := m.procDefModel.FindOne(procDefId)
if err != nil {
// 如果没有对应的流程实例则新建流程失败
if err == sqlx.ErrNotFound {
return 0, errs.NewErrProcNotDef(err)
}
return 0, err
}
// 如果流程定义内容为空,返回流程未定义错误
if procDef == nil || procDef.Content == "" {
return 0, errs.NewErrProcNotDef(err)
}
// 解析流程配置 -> 生成流程实例和执行流
exec, err := m.parseProcConf(procDef.Content, userId, form)
if err != nil {
return 0, err
}
// 保存执行流和流程实例
execJson, err := json.Marshal(exec)
var tasks []model.ProcTask
if err != nil {
return 0, errs.NewErrJsonMarshal(err)
}
err = m.conn.Transact(func(session sqlx.Session) error { // 开启一个事务
// 新建流程实例
procInst := model.ProcInst{
ProcDefId: procDef.Id,
UserId: userId,
Key: procDef.Key,
Group: procDef.Group,
Title: procDef.Title,
MaxStep: int64(len(exec)) - 1,
CurrentStep: 1,
CurrentNodeTaskCount: int64(exec[1].ActorsCount),
CurrentNodeCompletedTaskCount: 0,
Status: 1,
}
r, err := m.procInstModel.Insert(procInst, session)
if err != nil {
return errs.NewErrDB(err)
}
procInst.Id, err = r.LastInsertId()
if err != nil {
return errs.NewErrDB(err)
}
// 新建执行流
procExec := model.ProcExec{
ProcInstId: procInst.Id,
NodeInfos: string(execJson),
Form: form,
Status: 1,
}
r, err = m.procExecModel.Insert(procExec, session)
if err != nil {
return errs.NewErrDB(err)
}
procExec.Id, err = r.LastInsertId()
if err != nil {
return errs.NewErrDB(err)
}
// 从执行流中拿出一个任务信息,生成一条任务记录
tasks = m.genTasks(&exec[1], 1, &procExec, &procInst)
if err != nil {
return err
}
// 保存任务记录
for i, v := range tasks {
v.ProcInstId = procInst.Id
v.ProcExecId = procExec.Id
r, err := m.procTaskModel.Insert(v)
if err != nil {
return errs.NewErrDB(err)
}
taskId, err := r.LastInsertId()
if err != nil {
return errs.NewErrDB(err)
}
tasks[i].Id = taskId
}
id = procInst.Id
return nil
})
if err != nil {
return 0, err
}
// 调用任务创建的回调
for _, task := range tasks {
go m.taskListener(EvTaskCreated, task.Id, m)
}
return id, nil
}
// parseProcConf 解析流程定义配置
// 返回执行流
func (m *FlowEngine) parseProcConf(confStr string, userId string, formJsonStr string) ([]model.ProcExecNode, error) {
form := []byte(formJsonStr)
exec := make([]model.ProcExecNode, 0)
procDefNode := model.ProcDefNode{}
if err := procDefNode.Parse(confStr); err != nil {
return nil, err
}
node := &procDefNode
// 第一个节点必须是开始节点
if node.Type != model.NtStart {
return nil, errs.NewErrInvalidProcDef(errors.New("第一个节点必须是开始节点"))
}
exec = append(exec, model.ProcExecNode{
Type: node.Type,
})
if node.NextNodes == nil || len(node.NextNodes) == 0 {
return nil, errs.NewErrInvalidProcDef(errors.New("开始节点的后续节点不能为空"))
}
node = &node.NextNodes[0]
// 解析剩余节点
for node != nil {
// 解析定义的每个节点,指定具体执行人生成执行流
if node.Type == model.NtTask { // 如果是任务节点
if node.Prop.Actors == nil || len(node.Prop.Actors) == 0 { // 如果任务节点没有处理人,就判断为异常(非法的流程定义配置)
panic(errors.New("有任务节点没有设置处理人," + confStr))
}
// 判断任务处理者类型是 主管、角色、还是用户
actors := make([]model.ProcExecActor, 0)
for _, v := range node.Prop.Actors {
var actor *User
var err error
switch v.Type {
case model.AtUser: // 如果是用户则直接把用户作为执行人
actor = &User{
Id: v.Id,
Name: v.Name,
}
break
case model.AtRole: // 如果是角色,则按角色查出执行者
actor, err = m.getUserByRole(v.Id)
if err != nil {
return nil, err
}
if actor == nil {
return nil, errs.NewErrCanNotGetUserByRole(errors.New("找不到角色对应的用户"))
}
break
case model.AtManager: // 如果是主管,则查询用户的主管
level, err := strconv.Atoi(v.Id)
if err != nil {
return nil, errs.NewErrInvalidProcDef(err)
}
actor, err = m.getLeaderByUserId(userId, level)
if err != nil {
return nil, err
}
break
default:
return nil, errs.NewErrInvalidProcDef(errors.New("未知的节点类型"))
}
if actor != nil {
actors = append(actors, model.ProcExecActor{
Id: actor.Id,
Name: actor.Name,
})
}
}
// 去重
var afterExcludeDupActors []model.ProcExecActor
if len(actors) > 1 {
afterExcludeDupActors = make([]model.ProcExecActor, 0)
for i := 0; i < len(actors); i++ {
isDup := false
for j := 0; j < len(afterExcludeDupActors); j++ {
if actors[i].Id == afterExcludeDupActors[j].Id {
isDup = true
break
}
}
if !isDup {
afterExcludeDupActors = append(afterExcludeDupActors, actors[i])
}
}
} else {
afterExcludeDupActors = actors
}
if len(afterExcludeDupActors) > 0 {
exec = append(exec, model.ProcExecNode{
Type: node.Type,
Name: node.Prop.Name,
Desc: node.Prop.Desc,
Actors: afterExcludeDupActors,
ActorsCount: len(actors),
MinNumOfPasser: len(actors),
})
}
} else if node.Type == model.NtEnd { // 如果是结束节点就结束解析
exec = append(exec, model.ProcExecNode{
Type: node.Type,
})
break
} else if node.Type == model.NtStart { // 开始节点在整个流程配置中只能出现1次,如果出现第二次就判断为异常(非法的流程定义配置)
panic(errors.New("开始节点在整个流程配置中只能出现1次"))
}
// 选择下一个节点
if node.Type == model.NtCondition { // 判断是否是条件节点
flag := false
// 遍历子节点进行条件判断。
for _, v := range node.NextNodes {
if v.Condition == nil { // 条件节点的下一个节点的进入条件为空,则判断为异常(非法的流程定义配置)
panic(errors.New("条件节点的下一个节点的进入条件不能为空"))
}
f := strings.Split(v.Condition.Field, ".")
value := fastjson.GetString(form, f...)
if value == "" {
return nil, errs.NewErrInvalidForm(errors.New("表单中找不到流程定义中用于判断的条件字段," + confStr))
}
if v.Condition.Match(value) {
node = &v
flag = true
break
}
}
if !flag { // 条件节点的下一项,所有的子节点都不满足条件则为异常(非法的流程定义配置)
panic(errors.New("条件节点的下一项,所有的子节点都不满足条件," + confStr))
}
continue
} else {
if node.NextNodes != nil && len(node.NextNodes) > 0 {
node = &node.NextNodes[0]
} else {
if node.Type != model.NtEnd { // 必须有结束节点,
return nil, errs.NewErrInvalidProcDef(errors.New("必须有结束节点"))
}
node = nil
}
}
}
if len(exec) < 3 {
return nil, errors.New("流程实例中找不到任何审批人")
}
return exec, nil
}
// genTasks 生成任务记录
func (m *FlowEngine) genTasks(execNode *model.ProcExecNode, step int64, procExec *model.ProcExec, procInst *model.ProcInst) []model.ProcTask {
tasks := make([]model.ProcTask, 0)
for _, actor := range execNode.Actors {
tasks = append(tasks, model.ProcTask{
ProcInstId: procInst.Id,
ProcExecId: procExec.Id,
ActorId: actor.Id,
Key: procInst.Key,
Group: procInst.Group,
Title: procInst.Title,
Form: procExec.Form,
Status: 1,
Step: step,
})
}
return tasks
}
// CompleteTask 完成任务
func (m *FlowEngine) CompleteTask(taskId int64, isAllowPass bool, form string) error {
// 查询任务
task, err := m.procTaskModel.FindOne(taskId)
if err != nil {
if err == sqlx.ErrNotFound {
return errs.NewErrNotFind(errors.Wrap(err, "完成任务时查询不到‘任务’"))
}
return errs.NewErrDB(err)
}
// 更改任务状态为完成
task.Status = 0
if isAllowPass {
task.Pass = 1
} else {
task.Pass = 0
}
// 查询任务所在的实例
inst, err := m.procInstModel.FindOne(task.ProcInstId)
if err != nil {
if err == sqlx.ErrNotFound {
return errs.NewErrNotFind(errors.Wrap(err, "完成任务时找不到‘任务’所属的流程实例"))
}
return errs.NewErrDB(err)
}
// 开启一个事务
if err := m.conn.Transact(func(session sqlx.Session) error {
// 保存更改状态后的任务
if err := m.procTaskModel.Update(*task, session); err != nil {
return err
}
// 给实例的当前节点任务完成数自增1
if err := m.procInstModel.IncCurrentNodeCompletedTaskCount(inst.Id, session); err != nil {
return err
}
return nil
}); err != nil {
return errs.NewErrDB(err)
}
go m.tryJump2NextNode(task.ProcInstId)
return nil
}
// tryJump2NextNode 检查流程实例是否满足流转到下个节点的条件,如果满足就执行下列操作:
// 1. 合并用户任务提交的表单(会签节点)
// 2. 标记跳转
// 3. 调用 ProcInstListener 方法
func (m *FlowEngine) tryJump2NextNode(procInstId int64) {
// 检查流程实例是否满足流转到下个节点的条件
// 查询流程实例
inst, err := m.procInstModel.FindOne(procInstId)
if err != nil {
if err == model.ErrNotFound {
logx.Error(errors.Wrap(err, "找不到流程实例"))
} else {
logx.Error(err)
}
return
}
m.tryJump2NextNode2(inst)
}
func (m *FlowEngine) tryJump2NextNode2(inst *model.ProcInst) {
// 如果满足跳转条件
if inst.CurrentNodeTaskCount > 0 && inst.CurrentNodeCompletedTaskCount == inst.CurrentNodeTaskCount {
// 查询流程执行流
exec, err := m.procExecModel.FindOne(inst.Id)
if err != nil {
if err == model.ErrNotFound {
logx.Error(errors.Wrap(err, "找不到流程执行流"))
} else {
logx.Error(err)
}
return
}
// 查询实例当前节点的任务
tasks, err := m.procTaskModel.FindAllByProcInstIdStepStatus(inst.Id, inst.CurrentStep, 0)
if err != nil {
logx.Error(err)
return
}
if len(tasks) < 1 {
logx.Error(errors.New("找不到任务"))
return
}
// 更新执行流中的表单
var form string
if inst.CurrentNodeTaskCount == 1 { // 如果是普通节点
formBytes, err := jsonpatch.MergePatch([]byte(exec.Form), []byte(tasks[0].Form))
if err != nil {
logx.Error(errors.Wrap(err, "表单合并错误"))
return
}
form = string(formBytes)
} else { // 如果是会签节点
taskForms := make([]string, 0)
for _, task := range tasks {
taskForms = append(taskForms, task.Form)
}
form, err = m.formMerge(tasks[0].Key, taskForms)
if err != nil {
logx.Error(err)
return
}
}
execForm, err := jsonpatch.MergePatch([]byte(exec.Form), []byte(form))
if err != nil {
logx.Error(err)
return
}
exec.Form = string(execForm)
// 更新流程实例的任务跳转
inst.CurrentStep = inst.CurrentStep + 1
nextNode, err := exec.GetNodeInfoByStep(int(inst.CurrentStep))
if nextNode.Type == model.NtEnd {
inst.CurrentNodeTaskCount = 0
inst.CurrentNodeCompletedTaskCount = 0
} else if nextNode.Type != model.NtTask {
logx.Error(errors.New("执行流中间出现了非任务节点"))
return
} else {
inst.CurrentNodeTaskCount = int64(nextNode.ActorsCount)
inst.CurrentNodeCompletedTaskCount = 0
}
// 开启一个事务,保存更新后的流程实例和执行流
if err := m.conn.Transact(func(session sqlx.Session) error {
if err := m.procInstModel.Update(*inst, session); err != nil {
return err
}
if err := m.procExecModel.Update(*exec, session); err != nil {
return err
}
return nil
}); err != nil {
logx.Error(err)
return
}
// 发出流程实例step更新信号
go m.procInstListener(inst, m)
}
}
// checkProcInstCanNext 检查是否有流程实例满足跳转条件但是没有跳转的,如果有就跳转
func (m *FlowEngine) checkProcInstCanNext() {
for {
insts, err := m.procInstModel.FindCanNext(10)
if err != nil {
return
}
if len(insts) == 0 {
return
}
for _, inst := range insts {
m.tryJump2NextNode2(inst)
}
}
}
// Start 启动定时任务,检查是否有流程实例满足流转条件而没有流转的(目的是给流转失败的流程实例做补偿)
func (m *FlowEngine) Start() {
if m.c != nil {
return
}
c := cron.New(cron.WithSeconds(), cron.WithChain(cron.SkipIfStillRunning(cron.DefaultLogger)))
_, err := c.AddFunc("@every 3s", func() {
m.checkProcInstCanNext()
time.Sleep(time.Second * 10)
println("done: " + time.Now().String())
})
if err != nil {
fmt.Println(err)
}
c.Start()
m.c = c
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/whit/flow-engine.git
git@gitee.com:whit/flow-engine.git
whit
flow-engine
flow-engine
master

搜索帮助