代码拉取完成,页面将自动刷新
同步操作将从 src-openEuler/distributed-beget 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
diff --git b/services/include/list.h b/services/include/list.h
new file mode 100644
index 0000000..f45bdf5
--- /dev/null
+++ b/services/include/list.h
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2021 Huawei Device Co., Ltd.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef BASE_STARTUP_INITLITE_LIST_H
+#define BASE_STARTUP_INITLITE_LIST_H
+#include <stddef.h>
+
+#ifdef __cplusplus
+#if __cplusplus
+extern "C" {
+#endif
+#endif
+
+typedef struct ListNode {
+ struct ListNode *next;
+ struct ListNode *prev;
+} ListNode, ListHead;
+
+#define ListEmpty(node) \
+ do { \
+ node.next = &node; \
+ node.prev = &node; \
+ } while (0) \
+
+#define ListEntry(ptr, type, member) ((type *)((char *)(ptr) - offsetof(type, member)))
+
+
+void OH_ListAddTail(struct ListNode *head, struct ListNode *item);
+void OH_ListRemove(struct ListNode *item);
+
+#ifdef __cplusplus
+#if __cplusplus
+}
+#endif
+#endif
+
+#endif // BASE_STARTUP_INITLITE_LIST_H
diff --git a/services/param/base/BUILD.gn b/services/param/base/BUILD.gn
index 178ac87..b253055 100644
--- a/services/param/base/BUILD.gn
+++ b/services/param/base/BUILD.gn
@@ -11,7 +11,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import("//build/ohos.gni")
-import("//build/config/sysroot.gni")
config("exported_header_files") {
visibility = [ ":*" ]
@@ -19,7 +18,7 @@ config("exported_header_files") {
"//base/startup/init/interfaces/innerkits/include",
"//base/startup/init/services/include/param",
"//base/startup/init/services/include",
- "${sysroot}/usr/include/hilog",
+ "//base/hiviewdfx/hilog/interfaces/native/innerkits/include",
]
}
@@ -30,7 +29,7 @@ comm_sources = [
base_include_dirs = [
"//base/startup/init/services/param/include",
"//base/startup/init/services/param/base",
- "${sysroot}/usr/include/hilog",
+ "//base/hiviewdfx/hilog/interfaces/native/innerkits/include",
]
source_set("parameterbase") {
diff --git a/services/param/linux/param_request.c b/services/param/linux/param_request.c
index afd95fe..76947f2 100644
--- a/services/param/linux/param_request.c
+++ b/services/param/linux/param_request.c
@@ -29,9 +29,8 @@
#include <stdio.h>
#include "beget_ext.h"
-#include "param_manager.h"
-static void ClearEnv(ParamRequestMsg* pmsg, ParamRespMsg* respmsg, int fd)
+static void ClearEnv(ParamReqMsg* pmsg, ParamRespMsg* respmsg, int fd)
{
if (pmsg != NULL)
free(pmsg);
@@ -49,9 +48,8 @@ static int GetClientSocket()
struct sockaddr_un serverAddr;
bzero(&serverAddr, sizeof(serverAddr));
serverAddr.sun_family = PF_UNIX;
- strncpy(serverAddr.sun_path, PIPE_NAME, strlen(PIPE_NAME) + 1);
+ strncpy(serverAddr.sun_path, PIPE_NAME, strlen(PIPE_NAME));
if (connect(cfd, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) < 0) {
- close(cfd);
perror("Failed to connect");
return -1;
}
@@ -59,33 +57,33 @@ static int GetClientSocket()
return cfd;
}
-static struct ParamRequestMsg* GetRequestMsg(uint32_t type, uint32_t size)
+static struct ParamReqMsg* GetRequestMsg(uint32_t type, uint32_t size)
{
uint32_t data_alloc_size = size;
if (data_alloc_size > PARAM_VALUE_LEN_MAX || data_alloc_size == 0)
data_alloc_size = PARAM_VALUE_LEN_MAX;
- struct ParamRequestMsg *pmsg;
+ struct ParamReqMsg *pmsg;
if (type == GET_PARAMETER) {
- pmsg = (struct ParamRequestMsg*)malloc(sizeof(struct ParamRequestMsg));
- BEGET_ERROR_CHECK(pmsg != NULL, return NULL, "Failed to malloc ParamRequestMsg");
- bzero(pmsg, sizeof(struct ParamRequestMsg));
+ pmsg = (struct ParamReqMsg*)malloc(sizeof(struct ParamReqMsg));
+ BEGET_ERROR_CHECK(pmsg != NULL, return NULL, "Failed to malloc ParamReqMsg");
+ bzero(pmsg, sizeof(struct ParamReqMsg));
} else if (type == SET_PARAMETER) {
- pmsg = (struct ParamRequestMsg*)malloc(sizeof(struct ParamRequestMsg) + data_alloc_size);
- BEGET_ERROR_CHECK(pmsg != NULL, return NULL, "Failed to malloc ParamRequestMsg");
- bzero(pmsg, sizeof(struct ParamRequestMsg) + data_alloc_size);
+ pmsg = (struct ParamReqMsg*)malloc(sizeof(struct ParamReqMsg) + data_alloc_size);
+ BEGET_ERROR_CHECK(pmsg != NULL, return NULL, "Failed to malloc ParamReqMsg");
+ bzero(pmsg, sizeof(struct ParamReqMsg) + data_alloc_size);
} else if (type == WAIT_PARAMETER) {
- pmsg = (struct ParamRequestMsg*)malloc(sizeof(struct ParamRequestMsg) + data_alloc_size);
- BEGET_ERROR_CHECK(pmsg != NULL, return NULL, "Failed to malloc ParamRequestMsg");
- bzero(pmsg, sizeof(struct ParamRequestMsg) + data_alloc_size);
+ pmsg = (struct ParamReqMsg*)malloc(sizeof(struct ParamReqMsg) + data_alloc_size);
+ BEGET_ERROR_CHECK(pmsg != NULL, return NULL, "Failed to malloc ParamReqMsg");
+ bzero(pmsg, sizeof(struct ParamReqMsg) + data_alloc_size);
}
pmsg->datasize = data_alloc_size;
pmsg->type = type;
return pmsg;
}
-static struct ParamRespMsg* StartRequest(int fd, struct ParamRequestMsg* pmsg)
+static struct ParamRespMsg* StartRequest(int fd, struct ParamReqMsg* pmsg)
{
- int ret = send(fd, pmsg, sizeof(struct ParamRequestMsg) + pmsg->datasize, 0);
+ int ret = send(fd, pmsg, sizeof(struct ParamReqMsg) + pmsg->datasize, 0);
BEGET_ERROR_CHECK(ret > 0, return NULL, "Failed to send msg");
struct ParamRespMsg* respmsg = (struct ParamRespMsg*)malloc(sizeof(struct ParamRespMsg) + PARAM_VALUE_LEN_MAX);
@@ -106,13 +104,13 @@ int SystemSetParameter(const char *name, const char *value)
int fd = GetClientSocket();
if (fd < 0)
return -1;
- struct ParamRequestMsg* pmsg = GetRequestMsg(SET_PARAMETER, strlen(value));
+ struct ParamReqMsg* pmsg = GetRequestMsg(SET_PARAMETER, strlen(value));
if (pmsg == NULL) {
close(fd);
return -1;
}
- strncpy(pmsg->key, name, sizeof(pmsg->key) - 1);
+ strncpy(pmsg->key, name, sizeof(pmsg->key));
strncpy(pmsg->data, value, pmsg->datasize);
int ret;
struct ParamRespMsg* respmsg = StartRequest(fd, pmsg);
@@ -138,10 +136,10 @@ int SystemReadParam(const char *name, char *value, uint32_t *len)
int fd = GetClientSocket();
if (fd < 0)
return -1;
- struct ParamRequestMsg* pmsg = GetRequestMsg(GET_PARAMETER, *len);
+ struct ParamReqMsg* pmsg = GetRequestMsg(GET_PARAMETER, *len);
BEGET_ERROR_CHECK(pmsg != NULL, close(fd);return -1, "Invalid pmsg");
- strncpy(pmsg->key, name, sizeof(pmsg->key) - 1);
+ strncpy(pmsg->key, name, sizeof(pmsg->key));
int ret;
struct ParamRespMsg* respmsg = StartRequest(fd, pmsg);
if (respmsg == NULL) {
@@ -175,11 +173,14 @@ int SystemWaitParameter(const char *name, const char *value, int32_t timeout)
if (fd < 0)
return -1;
- struct ParamRequestMsg* pmsg = GetRequestMsg(WAIT_PARAMETER, strlen(value) + 1);
+ struct ParamReqMsg* pmsg = GetRequestMsg(WAIT_PARAMETER, strlen(value) + 1);
BEGET_ERROR_CHECK(pmsg != NULL, close(fd);return -1, "Invalid pmsg");
+ if (timeout < 0) {
+ timeout = 30;
+ }
pmsg->timeout = timeout;
- strncpy(pmsg->key, name, sizeof(pmsg->key) - 1);
+ strncpy(pmsg->key, name, sizeof(pmsg->key));
strncpy(pmsg->data, value, sizeof(pmsg->datasize));
struct ParamRespMsg* respmsg = StartRequest(fd, pmsg);
if (respmsg == NULL) {
diff --git a/services/param/linux/param_request.h b/services/param/linux/param_request.h
index dd95f1b..8264ed4 100644
--- a/services/param/linux/param_request.h
+++ b/services/param/linux/param_request.h
@@ -3,13 +3,13 @@
#include "parameter.h"
-typedef struct ParamRequestMsg {
+typedef struct ParamReqMsg {
uint32_t type;
uint32_t datasize;
uint32_t timeout;
char key[PARAM_NAME_LEN_MAX];
char data[0];
-} ParamRequestMsg;
+} ParamReqMsg;
typedef struct ParamRespMsg {
uint32_t flag;
diff --git a/services/param_service/BUILD.gn b/services/param_service/BUILD.gn
index 84f429f..cfcabce 100644
--- a/services/param_service/BUILD.gn
+++ b/services/param_service/BUILD.gn
@@ -21,6 +21,8 @@ ohos_executable("param_service") {
"src/param_server.c",
"src/le_utils.c",
"src/trie_comm.c",
+ "src/hash.c",
+ "src/base_task.c"
]
include_dirs = [
@@ -28,10 +30,14 @@ ohos_executable("param_service") {
"//base/startup/init/interfaces/innerkits/include/syspara",
"//base/startup/init/interfaces/innerkits/include",
"//base/startup/init/services/param/include",
- "//base/startup/init/services/param/linux/",
+ "//base/startup/init/services/param/linux",
+ "//base/startup/init/services/include",
]
deps = [ "//base/startup/init/services/utils:libinit_utils" ]
+
+ cflags = [ "-Wno-incompatible-pointer-types" ]
+
external_deps = [
"c_utils:utils",
]
diff --git b/services/param_service/include/base_task.h b/services/param_service/include/base_task.h
new file mode 100644
index 0000000..372c33e
--- /dev/null
+++ b/services/param_service/include/base_task.h
@@ -0,0 +1,79 @@
+#ifndef BSAE_TASK_H
+#define BSAE_TASK_H
+#include <stdint.h>
+
+#include "list.h"
+#include "hash.h"
+#include "base_task.h"
+#include "param_request.h"
+
+#define DEFAULT_MAX_EVENTS 1024
+
+typedef void* LoopHandle;
+
+typedef enum : uint32_t {
+ Event_Read,
+ Event_Write,
+} EventType;
+
+typedef enum : uint32_t {
+ NORMAL_TYPE,
+ WAIT_TYPE,
+} ClientType;
+
+typedef struct EventBuffer_ {
+ uint32_t datasize;
+ uint8_t data[0];
+} EventBuffer;
+
+typedef struct Content_ {
+ HashNode hashNode;
+ ParamRespMsg *respmsg;
+} Content;
+
+typedef struct BaseTask_ {
+ int taskId;
+ HashNode hashNode;
+ void (*close)(LoopHandle, struct BaseTask_*);
+ void (*handleEvent)(LoopHandle, struct BaseTask_*, EventType);
+} BaseTask;
+
+typedef struct WaitInfo_ {
+ ListNode anchor;
+ int32_t timeout;
+ int32_t taskId;
+ char condition[0];
+} WaitInfo;
+
+typedef struct ClientTask_ {
+ BaseTask base;
+ uint32_t type;
+ void (*recvMessage)(LoopHandle, BaseTask*);
+ void (*sendMessage)(LoopHandle, BaseTask*);
+ void (*disconnect)(LoopHandle, BaseTask*);
+ union {
+ EventBuffer *content;
+ void *extraInfo;
+ } info;
+} ClientTask;
+
+typedef struct ServerTask_ {
+ BaseTask base;
+ void (*incommingConnect)(LoopHandle, BaseTask*);
+} ServerTask;
+
+typedef struct EventLoop_ {
+ int epollFd;
+ int maxevents;
+ void (*Run)(struct EventLoop_*);
+ void (*AddEvent)(struct EventLoop_*, BaseTask*, EventType);
+ void (*ModEvent)(struct EventLoop_*, BaseTask*, EventType);
+ void (*DelEvent)(struct EventLoop_*, BaseTask*);
+ HashTab *tab;
+} EventLoop;
+
+EventLoop* GetDefaultLoop();
+BaseTask* CreateBaseTask(EventLoop *loop, uint32_t size);
+void RunLoop(EventLoop *loop);
+
+#endif // BSAE_TASK_H
\ No newline at end of file
diff --git b/services/param_service/include/hash.h b/services/param_service/include/hash.h
new file mode 100644
index 0000000..d898e30
--- /dev/null
+++ b/services/param_service/include/hash.h
@@ -0,0 +1,37 @@
+#ifndef HASH_H
+#define HASH_H
+
+#include <stddef.h>
+
+#define HASHNODE_ENTRY(ptr, type, member) ((type*)((char*)(ptr) - offsetof(type, member)))
+
+typedef struct HashNode {
+ struct HashNode *next;
+} HashNode;
+
+typedef struct HashTab {
+ int (*nodeHash)(HashNode*);
+ int (*keyHash)(const void *key);
+ int (*nodeCompare)(HashNode*, HashNode*);
+ int (*keyCompare)(HashNode *node, const void *key);
+ void (*nodeFree)(HashNode*);
+ int maxBucket;
+ HashNode *buckets[0];
+} HashTab;
+
+typedef struct {
+ int (*nodeHash)(HashNode*);
+ int (*keyHash)(const void *key);
+ int (*nodeCompare)(HashNode*, HashNode*);
+ int (*keyCompare)(HashNode *node, const void *key);
+ void (*nodeFree)(HashNode*);
+ int maxBucket;
+} HashInfo;
+
+int HashTabCreate(HashTab **tab, HashInfo *info);
+int HashNodeAdd(HashTab *tab, HashNode *node);
+void HashNodeRemove(HashTab *tab, HashNode *node); // only remove, don't free
+int HashTabDestroy(HashTab *tab);
+HashNode* GetHashNode(HashTab *tab, const void* key);
+
+#endif // HASH_H
\ No newline at end of file
diff --git a/services/param_service/include/param_server.h b/services/param_service/include/param_server.h
index 7bca45f..91668a9 100644
--- a/services/param_service/include/param_server.h
+++ b/services/param_service/include/param_server.h
@@ -1,22 +1,13 @@
-#ifndef LE_SOCKET_H
-#define LE_SOCKET_H
-#include <stdint.h>
+#ifndef PARAM_SERVER_H
+#define PARAM_SERVER_H
+#include <pthread.h>
+
#include "param_utils.h"
#include "parameter.h"
+#include "list.h"
+#include "base_task.h"
-#define LOOP_MAX_CLIENT 1024
-#define LOOP_MAX_SOCKET 1024
-
-struct EventArgs {
- int epollFd;
- int clientFd;
-};
-
-enum {
- SOCK_UNKNOWN = -1,
- SOCK_DISCONNECTED,
- SOCK_CONNECTED,
-};
+#define MAX_CLIENT 1024
-void ParamServerStart();
-#endif // LE_SOCKET_H
+int ParamServerInit(EventLoop*);
+#endif // PARAM_SERVER_H
diff --git a/services/param_service/include/trie_comm.h b/services/param_service/include/trie_comm.h
index dfd08ec..df1181a 100644
--- a/services/param_service/include/trie_comm.h
+++ b/services/param_service/include/trie_comm.h
@@ -7,10 +7,10 @@
#define WORKSPACE_NAME WORKSPACE_DIR "/param.tmp"
#define WORKSPACE_SIZE (1024*1000)
-typedef struct ListNode {
+typedef struct TrieListNode {
uint32_t prev;
uint32_t next;
-} ListNode;
+} TrieListNode;
typedef struct ParamNode {
uint8_t keyLen;
@@ -19,7 +19,7 @@ typedef struct ParamNode {
} ParamNode;
typedef struct TrieNode {
- ListNode node;
+ TrieListNode node;
uint32_t child;
uint32_t left;
uint32_t right;
@@ -39,5 +39,4 @@ int ParamWorkSpaceInit();
int SetParamtoMem(const char* key, const char* value);
int GetParamFromMem(const char* key, char* value, uint32_t len);
int WaitParam(const char* key, const char* value, uint32_t timeout);
-void DumpParam();
#endif // TRIE_UTILS_H
\ No newline at end of file
diff --git b/services/param_service/include/trie_queue.h b/services/param_service/include/trie_queue.h
new file mode 100644
index 0000000..6c96f96
--- /dev/null
+++ b/services/param_service/include/trie_queue.h
@@ -0,0 +1,84 @@
+#ifndef TRIE_QUEUE_H
+#define TRIE_QUEUE_H
+#include <stdlib.h>
+
+#include "trie_comm.h"
+#include <stdio.h>
+
+typedef struct QueueItem {
+ struct QueueItem* prev;
+ struct QueueItem* next;
+ TrieNode* node;
+} QueueItem;
+
+typedef struct TrieNodeQueue {
+ int size;
+ int ready;
+ QueueItem queue;
+ void (*push)(struct TrieNodeQueue*, TrieNode*);
+ TrieNode* (*pop)(struct TrieNodeQueue*);
+} TrieNodeQueue;
+
+void TrieNodePush(TrieNodeQueue* tq, TrieNode* node);
+TrieNode* TrieNodePop(TrieNodeQueue* tq);
+
+inline void TrieQueueFirstStageInit(TrieNodeQueue* tq)
+{
+ if (tq == NULL) {
+ return;
+ }
+ tq->size = 0;
+ tq->ready = 0;
+ tq->queue.next = &(tq->queue);
+ tq->queue.prev = &(tq->queue);
+ tq->push = TrieNodePush;
+ tq->pop = TrieNodePop;
+}
+
+inline void TrieQueueSecondStageInit(TrieNodeQueue* tq)
+{
+ if (tq == NULL) {
+ return;
+ }
+ tq->ready = 1;
+}
+
+inline void TrieNodePush(TrieNodeQueue* tq, TrieNode* node)
+{
+ if (tq == NULL || node == NULL) {
+ return;
+ }
+ if (!tq->ready) {
+ return;
+ }
+ QueueItem* item = (QueueItem*)malloc(sizeof(QueueItem));
+ if (!item) {
+ return;
+ }
+
+ QueueItem* queue = &tq->queue;
+ item->node = node;
+ item->next = queue;
+ item->prev = queue->prev;
+ queue->prev->next = item;
+ queue->prev = item;
+ tq->size++;
+}
+
+inline TrieNode* TrieNodePop(TrieNodeQueue* tq)
+{
+ if (tq == NULL || tq->size == 0) {
+ return NULL;
+ }
+
+ QueueItem* queue = &tq->queue;
+ QueueItem* item = queue->prev;
+ queue->prev = queue->prev->prev;
+ queue->prev->next = queue;
+ tq->size--;
+ TrieNode* node = item->node;
+ free(item);
+ return node;
+}
+
+#endif // TRIE_QUEUE_H
\ No newline at end of file
diff --git b/services/param_service/src/base_task.c b/services/param_service/src/base_task.c
new file mode 100644
index 0000000..e97d7dc
--- /dev/null
+++ b/services/param_service/src/base_task.c
@@ -0,0 +1,118 @@
+#include <errno.h>
+#include <stdlib.h>
+#include <sys/epoll.h>
+
+#include "beget_ext.h"
+#include "base_task.h"
+
+static EventLoop *staticloop = NULL;
+
+void AddEvent_(EventLoop *loop, BaseTask *task, EventType event)
+{
+ struct epoll_event ev = {};
+ ev.data.fd = task->taskId;
+ if (event == Event_Read) {
+ ev.events = EPOLLIN;
+ } else if (event == Event_Write) {
+ ev.events = EPOLLOUT;
+ }
+
+ (void)epoll_ctl(loop->epollFd, EPOLL_CTL_ADD, task->taskId, &ev);
+}
+
+void ModEvent_(EventLoop *loop, BaseTask *task, EventType event)
+{
+ struct epoll_event ev = {};
+ ev.data.fd = task->taskId;
+ if (event == Event_Read) {
+ ev.events = EPOLLIN;
+ } else if (event == Event_Write) {
+ ev.events = EPOLLOUT;
+ }
+
+ (void)epoll_ctl(loop->epollFd, EPOLL_CTL_MOD, task->taskId, &ev);
+}
+
+void DelEvent_(EventLoop *loop, BaseTask *task)
+{
+ (void)epoll_ctl(loop->epollFd, EPOLL_CTL_DEL, task->taskId, NULL);
+}
+
+void ProcessEvent(EventLoop *loop, int fd, EventType type)
+{
+ BEGET_ERROR_CHECK(loop != NULL && loop->tab != NULL, return, "%s, invalid param", __func__);
+ HashNode *node = GetHashNode(loop->tab, &fd);
+ if (node == NULL) {
+ return;
+ }
+ BaseTask *task = HASHNODE_ENTRY(node, BaseTask, hashNode);
+ task->handleEvent(loop, task, type);
+}
+
+void Run_(EventLoop *loop)
+{
+ BEGET_ERROR_CHECK(loop != NULL, return, "invalid param");
+ struct epoll_event *events = (struct epoll_event*)malloc(sizeof(struct epoll_event) * loop->maxevents);
+ BEGET_ERROR_CHECK(events!= NULL, return, "fail to allocate space to epoll event");
+
+ while (1) {
+ int num = epoll_wait(loop->epollFd, events, loop->maxevents, -1);
+ for (int i = 0; i < num; ++i) {
+ if (events[i].events & EPOLLIN) {
+ ProcessEvent(loop, events[i].data.fd, Event_Read);
+ } else if (events[i].events & EPOLLOUT) {
+ ProcessEvent(loop, events[i].data.fd, Event_Write);
+ }
+ }
+ }
+}
+
+static int CreateLoop(EventLoop **loop)
+{
+ if (loop == NULL) {
+ return -1;
+ }
+ EventLoop *handle= (EventLoop*)calloc(1, sizeof(EventLoop));
+ BEGET_ERROR_CHECK(handle != NULL, return -1, "fail to allocate space for EventLoop");
+ handle->epollFd = epoll_create(DEFAULT_MAX_EVENTS);
+ BEGET_ERROR_CHECK(handle->epollFd > 0, free(handle);return -1, "failed to create epoll. errno [%d]", errno);
+ handle->Run = Run_;
+ handle->AddEvent = AddEvent_;
+ handle->ModEvent = ModEvent_;
+ handle->DelEvent = DelEvent_;
+ handle->maxevents = DEFAULT_MAX_EVENTS;
+ handle->tab == NULL;
+ *loop = handle;
+ return 0;
+}
+
+
+BaseTask* CreateBaseTask(EventLoop *loop, uint32_t size)
+{
+ BaseTask *task = (BaseTask*)calloc(1, size);
+ BEGET_ERROR_CHECK(task != NULL, return NULL, "fail to create base task");
+ task->hashNode.next = NULL;
+ return task;
+}
+
+EventLoop* GetDefaultLoop()
+{
+ if (staticloop != NULL) {
+ return staticloop;
+ }
+ int ret = CreateLoop(&staticloop);
+ BEGET_ERROR_CHECK(ret == 0, return NULL, "fail to create default loop");
+ return staticloop;
+}
+
+void RunLoop(EventLoop *loop)
+{
+ if (loop != NULL && loop->Run != NULL) {
+ loop->Run(loop);
+ }
+}
+
+void StopLoop()
+{
+
+}
\ No newline at end of file
diff --git b/services/param_service/src/hash.c b/services/param_service/src/hash.c
new file mode 100644
index 0000000..ad3021c
--- /dev/null
+++ b/services/param_service/src/hash.c
@@ -0,0 +1,106 @@
+#include "hash.h"
+#include "beget_ext.h"
+
+#include <stdlib.h>
+
+int HashTabCreate(HashTab **tab, HashInfo *info)
+{
+ BEGET_ERROR_CHECK(tab != NULL && info != NULL, return -1, "%s : invalid arguments", __func__);
+ *tab = (HashTab*)calloc(1, sizeof(HashTab) + sizeof(HashNode) * info->maxBucket);
+ BEGET_ERROR_CHECK(*tab != NULL, return -1, "fail to calloc hash tab");
+ (*tab)->nodeHash = info->nodeHash;
+ (*tab)->keyHash = info->keyHash;
+ (*tab)->nodeCompare = info->nodeCompare;
+ (*tab)->keyCompare = info->keyCompare;
+ (*tab)->nodeFree = info->nodeFree;
+ (*tab)->maxBucket = info->maxBucket;
+ return 0;
+}
+
+static HashNode* CheckHashNodeIsExist(HashTab *tab, HashNode* head, HashNode *target)
+{
+ int ret;
+ HashNode *tmp = head;
+ while (tmp) {
+ ret = tab->nodeCompare(tmp, target);
+ if (ret == 0) {
+ return tmp;
+ }
+ tmp = tmp->next;
+ }
+ return NULL;
+}
+
+int HashNodeAdd(HashTab *tab, HashNode *node)
+{
+ BEGET_ERROR_CHECK(tab != NULL && node != NULL, return -1, "%s : invalid param", __func__);
+ int hashCode = tab->nodeHash(node);
+ hashCode = hashCode > 0 ? hashCode : -hashCode;
+ hashCode = hashCode % tab->maxBucket;
+ HashNode *tmp = CheckHashNodeIsExist(tab, tab->buckets[hashCode], node);
+ if (tmp != NULL) {
+ BEGET_LOGE("node was exist");
+ return -1;
+ }
+ node->next = tab->buckets[hashCode];
+ tab->buckets[hashCode] = node;
+ return 0;
+}
+
+void HashNodeRemove(HashTab *tab, HashNode *node)
+{
+ BEGET_ERROR_CHECK(tab != NULL && node != NULL, return, "%s : invalid param", __func__);
+ int hashCode = tab->nodeHash(node);
+ hashCode = hashCode > 0 ? hashCode : -hashCode;
+ hashCode = hashCode % tab->maxBucket;
+ HashNode *current = tab->buckets[hashCode];
+ HashNode *prepare = NULL;
+
+ while (current) {
+ int ret = tab->nodeCompare(current, node);
+ if (ret == 0) {
+ if (current == tab->buckets[hashCode]) {
+ tab->buckets[hashCode] = current->next;
+ } else {
+ prepare->next = current->next;
+ }
+ return;
+ }
+ prepare = current;
+ current = current->next;
+ }
+}
+
+int HashTabDestroy(HashTab *tab)
+{
+ BEGET_ERROR_CHECK(tab != NULL, return -1, "%s : invalid arguments", __func__);
+ if (tab->nodeFree == NULL) {
+ BEGET_LOGE("%s : can not find node free func", __func__);
+ return -1;
+ }
+ for (int i = 0; i < tab->maxBucket; ++i) {
+ while(tab->buckets[i]) {
+ HashNode *next = tab->buckets[i]->next;
+ tab->nodeFree(tab->buckets[i]);
+ tab->buckets[i] = next;
+ }
+ }
+ return 0;
+}
+
+HashNode* GetHashNode(HashTab *tab, const void *key)
+{
+ BEGET_ERROR_CHECK(tab != NULL && key != NULL, return NULL, "%s : invalid param", __func__);
+ int hashCode = tab->keyHash(key);
+ hashCode = hashCode > 0 ? hashCode : -hashCode;
+ hashCode = hashCode % tab->maxBucket;
+ HashNode *tmp = tab->buckets[hashCode];
+ while (tmp != NULL) {
+ int ret = tab->keyCompare(tmp, key);
+ if (ret == 0) {
+ return tmp;
+ }
+ tmp = tmp->next;
+ }
+ return NULL;
+}
\ No newline at end of file
diff --git a/services/param_service/src/le_utils.c b/services/param_service/src/le_utils.c
index 8fa0401..c4f5b69 100644
--- a/services/param_service/src/le_utils.c
+++ b/services/param_service/src/le_utils.c
@@ -3,8 +3,6 @@
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
-#include <sys/stat.h>
-#include "securec.h"
#define MAX_BUF 1024
diff --git a/services/param_service/src/main.c b/services/param_service/src/main.c
index 1daa246..58ef853 100644
--- a/services/param_service/src/main.c
+++ b/services/param_service/src/main.c
@@ -6,8 +6,7 @@
#include <sys/prctl.h>
#include <string.h>
#include "beget_ext.h"
-#include "trie_comm.h"
-#include "param_server.h"
+#include "base_task.h";
int main(int argc, char* argv[])
{
@@ -17,11 +16,18 @@ int main(int argc, char* argv[])
return -1;
}
- int ret = ParamWorkSpaceInit();
+ EventLoop *defaultLoop = GetDefaultLoop();
+ int ret = ParamServerInit(defaultLoop);
+ if (ret != 0) {
+ BEGET_LOGE("ParamServerInit failed\n");
+ }
+
+ ret = ParamWorkSpaceInit();
if (ret != 0) {
BEGET_LOGE("ParamWorkSpaceInit failed\n");
exit(EXIT_FAILURE);
}
- ParamServerStart();
+
+ RunLoop(defaultLoop);
return 0;
}
diff --git a/services/param_service/src/param_server.c b/services/param_service/src/param_server.c
index 8b38d0f..9ae06a3 100644
--- a/services/param_service/src/param_server.c
+++ b/services/param_service/src/param_server.c
@@ -3,13 +3,13 @@
#include <sys/un.h>
#include <sys/stat.h>
#include <sys/epoll.h>
+#include <sys/timerfd.h>
#include <errno.h>
#include <unistd.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
-#include <pthread.h>
#include "beget_ext.h"
#include "param_server.h"
@@ -19,153 +19,445 @@
#include "securec.h"
#include "le_utils.h"
-void HandleEvent(struct EventArgs* args)
-{
- int clientFd = args->clientFd;
- int epollFd = args->epollFd;
- struct ParamRequestMsg* pmsg = (struct ParamRequestMsg*)malloc(sizeof(struct ParamRequestMsg) + PARAM_VALUE_LEN_MAX);
- BEGET_ERROR_CHECK(pmsg != NULL, return, "failed to malloc ParamRequestMsg");
- bzero(pmsg, sizeof(struct ParamRequestMsg) + PARAM_VALUE_LEN_MAX);
- pmsg->datasize = PARAM_VALUE_LEN_MAX;
- int status = SOCK_CONNECTED;
- while (1) {
- int ret = recv(clientFd, pmsg, sizeof(struct ParamRequestMsg) + pmsg->datasize, 0);
- if (ret == 0) {
- status = SOCK_DISCONNECTED;
- break;
- } else if (ret < 0) {
- if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) {
- continue;
- }
- status = SOCK_UNKNOWN;
- break;
- } else {
- break;
- }
+static int ParamTimerCreate(EventLoop *loop);
+struct ParamTimer {
+ int8_t isCreate;
+ BaseTask *task;
+};
+
+static struct ParamTimer ptimer = {0, NULL};
+
+static ListNode* GetAwaitHead()
+{
+ static ListNode *awaitHead = NULL;
+ if (awaitHead == NULL) {
+ ListNode *head = (ListNode*)malloc(sizeof(ListNode));
+ BEGET_ERROR_CHECK(head != NULL, return NULL, "%s, failed to allocate space", __func__);
+ head->next = head;
+ head->prev = head;
+ awaitHead = head;
}
+ return awaitHead;
+}
- if (status != SOCK_CONNECTED) {
- epoll_ctl(epollFd, EPOLL_CTL_DEL, clientFd, NULL);
- free(pmsg);
+static void CloseTask(LoopHandle handle, BaseTask *task)
+{
+ if (handle == NULL || task == NULL) {
return;
}
+ EventLoop *loop = (EventLoop*)handle;
+ loop->DelEvent(loop, task);
+ close(task->taskId);
+ HashNodeRemove(loop->tab, &task->hashNode);
+ free(task);
+}
+
+int32_t SocketRecv(LoopHandle handle, BaseTask *task, EventBuffer *buf, size_t length)
+{
+ BEGET_ERROR_CHECK(handle != NULL && task != NULL && buf != NULL, return -1, "%s : invalid param", __func__);
+ int32_t readlen = (int32_t)recv(task->taskId, buf->data + buf->datasize, length, 0);
+ if (readlen > 0) {
+ buf->datasize += readlen;
+ }
+ return readlen;
+}
- int ret;
- struct ParamRespMsg* respmsg = (struct ParamRespMsg*)malloc(sizeof(struct ParamRespMsg) + PARAM_VALUE_LEN_MAX);
- BEGET_ERROR_CHECK(respmsg != NULL, free(pmsg);return, "Failed to malloc ParamRespMsg");
- bzero(respmsg, sizeof(struct ParamRespMsg) + PARAM_VALUE_LEN_MAX);
- switch(pmsg->type) {
+int32_t SocketSend(LoopHandle handle, BaseTask *task, EventBuffer *buf)
+{
+ BEGET_ERROR_CHECK(handle != NULL && task != NULL && buf != NULL, return -1, "%s : invalid param", __func__);
+ int32_t writelen = (int32_t)send(task->taskId, buf->data, buf->datasize, 0);
+ return writelen;
+}
+
+static void CheckAndTriggerWait(LoopHandle handle, char *key, char *value)
+{
+ BEGET_ERROR_CHECK(handle != NULL && key != NULL && value != NULL, return, "%s, invalid value", __func__);
+ EventLoop *loop = (EventLoop*)handle;
+ char fullStr[PARAM_NAME_LEN_MAX + PARAM_VALUE_LEN_MAX] = {0};
+ sprintf(fullStr, "%s=%s", key, value);
+ ListNode *head = GetAwaitHead();
+ ListNode *tmp = head->next;
+ BEGET_ERROR_CHECK(head != NULL, return, "%s, invalid list node", __func__);
+ while (tmp != head) {
+ WaitInfo *info = ListEntry(tmp, WaitInfo, anchor);
+ if (strcmp(info->condition, fullStr) == 0) {
+ HashNode *hashNode = GetHashNode(loop->tab, &info->taskId);
+ BaseTask *task = HASHNODE_ENTRY(hashNode, BaseTask, hashNode);
+ loop->AddEvent(loop, task, Event_Write);
+ OH_ListRemove(&info->anchor);
+ }
+ tmp = tmp->next;
+ }
+}
+
+static void HandleMessageInner_(LoopHandle handle, ClientTask *task, ParamReqMsg *msg)
+{
+ BEGET_ERROR_CHECK(handle != NULL && task != NULL && msg != NULL, return, "%s : invalid param", __func__);
+ EventLoop *loop = (EventLoop*)handle;
+ uint32_t flag;
+ char data[PARAM_VALUE_LEN_MAX] = {0};
+ switch(msg->type) {
case SET_PARAMETER: {
- ret = SetParamtoMem(pmsg->key, pmsg->data);
- respmsg->flag = ret;
+ task->type = NORMAL_TYPE;
+ flag = SetParamtoMem(msg->key, msg->data);
+ if (flag == 0) {
+ CheckAndTriggerWait(handle, msg->key, msg->data);
+ }
break;
}
case GET_PARAMETER: {
- if (pmsg->datasize > PARAM_VALUE_LEN_MAX) {
- pmsg->datasize = PARAM_VALUE_LEN_MAX;
- }
- ret = GetParamFromMem(pmsg->key, respmsg->data, pmsg->datasize);
- respmsg->flag = ret;
- if (ret == 0) {
- respmsg->datasize = strlen(respmsg->data);
- }
+ task->type = NORMAL_TYPE;
+ flag = GetParamFromMem(msg->key, data, PARAM_VALUE_LEN_MAX);
break;
}
case WAIT_PARAMETER: {
- ret = WaitParam(pmsg->key, pmsg->data, pmsg->timeout);
- respmsg->flag = ret;
+ task->type = NORMAL_TYPE;
+ flag = WaitParam(msg->key, msg->data, msg->timeout);
+ if (flag != 0) {
+ task->type = WAIT_TYPE;
+ WaitInfo *info = (WaitInfo*)malloc(sizeof(WaitInfo) + msg->datasize);
+ BEGET_ERROR_CHECK(info != NULL, break, "%s, failed to allocate wait info space", __func__);
+ sprintf(info->condition, "%s=%s", msg->key, msg->data);
+ info->timeout = msg->timeout;
+ info->taskId = task->base.taskId;
+ task->info.extraInfo = (void*)info;
+ OH_ListAddTail(GetAwaitHead(), &info->anchor);
+ loop->DelEvent(loop, (BaseTask*)task);
+ ParamTimerCreate(loop);
+ return;
+ }
break;
}
default:
- respmsg->flag = -1;
+ task->type = NORMAL_TYPE;
+ flag = -1;
break;
}
- ret = send(clientFd, respmsg, sizeof(struct ParamRespMsg) + PARAM_VALUE_LEN_MAX, 0);
- if (ret < 0) {
- BEGET_LOGE("Failed to send data to : %d\n", clientFd);
+ EventBuffer *buf;
+ if (strlen(data) > 0) {
+ buf = (EventBuffer*)calloc(1, sizeof(EventBuffer) + sizeof(ParamRespMsg) + strlen(data) + 1);
+ BEGET_ERROR_CHECK(buf != NULL, return, "%s, failed to allocate buf", __func__);
+ buf->datasize = sizeof(ParamRespMsg) + strlen(data) + 1;
+ ParamRespMsg *respmsg = (ParamRespMsg*)(buf->data);
+ respmsg->flag = flag;
+ respmsg->datasize = strlen(data);
+ (void)memcpy_s(respmsg->data, strlen(data) + 1, data, strlen(data) + 1);
+ } else {
+ buf = (EventBuffer*)calloc(1, sizeof(EventBuffer) + sizeof(ParamRespMsg));
+ BEGET_ERROR_CHECK(buf != NULL, return, "%s, failed to allocate buf", __func__);
+ buf->datasize = sizeof(ParamRespMsg);
+ ParamRespMsg *respmsg = buf->data;
+ respmsg->flag = flag;
}
- free(pmsg);
- free(respmsg);
+
+ task->info.content = buf;
+ loop->ModEvent(loop, (BaseTask*)task, Event_Write);
}
-int CtlAdd(int epollfd, int fd, uint32_t event)
+static void OnSendMessage(LoopHandle handle, BaseTask *task)
{
- struct epoll_event ev = {
- .data.fd = fd,
- .events = event,
- };
- int ret = epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
- BEGET_ERROR_CHECK(ret == 0, return -1, "failed to add epoll_ctl fd %d. errno [%d]", fd, errno);
- return 0;
+ BEGET_ERROR_CHECK(handle != NULL && task != NULL, return, "%s : invalid param", __func__);
+ EventLoop *loop = (EventLoop*)handle;
+ ClientTask *clienttask = (ClientTask*)task;
+
+ EventBuffer *buf = NULL;
+ if (clienttask->type == NORMAL_TYPE) {
+ BEGET_ERROR_CHECK(clienttask->info.content != NULL, return, "no message to send");
+ buf = clienttask->info.content;
+ } else if (clienttask->type == WAIT_TYPE) {
+ BEGET_ERROR_CHECK(clienttask->info.extraInfo != NULL, return, "no message to send");
+ WaitInfo *info = (WaitInfo*)clienttask->info.extraInfo;
+ buf = (EventBuffer*)calloc(1, sizeof(EventBuffer) + sizeof(ParamRespMsg));
+ BEGET_ERROR_CHECK(buf != NULL, return, "%s, failed to allocate buf", __func__);
+ buf->datasize = sizeof(ParamRespMsg);
+ ParamRespMsg *respmsg = buf->data;
+ respmsg->datasize = 0;
+ if (info->timeout > 0) {
+ respmsg->flag = 0;
+ } else {
+ respmsg->flag = -1;
+ }
+ free(info);
+ }
+
+ int32_t ret = SocketSend(handle, task, buf);
+ if (ret < 0) {
+ if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) {
+ BEGET_LOGE("%s, resource busy, try again", __func__);
+ return;
+ }
+ BEGET_LOGE("%s, SocketSend fail, errno : %d", __func__, errno);
+ }
+
+ free(buf);
+ loop->ModEvent(loop, task, Event_Read);
}
-void StartEpoll(int listenfd)
-{
- int epollfd = epoll_create(LOOP_MAX_SOCKET);
- BEGET_ERROR_CHECK(epollfd > 0, return, "failed to create epoll. errno [%d]", errno);
-
- int ret = CtlAdd(epollfd, listenfd, EPOLLIN);
- BEGET_ERROR_CHECK(ret == 0, close(epollfd); return, "failed to CtlAdd");
-
- struct epoll_event *events = (struct epoll_event*)malloc(sizeof(struct epoll_event) * LOOP_MAX_SOCKET);
- BEGET_ERROR_CHECK(events != NULL, close(epollfd); return, "failed to alloc memory for epoll_event");
-
- while(1) {
- int number = epoll_wait(epollfd, events, LOOP_MAX_SOCKET, -1);
- for (int index = 0; index < number; ++index) {
- int fd_ = events[index].data.fd;
- if (fd_ == listenfd) {
- struct sockaddr_un clientAddr;
- socklen_t addrlen = sizeof(clientAddr);
- bzero(&clientAddr, addrlen);
- int clientfd = accept(listenfd, (struct sockaddr*)&clientAddr, &addrlen);
- BEGET_ERROR_CHECK(clientfd >= 0, close(epollfd); return, "failed to accept socket");
- SetNoBlock(clientfd);
- SetCloseExec(clientfd);
- ret = CtlAdd(epollfd, clientfd, EPOLLIN | EPOLLET | EPOLLONESHOT);
- BEGET_ERROR_CHECK(ret == 0, continue, "failed to CtlAdd");
- } else {
- pthread_t threadId;
- struct EventArgs args = {epollfd, fd_};
- ret = pthread_create(&threadId, NULL, (void*)HandleEvent, (void*)&args);
- BEGET_ERROR_CHECK(ret == 0, continue, "faild to create pthread to handle parameter event");
+static void OnRecvMessage(LoopHandle handle, BaseTask *task)
+{
+ BEGET_ERROR_CHECK(handle != NULL && task != NULL, return, "%s : invalid param", __func__);
+ EventLoop *loop = (EventLoop*)handle;
+ uint32_t payload = (uint32_t)sizeof(ParamReqMsg);
+ EventBuffer *buf = (EventBuffer*)calloc(1, sizeof(EventBuffer) + payload);
+ BEGET_ERROR_CHECK(buf != NULL, return, "%s, fail to allocate recv buf", __func__);
+
+ int32_t recvlen = payload;
+ while (buf->datasize != payload) {
+ int32_t ret = SocketRecv(handle, task, buf, recvlen);
+ if (ret < 0) {
+ if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) {
+ continue;
+ }
+ BEGET_LOGE("Process SocketRecv fail, errno : ", errno);
+ goto CLOSE;
+ } else if (ret == 0) {
+ BEGET_LOGI("%d, client normal exist", task->taskId);
+ task->close(handle, task);
+ goto CLOSE;
+ }
+ recvlen = payload - buf->datasize;
+ }
+
+ ParamReqMsg *reqmsg = buf->data;
+ if (reqmsg->datasize > 0) {
+ EventBuffer *tmp = (EventBuffer*)calloc(1, sizeof(EventBuffer) + payload + reqmsg->datasize);
+ (void)memcpy_s(tmp, payload, buf, payload);
+ free(buf);
+ buf = tmp;
+ reqmsg = buf->data;
+ recvlen = reqmsg->datasize;
+ payload += reqmsg->datasize;
+ while (buf->datasize != payload) {
+ int32_t ret = SocketRecv(handle, task, buf, recvlen);
+ if (ret < 0) {
+ if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) {
+ continue;
+ }
+ BEGET_LOGE("Process SocketRecv fail, errno : ", errno);
+ goto CLOSE;
+ } else if (ret == 0) {
+ BEGET_LOGI("%d, client normal exist", task->taskId);
+ task->close(handle, task);
+ goto CLOSE;
}
+ recvlen = payload - buf->datasize;
}
}
- close(epollfd);
- free(events);
+
+ HandleMessageInner_(handle, (ClientTask*)task, reqmsg);
+CLOSE:
+ free(buf);
}
-int CreateSocket()
+static void HandleClientEvent(LoopHandle handle, BaseTask *task, EventType type)
{
- unlink(PIPE_NAME);
- int listenfd = socket(PF_UNIX, SOCK_STREAM, 0);
- BEGET_ERROR_CHECK(listenfd > 0, return -1, "failed to create socket. errno [%d]", errno);
+ BEGET_ERROR_CHECK(handle != NULL && task != NULL, return, "%s : invalid param", __func__);
+ ClientTask *clienttask = (ClientTask*)task;
+ if (type == Event_Read) {
+ clienttask->recvMessage(handle ,task);
+ } else if (type == Event_Write) {
+ clienttask->sendMessage(handle ,task);
+ } else {
+ BEGET_LOGE("%s, invalid type", __func__);
+ }
+}
+
+static void ServerInCommingConnect(LoopHandle handle, BaseTask *task)
+{
+ BEGET_ERROR_CHECK(handle != NULL && task != NULL, return, "%s : invalid param", __func__);
+ EventLoop *loop = (EventLoop*)handle;
+
+ struct sockaddr_un clientAddr;
+ socklen_t addrlen = sizeof(clientAddr);
+ int clientfd = accept(task->taskId, (struct sockaddr*)&clientAddr, &addrlen);
+ BEGET_ERROR_CHECK(clientfd >= 0, return, "%s : failed to accept socket, %d", __func__, errno);
+ BEGET_LOGV("client fd = %d", clientfd);
+ SetNoBlock(clientfd);
+ SetCloseExec(clientfd);
+ ClientTask *clienttask = (ClientTask*)CreateBaseTask(loop, sizeof(ClientTask));
+ BEGET_ERROR_CHECK(clienttask != NULL, close(clientfd); return, "%s : failed to create client task. errno [%d]", __func__, errno);
+ clienttask->base.taskId = clientfd;
+ clienttask->base.close = CloseTask;
+ clienttask->base.handleEvent = HandleClientEvent;
+ clienttask->recvMessage = OnRecvMessage;
+ clienttask->sendMessage = OnSendMessage;
+ clienttask->disconnect = NULL;
+ HashNodeAdd(loop->tab, &clienttask->base.hashNode);
+ loop->AddEvent(loop, (BaseTask*)clienttask, Event_Read);
+}
+
+static void HandleServerEvent(LoopHandle handle, BaseTask *task, EventType type)
+{
+ (void)type;
+ BEGET_ERROR_CHECK(handle != NULL && task != NULL, return, "%s : invalid param", __func__);
+ ServerTask *servertask = (ServerTask*)task;
+ servertask->incommingConnect(handle, task);
+}
+
+static void CheckWaitParamTimeout(LoopHandle handle, uint64_t expire)
+{
+ BEGET_ERROR_CHECK(handle != NULL, return, "%s, invalid valud", __func__);
+ EventLoop *loop = (EventLoop*)handle;
+ ListNode *head = GetAwaitHead();
+ ListNode *tmp = head->next;
+ BEGET_ERROR_CHECK(head != NULL, return, "%s, invalid list node", __func__);
+ while (tmp != head) {
+ WaitInfo *info = ListEntry(tmp, WaitInfo, anchor);
+ if (info->timeout > 0) {
+ info->timeout -= expire;
+ } else {
+ HashNode *hashNode = GetHashNode(loop->tab, &info->taskId);
+ BaseTask *task = HASHNODE_ENTRY(hashNode, BaseTask, hashNode);
+ loop->AddEvent(loop, task, Event_Write);
+ OH_ListRemove(&info->anchor);
+ }
+ tmp = tmp->next;
+ }
+ if (head->next == head) {
+ ptimer.task->close(handle, ptimer.task);
+ ptimer.isCreate = 0;
+ }
+}
+static void HandleTimerEvent(LoopHandle handle, BaseTask *task, EventType type)
+{
+ (void)type;
+ uint64_t exp;
+ read(task->taskId, &exp, sizeof(uint64_t));
+ CheckWaitParamTimeout(handle, exp);
+ BEGET_LOGI("Entry timer task, exp : %ld", exp);
+}
+
+static int ParamServerCreate(EventLoop *loop)
+{
+ BEGET_ERROR_CHECK(loop != NULL && loop->tab != NULL, return -1, "%s : invalid loop", __func__);
+ int server = socket(PF_UNIX, SOCK_STREAM, 0);
+ BEGET_ERROR_CHECK(server > 0, return -1, "failed to create socket. errno [%d]", errno);
struct sockaddr_un serverAddr;
-
(void)memset_s(&serverAddr, sizeof(serverAddr), 0, sizeof(serverAddr));
serverAddr.sun_family = AF_UNIX;
strncpy(serverAddr.sun_path, PIPE_NAME, sizeof(serverAddr.sun_path));
uint32_t size = offsetof(struct sockaddr_un, sun_path) + strlen(PIPE_NAME);
- int ret = bind(listenfd, (struct sockaddr*)&serverAddr, size);
- BEGET_ERROR_CHECK(ret >= 0, close(listenfd); return -1, "failed to bind socket. errno [%d]", errno);
+ int ret = bind(server, (struct sockaddr*)&serverAddr, size);
+ BEGET_ERROR_CHECK(ret >= 0, close(server); return -1, "failed to bind socket. errno [%d]", errno);
+
+ SetNoBlock(server);
+ SetCloseExec(server);
+ ret = listen(server, MAX_CLIENT);
+ BEGET_ERROR_CHECK(ret >= 0, close(server); return -1, "failed to listen socket. errno [%d]", errno);
+
+ ServerTask *servertask = (ServerTask*)CreateBaseTask(loop, sizeof(ServerTask));
+ BEGET_ERROR_CHECK(servertask != NULL, close(server); return -1, "failed to create server task. errno [%d]", errno);
+ servertask->base.taskId = server;
+ servertask->base.close = CloseTask;
+ servertask->base.handleEvent = HandleServerEvent;
+ servertask->incommingConnect = ServerInCommingConnect;
+ HashNodeAdd(loop->tab, &servertask->base.hashNode);
+ loop->AddEvent(loop, (BaseTask*)servertask, Event_Read);
+ return 0;
+}
+
+static int ParamTimerCreate(EventLoop *loop)
+{
+ if (ptimer.isCreate) {
+ return 0;
+ }
+ BEGET_ERROR_CHECK(loop != NULL && loop->tab != NULL, return -1, "%s : invalid loop", __func__);
+ int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
+ BEGET_ERROR_CHECK(timerfd > 0, return -1, "failed to create timerfd. errno [%d]", errno);
+ struct itimerspec timespec = {
+ .it_interval.tv_sec = 1,
+ .it_interval.tv_nsec = 0,
+ .it_value.tv_sec = 1,
+ .it_value.tv_nsec = 0,
+ };
+ int ret = timerfd_settime(timerfd, 0, ×pec, NULL);
+ BEGET_ERROR_CHECK(ret == 0, return -1, "failed to set timerfd. errno [%d]", errno);
+ ptimer.task = (BaseTask*)CreateBaseTask(loop, sizeof(BaseTask));
+ BEGET_ERROR_CHECK(ptimer.task != NULL, close(timerfd); return -1, "failed to create timer task. errno [%d]", errno);
+ ptimer.task->taskId = timerfd;
+ ptimer.task->handleEvent = HandleTimerEvent;
+ ptimer.task->close = CloseTask;
+ HashNodeAdd(loop->tab, &ptimer.task->hashNode);
+ loop->AddEvent(loop, ptimer.task, Event_Read);
+ ptimer.isCreate = 1;
+ return 0;
+}
+
+static int NodeHash(HashNode *node)
+{
+ BaseTask *task = HASHNODE_ENTRY(node, BaseTask, hashNode);
+ return task->taskId;
+}
+
+static int KeyHash(const void *key)
+{
+ int id = *(int*)key;
+ return id;
+}
+
+static int NodeCompare(HashNode *node_1, HashNode *node_2)
+{
+ BaseTask *task_1 = HASHNODE_ENTRY(node_1, BaseTask, hashNode);
+ BaseTask *task_2 = HASHNODE_ENTRY(node_2, BaseTask, hashNode);
+ return (task_1->taskId - task_2->taskId);
+}
- SetNoBlock(listenfd);
- SetCloseExec(listenfd);
- ret = listen(listenfd, LOOP_MAX_CLIENT);
- BEGET_ERROR_CHECK(ret >= 0, close(listenfd); return -1, "failed to listen socket. errno [%d]", errno);
- ret = chmod(PIPE_NAME, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
- BEGET_ERROR_CHECK(ret == 0, close(listenfd); return -1, "failed to chmod %s. errno [%d]", PIPE_NAME, errno);
+static int KeyCompare(HashNode *node, const void *key)
+{
+ BaseTask *task = HASHNODE_ENTRY(node, BaseTask, hashNode);
+ if (task == NULL) {
+ BEGET_LOGE("%s, invalid task", __func__);
+ return -1;
+ }
+ return (task->taskId - *((int*)key));
+}
- return listenfd;
+static void NodeFree(HashNode *node)
+{
+ BaseTask *task = HASHNODE_ENTRY(node, BaseTask, hashNode);
+ if (task->close != NULL) {
+ task->close(GetDefaultLoop(), task);
+ }
+ free(task);
}
-void ParamServerStart()
+static void ResourceInit()
{
+ unlink(PIPE_NAME);
+ chmod(PIPE_NAME, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
MakeDirRecursive(PIPE_PATH, S_IRWXU | S_IRWXU | S_IRWXU | S_IROTH | S_IXOTH);
- int listenfd = CreateSocket();
- StartEpoll(listenfd);
+}
+
+static int InitDefaultLoopTab(EventLoop *defaultLoop)
+{
+ BEGET_ERROR_CHECK(defaultLoop != NULL, return -1, "%s : invalid loop", __func__);
+ HashInfo info = {
+ .nodeHash = NodeHash,
+ .keyHash = KeyHash,
+ .nodeCompare = NodeCompare,
+ .keyCompare = KeyCompare,
+ .nodeFree = NodeFree,
+ .maxBucket = 128,
+ };
+ HashTab *tab = NULL;
+ int ret = HashTabCreate(&tab, &info);
+ BEGET_ERROR_CHECK(ret == 0, return -1, "failed to create hash tab. errno [%d]", ret);
+ defaultLoop->tab = tab;
+ return 0;
+}
+
+int ParamServerInit(EventLoop *defaultLoop)
+{
+ BEGET_ERROR_CHECK(defaultLoop != NULL, return -1, "%s, invalid event loop", __func__);
+ ResourceInit();
+
+ int ret;
+ ret = InitDefaultLoopTab(defaultLoop);
+ BEGET_ERROR_CHECK(ret == 0, return -1, "%s : failed to init loop", __func__);
+ ret = ParamServerCreate(defaultLoop);
+ BEGET_ERROR_CHECK(ret == 0, return -1, "%s : failed to create param server.", __func__);
+ return 0;
}
diff --git a/services/param_service/src/trie_comm.c b/services/param_service/src/trie_comm.c
index fd8184a..ac31243 100644
--- a/services/param_service/src/trie_comm.c
+++ b/services/param_service/src/trie_comm.c
@@ -14,20 +14,23 @@
#include <sys/mman.h>
#include <time.h>
#include <signal.h>
+#include <semaphore.h>
#include "trie_comm.h"
+#include "trie_queue.h"
#include "le_utils.h"
#include "param_utils.h"
#include "parameter.h"
#include "securec.h"
static TrieHeader* paramWorkSpace;
+static TrieNodeQueue trieQueue = {0};
+static sem_t dump_sem;
static pthread_rwlock_t rwlock;
-static pthread_mutex_t mtlock;
-static atomic_bool cnt;
-static atomic_bool waitCnt;
+static pthread_mutex_t queuelock;
+static atomic_int updateCnt = 0;
-uint32_t trie_alloc(char* name)
+static uint32_t trie_alloc(char* name)
{
BEGET_ERROR_CHECK(name != NULL, return 0, "invalid name");
uint32_t keySize = strlen(name) + 1;
@@ -48,7 +51,7 @@ uint32_t trie_alloc(char* name)
return nowOffset;
}
-uint32_t param_alloc(uint32_t size)
+static uint32_t param_alloc(uint32_t size)
{
uint32_t allocSize = PARAM_ALIGN(sizeof(ParamNode) + size);
uint32_t nowOffset = paramWorkSpace->currOffest;
@@ -61,41 +64,41 @@ uint32_t param_alloc(uint32_t size)
return nowOffset;
}
-TrieNode* GetRootNode()
+static TrieNode* GetRootNode()
{
BEGET_ERROR_CHECK((paramWorkSpace != NULL) && (paramWorkSpace->shareAddr != NULL), return NULL, "failed");
return (paramWorkSpace->shareAddr + paramWorkSpace->rootOffest);
}
-TrieNode* GetTrieEntry(uint32_t index)
+static TrieNode* GetTrieEntry(uint32_t index)
{
BEGET_ERROR_CHECK(index <= paramWorkSpace->currOffest, return NULL, "invalid index");
TrieNode* entry = paramWorkSpace->shareAddr + index;
return entry;
}
-ParamNode* GetParamEntry(uint32_t index)
+static ParamNode* GetParamEntry(uint32_t index)
{
BEGET_ERROR_CHECK(index <= paramWorkSpace->currOffest, return NULL, "invalid index");
ParamNode* entry = paramWorkSpace->shareAddr + index;
return entry;
}
-ListNode* GetListNodeEntry(uint32_t index)
+static TrieListNode* GetTrieListNodeEntry(uint32_t index)
{
BEGET_ERROR_CHECK(index <= paramWorkSpace->currOffest, return NULL, "invalid index");
- ListNode* entry = paramWorkSpace->shareAddr + index;;
+ TrieListNode* entry = paramWorkSpace->shareAddr + index;;
return entry;
}
-TrieNode* ListNodeGetTrieEntry(ListNode* node)
+static TrieNode* TrieListNodeGetTrieEntry(TrieListNode* node)
{
BEGET_ERROR_CHECK(node != NULL, return NULL, "invalid node");
TrieNode* entry = (TrieNode*)((char*)node - offsetof(TrieNode, node));
return entry;
}
-void GetSubKey(const char* remainKey, char** subKey, uint32_t* prefixLen)
+static void GetSubKey(const char* remainKey, char** subKey, uint32_t* prefixLen)
{
BEGET_ERROR_CHECK(remainKey != NULL, return, "invalid remainKey");
BEGET_ERROR_CHECK(subKey != NULL, return, "invalid subKey");
@@ -107,7 +110,7 @@ void GetSubKey(const char* remainKey, char** subKey, uint32_t* prefixLen)
}
}
-int CompareKey(const char* nodeKey, const char* prefixKey, uint32_t prefixKeyLen)
+static int CompareKey(const char* nodeKey, const char* prefixKey, uint32_t prefixKeyLen)
{
uint32_t nodeKeyLen = strlen(nodeKey);
if (nodeKeyLen > prefixKeyLen) {
@@ -118,7 +121,7 @@ int CompareKey(const char* nodeKey, const char* prefixKey, uint32_t prefixKeyLen
return strncmp(nodeKey, prefixKey, prefixKeyLen);
}
-TrieNode* FindSubTrieNode(TrieNode* current, const char* remainKey, uint32_t prefixLen)
+static TrieNode* FindSubTrieNode(TrieNode* current, const char* remainKey, uint32_t prefixLen)
{
if (current == NULL || remainKey == NULL)
return NULL;
@@ -138,7 +141,7 @@ TrieNode* FindSubTrieNode(TrieNode* current, const char* remainKey, uint32_t pre
return FindSubTrieNode(subTrieNode, remainKey, prefixLen);
}
-TrieNode* AddSubTrieNode(TrieNode* current, const char* remainKey, uint32_t prefixLen)
+static TrieNode* AddSubTrieNode(TrieNode* current, const char* remainKey, uint32_t prefixLen)
{
if (current == NULL || remainKey == NULL)
return NULL;
@@ -170,10 +173,10 @@ TrieNode* AddSubTrieNode(TrieNode* current, const char* remainKey, uint32_t pref
return AddSubTrieNode(subTrieNode, remainKey, prefixLen);
}
-int CheckParamName(const char* name)
+static int CheckParamName(const char* name)
{
BEGET_ERROR_CHECK(name != NULL, return -1, "invalid parameter name");
- size_t nameLen = strlen(name);
+ int nameLen = (int)strlen(name);
if (name[0] == '.' || name[nameLen - 1] == '.')
return -1;
for (int i = 0; i < nameLen; ++i) {
@@ -191,145 +194,7 @@ int CheckParamName(const char* name)
return 0;
}
-int SetParamtoMem(const char* key, const char* value)
-{
- BEGET_ERROR_CHECK((paramWorkSpace != NULL) && (paramWorkSpace->shareAddr != NULL), return -1, "invalid paramWorkSpace");
- BEGET_ERROR_CHECK((key != NULL ) && (value != NULL), return -1, "invalid key or value");
- BEGET_ERROR_CHECK((strlen(key) > 0) && (strlen(key) <= PARAM_NAME_LEN_MAX), return -1, "invalid key len");
- BEGET_ERROR_CHECK((strlen(value) > 0) && (strlen(value) <= PARAM_VALUE_LEN_MAX), return -1, "invalid value len");
- BEGET_ERROR_CHECK(CheckParamName(key) == 0, return -1, "invalid parameter name");
-
- TrieNode* root = GetRootNode();
- TrieNode* current = GetRootNode();
- if (root == NULL || current == NULL)
- return -1;
-
- char* remainKey = (char *)key;
- pthread_rwlock_wrlock(&rwlock);
- while(1) {
- char* subKey;
- uint32_t prefixLen;
- GetSubKey(remainKey, &subKey, &prefixLen);
- if (current->child != 0) {
- current = AddSubTrieNode(GetTrieEntry(current->child), remainKey, prefixLen);
- BEGET_ERROR_CHECK(current != NULL, pthread_rwlock_unlock(&rwlock); return -1, "can not AddSubTrieNode");
- } else {
- char prefixKey[PARAM_NAME_LEN_MAX] = {0};
- (void)memcpy_s(prefixKey, PARAM_NAME_LEN_MAX, remainKey, prefixLen);
- current->child = trie_alloc(prefixKey);
- BEGET_ERROR_CHECK(current->child != 0, pthread_rwlock_unlock(&rwlock); return -1, "can not alloc tire node");
- current = GetTrieEntry(current->child);
- BEGET_ERROR_CHECK(current != NULL, pthread_rwlock_unlock(&rwlock); return -1, "can not get trie entry");
- }
- if (subKey == NULL) {
- if (current->dataIndex) {
- int ret = strncmp(key, CONST_PREFIX, strlen(CONST_PREFIX)) ;
- BEGET_ERROR_CHECK(ret != 0, pthread_rwlock_unlock(&rwlock); return -1, "can not change the value of a constant parameter");
- ParamNode* saveParam = GetParamEntry(current->dataIndex);
- BEGET_ERROR_CHECK(saveParam != NULL, pthread_rwlock_unlock(&rwlock); return -1, "can not get param entry");
- (void)memcpy_s(saveParam->data + saveParam->keyLen + 1, PARAM_VALUE_LEN_MAX, value, strlen(value));
- saveParam->valueLen = strlen(value);
- break;
- }
- uint32_t allocSize = strlen(key) + PARAM_VALUE_LEN_MAX + 2;
- current->dataIndex = param_alloc(allocSize);
- ParamNode* saveParam = GetParamEntry(current->dataIndex);
- BEGET_ERROR_CHECK((current->dataIndex != 0) && (saveParam != NULL), pthread_rwlock_unlock(&rwlock); return -1, "can not alloc param or get param entry");
- sprintf(saveParam->data, "%s=%s", key, value);
- saveParam->keyLen = strlen(key);
- saveParam->valueLen = strlen(value);
-
- current->node.prev = root->node.prev;
- current->node.next = (void*)(&root->node) - paramWorkSpace->shareAddr;
- ListNode* rootPrevListNode = GetListNodeEntry(root->node.prev);
- TrieNode* rootPrevTrieNode = ListNodeGetTrieEntry(rootPrevListNode);
- BEGET_ERROR_CHECK((rootPrevListNode != NULL) && (rootPrevTrieNode != NULL), pthread_rwlock_unlock(&rwlock); return -1, "can not get list entry or get trie entry");
- rootPrevTrieNode->node.next = (void*)(¤t->node) - paramWorkSpace->shareAddr;
- root->node.prev = (void*)(¤t->node) - paramWorkSpace->shareAddr;
- break;
- }
- remainKey = subKey + 1;
- }
- atomic_store(&cnt, 1);
- atomic_store(&waitCnt, 1);
- pthread_rwlock_unlock(&rwlock);
- return 0;
-}
-
-int GetParamFromMem(const char* key, char* value, uint32_t len)
-{
- BEGET_ERROR_CHECK((paramWorkSpace != NULL) && (paramWorkSpace->shareAddr != NULL), return -1, "invalid paramWorkSpace");
- BEGET_ERROR_CHECK((key != NULL ) && (value != NULL), return -1, "invalid key or value");
-
- TrieNode* current = GetRootNode();
- if (current == NULL)
- return -1;
-
- ParamNode* paramData;
- char* remainKey = (char *)key;
- pthread_rwlock_rdlock(&rwlock);
- while (1) {
- char* subKey;
- uint32_t prefixLen;
- GetSubKey(remainKey, &subKey, &prefixLen);
- current = FindSubTrieNode(GetTrieEntry(current->child), remainKey, prefixLen);
- BEGET_ERROR_CHECK(current != NULL, pthread_rwlock_unlock(&rwlock); return -1, "can not find sub trie node : %s", key);
- if (subKey == NULL) {
- paramData = GetParamEntry(current->dataIndex);
- BEGET_ERROR_CHECK(paramData != NULL, pthread_rwlock_unlock(&rwlock); return -1, "can not get param entry");
- break;
- }
- remainKey = subKey + 1;
- }
-
- if (len > paramData->valueLen) {
- (void)memcpy_s(value, PARAM_VALUE_LEN_MAX, paramData->data + paramData->keyLen + 1, paramData->valueLen);
- value[paramData->valueLen] = '\0';
- } else {
- (void)memcpy_s(value, len, paramData->data + paramData->keyLen + 1, len);
- value[len] = '\0';
- }
- pthread_rwlock_unlock(&rwlock);
- return 0;
-}
-
-int WaitParam(const char* key, const char* value, uint32_t timeout)
-{
- BEGET_ERROR_CHECK((paramWorkSpace != NULL) && (paramWorkSpace->shareAddr != NULL), return -1, "invalid paramWorkSpace");
- BEGET_ERROR_CHECK((key != NULL ) && (value != NULL), return -1, "invalid key or value");
- int ret;
- char tmp[PARAM_VALUE_LEN_MAX] = {0};
- ret = GetParamFromMem(key, tmp, sizeof(tmp));
- if (ret == 0) {
- if (strncmp(value, "*", strlen(value)) == 0) {
- return 0;
- }
- if (strlen(value) == strlen(tmp) && strncmp(value, tmp, strlen(value)) == 0) {
- return 0;
- }
- bzero(tmp, sizeof(tmp));
- }
- while (timeout != 0) {
- if (atomic_load(&waitCnt)) {
- atomic_store(&waitCnt, 0);
- ret = GetParamFromMem(key, tmp, sizeof(tmp));
- if (ret == 0) {
- if (strlen(tmp) == 1 && strncmp(value, "*", 1) == 0) {
- return 0;
- }
- if (strlen(value) == strlen(tmp) && strncmp(value, tmp, strlen(value)) == 0) {
- return 0;
- }
- bzero(tmp, sizeof(tmp));
- }
- }
- --timeout;
- sleep(1);
- }
- return -1;
-}
-
-void WritetoDisk(TrieNode* node, FILE* fp)
+static void WritetoDisk(TrieNode* node, FILE* fp)
{
BEGET_ERROR_CHECK(node != NULL, return, "invalid node");
BEGET_ERROR_CHECK(fp != NULL, return, "invalid file descriptor ");
@@ -344,57 +209,125 @@ void WritetoDisk(TrieNode* node, FILE* fp)
fputs(buf, fp);
}
-void DumpParam()
+static void FullWrite()
+{
+ // clean up trieQueue
+ pthread_mutex_lock(&queuelock);
+ while (trieQueue.size > 0) {
+ trieQueue.pop(&trieQueue);
+ }
+ pthread_mutex_unlock(&queuelock);
+ unlink(USER_PARAM_FILE);
+ FILE* fp = fopen(USER_PARAM_FILE, "w+");
+ TrieNode* root = GetRootNode();
+ TrieListNode* current = GetTrieListNodeEntry(root->node.next);
+ BEGET_ERROR_CHECK((root != NULL) && (current != NULL), fclose(fp); return, "can not get root node or get list entry");
+ while (current != &root->node) {
+ TrieNode* trienode = TrieListNodeGetTrieEntry(current);
+ WritetoDisk(trienode, fp);
+ current = GetTrieListNodeEntry(current->next);
+ }
+ fclose(fp);
+}
+
+static void AppendWrite()
{
- if (atomic_load(&cnt)) {
- pthread_mutex_lock(&mtlock);
- atomic_store(&cnt, 0);
- unlink(USER_PARAM_FILE);
- FILE* fp = fopen(USER_PARAM_FILE, "a+");
- TrieNode* root = GetRootNode();
- ListNode* current = GetListNodeEntry(root->node.next);
- BEGET_ERROR_CHECK((root != NULL) && (current != NULL), pthread_mutex_unlock(&mtlock); fclose(fp); return, "can not get root node or get list entry");
- while (current != &root->node) {
- TrieNode* trienode = ListNodeGetTrieEntry(current);
+ FILE* fp = fopen(USER_PARAM_FILE, "a+");
+ pthread_mutex_lock(&queuelock);
+ while (trieQueue.size > 0) {
+ TrieNode* trienode = trieQueue.pop(&trieQueue);
+ if (trienode != NULL) {
WritetoDisk(trienode, fp);
- current = GetListNodeEntry(current->next);
}
- fclose(fp);
- pthread_mutex_unlock(&mtlock);
}
+ pthread_mutex_unlock(&queuelock);
+ fclose(fp);
}
-void ProcessParamFile(char* fileName)
+static void DumpParam()
+{
+ FullWrite(); // Init阶段,相同key值的字段仅有一份
+ while (1) {
+ sem_wait(&dump_sem);
+ sem_init(&dump_sem, 0, 0);
+ /*
+ persist属性的字段为持久化保存,当含有persist字段的key更新或新增次数未达到累积次数时(此处临界值为50),从队列中获取更改或新增的字段,对file进行追加型写入,保存方式如下:
+ 第一次设置时,file 内容:
+ persist.openeuler.version=22.03
+ 第二次设置时,file 内容:
+ persist.openeuler.version=22.03
+ persist.openeuler.version=23.03
+ 当达到累积次数时,不再从队列中获取更改或新增的字段,遍历整个workspace,重新对file写入, 此时相同key值的字段仅有一份,保存方式如下:
+ 第一次设置时,file 内容:
+ persist.openeuler.version=22.03
+ 第二次设置时,file 内容:
+ persist.openeuler.version=23.03
+ */
+ if (atomic_load(&updateCnt) >= 50) {
+ atomic_store(&updateCnt, 0);
+ FullWrite();
+ } else {
+ AppendWrite();
+ }
+ }
+}
+
+static void ProcessParamFile(char* fileName)
{
BEGET_ERROR_CHECK(access(fileName, F_OK) == 0, perror("error"); return, "failed to access %s", fileName);
FILE* fp = fopen(fileName, "r");
BEGET_ERROR_CHECK(fp != NULL, return, "failed to open %s", fileName);
- char buf[PARAM_NAME_LEN_MAX + PARAM_VALUE_LEN_MAX];
- bzero(buf, sizeof(buf));
- while (fgets(buf, sizeof(buf), fp) != NULL) {
- buf[strlen(buf) - 1] = '\0';
- if (*buf == '#')
+
+ char *line = (char*)calloc(1, PARAM_NAME_LEN_MAX + PARAM_VALUE_LEN_MAX);
+ while (fgets(line, PARAM_NAME_LEN_MAX + PARAM_VALUE_LEN_MAX, fp) != NULL) {
+ line[strlen(line) - 1] = '\0';
+ if (*line == '#')
continue;
- char *sep = buf;
- char *key = NULL;
+
+ // Skip the line beginning spaces
+ while (isspace(*line) && (*line != '\0')) {
+ line++;
+ }
+
+ // Skip the spaces at the end of line
+ int len = strlen(line);
+ while (len > 0 && isspace(line[len - 1])) {
+ line[len - 1] = '\0';
+ len--;
+ }
+
+ if (*line == '\0')
+ continue;
+
+ char *sep = line;
+ char *key = sep;
char *value = NULL;
while (*sep != '\0') {
+ if (isspace(*sep)) {
+ *sep = '\0';
+ }
if (*sep == '=') {
*sep = '\0';
value = sep + 1;
- key = buf;
break;
}
- ++sep;
+ sep++;
+ }
+
+ if (value) {
+ // Skip the value beginning spaces
+ while (isspace(*value) && (*value != '\0')) {
+ value++;
+ }
+ if (*value == '\0')
+ continue;
+ SetParamtoMem(key, value);
}
- if (key) {
- SetParamtoMem(key, value);
- }
}
fclose(fp);
}
-void ReadFileInDir(char* dir, char* postfix)
+static void ReadFileInDir(char* dir, char* postfix)
{
BEGET_ERROR_CHECK((dir != NULL) && (postfix != NULL), return, "invalid directory");
DIR* pDir = opendir(dir);
@@ -417,7 +350,7 @@ void ReadFileInDir(char* dir, char* postfix)
closedir(pDir);
}
-void LoadParam(char* dir)
+static void LoadParam(char* dir)
{
BEGET_ERROR_CHECK(dir != NULL, return, "invalid directory");
struct stat st;
@@ -428,35 +361,16 @@ void LoadParam(char* dir)
}
// 定时持久化数据
-void CreateParamListener()
+static int CreateParamListener()
{
- atomic_init(&cnt, 0);
- struct sigevent sigev;
- bzero(&sigev, sizeof(struct sigevent));
- sigev.sigev_notify = SIGEV_THREAD;
- sigev.sigev_notify_function = DumpParam;
- sigev.sigev_notify_attributes = NULL;
-
- timer_t timerId;
- if (timer_create(CLOCK_REALTIME, &sigev, &timerId) != 0) {
- perror("timer_create:");
- exit(EXIT_FAILURE);
- }
-
- struct itimerspec value;
- bzero(&value, sizeof(struct itimerspec));
- value.it_value.tv_sec = 1;
- value.it_value.tv_nsec = 0;
- value.it_interval.tv_sec = 1;
- value.it_interval.tv_nsec = 0;
-
- if (timer_settime(timerId, 0, &value, NULL) != 0) {
- perror("timer_settime:");
- exit(EXIT_FAILURE);
- }
+ pthread_t dp;
+ int ret = pthread_create(&dp, NULL, (void*)DumpParam, NULL);
+ BEGET_ERROR_CHECK(ret == 0, return ret, "failed to create param listener");
+ pthread_detach(dp);
+ return 0;
}
-void InitRootNode()
+static void InitRootNode()
{
BEGET_ERROR_CHECK(paramWorkSpace != NULL, return, "invalid paramWorkSpace");
TrieNode* rootNode = paramWorkSpace->shareAddr + trie_alloc("#");
@@ -469,16 +383,16 @@ int ParamWorkSpaceInit()
MakeDirRecursive(SYSTEM_PARAM_PATH, S_IRWXU | S_IRWXU | S_IRWXU | S_IROTH | S_IXOTH);
MakeDirRecursive(USER_PARAM_PATH, S_IRWXU | S_IRWXU | S_IRWXU | S_IROTH | S_IXOTH);
MakeDirRecursive(WORKSPACE_DIR, S_IRWXU | S_IRWXU | S_IRWXU | S_IROTH | S_IXOTH);
+
+ sem_init(&dump_sem, 0, 0);
pthread_rwlock_init(&rwlock, NULL);
- pthread_mutex_init(&mtlock, NULL);
- atomic_init(&waitCnt, 0);
+ pthread_mutex_init(&queuelock, NULL);
paramWorkSpace = (TrieHeader*)malloc(sizeof(TrieHeader));
BEGET_ERROR_CHECK(paramWorkSpace != NULL, return -1, "failed to malloc for param workspace");
int fd = open(WORKSPACE_NAME, O_CREAT | O_RDWR | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
BEGET_ERROR_CHECK(fd > 0, return -1, "failed to open %s", WORKSPACE_NAME);
- int ret = ftruncate(fd, WORKSPACE_SIZE);
- (void)ret;
+ ftruncate(fd, WORKSPACE_SIZE);
paramWorkSpace->shareAddr = mmap(NULL, WORKSPACE_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
BEGET_ERROR_CHECK(paramWorkSpace->shareAddr != MAP_FAILED, return -1, "failed to create mmap");
paramWorkSpace->rootOffest = 0;
@@ -486,8 +400,142 @@ int ParamWorkSpaceInit()
paramWorkSpace->trieSize = 0;
paramWorkSpace->paramSize = 0;
InitRootNode();
+
+ // before LoadParam
+ TrieQueueFirstStageInit(&trieQueue);
LoadParam(SYSTEM_PARAM_PATH);
LoadParam(USER_PARAM_PATH);
- CreateParamListener();
+ // behind LoadParam
+ TrieQueueSecondStageInit(&trieQueue);
+
+ return CreateParamListener();
+}
+
+int SetParamtoMem(const char* key, const char* value)
+{
+ BEGET_ERROR_CHECK((paramWorkSpace != NULL) && (paramWorkSpace->shareAddr != NULL), return -1, "invalid paramWorkSpace");
+ BEGET_ERROR_CHECK((key != NULL ) && (value != NULL), return -1, "invalid key or value");
+ BEGET_ERROR_CHECK((strlen(key) > 0) && (strlen(key) <= PARAM_NAME_LEN_MAX), return -1, "invalid key len");
+ BEGET_ERROR_CHECK((strlen(value) > 0) && (strlen(value) <= PARAM_VALUE_LEN_MAX), return -1, "invalid value len");
+ BEGET_ERROR_CHECK(CheckParamName(key) == 0, return -1, "invalid parameter name");
+
+ TrieNode* root = GetRootNode();
+ TrieNode* current = GetRootNode();
+ if (root == NULL || current == NULL)
+ return -1;
+
+ char* remainKey = key;
+ pthread_rwlock_wrlock(&rwlock);
+ while(1) {
+ char* subKey;
+ uint32_t prefixLen;
+ GetSubKey(remainKey, &subKey, &prefixLen);
+ if (current->child != 0) {
+ current = AddSubTrieNode(GetTrieEntry(current->child), remainKey, prefixLen);
+ BEGET_ERROR_CHECK(current != NULL, pthread_rwlock_unlock(&rwlock); return -1, "can not AddSubTrieNode");
+ } else {
+ char prefixKey[PARAM_NAME_LEN_MAX] = {0};
+ (void)memcpy_s(prefixKey, PARAM_NAME_LEN_MAX, remainKey, prefixLen);
+ current->child = trie_alloc(prefixKey);
+ BEGET_ERROR_CHECK(current->child != 0, pthread_rwlock_unlock(&rwlock); return -1, "can not alloc tire node");
+ current = GetTrieEntry(current->child);
+ BEGET_ERROR_CHECK(current != NULL, pthread_rwlock_unlock(&rwlock); return -1, "can not get trie entry");
+ }
+ if (subKey == NULL) {
+ if (current->dataIndex) { // Param update
+ int ret = strncmp(key, CONST_PREFIX, strlen(CONST_PREFIX)) ;
+ BEGET_ERROR_CHECK(ret != 0, pthread_rwlock_unlock(&rwlock); return -1, "can not change the value of a constant parameter");
+ ParamNode* saveParam = GetParamEntry(current->dataIndex);
+ BEGET_ERROR_CHECK(saveParam != NULL, pthread_rwlock_unlock(&rwlock); return -1, "can not get param entry");
+ (void)memcpy_s(saveParam->data + saveParam->keyLen + 1, PARAM_VALUE_LEN_MAX, value, strlen(value));
+ saveParam->valueLen = strlen(value);
+ atomic_fetch_add_explicit(&updateCnt, 1, memory_order_relaxed);
+ break;
+ } else { // Param add
+ uint32_t allocSize = strlen(key) + PARAM_VALUE_LEN_MAX + 2;
+ current->dataIndex = param_alloc(allocSize);
+ ParamNode* saveParam = GetParamEntry(current->dataIndex);
+ BEGET_ERROR_CHECK((current->dataIndex != 0) && (saveParam != NULL), pthread_rwlock_unlock(&rwlock); return -1, "can not alloc param or get param entry");
+ sprintf(saveParam->data, "%s=%s", key, value);
+ saveParam->keyLen = strlen(key);
+ saveParam->valueLen = strlen(value);
+
+ current->node.prev = root->node.prev;
+ current->node.next = (void*)(&root->node) - paramWorkSpace->shareAddr;
+ TrieListNode* rootPrevTrieListNode = GetTrieListNodeEntry(root->node.prev);
+ TrieNode* rootPrevTrieNode = TrieListNodeGetTrieEntry(rootPrevTrieListNode);
+ BEGET_ERROR_CHECK((rootPrevTrieListNode != NULL) && (rootPrevTrieNode != NULL), pthread_rwlock_unlock(&rwlock); return -1, "can not get list entry or get trie entry");
+ rootPrevTrieNode->node.next = (void*)(¤t->node) - paramWorkSpace->shareAddr;
+ root->node.prev = (void*)(¤t->node) - paramWorkSpace->shareAddr;
+ break;
+ }
+ }
+ remainKey = subKey + 1;
+ }
+
+ if (strncmp(key, PARAM_PERSIST_PREFIX, strlen(PARAM_PERSIST_PREFIX)) == 0) {
+ pthread_mutex_lock(&queuelock);
+ trieQueue.push(&trieQueue, current);
+ pthread_mutex_unlock(&queuelock);
+ sem_post(&dump_sem);
+ }
+ pthread_rwlock_unlock(&rwlock);
return 0;
}
+
+int GetParamFromMem(const char* key, char* value, uint32_t len)
+{
+ BEGET_ERROR_CHECK((paramWorkSpace != NULL) && (paramWorkSpace->shareAddr != NULL), return -1, "invalid paramWorkSpace");
+ BEGET_ERROR_CHECK((key != NULL ) && (value != NULL), return -1, "invalid key or value");
+
+ TrieNode* current = GetRootNode();
+ if (current == NULL)
+ return -1;
+
+ ParamNode* paramData;
+ char* remainKey = key;
+ pthread_rwlock_rdlock(&rwlock);
+ while (1) {
+ char* subKey;
+ uint32_t prefixLen;
+ GetSubKey(remainKey, &subKey, &prefixLen);
+ current = FindSubTrieNode(GetTrieEntry(current->child), remainKey, prefixLen);
+ BEGET_ERROR_CHECK(current != NULL, pthread_rwlock_unlock(&rwlock); return -1, "can not find sub trie node : %s", key);
+ if (subKey == NULL) {
+ paramData = GetParamEntry(current->dataIndex);
+ BEGET_ERROR_CHECK(paramData != NULL, pthread_rwlock_unlock(&rwlock); return -1, "can not get param entry");
+ break;
+ }
+ remainKey = subKey + 1;
+ }
+
+ if (len > paramData->valueLen) {
+ (void)memcpy_s(value, PARAM_VALUE_LEN_MAX, paramData->data + paramData->keyLen + 1, paramData->valueLen);
+ value[paramData->valueLen] = '\0';
+ } else {
+ (void)memcpy_s(value, len, paramData->data + paramData->keyLen + 1, len);
+ value[len] = '\0';
+ }
+ pthread_rwlock_unlock(&rwlock);
+ return 0;
+}
+
+int WaitParam(const char* key, const char* value, uint32_t timeout)
+{
+ BEGET_ERROR_CHECK((paramWorkSpace != NULL) && (paramWorkSpace->shareAddr != NULL), return -1, "invalid paramWorkSpace");
+ BEGET_ERROR_CHECK((key != NULL ) && (value != NULL), return -1, "invalid key or value");
+ int ret;
+ char tmp[PARAM_VALUE_LEN_MAX] = {0};
+ ret = GetParamFromMem(key, tmp, sizeof(tmp));
+ if (ret == 0) {
+ if (strncmp(value, "*", strlen(value)) == 0) {
+ return 0;
+ }
+ if (strlen(value) == strlen(tmp) && strncmp(value, tmp, strlen(value)) == 0) {
+ return 0;
+ }
+ bzero(tmp, sizeof(tmp));
+ }
+
+ return -1;
+}
diff --git a/services/utils/BUILD.gn b/services/utils/BUILD.gn
index e5f6a96..30529be 100755
--- a/services/utils/BUILD.gn
+++ b/services/utils/BUILD.gn
@@ -10,24 +10,27 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import("//build/config/sysroot.gni")
config("exported_header_files") {
visibility = [ ":*" ]
include_dirs = [
"//base/startup/init/services/include",
- "${sysroot}/usr/include/hilog",
+ "//base/hiviewdfx/hilog/interfaces/native/innerkits/include",
]
}
import("//build/ohos.gni")
ohos_static_library("libinit_utils") {
- sources = [ "init_utils.c" ]
+ sources = [
+ "init_utils.c",
+ "list.c"
+ ]
public_configs = [ ":exported_header_files" ]
include_dirs = [
"//base/startup/init/interfaces/innerkits/include",
"//third_party/bounds_checking_function/include",
- "${sysroot}/usr/include/hilog",
+ "//base/hiviewdfx/hilog/interfaces/native/innerkits/include",
+ "//base/startup/init/services/include"
]
deps = [
"//base/hiviewdfx/hilog/interfaces/native/innerkits:libhilog",
diff --git a/services/utils/init_utils.c b/services/utils/init_utils.c
index 8b4b2e0..733f863 100644
--- a/services/utils/init_utils.c
+++ b/services/utils/init_utils.c
@@ -49,7 +49,7 @@ float ConvertMicrosecondToSecond(int x)
}
#ifndef __LITEOS_M__
-__attribute__((unused)) static bool CheckDigit(const char *name)
+static bool CheckDigit(const char *name)
{
size_t nameLen = strlen(name);
for (size_t i = 0; i < nameLen; ++i) {
diff --git b/services/utils/list.c b/services/utils/list.c
new file mode 100644
index 0000000..2ef1ad5
--- /dev/null
+++ b/services/utils/list.c
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2021 Huawei Device Co., Ltd.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "list.h"
+
+#include <stddef.h>
+#include <stdlib.h>
+
+void OH_ListAddTail(struct ListNode *head, struct ListNode *item)
+{
+ if (head == NULL || item == NULL) {
+ return;
+ }
+ item->next = head;
+ item->prev = head->prev;
+ head->prev->next = item;
+ head->prev = item;
+}
+
+void OH_ListRemove(struct ListNode *item)
+{
+ if (item == NULL) {
+ return;
+ }
+ item->next->prev = item->prev;
+ item->prev->next = item->next;
+}
\ No newline at end of file
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。