00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 #include <stdio.h>
00034 #include <stdlib.h>
00035 #include <sys/types.h>
00036 #include <regex.h>
00037 #include <math.h>
00038
00039 #include "../../mem/mem.h"
00040 #include "../../mem/shm_mem.h"
00041 #include "../../sr_module.h"
00042 #include "../../dprint.h"
00043 #include "../../timer.h"
00044 #include "../../timer_ticks.h"
00045 #include "../../ut.h"
00046 #include "../../locking.h"
00047 #include "../../mod_fix.h"
00048 #include "../../data_lump.h"
00049 #include "../../data_lump_rpl.h"
00050 #include "../../lib/kcore/statistics.h"
00051 #include "../../modules/sl/sl.h"
00052 #include "../../lib/kmi/mi.h"
00053
00054 #include "pl_ht.h"
00055 #include "pl_db.h"
00056
00057 MODULE_VERSION
00058
00059
00060
00061
00062 #define RL_TIMER_INTERVAL 10
00063
00065 sl_api_t slb;
00066
00067 enum {
00068 LOAD_SOURCE_CPU,
00069 LOAD_SOURCE_EXTERNAL
00070 };
00071
00072 str_map_t source_names[] = {
00073 {str_init("cpu"), LOAD_SOURCE_CPU},
00074 {str_init("external"), LOAD_SOURCE_EXTERNAL},
00075 {{0, 0}, 0},
00076 };
00077
00078 static int pl_drop_code = 503;
00079 static str pl_drop_reason = str_init("Server Unavailable");
00080
00081 typedef struct pl_queue {
00082 int * pipe;
00083 int pipe_mp;
00084 str * method;
00085 str method_mp;
00086 } pl_queue_t;
00087
00088 static struct timer_ln* pl_timer;
00089
00090
00091
00092 static double * load_value;
00093 static double * pid_kp, * pid_ki, * pid_kd;
00094 double * _pl_pid_setpoint;
00095 static int * drop_rate;
00096
00097 static int * network_load_value;
00098
00099
00100 static int load_source_mp = LOAD_SOURCE_CPU;
00101 static int * load_source;
00102
00103
00104 static int timer_interval = RL_TIMER_INTERVAL;
00105 int _pl_cfg_setpoint;
00106
00107
00108
00110 static int mod_init(void);
00111 static ticks_t pl_timer_handle(ticks_t, struct timer_ln*, void*);
00112 static int w_pl_check(struct sip_msg*, char *, char *);
00113 static int w_pl_drop_default(struct sip_msg*, char *, char *);
00114 static int w_pl_drop_forced(struct sip_msg*, char *, char *);
00115 static int w_pl_drop(struct sip_msg*, char *, char *);
00116 static void destroy(void);
00117
00118 static cmd_export_t cmds[]={
00119 {"pl_check", (cmd_function)w_pl_check, 1, fixup_spve_null,
00120 0, REQUEST_ROUTE|LOCAL_ROUTE},
00121 {"pl_drop", (cmd_function)w_pl_drop_default, 0, 0,
00122 0, REQUEST_ROUTE|LOCAL_ROUTE},
00123 {"pl_drop", (cmd_function)w_pl_drop_forced, 1, fixup_uint_null,
00124 0, REQUEST_ROUTE|LOCAL_ROUTE},
00125 {"pl_drop", (cmd_function)w_pl_drop, 2, fixup_uint_uint,
00126 0, REQUEST_ROUTE|LOCAL_ROUTE},
00127 {0,0,0,0,0,0}
00128 };
00129 static param_export_t params[]={
00130 {"timer_interval", INT_PARAM, &timer_interval},
00131 {"reply_code", INT_PARAM, &pl_drop_code},
00132 {"reply_reason", STR_PARAM, &pl_drop_reason.s},
00133 {"db_url", STR_PARAM, &pl_db_url},
00134 {"plp_table_name", STR_PARAM, &rlp_table_name},
00135 {"plp_pipeid_colunm", STR_PARAM, &rlp_pipeid_col},
00136 {"plp_limit_column", STR_PARAM, &rlp_limit_col},
00137 {"plp_algorithm_column", STR_PARAM, &rlp_algorithm_col},
00138
00139 {0,0,0}
00140 };
00141
00142 struct mi_root* mi_stats(struct mi_root* cmd_tree, void* param);
00143 struct mi_root* mi_set_pipe(struct mi_root* cmd_tree, void* param);
00144 struct mi_root* mi_get_pipes(struct mi_root* cmd_tree, void* param);
00145 struct mi_root* mi_set_pid(struct mi_root* cmd_tree, void* param);
00146 struct mi_root* mi_get_pid(struct mi_root* cmd_tree, void* param);
00147 struct mi_root* mi_push_load(struct mi_root* cmd_tree, void* param);
00148
00149 static mi_export_t mi_cmds [] = {
00150 {"pl_stats", mi_stats, MI_NO_INPUT_FLAG, 0, 0},
00151 {"pl_set_pipe", mi_set_pipe, 0, 0, 0},
00152 {"pl_get_pipes", mi_get_pipes, MI_NO_INPUT_FLAG, 0, 0},
00153 {"pl_set_pid", mi_set_pid, 0, 0, 0},
00154 {"pl_get_pid", mi_get_pid, MI_NO_INPUT_FLAG, 0, 0},
00155 {"pl_push_load", mi_push_load, 0, 0, 0},
00156 {0,0,0,0,0}
00157 };
00158
00160 struct module_exports exports= {
00161 "pipelimit",
00162 DEFAULT_DLFLAGS,
00163 cmds,
00164 params,
00165 0,
00166 mi_cmds,
00167 0,
00168 0,
00169 mod_init,
00170 0,
00171 (destroy_function) destroy,
00172 0
00173 };
00174
00175
00176
/**
 * Sample the aggregate CPU usage from the "cpu" line of /proc/stat.
 *
 * The counters read on the previous call are kept in static storage; the
 * load is the non-idle share of the ticks elapsed since that call.
 *
 * @param load  receives the load as a fraction (1.0 - idle/total). It is
 *              set to 0 on the very first call and left untouched when no
 *              tick elapsed between two calls.
 * @return 0 on success, -1 if /proc/stat cannot be opened or parsed
 *         (*load is not modified in that case).
 */
static int get_cpuload(double * load)
{
	static long long o_user, o_nice, o_sys, o_idle, o_iow, o_irq, o_sirq,
			o_stl;
	static int first_time = 1;
	/* Zero-initialised: columns after the fourth may be missing on older
	 * kernels (see proc(5)) and must then not contribute to the deltas. */
	long long n_user = 0, n_nice = 0, n_sys = 0, n_idle = 0;
	long long n_iow = 0, n_irq = 0, n_sirq = 0, n_stl = 0;
	long long d_total, d_idle;
	int nfields;
	FILE * f = fopen("/proc/stat", "r");

	if (! f) {
		LM_ERR("could not open /proc/stat\n");
		return -1;
	}
	nfields = fscanf(f, "cpu %lld%lld%lld%lld%lld%lld%lld%lld",
			&n_user, &n_nice, &n_sys, &n_idle, &n_iow, &n_irq, &n_sirq,
			&n_stl);
	/* close before checking the result so that no path leaks the stream */
	fclose(f);

	/* user, nice, system and idle are the minimum needed for a sample */
	if (nfields < 4) {
		LM_ERR("could not parse load informations\n");
		return -1;
	}

	if (first_time) {
		first_time = 0;
		*load = 0;
	} else {
		d_total = (n_user - o_user) +
				(n_nice - o_nice) +
				(n_sys  - o_sys)  +
				(n_idle - o_idle) +
				(n_iow  - o_iow)  +
				(n_irq  - o_irq)  +
				(n_sirq - o_sirq) +
				(n_stl  - o_stl);
		d_idle = (n_idle - o_idle);

		/* no elapsed ticks would make the ratio NaN/inf: keep the
		 * previous sample instead */
		if (d_total > 0)
			*load = 1.0 - ((double)d_idle) / (double)d_total;
	}

	o_user = n_user;
	o_nice = n_nice;
	o_sys  = n_sys;
	o_idle = n_idle;
	o_iow  = n_iow;
	o_irq  = n_irq;
	o_sirq = n_sirq;
	o_stl  = n_stl;

	return 0;
}
00224
00225 static double int_err = 0.0;
00226 static double last_err = 0.0;
00227
00228
00229
00230
00231 static void do_update_load(void)
00232 {
00233 static char spcs[51];
00234 int load;
00235 double err, dif_err, output;
00236
00237
00238 err = *_pl_pid_setpoint - *load_value;
00239
00240 dif_err = err - last_err;
00241
00242
00243
00244
00245
00246
00247
00248
00249
00250 if (int_err < 0 || err < 0)
00251 int_err += err;
00252
00253 output = (*pid_kp) * err +
00254 (*pid_ki) * int_err +
00255 (*pid_kd) * dif_err;
00256 last_err = err;
00257
00258 *drop_rate = (output > 0) ? output : 0;
00259
00260 load = 0.5 + 100.0 * *load_value;
00261
00262 memset(spcs, '-', load / 4);
00263 spcs[load / 4] = 0;
00264
00265
00266
00267
00268
00269 }
00270
00271 static void update_cpu_load(void)
00272 {
00273 if (get_cpuload(load_value))
00274 return;
00275
00276 do_update_load();
00277 }
00278
00279
00280 static int mod_init(void)
00281 {
00282 if(register_mi_mod(exports.name, mi_cmds)!=0)
00283 {
00284 LM_ERR("failed to register MI commands\n");
00285 return -1;
00286 }
00287 if(pl_init_htable(16)<0)
00288 {
00289 LM_ERR("could not allocate pipes htable\n");
00290 return -1;
00291 }
00292 if(pl_init_db()<0)
00293 {
00294 LM_ERR("could not load pipes description\n");
00295 return -1;
00296 }
00297
00298 if ((pl_timer = timer_alloc()) == NULL) {
00299 LM_ERR("could not allocate timer\n");
00300 return -1;
00301 }
00302 timer_init(pl_timer, pl_timer_handle, 0, F_TIMER_FAST);
00303 timer_add(pl_timer, MS_TO_TICKS(1500));
00304
00305
00306 if (sl_load_api(&slb)!=0) {
00307 LM_ERR("cannot bind to SL API\n");
00308 return -1;
00309 }
00310
00311 network_load_value = shm_malloc(sizeof(int));
00312 if (network_load_value==NULL) {
00313 LM_ERR("oom for network_load_value\n");
00314 return -1;
00315 }
00316
00317 load_value = shm_malloc(sizeof(double));
00318 if (load_value==NULL) {
00319 LM_ERR("oom for load_value\n");
00320 return -1;
00321 }
00322 load_source = shm_malloc(sizeof(int));
00323 if (load_source==NULL) {
00324 LM_ERR("oom for load_source\n");
00325 return -1;
00326 }
00327 pid_kp = shm_malloc(sizeof(double));
00328 if (pid_kp==NULL) {
00329 LM_ERR("oom for pid_kp\n");
00330 return -1;
00331 }
00332 pid_ki = shm_malloc(sizeof(double));
00333 if (pid_ki==NULL) {
00334 LM_ERR("oom for pid_ki\n");
00335 return -1;
00336 }
00337 pid_kd = shm_malloc(sizeof(double));
00338 if (pid_kd==NULL) {
00339 LM_ERR("oom for pid_kd\n");
00340 return -1;
00341 }
00342 _pl_pid_setpoint = shm_malloc(sizeof(double));
00343 if (_pl_pid_setpoint==NULL) {
00344 LM_ERR("oom for pid_setpoint\n");
00345 return -1;
00346 }
00347 drop_rate = shm_malloc(sizeof(int));
00348 if (drop_rate==NULL) {
00349 LM_ERR("oom for drop_rate\n");
00350 return -1;
00351 }
00352
00353 *network_load_value = 0;
00354 *load_value = 0.0;
00355 *load_source = load_source_mp;
00356 *pid_kp = 0.0;
00357 *pid_ki = -25.0;
00358 *pid_kd = 0.0;
00359 *_pl_pid_setpoint = 0.01 * (double)_pl_cfg_setpoint;
00360 *drop_rate = 0;
00361
00362 pl_drop_reason.len = strlen(pl_drop_reason.s);
00363
00364 return 0;
00365 }
00366
00367
00368 static void destroy(void)
00369 {
00370 pl_destroy_htable();
00371
00372 if (network_load_value) {
00373 shm_free(network_load_value);
00374 network_load_value = NULL;
00375 }
00376 if (load_value) {
00377 shm_free(load_value);
00378 load_value = NULL;
00379 }
00380 if (load_source) {
00381 shm_free(load_source);
00382 load_source = NULL;
00383 }
00384 if (pid_kp) {
00385 shm_free(pid_kp);
00386 pid_kp= NULL;
00387 }
00388 if (pid_ki) {
00389 shm_free(pid_ki);
00390 pid_ki = NULL;
00391 }
00392 if (pid_kd) {
00393 shm_free(pid_kd);
00394 pid_kd = NULL;
00395 }
00396 if (_pl_pid_setpoint) {
00397 shm_free(_pl_pid_setpoint);
00398 _pl_pid_setpoint = NULL;
00399 }
00400 if (drop_rate) {
00401 shm_free(drop_rate);
00402 drop_rate = NULL;
00403 }
00404
00405 if (pl_timer) {
00406 timer_free(pl_timer);
00407 pl_timer = NULL;
00408 }
00409
00410 }
00411
00412
00413 static int pl_drop(struct sip_msg * msg, unsigned int low, unsigned int high)
00414 {
00415 str hdr;
00416 int ret;
00417
00418 LM_DBG("(%d, %d)\n", low, high);
00419
00420 if (slb.freply != 0) {
00421 if (low != 0 && high != 0) {
00422 hdr.s = (char *)pkg_malloc(64);
00423 if (hdr.s == 0) {
00424 LM_ERR("Can't allocate memory for Retry-After header\n");
00425 return 0;
00426 }
00427 hdr.len = 0;
00428 if (! hdr.s) {
00429 LM_ERR("no memory for hdr\n");
00430 return 0;
00431 }
00432
00433 if (high == low) {
00434 hdr.len = snprintf(hdr.s, 63, "Retry-After: %d\r\n", low);
00435 } else {
00436 hdr.len = snprintf(hdr.s, 63, "Retry-After: %d\r\n",
00437 low + rand() % (high - low + 1));
00438 }
00439
00440 if (add_lump_rpl(msg, hdr.s, hdr.len, LUMP_RPL_HDR)==0) {
00441 LM_ERR("Can't add header\n");
00442 pkg_free(hdr.s);
00443 return 0;
00444 }
00445
00446 ret = slb.freply(msg, pl_drop_code, &pl_drop_reason);
00447
00448 pkg_free(hdr.s);
00449 } else {
00450 ret = slb.freply(msg, pl_drop_code, &pl_drop_reason);
00451 }
00452 } else {
00453 LM_ERR("Can't send reply\n");
00454 return 0;
00455 }
00456 return ret;
00457 }
00458
/**
 * Script wrapper for pl_drop(low, high). Both arguments arrive as
 * integers stored in the pointer values. An upper bound smaller than the
 * lower one is replaced by the lower bound.
 */
static int w_pl_drop(struct sip_msg* msg, char *p1, char *p2)
{
	const unsigned int low = (unsigned int)(unsigned long)p1;
	unsigned int high = (unsigned int)(unsigned long)p2;

	if (high < low)
		high = low;

	return pl_drop(msg, low, high);
}
00472
/**
 * Script wrapper for pl_drop(secs): reply with a fixed Retry-After value.
 *
 * The argument arrives as an integer stored in the pointer value, so a
 * value of 0 is indistinguishable from a NULL pointer and selects the
 * default of 5 seconds.
 */
static int w_pl_drop_forced(struct sip_msg* msg, char *p1, char *p2)
{
	enum { PL_DEFAULT_RETRY = 5 };
	unsigned int i;

	if (p1) {
		i = (unsigned int)(unsigned long)p1;
		LM_DBG("send retry in %u s\n", i);
	} else {
		i = PL_DEFAULT_RETRY;
		LM_DBG("send default retry in %u s\n", i);
	}
	return pl_drop(msg, i, i);
}
00486
/**
 * Script wrapper for pl_drop() without arguments: both bounds are 0, so
 * the reply carries no Retry-After header.
 */
static int w_pl_drop_default(struct sip_msg* msg, char *p1, char *p2)
{
	const unsigned int no_retry = 0;

	return pl_drop(msg, no_retry, no_retry);
}
00491
00492
00493
00494
/*
 * Fixed permutation of the integers 0..99. pipe_push() indexes it with
 * (counter % 100) and compares the entry against *drop_rate.
 */
int hash[100] = {
	18, 50, 51, 39, 49, 68,  8, 78, 61, 75,
	53, 32, 45, 77, 31, 12, 26, 10, 37, 99,
	29,  0, 52, 82, 91, 22,  7, 42, 87, 43,
	73, 86, 70, 69, 13, 60, 24, 25,  6, 93,
	96, 97, 84, 47, 79, 64, 90, 81,  4, 15,
	63, 44, 57, 40, 21, 28, 46, 94, 35, 58,
	11, 30,  3, 20, 41, 74, 34, 88, 62, 54,
	33, 92, 76, 85,  5, 72,  9, 83, 56, 17,
	95, 55, 80, 98, 66, 14, 16, 38, 71, 23,
	 2, 67, 36, 65, 27,  1, 19, 59, 89, 48
};
00501
00502
00508 static int pipe_push(struct sip_msg * msg, str *pipeid)
00509 {
00510 int ret;
00511 pl_pipe_t *pipe = NULL;
00512
00513 pipe = pl_pipe_get(pipeid, 1);
00514 if(pipe==NULL)
00515 {
00516 LM_ERR("pipe not found [%.*s]\n", pipeid->len, pipeid->s);
00517 return -1;
00518 }
00519
00520 pipe->counter++;
00521
00522 switch (pipe->algo) {
00523 case PIPE_ALGO_NOP:
00524 LM_ERR("no algorithm defined for pipe %.*s\n",
00525 pipeid->len, pipeid->s);
00526 ret = 1;
00527 break;
00528 case PIPE_ALGO_TAILDROP:
00529 ret = (pipe->counter <= pipe->limit * timer_interval) ? 1 : -1;
00530 break;
00531 case PIPE_ALGO_RED:
00532 if (pipe->load == 0)
00533 ret = 1;
00534 else
00535 ret = (! (pipe->counter % pipe->load)) ? 1 : -1;
00536 break;
00537 case PIPE_ALGO_FEEDBACK:
00538 ret = (hash[pipe->counter % 100] < *drop_rate) ? -1 : 1;
00539 break;
00540 case PIPE_ALGO_NETWORK:
00541 ret = -1 * pipe->load;
00542 break;
00543 default:
00544 LM_ERR("unknown ratelimit algorithm: %d\n", pipe->algo);
00545 ret = 1;
00546 }
00547 LM_DBG("pipe=%.*s algo=%d limit=%d pkg_load=%d counter=%d "
00548 "load=%2.1lf network_load=%d => %s\n",
00549 pipeid->len, pipeid->s,
00550 pipe->algo, pipe->limit,
00551 pipe->load, pipe->counter,
00552 *load_value, *network_load_value, (ret == 1) ? "ACCEPT" : "DROP");
00553
00554 pl_pipe_release(pipeid);
00555
00556 return ret;
00557 }
00558
00565 static int pl_check(struct sip_msg * msg, str *pipeid)
00566 {
00567 int ret;
00568
00569 ret = pipe_push(msg, pipeid);
00570
00571 return ret;
00572 }
00573
00574 static int w_pl_check(struct sip_msg* msg, char *p1, char *p2)
00575 {
00576 str pipeid = {0, 0};
00577
00578 if(fixup_get_svalue(msg, (gparam_p)p1, &pipeid)!=0
00579 || pipeid.s == 0)
00580 {
00581 LM_ERR("invalid pipeid parameter");
00582 return -1;
00583 }
00584
00585 return pl_check(msg, &pipeid);
00586 }
00587
00588
00589
00590 static ticks_t pl_timer_handle(ticks_t ticks, struct timer_ln* tl, void* data)
00591 {
00592 switch (*load_source) {
00593 case LOAD_SOURCE_CPU:
00594 update_cpu_load();
00595 break;
00596 }
00597
00598 *network_load_value = get_total_bytes_waiting();
00599
00600 pl_pipe_timer_update(timer_interval, *network_load_value);
00601
00602 return (ticks_t)(-1);
00603 }
00604
00605
00606 struct mi_root* mi_get_pid(struct mi_root* cmd_tree, void* param)
00607 {
00608 struct mi_root *rpl_tree;
00609 struct mi_node *node=NULL, *rpl=NULL;
00610 struct mi_attr* attr;
00611
00612 rpl_tree = init_mi_tree( 200, MI_OK_S, MI_OK_LEN);
00613 if (rpl_tree==0)
00614 return 0;
00615 rpl = &rpl_tree->node;
00616 node = add_mi_node_child(rpl, 0, "PID", 3, 0, 0);
00617 if(node == NULL)
00618 goto error;
00619 rpl_pipe_lock(0);
00620 attr= addf_mi_attr(node, 0, "ki", 2, "%0.3f", *pid_ki);
00621 if(attr == NULL)
00622 goto error;
00623 attr= addf_mi_attr(node, 0, "kp", 2, "%0.3f", *pid_kp);
00624 if(attr == NULL)
00625 goto error;
00626 attr= addf_mi_attr(node, 0, "kd", 2, "%0.3f", *pid_kd);
00627 rpl_pipe_release(0);
00628 if(attr == NULL)
00629 goto error;
00630
00631 return rpl_tree;
00632
00633 error:
00634 rpl_pipe_release(0);
00635 LM_ERR("Unable to create reply\n");
00636 free_mi_tree(rpl_tree);
00637 return 0;
00638 }
00639
00640 struct mi_root* mi_set_pid(struct mi_root* cmd_tree, void* param)
00641 {
00642 struct mi_node *node;
00643 char i[5], p[5], d[5];
00644
00645 node = cmd_tree->node.kids;
00646 if (node == NULL) return init_mi_tree( 400, MI_MISSING_PARM_S, MI_MISSING_PARM_LEN);
00647 if ( !node->value.s || !node->value.len || node->value.len >= 5)
00648 goto bad_syntax;
00649 memcpy(i, node->value.s, node->value.len);
00650 i[node->value.len] = '\0';
00651
00652 node = node->next;
00653 if ( !node->value.s || !node->value.len || node->value.len >= 5)
00654 goto bad_syntax;
00655 memcpy(p, node->value.s, node->value.len);
00656 p[node->value.len] = '\0';
00657
00658 node = node->next;
00659 if ( !node->value.s || !node->value.len || node->value.len >= 5)
00660 goto bad_syntax;
00661 memcpy(d, node->value.s, node->value.len);
00662 d[node->value.len] = '\0';
00663
00664 rpl_pipe_lock(0);
00665 *pid_ki = strtod(i, NULL);
00666 *pid_kp = strtod(p, NULL);
00667 *pid_kd = strtod(d, NULL);
00668 rpl_pipe_release(0);
00669
00670 return init_mi_tree( 200, MI_OK_S, MI_OK_LEN);
00671 bad_syntax:
00672 return init_mi_tree( 400, MI_BAD_PARM_S, MI_BAD_PARM_LEN);
00673 }
00674
00675 struct mi_root* mi_push_load(struct mi_root* cmd_tree, void* param)
00676 {
00677 struct mi_node *node;
00678 double value;
00679 char c[5];
00680
00681 node = cmd_tree->node.kids;
00682 if (node == NULL) return init_mi_tree( 400, MI_MISSING_PARM_S, MI_MISSING_PARM_LEN);
00683 if ( !node->value.s || !node->value.len || node->value.len >= 5)
00684 goto bad_syntax;
00685 memcpy(c, node->value.s, node->value.len);
00686 c[node->value.len] = '\0';
00687 value = strtod(c, NULL);
00688 if (value < 0.0 || value > 1.0) {
00689 LM_ERR("value out of range: %0.3f in not in [0.0,1.0]\n", value);
00690 goto bad_syntax;
00691 }
00692 rpl_pipe_lock(0);
00693 *load_value = value;
00694 rpl_pipe_release(0);
00695
00696 do_update_load();
00697
00698 return init_mi_tree( 200, MI_OK_S, MI_OK_LEN);
00699 bad_syntax:
00700 return init_mi_tree( 400, MI_BAD_PARM_S, MI_BAD_PARM_LEN);
00701 }
00702