15 Star 63 Fork 12

宏哥/mq2

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
mq2.sql 4.97 KB
一键复制 编辑 原始数据 按行查看 历史
宏哥 提交于 2017-07-09 16:22 . 更新 mq2.sql
-- Message queue implemented on top of PostgreSQL
--@Author Anthony Chen
--@Copyright reserved
--2016-12-30
-- Queue registry: one row per channel, mapping the channel to the table that
-- stores its messages. Column order matters to code that uses
-- mq_config%ROWTYPE, so it is kept as (table_name, channel).
CREATE TABLE mq_config
(
    -- Name of the table holding the messages of this queue.
    table_name  name  NOT NULL UNIQUE,
    -- Channel identifier; the mq_* functions look queues up by this key.
    channel     name  PRIMARY KEY
);
-- Template for queue tables: mq_create_queue clones this definition with
-- LIKE mq_base INCLUDING ALL. Column order matters to code that uses
-- mq_base%ROWTYPE, so it is kept unchanged.
CREATE TABLE mq_base
(
    -- Message identifier, generated automatically.
    msg_id        BIGSERIAL                 NOT NULL,
    -- Caller-defined message type; 0 when not specified.
    type_id       BIGINT                    NOT NULL DEFAULT 0,
    -- Set to true by mq_recv once the message has been handed out.
    sent          BOOLEAN                   NOT NULL DEFAULT FALSE,
    -- Defaults to the time the row is inserted.
    sent_at       TIMESTAMP WITH TIME ZONE  NOT NULL DEFAULT now(),
    -- Message body.
    payload       JSONB                     NOT NULL,
    -- Filled in by mq_recv when the message is handed out; NULL until then.
    delivered_at  TIMESTAMP WITH TIME ZONE,
    CONSTRAINT pk_mq_base PRIMARY KEY (msg_id)
);
-- Partial index that covers only messages not yet sent (sent = false).
-- Author's note: run "SET enable_seqscan = OFF" so that this index is used
-- instead of a sequential scan.
CREATE UNIQUE INDEX idx_mq_base_not_sent
    ON mq_base
    USING btree (msg_id, type_id)
    WHERE sent = false;
-- Trigger function: announces a newly inserted message on its queue's channel.
--
-- Looks up the channel registered in mq_config for the table that fired the
-- trigger and sends a notification on that channel whose payload is the new
-- row's msg_id (as text).
--
-- Raises: an exception when the firing table has no entry in mq_config
--         (previously this surfaced as an opaque "query string argument of
--         EXECUTE is null" error).
CREATE OR REPLACE FUNCTION mq_trigger_notify() RETURNS TRIGGER
LANGUAGE PLPGSQL AS
$$
DECLARE
    t_channel name;
BEGIN
    -- TG_TABLE_NAME is the supported replacement for the deprecated TG_RELNAME.
    SELECT channel INTO t_channel
    FROM mq_config
    WHERE table_name = TG_TABLE_NAME;

    IF NOT FOUND THEN
        RAISE EXCEPTION 'No channel registered in mq_config for table %', TG_TABLE_NAME;
    END IF;

    -- pg_notify() takes the channel and payload as plain values, so no dynamic
    -- SQL or quoting is needed.
    PERFORM pg_notify(t_channel::text, NEW.msg_id::text);
    RETURN NEW;
END;
$$;
-- Creates a new queue for the given channel.
--
-- Registers the channel in mq_config, creates the backing table
-- "mq_queue_<channel>" as a copy of mq_base (LIKE ... INCLUDING ALL), grants
-- all privileges on it to public and attaches the mq_trigger_notify trigger.
--
-- Parameters:
--   in_channel  channel name; must be non-empty and must fit into the "name"
--               type without truncation.
-- Returns: the mq_config row that was inserted.
-- Raises:  an exception for a NULL, empty or over-long channel name; a unique
--          violation when the channel or its table name is already registered.
--
-- NOTE(review): this function is SECURITY DEFINER but does not pin search_path;
-- consider adding a "SET search_path = ..." clause for the schema it is
-- installed in.
CREATE OR REPLACE FUNCTION mq_create_queue
(in_channel text)
RETURNS mq_config
LANGUAGE PLPGSQL VOLATILE SECURITY DEFINER AS $$
DECLARE
    out_val      mq_config%ROWTYPE;
    t_table_name name;
BEGIN
    -- An empty channel name is not a valid NOTIFY channel, so every insert
    -- into such a queue would fail inside the notify trigger.
    IF in_channel IS NULL OR in_channel = '' THEN
        RAISE EXCEPTION 'Channel name must not be NULL or empty';
    END IF;

    -- mq_config.channel is of type name, which silently truncates long values;
    -- a truncated channel could no longer be found by its original text name.
    IF in_channel::name::text <> in_channel THEN
        RAISE EXCEPTION 'Channel name "%" is too long', in_channel;
    END IF;

    t_table_name := 'mq_queue_' || in_channel;

    INSERT INTO mq_config (table_name, channel)
    VALUES (t_table_name, in_channel)
    RETURNING table_name, channel INTO out_val;

    -- %I quotes the table name as an identifier.
    EXECUTE format('CREATE TABLE %I (LIKE mq_base INCLUDING ALL)', t_table_name);
    EXECUTE format('GRANT ALL ON TABLE %I TO public', t_table_name);
    EXECUTE format(
        'CREATE TRIGGER tr_mq_notify
         AFTER INSERT ON %I
         FOR EACH ROW EXECUTE PROCEDURE mq_trigger_notify()',
        t_table_name);

    RETURN out_val;
END;
$$;
REVOKE EXECUTE ON FUNCTION mq_create_queue(text) FROM public;
-- Drops the queue registered for the given channel.
--
-- Removes the queue's backing table (with CASCADE) and its mq_config row.
--
-- Parameters:
--   in_channel  channel whose queue should be removed.
-- Returns: true when a mq_config row was deleted, false when the channel is
--          not registered (previously an unknown channel raised an opaque
--          "query string argument of EXECUTE is null" error).
--
-- NOTE(review): this function is SECURITY DEFINER but does not pin search_path;
-- consider adding a "SET search_path = ..." clause for the schema it is
-- installed in.
CREATE OR REPLACE FUNCTION mq_drop_queue(in_channel name) RETURNS bool
LANGUAGE plpgsql VOLATILE SECURITY DEFINER AS $$
DECLARE
    t_table_name name;
BEGIN
    SELECT table_name INTO t_table_name
    FROM mq_config
    WHERE channel = in_channel;

    -- Nothing registered for this channel: report it instead of failing.
    IF NOT FOUND THEN
        RETURN false;
    END IF;

    -- IF EXISTS lets a stale mq_config row be cleaned up even when its table
    -- has already been removed by other means.
    EXECUTE format('DROP TABLE IF EXISTS %I CASCADE', t_table_name);

    DELETE FROM mq_config WHERE channel = in_channel;
    -- FOUND now reflects the DELETE (EXECUTE does not change it).
    RETURN FOUND;
END;
$$;
REVOKE EXECUTE ON FUNCTION mq_drop_queue(in_channel name) FROM public;
-- Enqueues one message on a channel.
--
-- Parameters:
--   in_channel  channel to send to; must be registered in mq_config.
--   in_payload  message body.
--   in_type     message type id, 0 by default.
-- Returns: the inserted row (msg_id, type_id, sent, sent_at, payload,
--          delivered_at).
-- Raises:  'Channel Not Found' when the channel is not registered; a NOT NULL
--          violation when in_payload or in_type is NULL (previously a NULL
--          argument turned the whole command string into NULL).
CREATE OR REPLACE FUNCTION mq_send
(in_channel text, in_payload jsonb,in_type bigint DEFAULT 0 )
RETURNS mq_base
LANGUAGE PLPGSQL VOLATILE
AS $$
DECLARE
    t_table_name name;
    out_val      mq_base%ROWTYPE;
BEGIN
    SELECT table_name INTO t_table_name
    FROM mq_config
    WHERE channel = in_channel;

    IF NOT FOUND THEN
        RAISE EXCEPTION 'Channel Not Found';
    END IF;

    -- Only the table name is spliced into the statement (quoted with %I);
    -- the values are bound as parameters via USING.
    EXECUTE format(
        'INSERT INTO %I (type_id, payload)
         VALUES ($1, $2)
         RETURNING msg_id, type_id, sent, sent_at, payload, delivered_at',
        t_table_name)
    INTO out_val
    USING in_type, in_payload;

    RETURN out_val;
END;
$$;
-- Receives (claims) up to in_num_msgs unsent messages from a channel.
--
-- Marks the selected rows as sent, stamps delivered_at with now() and returns
-- them. Rows locked by concurrent receivers are skipped (FOR UPDATE SKIP
-- LOCKED). The oldest unsent messages (lowest msg_id) are selected first; the
-- order of the returned rows themselves is not guaranteed.
--
-- Parameters:
--   in_channel   channel to read from; must be registered in mq_config.
--   in_num_msgs  maximum number of messages to return (default 1).
--   in_type_id   when not NULL, only messages with this type_id are returned.
-- Returns: the claimed rows, after the update.
-- Raises:  'Channel Not Found' when the channel is not registered (same
--          message as mq_send); an exception when in_num_msgs is NULL.
CREATE OR REPLACE FUNCTION mq_recv(
in_channel name, --Channel Name
in_num_msgs bigint DEFAULT 1, --Number of message to get
in_type_id bigint DEFAULT NULL::bigint --Message type id
)
RETURNS SETOF mq_base LANGUAGE PLPGSQL VOLATILE
AS $$
DECLARE
    t_table_name  name;
    t_type_filter text := '';
BEGIN
    SELECT table_name INTO t_table_name
    FROM mq_config
    WHERE channel = in_channel;

    IF NOT FOUND THEN
        RAISE EXCEPTION 'Channel Not Found';
    END IF;

    -- LIMIT NULL would mean "no limit"; keep rejecting NULL explicitly.
    IF in_num_msgs IS NULL THEN
        RAISE EXCEPTION 'in_num_msgs must not be NULL';
    END IF;

    -- The type filter is a fixed fragment; its value is bound as $2.
    IF in_type_id IS NOT NULL THEN
        t_type_filter := 'AND type_id = $2';
    END IF;

    -- Only the table name (quoted with %I) and the fixed filter fragment are
    -- spliced into the statement; all values are bound via USING.
    RETURN QUERY EXECUTE format(
        $q$
        UPDATE %I
        SET delivered_at = now(), sent = true
        WHERE msg_id IN (
            SELECT msg_id
            FROM %I
            WHERE sent = false %s
            ORDER BY msg_id
            LIMIT $1
            FOR UPDATE SKIP LOCKED
        )
        RETURNING msg_id, type_id, sent, sent_at, payload, delivered_at
        $q$,
        t_table_name, t_table_name, t_type_filter)
    USING in_num_msgs, in_type_id;
END;
$$;
-- Stores a catalog comment on mq_send(text, jsonb, bigint); the comment text is
-- the author's hint to run "SET enable_seqscan = OFF" to boost performance.
COMMENT ON FUNCTION mq_send(text, jsonb, bigint) IS 'SET enable_seqscan = OFF; --TO BOOST PERFORMANCE--';
-- Receives (claims) up to in_num_msgs unsent messages whose type_id is in a
-- given list.
--
-- Marks the selected rows as sent, stamps delivered_at with now() and returns
-- them. Rows locked by concurrent receivers are skipped (FOR UPDATE SKIP
-- LOCKED). The oldest unsent messages (lowest msg_id) are selected first; the
-- order of the returned rows themselves is not guaranteed.
--
-- Parameters:
--   in_channel   channel to read from; must be registered in mq_config.
--   in_type_rs   comma-separated list of type ids, optionally wrapped in
--                parentheses, e.g. '(1,2,3)'. The text is parsed into a
--                bigint[] and bound as a parameter; it is no longer spliced
--                into the statement, so arbitrary SQL fragments are rejected.
--   in_num_msgs  maximum number of messages to return (default 1).
-- Returns: the claimed rows, after the update.
-- Raises:  'Channel Not Found' when the channel is not registered; an
--          exception when in_type_rs or in_num_msgs is NULL; an invalid-input
--          error when an element of in_type_rs is not a bigint.
CREATE OR REPLACE FUNCTION mq_recv(
in_channel name, --Channel Name
in_type_rs text, --Message type id
in_num_msgs bigint DEFAULT 1 --Number of message to get
)
RETURNS SETOF mq_base LANGUAGE PLPGSQL VOLATILE
AS $$
DECLARE
    t_table_name name;
    t_type_ids   bigint[];
BEGIN
    SELECT table_name INTO t_table_name
    FROM mq_config
    WHERE channel = in_channel;

    IF NOT FOUND THEN
        RAISE EXCEPTION 'Channel Not Found';
    END IF;

    IF in_type_rs IS NULL THEN
        RAISE EXCEPTION 'in_type_rs must not be NULL';
    END IF;

    -- LIMIT NULL would mean "no limit"; keep rejecting NULL explicitly.
    IF in_num_msgs IS NULL THEN
        RAISE EXCEPTION 'in_num_msgs must not be NULL';
    END IF;

    -- Strip surrounding whitespace/parentheses, split on commas and cast each
    -- element to bigint; anything that is not a number fails the cast here
    -- instead of being executed as SQL.
    t_type_ids := string_to_array(btrim(in_type_rs, E' \t\r\n()'), ',')::bigint[];

    -- Only the table name is spliced into the statement (quoted with %I);
    -- the limit and the type id list are bound via USING.
    RETURN QUERY EXECUTE format(
        $q$
        UPDATE %I
        SET delivered_at = now(), sent = true
        WHERE msg_id IN (
            SELECT msg_id
            FROM %I
            WHERE sent = false
              AND type_id = ANY ($2)
            ORDER BY msg_id
            LIMIT $1
            FOR UPDATE SKIP LOCKED
        )
        RETURNING msg_id, type_id, sent, sent_at, payload, delivered_at
        $q$,
        t_table_name, t_table_name)
    USING in_num_msgs, t_type_ids;
END;
$$;
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
消息服务器/消息队列
1
https://gitee.com/brh/mq2.git
git@gitee.com:brh/mq2.git
brh
mq2
mq2
master

搜索帮助