22 Star 36 Fork 15

蔡东赟/beanstalkd-win

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
prot.c 54.09 KB
一键复制 编辑 原始数据 按行查看 历史
zhangjiazhu 提交于 2015-07-27 18:34 . iocp 模型的 beanstalkd ~~~
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136
#ifdef WIN32
# include "dat_w32.h"
#else
# include <sys/socket.h>
# include <sys/utsname.h>
# include <netinet/in.h>
# include <sys/resource.h>
#endif
#include <stdint.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <errno.h>
#include <sys/uio.h>
#include <sys/types.h>
#include <inttypes.h>
#include <stdarg.h>
#include "dat.h"
/* Upper bound, in bytes, on a job body. Starts at the compile-time default
 * and is reported by fmt_stats() as "max-job-size". */
size_t job_data_size_limit = JOB_DATA_SIZE_LIMIT_DEFAULT;
/* Characters permitted in a name. read_tube_name() and name_is_ok() both pass
 * this string to strspn() as the accept set. */
#define NAME_CHARS \
"ABCDEFGHIJKLMNOPQRSTUVWXYZ" \
"abcdefghijklmnopqrstuvwxyz" \
"0123456789-+/;.$_()"
/* Command words as they appear at the start of a request line. which_cmd()
 * matches them by prefix (strncmp over CONSTSTRLEN bytes), so a trailing
 * space inside a literal is part of what must match. */
#define CMD_PUT "put "
#define CMD_PEEKJOB "peek "
#define CMD_PEEK_READY "peek-ready"
#define CMD_PEEK_DELAYED "peek-delayed"
#define CMD_PEEK_BURIED "peek-buried"
#define CMD_RESERVE "reserve"
#define CMD_RESERVE_TIMEOUT "reserve-with-timeout "
#define CMD_DELETE "delete "
#define CMD_RELEASE "release "
#define CMD_BURY "bury "
#define CMD_KICK "kick "
#define CMD_JOBKICK "kick-job "
#define CMD_TOUCH "touch "
#define CMD_STATS "stats"
#define CMD_JOBSTATS "stats-job "
#define CMD_USE "use "
#define CMD_WATCH "watch "
#define CMD_IGNORE "ignore "
#define CMD_LIST_TUBES "list-tubes"
#define CMD_LIST_TUBE_USED "list-tube-used"
#define CMD_LIST_TUBES_WATCHED "list-tubes-watched"
#define CMD_STATS_TUBE "stats-tube "
#define CMD_QUIT "quit"
#define CMD_PAUSE_TUBE "pause-tube"
/* Length of a string literal (or char array) excluding the terminating NUL.
 * Relies on sizeof, so it must not be applied to a plain char pointer. */
#define CONSTSTRLEN(m) (sizeof(m) - 1)
/* Precomputed lengths of the command words above.
 * NOTE(review): presumably used by the command dispatcher to locate the
 * argument text following the command word -- confirm against dispatch_cmd. */
#define CMD_PEEK_READY_LEN CONSTSTRLEN(CMD_PEEK_READY)
#define CMD_PEEK_DELAYED_LEN CONSTSTRLEN(CMD_PEEK_DELAYED)
#define CMD_PEEK_BURIED_LEN CONSTSTRLEN(CMD_PEEK_BURIED)
#define CMD_PEEKJOB_LEN CONSTSTRLEN(CMD_PEEKJOB)
#define CMD_RESERVE_LEN CONSTSTRLEN(CMD_RESERVE)
#define CMD_RESERVE_TIMEOUT_LEN CONSTSTRLEN(CMD_RESERVE_TIMEOUT)
#define CMD_DELETE_LEN CONSTSTRLEN(CMD_DELETE)
#define CMD_RELEASE_LEN CONSTSTRLEN(CMD_RELEASE)
#define CMD_BURY_LEN CONSTSTRLEN(CMD_BURY)
#define CMD_KICK_LEN CONSTSTRLEN(CMD_KICK)
#define CMD_JOBKICK_LEN CONSTSTRLEN(CMD_JOBKICK)
#define CMD_TOUCH_LEN CONSTSTRLEN(CMD_TOUCH)
#define CMD_STATS_LEN CONSTSTRLEN(CMD_STATS)
#define CMD_JOBSTATS_LEN CONSTSTRLEN(CMD_JOBSTATS)
#define CMD_USE_LEN CONSTSTRLEN(CMD_USE)
#define CMD_WATCH_LEN CONSTSTRLEN(CMD_WATCH)
#define CMD_IGNORE_LEN CONSTSTRLEN(CMD_IGNORE)
#define CMD_LIST_TUBES_LEN CONSTSTRLEN(CMD_LIST_TUBES)
#define CMD_LIST_TUBE_USED_LEN CONSTSTRLEN(CMD_LIST_TUBE_USED)
#define CMD_LIST_TUBES_WATCHED_LEN CONSTSTRLEN(CMD_LIST_TUBES_WATCHED)
#define CMD_STATS_TUBE_LEN CONSTSTRLEN(CMD_STATS_TUBE)
#define CMD_PAUSE_TUBE_LEN CONSTSTRLEN(CMD_PAUSE_TUBE)
/* Reply text sent to clients. Literals ending in "\r\n" are complete reply
 * lines. MSG_FOUND and MSG_RESERVED are bare words; reply_job() puts one in
 * front of a job id and body size. The *_FMT strings are printf formats
 * taking a 64-bit job id. */
#define MSG_FOUND "FOUND"
#define MSG_NOTFOUND "NOT_FOUND\r\n"
#define MSG_RESERVED "RESERVED"
#define MSG_DEADLINE_SOON "DEADLINE_SOON\r\n"
#define MSG_TIMED_OUT "TIMED_OUT\r\n"
#define MSG_DELETED "DELETED\r\n"
#define MSG_RELEASED "RELEASED\r\n"
#define MSG_BURIED "BURIED\r\n"
#define MSG_KICKED "KICKED\r\n"
#define MSG_TOUCHED "TOUCHED\r\n"
#define MSG_BURIED_FMT "BURIED %"PRIu64"\r\n"
#define MSG_INSERTED_FMT "INSERTED %"PRIu64"\r\n"
#define MSG_NOT_IGNORED "NOT_IGNORED\r\n"
/* Lengths (excluding the NUL) of some of the complete reply lines above. */
#define MSG_NOTFOUND_LEN CONSTSTRLEN(MSG_NOTFOUND)
#define MSG_DELETED_LEN CONSTSTRLEN(MSG_DELETED)
#define MSG_TOUCHED_LEN CONSTSTRLEN(MSG_TOUCHED)
#define MSG_RELEASED_LEN CONSTSTRLEN(MSG_RELEASED)
#define MSG_BURIED_LEN CONSTSTRLEN(MSG_BURIED)
#define MSG_KICKED_LEN CONSTSTRLEN(MSG_KICKED)
#define MSG_NOT_IGNORED_LEN CONSTSTRLEN(MSG_NOT_IGNORED)
/* Error replies. Where this file sends them through reply_serr(), they are
 * also logged on the server with twarnx(). */
#define MSG_OUT_OF_MEMORY "OUT_OF_MEMORY\r\n"
#define MSG_INTERNAL_ERROR "INTERNAL_ERROR\r\n"
#define MSG_DRAINING "DRAINING\r\n"
#define MSG_BAD_FORMAT "BAD_FORMAT\r\n"
#define MSG_UNKNOWN_COMMAND "UNKNOWN_COMMAND\r\n"
#define MSG_EXPECTED_CRLF "EXPECTED_CRLF\r\n"
#define MSG_JOB_TOO_BIG "JOB_TOO_BIG\r\n"
/* Connection states, stored in Conn.state (see reply(), wait_for_job(),
 * _skip(), check_err() and maybe_enqueue_incoming_job()). */
#define STATE_WANTCOMMAND 0
#define STATE_WANTDATA 1
#define STATE_SENDJOB 2
#define STATE_SENDWORD 3
#define STATE_WAIT 4
#define STATE_BITBUCKET 5
#define STATE_CLOSE 6
/* Operation codes returned by which_cmd(). They also index op_ct[] and are
 * listed in the same order as the entries of op_names[], so the numbering
 * must stay dense and in step with that table. */
#define OP_UNKNOWN 0
#define OP_PUT 1
#define OP_PEEKJOB 2
#define OP_RESERVE 3
#define OP_DELETE 4
#define OP_RELEASE 5
#define OP_BURY 6
#define OP_KICK 7
#define OP_STATS 8
#define OP_JOBSTATS 9
#define OP_PEEK_BURIED 10
#define OP_USE 11
#define OP_WATCH 12
#define OP_IGNORE 13
#define OP_LIST_TUBES 14
#define OP_LIST_TUBE_USED 15
#define OP_LIST_TUBES_WATCHED 16
#define OP_STATS_TUBE 17
#define OP_PEEK_READY 18
#define OP_PEEK_DELAYED 19
#define OP_RESERVE_TIMEOUT 20
#define OP_TOUCH 21
#define OP_QUIT 22
#define OP_PAUSE_TUBE 23
#define OP_JOBKICK 24
/* Number of operation codes above; sizes the op_ct[] counter array. */
#define TOTAL_OPS 25
/* printf format for the server-wide statistics document produced by
 * fmt_stats(). The argument list in fmt_stats() must supply one value per
 * conversion, in this order and with matching types. The document ends with
 * "\r\n". */
#define STATS_FMT "---\n" \
"current-jobs-urgent: %u\n" \
"current-jobs-ready: %u\n" \
"current-jobs-reserved: %u\n" \
"current-jobs-delayed: %u\n" \
"current-jobs-buried: %u\n" \
"cmd-put: %" PRIu64 "\n" \
"cmd-peek: %" PRIu64 "\n" \
"cmd-peek-ready: %" PRIu64 "\n" \
"cmd-peek-delayed: %" PRIu64 "\n" \
"cmd-peek-buried: %" PRIu64 "\n" \
"cmd-reserve: %" PRIu64 "\n" \
"cmd-reserve-with-timeout: %" PRIu64 "\n" \
"cmd-delete: %" PRIu64 "\n" \
"cmd-release: %" PRIu64 "\n" \
"cmd-use: %" PRIu64 "\n" \
"cmd-watch: %" PRIu64 "\n" \
"cmd-ignore: %" PRIu64 "\n" \
"cmd-bury: %" PRIu64 "\n" \
"cmd-kick: %" PRIu64 "\n" \
"cmd-touch: %" PRIu64 "\n" \
"cmd-stats: %" PRIu64 "\n" \
"cmd-stats-job: %" PRIu64 "\n" \
"cmd-stats-tube: %" PRIu64 "\n" \
"cmd-list-tubes: %" PRIu64 "\n" \
"cmd-list-tube-used: %" PRIu64 "\n" \
"cmd-list-tubes-watched: %" PRIu64 "\n" \
"cmd-pause-tube: %" PRIu64 "\n" \
"job-timeouts: %" PRIu64 "\n" \
"total-jobs: %" PRIu64 "\n" \
"max-job-size: %"PRIu32"\n" \
"current-tubes: %"PRIu32"\n" \
"current-connections: %u\n" \
"current-producers: %u\n" \
"current-workers: %u\n" \
"current-waiting: %u\n" \
"total-connections: %u\n" \
"pid: %ld\n" \
"version: %s\n" \
"rusage-utime: %d.%06d\n" \
"rusage-stime: %d.%06d\n" \
"uptime: %u\n" \
"binlog-oldest-index: %d\n" \
"binlog-current-index: %d\n" \
"binlog-records-migrated: %" PRId64 "\n" \
"binlog-records-written: %" PRId64 "\n" \
"binlog-max-size: %d\n" \
"id: %s\n" \
"hostname: %s\n" \
"\r\n"
/* printf format for the per-tube statistics document; the arguments are
 * supplied by fmt_stats_tube() in this order. */
#define STATS_TUBE_FMT "---\n" \
"name: %s\n" \
"current-jobs-urgent: %u\n" \
"current-jobs-ready: %u\n" \
"current-jobs-reserved: %u\n" \
"current-jobs-delayed: %u\n" \
"current-jobs-buried: %u\n" \
"total-jobs: %" PRIu64 "\n" \
"current-using: %u\n" \
"current-watching: %u\n" \
"current-waiting: %u\n" \
"cmd-delete: %" PRIu64 "\n" \
"cmd-pause-tube: %u\n" \
"pause: %" PRIu64 "\n" \
"pause-time-left: %" PRId64 "\n" \
"\r\n"
/* printf format for the per-job statistics document; the arguments are
 * supplied by fmt_job_stats() in this order. */
#define STATS_JOB_FMT "---\n" \
"id: %" PRIu64 "\n" \
"tube: %s\n" \
"state: %s\n" \
"pri: %u\n" \
"age: %" PRId64 "\n" \
"delay: %" PRId64 "\n" \
"ttr: %" PRId64 "\n" \
"time-left: %" PRId64 "\n" \
"file: %d\n" \
"reserves: %u\n" \
"timeouts: %u\n" \
"releases: %u\n" \
"buries: %u\n" \
"kicks: %u\n" \
"\r\n"
/* this number is pretty arbitrary */
#define BUCKET_BUF_SIZE 1024
/* Scratch buffer of BUCKET_BUF_SIZE bytes.
 * NOTE(review): presumably the sink for discarded input while a connection is
 * in STATE_BITBUCKET -- its use is not visible in this part of the file. */
static char bucket[BUCKET_BUF_SIZE];
/* Number of jobs currently in ready heaps across all tubes; incremented in
 * enqueue_job() and decremented wherever a job leaves a ready heap. */
static uint ready_ct = 0;
/* Server-wide counters (urgent, waiting, buried, reserved, total jobs). */
static struct stats global_stat = {0, 0, 0, 0, 0};
/* NOTE(review): not referenced in the visible part of the file; presumably
 * the tube new connections use and watch by default -- confirm. */
static tube default_tube;
/* When nonzero, enqueue_incoming_job() rejects new jobs with MSG_DRAINING. */
static int drain_mode = 0;
/* Timestamp on the nanoseconds() clock that uptime() measures from. */
static int64 started_at;
enum {
NumIdBytes = 8
};
/* Reported as "id" by fmt_stats() on non-WIN32 builds. */
static char id[NumIdBytes * 2 + 1]; // hex-encoded len of NumIdBytes
#ifndef WIN32
/* node_info.nodename is reported as "hostname" by fmt_stats(). */
static struct utsname node_info;
#endif
/* op_ct[] holds one counter per OP_* code (reported as the cmd-* statistics);
 * timeout_ct is reported as "job-timeouts". */
static uint64 op_ct[TOTAL_OPS], timeout_ct = 0;
/* Head of a singly linked list (through Conn.next) of connections pushed by
 * reply() and wait_for_job(); protrmdirty() unlinks a connection from it. */
static Conn *dirty;
/* Command text for each operation code. Entries are listed in the same order
 * as the OP_* definitions, so op_names[OP_x] is the command word for OP_x
 * (index 0, OP_UNKNOWN, is a placeholder). */
static const char * op_names[] = {
"<unknown>",
CMD_PUT,
CMD_PEEKJOB,
CMD_RESERVE,
CMD_DELETE,
CMD_RELEASE,
CMD_BURY,
CMD_KICK,
CMD_STATS,
CMD_JOBSTATS,
CMD_PEEK_BURIED,
CMD_USE,
CMD_WATCH,
CMD_IGNORE,
CMD_LIST_TUBES,
CMD_LIST_TUBE_USED,
CMD_LIST_TUBES_WATCHED,
CMD_STATS_TUBE,
CMD_PEEK_READY,
CMD_PEEK_DELAYED,
CMD_RESERVE_TIMEOUT,
CMD_TOUCH,
CMD_QUIT,
CMD_PAUSE_TUBE,
CMD_JOBKICK,
};
static job remove_buried_job(job j);

/* Return nonzero when tube t has at least one job on its buried list. */
static int
buried_job_p(tube t)
{
    int has_buried = job_list_any_p(&t->buried);

    return has_buried;
}
/* Arrange for `len` bytes at `line` to be written to connection c and move
 * the connection to `state`. Does nothing when c is NULL.
 *
 * The connection is marked as wanting to write and pushed onto the head of
 * the dirty list. `line` is stored by pointer, not copied. */
static void
reply(Conn *c, char *line, int len, int state)
{
    if (c == NULL) {
        return;
    }

    connwant(c, 'w');

    /* push onto the dirty list */
    c->next = dirty;
    dirty = c;

    c->reply = line;
    c->reply_len = len;
    c->reply_sent = 0;
    c->state = state;

    if (verbose < 2) {
        return;
    }
    /* len-2 drops the trailing "\r\n" from the log line */
    printf(">%d reply %.*s\n", c->sock.fd, len-2, line);
}
/* Unlink connection c from the dirty list.
 *
 * The list is rebuilt by popping every entry and pushing the ones to keep
 * onto a fresh list, so the surviving entries end up in reverse order. Each
 * entry's next pointer is cleared as it is popped, which leaves c->next NULL
 * if c was on the list. */
static void
protrmdirty(Conn *c)
{
    Conn *kept = NULL;
    Conn *cur, *rest;

    for (cur = dirty; cur != NULL; cur = rest) {
        rest = cur->next;
        cur->next = NULL;
        if (cur == c) {
            continue;
        }
        cur->next = kept;
        kept = cur;
    }
    dirty = kept;
}
/* Send the string literal m to connection c as a one-line reply
 * (STATE_SENDWORD). m must be a literal or array because its length is taken
 * with CONSTSTRLEN. */
#define reply_msg(c,m) reply((c),(m),CONSTSTRLEN(m),STATE_SENDWORD)
/* Like reply_msg(), but first logs e on the server via twarnx(). e is
 * evaluated more than once. */
#define reply_serr(c,e) (twarnx("server error: %s",(e)),\
reply_msg((c),(e)))
static void
reply_line(Conn*, int, const char*, ...)
__attribute__((format(printf, 3, 4)));

/* Format a reply into the connection's own reply buffer and queue it.
 *
 * c      connection to answer; its reply_buf (LINE_BUF_SIZE bytes) is
 *        overwritten.
 * state  connection state to enter once the reply is queued.
 * fmt    printf-style format followed by its arguments.
 *
 * If the formatted text does not fit, or vsnprintf() reports an error with a
 * negative return value, MSG_INTERNAL_ERROR is sent instead. The negative
 * case matters because the return value would otherwise be used as the reply
 * length. */
static void
reply_line(Conn *c, int state, const char *fmt, ...)
{
    int r;
    va_list ap;

    va_start(ap, fmt);
    r = vsnprintf(c->reply_buf, LINE_BUF_SIZE, fmt, ap);
    va_end(ap);

    /* Make sure the buffer was big enough. If not, we have a bug. */
    if (r < 0 || r >= LINE_BUF_SIZE) {
        reply_serr(c, MSG_INTERNAL_ERROR);
        return;
    }

    reply(c, c->reply_buf, r, state);
}
/* Start sending job j to connection c.
 *
 * Records j as the connection's outgoing job with nothing sent yet, then
 * queues the header line "<word> <id> <size>\r\n" with STATE_SENDJOB. The
 * advertised size is the stored body size minus 2. */
static void
reply_job(Conn *c, job j, const char *word)
{
    c->out_job = j;
    c->out_job_sent = 0;

    reply_line(c, STATE_SENDJOB, "%s %"PRIu64" %u\r\n",
               word, j->r.id, j->r.body_size - 2);
}
/* Take connection c out of the waiting state.
 *
 * Returns NULL without changing anything when c is not waiting. Otherwise
 * clears its CONN_TYPE_WAITING flag, decrements the global and per-tube
 * waiting counters, removes c from the waiting set of every tube it watches,
 * and returns c. */
Conn *
remove_waiting_conn(Conn *c)
{
    size_t idx;

    if (!conn_waiting(c)) {
        return NULL;
    }

    c->type &= ~CONN_TYPE_WAITING;
    global_stat.waiting_ct--;

    for (idx = 0; idx < c->watch.used; idx++) {
        tube watched = c->watch.items[idx];

        watched->stat.waiting_ct--;
        ms_remove(&watched->waiting, c);
    }

    return c;
}
/* Hand job j to connection c as a reservation and send it.
 *
 * Sets the job's deadline to now + ttr, bumps the reserved counters and the
 * job's reserve count, marks it Reserved, links it into c's reserved list and
 * clears c's pending timeout. c->soonest_job is only replaced when one is
 * already cached and j's deadline is earlier. */
static void
reserve_job(Conn *c, job j)
{
    j->r.deadline_at = nanoseconds() + j->r.ttr;

    /* stats */
    global_stat.reserved_ct++;
    j->tube->stat.reserved_ct++;
    j->r.reserve_ct++;

    j->r.state = Reserved;
    job_insert(&c->reserved_jobs, j);
    j->reserver = c;
    c->pending_timeout = -1;

    if (c->soonest_job != NULL
        && j->r.deadline_at < c->soonest_job->r.deadline_at) {
        c->soonest_job = j;
    }

    reply_job(c, j, MSG_RESERVED);
}
/* Pick the best ready job that some waiting connection could take right now.
 *
 * Only tubes with at least one waiter and at least one ready job are
 * considered. A paused tube is skipped while its deadline is still after
 * `now`; once the deadline has passed its pause is cleared as a side effect.
 * Among the heads of the candidate ready heaps, the one that job_pri_less()
 * ranks first wins. Returns NULL when there is no candidate. */
static job
next_eligible_job(int64 now)
{
    job best = NULL;
    size_t idx;

    for (idx = 0; idx < tubes.used; idx++) {
        tube t = tubes.items[idx];
        job head;

        if (t->pause) {
            if (t->deadline_at > now) {
                continue;
            }
            t->pause = 0;
        }

        if (!t->waiting.used || !t->ready.len) {
            continue;
        }

        head = t->ready.data[0];
        if (best == NULL || job_pri_less(head, best)) {
            best = head;
        }
    }

    return best;
}
static void
process_queue()
{
job j;
int64 now = nanoseconds();
while ((j = next_eligible_job(now))) {
heapremove(&j->tube->ready, j->heap_index);
ready_ct--;
if (j->r.pri < URGENT_THRESHOLD) {
global_stat.urgent_ct--;
j->tube->stat.urgent_ct--;
}
reserve_job(remove_waiting_conn(ms_take(&j->tube->waiting)), j);
}
}
/* Return the delayed job with the earliest deadline across all tubes, or NULL
 * when no tube has a delayed job. Nothing is removed.
 *
 * The index is a size_t, matching the other loops over `tubes` in this file,
 * so it is not compared as a signed int against tubes.used. */
static job
delay_q_peek()
{
    size_t i;
    tube t;
    job j = NULL, nj;

    for (i = 0; i < tubes.used; i++) {
        t = tubes.items[i];
        if (t->delay.len == 0) {
            continue;
        }
        /* the head of each delay heap is that tube's earliest deadline */
        nj = t->delay.data[0];
        if (!j || nj->r.deadline_at < j->r.deadline_at) j = nj;
    }

    return j;
}
/* Put job j on its tube's delay heap (delay != 0) or ready heap (delay == 0).
 *
 * s             server whose write-ahead log is updated when requested.
 * delay         nanoseconds until the job becomes ready; 0 means ready now.
 * update_store  nonzero to also write the job to the log.
 *
 * Returns 1 on success. Returns 0 when the heap insert fails or when the log
 * write fails. In the log-write failure case the job has already been placed
 * on the heap. On success process_queue() runs so waiting connections can
 * pick the job up. */
static int
enqueue_job(Server *s, job j, int64 delay, char update_store)
{
    j->reserver = NULL;

    if (!delay) {
        if (!heapinsert(&j->tube->ready, j)) {
            return 0;
        }
        j->r.state = Ready;
        ready_ct++;
        if (j->r.pri < URGENT_THRESHOLD) {
            global_stat.urgent_ct++;
            j->tube->stat.urgent_ct++;
        }
    } else {
        j->r.deadline_at = nanoseconds() + delay;
        if (!heapinsert(&j->tube->delay, j)) {
            return 0;
        }
        j->r.state = Delayed;
    }

    if (update_store) {
        if (!walwrite(&s->wal, j)) {
            return 0;
        }
        walmaint(&s->wal);
    }

    process_queue();
    return 1;
}
/* Move job j onto its tube's buried list.
 *
 * With update_store set, log space is reserved first (returning 0 with no
 * change if that fails) and the job is written to the log afterwards. A
 * failed write also returns 0, but by then the job is already buried.
 * Returns 1 on success. */
static int
bury_job(Server *s, job j, char update_store)
{
    if (update_store) {
        int reserved = walresvupdate(&s->wal, j);

        if (!reserved) {
            return 0;
        }
        j->walresv += reserved;
    }

    job_insert(&j->tube->buried, j);
    global_stat.buried_ct++;
    j->tube->stat.buried_ct++;
    j->r.state = Buried;
    j->reserver = NULL;
    j->r.bury_ct++;

    if (!update_store) {
        return 1;
    }

    if (!walwrite(&s->wal, j)) {
        return 0;
    }
    walmaint(&s->wal);
    return 1;
}
/* Give back every job connection c has reserved.
 *
 * Each job is unlinked from c's reserved list and put straight back on the
 * ready heap without a log write. If that fails it is buried instead. The
 * reserved counters are decremented and c's cached soonest job is cleared
 * for every job processed. */
void
enqueue_reserved_jobs(Conn *c)
{
    while (job_list_any_p(&c->reserved_jobs)) {
        job released = job_remove(c->reserved_jobs.next);
        int ok = enqueue_job(c->srv, released, 0, 0);

        if (ok < 1) {
            bury_job(c->srv, released, 0);
        }

        global_stat.reserved_ct--;
        released->tube->stat.reserved_ct--;
        c->soonest_job = NULL;
    }
}
/* Remove and return the delayed job with the earliest deadline across all
 * tubes, or return NULL when there is none. */
static job
delay_q_take()
{
    job soonest = delay_q_peek();

    if (soonest == NULL) {
        return NULL;
    }

    heapremove(&soonest->tube->delay, soonest->heap_index);
    return soonest;
}
/* Move buried job j back to the ready heap.
 *
 * Returns 1 when the job was made ready and logged. Returns 0 when log space
 * could not be reserved (the job is left untouched) or when enqueueing
 * failed, in which case the job is buried again without a log write. */
static int
kick_buried_job(Server *s, job j)
{
    int reserved = walresvupdate(&s->wal, j);

    if (!reserved) {
        return 0;
    }
    j->walresv += reserved;

    remove_buried_job(j);
    j->r.kick_ct++;

    if (enqueue_job(s, j, 0, 1) == 1) {
        return 1;
    }

    /* could not be made ready, so bury it again */
    bury_job(s, j, 0);
    return 0;
}
/* Return the total number of delayed jobs, summed over every tube's delay
 * heap. */
static uint
get_delayed_job_ct()
{
    uint total = 0;
    size_t idx;

    for (idx = 0; idx < tubes.used; idx++) {
        tube t = tubes.items[idx];

        total += t->delay.len;
    }

    return total;
}
/* Move delayed job j to the ready heap ahead of its deadline.
 *
 * Returns 1 when the job was made ready and logged, 0 otherwise. On a failed
 * log reservation the job is left untouched. If enqueueing as ready fails,
 * the job is re-enqueued with its original delay; if that fails too, it is
 * buried. */
static int
kick_delayed_job(Server *s, job j)
{
    int reserved = walresvupdate(&s->wal, j);

    if (!reserved) {
        return 0;
    }
    j->walresv += reserved;

    heapremove(&j->tube->delay, j->heap_index);
    j->r.kick_ct++;

    if (enqueue_job(s, j, 0, 1) == 1) {
        return 1;
    }

    /* could not be made ready, so delay it again */
    if (enqueue_job(s, j, j->r.delay, 0) == 1) {
        return 0;
    }

    /* last resort */
    bury_job(s, j, 0);
    return 0;
}
/* Kick up to n buried jobs of tube t, always taking the head of the buried
 * list. Returns the number of kick attempts made; the result of each
 * individual kick_buried_job() call is not checked. */
static uint
kick_buried_jobs(Server *s, tube t, uint n)
{
    uint attempts = 0;

    while (attempts < n && buried_job_p(t)) {
        kick_buried_job(s, t->buried.next);
        ++attempts;
    }

    return attempts;
}
/* Kick up to n delayed jobs of tube t, always taking the head of the delay
 * heap. Returns the number of kick attempts made; the result of each
 * individual kick_delayed_job() call is not checked. */
static uint
kick_delayed_jobs(Server *s, tube t, uint n)
{
    uint attempts = 0;

    while (attempts < n && t->delay.len > 0) {
        kick_delayed_job(s, (job)t->delay.data[0]);
        ++attempts;
    }

    return attempts;
}
/* Kick up to n jobs in tube t. Buried jobs take precedence: delayed jobs are
 * only kicked when the tube has no buried jobs at all. Returns the count
 * reported by the helper that was used. */
static uint
kick_jobs(Server *s, tube t, uint n)
{
    if (!buried_job_p(t)) {
        return kick_delayed_jobs(s, t, n);
    }

    return kick_buried_jobs(s, t, n);
}
/* Unlink j from its buried list and adjust the buried counters.
 * Returns the job, or NULL when j is NULL, is not in the Buried state, or
 * job_remove() returns NULL. */
static job
remove_buried_job(job j)
{
    job removed;

    if (j == NULL) {
        return NULL;
    }
    if (j->r.state != Buried) {
        return NULL;
    }

    removed = job_remove(j);
    if (removed == NULL) {
        return NULL;
    }

    global_stat.buried_ct--;
    removed->tube->stat.buried_ct--;
    return removed;
}
/* Remove j from its tube's delay heap.
 * Returns j, or NULL when j is NULL or not in the Delayed state. */
static job
remove_delayed_job(job j)
{
    if (j == NULL) {
        return NULL;
    }
    if (j->r.state != Delayed) {
        return NULL;
    }

    heapremove(&j->tube->delay, j->heap_index);
    return j;
}
/* Remove j from its tube's ready heap and adjust the ready and urgent
 * counters. Returns j, or NULL when j is NULL or not in the Ready state. */
static job
remove_ready_job(job j)
{
    if (j == NULL) {
        return NULL;
    }
    if (j->r.state != Ready) {
        return NULL;
    }

    heapremove(&j->tube->ready, j->heap_index);
    ready_ct--;

    if (j->r.pri < URGENT_THRESHOLD) {
        global_stat.urgent_ct--;
        j->tube->stat.urgent_ct--;
    }

    return j;
}
/* Mark connection c as waiting for a job: bump the global waiting counter,
 * set CONN_TYPE_WAITING, and add c to the waiting set of every tube it
 * watches (bumping each tube's waiting counter). */
static void
enqueue_waiting_conn(Conn *c)
{
    size_t idx;

    global_stat.waiting_ct++;
    c->type |= CONN_TYPE_WAITING;

    for (idx = 0; idx < c->watch.used; idx++) {
        tube watched = c->watch.items[idx];

        watched->stat.waiting_ct++;
        ms_append(&watched->waiting, c);
    }
}
/* Return j when it is non-NULL, is in the Reserved state and is reserved by
 * connection c. Otherwise return NULL. */
static job
find_reserved_job_in_conn(Conn *c, job j)
{
    if (!j) {
        return NULL;
    }
    if (j->reserver != c) {
        return NULL;
    }
    if (j->r.state != Reserved) {
        return NULL;
    }
    return j;
}
/* Restart the time-to-run clock of job j if connection c holds it reserved.
 * On success the deadline becomes now + ttr, c's cached soonest job is
 * cleared, and j is returned. Otherwise nothing changes and NULL is
 * returned. */
static job
touch_job(Conn *c, job j)
{
    job held = find_reserved_job_in_conn(c, j);

    if (held == NULL) {
        return NULL;
    }

    held->r.deadline_at = nanoseconds() + held->r.ttr;
    c->soonest_job = NULL;
    return held;
}
/* Look up a job by id; returns whatever job_find() returns for that id. */
static job
peek_job(uint64 id)
{
    job found = job_find(id);

    return found;
}
/* Inspect errno after a failed I/O call on connection c.
 * EAGAIN, EINTR and EWOULDBLOCK are ignored. Any other value is logged with
 * twarn() using s as the message, and the connection is marked
 * STATE_CLOSE. */
static void
check_err(Conn *c, const char *s)
{
    int transient = errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK;

    if (transient) {
        return;
    }

    twarn("%s", s);
    c->state = STATE_CLOSE;
}
/* Scan the first `size` bytes of s for the sequence "\r\n" and return the
 * line length including that terminator.
 *
 * Returns at least 2 when a line end is found and 0 otherwise. Only the first
 * '\r' is examined: if it is not immediately followed by '\n' the result is
 * 0, even if a complete "\r\n" appears later in the buffer.
 *
 * A size below 2 cannot hold a terminator and yields 0. Without this guard,
 * size - 1 would be passed to memchr() as 0 or, for size <= 0, as a huge
 * unsigned count that reads far beyond the buffer. */
static int
scan_line_end(const char *s, int size)
{
    const char *match;

    if (size < 2) return 0;

    /* search size - 1 bytes so that match[1] below is always in bounds */
    match = memchr(s, '\r', (size_t) size - 1);
    if (!match) return 0;

    if (match[1] == '\n') return (int) (match - s) + 2;

    return 0;
}
/* Return the length of the first complete command line (including "\r\n")
 * in c's command buffer, or 0 if scan_line_end() finds none in the bytes
 * read so far. */
static int
cmd_len(Conn *c)
{
    int line_len = scan_line_end(c->cmd, c->cmd_read);

    return line_len;
}
/* Identify the command at the start of c->cmd.
 *
 * Matching is by prefix against the CMD_* strings, first match wins, so the
 * table order is significant: an entry whose text starts with another
 * entry's text (for example "reserve-with-timeout " and "reserve") must come
 * first. Returns the matching OP_* code, or OP_UNKNOWN when nothing
 * matches. */
static int
which_cmd(Conn *c)
{
    static const struct {
        const char *text;
        size_t len;
        int op;
    } known[] = {
        { CMD_PUT, CONSTSTRLEN(CMD_PUT), OP_PUT },
        { CMD_PEEKJOB, CONSTSTRLEN(CMD_PEEKJOB), OP_PEEKJOB },
        { CMD_PEEK_READY, CONSTSTRLEN(CMD_PEEK_READY), OP_PEEK_READY },
        { CMD_PEEK_DELAYED, CONSTSTRLEN(CMD_PEEK_DELAYED), OP_PEEK_DELAYED },
        { CMD_PEEK_BURIED, CONSTSTRLEN(CMD_PEEK_BURIED), OP_PEEK_BURIED },
        { CMD_RESERVE_TIMEOUT, CONSTSTRLEN(CMD_RESERVE_TIMEOUT), OP_RESERVE_TIMEOUT },
        { CMD_RESERVE, CONSTSTRLEN(CMD_RESERVE), OP_RESERVE },
        { CMD_DELETE, CONSTSTRLEN(CMD_DELETE), OP_DELETE },
        { CMD_RELEASE, CONSTSTRLEN(CMD_RELEASE), OP_RELEASE },
        { CMD_BURY, CONSTSTRLEN(CMD_BURY), OP_BURY },
        { CMD_KICK, CONSTSTRLEN(CMD_KICK), OP_KICK },
        { CMD_JOBKICK, CONSTSTRLEN(CMD_JOBKICK), OP_JOBKICK },
        { CMD_TOUCH, CONSTSTRLEN(CMD_TOUCH), OP_TOUCH },
        { CMD_JOBSTATS, CONSTSTRLEN(CMD_JOBSTATS), OP_JOBSTATS },
        { CMD_STATS_TUBE, CONSTSTRLEN(CMD_STATS_TUBE), OP_STATS_TUBE },
        { CMD_STATS, CONSTSTRLEN(CMD_STATS), OP_STATS },
        { CMD_USE, CONSTSTRLEN(CMD_USE), OP_USE },
        { CMD_WATCH, CONSTSTRLEN(CMD_WATCH), OP_WATCH },
        { CMD_IGNORE, CONSTSTRLEN(CMD_IGNORE), OP_IGNORE },
        { CMD_LIST_TUBES_WATCHED, CONSTSTRLEN(CMD_LIST_TUBES_WATCHED), OP_LIST_TUBES_WATCHED },
        { CMD_LIST_TUBE_USED, CONSTSTRLEN(CMD_LIST_TUBE_USED), OP_LIST_TUBE_USED },
        { CMD_LIST_TUBES, CONSTSTRLEN(CMD_LIST_TUBES), OP_LIST_TUBES },
        { CMD_QUIT, CONSTSTRLEN(CMD_QUIT), OP_QUIT },
        { CMD_PAUSE_TUBE, CONSTSTRLEN(CMD_PAUSE_TUBE), OP_PAUSE_TUBE },
    };
    size_t idx;

    for (idx = 0; idx < sizeof(known) / sizeof(known[0]); idx++) {
        if (strncmp(c->cmd, known[idx].text, known[idx].len) == 0) {
            return known[idx].op;
        }
    }

    return OP_UNKNOWN;
}
/* Distribute the bytes that were read past the end of the current command.
 *
 * If the connection has an incoming job, up to body_size of those bytes are
 * copied into the job body (assumed empty) and in_job_read is set to the
 * number copied. If there is no job but in_job_read is nonzero, the
 * connection is discarding data and in_job_read counts the bytes still to be
 * thrown away, so it is reduced instead. Whatever remains is moved to the
 * front of the command buffer as the start of the next command.
 *
 * Does nothing when the socket fd is 0 or when no complete command is known
 * (cmd_len == 0). Because cmd_len is reset to 0 at the end, an immediate
 * second call is a no-op. */
static void
fill_extra_data(Conn *c)
{
    int surplus, body_bytes = 0, leftover;

    if (!c->sock.fd) return; /* the connection was closed */
    if (!c->cmd_len) return; /* we don't have a complete command */

    /* bytes read beyond the command line itself */
    surplus = c->cmd_read - c->cmd_len;

    if (c->in_job) {
        /* feed the job body */
        body_bytes = min(surplus, c->in_job->r.body_size);
        memcpy(c->in_job->body, c->cmd + c->cmd_len, body_bytes);
        c->in_job_read = body_bytes;
    } else if (c->in_job_read) {
        /* bit-bucket mode: just count the bytes as discarded */
        body_bytes = min(surplus, c->in_job_read);
        c->in_job_read -= body_bytes;
    }

    /* the rest belongs to the next command */
    leftover = surplus - body_bytes;
    memmove(c->cmd, c->cmd + c->cmd_len + body_bytes, leftover);
    c->cmd_read = leftover;
    c->cmd_len = 0; /* the length of the next command is not known yet */
}
/* Discard the next n bytes of input on connection c, then send `line`
 * (`len` bytes) as the reply.
 *
 * While discarding, in_job_read has an inverted meaning: it counts the bytes
 * that still have to be thrown away. Bytes already sitting in the command
 * buffer are consumed at once by fill_extra_data(). If that covers
 * everything the reply is queued immediately; otherwise the reply is stored
 * on the connection and it enters STATE_BITBUCKET. */
static void
_skip(Conn *c, int n, char *line, int len)
{
    c->in_job = 0;
    c->in_job_read = n;
    fill_extra_data(c);

    if (c->in_job_read != 0) {
        c->reply = line;
        c->reply_len = len;
        c->reply_sent = 0;
        c->state = STATE_BITBUCKET;
        return;
    }

    reply(c, line, len, STATE_SENDWORD);
}
/* Convenience wrapper for _skip() where m is a string literal. */
#define skip(c,n,m) (_skip(c,n,m,CONSTSTRLEN(m)))
/* Finish a put: validate the fully received job on connection c and queue it.
 *
 * The connection gives up ownership of the job first. The job is freed and
 * an error sent when the body does not end in "\r\n" (EXPECTED_CRLF) or when
 * the server is draining (DRAINING). Otherwise log space is reserved and the
 * job is enqueued with its own delay. The client gets "INSERTED <id>" on
 * success, or "BURIED <id>" when enqueueing failed and the job was buried
 * instead.
 *
 * NOTE(review): the two walresv error paths reply without calling
 * job_free(j), although the connection no longer owns the job. Confirm that
 * this is intended. The `r < 0` check cannot fire with the enqueue_job()
 * visible in this file, which only returns 0 or 1. */
static void
enqueue_incoming_job(Conn *c)
{
    int r;
    job j = c->in_job;

    c->in_job = NULL; /* the connection no longer owns this job */
    c->in_job_read = 0;

    /* the body must carry a correct trailer */
    if (memcmp(j->body + j->r.body_size - 2, "\r\n", 2)) {
        job_free(j);
        reply_msg(c, MSG_EXPECTED_CRLF);
        return;
    }

    if (verbose >= 2) {
        printf("<%d job %"PRIu64"\n", c->sock.fd, j->r.id);
    }

    if (drain_mode) {
        job_free(j);
        reply_serr(c, MSG_DRAINING);
        return;
    }

    if (j->walresv) {
        reply_serr(c, MSG_INTERNAL_ERROR);
        return;
    }
    j->walresv = walresvput(&c->srv->wal, j);
    if (!j->walresv) {
        reply_serr(c, MSG_OUT_OF_MEMORY);
        return;
    }

    /* the job is complete, so put it in the queue */
    r = enqueue_job(c->srv, j, j->r.delay, 1);
    if (r < 0) {
        reply_serr(c, MSG_INTERNAL_ERROR);
        return;
    }

    global_stat.total_jobs_ct++;
    j->tube->stat.total_jobs_ct++;

    if (r == 1) {
        reply_line(c, STATE_SENDWORD, MSG_INSERTED_FMT, j->r.id);
        return;
    }

    /* enqueueing failed, so the job gets buried */
    bury_job(c->srv, j, 0);
    reply_line(c, STATE_SENDWORD, MSG_BURIED_FMT, j->r.id);
}
static uint
uptime()
{
return (nanoseconds() - started_at) / 1000000000;
}
/* Format the server-wide statistics document into buf.
 *
 * buf, size  destination and its capacity, passed straight to snprintf().
 *            do_stats() calls this with (NULL, 0) to measure the length.
 * x          the Server whose write-ahead-log state is reported.
 *
 * Returns snprintf()'s result. The argument list must line up one-for-one
 * with the conversions in STATS_FMT:
 *   - job_data_size_limit and tubes.used are size_t but printed with PRIu32,
 *     so they are cast to uint32_t.
 *   - STATS_FMT always has four %d conversions for rusage-utime and
 *     rusage-stime. The WIN32 build has no rusage data and passes four
 *     zeros; passing fewer would shift every later argument.
 *   - WIN32 builds report "null" for both id and hostname. */
static int
fmt_stats(char *buf, size_t size, void *x)
{
    int whead = 0, wcur = 0;
    Server *srv;
#ifndef WIN32
    struct rusage ru = {{0, 0}, {0, 0}};
#endif

    srv = x;

    if (srv->wal.head) {
        whead = srv->wal.head->seq;
    }

    if (srv->wal.cur) {
        wcur = srv->wal.cur->seq;
    }

#ifndef WIN32
    getrusage(RUSAGE_SELF, &ru); /* don't care if it fails */
#endif
    return snprintf(buf, size, STATS_FMT,
            global_stat.urgent_ct,
            ready_ct,
            global_stat.reserved_ct,
            get_delayed_job_ct(),
            global_stat.buried_ct,
            op_ct[OP_PUT],
            op_ct[OP_PEEKJOB],
            op_ct[OP_PEEK_READY],
            op_ct[OP_PEEK_DELAYED],
            op_ct[OP_PEEK_BURIED],
            op_ct[OP_RESERVE],
            op_ct[OP_RESERVE_TIMEOUT],
            op_ct[OP_DELETE],
            op_ct[OP_RELEASE],
            op_ct[OP_USE],
            op_ct[OP_WATCH],
            op_ct[OP_IGNORE],
            op_ct[OP_BURY],
            op_ct[OP_KICK],
            op_ct[OP_TOUCH],
            op_ct[OP_STATS],
            op_ct[OP_JOBSTATS],
            op_ct[OP_STATS_TUBE],
            op_ct[OP_LIST_TUBES],
            op_ct[OP_LIST_TUBE_USED],
            op_ct[OP_LIST_TUBES_WATCHED],
            op_ct[OP_PAUSE_TUBE],
            timeout_ct,
            global_stat.total_jobs_ct,
            (uint32_t) job_data_size_limit,
            (uint32_t) tubes.used,
            count_cur_conns(),
            count_cur_producers(),
            count_cur_workers(),
            global_stat.waiting_ct,
            count_tot_conns(),
            (long) getpid(),
            version,
#ifndef WIN32
            (int) ru.ru_utime.tv_sec, (int) ru.ru_utime.tv_usec,
            (int) ru.ru_stime.tv_sec, (int) ru.ru_stime.tv_usec,
#else
            0, 0, /* rusage-utime: seconds, microseconds */
            0, 0, /* rusage-stime: seconds, microseconds */
#endif
            uptime(),
            whead,
            wcur,
            srv->wal.nmig,
            srv->wal.nrec,
            srv->wal.filesize,
#ifndef WIN32
            id,
            node_info.nodename
#else
            "null",
            "null"
#endif
            );
}
/* Read a priority value from the given buffer and place it in pri.
 * Update end to point to the address after the last character consumed.
 *
 * Leading spaces are skipped; the first other character must be a decimal
 * digit, so signs are rejected.
 *
 * Pri and end can be NULL. If they are both NULL, read_pri() will do the
 * conversion and return the status code but not update any values. This is an
 * easy way to check for errors.
 *
 * If end is NULL, read_pri will also check that the entire input string was
 * consumed and return an error code otherwise.
 *
 * Values too large for a uint are clamped to the largest uint. strtoul()
 * already clamps to ULONG_MAX on overflow (ERANGE is tolerated), but where
 * unsigned long is wider than uint the old assignment silently kept only the
 * low bits. Clamping here gives the same answer on every platform.
 *
 * Return 0 on success, or nonzero on failure.
 * If a failure occurs, pri and end are not modified. */
static int
read_pri(uint *pri, const char *buf, char **end)
{
    char *tend;
    unsigned long parsed;
    const uint pri_max = (uint) -1;

    errno = 0;
    while (buf[0] == ' ') buf++;
    if (buf[0] < '0' || '9' < buf[0]) return -1;
    parsed = strtoul(buf, &tend, 10);
    if (tend == buf) return -1;
    if (errno && errno != ERANGE) return -1;
    if (!end && tend[0] != '\0') return -1;

    if (parsed > pri_max) parsed = pri_max;

    if (pri) *pri = (uint) parsed;
    if (end) *end = tend;
    return 0;
}
/* Read a delay value, given in whole seconds, from the buffer and place it in
 * delay converted to nanoseconds.
 *
 * The interface and behavior are analogous to read_pri(): end receives the
 * address after the last character consumed, and when end is NULL the whole
 * string must be consumed. delay may be NULL, in which case the input is only
 * validated. Returns 0 on success or nonzero on failure; on failure delay
 * is not modified. */
static int
read_delay(int64 *delay, const char *buf, char **end)
{
    int r;
    uint delay_sec;

    r = read_pri(&delay_sec, buf, end);
    if (r) return r;

    /* widen before multiplying so the conversion to nanoseconds cannot wrap */
    if (delay) *delay = ((int64) delay_sec) * 1000000000;
    return 0;
}
/* Read a time-to-run value, given in whole seconds, from the buffer and
 * store it in ttr as nanoseconds. Parsing and return codes are exactly those
 * of read_delay(). */
static int
read_ttr(int64 *ttr, const char *buf, char **end)
{
    int status = read_delay(ttr, buf, end);

    return status;
}
/* Locate a tube name in buf.
 *
 * Leading spaces are skipped. The name is the longest run of NAME_CHARS
 * characters that follows. On success *tubename (if non-NULL) points at the
 * first character of the name inside buf, *end (if non-NULL) points just
 * past it, and 0 is returned. Returns -1, leaving both outputs untouched,
 * when the run is empty. The buffer itself is not modified, so the name is
 * not NUL-terminated here. */
static int
read_tube_name(char **tubename, char *buf, char **end)
{
    char *start = buf;
    size_t name_len;

    while (*start == ' ') {
        start++;
    }

    name_len = strspn(start, NAME_CHARS);
    if (name_len == 0) {
        return -1;
    }

    if (tubename) {
        *tubename = start;
    }
    if (end) {
        *end = start + name_len;
    }
    return 0;
}
/* Park connection c until a job can be reserved for it.
 * The connection enters STATE_WAIT, is registered as a waiter on every tube
 * it watches (enqueue_waiting_conn), records `timeout` as its pending
 * timeout, and is pushed onto the dirty list. */
static void
wait_for_job(Conn *c, int timeout)
{
c->state = STATE_WAIT;
enqueue_waiting_conn(c);
/* Set the pending timeout to the requested timeout amount */
c->pending_timeout = timeout;
connwant(c, 'h'); // only care if they hang up
/* push onto the head of the dirty list */
c->next = dirty;
dirty = c;
}
/* A statistics formatter: writes at most `size` bytes describing the object
 * behind the void pointer into the buffer and returns snprintf()'s result. */
typedef int(*fmt_fn)(char *, size_t, void *);

/* Build a statistics reply for connection c using formatter fmt on data.
 *
 * The formatter is first called with a NULL buffer to measure the output.
 * A fake job of that size plus 16 spare bytes is allocated to carry the
 * text and marked as a Copy so it can be freed later. The formatter is then
 * run for real and its result stored as the body size. The header sent is
 * "OK <n>\r\n", where n excludes the trailing "\r\n" of the document.
 * Sends OUT_OF_MEMORY when allocation fails and INTERNAL_ERROR when the
 * second pass reports more bytes than were allocated. */
static void
do_stats(Conn *c, fmt_fn fmt, void *data)
{
    int written, capacity;

    /* measuring pass */
    capacity = fmt(NULL, 0, data) + 16;

    c->out_job = allocate_job(capacity); /* fake job to hold stats data */
    if (!c->out_job) {
        reply_serr(c, MSG_OUT_OF_MEMORY);
        return;
    }

    /* a Copy is freed once it has been sent */
    c->out_job->r.state = Copy;

    /* formatting pass */
    written = fmt(c->out_job->body, capacity, data);
    c->out_job->r.body_size = written;
    if (written > capacity) {
        reply_serr(c, MSG_INTERNAL_ERROR);
        return;
    }

    c->out_job_sent = 0;
    reply_line(c, STATE_SENDJOB, "OK %d\r\n", written - 2);
}
/* Reply to connection c with a list of the names of the tubes in set l.
 *
 * The response body is "---\n", then one "- <name>\n" line per tube, then
 * "\r\n". It is carried in a fake job marked as a Copy so it can be freed
 * later. The header sent is "OK <n>\r\n", where n excludes the trailing
 * "\r\n". Sends OUT_OF_MEMORY when the fake job cannot be allocated.
 *
 * The length is a size_t but the header format uses PRIu32, so the value is
 * cast to uint32_t to match the conversion. */
static void
do_list_tubes(Conn *c, ms l)
{
    char *buf;
    tube t;
    size_t i, resp_z;

    /* first, measure how big a buffer we will need */
    resp_z = 6; /* initial "---\n" and final "\r\n" */
    for (i = 0; i < l->used; i++) {
        t = l->items[i];
        resp_z += 3 + strlen(t->name); /* including "- " and "\n" */
    }

    c->out_job = allocate_job(resp_z); /* fake job to hold response data */
    if (!c->out_job) {
        reply_serr(c, MSG_OUT_OF_MEMORY);
        return;
    }

    /* Mark this job as a copy so it can be appropriately freed later on */
    c->out_job->r.state = Copy;

    /* now actually format the response */
    buf = c->out_job->body;
    buf += snprintf(buf, 5, "---\n");
    for (i = 0; i < l->used; i++) {
        t = l->items[i];
        /* 4 = "- " + "\n" + the NUL that snprintf writes */
        buf += snprintf(buf, 4 + strlen(t->name), "- %s\n", t->name);
    }
    /* the last NUL is overwritten by the terminator */
    buf[0] = '\r';
    buf[1] = '\n';

    c->out_job_sent = 0;
    reply_line(c, STATE_SENDJOB, "OK %"PRIu32"\r\n", (uint32_t) (resp_z - 2));
}
/* Format the statistics document for job j into buf (at most `size` bytes)
 * and return snprintf()'s result.
 *
 * Times are reported in whole seconds. time-left is derived from the job's
 * deadline only for Reserved and Delayed jobs and is 0 otherwise. file is
 * the sequence number of the job's log file, or 0 when it has none. */
static int
fmt_job_stats(char *buf, size_t size, job j)
{
    int64 now = nanoseconds();
    int64 time_left = 0;
    int file = 0;

    if (j->r.state == Reserved || j->r.state == Delayed) {
        time_left = (j->r.deadline_at - now) / 1000000000;
    }

    if (j->file) {
        file = j->file->seq;
    }

    return snprintf(buf, size, STATS_JOB_FMT,
                    j->r.id,
                    j->tube->name,
                    job_state(j),
                    j->r.pri,
                    (now - j->r.created_at) / 1000000000,
                    j->r.delay / 1000000000,
                    j->r.ttr / 1000000000,
                    time_left,
                    file,
                    j->r.reserve_ct,
                    j->r.timeout_ct,
                    j->r.release_ct,
                    j->r.bury_ct,
                    j->r.kick_ct);
}
/* Write the stats report for tube t into buf (at most size bytes) using
 * STATS_TUBE_FMT. Returns whatever snprintf returns. */
static int
fmt_stats_tube(char *buf, size_t size, tube t)
{
    uint64 time_left = 0;

    if (t->pause > 0) {
        /* Compute the remainder as a signed value first. The pause flag can
         * still be set after the deadline has passed, and a negative
         * difference stored straight into an unsigned variable would show
         * up as an enormous time left. */
        int64 remaining = t->deadline_at - nanoseconds();

        if (remaining > 0) {
            time_left = remaining / 1000000000;
        }
    }

    return snprintf(buf, size, STATS_TUBE_FMT,
                    t->name,
                    t->stat.urgent_ct,
                    t->ready.len,
                    t->stat.reserved_ct,
                    t->delay.len,
                    t->stat.buried_ct,
                    t->stat.total_jobs_ct,
                    t->using_ct,
                    t->watching_ct,
                    t->stat.waiting_ct,
                    t->stat.total_delete_ct,
                    t->stat.pause_ct,
                    t->pause / 1000000000,
                    time_left);
}
/* Enqueue c's incoming job if its whole body has been read; otherwise put
 * c into STATE_WANTDATA. */
static void
maybe_enqueue_incoming_job(Conn *c)
{
    job incoming = c->in_job;

    if (c->in_job_read != incoming->r.body_size) {
        /* the body is still incomplete, so just keep waiting */
        c->state = STATE_WANTDATA;
        return;
    }

    /* we have a complete job */
    enqueue_incoming_job(c);
}
/* Pass j to job_remove and, if that yields a job, undo its reserved
 * accounting (global and per-tube reserved counts, reserver pointer).
 * c->soonest_job is cleared in every case. j can be NULL.
 * Returns whatever job_remove returned. */
static job
remove_this_reserved_job(Conn *c, job j)
{
    job removed = job_remove(j);

    if (removed) {
        global_stat.reserved_ct--;
        removed->tube->stat.reserved_ct--;
        removed->reserver = NULL;
    }

    c->soonest_job = NULL;
    return removed;
}
/* Look j up with find_reserved_job_in_conn(c, j) and hand the result to
 * remove_this_reserved_job. Returns that function's result. */
static job
remove_reserved_job(Conn *c, job j)
{
    job found = find_reserved_job_in_conn(c, j);

    return remove_this_reserved_job(c, found);
}
/* Return 1 if name is acceptable and 0 otherwise. An acceptable name is
 * between 1 and max bytes long, consists only of characters in NAME_CHARS,
 * and does not start with '-'. */
static int
name_is_ok(const char *name, size_t max)
{
    size_t len = strlen(name);

    if (len == 0 || len > max) return 0;

    /* every character must come from the allowed set */
    if (strspn(name, NAME_CHARS) != len) return 0;

    return name[0] != '-';
}
/* Remove tube t from the file-level tubes set. */
void
prot_remove_tube(tube t)
{
    ms_remove(&tubes, t);
}
/* Parse the command line held in c->cmd and carry it out.
 *
 * The line is NUL-terminated in place at offset c->cmd_len - 2, classified
 * with which_cmd(), and handled by the matching case below. Cases that parse
 * arguments reply MSG_BAD_FORMAT when parsing fails. Most cases bump
 * op_ct[type] once their arguments have been accepted (OP_PAUSE_TUBE bumps
 * it before parsing, OP_QUIT not at all). Unknown commands get
 * MSG_UNKNOWN_COMMAND. */
static void
dispatch_cmd(Conn *c)
{
int r, i, timeout = -1;
int z;
uint count;
job j = 0, k = 0;
byte type;
char *size_buf, *delay_buf, *ttr_buf, *pri_buf, *end_buf, *name;
uint pri, body_size;
int64 delay, ttr;
uint64 id;
tube t = NULL;
/* NUL-terminate this string so we can use strtol and friends */
c->cmd[c->cmd_len - 2] = '\0';
/* check for possible maliciousness */
/* (a shorter strlen means the line contains an embedded NUL byte) */
if (strlen(c->cmd) != c->cmd_len - 2) {
return reply_msg(c, MSG_BAD_FORMAT);
}
type = which_cmd(c);
if (verbose >= 2) {
printf("<%d command %s\n", c->sock.fd, op_names[type]);
}
switch (type) {
/* put: parse priority, delay, ttr and body size, then allocate the incoming
 * job on the tube currently in use. */
case OP_PUT:
r = read_pri(&pri, c->cmd + 4, &delay_buf);
if (r) return reply_msg(c, MSG_BAD_FORMAT);
r = read_delay(&delay, delay_buf, &ttr_buf);
if (r) return reply_msg(c, MSG_BAD_FORMAT);
r = read_ttr(&ttr, ttr_buf, &size_buf);
if (r) return reply_msg(c, MSG_BAD_FORMAT);
errno = 0;
body_size = strtoul(size_buf, &end_buf, 10);
if (errno) return reply_msg(c, MSG_BAD_FORMAT);
op_ct[type]++;
if (body_size > job_data_size_limit) {
/* throw away the job body and respond with JOB_TOO_BIG */
return skip(c, body_size + 2, MSG_JOB_TOO_BIG);
}
/* don't allow trailing garbage */
if (end_buf[0] != '\0') return reply_msg(c, MSG_BAD_FORMAT);
connsetproducer(c);
/* enforce a minimum ttr of 1000000000 (the stats formatters divide ttr by
 * the same constant, so this is presumably one second in nanoseconds) */
if (ttr < 1000000000) {
ttr = 1000000000;
}
/* the job is allocated with two bytes more than the announced body size */
c->in_job = make_job(pri, delay, ttr, body_size + 2, c->use);
/* OOM? */
if (!c->in_job) {
/* throw away the job body and respond with OUT_OF_MEMORY */
twarnx("server error: " MSG_OUT_OF_MEMORY);
return skip(c, body_size + 2, MSG_OUT_OF_MEMORY);
}
fill_extra_data(c);
/* it's possible we already have a complete job */
maybe_enqueue_incoming_job(c);
break;
/* peek-ready: reply with a copy of the first entry of the used tube's ready
 * heap, if it has one. */
case OP_PEEK_READY:
/* don't allow trailing garbage */
if (c->cmd_len != CMD_PEEK_READY_LEN + 2) {
return reply_msg(c, MSG_BAD_FORMAT);
}
op_ct[type]++;
if (c->use->ready.len) {
j = job_copy(c->use->ready.data[0]);
}
if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
reply_job(c, j, MSG_FOUND);
break;
/* peek-delayed: same as above, for the used tube's delay heap. */
case OP_PEEK_DELAYED:
/* don't allow trailing garbage */
if (c->cmd_len != CMD_PEEK_DELAYED_LEN + 2) {
return reply_msg(c, MSG_BAD_FORMAT);
}
op_ct[type]++;
if (c->use->delay.len) {
j = job_copy(c->use->delay.data[0]);
}
if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
reply_job(c, j, MSG_FOUND);
break;
/* peek-buried: reply with a copy of the first job on the used tube's buried
 * list, if buried_job_p() says there is one. */
case OP_PEEK_BURIED:
/* don't allow trailing garbage */
if (c->cmd_len != CMD_PEEK_BURIED_LEN + 2) {
return reply_msg(c, MSG_BAD_FORMAT);
}
op_ct[type]++;
/* NOTE(review): the inner "j = ..." assignment is redundant; the outer
 * assignment overwrites j with the copy. */
j = job_copy(buried_job_p(c->use)? j = c->use->buried.next : NULL);
if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
reply_job(c, j, MSG_FOUND);
break;
/* peek <id>: reply with a copy of the job that peek_job() finds for the id.
 * NOTE(review): here and in the other id-parsing cases end_buf is not
 * inspected, so an argument with no digits or with trailing text is not
 * rejected -- confirm this is intended. */
case OP_PEEKJOB:
errno = 0;
#ifndef WIN32
id = strtoull(c->cmd + CMD_PEEKJOB_LEN, &end_buf, 10);
#else
id = _strtoui64(c->cmd + CMD_PEEKJOB_LEN, &end_buf, 10);
#endif
if (errno) return reply_msg(c, MSG_BAD_FORMAT);
op_ct[type]++;
/* So, peek is annoying, because some other connection might free the
* job while we are still trying to write it out. So we copy it and
* then free the copy when it's done sending. */
j = job_copy(peek_job(id));
if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
reply_job(c, j, MSG_FOUND);
break;
/* reserve-with-timeout: parse the timeout, then share the reserve path.
 * Plain reserve leaves timeout at its initial value of -1. */
case OP_RESERVE_TIMEOUT:
errno = 0;
timeout = strtol(c->cmd + CMD_RESERVE_TIMEOUT_LEN, &end_buf, 10);
if (errno) return reply_msg(c, MSG_BAD_FORMAT);
case OP_RESERVE: /* FALLTHROUGH */
/* don't allow trailing garbage */
if (type == OP_RESERVE && c->cmd_len != CMD_RESERVE_LEN + 2) {
return reply_msg(c, MSG_BAD_FORMAT);
}
op_ct[type]++;
connsetworker(c);
if (conndeadlinesoon(c) && !conn_ready(c)) {
return reply_msg(c, MSG_DEADLINE_SOON);
}
/* try to get a new job for this guy */
wait_for_job(c, timeout);
process_queue();
break;
/* delete <id>: take the job out of whichever of the reserved, ready, buried
 * or delayed structures yields it first (tried in that order), mark it
 * Invalid, write it to the WAL and free it. */
case OP_DELETE:
errno = 0;
#ifndef WIN32
id = strtoull(c->cmd + CMD_DELETE_LEN, &end_buf, 10);
#else
id = _strtoui64(c->cmd + CMD_DELETE_LEN, &end_buf, 10);
#endif
if (errno) return reply_msg(c, MSG_BAD_FORMAT);
op_ct[type]++;
j = job_find(id);
j = (k = remove_reserved_job(c, j)) ? k :
(k = remove_ready_job(j)) ? k :
(k = remove_buried_job(j)) ? k :
remove_delayed_job(j);
if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
j->tube->stat.total_delete_ct++;
j->r.state = Invalid;
r = walwrite(&c->srv->wal, j);
walmaint(&c->srv->wal);
/* the job is freed before the walwrite result is checked, so it is gone
 * even when the reply is an internal error */
job_free(j);
if (!r) return reply_serr(c, MSG_INTERNAL_ERROR);
reply(c, MSG_DELETED, MSG_DELETED_LEN, STATE_SENDWORD);
break;
/* release <id> <pri> <delay>: take the job out of this connection's
 * reserved jobs and enqueue it again with the new priority and delay. */
case OP_RELEASE:
errno = 0;
#ifndef WIN32
id = strtoull(c->cmd + CMD_RELEASE_LEN, &pri_buf, 10);
#else
id = _strtoui64(c->cmd + CMD_RELEASE_LEN, &pri_buf, 10);
#endif
if (errno) return reply_msg(c, MSG_BAD_FORMAT);
r = read_pri(&pri, pri_buf, &delay_buf);
if (r) return reply_msg(c, MSG_BAD_FORMAT);
r = read_delay(&delay, delay_buf, NULL);
if (r) return reply_msg(c, MSG_BAD_FORMAT);
op_ct[type]++;
j = remove_reserved_job(c, job_find(id));
if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
/* We want to update the delay deadline on disk, so reserve space for
* that. */
if (delay) {
z = walresvupdate(&c->srv->wal, j);
/* NOTE(review): this early return happens after the job was removed from
 * the reserved set and before it is enqueued again -- confirm the job is
 * not left unreachable on this path. */
if (!z) return reply_serr(c, MSG_OUT_OF_MEMORY);
j->walresv += z;
}
j->r.pri = pri;
j->r.delay = delay;
j->r.release_ct++;
r = enqueue_job(c->srv, j, delay, !!delay);
if (r < 0) return reply_serr(c, MSG_INTERNAL_ERROR);
if (r == 1) {
return reply(c, MSG_RELEASED, MSG_RELEASED_LEN, STATE_SENDWORD);
}
/* out of memory trying to grow the queue, so it gets buried */
bury_job(c->srv, j, 0);
reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
break;
/* bury <id> <pri>: take the job out of this connection's reserved jobs,
 * set its priority and bury it. */
case OP_BURY:
errno = 0;
#ifndef WIN32
id = strtoull(c->cmd + CMD_BURY_LEN, &pri_buf, 10);
#else
id = _strtoui64(c->cmd + CMD_BURY_LEN, &pri_buf, 10);
#endif
if (errno) return reply_msg(c, MSG_BAD_FORMAT);
r = read_pri(&pri, pri_buf, NULL);
if (r) return reply_msg(c, MSG_BAD_FORMAT);
op_ct[type]++;
j = remove_reserved_job(c, job_find(id));
if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
j->r.pri = pri;
r = bury_job(c->srv, j, 1);
if (!r) return reply_serr(c, MSG_INTERNAL_ERROR);
reply(c, MSG_BURIED, MSG_BURIED_LEN, STATE_SENDWORD);
break;
/* kick <count>: pass the count to kick_jobs() for the used tube and report
 * its return value. This is the only numeric case that rejects an argument
 * with no digits (end_buf left at the start of the argument). */
case OP_KICK:
errno = 0;
count = strtoul(c->cmd + CMD_KICK_LEN, &end_buf, 10);
if (end_buf == c->cmd + CMD_KICK_LEN) {
return reply_msg(c, MSG_BAD_FORMAT);
}
if (errno) return reply_msg(c, MSG_BAD_FORMAT);
op_ct[type]++;
i = kick_jobs(c->srv, c->use, count);
return reply_line(c, STATE_SENDWORD, "KICKED %u\r\n", i);
/* kick-job <id>: kick one job; only Buried and Delayed jobs qualify. */
case OP_JOBKICK:
errno = 0;
#ifndef WIN32
id = strtoull(c->cmd + CMD_JOBKICK_LEN, &end_buf, 10);
#else
id = _strtoui64(c->cmd + CMD_JOBKICK_LEN, &end_buf, 10);
#endif
if (errno) return twarn("strtoull"), reply_msg(c, MSG_BAD_FORMAT);
op_ct[type]++;
j = job_find(id);
if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
if ((j->r.state == Buried && kick_buried_job(c->srv, j)) ||
(j->r.state == Delayed && kick_delayed_job(c->srv, j))) {
reply(c, MSG_KICKED, MSG_KICKED_LEN, STATE_SENDWORD);
} else {
return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
}
break;
/* touch <id>: hand the job to touch_job() for this connection and report
 * whether it returned a job. */
case OP_TOUCH:
errno = 0;
#ifndef WIN32
id = strtoull(c->cmd + CMD_TOUCH_LEN, &end_buf, 10);
#else
id = _strtoui64(c->cmd + CMD_TOUCH_LEN, &end_buf, 10);
#endif
if (errno) return twarn("strtoull"), reply_msg(c, MSG_BAD_FORMAT);
op_ct[type]++;
j = touch_job(c, job_find(id));
if (j) {
reply(c, MSG_TOUCHED, MSG_TOUCHED_LEN, STATE_SENDWORD);
} else {
return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
}
break;
/* stats: report formatted by fmt_stats with the server as its argument. */
case OP_STATS:
/* don't allow trailing garbage */
if (c->cmd_len != CMD_STATS_LEN + 2) {
return reply_msg(c, MSG_BAD_FORMAT);
}
op_ct[type]++;
do_stats(c, fmt_stats, c->srv);
break;
/* stats-job <id>: report for one job; a job without a tube is treated as
 * an internal error. */
case OP_JOBSTATS:
errno = 0;
#ifndef WIN32
id = strtoull(c->cmd + CMD_JOBSTATS_LEN, &end_buf, 10);
#else
id = _strtoui64(c->cmd + CMD_JOBSTATS_LEN, &end_buf, 10);
#endif
if (errno) return reply_msg(c, MSG_BAD_FORMAT);
op_ct[type]++;
j = peek_job(id);
if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
if (!j->tube) return reply_serr(c, MSG_INTERNAL_ERROR);
do_stats(c, (fmt_fn) fmt_job_stats, j);
break;
/* stats-tube <name>: report for an existing tube; the tube is only looked
 * up, never created. */
case OP_STATS_TUBE:
name = c->cmd + CMD_STATS_TUBE_LEN;
if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
op_ct[type]++;
t = tube_find(name);
if (!t) return reply_msg(c, MSG_NOTFOUND);
do_stats(c, (fmt_fn) fmt_stats_tube, t);
t = NULL;
break;
/* list-tubes: list every tube in the file-level tubes set. */
case OP_LIST_TUBES:
/* don't allow trailing garbage */
if (c->cmd_len != CMD_LIST_TUBES_LEN + 2) {
return reply_msg(c, MSG_BAD_FORMAT);
}
op_ct[type]++;
do_list_tubes(c, &tubes);
break;
/* list-tube-used: report the name of the tube this connection uses. */
case OP_LIST_TUBE_USED:
/* don't allow trailing garbage */
if (c->cmd_len != CMD_LIST_TUBE_USED_LEN + 2) {
return reply_msg(c, MSG_BAD_FORMAT);
}
op_ct[type]++;
reply_line(c, STATE_SENDWORD, "USING %s\r\n", c->use->name);
break;
/* list-tubes-watched: list the tubes in this connection's watch set. */
case OP_LIST_TUBES_WATCHED:
/* don't allow trailing garbage */
if (c->cmd_len != CMD_LIST_TUBES_WATCHED_LEN + 2) {
return reply_msg(c, MSG_BAD_FORMAT);
}
op_ct[type]++;
do_list_tubes(c, &c->watch);
break;
/* use <name>: switch the connection to the named tube, creating it if
 * needed, and move the using_ct count from the old tube to the new one. */
case OP_USE:
name = c->cmd + CMD_USE_LEN;
if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
op_ct[type]++;
TUBE_ASSIGN(t, tube_find_or_make(name));
if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
c->use->using_ct--;
TUBE_ASSIGN(c->use, t);
TUBE_ASSIGN(t, NULL);
c->use->using_ct++;
reply_line(c, STATE_SENDWORD, "USING %s\r\n", c->use->name);
break;
/* watch <name>: add the named tube (created if needed) to the watch set
 * unless it is already there, then report the size of the watch set. */
case OP_WATCH:
name = c->cmd + CMD_WATCH_LEN;
if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
op_ct[type]++;
TUBE_ASSIGN(t, tube_find_or_make(name));
if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
r = 1;
if (!ms_contains(&c->watch, t)) r = ms_append(&c->watch, t);
TUBE_ASSIGN(t, NULL);
if (!r) return reply_serr(c, MSG_OUT_OF_MEMORY);
/* NOTE(review): PRIu32 expects a 32-bit argument; confirm the type of
 * c->watch.used matches (same for the OP_IGNORE reply below). */
reply_line(c, STATE_SENDWORD, "WATCHING %"PRIu32"\r\n", c->watch.used);
break;
/* ignore <name>: remove the named tube from the watch set, unless fewer
 * than two tubes are being watched. Ignoring a tube that is not watched
 * still gets the WATCHING reply. */
case OP_IGNORE:
name = c->cmd + CMD_IGNORE_LEN;
if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
op_ct[type]++;
t = NULL;
/* linear search of the watch set by name; t stays NULL when not found */
for (i = 0; i < c->watch.used; i++) {
t = c->watch.items[i];
if (strncmp(t->name, name, MAX_TUBE_NAME_LEN) == 0) break;
t = NULL;
}
if (t && c->watch.used < 2) return reply_msg(c, MSG_NOT_IGNORED);
if (t) ms_remove(&c->watch, t); /* may free t if refcount => 0 */
t = NULL;
reply_line(c, STATE_SENDWORD, "WATCHING %"PRIu32"\r\n", c->watch.used);
break;
/* quit: only marks the connection for closing; no reply is queued here. */
case OP_QUIT:
c->state = STATE_CLOSE;
break;
/* pause-tube <name> <delay>: set the pause length and deadline on an
 * existing tube. */
case OP_PAUSE_TUBE:
op_ct[type]++;
r = read_tube_name(&name, c->cmd + CMD_PAUSE_TUBE_LEN, &delay_buf);
if (r) return reply_msg(c, MSG_BAD_FORMAT);
r = read_delay(&delay, delay_buf, NULL);
if (r) return reply_msg(c, MSG_BAD_FORMAT);
/* cut the command string at delay_buf so that name ends there and can be
 * passed to tube_find() as a NUL-terminated string */
*delay_buf = '\0';
t = tube_find(name);
if (!t) return reply_msg(c, MSG_NOTFOUND);
// Always pause for a positive amount of time, to make sure
// that waiting clients wake up when the deadline arrives.
if (delay == 0) {
delay = 1;
}
t->deadline_at = nanoseconds() + delay;
t->pause = delay;
t->stat.pause_ct++;
reply_line(c, STATE_SENDWORD, "PAUSED\r\n");
break;
default:
return reply_msg(c, MSG_UNKNOWN_COMMAND);
}
}
/* Handle a timer expiry for connection c. Any of the following may be the
 * cause, so all of them are checked:
 *
 *   1. a job reserved by c has run out of time;
 *   2. c is waiting and one of its reserved jobs has entered the safety
 *      margin;
 *   3. c is waiting and the timeout it asked for has elapsed.
 */
static void
conn_timeout(Conn *c)
{
    int r;
    int deadline_soon;
    job j;

    /* Sample the deadline-soon condition up front, before any expired
     * reservations are handed back below. */
    deadline_soon = conn_waiting(c) && conndeadlinesoon(c);

    /* Give back every reserved job whose deadline has passed. This is done
     * whether or not the client is waiting for a new reservation. */
    for (;;) {
        j = connsoonestjob(c);
        if (!j || j->r.deadline_at >= nanoseconds()) break;

        /* The job is in the middle of being written out. Once it is back
         * in the ready queue someone might free it before the write is
         * finished, so keep sending from a copy, which is freed when the
         * send is done. */
        if (j == c->out_job) {
            c->out_job = job_copy(c->out_job);
        }

        timeout_ct++; /* stats */
        j->r.timeout_ct++;

        r = enqueue_job(c->srv, remove_this_reserved_job(c, j), 0, 0);
        if (r < 1) {
            bury_job(c->srv, j, 0); /* out of memory, so bury it */
        }
        connsched(c);
    }

    if (deadline_soon) {
        reply_msg(remove_waiting_conn(c), MSG_DEADLINE_SOON);
        return;
    }

    if (conn_waiting(c) && c->pending_timeout >= 0) {
        c->pending_timeout = -1;
        reply_msg(remove_waiting_conn(c), MSG_TIMED_OUT);
    }
}
/* Set the drain_mode flag. The sig argument is not used. */
void
enter_drain_mode(int sig)
{
    (void) sig;
    drain_mode = 1;
}
/* Run the command currently buffered on c, then call fill_extra_data(c)
 * (presumably to deal with bytes that arrived after the command line --
 * confirm against its definition). */
static void
do_cmd(Conn *c)
{
    dispatch_cmd(c);
    fill_extra_data(c);
}
/* Return c to the "waiting for a command" state after a reply has been
 * sent: ask for read events, put c on the dirty list, drop the outgoing
 * job and reset the reply progress counter. */
static void
reset_conn(Conn *c)
{
    job sent;

    connwant(c, 'r');
    c->next = dirty;
    dirty = c;

    /* An outgoing job marked Copy (peek and stats replies use such copies)
     * belongs to this reply alone, so free it here. */
    sent = c->out_job;
    if (sent) {
        if (sent->r.state == Copy) job_free(sent);
    }
    c->out_job = NULL;

    c->reply_sent = 0; /* now that we're done, reset this */
    c->state = STATE_WANTCOMMAND;
}
/* Perform one step of socket I/O for c, chosen by c->state:
 *
 *   STATE_WANTCOMMAND  read command bytes; run the command once a full line
 *                      is present, or reply BAD_FORMAT if the line buffer
 *                      fills up without one.
 *   STATE_BITBUCKET    read and discard body bytes, then send the stored
 *                      reply.
 *   STATE_WANTDATA     read job body bytes into the incoming job.
 *   STATE_SENDWORD     write the pending reply line.
 *   STATE_SENDJOB      write the pending reply line followed by the body
 *                      of the outgoing job.
 *   STATE_WAIT         reply TIMED_OUT if the peer has half-closed.
 *
 * A read or write that returns 0 moves c to STATE_CLOSE (except in the
 * WIN32 send path, which does not test for 0). A return of -1 is passed to
 * check_err. */
static void
conn_data(Conn *c)
{
    int r, to_read;
    job j;
    struct iovec iov[2];

    switch (c->state) {
    case STATE_WANTCOMMAND:
        r = net_read(c->sock.fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
        if (r == -1) return check_err(c, "read()");
        if (r == 0) {
            c->state = STATE_CLOSE;
            return;
        }

        c->cmd_read += r; /* we got some bytes */

        c->cmd_len = cmd_len(c); /* find the EOL */

        /* yay, complete command line */
        if (c->cmd_len) return do_cmd(c);

        /* c->cmd_read > LINE_BUF_SIZE can't happen */

        /* command line too long? */
        if (c->cmd_read == LINE_BUF_SIZE) {
            c->cmd_read = 0; /* discard the input so far */
            return reply_msg(c, MSG_BAD_FORMAT);
        }

        /* otherwise we have an incomplete line, so just keep waiting */
        break;
    case STATE_BITBUCKET:
        /* Invert the meaning of in_job_read while throwing away data -- it
         * counts the bytes that remain to be thrown away. */
        to_read = min(c->in_job_read, BUCKET_BUF_SIZE);
        r = net_read(c->sock.fd, bucket, to_read);
        if (r == -1) return check_err(c, "read()");
        if (r == 0) {
            c->state = STATE_CLOSE;
            return;
        }

        c->in_job_read -= r; /* we got some bytes */

        /* (c->in_job_read < 0) can't happen */

        if (c->in_job_read == 0) {
            return reply(c, c->reply, c->reply_len, STATE_SENDWORD);
        }
        break;
    case STATE_WANTDATA:
        j = c->in_job;

        r = net_read(c->sock.fd, j->body + c->in_job_read, j->r.body_size -c->in_job_read);
        if (r == -1) return check_err(c, "read()");
        if (r == 0) {
            c->state = STATE_CLOSE;
            return;
        }

        c->in_job_read += r; /* we got some bytes */

        /* (j->in_job_read > j->r.body_size) can't happen */

        maybe_enqueue_incoming_job(c);
        break;
    case STATE_SENDWORD:
        r= net_write(c->sock.fd, c->reply + c->reply_sent, c->reply_len - c->reply_sent);
        if (r == -1) return check_err(c, "write()");
        if (r == 0) {
            c->state = STATE_CLOSE;
            return;
        }

        c->reply_sent += r; /* we got some bytes */

        /* (c->reply_sent > c->reply_len) can't happen */

        if (c->reply_sent == c->reply_len) return reset_conn(c);

        /* otherwise we sent an incomplete reply, so just keep waiting */
        break;
    case STATE_SENDJOB:
        j = c->out_job;

        /* first the unsent rest of the reply line, then the unsent rest of
         * the job body */
        iov[0].iov_base = (void *)(c->reply + c->reply_sent);
        iov[0].iov_len = c->reply_len - c->reply_sent; /* maybe 0 */
        iov[1].iov_base = j->body + c->out_job_sent;
        iov[1].iov_len = j->r.body_size - c->out_job_sent;

#if !defined WIN32
        r = writev(c->sock.fd, iov, 2);
        if (r == -1) return check_err(c, "writev()");
        if (r == 0) {
            c->state = STATE_CLOSE;
            return;
        }
#else
        {
            /* This path has no writev, so both pieces are joined in a
             * temporary buffer and sent with a single net_write. */
            char *const buf = (char *) malloc (iov[0].iov_len + iov[1].iov_len);

            /* Without memory for the staging buffer the reply cannot be
             * sent; close the connection rather than copy through a null
             * pointer. */
            if (!buf) {
                twarnx("out of memory while sending job");
                c->state = STATE_CLOSE;
                return;
            }
            memcpy(buf, iov[0].iov_base, iov[0].iov_len);
            memcpy(buf + iov[0].iov_len, iov[1].iov_base, iov[1].iov_len);
            r = net_write(c->sock.fd, buf, iov[0].iov_len + iov[1].iov_len);
            free(buf);
        }
        if (r == -1) return check_err(c, "write()");
#endif

        /* update the sent values */
        c->reply_sent += r;
        if (c->reply_sent >= c->reply_len) {
            /* whatever went out beyond the reply line was job body */
            c->out_job_sent += c->reply_sent - c->reply_len;
            c->reply_sent = c->reply_len;
        }

        /* (c->out_job_sent > j->r.body_size) can't happen */

        /* are we done? */
        if (c->out_job_sent == j->r.body_size) {
            if (verbose >= 2) {
                printf(">%d job %"PRIu64"\n", c->sock.fd, j->r.id);
            }
            return reset_conn(c);
        }

        /* otherwise we sent incomplete data, so just keep waiting */
        break;
    case STATE_WAIT:
        if (c->halfclosed) {
            c->pending_timeout = -1;
            return reply_msg(remove_waiting_conn(c), MSG_TIMED_OUT);
        }
        break;
    }
}
/* Nonzero when c's socket fd is nonzero and c is in STATE_WANTCOMMAND. */
#define want_command(c) ((c)->sock.fd && ((c)->state == STATE_WANTCOMMAND))
/* Nonzero when want_command(c) holds and c->cmd_read is nonzero. */
#define cmd_data_ready(c) (want_command(c) && (c)->cmd_read)
/* Empty the dirty list. Each connection is unlinked from the list and its
 * wanted event (c->rw) is registered with sockwant(); a connection for
 * which that fails is closed. */
static void
update_conns()
{
    Conn *c;

    while ((c = dirty) != NULL) {
        /* unlink the head before touching it */
        dirty = c->next;
        c->next = NULL;

        if (sockwant(&c->sock, c->rw) == -1) {
            twarn("sockwant");
            connclose(c);
        }
    }
}
/* Handle event `which` on descriptor fd for connection c: do one I/O step,
 * run every complete command already buffered, close the connection if it
 * ended up in STATE_CLOSE, and flush the dirty list. */
static void
h_conn(const int fd, const short which, Conn *c)
{
    /* an event for a descriptor that is not this connection's: drop both */
    if (fd != c->sock.fd) {
        twarnx("Argh! event fd doesn't match conn fd.");
        net_close(fd);
        connclose(c);
        update_conns();
        return;
    }

    if (which == 'h') {
        c->halfclosed = 1;
    }

    conn_data(c);

    /* keep running commands for as long as a complete line is buffered */
    for (;;) {
        if (!cmd_data_ready(c)) break;
        c->cmd_len = cmd_len(c);
        if (!c->cmd_len) break;
        do_cmd(c);
    }

    if (c->state == STATE_CLOSE) {
        protrmdirty(c);
        connclose(c);
    }
    update_conns();
}
/* Socket callback for a client connection (h_accept installs it as
 * c->sock.f): forwards event ev to h_conn with c's own descriptor. */
static void
prothandle(Conn *c, int ev)
{
    const int fd = c->sock.fd;

    h_conn(fd, ev, c);
}
/* Run the periodic work that is due: move delayed jobs whose deadline has
 * passed to the ready state, end tube pauses that have expired, and time
 * out connections whose tick time has arrived.
 *
 * Returns the number of nanoseconds until the next pending deadline found,
 * capped at one hour. */
int64
prottick(Server *s)
{
    int r;
    int i;
    job j;
    tube t;
    int64 d;
    int64 now = nanoseconds();
    int64 period = 0x34630B8A000LL; /* 1 hour in nanoseconds */

    /* delayed jobs, earliest deadline first */
    for (;;) {
        j = delay_q_peek();
        if (!j) break;

        d = j->r.deadline_at - now;
        if (d > 0) {
            period = min(period, d);
            break;
        }

        j = delay_q_take();
        r = enqueue_job(s, j, 0, 0);
        if (r < 1) {
            bury_job(s, j, 0); /* out of memory, so bury it */
        }
    }

    /* paused tubes */
    for (i = 0; i < tubes.used; i++) {
        t = tubes.items[i];
        d = t->deadline_at - now;
        if (d > 0) {
            period = min(period, d);
        } else if (t->pause) {
            /* the pause is over: clear it and let waiting work proceed */
            t->pause = 0;
            process_queue();
        }
    }

    /* connections, taken from the front of the s->conns heap */
    while (s->conns.len) {
        Conn *c = s->conns.data[0];

        d = c->tickat - now;
        if (d > 0) {
            period = min(period, d);
            break;
        }

        heapremove(&s->conns, 0);
        conn_timeout(c);
    }

    update_conns();
    return period;
}
/* Accept one pending client on listening socket fd, switch the new socket
 * to non-blocking mode, create a Conn for it on the default tube and
 * register it for read events. `which` is not used.
 *
 * On any failure the accepted socket is closed and the function returns.
 * The dirty list is flushed before every return. */
void
h_accept(const int fd, const short which, Server *s)
{
    Conn *c;
    int cfd, flags, r;
    socklen_t addrlen;
    struct sockaddr_in6 addr;

    addrlen = sizeof addr;
    cfd = net_accept(fd, (struct sockaddr *)&addr, &addrlen);
    if (cfd == -1) {
        if (errno != EAGAIN && errno != EWOULDBLOCK) twarn("accept()");
        update_conns();
        return;
    }
    if (verbose) {
        printf("accept %d\n", cfd);
    }

#ifndef WIN32
    flags = fcntl(cfd, F_GETFL, 0);
    if (flags < 0) {
        twarn("getting flags");
        close(cfd);
        if (verbose) {
            printf("close %d\n", cfd);
        }
        update_conns();
        return;
    }

    r = fcntl(cfd, F_SETFL, flags | O_NONBLOCK);
    if (r < 0) {
        twarn("setting O_NONBLOCK");
        close(cfd);
        if (verbose) {
            printf("close %d\n", cfd);
        }
        update_conns();
        return;
    }
#else
    {
        DWORD yes = 1;

        r = ioctlsocket(cfd, FIONBIO, &yes);
        if (r == -1) {
            /* Give up on the client socket (cfd) and leave the listening
             * socket (fd) alone, as the fcntl branch above does. */
            twarn("setting FIONBIO");
            net_close(cfd);
            if (verbose) {
                printf("close %d\n", cfd);
            }
            update_conns();
            return;
        }
    }
#endif

    c = make_conn(cfd, STATE_WANTCOMMAND, default_tube, default_tube);
    if (!c) {
        twarnx("make_conn() failed");
        net_close(cfd);
        if (verbose) {
            printf("close %d\n", cfd);
        }
        update_conns();
        return;
    }
    c->srv = s;
    c->sock.x = c;
    c->sock.f = (Handle)prothandle;
    c->sock.fd = cfd;

    r = sockwant(&c->sock, 'r');
    if (r == -1) {
        /* NOTE(review): the socket is closed here but c itself is not
         * released -- confirm whether it should go through connclose(). */
        twarn("sockwant");
        net_close(cfd);
        if (verbose) {
            printf("close %d\n", cfd);
        }
        update_conns();
        return;
    }
    update_conns();
}
void
prot_init()
{
started_at = nanoseconds();
memset(op_ct, 0, sizeof(op_ct));
#ifndef WIN32
int dev_random = open("/dev/urandom", O_RDONLY);
if (dev_random < 0) {
twarn("open /dev/urandom");
exit(50);
}
int i, r;
byte rand_data[NumIdBytes];
r = read(dev_random, &rand_data, NumIdBytes);
if (r != NumIdBytes) {
twarn("read /dev/urandom");
exit(50);
}
for (i = 0; i < NumIdBytes; i++) {
sprintf(id + (i * 2), "%02x", rand_data[i]);
}
close(dev_random);
if (uname(&node_info) == -1) {
warn("uname");
exit(50);
}
#endif
ms_init(&tubes, NULL, NULL);
TUBE_ASSIGN(default_tube, tube_find_or_make("default"));
if (!default_tube) twarnx("Out of memory during startup!");
}
// Walks the circular list headed by `list`. Each job is unlinked, WAL
// space is reserved for it, and it is then put back into the data
// structure matching its recorded state: buried jobs are buried, delayed
// jobs are enqueued with whatever delay remains, and all others are
// enqueued with no delay.
//
// Returns 1 on success, 0 if WAL space could not be reserved.
int
prot_replay(Server *s, job list)
{
    job j, nj;
    int64 t, delay;
    int r, z;

    j = list->next;
    while (j != list) {
        /* remember the successor before j is unlinked */
        nj = j->next;
        job_remove(j);

        z = walresvupdate(&s->wal, j);
        if (!z) {
            twarnx("failed to reserve space");
            return 0;
        }

        if (j->r.state == Buried) {
            bury_job(s, j, 0);
        } else {
            delay = 0;
            if (j->r.state == Delayed) {
                /* only the part of the delay that is still ahead counts */
                t = nanoseconds();
                if (t < j->r.deadline_at) {
                    delay = j->r.deadline_at - t;
                }
            }
            r = enqueue_job(s, j, delay, 0);
            if (r < 1) twarnx("error recovering job %"PRIu64, j->r.id);
        }

        j = nj;
    }
    return 1;
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C
1
https://gitee.com/lomox/beanstalkd-win.git
git@gitee.com:lomox/beanstalkd-win.git
lomox
beanstalkd-win
beanstalkd-win
master

搜索帮助