modules/ratelimit/ratelimit.c

00001 /*
00002  * $Id$
00003  *
00004  * ratelimit module
00005  *
00006  * Copyright (C) 2006 Hendrik Scholz <hscholz@raisdorf.net>
00007  * Copyright (C) 2008-2010 Ovidiu Sas <osas@voipembedded.com>
00008  *
00009  * This file is part of Kamailio, a free SIP server.
00010  *
00011  * Kamailio is free software; you can redistribute it and/or modify
00012  * it under the terms of the GNU General Public License as published by
00013  * the Free Software Foundation; either version 2 of the License, or
00014  * (at your option) any later version
00015  *
00016  * Kamailio is distributed in the hope that it will be useful,
00017  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00018  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00019  * GNU General Public License for more details.
00020  *
00021  * You should have received a copy of the GNU General Public License 
00022  * along with this program; if not, write to the Free Software 
00023  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00024  *
00025  * History:
00026  * ---------
00027  *
00028  * 2008-01-10 ported from SER project (osas)
00029  * 2008-01-16 ported enhancements from openims project (osas)
00030  * 2020-04-28 add sip-router rpc interface (osas)
00031  */
00032 
00033 #include <stdio.h>
00034 #include <stdlib.h>
00035 #include <sys/types.h>
00036 #include <regex.h>
00037 #include <math.h>
00038 
00039 
00040 #include "../../mem/mem.h"
00041 #include "../../mem/shm_mem.h"
00042 #include "../../sr_module.h"
00043 #include "../../dprint.h"
00044 #include "../../timer.h"
00045 #include "../../timer_ticks.h"
00046 #include "../../ut.h"
00047 #include "../../locking.h"
00048 #include "../../mod_fix.h"
00049 #include "../../data_lump.h"
00050 #include "../../data_lump_rpl.h"
00051 #include "../../lib/kcore/statistics.h"
00052 #include "../../rpc_lookup.h"
00053 
00054 
00055 MODULE_VERSION
00056 
/* size of the static pipes[] and queues[] tables */
#define MAX_PIPES       16
#define MAX_QUEUES      10

/*
 * timer interval length in seconds, tunable via modparam
 */
#define RL_TIMER_INTERVAL 10

/*
 * Helpers for reading sub-match `i` of a regmatch_t array `m` that was
 * filled by regexec() over the string `str`:
 *
 *   RXL  - length of the sub-match, as an int
 *   RXS  - pointer to the first character of the sub-match
 *   RXLS - "length, pointer" pair, meant to feed a "%.*s" format specifier
 *          (expands to TWO comma-separated arguments on purpose)
 *
 * RXL and RXS are fully parenthesized so they can be used inside larger
 * expressions (e.g. *RXS(...) or RXS(...)[0]) without precedence surprises.
 */
#define RXL(m, str, i)  ((int)((m)[(i)].rm_eo - (m)[(i)].rm_so))
#define RXS(m, str, i)  ((str) + (m)[(i)].rm_so)
#define RXLS(m, str, i) RXL(m, str, i), RXS(m, str, i)
00069 static inline int str_cmp(const str * a, const str * b);
00070 static inline int str_i_cmp(const str * a, const str * b);
00071 
00072 typedef struct str_map {
00073         str     str;
00074         int     id;
00075 } str_map_t;
00076 
00077 static int str_map_str(const str_map_t * map, const str * key, int * ret);
00078 static int str_map_int(const str_map_t * map, int key, str * ret);
00079 
/*
 * Rate limiting algorithms a pipe can be configured with.
 *
 * PIPE_ALGO_FEEDBACK holds cpu usage to a fixed value using
 * negative feedback according to the PID controller model
 *
 * <http://en.wikipedia.org/wiki/PID_controller>
 */
enum {
	PIPE_ALGO_NOP      = 0,	/* no algorithm configured for the pipe */
	PIPE_ALGO_RED      = 1,
	PIPE_ALGO_TAILDROP = 2,
	PIPE_ALGO_FEEDBACK = 3,
	PIPE_ALGO_NETWORK  = 4
};
00092 
00093 str_map_t algo_names[] = {
00094         {str_init("NOP"),       PIPE_ALGO_NOP},
00095         {str_init("RED"),       PIPE_ALGO_RED},
00096         {str_init("TAILDROP"),  PIPE_ALGO_TAILDROP},
00097         {str_init("FEEDBACK"),  PIPE_ALGO_FEEDBACK},
00098         {str_init("NETWORK"),   PIPE_ALGO_NETWORK},
00099         {{0, 0},                0},
00100 };
00101 
00102 
/*
 * Where the load value used by the feedback algorithm comes from
 * (at jiri@iptel.org's suggestion):
 *
 *  - 'cpu':      the server looks at /proc/stat every timer interval
 *  - 'external': the value is pushed in from an external source
 *                through the management interface
 */
enum {
	LOAD_SOURCE_CPU      = 0,
	LOAD_SOURCE_EXTERNAL = 1
};
00113 
00114 str_map_t source_names[] = {
00115         {str_init("cpu"),       LOAD_SOURCE_CPU},
00116         {str_init("external"),  LOAD_SOURCE_EXTERNAL},
00117         {{0, 0},                0},
00118 };
00119 
00120 
/*
 * A rate limiting pipe.
 *
 * The *_mp members hold the values read from module parameters; the
 * pointer members are allocated in shared memory by mod_init(), which
 * seeds algo/limit from their *_mp counterparts.
 */
typedef struct pipe {
	/* stuff that gets read as a modparam or set at runtime */
	int *algo;		/* PIPE_ALGO_* currently in effect */
	int  algo_mp;		/* algorithm given as module parameter */
	int *limit;		/* limit currently in effect */
	int  limit_mp;		/* limit given as module parameter */

	/* updated values */
	int *counter;		/* incremented by pipe_push() for every message */
	int *last_counter;
	int *load;
} pipe_t;
00133 
00134 typedef struct rl_queue {
00135         int     *       pipe;
00136         int             pipe_mp;
00137         str     *       method;
00138         str             method_mp;
00139 } rl_queue_t;
00140 
00141 static struct timer_ln* rl_timer;
00142 
00143 /* === these change after startup */
00144 gen_lock_t * rl_lock;
00145 
00146 static double * load_value;     /* actual load, used by PIPE_ALGO_FEEDBACK */
00147 static double * pid_kp, * pid_ki, * pid_kd, * pid_setpoint; /* PID tuning params */
00148 static int * drop_rate;         /* updated by PIPE_ALGO_FEEDBACK */
00149 
00150 static int * network_load_value;      /* network load */
00151 
00152 /* where to get the load for feedback. values: cpu, external */
00153 static int load_source_mp = LOAD_SOURCE_CPU;
00154 static int * load_source;
00155 
00156 typedef struct pipe_params {
00157         int no;
00158         int algo;
00159         int limit;
00160 } pipe_params_t;
00161 
00162 typedef struct rl_queue_params {
00163         int pipe;
00164         str method;
00165 } rl_queue_params_t;
00166 
00167 static pipe_t pipes[MAX_PIPES];
00168 static rl_queue_t queues[MAX_QUEUES];
00169 
00170 static int nqueues_mp = 0;
00171 static int * nqueues;
00172 
00173 static  str * rl_dbg_str = NULL;
00174 
00175 /* these only change in the mod_init() process -- no locking needed */
00176 static int timer_interval = RL_TIMER_INTERVAL;
00177 static int cfg_setpoint;        /* desired load, used when reading modparams */
00178 /* === */
00179 
/*
 * Lock wrappers. When RL_DEBUG_LOCKS is defined every acquire/release is
 * bracketed by LM_INFO() lines carrying the source line of the call site;
 * otherwise the wrappers are plain aliases of lock_get/lock_release.
 */
#ifdef RL_DEBUG_LOCKS
# define LOCK_GET(l) do { \
	LM_INFO("%d: + get\n", __LINE__); \
	lock_get(l); \
	LM_INFO("%d: - get\n", __LINE__); \
} while (0)

# define LOCK_RELEASE(l) do { \
	LM_INFO("%d: + release\n", __LINE__); \
	lock_release(l); \
	LM_INFO("%d: - release\n", __LINE__); \
} while (0)
#else
# define LOCK_GET lock_get
# define LOCK_RELEASE lock_release
#endif
00196 
00197 static int params_inited = 0;
00198 static regex_t pipe_params_regex;
00199 static regex_t queue_params_regex;
00200 
00202 static int mod_init(void);
00203 static ticks_t rl_timer_handle(ticks_t, struct timer_ln*, void*);
00204 static int w_rl_check_default(struct sip_msg*, char *, char *);
00205 static int w_rl_check_forced(struct sip_msg*, char *, char *);
00206 static int w_rl_check_forced_pipe(struct sip_msg*, char *, char *);
00207 static int add_queue_params(modparam_t, void *);
00208 static int add_pipe_params(modparam_t, void *);
00209 /* RESERVED for future use
00210 static int set_load_source(modparam_t, void *);
00211 */
00212 static void destroy(void);
00213 
00214 static cmd_export_t cmds[]={
00215         {"rl_check",      (cmd_function)w_rl_check_default,     0, 0,               0,               REQUEST_ROUTE|LOCAL_ROUTE},
00216         {"rl_check",      (cmd_function)w_rl_check_forced,      1, fixup_pvar_null,
00217                 fixup_free_pvar_null, REQUEST_ROUTE|LOCAL_ROUTE},
00218         {"rl_check_pipe", (cmd_function)w_rl_check_forced_pipe, 1, fixup_uint_null, 0,               REQUEST_ROUTE|LOCAL_ROUTE},
00219         {0,0,0,0,0,0}
00220 };
00221 static param_export_t params[]={
00222         {"timer_interval", INT_PARAM,                &timer_interval},
00223         {"queue",          STR_PARAM|USE_FUNC_PARAM, (void *)add_queue_params},
00224         {"pipe",           STR_PARAM|USE_FUNC_PARAM, (void *)add_pipe_params},
00225         /* RESERVED for future use
00226         {"load_source",    STR_PARAM|USE_FUNC_PARAM, (void *)set_load_source},
00227         */
00228         {0,0,0}
00229 };
00230 
00231 static rpc_export_t rpc_methods[];
00232 
00234 struct module_exports exports= {
00235         "ratelimit",
00236         DEFAULT_DLFLAGS,                /* dlopen flags */
00237         cmds,
00238         params,
00239         0,                              /* exported statistics */
00240         0,                              /* exported MI functions */
00241         0,                              /* exported pseudo-variables */
00242         0,                              /* extra processes */
00243         mod_init,                       /* module initialization function */
00244         0,
00245         (destroy_function) destroy,     /* module exit function */
00246         0                               /* per-child init function */
00247 };
00248 
00249 
00254 static int str_map_str(const str_map_t * map, const str * key, int * ret)
00255 {
00256         for (; map->str.s; map++) 
00257                 if (! str_cmp(&map->str, key)) {
00258                         *ret = map->id;
00259                         return 0;
00260                 }
00261         LM_DBG("str_map_str() failed map=%p key=%.*s\n", map, key->len, key->s);
00262         return -1;
00263 }
00264 
00269 static int str_map_int(const str_map_t * map, int key, str * ret)
00270 {
00271         for (; map->str.s; map++) 
00272                 if (map->id == key) {
00273                         *ret = map->str;
00274                         return 0;
00275                 }
00276         LM_DBG("str_map_str() failed map=%p key=%d\n", map, key);
00277         return -1;
00278 }
00279 
00284 static int str_cpy(str * dest, str * src)
00285 {
00286         dest->len = src->len;
00287         dest->s = shm_malloc(src->len);
00288         if (! dest->s) {
00289                 LM_ERR("oom: '%.*s'\n", src->len, src->s);
00290                 return -1;
00291         }
00292         memcpy(dest->s, src->s, src->len);
00293         return 0;
00294 }
00295 
/*
 * Compute the cpu load since the previous call from /proc/stat.
 *
 * not using /proc/loadavg because it only works when
 * our_timer_interval == theirs
 *
 * load - receives the fraction of non-idle time in the 0.0 - 1.0 range.
 *        The first successful call has no previous sample to compare
 *        against and reports 0. If no time was accounted between two
 *        samples the previous value of *load is left untouched.
 *
 * Returns 0 on success, -1 if /proc/stat cannot be opened or parsed
 * (*load is not modified then).
 */
static int get_cpuload(double * load)
{
	/* previous sample, kept across calls */
	static long long o_user, o_nice, o_sys, o_idle, o_iow, o_irq, o_sirq, o_stl;
	static int first_time = 1;
	/* new sample; zeroed so that columns missing from the "cpu" line
	 * contribute nothing instead of being read uninitialized */
	long long n_user = 0, n_nice = 0, n_sys = 0, n_idle = 0;
	long long n_iow = 0, n_irq = 0, n_sirq = 0, n_stl = 0;
	int nfields;
	FILE * f = fopen("/proc/stat", "r");

	if (! f) {
		LM_ERR("could not open /proc/stat\n");
		return -1;
	}
	nfields = fscanf(f, "cpu  %lld%lld%lld%lld%lld%lld%lld%lld",
			&n_user, &n_nice, &n_sys, &n_idle, &n_iow, &n_irq, &n_sirq, &n_stl);
	/* close before looking at the result so no path leaks the stream */
	fclose(f);

	/* user, nice, system and idle are the minimum needed for a load figure */
	if (nfields < 4) {
		LM_ERR("could not parse load informations\n");
		return -1;
	}

	if (first_time) {
		first_time = 0;
		*load = 0;
	} else {
		long long d_total =	(n_user - o_user)	+
					(n_nice - o_nice)	+
					(n_sys  - o_sys)	+
					(n_idle - o_idle)	+
					(n_iow  - o_iow)	+
					(n_irq  - o_irq)	+
					(n_sirq - o_sirq)	+
					(n_stl  - o_stl);
		long long d_idle =	(n_idle - o_idle);

		/* a zero (or bogus negative) interval would yield NaN/inf */
		if (d_total > 0)
			*load = 1.0 - ((double)d_idle) / (double)d_total;
	}

	o_user  = n_user;
	o_nice  = n_nice;
	o_sys   = n_sys;
	o_idle  = n_idle;
	o_iow   = n_iow;
	o_irq   = n_irq;
	o_sirq  = n_sirq;
	o_stl   = n_stl;

	return 0;
}
00344 
00345 static double int_err = 0.0;
00346 static double last_err = 0.0;
00347 
00348 /* (*load_value) is expected to be in the 0.0 - 1.0 range
00349  * (expects rl_lock to be taken)
00350  */
00351 static void do_update_load(void)
00352 {
00353         static char spcs[51];
00354         int load;
00355         double err, dif_err, output;
00356 
00357         /* PID update */
00358         err = *pid_setpoint - *load_value;
00359 
00360         dif_err = err - last_err;
00361 
00362         /*
00363          * TODO?: the 'if' is needed so low cpu loads for 
00364          * long periods (which can't be compensated by 
00365          * negative drop rates) don't confuse the controller
00366          *
00367          * NB: - "err < 0" means "desired_cpuload < actual_cpuload"
00368          *     - int_err is integral(err) over time
00369          */
00370         if (int_err < 0 || err < 0)
00371                 int_err += err;
00372 
00373         output =        (*pid_kp) * err + 
00374                                 (*pid_ki) * int_err + 
00375                                 (*pid_kd) * dif_err;
00376         last_err = err;
00377 
00378         *drop_rate = (output > 0) ? output  : 0;
00379 
00380         load = 0.5 + 100.0 * *load_value; /* round instead of floor */
00381 
00382         memset(spcs, '-', load / 4);
00383         spcs[load / 4] = 0;
00384 
00385         /*
00386         LM_DBG("p=% 6.2lf i=% 6.2lf d=% 6.2lf o=% 6.2lf %s|%d%%\n",
00387                 err, int_err, dif_err, output, spcs, load);
00388         */
00389 }
00390 
00391 static void update_cpu_load(void)
00392 {
00393         if (get_cpuload(load_value)) 
00394                 return;
00395 
00396         do_update_load();
00397 }
00398 
00399 /* initialize ratelimit module */
00400 static int mod_init(void)
00401 {
00402         int i;
00403 
00404         if (rpc_register_array(rpc_methods)!=0) {
00405                 LM_ERR("failed to register RPC commands\n");
00406                 return -1;
00407         }
00408 
00409         rl_lock = lock_alloc();
00410         if (! rl_lock) {
00411                 LM_ERR("oom in lock_alloc()\n");
00412                 return -1;
00413         }
00414 
00415         if (lock_init(rl_lock)==0) {
00416                 LM_ERR("failed to init lock\n");
00417                 return -1;
00418         }
00419 
00420         /* register timer to reset counters */
00421         if ((rl_timer = timer_alloc()) == NULL) {
00422                 LM_ERR("could not allocate timer\n");
00423                 return -1;
00424         }
00425         timer_init(rl_timer, rl_timer_handle, 0, F_TIMER_FAST);
00426         timer_add(rl_timer, MS_TO_TICKS(1000*timer_interval));
00427 
00428         network_load_value = shm_malloc(sizeof(int));
00429         if (network_load_value==NULL) {
00430                 LM_ERR("oom for network_load_value\n");
00431                 return -1;
00432         }
00433 
00434         load_value = shm_malloc(sizeof(double));
00435         if (load_value==NULL) {
00436                 LM_ERR("oom for load_value\n");
00437                 return -1;
00438         }
00439         load_source = shm_malloc(sizeof(int));
00440         if (load_source==NULL) {
00441                 LM_ERR("oom for load_source\n");
00442                 return -1;
00443         }
00444         pid_kp = shm_malloc(sizeof(double));
00445         if (pid_kp==NULL) {
00446                 LM_ERR("oom for pid_kp\n");
00447                 return -1;
00448         }
00449         pid_ki = shm_malloc(sizeof(double));
00450         if (pid_ki==NULL) {
00451                 LM_ERR("oom for pid_ki\n");
00452                 return -1;
00453         }
00454         pid_kd = shm_malloc(sizeof(double));
00455         if (pid_kd==NULL) {
00456                 LM_ERR("oom for pid_kd\n");
00457                 return -1;
00458         }
00459         pid_setpoint = shm_malloc(sizeof(double));
00460         if (pid_setpoint==NULL) {
00461                 LM_ERR("oom for pid_setpoint\n");
00462                 return -1;
00463         }
00464         drop_rate = shm_malloc(sizeof(int));
00465         if (drop_rate==NULL) {
00466                 LM_ERR("oom for drop_rate\n");
00467                 return -1;
00468         }
00469         nqueues = shm_malloc(sizeof(int));
00470         if (nqueues==NULL) {
00471                 LM_ERR("oom for nqueues\n");
00472                 return -1;
00473         }
00474         rl_dbg_str = shm_malloc(sizeof(str));
00475         if (rl_dbg_str==NULL) {
00476                 LM_ERR("oom for rl_dbg_str\n");
00477                 return -1;
00478         }
00479 
00480         *network_load_value = 0;
00481         *load_value = 0.0;
00482         *load_source = load_source_mp;
00483         *pid_kp = 0.0;
00484         *pid_ki = -25.0;
00485         *pid_kd = 0.0;
00486         *pid_setpoint = 0.01 * (double)cfg_setpoint;
00487         *drop_rate      = 0;
00488         *nqueues = nqueues_mp;
00489         rl_dbg_str->s = NULL;
00490         rl_dbg_str->len = 0;
00491 
00492         for (i=0; i<MAX_PIPES; i++) {
00493                 pipes[i].algo    = shm_malloc(sizeof(int));
00494                 if (pipes[i].algo==NULL) {
00495                         LM_ERR("oom for pipes[%d].algo\n", i);
00496                         return -1;
00497                 }
00498                 pipes[i].limit   = shm_malloc(sizeof(int));
00499                 if (pipes[i].limit==NULL) {
00500                         LM_ERR("oom for pipes[%d].limit\n", i);
00501                         return -1;
00502                 }
00503                 pipes[i].load    = shm_malloc(sizeof(int));
00504                 if (pipes[i].load==NULL) {
00505                         LM_ERR("oom for pipes[%d].load\n", i);
00506                         return -1;
00507                 }
00508                 pipes[i].counter = shm_malloc(sizeof(int));
00509                 if (pipes[i].counter==NULL) {
00510                         LM_ERR("oom for pipes[%d].counter\n", i);
00511                         return -1;
00512                 }
00513                 pipes[i].last_counter = shm_malloc(sizeof(int));
00514                 if (pipes[i].last_counter==NULL) {
00515                         LM_ERR("oom for pipes[%d].last_counter\n", i);
00516                         return -1;
00517                 }
00518                 *pipes[i].algo    = pipes[i].algo_mp;
00519                 *pipes[i].limit   = pipes[i].limit_mp;
00520                 *pipes[i].load    = 0;
00521                 *pipes[i].counter = 0;
00522                 *pipes[i].last_counter = 0;
00523         }
00524 
00525         for (i=0; i<*nqueues; i++) {
00526                 queues[i].pipe   = shm_malloc(sizeof(int));
00527                 if (queues[i].pipe==NULL) {
00528                         LM_ERR("oom for queues[%d].pipe\n", i);
00529                         return -1;
00530                 }
00531                 queues[i].method = shm_malloc(sizeof(str));
00532                 if (queues[i].method==NULL) {
00533                         LM_ERR("oom for queues[%d].method\n", i);
00534                         return -1;
00535                 }
00536 
00537                 *queues[i].pipe   = queues[i].pipe_mp;
00538                 if (queues[i].method_mp.s == NULL) {
00539                         LM_ERR("unexpected NULL method for queues[%d].method_mp\n", i);
00540                         return -1;
00541                 }
00542                 if(str_cpy(queues[i].method, &queues[i].method_mp)) {
00543                         LM_ERR("oom str_cpy(queues[%d].method\n", i);
00544                         return -1;
00545                 }
00546                 pkg_free(queues[i].method_mp.s);
00547                 queues[i].method_mp.s = NULL;
00548                 queues[i].method_mp.len = 0;
00549         }
00550 
00551         return 0;
00552 }
00553 
00554 
00555 static void destroy(void)
00556 {
00557         int i;
00558 
00559         regfree(&pipe_params_regex);
00560         regfree(&queue_params_regex);
00561 
00562         for (i=0;  i<MAX_PIPES; i++) {
00563                 if (pipes[i].algo) {
00564                         shm_free(pipes[i].algo);
00565                         pipes[i].algo = NULL;
00566                 }
00567                 if (pipes[i].load) {
00568                         shm_free(pipes[i].load);
00569                         pipes[i].load = NULL;
00570                 }
00571                 if (pipes[i].counter) {
00572                         shm_free(pipes[i].counter);
00573                         pipes[i].counter = NULL;
00574                 }
00575                 if (pipes[i].last_counter) {
00576                         shm_free(pipes[i].last_counter);
00577                         pipes[i].last_counter = NULL;
00578                 }
00579                 if (pipes[i].limit) {
00580                         shm_free(pipes[i].limit);
00581                         pipes[i].limit = NULL;
00582                 }
00583         }
00584 
00585         if (nqueues) {
00586                 for (i=0; i<*nqueues; i++) {
00587                         if (queues[i].pipe) {
00588                                 shm_free(queues[i].pipe);
00589                                 queues[i].pipe = NULL;
00590                         }
00591                         if (queues[i].method) {
00592                                 if (queues[i].method->s) {
00593                                         shm_free(queues[i].method->s);
00594                                         queues[i].method->s = NULL;
00595                                         queues[i].method->len = 0;
00596                                 }
00597                                 shm_free(queues[i].method);
00598                                 queues[i].method = NULL;
00599                         }
00600                 }
00601         }
00602 
00603         if (network_load_value) {
00604                 shm_free(network_load_value);
00605                 network_load_value = NULL;
00606         }
00607         if (load_value) {
00608                 shm_free(load_value);
00609                 load_value = NULL;
00610         }
00611         if (load_source) {
00612                 shm_free(load_source);
00613                 load_source = NULL;
00614         }
00615         if (pid_kp) {
00616                 shm_free(pid_kp);
00617                 pid_kp= NULL;
00618         }
00619         if (pid_ki) {
00620                 shm_free(pid_ki);
00621                 pid_ki = NULL;
00622         }
00623         if (pid_kd) {
00624                 shm_free(pid_kd);
00625                 pid_kd = NULL;
00626         }
00627         if (pid_setpoint) {
00628                 shm_free(pid_setpoint);
00629                 pid_setpoint = NULL;
00630         }
00631         if (drop_rate) {
00632                 shm_free(drop_rate);
00633                 drop_rate = NULL;
00634         }
00635         if (nqueues) {
00636                 shm_free(nqueues);
00637                 nqueues = NULL;
00638         }
00639         if (rl_dbg_str) {
00640                 if (rl_dbg_str->s) {
00641                         shm_free(rl_dbg_str->s);
00642                         rl_dbg_str->s = NULL;
00643                         rl_dbg_str->len = 0;
00644                 }
00645                 shm_free(rl_dbg_str);
00646                 rl_dbg_str = NULL;
00647         }
00648 
00649         if (rl_timer) {
00650                 timer_free(rl_timer);
00651                 rl_timer = NULL;
00652         }
00653 
00654         if (rl_lock) {
00655                 lock_destroy(rl_lock);
00656                 lock_dealloc((void *)rl_lock);
00657         }
00658 }
00659 
00660 
00661 static inline int str_cmp(const str * a , const str * b)
00662 {
00663         return ! (a->len == b->len && ! strncmp(a->s, b->s, a->len));
00664 }
00665 
00666 static inline int str_i_cmp(const str * a, const str * b)
00667 {
00668         return ! (a->len == b->len && ! strncasecmp(a->s, b->s, a->len));
00669 }
00670 
00671 str queue_other = str_init("*");
00672 
00678 static int find_queue(struct sip_msg * msg, int * queue)
00679 {
00680         str method = msg->first_line.u.request.method;
00681         int i;
00682 
00683         *queue = -1;
00684         for (i=0; i<*nqueues; i++)
00685                 if (! str_i_cmp(queues[i].method, &method)) {
00686                         *queue = i;
00687                         return 0;
00688                 } else if (! str_i_cmp(queues[i].method, &queue_other)) {
00689                         *queue = i;
00690                 }
00691         
00692         if (*queue >= 0)
00693                 return 0;
00694         
00695         LM_INFO("no queue matches\n");
00696         return -1;
00697 }
00698 
/*
 * Fixed permutation of the numbers 0..99.
 *
 * this is here to avoid using rand() ... which doesn't _always_ return
 * exactly what we want (see NOTES section in 'man 3 rand')
 */
int hash[100] = {
	18, 50, 51, 39, 49, 68,  8, 78, 61, 75,
	53, 32, 45, 77, 31, 12, 26, 10, 37, 99,
	29,  0, 52, 82, 91, 22,  7, 42, 87, 43,
	73, 86, 70, 69, 13, 60, 24, 25,  6, 93,
	96, 97, 84, 47, 79, 64, 90, 81,  4, 15,
	63, 44, 57, 40, 21, 28, 46, 94, 35, 58,
	11, 30,  3, 20, 41, 74, 34, 88, 62, 54,
	33, 92, 76, 85,  5, 72,  9, 83, 56, 17,
	95, 55, 80, 98, 66, 14, 16, 38, 71, 23,
	 2, 67, 36, 65, 27,  1, 19, 59, 89, 48
};
00708 
00709 
00715 static int pipe_push(struct sip_msg * msg, int id)
00716 {
00717         int ret;
00718 
00719         (*pipes[id].counter)++;
00720 
00721         switch (*pipes[id].algo) {
00722                 case PIPE_ALGO_NOP:
00723                         LM_ERR("no algorithm defined for pipe %d\n", id);
00724                         ret = 1;
00725                         break;
00726                 case PIPE_ALGO_TAILDROP:
00727                         ret = (*pipes[id].counter <= *pipes[id].limit * timer_interval) ? 1 : -1;
00728                         break;
00729                 case PIPE_ALGO_RED:
00730                         if (*pipes[id].load == 0)
00731                                 ret = 1;
00732                         else
00733                                 ret = (! (*pipes[id].counter % *pipes[id].load)) ? 1 : -1;
00734                         break;
00735                 case PIPE_ALGO_FEEDBACK:
00736                         ret = (hash[*pipes[id].counter % 100] < *drop_rate) ? -1 : 1;
00737                         break;
00738                 case PIPE_ALGO_NETWORK:
00739                         ret = -1 * *pipes[id].load;
00740                         break;
00741                 default:
00742                         LM_ERR("unknown ratelimit algorithm: %d\n", *pipes[id].algo);
00743                         ret = 1;
00744         }
00745 
00746         return ret;     
00747 }
00748 
00755 static int rl_check(struct sip_msg * msg, int forced_pipe)
00756 {
00757         int que_id, pipe_id, ret;
00758         str method = msg->first_line.u.request.method;
00759 
00760         if (forced_pipe >=0 && (forced_pipe>=MAX_PIPES || *pipes[forced_pipe].algo==PIPE_ALGO_NOP)) {
00761                 LM_ERR("forced pipe %d out of range or not defined", forced_pipe);
00762                 return -1;
00763         }
00764 
00765         LOCK_GET(rl_lock);
00766         if (forced_pipe < 0) { 
00767                 if (find_queue(msg, &que_id)) {
00768                         pipe_id = que_id = 0;
00769                         ret = -1;
00770                         goto out_release;
00771                 }
00772                 pipe_id = *queues[que_id].pipe;
00773         } else {
00774                 que_id = 0;
00775                 pipe_id = forced_pipe;
00776         }
00777 
00778         ret = pipe_push(msg, pipe_id);
00779 out_release:
00780         LOCK_RELEASE(rl_lock);
00781 
00782         /* no locks here because it's only read and pipes[pipe_id] is always alloc'ed */
00783         LM_DBG("meth=%.*s queue=%d pipe=%d algo=%d limit=%d pkg_load=%d counter=%d "
00784                 "load=%2.1lf network_load=%d => %s\n",
00785                 method.len, method.s, que_id, pipe_id,
00786                 *pipes[pipe_id].algo, *pipes[pipe_id].limit,
00787                 *pipes[pipe_id].load, *pipes[pipe_id].counter,
00788                 *load_value, *network_load_value, (ret == 1) ? "ACCEPT" : "DROP");
00789 
00790         return ret;
00791 }
00792 
00793 static int w_rl_check_forced(struct sip_msg* msg, char *p1, char *p2)
00794 {
00795         int pipe = -1;
00796         pv_value_t pv_val;
00797 
00798         if (p1 && (pv_get_spec_value(msg, (pv_spec_t *)p1, &pv_val) == 0)) {
00799                 if (pv_val.flags & PV_VAL_INT) {
00800                         pipe = pv_val.ri;
00801                         LM_DBG("pipe=%d\n", pipe);
00802                 } else if (pv_val.flags & PV_VAL_STR) {
00803                         if(str2int(&(pv_val.rs), (unsigned int*)&pipe) != 0) {
00804                                 LM_ERR("Unable to get pipe from pv '%.*s'"
00805                                         "=> defaulting to method type checking\n",
00806                                         pv_val.rs.len, pv_val.rs.s);
00807                                 pipe = -1;
00808                         }
00809                 } else {
00810                         LM_ERR("pv not a str or int => defaulting to method type checking\n");
00811                         pipe = -1;
00812                 }
00813         } else {
00814                 LM_ERR("Unable to get pipe from pv:%p"
00815                         " => defaulting to method type checking\n", p1);
00816                 pipe = -1;
00817         }
00818         return rl_check(msg, pipe);
00819 }
/*
 * Script wrapper for rl_check_pipe(uint): the pipe number arrives
 * encoded in the p1 pointer value itself. p2 is unused.
 */
static int w_rl_check_forced_pipe(struct sip_msg* msg, char *p1, char *p2)
{
	int pipe = (int)(unsigned int)(unsigned long)p1;

	LM_DBG("trying pipe %d\n", pipe);
	return rl_check(msg, pipe);
}
00828 
/*
 * Script wrapper for rl_check() without arguments: no pipe is forced,
 * so the pipe is selected by the request method. p1 and p2 are unused.
 */
static int w_rl_check_default(struct sip_msg* msg, char *p1, char *p2)
{
	const int no_forced_pipe = -1;

	return rl_check(msg, no_forced_pipe);
}
00833 
00834 /* RESERVED for future use
00835 static int set_load_source(modparam_t type, void * val)
00836 {
00837         str src_name = { .s = val, .len = strlen(val) };
00838         int src_id;
00839 
00840         if (str_map_str(source_names, &src_name, &src_id)) {
00841                 LM_ERR("unknown load source: %.*s\n", src_name.len, src_name.s);
00842                 return -1;
00843         }
00844 
00845         load_source_mp = src_id;
00846         LM_INFO("switched to load source: %.*s\n", src_name.len, src_name.s);
00847 
00848         return 0;
00849 }
00850 */
00851 
00856 static int init_params(void)
00857 {
00858         if (regcomp(&pipe_params_regex, "^([0-9]+):([^: ]+):([0-9]+)$", REG_EXTENDED|REG_ICASE) ||
00859                 regcomp(&queue_params_regex, "^([0-9]+):([^: ]+)$", REG_EXTENDED|REG_ICASE)) {
00860                 LM_ERR("can't compile modparam regexes\n");
00861                 return -1;
00862         }
00863 
00864         memset(pipes, 0, sizeof(pipes));
00865         memset(queues, 0, sizeof(queues));
00866 
00867         params_inited = 1;
00868         return 0;
00869 }
00870 
00871 
00876 static int parse_pipe_params(char * line, pipe_params_t * params)
00877 {
00878         regmatch_t m[4];
00879         str algo_str;
00880 
00881         if (! params_inited && init_params())
00882                 return -1;
00883         if (regexec(&pipe_params_regex, line, 4, m, 0)) {
00884                 LM_ERR("invalid param tuple: %s\n", line);
00885                 return -1;
00886         }
00887         LM_DBG("pipe: [%.*s|%.*s|%.*s]\n",
00888                 RXLS(m, line, 1), RXLS(m, line, 2), RXLS(m, line, 3));
00889         
00890         params->no = atoi(RXS(m, line, 1));
00891         params->limit = atoi(RXS(m, line, 3));
00892 
00893         algo_str.s   = RXS(m, line, 2);
00894         algo_str.len = RXL(m, line, 2);
00895         if (str_map_str(algo_names, &algo_str, &params->algo))
00896                 return -1;
00897 
00898         return 0;
00899 }
00900 
00905 static int parse_queue_params(char * line, rl_queue_params_t * params)
00906 {
00907         regmatch_t m[3];
00908         int len;
00909 
00910         if (! params_inited && init_params())
00911                 return -1;
00912         if (regexec(&queue_params_regex, line, 3, m, 0)) {
00913                 LM_ERR("invalid param tuple: %s\n", line);
00914                 return -1;
00915         }
00916         LM_DBG("queue: [%.*s|%.*s]\n",
00917                 RXLS(m, line, 1), RXLS(m, line, 2));
00918         
00919         params->pipe = atoi(RXS(m, line, 1));
00920 
00921         len = RXL(m, line, 2);
00922         params->method.s = (char *)pkg_malloc(len+1);
00923         if (params->method.s == 0) {
00924                 LM_ERR("no memory left for method in params\n");
00925                 return -1;
00926         }
00927         params->method.len = len;
00928         memcpy(params->method.s, RXS(m, line, 2), len+1);
00929 
00930         return 0;
00931 }
00932 
00940 static int check_feedback_setpoints(int modparam)
00941 {
00942         int i, sp;
00943 
00944         cfg_setpoint = -1;
00945 
00946         for (i=0; i<MAX_PIPES; i++)
00947                 if (pipes[i].algo_mp == PIPE_ALGO_FEEDBACK) {
00948                         sp = modparam ? pipes[i].limit_mp : *pipes[i].limit;
00949 
00950                         if (sp < 0 || sp > 100) {
00951                                 LM_ERR("FEEDBACK cpu load must be >=0 and <= 100\n");
00952                                 return -1;
00953                         } else if (cfg_setpoint == -1) {
00954                                 cfg_setpoint = sp;
00955                         } else if (sp != cfg_setpoint) {
00956                                 LM_ERR("pipe %d: FEEDBACK cpu load values must "
00957                                         "be equal for all pipes\n", i);
00958                                 return -1;
00959                         }
00960                 }
00961 
00962         return 0;
00963 }
00964 
00965 
00966 static int add_pipe_params(modparam_t type, void * val)
00967 {
00968         char * param_line = val;
00969         pipe_params_t params;
00970 
00971         if (parse_pipe_params(param_line, &params))
00972                 return -1;
00973         
00974         if (params.no < 0 || params.no >= MAX_PIPES) {
00975                 LM_ERR("pipe number %d not allowed (MAX_PIPES=%d, 0-based)\n",
00976                         params.no, MAX_PIPES);
00977                 return -1;
00978         }
00979 
00980         pipes[params.no].algo_mp = params.algo;
00981         pipes[params.no].limit_mp = params.limit;
00982 
00983         return check_feedback_setpoints(1);
00984 }
00985 
00986 static int add_queue_params(modparam_t type, void * val)
00987 {
00988         char * param_line = val;
00989         rl_queue_params_t params;
00990 
00991         if (nqueues_mp >= MAX_QUEUES) {
00992                 LM_ERR("MAX_QUEUES reached (%d)\n", MAX_QUEUES);
00993                 return -1;
00994         }
00995 
00996         if (parse_queue_params(param_line, &params))
00997                 return -1;
00998 
00999         if (params.pipe >= MAX_PIPES) {
01000                 LM_ERR("pipe number %d not allowed (MAX_PIPES=%d, 0-based)\n",
01001                         params.pipe, MAX_PIPES);
01002                 return -1;
01003         }
01004 
01005         queues[nqueues_mp].pipe_mp = params.pipe;
01006         queues[nqueues_mp].method_mp = params.method;
01007         nqueues_mp++;
01008 
01009         return 0;
01010 }
01011 
01012 
01013 /* timer housekeeping, invoked each timer interval to reset counters */
01014 static ticks_t rl_timer_handle(ticks_t ticks, struct timer_ln* tl, void* data)
01015 {
01016         int i, len;
01017         char *c, *p;
01018 
01019         LOCK_GET(rl_lock);
01020         switch (*load_source) {
01021                 case LOAD_SOURCE_CPU:
01022                         update_cpu_load();
01023                         break;
01024         }
01025 
01026         *network_load_value = get_total_bytes_waiting();
01027 
01028         if (rl_dbg_str->s) {
01029                 c = p = rl_dbg_str->s;
01030                 memset(c, ' ', rl_dbg_str->len);
01031                 for (i=0; i<MAX_PIPES; i++) {
01032                         c = int2str(*pipes[i].counter, &len);
01033                         if (len < 4) {
01034                                 memcpy( p + (5-len), c, len );
01035                         } else {
01036                                 memset(p, '*', 5);
01037                                 LM_WARN("Counter pipes[%d] to big: %d\n",
01038                                         i, *pipes[i].counter);
01039                         }
01040                         p = p + 5;
01041                 }
01042                 LM_WARN("%.*s\n", rl_dbg_str->len, rl_dbg_str->s);
01043         }
01044 
01045         for (i=0; i<MAX_PIPES; i++) {
01046                 if( *pipes[i].algo == PIPE_ALGO_NETWORK ) {
01047                         *pipes[i].load = ( *network_load_value > *pipes[i].limit ) ? 1 : -1;
01048                 } else if (*pipes[i].limit && timer_interval) {
01049                         *pipes[i].load = *pipes[i].counter / (*pipes[i].limit * timer_interval);
01050                 }
01051                 *pipes[i].last_counter = *pipes[i].counter;
01052                 *pipes[i].counter = 0;
01053         }
01054         LOCK_RELEASE(rl_lock);
01055         return (ticks_t)(-1); /* periodical */
01056 }
01057 
01058 
01059 /* rpc function documentation */
/*
 * Help texts of the rl.* rpc commands.  Each array holds one description
 * string followed by a terminating 0 entry.
 */
static const char *rpc_stats_doc[2] = {
	"Print ratelimit statistics: PIPE[<pipe_id>]: "
	"<last_counter>/<pipe_limit> (drop rate: <drop_rate>)",
	0
};

static const char *rpc_get_pipes_doc[2] = {
	"Print pipes info: PIPE[<pipe_id>]: "
	"<pipe_algo_id>:<pipe_algo> <last_counter>/<pipe_limit> "
	"(drop rate: <drop_rate>) [<current_counter>]",
	0
};

static const char *rpc_set_pipe_doc[2] = {
	"Sets a pipe params: <pipe_id> <pipe_algorithm> <pipe_limit>",
	0
};

static const char *rpc_get_queues_doc[2] = {
	"Print queues info: QUEUE[queue_id]: <pipe_id>:<queue_method>",
	0
};

/* argument order matches the "dSd" scan in rpc_set_queue() */
static const char *rpc_set_queue_doc[2] = {
	"Sets queue params: <queue_id> <queue_method> <pipe_id>",
	0
};

static const char *rpc_get_pid_doc[2] = {
	"Print PID Controller parameters for the FEEDBACK algorithm: "
	"<ki> <kp> <kd>",
	0
};

static const char *rpc_set_pid_doc[2] = {
	"Sets the PID Controller parameters for the FEEDBACK algorithm: "
	"<ki> <kp> <kd>",
	0
};

static const char *rpc_push_load_doc[2] = {
	"Force the value of the load parameter for FEEDBACK algorithm: "
	"<load>",
	0
};

static const char *rpc_set_dbg_doc[2] = {
	"Sets the ratelimit debug/monitoring logs: 0-off 1-on",
	0
};
01100 
01101 
01102 /* rpc function implementations */
01103 static void rpc_stats(rpc_t *rpc, void *c) {
01104         int i;
01105 
01106         LOCK_GET(rl_lock);
01107         for (i=0; i<MAX_PIPES; i++) {
01108                 if (rpc->printf(c, "PIPE[%d]: %d/%d (drop rate: %d)",
01109                         i, *pipes[i].last_counter, *pipes[i].limit,
01110                         *pipes[i].load) < 0) goto error;
01111         }
01112 error:
01113         LOCK_RELEASE(rl_lock);
01114 }
01115 
01116 static void rpc_get_pipes(rpc_t *rpc, void *c) {
01117         str algo;
01118         int i;
01119 
01120         LOCK_GET(rl_lock);
01121         for (i=0; i<MAX_PIPES; i++) {
01122                 if (*pipes[i].algo != PIPE_ALGO_NOP) {
01123                         if (str_map_int(algo_names, *pipes[i].algo, &algo))
01124                                 goto error;
01125                         if (rpc->printf(c, "PIPE[%d]: %d:%.*s %d/%d (drop rate: %d) [%d]",
01126                                 i, *pipes[i].algo, algo.len, algo.s,
01127                                 *pipes[i].last_counter, *pipes[i].limit,
01128                                 *pipes[i].load, *pipes[i].counter) < 0) goto error;
01129                 }
01130         }
01131 error:
01132         LOCK_RELEASE(rl_lock);
01133 }
01134 
01135 static void rpc_set_pipe(rpc_t *rpc, void *c) {
01136         int pipe_no = MAX_PIPES, algo_id, limit = 0;
01137         str algo_str;
01138 
01139         if (rpc->scan(c, "dSd", &pipe_no, &algo_str, &limit) < 3) return;
01140 
01141         if (str_map_str(algo_names, &algo_str, &algo_id)) {
01142                 LM_ERR("unknown algorithm: '%.*s'\n", algo_str.len, algo_str.s);
01143                 rpc->fault(c, 400, "Unknown algorithm");
01144                 return;
01145         }
01146 
01147         LM_DBG("set_pipe: %d:%d:%d\n", pipe_no, algo_id, limit);
01148 
01149         if (pipe_no >= MAX_PIPES || pipe_no < 0) {
01150                 LM_ERR("wrong pipe_no: %d\n", pipe_no);
01151                 rpc->fault(c, 400, "Unknown pipe");
01152                 return;
01153         }
01154 
01155         LOCK_GET(rl_lock);
01156         *pipes[pipe_no].algo = algo_id;
01157         *pipes[pipe_no].limit = limit;
01158 
01159         if (check_feedback_setpoints(0)) {
01160                 LM_ERR("feedback limits don't match\n");
01161                 rpc->fault(c, 400, "Feedback limits don't match");
01162         } else {
01163                 *pid_setpoint = 0.01 * (double)cfg_setpoint;
01164         }
01165 
01166         LOCK_RELEASE(rl_lock);
01167 }
01168 
01169 static void rpc_get_queues(rpc_t *rpc, void *c) {
01170         int i;
01171 
01172         LOCK_GET(rl_lock);
01173         for (i=0; i<MAX_QUEUES; i++) {
01174                 if (queues[i].pipe) {
01175                         if (rpc->printf(c, "QUEUE[%d]: %d:%.*s",
01176                                 i, *queues[i].pipe,
01177                                 (*queues[i].method).len,
01178                                 (*queues[i].method).s) < 0) goto error;
01179                 }
01180         }
01181 error:
01182         LOCK_RELEASE(rl_lock);
01183 }
01184 
01185 static void rpc_set_queue(rpc_t *rpc, void *c) {
01186         unsigned int queue_no = MAX_QUEUES, pipe_no = MAX_PIPES;
01187         str method, method_buf;
01188 
01189 
01190         if (rpc->scan(c, "dSd", &queue_no, &method, &pipe_no) < 3) return;
01191 
01192         if (pipe_no >= MAX_PIPES || pipe_no < 0) {
01193                 LM_ERR("Invalid pipe number: %d\n", pipe_no);
01194                 rpc->fault(c, 400, "Invalid pipe number");
01195                 return;
01196         }
01197 
01198         if (str_cpy(&method_buf, &method)) {
01199                 LM_ERR("out of memory\n");
01200                 rpc->fault(c, 400, "OOM");
01201                 return;
01202         }
01203 
01204         LOCK_GET(rl_lock);
01205         if (queue_no >= *nqueues) {
01206                 LM_ERR("MAX_QUEUES reached for queue: %d\n", queue_no);
01207                 rpc->fault(c, 400, "MAX_QUEUES reached");
01208                 LOCK_RELEASE(rl_lock);
01209                 return;
01210         }
01211 
01212         *queues[queue_no].pipe = pipe_no;
01213         if (!queues[queue_no].method->s)
01214                 shm_free(queues[queue_no].method->s);
01215         queues[queue_no].method->s = method_buf.s;
01216         queues[queue_no].method->len = method_buf.len;
01217         LOCK_RELEASE(rl_lock);
01218 }
01219 
01220 static void rpc_get_pid(rpc_t *rpc, void *c) {
01221         rpc->printf(c, "ki[%f] kp[%f] kd[%f] ", *pid_ki, *pid_kp, *pid_kd);
01222 }
01223 
01224 static void rpc_set_pid(rpc_t *rpc, void *c) {
01225         double ki, kp, kd;
01226 
01227         if (rpc->scan(c, "fff", &ki, &kp, &kd) < 3) return;
01228 
01229         LOCK_GET(rl_lock);
01230         *pid_ki = ki;
01231         *pid_kp = kp;
01232         *pid_kd = kd;
01233         LOCK_RELEASE(rl_lock);
01234 }
01235 
01236 
01237 static void rpc_push_load(rpc_t *rpc, void *c) {
01238         double value;
01239 
01240         if (rpc->scan(c, "f", &value) < 1) return;
01241 
01242         if (value < 0.0 || value > 1.0) {
01243                 LM_ERR("value out of range: %0.3f in not in [0.0,1.0]\n", value);
01244                 rpc->fault(c, 400, "Value out of range");
01245                 return;
01246         }
01247         LOCK_GET(rl_lock);
01248         *load_value = value;
01249         LOCK_RELEASE(rl_lock);
01250 
01251         do_update_load();
01252 }
01253 
01254 static void rpc_set_dbg(rpc_t *rpc, void *c) {
01255         int dbg_mode = 0;
01256 
01257         if (rpc->scan(c, "d", &dbg_mode) < 1) return;
01258 
01259         LOCK_GET(rl_lock);
01260         if (dbg_mode) {
01261                 if (!rl_dbg_str->s) {
01262                         rl_dbg_str->len = (MAX_PIPES * 5 * sizeof(char));
01263                         rl_dbg_str->s = (char *)shm_malloc(rl_dbg_str->len);
01264                         if (!rl_dbg_str->s) {
01265                                 rl_dbg_str->len = 0;
01266                                 LM_ERR("oom: %d\n", rl_dbg_str->len);
01267                         }
01268                 }
01269         } else {
01270                 if (rl_dbg_str->s) {
01271                         shm_free(rl_dbg_str->s);
01272                         rl_dbg_str->s = NULL;
01273                         rl_dbg_str->len = 0;
01274                 }
01275         }
01276         LOCK_RELEASE(rl_lock);
01277 }
01278 
01279 static rpc_export_t rpc_methods[] = {
01280         {"rl.stats",            rpc_stats,      rpc_stats_doc,          0},
01281         {"rl.get_pipes",        rpc_get_pipes,  rpc_get_pipes_doc,      0},
01282         {"rl.set_pipe",         rpc_set_pipe,   rpc_set_pipe_doc,       0},
01283         {"rl.get_queues",       rpc_get_queues, rpc_get_queues_doc,     0},
01284         {"rl.set_queue",        rpc_set_queue,  rpc_set_queue_doc,      0},
01285         {"rl.get_pid",          rpc_get_pid,    rpc_get_pid_doc,        0},
01286         {"rl.set_pid",          rpc_set_pid,    rpc_set_pid_doc,        0},
01287         {"rl.push_load",        rpc_push_load,  rpc_push_load_doc,      0},
01288         {"rl.set_dbg",          rpc_set_dbg,    rpc_set_dbg_doc,        0},
01289         {0, 0, 0, 0}
01290 };
01291