2 Star 0 Fork 0

mirrors_dlbeer/libdlb

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
AsyncIntro.txt 20.82 KB
一键复制 编辑 原始数据 按行查看 历史
Daniel Beer 提交于 2013-03-19 15:22 . Add some basic documentation.
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580
libdlb: introduction to asynchronous programming
Daniel Beer <dlbeer@gmail.com>
19 Mar 2013
Contents
========================================================================
1. What is asynchronous programming?
2. Example: asynchronous counter
2.1. Principles
3. Asynchronous IO primitives
4. Mailbox-based patterns
4.1. Producer/consumer
4.2. Fork/join
4.3. Reactor
1. What is asynchronous programming?
========================================================================
System calls like read(), write() and sleep() from POSIX are APIs for
sequential programming. You call them to perform a single IO operation,
and they return when that operation is completed. This is fine for
programs with simple IO requirements, but in more complex programs and
in servers, it's often required that you be able to carry out and wait
for several IO operations concurrently -- you wouldn't want an HTTP
server to have to complete an entire request before serving a new
client.
There are two popular strategies for doing this:
* Threads: spawn a new thread for each separable IO operation --
perhaps one thread for each client in an HTTP server. Each thread
can then be written as a series of sequential IO operations.
Pros: simple programming interface, ability to use multiple cores
Cons: significant OS overhead if each thread is largely IO bound
* Events: wait for an IO to become ready by registering a callback.
When IO becomes possible, the callback is invoked from a
single-threaded event loop and then performs the IO.
Pros: very low OS overhead, low latency
Cons: unable to make use of multiple cores, more difficult
programming interface due to inversion of control.
The strategy described here is one which combines good features from
both, and can be implemented without the need for a new language, OS or
runtime:
* Asynchronous programming: begin an IO operation by registering a
callback. The IO is carried out in the background and when complete,
the callback is invoked in one of several threads in a pool.
Pros: very low OS overhead, low latency, able to make use of
multiple cores
Cons: difficult programming interface (similar to events)
The description above of asynchronous programming would lead you to
believe that it's nearly the same as event-based programming. There are
a few subtle, but important, differences:
* In an event-based system, pending events may be cancelled, and if
this is done, the callback is prevented from being invoked. This
isn't possible to do in a deterministic way in an asynchronous
system (but cancellation of a kind is possible).
* In an event-based system, an event may be periodic (for example, an
interval timer). Asynchronous operations maintain a strict
one-to-one relationship between requests and callback.
Both of these differences are related to the way in which callbacks are
dispatched. An event-based system dispatches from a single thread,
whereas an asynchronous system dispatches from a thread pool. This
introduces some constraints and rules which must be followed in order to
avoid race conditions.
2. Example: asynchronous counter
========================================================================
/* This program counts to 10, displaying one number per second using
* asynchronous waits.
*/
#include <stdio.h>
#include "io/ioq.h"
#include "io/mailbox.h"
/* The IO queue manager. */
static struct ioq global_queue;
/* A mailbox is a thread-safe object that contains an atomic flags
* register. We can wait asynchronously for flags to change state, but
* we're not making use of this feature here -- we just want an atomic
* variable.
*/
static struct mailbox done_box;
/* This is our asynchronous timer callback. We don't know exactly which
* thread it will run on, but it'll be one of the threads from the IO
* queue's thread pool.
*/
static void timer_continue(struct waitq_timer *my_timer)
{
static int counter = 0;
/* Every time the timer wait finishes, we increment our counter
* and display the new value.
*/
counter++;
printf("%d\n", counter);
if (counter >= 10) {
/* If this is the tenth time we've invoked the callback,
* raise a mailbox flag to indicate that we're done.
*/
mailbox_raise(&done_box, 1);
/* The main thread can read the flag, but it doesn't
* know that the flag has changed (because we're running
* in a different thread). We need to interrupt the call
* in the main thread to ioq_iterate() in order to make
* it check.
*/
ioq_notify(&global_queue);
} else {
/* We haven't yet run 10 times. Asynchronously wait once
* more. Note that as soon as we call
* waitq_timer_wait(), the callback is "live" and may be
* invoked. That's ok, because this is the last thing
* we're doing in this callback instance, so it's safe
* to run another instance immediately.
*/
waitq_timer_wait(my_timer, 1000, timer_continue);
}
}
int main(void)
{
struct waitq_timer my_timer;
/* Initialize the IO queue with a single thread in the thread
* pool. This might fail, but we're omitting error handling for
* brevity.
*/
if (ioq_init(&global_queue, 1) < 0)
abort();
/* Initialize our mailbox. We need to associate it with an IO
* queue for asynchronous waits to work (although we're not
* using them here).
*/
mailbox_init(&done_box, ioq_runq(&done_box));
/* Initialize the asynchronous timer by associating it with the
* IO queue.
*/
waitq_timer_init(&my_timer, ioq_waitq(&global_queue));
/* Begin an asynchronous wait on the timer. Only one
* asynchronous wait may be in progress at a time -- as soon as
* timer_continue() is invoked, that's our signal that the
* operation has completed.
*/
waitq_timer_wait(&my_timer, 1000, timer_continue);
/* Keep running the IO queue until a flag is raised in the
* mailbox.
*/
while (!mailbox_take(&done_box, MAILBOX_ALL_FLAGS))
ioq_iterate(&global_queue);
/* Free resources. Note that the timer doesn't acquire resources
* for itself, and doesn't need to be destroyed.
*/
mailbox_destroy(&done_box);
ioq_destroy(&global_queue);
return 0;
}
2.1. Principles
------------------------------------------------------------------------
With reference to the example program, note the following principles:
* One-to-one correspondence: ONE request = ONE callback, always. Note
that waitq_timer_wait() is invoked 10 times with timer_continue()
specified as a callback. timer_continue() is also called 10 times.
* Cancellation: cancellation wasn't shown above, but it can be done,
and it doesn't work the same way as in an event-based program. After
a call to waitq_timer_wait(), we could have cancelled the timer
with:
waitq_timer_cancel(&my_timer)
What would this have done? It would have caused timer_continue() to
be invoked as soon as possible, rather than after 1000 ms had
elapsed -- so the one-to-one relationship between requests and
callbacks is preserved. Note that it's the caller's problem to
distinguish between "real" callbacks and cancellations. This could
be done either by setting a (thread-safe) flag before
waitq_timer_cancel(), or by checking the clock against the expected
deadline in the timer callback.
This is known as asynchronous cancellation, whereas the event-based
kind (where the callback is prevented from being invoked) is known
as synchronous cancellation. We can't do synchronous cancellation,
because at the time you call waitq_timer_cancel(), the callback
might already be executing in a different thread!
Since cancellation is often performed from a different logical
thread than the one which will run the callback, it sometimes
happens that the cancellation request occurs *after* the callback
has already executed. If you intend to re-use the IO object, this is
a possible source of race conditions if you aren't careful.
* Thread safety: IO objects, such as struct waitq_timer, are not
thread-safe themselves, although they do interact with "hidden"
threads in the IO system. What this means in practice is that you
must avoid accessing the same IO object from different threads
simultaneously.
There is one exception to this, and that is cancellation: you may
call waitq_timer_cancel(), for example, from any thread.
* Limited subscription: IO objects may have at most only one
outstanding IO operation in progress at any time.
In the example program, thread safety is guaranteed by the fact that we
never access my_timer between waitq_timer_wait() and timer_continue().
The second, and subsequent calls to waitq_timer_wait(), are done in tail
position from timer_continue().
This is a common pattern in asynchronous programming when dealing with a
single isolated sequence of IO operations: because all requests are made
in tail position from callbacks, the sequence of callbacks is
effectively single threaded. The following diagram illustrates the
execution of code in callbacks and the hidden IO operations:
Thread pool IO manager
=======================================================
+------------+
| Init func |
+------------+ ====== wait() ======>
+----------------+
| IO operation 1 |
<===== complete ===== +----------------+
+------------+
| Callback 1 |
+------------+ ====== wait() ======>
+----------------+
| IO operation 2 |
<===== complete ===== +----------------+
+------------+
| Callback 2 |
+------------+
The callbacks and IO operations are strictly interleaved, with control
transfers taking place during requests and completions.
While the diagram above shows IO operations as distinct, in practice,
multiple IO operations will be efficiently multiplexed in the IO manager
using facilities such as epoll().
3. Asynchronous IO primitives
========================================================================
The following header files contain useful IO primitives:
* runq.h: struct runq_task is the simplest IO primitive. A request
to runq_task_exec() invokes the given callback on the thread pool as
soon as possible. This is useful if you want to defer work, or as an
easy way to parallelize operations (in conjunction with the mailbox,
described below).
* waitq.h: struct waitq_timer is a one-shot timer. You can
asynchronously wait by specifying a time period in milliseconds and
a callback to be invoked after the time period has elapsed. Timers
support asynchronous cancellation.
* ioq.h: struct ioq_fd is a file descriptor monitor. After
initializing it by associating it with a file descriptor and an IO
queue, you can asynchronously wait for readability, writability, or
error conditions on the selected file descriptor. These objects
support asynchronous cancellation.
Note two important restrictions:
- You may not associate more than one struct ioq_fd with the same
file descriptor.
- You may not have more than one outstanding wait in progress at a
time. If you need to wait for a new condition when you're
already waiting for an old one, you must first asynchronously
cancel the old wait operation.
* io/mailbox.h: struct mailbox is an asynchrous IPC primitive,
intended for signalling between logical threads. It contains of a
thread-safe 32-bit register, each bit of which is treated as a
"flag". Two atomic operations are possible:
- raise(): set any subset of flags in the mailbox, leaving the
others untouched.
- take(): clear any subset of flags in the mailbox, leaving the
others untouched. The state of the mailbox, prior to clearing,
is returned.
These two operations alone make it useful as a simple atomic
variable. However, to allow actual IPC, two asynchronous wait
operations are provided:
- wait(): given a subset of flags and a callback, invoke the
callback when any of the flags in the subset become raised.
- wait_any(): given a set of flags and a callback, invoke the
callback when ALL of the flags in the subset become raised.
The wait operations are "level triggered": if the condition waited
for is already true at the time of the call, the callback will be
invoked immediately (but still asynchronously).
Check the header files for more details. Most of the asynchronous
operations are implemented using intrusive data structures in a way
which guarantees that they can't fail.
4. Mailbox-based patterns
========================================================================
The simplest pattern of asynchronous programming is the "single-strand",
or single logical thread consisting of a sequence of asynchronous
operations, as shown in the example code above.
More complex and interesting patterns are possible by making use of
struct mailbox, defined in io/mailbox.h. Some useful concepts are
described in subsections here.
4.1. Producer/consumer
------------------------------------------------------------------------
The most obvious use of a mailbox is to use it as an IPC primitive. In
this case, it may be associated with a shared data structure (in this
example, a FIFO queue) to provide more sophisticated communication.
First, a flag is chosen to be associated with the FIFO queue. It doesn't
matter which, just as long as it's not used for any other purpose. Our
data structures and definitions are:
#define QUEUE_READY_FLAG MAILBOX_FLAG(0)
thr_mutex_t lock;
struct slist fifo;
struct mailbox mbox;
When the producer wants to send an item via the queue, it does:
/* Safely place the data in the queue */
thr_mutex_lock(&lock);
slist_append(&fifo, &my_data.node);
thr_mutex_unlock(&lock);
/* Wake up the consumer */
mailbox_raise(&mbox, QUEUE_READY_FLAG);
The consumer should repeatedly wait for the flag to be raised, and empty
the queue:
/* Attempt to pop a datum from the queue with the
* lock held.
*/
static struct datum *pop_queue_locked(void)
{
struct slist_node *n;
thr_mutex_lock(&lock);
n = slist_pop(&fifo);
thr_mutex_unlock(&lock);
if (n)
return container_of(n, struct datum, node);
return NULL;
}
/* Queue handler: call this function once to start the consumption
* of data.
*/
static void consume_queue(struct mailbox *m)
{
/* Acknowledge the wake-up request */
mailbox_take(&mbox, QUEUE_READY_FLAG);
/* Remove data from the queue. We must loop until the queue
* is empty to avoid stalling (all wakeups for items in the
* queue have already been delivered -- we won't be woken up
* again unless new data arrives).
*/
for (;;) {
struct datum *d = pop_queue_locked();
/* Nothing left? Stop processing and wait again. */
if (!d)
break;
process_data(d);
}
/* Wait again. this must be the last thing we do, in order
* to maintain a single strand of execution.
*/
mailbox_wait(&mbox, QUEUE_READY_FLAG, consume_queue);
}
4.2. Fork/join
------------------------------------------------------------------------
Another useful pattern, making use of the wait_all operation, is using a
mailbox to coordinate parallel operations. These operations can be
either IO operations or computations (using the run queue).
In this example, we use a mailbox to synchronize with the completion of
two parallel tasks:
#define TASK_A_FLAG MAILBOX_FLAG(0)
#define TASK_B_FLAG MAILBOX_FLAG(1)
static struct mailbox box;
static struct runq_task task_a;
static struct runq_task task_b;
static void task_a_func(struct runq_task *t)
{
do_something_a();
mailbox_raise(&box, TASK_A_FLAG);
}
static void task_b_func(struct runq_task *t)
{
do_something_b();
mailbox_raise(&box, TASK_B_FLAG);
}
static void all_done(struct mailbox *m)
{
/* Tasks A and B have both completed... */
mailbox_take(&box, TASK_A_FLAG | TASK_B_FLAG);
/* Process the results here */
}
static void do_both_tasks(void)
{
/* Asynchronously start both tasks. Both task_a_func and
* task_b_func can immediately be scheduled in parallel.
*/
runq_task_exec(&task_a, task_a_func);
runq_task_exec(&task_b, task_b_func);
/* Asynchronously wait for both tasks to finish */
mailbox_wait_all(&box, TASK_A_FLAG | TASK_B_FLAG, all_done);
}
In this example, task_a_func and task_b_func were computations, started
via a struct runq_task. However, they could just have easily been
callbacks from an asynchronous IO operation, or a mix of both.
A few rules must be observed for thread-safety:
* task_a_func and task_b_func can run in parallel, and must not share
data without correct locking.
* Any data used by either task can't be touched until they have
finished. In practice, this means avoiding touching any of the
task's data between the call to initiate the task (runq_task_exec)
and the callback which indicates that both tasks have completed
(all_done).
* Since mailbox_raise() may result in the execution of all_done(), it
is the last statement in the task callbacks.
* Similarly, mailbox_wait_all() may also result in the execution of
all_done(), and it is again the last statement in the initiating
function.
4.3. Reactor
------------------------------------------------------------------------
Often, it's useful to express parts of a program in terms of a state
machine responding to a serialized sequence of events (in an
event-driven system, your entire program would be of this form). In an
asynchronously programmed server, you might want to run a state machine
for each client, for example.
This can be done by constructing a main loop based around an
asynchronous mailbox wait: a callback function wakes up on receipt of a
mailbox flag, processes events, and then waits again. Other asynchronous
operations started by the main loop run concurrently to perform simple
IO tasks, raising flags upon completion.
The general strategy for implementing this is to first identify the IO
operations which you will need to perform. These should be as primitive
as possible, because all the completion handler is expected to do is
report to the main loop that the IO operation has completed.
The main loop itself centres, as indicated above, around a mailbox:
static struct mailbox loop_box;
static void main_loop(void)
{
/* Take events from the mailbox */
const mailbox_flags_t events =
mailbox_take(&loop_box, MAILBOX_ALL_FLAGS);
/* React to flags in events... */
/* (see below for details) */
/* Wait for more events */
mailbox_wait(&loop_box, MAILBOX_ALL_FLAGS, main_loop);
}
For each IO operation, you need the following functions and data
structures:
/* A unique flag to indicate completion */
#define IO_FLAG_A MAILBOX_FLAG(0)
/* A state flag to indicate that the operation is in progress (you
* might prefer to share bits of a bit-field). This flag is accessed
* only by the main loop, so no locking is required.
*/
static int a_in_progress;
/* A function to start the IO operation */
static void begin_a(void)
{
if (!a_in_progress) {
io_operation_wait(&my_object, a_complete);
a_in_progress = 1;
}
}
/* ...and a callback function. This does nothing except for waking
* up the main loop via a mailbox flag.
*/
static void a_complete(void)
{
mailbox_raise(&loop_box, IO_FLAG_A);
}
In the main loop, you need to react to the receipt of IO_FLAG_A:
if (events & IO_FLAG_A) {
a_in_progress = 0;
react_to_a();
/* You might also want to call begin_a(), if this is a
* continuous process.
*/
}
Note that thread-safety is achieved via the fact that the only functions
to run in parallel with the main thread are the IO completion callbacks,
and all they do is call mailbox_raise(), which is an entirely safe
operation. The end result is a serialized sequence of invocations of
main_loop(), each retrieving and responding to a set of events.
The only tricky part is terminating the loop: at the time of
termination, it may be that you have IO operations outstanding. When
they complete, they will attempt to raise a mailbox flag, so the mailbox
can't be destroyed until this happens.
Fortunately, shut-down can be arranged safely provided that you are
keeping track of outstanding operations. First, request cancellation of
each IO operation which you have recorded as being in progress
(cancellation will be specific to the particular type of IO operation).
Record that you are in the process of shutting down, and refrain from
starting new operations when old ones complete.
As each operation completes, check to see if there are any more
outstanding. If no more operations are outstanding, it's safe to
terminate by cleaning up data structures and simply returing from the
main loop.
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/mirrors_dlbeer/libdlb.git
git@gitee.com:mirrors_dlbeer/libdlb.git
mirrors_dlbeer
libdlb
libdlb
master

搜索帮助

0d507c66 1850385 C8b1a773 1850385