00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 #include <stdio.h>
00034 #include <stdlib.h>
00035 #include <sys/types.h>
00036 #include <regex.h>
00037 #include <math.h>
00038
00039
00040 #include "../../mem/mem.h"
00041 #include "../../mem/shm_mem.h"
00042 #include "../../sr_module.h"
00043 #include "../../dprint.h"
00044 #include "../../timer.h"
00045 #include "../../timer_ticks.h"
00046 #include "../../ut.h"
00047 #include "../../locking.h"
00048 #include "../../mod_fix.h"
00049 #include "../../data_lump.h"
00050 #include "../../data_lump_rpl.h"
00051 #include "../../lib/kcore/statistics.h"
00052 #include "../../rpc_lookup.h"
00053
00054
00055 MODULE_VERSION
00056
/** Size of the pipes[] array; valid pipe ids are 0 .. MAX_PIPES-1. */
#define MAX_PIPES 16
/** Size of the queues[] array. */
#define MAX_QUEUES 10

/**
 * Default for the timer_interval module parameter.
 * mod_init() arms the timer with MS_TO_TICKS(1000*timer_interval), i.e. the
 * value is expressed in seconds.
 */
#define RL_TIMER_INTERVAL 10

/*
 * Helpers to access sub-match number i of a regexec() result array m that was
 * obtained by matching the string str:
 *   RXL  - length of the sub-match (as int)
 *   RXS  - pointer to the first character of the sub-match
 *   RXLS - "length, pointer" pair, meant to feed a "%.*s" format specifier
 *
 * RXL and RXS are fully parenthesized so that they can be combined with
 * other operators (e.g. *RXS(...) or RXS(...)[0]) without surprises.
 */
#define RXL(m, str, i) ((int)((m)[i].rm_eo - (m)[i].rm_so))
#define RXS(m, str, i) ((str) + (m)[i].rm_so)
#define RXLS(m, str, i) RXL(m, str, i), RXS(m, str, i)
00068
/* Forward declarations of the str comparison helpers (defined further below);
 * both return 0 when the two strings are equal. */
static inline int str_cmp(const str * a, const str * b);
static inline int str_i_cmp(const str * a, const str * b);

/**
 * One entry of a name <-> numeric id lookup table.
 * The tables in this file are arrays terminated by an entry whose str.s is
 * NULL (see algo_names and source_names).
 */
typedef struct str_map {
	str str;	/* textual name */
	int id;		/* numeric id associated with the name */
} str_map_t;

/* Table lookups: name -> id and id -> name; both return 0 on success and
 * -1 when no entry matches. */
static int str_map_str(const str_map_t * map, const str * key, int * ret);
static int str_map_int(const str_map_t * map, int key, str * ret);
00079
00080
00081
00082
00083
00084
/**
 * Rate limiting algorithms a pipe can use; pipe_push() dispatches on them.
 * PIPE_ALGO_NOP is 0, which is also the value of a pipe that was never
 * configured (the pipes[] array is zero-initialized).
 */
enum {
	PIPE_ALGO_NOP = 0,
	PIPE_ALGO_RED,
	PIPE_ALGO_TAILDROP,
	PIPE_ALGO_FEEDBACK,
	PIPE_ALGO_NETWORK
};

/** Algorithm name <-> id table, terminated by an entry with a NULL name. */
str_map_t algo_names[] = {
	{str_init("NOP"), PIPE_ALGO_NOP},
	{str_init("RED"), PIPE_ALGO_RED},
	{str_init("TAILDROP"), PIPE_ALGO_TAILDROP},
	{str_init("FEEDBACK"), PIPE_ALGO_FEEDBACK},
	{str_init("NETWORK"), PIPE_ALGO_NETWORK},
	{{0, 0}, 0},
};
00101
00102
00103
00104
00105
00106
00107
00108
/**
 * Where the load value used by the FEEDBACK algorithm comes from.
 * The timer handler only refreshes the load by itself for LOAD_SOURCE_CPU.
 */
enum {
	LOAD_SOURCE_CPU,
	LOAD_SOURCE_EXTERNAL
};

/** Load source name <-> id table, terminated by an entry with a NULL name. */
str_map_t source_names[] = {
	{str_init("cpu"), LOAD_SOURCE_CPU},
	{str_init("external"), LOAD_SOURCE_EXTERNAL},
	{{0, 0}, 0},
};
00119
00120
/**
 * State of one pipe.
 * The pointer members are allocated with shm_malloc() in mod_init(); the
 * *_mp members hold the values collected from the module parameters and are
 * copied into the allocated cells by mod_init().
 */
typedef struct pipe {
	/* configuration */
	int * algo;		/* current algorithm (PIPE_ALGO_*) */
	int algo_mp;		/* algorithm given via modparam */
	int * limit;		/* current limit */
	int limit_mp;		/* limit given via modparam */

	/* runtime values */
	int * counter;		/* messages seen since the last timer run */
	int * last_counter;	/* counter value saved by the last timer run */
	int * load;		/* load value computed by the timer handler */
} pipe_t;
00133
/**
 * Association between a request method and a pipe.
 * As for pipe_t, the pointer members are allocated with shm_malloc() in
 * mod_init() and initialized from the *_mp (modparam) members.
 */
typedef struct rl_queue {
	int * pipe;		/* id of the pipe this queue feeds */
	int pipe_mp;		/* pipe id given via modparam */
	str * method;		/* method name matched by this queue */
	str method_mp;		/* method name given via modparam (pkg memory) */
} rl_queue_t;
00140
/* Timer that periodically runs rl_timer_handle(). */
static struct timer_ln* rl_timer;

/* Lock taken around accesses to the pipe/queue state. */
gen_lock_t * rl_lock;

/* The pointers below are allocated with shm_malloc() in mod_init(). */
static double * load_value;	/* current load, used by the PID controller */
static double * pid_kp, * pid_ki, * pid_kd, * pid_setpoint; /* PID parameters */
static int * drop_rate;		/* output of the PID controller */

static int * network_load_value; /* refreshed from get_total_bytes_waiting() */

/* Load source (LOAD_SOURCE_*): compile-time default and live value. */
static int load_source_mp = LOAD_SOURCE_CPU;
static int * load_source;
00155
/** Result of parsing a "pipe" module parameter ("<no>:<algo>:<limit>"). */
typedef struct pipe_params {
	int no;		/* pipe number */
	int algo;	/* algorithm id (PIPE_ALGO_*) */
	int limit;	/* limit value */
} pipe_params_t;

/** Result of parsing a "queue" module parameter ("<pipe>:<method>"). */
typedef struct rl_queue_params {
	int pipe;	/* pipe number */
	str method;	/* method name, allocated with pkg_malloc() by the parser */
} rl_queue_params_t;
00166
/* Pipe and queue tables. */
static pipe_t pipes[MAX_PIPES];
static rl_queue_t queues[MAX_QUEUES];

/* Number of defined queues: value counted while parsing the modparams and
 * the live value (allocated with shm_malloc() in mod_init()). */
static int nqueues_mp = 0;
static int * nqueues;

/* Buffer for the debug dump printed by the timer handler; rl_dbg_str->s is
 * NULL while debugging is switched off (see rpc_set_dbg()). */
static str * rl_dbg_str = NULL;

/* Timer period (see RL_TIMER_INTERVAL). */
static int timer_interval = RL_TIMER_INTERVAL;
/* Setpoint shared by all FEEDBACK pipes, as computed by
 * check_feedback_setpoints(); -1 when no FEEDBACK pipe was found. */
static int cfg_setpoint;
00179
/*
 * Lock wrappers. By default they map directly to lock_get()/lock_release();
 * when RL_DEBUG_LOCKS is defined every operation is additionally traced with
 * LM_INFO() before and after the call, tagged with the source line number.
 */
#ifndef RL_DEBUG_LOCKS
# define LOCK_GET lock_get
# define LOCK_RELEASE lock_release
#else
# define LOCK_GET(l) do { \
	LM_INFO("%d: + get\n", __LINE__); \
	lock_get(l); \
	LM_INFO("%d: - get\n", __LINE__); \
} while (0)

# define LOCK_RELEASE(l) do { \
	LM_INFO("%d: + release\n", __LINE__); \
	lock_release(l); \
	LM_INFO("%d: - release\n", __LINE__); \
} while (0)
#endif
00196
/* Set to 1 by init_params() once both regular expressions below have been
 * compiled successfully. */
static int params_inited = 0;
static regex_t pipe_params_regex;	/* matches "pipe" modparam values */
static regex_t queue_params_regex;	/* matches "queue" modparam values */

/* Module interface: init function, timer callback, script function wrappers
 * and modparam handlers. */
static int mod_init(void);
static ticks_t rl_timer_handle(ticks_t, struct timer_ln*, void*);
static int w_rl_check_default(struct sip_msg*, char *, char *);
static int w_rl_check_forced(struct sip_msg*, char *, char *);
static int w_rl_check_forced_pipe(struct sip_msg*, char *, char *);
static int add_queue_params(modparam_t, void *);
static int add_pipe_params(modparam_t, void *);

/* Module cleanup function. */
static void destroy(void);
00213
/*
 * Script functions exported by the module:
 *  - rl_check with no parameter: the pipe is selected from the request method
 *  - rl_check with one parameter (fixed up as a pseudo-variable): forced pipe
 *  - rl_check_pipe with one parameter (fixed up as an unsigned integer)
 * The table is terminated by an all-zero entry.
 */
static cmd_export_t cmds[]={
	{"rl_check", (cmd_function)w_rl_check_default, 0, 0, 0, REQUEST_ROUTE|LOCAL_ROUTE},
	{"rl_check", (cmd_function)w_rl_check_forced, 1, fixup_pvar_null,
		fixup_free_pvar_null, REQUEST_ROUTE|LOCAL_ROUTE},
	{"rl_check_pipe", (cmd_function)w_rl_check_forced_pipe, 1, fixup_uint_null, 0, REQUEST_ROUTE|LOCAL_ROUTE},
	{0,0,0,0,0,0}
};
/*
 * Module parameters:
 *  - timer_interval: integer stored directly in timer_interval
 *  - queue, pipe: string values handed to add_queue_params() and
 *    add_pipe_params() respectively
 * The table is terminated by an all-zero entry.
 */
static param_export_t params[]={
	{"timer_interval", INT_PARAM, &timer_interval},
	{"queue", STR_PARAM|USE_FUNC_PARAM, (void *)add_queue_params},
	{"pipe", STR_PARAM|USE_FUNC_PARAM, (void *)add_pipe_params},
	{0,0,0}
};
00230
/* RPC command table; defined at the end of the file. */
static rpc_export_t rpc_methods[];

/*
 * Module descriptor.
 * NOTE(review): the meaning of the individual slots is defined by
 * struct module_exports in sr_module.h, which is not visible here; the
 * comments below only name the values this file provides.
 */
struct module_exports exports= {
	"ratelimit",		/* module name */
	DEFAULT_DLFLAGS,
	cmds,			/* exported script functions */
	params,			/* exported module parameters */
	0,
	0,
	0,
	0,
	mod_init,		/* module initialization function */
	0,
	(destroy_function) destroy,	/* module cleanup function */
	0
};
00248
00249
00254 static int str_map_str(const str_map_t * map, const str * key, int * ret)
00255 {
00256 for (; map->str.s; map++)
00257 if (! str_cmp(&map->str, key)) {
00258 *ret = map->id;
00259 return 0;
00260 }
00261 LM_DBG("str_map_str() failed map=%p key=%.*s\n", map, key->len, key->s);
00262 return -1;
00263 }
00264
00269 static int str_map_int(const str_map_t * map, int key, str * ret)
00270 {
00271 for (; map->str.s; map++)
00272 if (map->id == key) {
00273 *ret = map->str;
00274 return 0;
00275 }
00276 LM_DBG("str_map_str() failed map=%p key=%d\n", map, key);
00277 return -1;
00278 }
00279
00284 static int str_cpy(str * dest, str * src)
00285 {
00286 dest->len = src->len;
00287 dest->s = shm_malloc(src->len);
00288 if (! dest->s) {
00289 LM_ERR("oom: '%.*s'\n", src->len, src->s);
00290 return -1;
00291 }
00292 memcpy(dest->s, src->s, src->len);
00293 return 0;
00294 }
00295
00296
/**
 * Compute the CPU load since the previous call from the aggregated "cpu"
 * line of /proc/stat.
 *
 * The counters read by the previous call are kept in static variables; the
 * load is 1 - (idle delta / total delta). The very first call has nothing to
 * compare against and reports 0.
 *
 * @param load  receives the load as a fraction in [0.0, 1.0]; it is left
 *              unchanged if no time elapsed between two samples
 * @return 0 on success, -1 if /proc/stat could not be opened or parsed
 */
static int get_cpuload(double * load)
{
	static long long o_user, o_nice, o_sys, o_idle, o_iow, o_irq, o_sirq, o_stl;
	static int first_time = 1;
	/* zero-initialized so that fields missing from the line count as 0 */
	long long n_user = 0, n_nice = 0, n_sys = 0, n_idle = 0;
	long long n_iow = 0, n_irq = 0, n_sirq = 0, n_stl = 0;
	int fields;
	FILE * f = fopen("/proc/stat", "r");

	if (! f) {
		LM_ERR("could not open /proc/stat\n");
		return -1;
	}
	fields = fscanf(f, "cpu %lld%lld%lld%lld%lld%lld%lld%lld",
			&n_user, &n_nice, &n_sys, &n_idle, &n_iow, &n_irq, &n_sirq, &n_stl);
	/* close before checking the result so the stream is never leaked */
	fclose(f);

	/* user, nice, system and idle are required; fscanf() returns the number
	 * of converted fields, or EOF (negative) on input failure */
	if (fields < 4) {
		LM_ERR("could not parse load informations\n");
		return -1;
	}

	if (first_time) {
		first_time = 0;
		*load = 0;
	} else {
		long long d_total = (n_user - o_user) +
				(n_nice - o_nice) +
				(n_sys - o_sys) +
				(n_idle - o_idle) +
				(n_iow - o_iow) +
				(n_irq - o_irq) +
				(n_sirq - o_sirq) +
				(n_stl - o_stl);
		long long d_idle = (n_idle - o_idle);

		/* avoid 0/0 (NaN) when the counters did not advance */
		if (d_total > 0)
			*load = 1.0 - ((double)d_idle) / (double)d_total;
	}

	o_user = n_user;
	o_nice = n_nice;
	o_sys = n_sys;
	o_idle = n_idle;
	o_iow = n_iow;
	o_irq = n_irq;
	o_sirq = n_sirq;
	o_stl = n_stl;

	return 0;
}
00344
00345 static double int_err = 0.0;
00346 static double last_err = 0.0;
00347
00348
00349
00350
00351 static void do_update_load(void)
00352 {
00353 static char spcs[51];
00354 int load;
00355 double err, dif_err, output;
00356
00357
00358 err = *pid_setpoint - *load_value;
00359
00360 dif_err = err - last_err;
00361
00362
00363
00364
00365
00366
00367
00368
00369
00370 if (int_err < 0 || err < 0)
00371 int_err += err;
00372
00373 output = (*pid_kp) * err +
00374 (*pid_ki) * int_err +
00375 (*pid_kd) * dif_err;
00376 last_err = err;
00377
00378 *drop_rate = (output > 0) ? output : 0;
00379
00380 load = 0.5 + 100.0 * *load_value;
00381
00382 memset(spcs, '-', load / 4);
00383 spcs[load / 4] = 0;
00384
00385
00386
00387
00388
00389 }
00390
00391 static void update_cpu_load(void)
00392 {
00393 if (get_cpuload(load_value))
00394 return;
00395
00396 do_update_load();
00397 }
00398
00399
/**
 * Module initialization.
 *
 * Registers the RPC commands, creates the lock and the periodic timer,
 * allocates all runtime state with shm_malloc() and initializes it from the
 * values collected while parsing the module parameters (*_mp variables).
 *
 * On failure the function returns immediately; memory allocated up to that
 * point is not released here.
 *
 * @return 0 on success, -1 on error
 */
static int mod_init(void)
{
	int i;

	if (rpc_register_array(rpc_methods)!=0) {
		LM_ERR("failed to register RPC commands\n");
		return -1;
	}

	rl_lock = lock_alloc();
	if (! rl_lock) {
		LM_ERR("oom in lock_alloc()\n");
		return -1;
	}

	/* lock_init() result 0 is treated as failure */
	if (lock_init(rl_lock)==0) {
		LM_ERR("failed to init lock\n");
		return -1;
	}

	/* register timer: first expiry after timer_interval seconds */
	if ((rl_timer = timer_alloc()) == NULL) {
		LM_ERR("could not allocate timer\n");
		return -1;
	}
	timer_init(rl_timer, rl_timer_handle, 0, F_TIMER_FAST);
	timer_add(rl_timer, MS_TO_TICKS(1000*timer_interval));

	/* allocate the global runtime values */
	network_load_value = shm_malloc(sizeof(int));
	if (network_load_value==NULL) {
		LM_ERR("oom for network_load_value\n");
		return -1;
	}

	load_value = shm_malloc(sizeof(double));
	if (load_value==NULL) {
		LM_ERR("oom for load_value\n");
		return -1;
	}
	load_source = shm_malloc(sizeof(int));
	if (load_source==NULL) {
		LM_ERR("oom for load_source\n");
		return -1;
	}
	pid_kp = shm_malloc(sizeof(double));
	if (pid_kp==NULL) {
		LM_ERR("oom for pid_kp\n");
		return -1;
	}
	pid_ki = shm_malloc(sizeof(double));
	if (pid_ki==NULL) {
		LM_ERR("oom for pid_ki\n");
		return -1;
	}
	pid_kd = shm_malloc(sizeof(double));
	if (pid_kd==NULL) {
		LM_ERR("oom for pid_kd\n");
		return -1;
	}
	pid_setpoint = shm_malloc(sizeof(double));
	if (pid_setpoint==NULL) {
		LM_ERR("oom for pid_setpoint\n");
		return -1;
	}
	drop_rate = shm_malloc(sizeof(int));
	if (drop_rate==NULL) {
		LM_ERR("oom for drop_rate\n");
		return -1;
	}
	nqueues = shm_malloc(sizeof(int));
	if (nqueues==NULL) {
		LM_ERR("oom for nqueues\n");
		return -1;
	}
	rl_dbg_str = shm_malloc(sizeof(str));
	if (rl_dbg_str==NULL) {
		LM_ERR("oom for rl_dbg_str\n");
		return -1;
	}

	/* initial values; the PID controller starts as a pure integrator
	 * (kp = kd = 0, ki = -25) and cfg_setpoint is a percentage */
	*network_load_value = 0;
	*load_value = 0.0;
	*load_source = load_source_mp;
	*pid_kp = 0.0;
	*pid_ki = -25.0;
	*pid_kd = 0.0;
	*pid_setpoint = 0.01 * (double)cfg_setpoint;
	*drop_rate = 0;
	*nqueues = nqueues_mp;
	rl_dbg_str->s = NULL;
	rl_dbg_str->len = 0;

	/* every pipe gets its cells, whether it was configured or not */
	for (i=0; i<MAX_PIPES; i++) {
		pipes[i].algo = shm_malloc(sizeof(int));
		if (pipes[i].algo==NULL) {
			LM_ERR("oom for pipes[%d].algo\n", i);
			return -1;
		}
		pipes[i].limit = shm_malloc(sizeof(int));
		if (pipes[i].limit==NULL) {
			LM_ERR("oom for pipes[%d].limit\n", i);
			return -1;
		}
		pipes[i].load = shm_malloc(sizeof(int));
		if (pipes[i].load==NULL) {
			LM_ERR("oom for pipes[%d].load\n", i);
			return -1;
		}
		pipes[i].counter = shm_malloc(sizeof(int));
		if (pipes[i].counter==NULL) {
			LM_ERR("oom for pipes[%d].counter\n", i);
			return -1;
		}
		pipes[i].last_counter = shm_malloc(sizeof(int));
		if (pipes[i].last_counter==NULL) {
			LM_ERR("oom for pipes[%d].last_counter\n", i);
			return -1;
		}
		*pipes[i].algo = pipes[i].algo_mp;
		*pipes[i].limit = pipes[i].limit_mp;
		*pipes[i].load = 0;
		*pipes[i].counter = 0;
		*pipes[i].last_counter = 0;
	}

	/* only the queues defined via modparam are set up */
	for (i=0; i<*nqueues; i++) {
		queues[i].pipe = shm_malloc(sizeof(int));
		if (queues[i].pipe==NULL) {
			LM_ERR("oom for queues[%d].pipe\n", i);
			return -1;
		}
		queues[i].method = shm_malloc(sizeof(str));
		if (queues[i].method==NULL) {
			LM_ERR("oom for queues[%d].method\n", i);
			return -1;
		}

		*queues[i].pipe = queues[i].pipe_mp;
		if (queues[i].method_mp.s == NULL) {
			LM_ERR("unexpected NULL method for queues[%d].method_mp\n", i);
			return -1;
		}
		if(str_cpy(queues[i].method, &queues[i].method_mp)) {
			LM_ERR("oom str_cpy(queues[%d].method\n", i);
			return -1;
		}
		/* the modparam copy (pkg memory) is no longer needed */
		pkg_free(queues[i].method_mp.s);
		queues[i].method_mp.s = NULL;
		queues[i].method_mp.len = 0;
	}

	return 0;
}
00553
00554
00555 static void destroy(void)
00556 {
00557 int i;
00558
00559 regfree(&pipe_params_regex);
00560 regfree(&queue_params_regex);
00561
00562 for (i=0; i<MAX_PIPES; i++) {
00563 if (pipes[i].algo) {
00564 shm_free(pipes[i].algo);
00565 pipes[i].algo = NULL;
00566 }
00567 if (pipes[i].load) {
00568 shm_free(pipes[i].load);
00569 pipes[i].load = NULL;
00570 }
00571 if (pipes[i].counter) {
00572 shm_free(pipes[i].counter);
00573 pipes[i].counter = NULL;
00574 }
00575 if (pipes[i].last_counter) {
00576 shm_free(pipes[i].last_counter);
00577 pipes[i].last_counter = NULL;
00578 }
00579 if (pipes[i].limit) {
00580 shm_free(pipes[i].limit);
00581 pipes[i].limit = NULL;
00582 }
00583 }
00584
00585 if (nqueues) {
00586 for (i=0; i<*nqueues; i++) {
00587 if (queues[i].pipe) {
00588 shm_free(queues[i].pipe);
00589 queues[i].pipe = NULL;
00590 }
00591 if (queues[i].method) {
00592 if (queues[i].method->s) {
00593 shm_free(queues[i].method->s);
00594 queues[i].method->s = NULL;
00595 queues[i].method->len = 0;
00596 }
00597 shm_free(queues[i].method);
00598 queues[i].method = NULL;
00599 }
00600 }
00601 }
00602
00603 if (network_load_value) {
00604 shm_free(network_load_value);
00605 network_load_value = NULL;
00606 }
00607 if (load_value) {
00608 shm_free(load_value);
00609 load_value = NULL;
00610 }
00611 if (load_source) {
00612 shm_free(load_source);
00613 load_source = NULL;
00614 }
00615 if (pid_kp) {
00616 shm_free(pid_kp);
00617 pid_kp= NULL;
00618 }
00619 if (pid_ki) {
00620 shm_free(pid_ki);
00621 pid_ki = NULL;
00622 }
00623 if (pid_kd) {
00624 shm_free(pid_kd);
00625 pid_kd = NULL;
00626 }
00627 if (pid_setpoint) {
00628 shm_free(pid_setpoint);
00629 pid_setpoint = NULL;
00630 }
00631 if (drop_rate) {
00632 shm_free(drop_rate);
00633 drop_rate = NULL;
00634 }
00635 if (nqueues) {
00636 shm_free(nqueues);
00637 nqueues = NULL;
00638 }
00639 if (rl_dbg_str) {
00640 if (rl_dbg_str->s) {
00641 shm_free(rl_dbg_str->s);
00642 rl_dbg_str->s = NULL;
00643 rl_dbg_str->len = 0;
00644 }
00645 shm_free(rl_dbg_str);
00646 rl_dbg_str = NULL;
00647 }
00648
00649 if (rl_timer) {
00650 timer_free(rl_timer);
00651 rl_timer = NULL;
00652 }
00653
00654 if (rl_lock) {
00655 lock_destroy(rl_lock);
00656 lock_dealloc((void *)rl_lock);
00657 }
00658 }
00659
00660
00661 static inline int str_cmp(const str * a , const str * b)
00662 {
00663 return ! (a->len == b->len && ! strncmp(a->s, b->s, a->len));
00664 }
00665
00666 static inline int str_i_cmp(const str * a, const str * b)
00667 {
00668 return ! (a->len == b->len && ! strncasecmp(a->s, b->s, a->len));
00669 }
00670
00671 str queue_other = str_init("*");
00672
00678 static int find_queue(struct sip_msg * msg, int * queue)
00679 {
00680 str method = msg->first_line.u.request.method;
00681 int i;
00682
00683 *queue = -1;
00684 for (i=0; i<*nqueues; i++)
00685 if (! str_i_cmp(queues[i].method, &method)) {
00686 *queue = i;
00687 return 0;
00688 } else if (! str_i_cmp(queues[i].method, &queue_other)) {
00689 *queue = i;
00690 }
00691
00692 if (*queue >= 0)
00693 return 0;
00694
00695 LM_INFO("no queue matches\n");
00696 return -1;
00697 }
00698
00699
00700
00701
/*
 * Fixed permutation of the numbers 0..99. pipe_push() indexes it with
 * (counter % 100) and compares the entry against the drop rate for the
 * FEEDBACK algorithm.
 */
int hash[100] = {
	18, 50, 51, 39, 49, 68,  8, 78, 61, 75,
	53, 32, 45, 77, 31, 12, 26, 10, 37, 99,
	29,  0, 52, 82, 91, 22,  7, 42, 87, 43,
	73, 86, 70, 69, 13, 60, 24, 25,  6, 93,
	96, 97, 84, 47, 79, 64, 90, 81,  4, 15,
	63, 44, 57, 40, 21, 28, 46, 94, 35, 58,
	11, 30,  3, 20, 41, 74, 34, 88, 62, 54,
	33, 92, 76, 85,  5, 72,  9, 83, 56, 17,
	95, 55, 80, 98, 66, 14, 16, 38, 71, 23,
	 2, 67, 36, 65, 27,  1, 19, 59, 89, 48
};
00708
00709
00715 static int pipe_push(struct sip_msg * msg, int id)
00716 {
00717 int ret;
00718
00719 (*pipes[id].counter)++;
00720
00721 switch (*pipes[id].algo) {
00722 case PIPE_ALGO_NOP:
00723 LM_ERR("no algorithm defined for pipe %d\n", id);
00724 ret = 1;
00725 break;
00726 case PIPE_ALGO_TAILDROP:
00727 ret = (*pipes[id].counter <= *pipes[id].limit * timer_interval) ? 1 : -1;
00728 break;
00729 case PIPE_ALGO_RED:
00730 if (*pipes[id].load == 0)
00731 ret = 1;
00732 else
00733 ret = (! (*pipes[id].counter % *pipes[id].load)) ? 1 : -1;
00734 break;
00735 case PIPE_ALGO_FEEDBACK:
00736 ret = (hash[*pipes[id].counter % 100] < *drop_rate) ? -1 : 1;
00737 break;
00738 case PIPE_ALGO_NETWORK:
00739 ret = -1 * *pipes[id].load;
00740 break;
00741 default:
00742 LM_ERR("unknown ratelimit algorithm: %d\n", *pipes[id].algo);
00743 ret = 1;
00744 }
00745
00746 return ret;
00747 }
00748
00755 static int rl_check(struct sip_msg * msg, int forced_pipe)
00756 {
00757 int que_id, pipe_id, ret;
00758 str method = msg->first_line.u.request.method;
00759
00760 if (forced_pipe >=0 && (forced_pipe>=MAX_PIPES || *pipes[forced_pipe].algo==PIPE_ALGO_NOP)) {
00761 LM_ERR("forced pipe %d out of range or not defined", forced_pipe);
00762 return -1;
00763 }
00764
00765 LOCK_GET(rl_lock);
00766 if (forced_pipe < 0) {
00767 if (find_queue(msg, &que_id)) {
00768 pipe_id = que_id = 0;
00769 ret = -1;
00770 goto out_release;
00771 }
00772 pipe_id = *queues[que_id].pipe;
00773 } else {
00774 que_id = 0;
00775 pipe_id = forced_pipe;
00776 }
00777
00778 ret = pipe_push(msg, pipe_id);
00779 out_release:
00780 LOCK_RELEASE(rl_lock);
00781
00782
00783 LM_DBG("meth=%.*s queue=%d pipe=%d algo=%d limit=%d pkg_load=%d counter=%d "
00784 "load=%2.1lf network_load=%d => %s\n",
00785 method.len, method.s, que_id, pipe_id,
00786 *pipes[pipe_id].algo, *pipes[pipe_id].limit,
00787 *pipes[pipe_id].load, *pipes[pipe_id].counter,
00788 *load_value, *network_load_value, (ret == 1) ? "ACCEPT" : "DROP");
00789
00790 return ret;
00791 }
00792
00793 static int w_rl_check_forced(struct sip_msg* msg, char *p1, char *p2)
00794 {
00795 int pipe = -1;
00796 pv_value_t pv_val;
00797
00798 if (p1 && (pv_get_spec_value(msg, (pv_spec_t *)p1, &pv_val) == 0)) {
00799 if (pv_val.flags & PV_VAL_INT) {
00800 pipe = pv_val.ri;
00801 LM_DBG("pipe=%d\n", pipe);
00802 } else if (pv_val.flags & PV_VAL_STR) {
00803 if(str2int(&(pv_val.rs), (unsigned int*)&pipe) != 0) {
00804 LM_ERR("Unable to get pipe from pv '%.*s'"
00805 "=> defaulting to method type checking\n",
00806 pv_val.rs.len, pv_val.rs.s);
00807 pipe = -1;
00808 }
00809 } else {
00810 LM_ERR("pv not a str or int => defaulting to method type checking\n");
00811 pipe = -1;
00812 }
00813 } else {
00814 LM_ERR("Unable to get pipe from pv:%p"
00815 " => defaulting to method type checking\n", p1);
00816 pipe = -1;
00817 }
00818 return rl_check(msg, pipe);
00819 }
/**
 * Script wrapper for rl_check_pipe(): the pipe number arrives already
 * converted to an integer and stored in the parameter pointer itself.
 *
 * @param msg  the request being checked
 * @param p1   pipe number, encoded in the pointer value
 * @param p2   unused
 * @return the result of rl_check()
 */
static int w_rl_check_forced_pipe(struct sip_msg* msg, char *p1, char *p2)
{
	const int pipe = (int)(unsigned int)(unsigned long)p1;

	LM_DBG("trying pipe %d\n", pipe);
	return rl_check(msg, pipe);
}
00828
/**
 * Script wrapper for rl_check() without parameters: the pipe is selected
 * from the request method.
 *
 * @param msg  the request being checked
 * @param p1   unused
 * @param p2   unused
 * @return the result of rl_check()
 */
static int w_rl_check_default(struct sip_msg* msg, char *p1, char *p2)
{
	/* a negative pipe asks rl_check() to pick the pipe by method */
	const int no_forced_pipe = -1;

	return rl_check(msg, no_forced_pipe);
}
00833
00834
00835
00836
00837
00838
00839
00840
00841
00842
00843
00844
00845
00846
00847
00848
00849
00850
00851
00856 static int init_params(void)
00857 {
00858 if (regcomp(&pipe_params_regex, "^([0-9]+):([^: ]+):([0-9]+)$", REG_EXTENDED|REG_ICASE) ||
00859 regcomp(&queue_params_regex, "^([0-9]+):([^: ]+)$", REG_EXTENDED|REG_ICASE)) {
00860 LM_ERR("can't compile modparam regexes\n");
00861 return -1;
00862 }
00863
00864 memset(pipes, 0, sizeof(pipes));
00865 memset(queues, 0, sizeof(queues));
00866
00867 params_inited = 1;
00868 return 0;
00869 }
00870
00871
00876 static int parse_pipe_params(char * line, pipe_params_t * params)
00877 {
00878 regmatch_t m[4];
00879 str algo_str;
00880
00881 if (! params_inited && init_params())
00882 return -1;
00883 if (regexec(&pipe_params_regex, line, 4, m, 0)) {
00884 LM_ERR("invalid param tuple: %s\n", line);
00885 return -1;
00886 }
00887 LM_DBG("pipe: [%.*s|%.*s|%.*s]\n",
00888 RXLS(m, line, 1), RXLS(m, line, 2), RXLS(m, line, 3));
00889
00890 params->no = atoi(RXS(m, line, 1));
00891 params->limit = atoi(RXS(m, line, 3));
00892
00893 algo_str.s = RXS(m, line, 2);
00894 algo_str.len = RXL(m, line, 2);
00895 if (str_map_str(algo_names, &algo_str, ¶ms->algo))
00896 return -1;
00897
00898 return 0;
00899 }
00900
00905 static int parse_queue_params(char * line, rl_queue_params_t * params)
00906 {
00907 regmatch_t m[3];
00908 int len;
00909
00910 if (! params_inited && init_params())
00911 return -1;
00912 if (regexec(&queue_params_regex, line, 3, m, 0)) {
00913 LM_ERR("invalid param tuple: %s\n", line);
00914 return -1;
00915 }
00916 LM_DBG("queue: [%.*s|%.*s]\n",
00917 RXLS(m, line, 1), RXLS(m, line, 2));
00918
00919 params->pipe = atoi(RXS(m, line, 1));
00920
00921 len = RXL(m, line, 2);
00922 params->method.s = (char *)pkg_malloc(len+1);
00923 if (params->method.s == 0) {
00924 LM_ERR("no memory left for method in params\n");
00925 return -1;
00926 }
00927 params->method.len = len;
00928 memcpy(params->method.s, RXS(m, line, 2), len+1);
00929
00930 return 0;
00931 }
00932
00940 static int check_feedback_setpoints(int modparam)
00941 {
00942 int i, sp;
00943
00944 cfg_setpoint = -1;
00945
00946 for (i=0; i<MAX_PIPES; i++)
00947 if (pipes[i].algo_mp == PIPE_ALGO_FEEDBACK) {
00948 sp = modparam ? pipes[i].limit_mp : *pipes[i].limit;
00949
00950 if (sp < 0 || sp > 100) {
00951 LM_ERR("FEEDBACK cpu load must be >=0 and <= 100\n");
00952 return -1;
00953 } else if (cfg_setpoint == -1) {
00954 cfg_setpoint = sp;
00955 } else if (sp != cfg_setpoint) {
00956 LM_ERR("pipe %d: FEEDBACK cpu load values must "
00957 "be equal for all pipes\n", i);
00958 return -1;
00959 }
00960 }
00961
00962 return 0;
00963 }
00964
00965
00966 static int add_pipe_params(modparam_t type, void * val)
00967 {
00968 char * param_line = val;
00969 pipe_params_t params;
00970
00971 if (parse_pipe_params(param_line, ¶ms))
00972 return -1;
00973
00974 if (params.no < 0 || params.no >= MAX_PIPES) {
00975 LM_ERR("pipe number %d not allowed (MAX_PIPES=%d, 0-based)\n",
00976 params.no, MAX_PIPES);
00977 return -1;
00978 }
00979
00980 pipes[params.no].algo_mp = params.algo;
00981 pipes[params.no].limit_mp = params.limit;
00982
00983 return check_feedback_setpoints(1);
00984 }
00985
00986 static int add_queue_params(modparam_t type, void * val)
00987 {
00988 char * param_line = val;
00989 rl_queue_params_t params;
00990
00991 if (nqueues_mp >= MAX_QUEUES) {
00992 LM_ERR("MAX_QUEUES reached (%d)\n", MAX_QUEUES);
00993 return -1;
00994 }
00995
00996 if (parse_queue_params(param_line, ¶ms))
00997 return -1;
00998
00999 if (params.pipe >= MAX_PIPES) {
01000 LM_ERR("pipe number %d not allowed (MAX_PIPES=%d, 0-based)\n",
01001 params.pipe, MAX_PIPES);
01002 return -1;
01003 }
01004
01005 queues[nqueues_mp].pipe_mp = params.pipe;
01006 queues[nqueues_mp].method_mp = params.method;
01007 nqueues_mp++;
01008
01009 return 0;
01010 }
01011
01012
01013
01014 static ticks_t rl_timer_handle(ticks_t ticks, struct timer_ln* tl, void* data)
01015 {
01016 int i, len;
01017 char *c, *p;
01018
01019 LOCK_GET(rl_lock);
01020 switch (*load_source) {
01021 case LOAD_SOURCE_CPU:
01022 update_cpu_load();
01023 break;
01024 }
01025
01026 *network_load_value = get_total_bytes_waiting();
01027
01028 if (rl_dbg_str->s) {
01029 c = p = rl_dbg_str->s;
01030 memset(c, ' ', rl_dbg_str->len);
01031 for (i=0; i<MAX_PIPES; i++) {
01032 c = int2str(*pipes[i].counter, &len);
01033 if (len < 4) {
01034 memcpy( p + (5-len), c, len );
01035 } else {
01036 memset(p, '*', 5);
01037 LM_WARN("Counter pipes[%d] to big: %d\n",
01038 i, *pipes[i].counter);
01039 }
01040 p = p + 5;
01041 }
01042 LM_WARN("%.*s\n", rl_dbg_str->len, rl_dbg_str->s);
01043 }
01044
01045 for (i=0; i<MAX_PIPES; i++) {
01046 if( *pipes[i].algo == PIPE_ALGO_NETWORK ) {
01047 *pipes[i].load = ( *network_load_value > *pipes[i].limit ) ? 1 : -1;
01048 } else if (*pipes[i].limit && timer_interval) {
01049 *pipes[i].load = *pipes[i].counter / (*pipes[i].limit * timer_interval);
01050 }
01051 *pipes[i].last_counter = *pipes[i].counter;
01052 *pipes[i].counter = 0;
01053 }
01054 LOCK_RELEASE(rl_lock);
01055 return (ticks_t)(-1);
01056 }
01057
01058
01059
/*
 * Help texts of the RPC commands. Each array holds the description followed
 * by a terminating 0. Long texts are built by string literal concatenation
 * so that source indentation cannot leak into them.
 */
static const char *rpc_stats_doc[2] = {
	"Print ratelimit statistics: PIPE[<pipe_id>]: "
	"<last_counter>/<pipe_limit> (drop rate: <drop_rate>)", 0
};

static const char *rpc_get_pipes_doc[2] = {
	"Print pipes info: PIPE[<pipe_id>]: "
	"<pipe_algo_id>:<pipe_algo> <last_counter>/<pipe_limit> (drop rate: <drop_rate>) [<current_counter>]", 0
};

static const char *rpc_set_pipe_doc[2] = {
	"Sets a pipe params: <pipe_id> <pipe_algorithm> <pipe_limit>", 0
};

static const char *rpc_get_queues_doc[2] = {
	"Print queues info: QUEUE[queue_id]: <pipe_id>:<queue_method>", 0
};

static const char *rpc_set_queue_doc[2] = {
	"Sets queue params: <queue_id> <queue_method> <pipe_id>", 0
};

static const char *rpc_get_pid_doc[2] = {
	"Print PID Controller parameters for the FEEDBACK algorithm: "
	"<ki> <kp> <kd>", 0
};

static const char *rpc_set_pid_doc[2] = {
	"Sets the PID Controller parameters for the FEEDBACK algorithm: "
	"<ki> <kp> <kd>", 0
};

static const char *rpc_push_load_doc[2] = {
	"Force the value of the load parameter for FEEDBACK algorithm: "
	"<load>", 0
};

static const char *rpc_set_dbg_doc[2] = {
	"Sets the ratelimit debug/monitoring logs: 0-off 1-on", 0
};
01100
01101
01102
01103 static void rpc_stats(rpc_t *rpc, void *c) {
01104 int i;
01105
01106 LOCK_GET(rl_lock);
01107 for (i=0; i<MAX_PIPES; i++) {
01108 if (rpc->printf(c, "PIPE[%d]: %d/%d (drop rate: %d)",
01109 i, *pipes[i].last_counter, *pipes[i].limit,
01110 *pipes[i].load) < 0) goto error;
01111 }
01112 error:
01113 LOCK_RELEASE(rl_lock);
01114 }
01115
01116 static void rpc_get_pipes(rpc_t *rpc, void *c) {
01117 str algo;
01118 int i;
01119
01120 LOCK_GET(rl_lock);
01121 for (i=0; i<MAX_PIPES; i++) {
01122 if (*pipes[i].algo != PIPE_ALGO_NOP) {
01123 if (str_map_int(algo_names, *pipes[i].algo, &algo))
01124 goto error;
01125 if (rpc->printf(c, "PIPE[%d]: %d:%.*s %d/%d (drop rate: %d) [%d]",
01126 i, *pipes[i].algo, algo.len, algo.s,
01127 *pipes[i].last_counter, *pipes[i].limit,
01128 *pipes[i].load, *pipes[i].counter) < 0) goto error;
01129 }
01130 }
01131 error:
01132 LOCK_RELEASE(rl_lock);
01133 }
01134
01135 static void rpc_set_pipe(rpc_t *rpc, void *c) {
01136 int pipe_no = MAX_PIPES, algo_id, limit = 0;
01137 str algo_str;
01138
01139 if (rpc->scan(c, "dSd", &pipe_no, &algo_str, &limit) < 3) return;
01140
01141 if (str_map_str(algo_names, &algo_str, &algo_id)) {
01142 LM_ERR("unknown algorithm: '%.*s'\n", algo_str.len, algo_str.s);
01143 rpc->fault(c, 400, "Unknown algorithm");
01144 return;
01145 }
01146
01147 LM_DBG("set_pipe: %d:%d:%d\n", pipe_no, algo_id, limit);
01148
01149 if (pipe_no >= MAX_PIPES || pipe_no < 0) {
01150 LM_ERR("wrong pipe_no: %d\n", pipe_no);
01151 rpc->fault(c, 400, "Unknown pipe");
01152 return;
01153 }
01154
01155 LOCK_GET(rl_lock);
01156 *pipes[pipe_no].algo = algo_id;
01157 *pipes[pipe_no].limit = limit;
01158
01159 if (check_feedback_setpoints(0)) {
01160 LM_ERR("feedback limits don't match\n");
01161 rpc->fault(c, 400, "Feedback limits don't match");
01162 } else {
01163 *pid_setpoint = 0.01 * (double)cfg_setpoint;
01164 }
01165
01166 LOCK_RELEASE(rl_lock);
01167 }
01168
01169 static void rpc_get_queues(rpc_t *rpc, void *c) {
01170 int i;
01171
01172 LOCK_GET(rl_lock);
01173 for (i=0; i<MAX_QUEUES; i++) {
01174 if (queues[i].pipe) {
01175 if (rpc->printf(c, "QUEUE[%d]: %d:%.*s",
01176 i, *queues[i].pipe,
01177 (*queues[i].method).len,
01178 (*queues[i].method).s) < 0) goto error;
01179 }
01180 }
01181 error:
01182 LOCK_RELEASE(rl_lock);
01183 }
01184
01185 static void rpc_set_queue(rpc_t *rpc, void *c) {
01186 unsigned int queue_no = MAX_QUEUES, pipe_no = MAX_PIPES;
01187 str method, method_buf;
01188
01189
01190 if (rpc->scan(c, "dSd", &queue_no, &method, &pipe_no) < 3) return;
01191
01192 if (pipe_no >= MAX_PIPES || pipe_no < 0) {
01193 LM_ERR("Invalid pipe number: %d\n", pipe_no);
01194 rpc->fault(c, 400, "Invalid pipe number");
01195 return;
01196 }
01197
01198 if (str_cpy(&method_buf, &method)) {
01199 LM_ERR("out of memory\n");
01200 rpc->fault(c, 400, "OOM");
01201 return;
01202 }
01203
01204 LOCK_GET(rl_lock);
01205 if (queue_no >= *nqueues) {
01206 LM_ERR("MAX_QUEUES reached for queue: %d\n", queue_no);
01207 rpc->fault(c, 400, "MAX_QUEUES reached");
01208 LOCK_RELEASE(rl_lock);
01209 return;
01210 }
01211
01212 *queues[queue_no].pipe = pipe_no;
01213 if (!queues[queue_no].method->s)
01214 shm_free(queues[queue_no].method->s);
01215 queues[queue_no].method->s = method_buf.s;
01216 queues[queue_no].method->len = method_buf.len;
01217 LOCK_RELEASE(rl_lock);
01218 }
01219
01220 static void rpc_get_pid(rpc_t *rpc, void *c) {
01221 rpc->printf(c, "ki[%f] kp[%f] kd[%f] ", *pid_ki, *pid_kp, *pid_kd);
01222 }
01223
01224 static void rpc_set_pid(rpc_t *rpc, void *c) {
01225 double ki, kp, kd;
01226
01227 if (rpc->scan(c, "fff", &ki, &kp, &kd) < 3) return;
01228
01229 LOCK_GET(rl_lock);
01230 *pid_ki = ki;
01231 *pid_kp = kp;
01232 *pid_kd = kd;
01233 LOCK_RELEASE(rl_lock);
01234 }
01235
01236
01237 static void rpc_push_load(rpc_t *rpc, void *c) {
01238 double value;
01239
01240 if (rpc->scan(c, "f", &value) < 1) return;
01241
01242 if (value < 0.0 || value > 1.0) {
01243 LM_ERR("value out of range: %0.3f in not in [0.0,1.0]\n", value);
01244 rpc->fault(c, 400, "Value out of range");
01245 return;
01246 }
01247 LOCK_GET(rl_lock);
01248 *load_value = value;
01249 LOCK_RELEASE(rl_lock);
01250
01251 do_update_load();
01252 }
01253
01254 static void rpc_set_dbg(rpc_t *rpc, void *c) {
01255 int dbg_mode = 0;
01256
01257 if (rpc->scan(c, "d", &dbg_mode) < 1) return;
01258
01259 LOCK_GET(rl_lock);
01260 if (dbg_mode) {
01261 if (!rl_dbg_str->s) {
01262 rl_dbg_str->len = (MAX_PIPES * 5 * sizeof(char));
01263 rl_dbg_str->s = (char *)shm_malloc(rl_dbg_str->len);
01264 if (!rl_dbg_str->s) {
01265 rl_dbg_str->len = 0;
01266 LM_ERR("oom: %d\n", rl_dbg_str->len);
01267 }
01268 }
01269 } else {
01270 if (rl_dbg_str->s) {
01271 shm_free(rl_dbg_str->s);
01272 rl_dbg_str->s = NULL;
01273 rl_dbg_str->len = 0;
01274 }
01275 }
01276 LOCK_RELEASE(rl_lock);
01277 }
01278
/*
 * RPC commands exported by the module: command name, handler, help text
 * array and a final field that is 0 for every command here. The table is
 * terminated by an all-zero entry and is registered in mod_init() through
 * rpc_register_array().
 */
static rpc_export_t rpc_methods[] = {
	{"rl.stats", rpc_stats, rpc_stats_doc, 0},
	{"rl.get_pipes", rpc_get_pipes, rpc_get_pipes_doc, 0},
	{"rl.set_pipe", rpc_set_pipe, rpc_set_pipe_doc, 0},
	{"rl.get_queues", rpc_get_queues, rpc_get_queues_doc, 0},
	{"rl.set_queue", rpc_set_queue, rpc_set_queue_doc, 0},
	{"rl.get_pid", rpc_get_pid, rpc_get_pid_doc, 0},
	{"rl.set_pid", rpc_set_pid, rpc_set_pid_doc, 0},
	{"rl.push_load", rpc_push_load, rpc_push_load_doc, 0},
	{"rl.set_dbg", rpc_set_dbg, rpc_set_dbg_doc, 0},
	{0, 0, 0, 0}
};
01291