00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00048 #include <stdio.h>
00049 #include <string.h>
00050 #include <stdlib.h>
00051 #include <time.h>
00052
00053 #include "../../ut.h"
00054 #include "../../trim.h"
00055 #include "../../dprint.h"
00056 #include "../../action.h"
00057 #include "../../route.h"
00058 #include "../../dset.h"
00059 #include "../../mem/shm_mem.h"
00060 #include "../../parser/parse_uri.h"
00061 #include "../../parser/parse_from.h"
00062 #include "../../parser/parse_param.h"
00063 #include "../../usr_avp.h"
00064 #include "../../lib/kmi/mi.h"
00065 #include "../../parser/digest/digest.h"
00066 #include "../../resolve.h"
00067 #include "../../lvalue.h"
00068 #include "../../modules/tm/tm_load.h"
00069 #include "../../lib/srdb1/db.h"
00070 #include "../../lib/srdb1/db_res.h"
00071 #include "../../str.h"
00072 #include "../../script_cb.h"
00073 #include "../../lib/kcore/faked_msg.h"
00074
00075 #include "ds_ht.h"
00076 #include "api.h"
00077 #include "dispatch.h"
00078
00079 #define DS_TABLE_VERSION 1
00080 #define DS_TABLE_VERSION2 2
00081 #define DS_TABLE_VERSION3 3
00082 #define DS_TABLE_VERSION4 4
00083
00084 #define DS_ALG_RROBIN 4
00085 #define DS_ALG_LOAD 10
00086
00087 static int _ds_table_version = DS_TABLE_VERSION;
00088
00089 static ds_ht_t *_dsht_load = NULL;
00090
00091
00092 extern int ds_force_dst;
00093
00094 static db_func_t ds_dbf;
00095 static db1_con_t* ds_db_handle=0;
00096
00097 ds_set_t **ds_lists=NULL;
00098
00099 int *ds_list_nr = NULL;
00100 int *crt_idx = NULL;
00101 int *next_idx = NULL;
00102
00103 #define _ds_list (ds_lists[*crt_idx])
00104 #define _ds_list_nr (*ds_list_nr)
00105
00106 static void ds_run_route(struct sip_msg *msg, str *uri, char *route);
00107
00108 void destroy_list(int);
00109
00113 int ds_hash_load_init(unsigned int htsize, int expire, int initexpire)
00114 {
00115 if(_dsht_load != NULL)
00116 return 0;
00117 _dsht_load = ds_ht_init(htsize, expire, initexpire);
00118 if(_dsht_load == NULL)
00119 return -1;
00120 return 0;
00121 }
00122
00126 int ds_hash_load_destroy(void)
00127 {
00128 if(_dsht_load == NULL)
00129 return -1;
00130 ds_ht_destroy(_dsht_load);
00131 _dsht_load = NULL;
00132 return 0;
00133 }
00134
00138 int ds_print_sets(void)
00139 {
00140 ds_set_t *si = NULL;
00141 int i;
00142
00143 if(_ds_list==NULL)
00144 return -1;
00145
00146
00147 si = _ds_list;
00148 while(si)
00149 {
00150 for(i=0; i<si->nr; i++)
00151 {
00152 LM_DBG("dst>> %d %.*s %d %d (%.*s,%d,%d)\n", si->id,
00153 si->dlist[i].uri.len, si->dlist[i].uri.s,
00154 si->dlist[i].flags, si->dlist[i].priority,
00155 si->dlist[i].attrs.duid.len, si->dlist[i].attrs.duid.s,
00156 si->dlist[i].attrs.maxload,
00157 si->dlist[i].attrs.weight);
00158 }
00159 si = si->next;
00160 }
00161
00162 return 0;
00163 }
00164
00168 int init_data(void)
00169 {
00170 int * p;
00171
00172 ds_lists = (ds_set_t**)shm_malloc(2*sizeof(ds_set_t*));
00173 if(!ds_lists)
00174 {
00175 LM_ERR("Out of memory\n");
00176 return -1;
00177 }
00178 ds_lists[0] = ds_lists[1] = 0;
00179
00180
00181 p = (int*)shm_malloc(3*sizeof(int));
00182 if(!p)
00183 {
00184 LM_ERR("Out of memory\n");
00185 return -1;
00186 }
00187
00188 crt_idx = p;
00189 next_idx = p+1;
00190 ds_list_nr = p+2;
00191 *crt_idx= *next_idx = 0;
00192
00193 return 0;
00194 }
00195
00199 int ds_set_attrs(ds_dest_t *dest, str *attrs)
00200 {
00201 param_t* params_list = NULL;
00202 param_hooks_t phooks;
00203 param_t *pit=NULL;
00204 str param;
00205
00206 if(attrs==NULL || attrs->len<=0)
00207 return 0;
00208 if(attrs->s[attrs->len-1]==';')
00209 attrs->len--;
00210
00211 dest->attrs.body.s = (char*)shm_malloc(attrs->len+1);
00212 if(dest->attrs.body.s==NULL)
00213 {
00214 LM_ERR("no more shm\n");
00215 return -1;
00216 }
00217 memcpy(dest->attrs.body.s, attrs->s, attrs->len);
00218 dest->attrs.body.s[attrs->len] = '\0';
00219 dest->attrs.body.len = attrs->len;
00220
00221 param = dest->attrs.body;
00222 if (parse_params(¶m, CLASS_ANY, &phooks, ¶ms_list)<0)
00223 return -1;
00224 for (pit = params_list; pit; pit=pit->next)
00225 {
00226 if (pit->name.len==4
00227 && strncasecmp(pit->name.s, "duid", 4)==0) {
00228 dest->attrs.duid = pit->body;
00229 } else if(pit->name.len==6
00230 && strncasecmp(pit->name.s, "weight", 4)==0) {
00231 str2sint(&pit->body, &dest->attrs.weight);
00232 } else if(pit->name.len==7
00233 && strncasecmp(pit->name.s, "maxload", 7)==0) {
00234 str2sint(&pit->body, &dest->attrs.maxload);
00235 }
00236 }
00237 return 0;
00238 }
00239
00243 int add_dest2list(int id, str uri, int flags, int priority, str *attrs,
00244 int list_idx, int * setn)
00245 {
00246 ds_dest_t *dp = NULL;
00247 ds_set_t *sp = NULL;
00248 ds_dest_t *dp0 = NULL;
00249 ds_dest_t *dp1 = NULL;
00250
00251
00252 static char hn[256];
00253 struct hostent* he;
00254 struct sip_uri puri;
00255 int orig_id = 0, orig_nr = 0;
00256 ds_set_t *orig_ds_lists = ds_lists[list_idx];
00257
00258
00259 if(parse_uri(uri.s, uri.len, &puri)!=0 || puri.host.len>254)
00260 {
00261 LM_ERR("bad uri [%.*s]\n", uri.len, uri.s);
00262 goto err;
00263 }
00264
00265
00266 sp = ds_lists[list_idx];
00267 while(sp)
00268 {
00269 if(sp->id == id)
00270 break;
00271 sp = sp->next;
00272 }
00273
00274 if(sp==NULL)
00275 {
00276 sp = (ds_set_t*)shm_malloc(sizeof(ds_set_t));
00277 if(sp==NULL)
00278 {
00279 LM_ERR("no more memory.\n");
00280 goto err;
00281 }
00282
00283 memset(sp, 0, sizeof(ds_set_t));
00284 sp->next = ds_lists[list_idx];
00285 ds_lists[list_idx] = sp;
00286 *setn = *setn+1;
00287 }
00288 orig_id = sp->id;
00289 orig_nr = sp->nr;
00290 sp->id = id;
00291 sp->nr++;
00292
00293
00294 dp = (ds_dest_t*)shm_malloc(sizeof(ds_dest_t));
00295 if(dp==NULL)
00296 {
00297 LM_ERR("no more memory!\n");
00298 goto err;
00299 }
00300 memset(dp, 0, sizeof(ds_dest_t));
00301
00302 dp->uri.s = (char*)shm_malloc((uri.len+1)*sizeof(char));
00303 if(dp->uri.s==NULL)
00304 {
00305 LM_ERR("no more memory!\n");
00306 goto err;
00307 }
00308 strncpy(dp->uri.s, uri.s, uri.len);
00309 dp->uri.s[uri.len]='\0';
00310 dp->uri.len = uri.len;
00311 dp->flags = flags;
00312 dp->priority = priority;
00313
00314 if(ds_set_attrs(dp, attrs)<0)
00315 {
00316 LM_ERR("cannot set attributes!\n");
00317 goto err;
00318 }
00319
00320
00321
00322 strncpy(hn, puri.host.s, puri.host.len);
00323 hn[puri.host.len]='\0';
00324
00325
00326 he=resolvehost(hn);
00327 if (he==0)
00328 {
00329 LM_ERR("could not resolve %.*s\n", puri.host.len, puri.host.s);
00330 goto err;
00331 }
00332
00333 hostent2ip_addr(&dp->ip_address, he, 0);
00334
00335
00336 dp->port = puri.port_no;
00337
00338 if(sp->dlist==NULL)
00339 {
00340 sp->dlist = dp;
00341 } else {
00342 dp1 = NULL;
00343 dp0 = sp->dlist;
00344
00345 while(dp0) {
00346 if(dp0->priority > dp->priority)
00347 break;
00348 dp1 = dp0;
00349 dp0=dp0->next;
00350 }
00351 if(dp1==NULL)
00352 {
00353 dp->next = sp->dlist;
00354 sp->dlist = dp;
00355 } else {
00356 dp->next = dp1->next;
00357 dp1->next = dp;
00358 }
00359 }
00360
00361 LM_DBG("dest [%d/%d] <%.*s>\n", sp->id, sp->nr, dp->uri.len, dp->uri.s);
00362
00363 return 0;
00364 err:
00365
00366 if(dp!=NULL)
00367 {
00368 if(dp->uri.s!=NULL)
00369 shm_free(dp->uri.s);
00370 shm_free(dp);
00371 }
00372
00373 if (sp != NULL)
00374 {
00375 sp->id = orig_id;
00376 sp->nr = orig_nr;
00377 if (sp->nr == 0)
00378 {
00379 shm_free(sp);
00380 ds_lists[list_idx] = orig_ds_lists;
00381 }
00382 }
00383
00384 return -1;
00385 }
00386
00390 int dp_init_weights(ds_set_t *dset)
00391 {
00392 int j;
00393 int k;
00394 int t;
00395
00396 if(dset==NULL || dset->dlist==NULL)
00397 return -1;
00398
00399
00400 if(dset->dlist[0].attrs.weight==0)
00401 return 0;
00402
00403 t = 0;
00404 for(j=0; j<dset->nr; j++)
00405 {
00406 for(k=0; k<dset->dlist[j].attrs.weight; k++)
00407 {
00408 if(t>=100)
00409 goto randomize;
00410 dset->wlist[t] = (unsigned int)j;
00411 t++;
00412 }
00413 }
00414 j = (t-1>=0)?t-1:0;
00415 for(; t<100; t++)
00416 dset->wlist[t] = (unsigned int)j;
00417 randomize:
00418 srand(time(0));
00419 for (j=0; j<100; j++)
00420 {
00421 k = j + (rand() % (100-j));
00422 t = (int)dset->wlist[j];
00423 dset->wlist[j] = dset->wlist[k];
00424 dset->wlist[k] = (unsigned int)t;
00425 }
00426
00427 return 0;
00428 }
00429
00431 int reindex_dests(int list_idx, int setn)
00432 {
00433 int j;
00434 ds_set_t *sp = NULL;
00435 ds_dest_t *dp = NULL, *dp0= NULL;
00436
00437 for(sp = ds_lists[list_idx]; sp!= NULL; sp = sp->next)
00438 {
00439 dp0 = (ds_dest_t*)shm_malloc(sp->nr*sizeof(ds_dest_t));
00440 if(dp0==NULL)
00441 {
00442 LM_ERR("no more memory!\n");
00443 goto err1;
00444 }
00445 memset(dp0, 0, sp->nr*sizeof(ds_dest_t));
00446
00447
00448 for(j=sp->nr-1; j>=0 && sp->dlist!= NULL; j--)
00449 {
00450 memcpy(&dp0[j], sp->dlist, sizeof(ds_dest_t));
00451 if(j==sp->nr-1)
00452 dp0[j].next = NULL;
00453 else
00454 dp0[j].next = &dp0[j+1];
00455
00456
00457 dp = sp->dlist;
00458 sp->dlist = dp->next;
00459
00460 shm_free(dp);
00461 dp=NULL;
00462 }
00463 sp->dlist = dp0;
00464 dp_init_weights(sp);
00465 }
00466
00467 LM_DBG("found [%d] dest sets\n", setn);
00468 return 0;
00469
00470 err1:
00471 return -1;
00472 }
00473
00475 int ds_load_list(char *lfile)
00476 {
00477 char line[256], *p;
00478 FILE *f = NULL;
00479 int id, setn, flags, priority;
00480 str uri;
00481 str attrs;
00482
00483 if( (*crt_idx) != (*next_idx)) {
00484 LM_WARN("load command already generated, aborting reload...\n");
00485 return 0;
00486 }
00487
00488 if(lfile==NULL || strlen(lfile)<=0)
00489 {
00490 LM_ERR("bad list file\n");
00491 return -1;
00492 }
00493
00494 f = fopen(lfile, "r");
00495 if(f==NULL)
00496 {
00497 LM_ERR("can't open list file [%s]\n", lfile);
00498 return -1;
00499
00500 }
00501
00502 id = setn = flags = priority = 0;
00503
00504 *next_idx = (*crt_idx + 1)%2;
00505 destroy_list(*next_idx);
00506
00507 p = fgets(line, 256, f);
00508 while(p)
00509 {
00510
00511 while(*p && (*p==' ' || *p=='\t' || *p=='\r' || *p=='\n'))
00512 p++;
00513 if(*p=='\0' || *p=='#')
00514 goto next_line;
00515
00516
00517 id = 0;
00518 while(*p>='0' && *p<='9')
00519 {
00520 id = id*10+ (*p-'0');
00521 p++;
00522 }
00523
00524
00525 while(*p && (*p==' ' || *p=='\t' || *p=='\r' || *p=='\n'))
00526 p++;
00527 if(*p=='\0' || *p=='#')
00528 {
00529 LM_ERR("bad line [%s]\n", line);
00530 goto error;
00531 }
00532
00533
00534 uri.s = p;
00535 while(*p && *p!=' ' && *p!='\t' && *p!='\r' && *p!='\n' && *p!='#')
00536 p++;
00537 uri.len = p-uri.s;
00538
00539
00540 while(*p && (*p==' ' || *p=='\t' || *p=='\r' || *p=='\n'))
00541 p++;
00542
00543
00544 flags = 0;
00545 priority = 0;
00546 attrs.s = 0; attrs.len = 0;
00547 if(*p=='\0' || *p=='#')
00548 goto add_destination;
00549
00550 while(*p>='0' && *p<='9')
00551 {
00552 flags = flags*10+ (*p-'0');
00553 p++;
00554 }
00555
00556
00557 while(*p && (*p==' ' || *p=='\t' || *p=='\r' || *p=='\n'))
00558 p++;
00559
00560
00561 if(*p=='\0' || *p=='#')
00562 goto add_destination;
00563
00564 while(*p>='0' && *p<='9')
00565 {
00566 priority = priority*10+ (*p-'0');
00567 p++;
00568 }
00569
00570
00571 while(*p && (*p==' ' || *p=='\t' || *p=='\r' || *p=='\n'))
00572 p++;
00573 if(*p=='\0' || *p=='#')
00574 goto add_destination;
00575
00576
00577 attrs.s = p;
00578 while(*p && *p!=' ' && *p!='\t' && *p!='\r' && *p!='\n' && *p!='#')
00579 p++;
00580 attrs.len = p-attrs.s;
00581
00582 add_destination:
00583 if(add_dest2list(id, uri, flags, priority, &attrs,
00584 *next_idx, &setn) != 0)
00585 LM_WARN("unable to add destination %.*s to set %d -- skipping\n",
00586 uri.len, uri.s, id);
00587 next_line:
00588 p = fgets(line, 256, f);
00589 }
00590
00591 if(reindex_dests(*next_idx, setn)!=0){
00592 LM_ERR("error on reindex\n");
00593 goto error;
00594 }
00595
00596 fclose(f);
00597 f = NULL;
00598
00599 _ds_list_nr = setn;
00600 *crt_idx = *next_idx;
00601 ds_ht_clear_slots(_dsht_load);
00602 ds_print_sets();
00603 return 0;
00604
00605 error:
00606 if(f!=NULL)
00607 fclose(f);
00608 destroy_list(*next_idx);
00609 *next_idx = *crt_idx;
00610 return -1;
00611 }
00612
00616 int ds_connect_db(void)
00617 {
00618 if(ds_db_url.s==NULL)
00619 return -1;
00620
00621 if((ds_db_handle = ds_dbf.init(&ds_db_url)) == 0) {
00622 LM_ERR("cannot initialize db connection\n");
00623 return -1;
00624 }
00625 return 0;
00626 }
00627
00631 void ds_disconnect_db(void)
00632 {
00633 if(ds_db_handle)
00634 {
00635 ds_dbf.close(ds_db_handle);
00636 ds_db_handle = 0;
00637 }
00638 }
00639
00641 int init_ds_db(void)
00642 {
00643 int ret;
00644
00645 if(ds_table_name.s == 0)
00646 {
00647 LM_ERR("invalid database name\n");
00648 return -1;
00649 }
00650
00651
00652 if (db_bind_mod(&ds_db_url, &ds_dbf) < 0)
00653 {
00654 LM_ERR("Unable to bind to a database driver\n");
00655 return -1;
00656 }
00657
00658 if(ds_connect_db()!=0){
00659
00660 LM_ERR("unable to connect to the database\n");
00661 return -1;
00662 }
00663
00664 _ds_table_version = db_table_version(&ds_dbf, ds_db_handle, &ds_table_name);
00665 if (_ds_table_version < 0)
00666 {
00667 LM_ERR("failed to query table version\n");
00668 return -1;
00669 } else if (_ds_table_version != DS_TABLE_VERSION
00670 && _ds_table_version != DS_TABLE_VERSION2
00671 && _ds_table_version != DS_TABLE_VERSION3
00672 && _ds_table_version != DS_TABLE_VERSION4) {
00673 LM_ERR("invalid table version (found %d , required %d, %d, %d or %d)\n"
00674 "(use kamdbctl reinit)\n",
00675 _ds_table_version, DS_TABLE_VERSION, DS_TABLE_VERSION2,
00676 DS_TABLE_VERSION3, DS_TABLE_VERSION4);
00677 return -1;
00678 }
00679
00680 ret = ds_load_db();
00681
00682 ds_disconnect_db();
00683
00684 return ret;
00685 }
00686
00688 int ds_load_db(void)
00689 {
00690 int i, id, nr_rows, setn;
00691 int flags;
00692 int priority;
00693 int nrcols;
00694 str uri;
00695 str attrs = {0, 0};
00696 db1_res_t * res;
00697 db_val_t * values;
00698 db_row_t * rows;
00699
00700 db_key_t query_cols[5] = {&ds_set_id_col, &ds_dest_uri_col,
00701 &ds_dest_flags_col, &ds_dest_priority_col,
00702 &ds_dest_attrs_col};
00703
00704 nrcols = 2;
00705 if(_ds_table_version == DS_TABLE_VERSION2)
00706 nrcols = 3;
00707 else if(_ds_table_version == DS_TABLE_VERSION3)
00708 nrcols = 4;
00709 else if(_ds_table_version == DS_TABLE_VERSION4)
00710 nrcols = 5;
00711
00712 if( (*crt_idx) != (*next_idx))
00713 {
00714 LM_WARN("load command already generated, aborting reload...\n");
00715 return 0;
00716 }
00717
00718 if(ds_db_handle == NULL){
00719 LM_ERR("invalid DB handler\n");
00720 return -1;
00721 }
00722
00723 if (ds_dbf.use_table(ds_db_handle, &ds_table_name) < 0)
00724 {
00725 LM_ERR("error in use_table\n");
00726 return -1;
00727 }
00728
00729
00730 if(ds_dbf.query(ds_db_handle,0,0,0,query_cols,0,nrcols,0,&res) < 0)
00731 {
00732 LM_ERR("error while querying database\n");
00733 return -1;
00734 }
00735
00736 nr_rows = RES_ROW_N(res);
00737 rows = RES_ROWS(res);
00738 if(nr_rows == 0)
00739 LM_WARN("no dispatching data in the db -- empty destination set\n");
00740
00741 setn = 0;
00742 *next_idx = (*crt_idx + 1)%2;
00743 destroy_list(*next_idx);
00744
00745 for(i=0; i<nr_rows; i++)
00746 {
00747 values = ROW_VALUES(rows+i);
00748
00749 id = VAL_INT(values);
00750 uri.s = VAL_STR(values+1).s;
00751 uri.len = strlen(uri.s);
00752 flags = 0;
00753 if(nrcols>=3)
00754 flags = VAL_INT(values+2);
00755 priority=0;
00756 if(nrcols>=4)
00757 priority = VAL_INT(values+3);
00758
00759 attrs.s = 0; attrs.len = 0;
00760 if(nrcols>=5)
00761 {
00762 attrs.s = VAL_STR(values+4).s;
00763 attrs.len = strlen(attrs.s);
00764 }
00765 if(add_dest2list(id, uri, flags, priority, &attrs,
00766 *next_idx, &setn) != 0)
00767 LM_WARN("unable to add destination %.*s to set %d -- skipping\n",
00768 uri.len, uri.s, id);
00769 }
00770 ds_dbf.free_result(ds_db_handle, res);
00771
00772 if(reindex_dests(*next_idx, setn)!=0)
00773 {
00774 LM_ERR("error on reindex\n");
00775 goto err2;
00776 }
00777
00778
00779 _ds_list_nr = setn;
00780 *crt_idx = *next_idx;
00781 ds_ht_clear_slots(_dsht_load);
00782
00783 ds_print_sets();
00784
00785 return 0;
00786
00787 err2:
00788 destroy_list(*next_idx);
00789 ds_dbf.free_result(ds_db_handle, res);
00790 *next_idx = *crt_idx;
00791
00792 return -1;
00793 }
00794
00796 int ds_destroy_list(void)
00797 {
00798 if (ds_lists) {
00799 destroy_list(0);
00800 destroy_list(1);
00801 shm_free(ds_lists);
00802 }
00803
00804 if (crt_idx)
00805 shm_free(crt_idx);
00806
00807 return 0;
00808 }
00809
00813 void destroy_list(int list_id)
00814 {
00815 ds_set_t *sp = NULL;
00816 ds_dest_t *dest = NULL;
00817
00818 sp = ds_lists[list_id];
00819
00820 while(sp)
00821 {
00822 for(dest = sp->dlist; dest!= NULL; dest=dest->next)
00823 {
00824 if(dest->uri.s!=NULL)
00825 {
00826 shm_free(dest->uri.s);
00827 dest->uri.s = NULL;
00828 }
00829 }
00830 if (sp->dlist != NULL)
00831 shm_free(sp->dlist);
00832 sp = sp->next;
00833 }
00834
00835 ds_lists[list_id] = NULL;
00836 }
00837
00841 unsigned int ds_get_hash(str *x, str *y)
00842 {
00843 char* p;
00844 register unsigned v;
00845 register unsigned h;
00846
00847 if(!x && !y)
00848 return 0;
00849 h=0;
00850 if(x)
00851 {
00852 p=x->s;
00853 if (x->len>=4)
00854 {
00855 for (; p<=(x->s+x->len-4); p+=4)
00856 {
00857 v=(*p<<24)+(p[1]<<16)+(p[2]<<8)+p[3];
00858 h+=v^(v>>3);
00859 }
00860 }
00861 v=0;
00862 for (;p<(x->s+x->len); p++)
00863 {
00864 v<<=8;
00865 v+=*p;
00866 }
00867 h+=v^(v>>3);
00868 }
00869 if(y)
00870 {
00871 p=y->s;
00872 if (y->len>=4)
00873 {
00874 for (; p<=(y->s+y->len-4); p+=4)
00875 {
00876 v=(*p<<24)+(p[1]<<16)+(p[2]<<8)+p[3];
00877 h+=v^(v>>3);
00878 }
00879 }
00880
00881 v=0;
00882 for (;p<(y->s+y->len); p++)
00883 {
00884 v<<=8;
00885 v+=*p;
00886 }
00887 h+=v^(v>>3);
00888 }
00889 h=((h)+(h>>11))+((h>>13)+(h>>23));
00890
00891 return (h)?h:1;
00892 }
00893
00894
00909 static inline int get_uri_hash_keys(str* key1, str* key2,
00910 str* uri, struct sip_uri* parsed_uri, int flags)
00911 {
00912 struct sip_uri tmp_p_uri;
00913
00914 if (parsed_uri==0)
00915 {
00916 if (parse_uri(uri->s, uri->len, &tmp_p_uri)<0)
00917 {
00918 LM_ERR("invalid uri %.*s\n", uri->len, uri->len?uri->s:"");
00919 goto error;
00920 }
00921 parsed_uri=&tmp_p_uri;
00922 }
00923
00924 if (parsed_uri->host.s==0)
00925 {
00926 LM_ERR("invalid uri, no host present: %.*s\n",
00927 uri->len, uri->len?uri->s:"");
00928 goto error;
00929 }
00930
00931
00932
00933
00934 *key1=parsed_uri->user;
00935 key2->s=0;
00936 key2->len=0;
00937 if (!(flags & DS_HASH_USER_ONLY))
00938 {
00939 *key2=parsed_uri->host;
00940
00941 if (parsed_uri->port.s!=0)
00942 {
00943
00944 if (parsed_uri->port_no !=
00945 ((parsed_uri->type==SIPS_URI_T)?SIPS_PORT:SIP_PORT))
00946 key2->len+=parsed_uri->port.len+1 ;
00947 }
00948 }
00949 if (key1->s==0)
00950 {
00951 LM_WARN("empty username in: %.*s\n", uri->len, uri->len?uri->s:"");
00952 }
00953 return 0;
00954 error:
00955 return -1;
00956 }
00957
00958
00962 int ds_hash_fromuri(struct sip_msg *msg, unsigned int *hash)
00963 {
00964 str from;
00965 str key1;
00966 str key2;
00967
00968 if(msg==NULL || hash == NULL)
00969 {
00970 LM_ERR("bad parameters\n");
00971 return -1;
00972 }
00973
00974 if(parse_from_header(msg)<0)
00975 {
00976 LM_ERR("cannot parse From hdr\n");
00977 return -1;
00978 }
00979
00980 if(msg->from==NULL || get_from(msg)==NULL)
00981 {
00982 LM_ERR("cannot get From uri\n");
00983 return -1;
00984 }
00985
00986 from = get_from(msg)->uri;
00987 trim(&from);
00988 if (get_uri_hash_keys(&key1, &key2, &from, 0, ds_flags)<0)
00989 return -1;
00990 *hash = ds_get_hash(&key1, &key2);
00991
00992 return 0;
00993 }
00994
00995
00999 int ds_hash_touri(struct sip_msg *msg, unsigned int *hash)
01000 {
01001 str to;
01002 str key1;
01003 str key2;
01004
01005 if(msg==NULL || hash == NULL)
01006 {
01007 LM_ERR("bad parameters\n");
01008 return -1;
01009 }
01010 if ((msg->to==0) && ((parse_headers(msg, HDR_TO_F, 0)==-1) ||
01011 (msg->to==0)))
01012 {
01013 LM_ERR("cannot parse To hdr\n");
01014 return -1;
01015 }
01016
01017
01018 to = get_to(msg)->uri;
01019 trim(&to);
01020
01021 if (get_uri_hash_keys(&key1, &key2, &to, 0, ds_flags)<0)
01022 return -1;
01023 *hash = ds_get_hash(&key1, &key2);
01024
01025 return 0;
01026 }
01027
01028
01032 int ds_hash_callid(struct sip_msg *msg, unsigned int *hash)
01033 {
01034 str cid;
01035 if(msg==NULL || hash == NULL)
01036 {
01037 LM_ERR("bad parameters\n");
01038 return -1;
01039 }
01040
01041 if(msg->callid==NULL && ((parse_headers(msg, HDR_CALLID_F, 0)==-1) ||
01042 (msg->callid==NULL)) )
01043 {
01044 LM_ERR("cannot parse Call-Id\n");
01045 return -1;
01046 }
01047
01048 cid.s = msg->callid->body.s;
01049 cid.len = msg->callid->body.len;
01050 trim(&cid);
01051
01052 *hash = ds_get_hash(&cid, NULL);
01053
01054 return 0;
01055 }
01056
01057
01061 int ds_hash_ruri(struct sip_msg *msg, unsigned int *hash)
01062 {
01063 str* uri;
01064 str key1;
01065 str key2;
01066
01067
01068 if(msg==NULL || hash == NULL)
01069 {
01070 LM_ERR("bad parameters\n");
01071 return -1;
01072 }
01073 if (parse_sip_msg_uri(msg)<0){
01074 LM_ERR("bad request uri\n");
01075 return -1;
01076 }
01077
01078 uri=GET_RURI(msg);
01079 if (get_uri_hash_keys(&key1, &key2, uri, &msg->parsed_uri, ds_flags)<0)
01080 return -1;
01081
01082 *hash = ds_get_hash(&key1, &key2);
01083 return 0;
01084 }
01085
01089 int ds_hash_authusername(struct sip_msg *msg, unsigned int *hash)
01090 {
01091
01092 struct hdr_field* h = 0;
01093
01094 str username = {0, 0};
01095
01096 auth_body_t* cred;
01097
01098 if(msg==NULL || hash == NULL)
01099 {
01100 LM_ERR("bad parameters\n");
01101 return -1;
01102 }
01103 if (parse_headers(msg, HDR_PROXYAUTH_F, 0) == -1)
01104 {
01105 LM_ERR("error parsing headers!\n");
01106 return -1;
01107 }
01108 if (msg->proxy_auth && !msg->proxy_auth->parsed)
01109 parse_credentials(msg->proxy_auth);
01110 if (msg->proxy_auth && msg->proxy_auth->parsed) {
01111 h = msg->proxy_auth;
01112 }
01113 if (!h)
01114 {
01115 if (parse_headers(msg, HDR_AUTHORIZATION_F, 0) == -1)
01116 {
01117 LM_ERR("error parsing headers!\n");
01118 return -1;
01119 }
01120 if (msg->authorization && !msg->authorization->parsed)
01121 parse_credentials(msg->authorization);
01122 if (msg->authorization && msg->authorization->parsed) {
01123 h = msg->authorization;
01124 }
01125 }
01126 if (!h)
01127 {
01128 LM_DBG("No Authorization-Header!\n");
01129 return 1;
01130 }
01131
01132 cred=(auth_body_t*)(h->parsed);
01133 if (!cred || !cred->digest.username.user.len)
01134 {
01135 LM_ERR("No Authorization-Username or Credentials!\n");
01136 return 1;
01137 }
01138
01139 username.s = cred->digest.username.user.s;
01140 username.len = cred->digest.username.user.len;
01141
01142 trim(&username);
01143
01144 *hash = ds_get_hash(&username, NULL);
01145
01146 return 0;
01147 }
01148
01149
01153 int ds_hash_pvar(struct sip_msg *msg, unsigned int *hash)
01154 {
01155
01156 str hash_str = {0, 0};
01157
01158 if(msg==NULL || hash == NULL || hash_param_model == NULL)
01159 {
01160 LM_ERR("bad parameters\n");
01161 return -1;
01162 }
01163 if (pv_printf_s(msg, hash_param_model, &hash_str)<0) {
01164 LM_ERR("error - cannot print the format\n");
01165 return -1;
01166 }
01167
01168
01169 trim(&hash_str);
01170 if (hash_str.len <= 0) {
01171 LM_ERR("String is empty!\n");
01172 return -1;
01173 }
01174 LM_DBG("Hashing %.*s!\n", hash_str.len, hash_str.s);
01175
01176 *hash = ds_get_hash(&hash_str, NULL);
01177
01178 return 0;
01179 }
01180
01184 static inline int ds_get_index(int group, ds_set_t **index)
01185 {
01186 ds_set_t *si = NULL;
01187
01188 if(index==NULL || group<0 || _ds_list==NULL)
01189 return -1;
01190
01191
01192 si = _ds_list;
01193 while(si)
01194 {
01195 if(si->id == group)
01196 {
01197 *index = si;
01198 break;
01199 }
01200 si = si->next;
01201 }
01202
01203 if(si==NULL)
01204 {
01205 LM_ERR("destination set [%d] not found\n", group);
01206 return -1;
01207 }
01208
01209 return 0;
01210 }
01211
01212
01216 int ds_get_leastloaded(ds_set_t *dset)
01217 {
01218 int j;
01219 int k;
01220 int t;
01221
01222 k = 0;
01223 t = dset->dlist[k].dload;
01224 for(j=1; j<dset->nr; j++)
01225 {
01226 if(!ds_skip_dst(dset->dlist[j].flags & DS_PROBING_DST))
01227 {
01228 if(dset->dlist[j].dload<t)
01229 {
01230 k = j;
01231 t = dset->dlist[k].dload;
01232 }
01233 }
01234 }
01235 return k;
01236 }
01237
01241 int ds_load_add(struct sip_msg *msg, ds_set_t *dset, int setid, int dst)
01242 {
01243 if(dset->dlist[dst].attrs.duid.len==0)
01244 {
01245 LM_ERR("dst unique id not set for %d (%.*s)\n", setid,
01246 msg->callid->body.len, msg->callid->body.s);
01247 return -1;
01248 }
01249
01250 if(ds_add_cell(_dsht_load, &msg->callid->body,
01251 &dset->dlist[dst].attrs.duid, setid)<0)
01252 {
01253 LM_ERR("cannot add load to %d (%.*s)\n", setid,
01254 msg->callid->body.len, msg->callid->body.s);
01255 return -1;
01256 }
01257 dset->dlist[dst].dload++;
01258 return 0;
01259 }
01260
01264 int ds_load_replace(struct sip_msg *msg, str *duid)
01265 {
01266 ds_cell_t *it;
01267 int set;
01268 int olddst;
01269 int newdst;
01270 ds_set_t *idx = NULL;
01271 int i;
01272
01273 if(duid->len<=0)
01274 {
01275 LM_ERR("invalid dst unique id not set for (%.*s)\n",
01276 msg->callid->body.len, msg->callid->body.s);
01277 return -1;
01278 }
01279
01280 if((it=ds_get_cell(_dsht_load, &msg->callid->body))==NULL)
01281 {
01282 LM_ERR("cannot find load for (%.*s)\n",
01283 msg->callid->body.len, msg->callid->body.s);
01284 return -1;
01285 }
01286 set = it->dset;
01287
01288 if(ds_get_index(set, &idx)!=0)
01289 {
01290 ds_unlock_cell(_dsht_load, &msg->callid->body);
01291 LM_ERR("destination set [%d] not found\n", set);
01292 return -1;
01293 }
01294 olddst = -1;
01295 newdst = -1;
01296 for(i=0; i<idx->nr; i++)
01297 {
01298 if(idx->dlist[i].attrs.duid.len==it->duid.len
01299 && strncasecmp(idx->dlist[i].attrs.duid.s, it->duid.s,
01300 it->duid.len)==0)
01301 {
01302 olddst = i;
01303 if(newdst!=-1)
01304 break;
01305 }
01306 if(idx->dlist[i].attrs.duid.len==duid->len
01307 && strncasecmp(idx->dlist[i].attrs.duid.s, duid->s,
01308 duid->len)==0)
01309 {
01310 newdst = i;
01311 if(olddst!=-1)
01312 break;
01313 }
01314 }
01315 if(olddst==-1)
01316 {
01317 ds_unlock_cell(_dsht_load, &msg->callid->body);
01318 LM_ERR("old destination address not found for [%d, %.*s]\n", set,
01319 it->duid.len, it->duid.s);
01320 return -1;
01321 }
01322 if(newdst==-1)
01323 {
01324 ds_unlock_cell(_dsht_load, &msg->callid->body);
01325 LM_ERR("new destination address not found for [%d, %.*s]\n", set,
01326 duid->len, duid->s);
01327 return -1;
01328 }
01329
01330 ds_unlock_cell(_dsht_load, &msg->callid->body);
01331 ds_del_cell(_dsht_load, &msg->callid->body);
01332 if(idx->dlist[olddst].dload>0)
01333 idx->dlist[olddst].dload--;
01334
01335 if(ds_load_add(msg, idx, set, newdst)<0)
01336 {
01337 LM_ERR("unable to replace destination load [%.*s / %.*s]\n",
01338 duid->len, duid->s, msg->callid->body.len, msg->callid->body.s);
01339 return -1;
01340 }
01341 return 0;
01342 }
01343
01347 int ds_load_remove(struct sip_msg *msg)
01348 {
01349 ds_cell_t *it;
01350 int set;
01351 int olddst;
01352 ds_set_t *idx = NULL;
01353 int i;
01354
01355 if((it=ds_get_cell(_dsht_load, &msg->callid->body))==NULL)
01356 {
01357 LM_ERR("cannot find load for (%.*s)\n",
01358 msg->callid->body.len, msg->callid->body.s);
01359 return -1;
01360 }
01361 set = it->dset;
01362
01363 if(ds_get_index(set, &idx)!=0)
01364 {
01365 ds_unlock_cell(_dsht_load, &msg->callid->body);
01366 LM_ERR("destination set [%d] not found\n", set);
01367 return -1;
01368 }
01369 olddst = -1;
01370 for(i=0; i<idx->nr; i++)
01371 {
01372 if(idx->dlist[i].attrs.duid.len==it->duid.len
01373 && strncasecmp(idx->dlist[i].attrs.duid.s, it->duid.s,
01374 it->duid.len)==0)
01375 {
01376 olddst = i;
01377 break;
01378 }
01379 }
01380 if(olddst==-1)
01381 {
01382 ds_unlock_cell(_dsht_load, &msg->callid->body);
01383 LM_ERR("old destination address not found for [%d, %.*s]\n", set,
01384 it->duid.len, it->duid.s);
01385 return -1;
01386 }
01387
01388 ds_unlock_cell(_dsht_load, &msg->callid->body);
01389 ds_del_cell(_dsht_load, &msg->callid->body);
01390 if(idx->dlist[olddst].dload>0)
01391 idx->dlist[olddst].dload--;
01392
01393 return 0;
01394 }
01395
01396
01400 int ds_load_remove_byid(int set, str *duid)
01401 {
01402 int olddst;
01403 ds_set_t *idx = NULL;
01404 int i;
01405
01406
01407 if(ds_get_index(set, &idx)!=0)
01408 {
01409 LM_ERR("destination set [%d] not found\n", set);
01410 return -1;
01411 }
01412 olddst = -1;
01413 for(i=0; i<idx->nr; i++)
01414 {
01415 if(idx->dlist[i].attrs.duid.len==duid->len
01416 && strncasecmp(idx->dlist[i].attrs.duid.s, duid->s,
01417 duid->len)==0)
01418 {
01419 olddst = i;
01420 break;
01421 }
01422 }
01423 if(olddst==-1)
01424 {
01425 LM_ERR("old destination address not found for [%d, %.*s]\n", set,
01426 duid->len, duid->s);
01427 return -1;
01428 }
01429
01430 if(idx->dlist[olddst].dload>0)
01431 idx->dlist[olddst].dload--;
01432
01433 return 0;
01434 }
01435
01439 int ds_load_state(struct sip_msg *msg, int state)
01440 {
01441 ds_cell_t *it;
01442
01443 if((it=ds_get_cell(_dsht_load, &msg->callid->body))==NULL)
01444 {
01445 LM_DBG("cannot find load for (%.*s)\n",
01446 msg->callid->body.len, msg->callid->body.s);
01447 return -1;
01448 }
01449
01450 it->state = state;
01451 ds_unlock_cell(_dsht_load, &msg->callid->body);
01452
01453 return 0;
01454 }
01455
01456
01460 int ds_load_update(struct sip_msg *msg)
01461 {
01462 if(parse_headers(msg, HDR_CSEQ_F|HDR_CALLID_F, 0)!=0
01463 || msg->cseq==NULL || msg->callid==NULL)
01464 {
01465 LM_ERR("cannot parse cseq and callid headers\n");
01466 return -1;
01467 }
01468 if(msg->first_line.type==SIP_REQUEST)
01469 {
01470 if(msg->first_line.u.request.method_value==METHOD_BYE
01471 || msg->first_line.u.request.method_value==METHOD_CANCEL)
01472 {
01473
01474 ds_load_remove(msg);
01475 }
01476 return 0;
01477 }
01478
01479 if(get_cseq(msg)->method_id==METHOD_INVITE)
01480 {
01481
01482 if(REPLY_CLASS(msg)==2)
01483 ds_load_state(msg, DS_LOAD_CONFIRMED);
01484 }
01485 return 0;
01486 }
01487
01491 int ds_load_unset(struct sip_msg *msg)
01492 {
01493 struct search_state st;
01494 struct usr_avp *prev_avp;
01495 int_str avp_value;
01496
01497 if(dstid_avp_name.n==0)
01498 return 0;
01499
01500
01501 if(msg->first_line.type==SIP_REQUEST
01502 && msg->first_line.u.request.method_value==METHOD_INVITE)
01503 {
01504 prev_avp = search_first_avp(dstid_avp_type, dstid_avp_name,
01505 &avp_value, &st);
01506 if(prev_avp==NULL)
01507 return 0;
01508 }
01509 return ds_load_remove(msg);
01510 }
01511
01515 static inline int ds_update_dst(struct sip_msg *msg, str *uri, int mode)
01516 {
01517 struct action act;
01518 struct run_act_ctx ra_ctx;
01519 switch(mode)
01520 {
01521 case 1:
01522 memset(&act, '\0', sizeof(act));
01523 act.type = SET_HOSTALL_T;
01524 act.val[0].type = STRING_ST;
01525 if(uri->len>4
01526 && strncasecmp(uri->s,"sip:",4)==0)
01527 act.val[0].u.string = uri->s+4;
01528 else
01529 act.val[0].u.string = uri->s;
01530 init_run_actions_ctx(&ra_ctx);
01531 if (do_action(&ra_ctx, &act, msg) < 0) {
01532 LM_ERR("error while setting host\n");
01533 return -1;
01534 }
01535 break;
01536 default:
01537 if (set_dst_uri(msg, uri) < 0) {
01538 LM_ERR("error while setting dst uri\n");
01539 return -1;
01540 }
01541
01542
01543 ruri_mark_new();
01544 break;
01545 }
01546 return 0;
01547 }
01548
01552 int ds_select_dst(struct sip_msg *msg, int set, int alg, int mode)
01553 {
01554 int i, cnt;
01555 unsigned int hash;
01556 int_str avp_val;
01557 ds_set_t *idx = NULL;
01558
01559 if(msg==NULL)
01560 {
01561 LM_ERR("bad parameters\n");
01562 return -1;
01563 }
01564
01565 if(_ds_list==NULL || _ds_list_nr<=0)
01566 {
01567 LM_ERR("no destination sets\n");
01568 return -1;
01569 }
01570
01571 if((mode==0) && (ds_force_dst==0)
01572 && (msg->dst_uri.s!=NULL || msg->dst_uri.len>0))
01573 {
01574 LM_ERR("destination already set [%.*s]\n", msg->dst_uri.len,
01575 msg->dst_uri.s);
01576 return -1;
01577 }
01578
01579
01580
01581 if(ds_get_index(set, &idx)!=0)
01582 {
01583 LM_ERR("destination set [%d] not found\n", set);
01584 return -1;
01585 }
01586
01587 LM_DBG("set [%d]\n", set);
01588
01589 hash = 0;
01590 switch(alg)
01591 {
01592 case 0:
01593 if(ds_hash_callid(msg, &hash)!=0)
01594 {
01595 LM_ERR("can't get callid hash\n");
01596 return -1;
01597 }
01598 break;
01599 case 1:
01600 if(ds_hash_fromuri(msg, &hash)!=0)
01601 {
01602 LM_ERR("can't get From uri hash\n");
01603 return -1;
01604 }
01605 break;
01606 case 2:
01607 if(ds_hash_touri(msg, &hash)!=0)
01608 {
01609 LM_ERR("can't get To uri hash\n");
01610 return -1;
01611 }
01612 break;
01613 case 3:
01614 if (ds_hash_ruri(msg, &hash)!=0)
01615 {
01616 LM_ERR("can't get ruri hash\n");
01617 return -1;
01618 }
01619 break;
01620 case DS_ALG_RROBIN:
01621 hash = idx->last;
01622 idx->last = (idx->last+1) % idx->nr;
01623 break;
01624 case 5:
01625 i = ds_hash_authusername(msg, &hash);
01626 switch (i)
01627 {
01628 case 0:
01629
01630 break;
01631 case 1:
01632
01633 hash = idx->last;
01634 idx->last = (idx->last+1) % idx->nr;
01635 break;
01636 default:
01637 LM_ERR("can't get authorization hash\n");
01638 return -1;
01639 break;
01640 }
01641 break;
01642 case 6:
01643 hash = rand() % idx->nr;
01644 break;
01645 case 7:
01646 if (ds_hash_pvar(msg, &hash)!=0)
01647 {
01648 LM_ERR("can't get PV hash\n");
01649 return -1;
01650 }
01651 break;
01652 case 8:
01653 hash = 0;
01654 break;
01655 case 9:
01656 hash = idx->wlist[idx->wlast];
01657 idx->wlast = (idx->wlast+1) % 100;
01658 break;
01659 case DS_ALG_LOAD:
01660
01661 if(msg->first_line.u.request.method_value!=METHOD_INVITE)
01662 {
01663
01664 hash = 0;
01665 alg = 0;
01666 break;
01667 }
01668 if(dstid_avp_name.n==0)
01669 {
01670 LM_ERR("no dst ID avp for load distribution"
01671 " - using first entry...\n");
01672 hash = 0;
01673 alg = 0;
01674 } else {
01675 hash = ds_get_leastloaded(idx);
01676 if(ds_load_add(msg, idx, set, hash)<0)
01677 {
01678 LM_ERR("unable to update destination load"
01679 " - classic dispatching\n");
01680 alg = 0;
01681 }
01682 }
01683 break;
01684 default:
01685 LM_WARN("algo %d not implemented - using first entry...\n", alg);
01686 hash = 0;
01687 }
01688
01689 LM_DBG("alg hash [%u]\n", hash);
01690 cnt = 0;
01691
01692 if(ds_use_default!=0 && idx->nr!=1)
01693 hash = hash%(idx->nr-1);
01694 else
01695 hash = hash%idx->nr;
01696 i=hash;
01697 while (ds_skip_dst(idx->dlist[i].flags))
01698 {
01699 if(ds_use_default!=0 && idx->nr!=1)
01700 i = (i+1)%(idx->nr-1);
01701 else
01702 i = (i+1)%idx->nr;
01703 if(i==hash)
01704 {
01705
01706 if(ds_use_default!=0)
01707 {
01708 i = idx->nr-1;
01709 if(ds_skip_dst(idx->dlist[i].flags))
01710 return -1;
01711 break;
01712 } else {
01713 return -1;
01714 }
01715 }
01716 }
01717
01718 hash = i;
01719
01720 if(ds_update_dst(msg, &idx->dlist[hash].uri, mode)!=0)
01721 {
01722 LM_ERR("cannot set dst addr\n");
01723 return -1;
01724 }
01725
01726 if(alg==DS_ALG_RROBIN)
01727 idx->last = (hash+1) % idx->nr;
01728
01729 LM_DBG("selected [%d-%d/%d] <%.*s>\n", alg, set, hash,
01730 idx->dlist[hash].uri.len, idx->dlist[hash].uri.s);
01731
01732 if(!(ds_flags&DS_FAILOVER_ON))
01733 return 1;
01734
01735 if(dst_avp_name.n!=0)
01736 {
01737
01738 if(ds_use_default!=0 && hash!=idx->nr-1)
01739 {
01740 avp_val.s = idx->dlist[idx->nr-1].uri;
01741 if(add_avp(AVP_VAL_STR|dst_avp_type, dst_avp_name, avp_val)!=0)
01742 return -1;
01743
01744 if(attrs_avp_name.n!=0 && idx->dlist[idx->nr-1].attrs.body.len>0)
01745 {
01746 avp_val.s = idx->dlist[idx->nr-1].attrs.body;
01747 if(add_avp(AVP_VAL_STR|attrs_avp_type, attrs_avp_name,
01748 avp_val)!=0)
01749 return -1;
01750 }
01751 if(alg==DS_ALG_LOAD)
01752 {
01753 if(idx->dlist[idx->nr-1].attrs.duid.len<=0)
01754 {
01755 LM_ERR("no uid for destination: %d %.*s\n", set,
01756 idx->dlist[idx->nr-1].uri.len,
01757 idx->dlist[idx->nr-1].uri.s);
01758 return -1;
01759 }
01760 avp_val.s = idx->dlist[idx->nr-1].attrs.duid;
01761 if(add_avp(AVP_VAL_STR|dstid_avp_type, dstid_avp_name,
01762 avp_val)!=0)
01763 return -1;
01764 }
01765 cnt++;
01766 }
01767
01768
01769
01770 for(i=hash-1; i>=0; i--)
01771 {
01772 if(ds_skip_dst(idx->dlist[i].flags)
01773 || (ds_use_default!=0 && i==(idx->nr-1)))
01774 continue;
01775 LM_DBG("using entry [%d/%d]\n", set, i);
01776 avp_val.s = idx->dlist[i].uri;
01777 if(add_avp(AVP_VAL_STR|dst_avp_type, dst_avp_name, avp_val)!=0)
01778 return -1;
01779
01780 if(attrs_avp_name.n!=0 && idx->dlist[i].attrs.body.len>0)
01781 {
01782 avp_val.s = idx->dlist[i].attrs.body;
01783 if(add_avp(AVP_VAL_STR|attrs_avp_type, attrs_avp_name,
01784 avp_val)!=0)
01785 return -1;
01786 }
01787 if(alg==DS_ALG_LOAD)
01788 {
01789 if(idx->dlist[i].attrs.duid.len<=0)
01790 {
01791 LM_ERR("no uid for destination: %d %.*s\n", set,
01792 idx->dlist[i].uri.len,
01793 idx->dlist[i].uri.s);
01794 return -1;
01795 }
01796 avp_val.s = idx->dlist[i].attrs.duid;
01797 if(add_avp(AVP_VAL_STR|dstid_avp_type, dstid_avp_name,
01798 avp_val)!=0)
01799 return -1;
01800 }
01801 cnt++;
01802 }
01803
01804 for(i=idx->nr-1; i>hash; i--)
01805 {
01806 if(ds_skip_dst(idx->dlist[i].flags)
01807 || (ds_use_default!=0 && i==(idx->nr-1)))
01808 continue;
01809 LM_DBG("using entry [%d/%d]\n", set, i);
01810 avp_val.s = idx->dlist[i].uri;
01811 if(add_avp(AVP_VAL_STR|dst_avp_type, dst_avp_name, avp_val)!=0)
01812 return -1;
01813
01814 if(attrs_avp_name.n!=0 && idx->dlist[i].attrs.body.len>0)
01815 {
01816 avp_val.s = idx->dlist[i].attrs.body;
01817 if(add_avp(AVP_VAL_STR|attrs_avp_type, attrs_avp_name,
01818 avp_val)!=0)
01819 return -1;
01820 }
01821 if(alg==DS_ALG_LOAD)
01822 {
01823 if(idx->dlist[i].attrs.duid.len<=0)
01824 {
01825 LM_ERR("no uid for destination: %d %.*s\n", set,
01826 idx->dlist[i].uri.len,
01827 idx->dlist[i].uri.s);
01828 return -1;
01829 }
01830 avp_val.s = idx->dlist[i].attrs.duid;
01831 if(add_avp(AVP_VAL_STR|dstid_avp_type, dstid_avp_name,
01832 avp_val)!=0)
01833 return -1;
01834 }
01835 cnt++;
01836 }
01837
01838
01839 avp_val.s = idx->dlist[hash].uri;
01840 if(add_avp(AVP_VAL_STR|dst_avp_type, dst_avp_name, avp_val)!=0)
01841 return -1;
01842
01843 if(attrs_avp_name.n!=0 && idx->dlist[hash].attrs.body.len>0)
01844 {
01845 avp_val.s = idx->dlist[hash].attrs.body;
01846 if(add_avp(AVP_VAL_STR|attrs_avp_type, attrs_avp_name,
01847 avp_val)!=0)
01848 return -1;
01849 }
01850 if(alg==DS_ALG_LOAD)
01851 {
01852 if(idx->dlist[hash].attrs.duid.len<=0)
01853 {
01854 LM_ERR("no uid for destination: %d %.*s\n", set,
01855 idx->dlist[hash].uri.len,
01856 idx->dlist[hash].uri.s);
01857 return -1;
01858 }
01859 avp_val.s = idx->dlist[hash].attrs.duid;
01860 if(add_avp(AVP_VAL_STR|dstid_avp_type, dstid_avp_name,
01861 avp_val)!=0)
01862 return -1;
01863 }
01864 cnt++;
01865 }
01866
01867 if(grp_avp_name.n!=0)
01868 {
01869
01870 avp_val.n = set;
01871 if(add_avp(grp_avp_type, grp_avp_name, avp_val)!=0)
01872 return -1;
01873 }
01874
01875 if(cnt_avp_name.n!=0)
01876 {
01877
01878 avp_val.n = cnt;
01879 if(add_avp(cnt_avp_type, cnt_avp_name, avp_val)!=0)
01880 return -1;
01881 }
01882
01883 return 1;
01884 }
01885
01886 int ds_next_dst(struct sip_msg *msg, int mode)
01887 {
01888 struct search_state st;
01889 struct usr_avp *avp;
01890 struct usr_avp *prev_avp;
01891 int_str avp_value;
01892 int alg = 0;
01893
01894 if(!(ds_flags&DS_FAILOVER_ON) || dst_avp_name.n==0)
01895 {
01896 LM_WARN("failover support disabled\n");
01897 return -1;
01898 }
01899
01900 if(dstid_avp_name.n!=0)
01901 {
01902 prev_avp = search_first_avp(dstid_avp_type, dstid_avp_name,
01903 &avp_value, &st);
01904 if(prev_avp!=NULL)
01905 {
01906
01907 alg = DS_ALG_LOAD;
01908
01909 destroy_avp(prev_avp);
01910 }
01911 }
01912
01913 if(attrs_avp_name.n!=0)
01914 {
01915 prev_avp = search_first_avp(attrs_avp_type,
01916 attrs_avp_name, &avp_value, &st);
01917 if(prev_avp!=NULL)
01918 {
01919 destroy_avp(prev_avp);
01920 }
01921 }
01922
01923 prev_avp = search_first_avp(dst_avp_type, dst_avp_name, &avp_value, &st);
01924 if(prev_avp==NULL)
01925 return -1;
01926
01927 avp = search_next_avp(&st, &avp_value);
01928 destroy_avp(prev_avp);
01929 if(avp==NULL || !(avp->flags&AVP_VAL_STR))
01930 return -1;
01931
01932 if(ds_update_dst(msg, &avp_value.s, mode)!=0)
01933 {
01934 LM_ERR("cannot set dst addr\n");
01935 return -1;
01936 }
01937 LM_DBG("using [%.*s]\n", avp_value.s.len, avp_value.s.s);
01938 if(alg==DS_ALG_LOAD)
01939 {
01940 prev_avp = search_first_avp(dstid_avp_type, dstid_avp_name,
01941 &avp_value, &st);
01942 if(prev_avp==NULL)
01943 {
01944 LM_ERR("cannot find uid avp for destination address\n");
01945 return -1;
01946 }
01947 if(ds_load_replace(msg, &avp_value.s)<0)
01948 {
01949 LM_ERR("cannot update load distribution\n");
01950 return -1;
01951 }
01952 }
01953
01954 return 1;
01955 }
01956
01957 int ds_mark_dst(struct sip_msg *msg, int state)
01958 {
01959 int group, ret;
01960 struct usr_avp *prev_avp;
01961 int_str avp_value;
01962
01963 if(!(ds_flags&DS_FAILOVER_ON))
01964 {
01965 LM_WARN("failover support disabled\n");
01966 return -1;
01967 }
01968
01969 prev_avp = search_first_avp(grp_avp_type, grp_avp_name, &avp_value, 0);
01970
01971 if(prev_avp==NULL || prev_avp->flags&AVP_VAL_STR)
01972 return -1;
01973 group = avp_value.n;
01974
01975 prev_avp = search_first_avp(dst_avp_type, dst_avp_name, &avp_value, 0);
01976
01977 if(prev_avp==NULL || !(prev_avp->flags&AVP_VAL_STR))
01978 return -1;
01979
01980 ret = ds_update_state(msg, group, &avp_value.s, state);
01981
01982 LM_DBG("state [%d] grp [%d] dst [%.*s]\n", state, group, avp_value.s.len,
01983 avp_value.s.s);
01984
01985 return (ret==0)?1:-1;
01986 }
01987
01991 int ds_update_state(sip_msg_t *msg, int group, str *address, int state)
01992 {
01993 int i=0;
01994 int old_state = 0;
01995 ds_set_t *idx = NULL;
01996
01997 if(_ds_list==NULL || _ds_list_nr<=0)
01998 {
01999 LM_ERR("the list is null\n");
02000 return -1;
02001 }
02002
02003
02004 if(ds_get_index(group, &idx)!=0)
02005 {
02006 LM_ERR("destination set [%d] not found\n", group);
02007 return -1;
02008 }
02009
02010 while(i<idx->nr)
02011 {
02012 if(idx->dlist[i].uri.len==address->len
02013 && strncasecmp(idx->dlist[i].uri.s, address->s,
02014 address->len)==0)
02015 {
02016
02017 old_state = idx->dlist[i].flags;
02018
02019
02020 idx->dlist[i].flags &= ~(DS_STATES_ALL);
02021
02022 if((state & DS_TRYING_DST) && (old_state & DS_INACTIVE_DST))
02023 {
02024
02025
02026 state &= ~(DS_TRYING_DST);
02027 state |= DS_INACTIVE_DST;
02028 }
02029
02030
02031 if(state & DS_DISABLED_DST)
02032 {
02033 idx->dlist[i].flags |= DS_DISABLED_DST;
02034 } else {
02035 idx->dlist[i].flags |= state;
02036 }
02037
02038 if(state & DS_TRYING_DST)
02039 {
02040 idx->dlist[i].failure_count++;
02041 if (idx->dlist[i].failure_count >= probing_threshhold)
02042 {
02043 idx->dlist[i].flags &= ~DS_TRYING_DST;
02044 idx->dlist[i].flags |= DS_INACTIVE_DST;
02045 idx->dlist[i].failure_count = 0;
02046 }
02047 } else {
02048 idx->dlist[i].failure_count = 0;
02049 }
02050
02051 if (!ds_skip_dst(old_state) && ds_skip_dst(idx->dlist[i].flags))
02052 {
02053 ds_run_route(msg, address, "dispatcher:dst-down");
02054
02055 } else {
02056 if(ds_skip_dst(old_state) && !ds_skip_dst(idx->dlist[i].flags))
02057 ds_run_route(msg, address, "dispatcher:dst-up");
02058 }
02059
02060 return 0;
02061 }
02062 i++;
02063 }
02064
02065 return -1;
02066 }
02067
02068 static void ds_run_route(sip_msg_t *msg, str *uri, char *route)
02069 {
02070 int rt, backup_rt;
02071 struct run_act_ctx ctx;
02072 sip_msg_t *fmsg;
02073
02074 if (route == NULL)
02075 {
02076 LM_ERR("bad route\n");
02077 return;
02078 }
02079
02080 LM_DBG("ds_run_route event_route[%s]\n", route);
02081
02082 rt = route_get(&event_rt, route);
02083 if (rt < 0 || event_rt.rlist[rt] == NULL)
02084 {
02085 LM_DBG("route does not exist");
02086 return;
02087 }
02088
02089 if(msg==NULL)
02090 {
02091 if (faked_msg_init() < 0)
02092 {
02093 LM_ERR("faked_msg_init() failed\n");
02094 return;
02095 }
02096 fmsg = faked_msg_next();
02097 fmsg->parsed_orig_ruri_ok = 0;
02098 fmsg->new_uri = *uri;
02099 } else {
02100 fmsg = msg;
02101 }
02102
02103 backup_rt = get_route_type();
02104 set_route_type(REQUEST_ROUTE);
02105 init_run_actions_ctx(&ctx);
02106 run_top_route(event_rt.rlist[rt], fmsg, 0);
02107 set_route_type(backup_rt);
02108 }
02109
02113 int ds_reinit_state(int group, str *address, int state)
02114 {
02115 int i=0;
02116 ds_set_t *idx = NULL;
02117
02118 if(_ds_list==NULL || _ds_list_nr<=0)
02119 {
02120 LM_ERR("the list is null\n");
02121 return -1;
02122 }
02123
02124
02125 if(ds_get_index(group, &idx)!=0)
02126 {
02127 LM_ERR("destination set [%d] not found\n", group);
02128 return -1;
02129 }
02130
02131 for(i=0; i<idx->nr; i++)
02132 {
02133 if(idx->dlist[i].uri.len==address->len
02134 && strncasecmp(idx->dlist[i].uri.s, address->s,
02135 address->len)==0)
02136 {
02137
02138 idx->dlist[i].flags &= ~(DS_STATES_ALL);
02139
02140 idx->dlist[i].flags |= state;
02141 return 0;
02142 }
02143 }
02144 LM_ERR("destination address [%d : %.*s] not found\n", group,
02145 address->len, address->s);
02146 return -1;
02147 }
02151 int ds_print_list(FILE *fout)
02152 {
02153 int j;
02154 ds_set_t *list;
02155
02156 if(_ds_list==NULL || _ds_list_nr<=0)
02157 {
02158 LM_ERR("no destination sets\n");
02159 return -1;
02160 }
02161
02162 fprintf(fout, "\nnumber of destination sets: %d\n", _ds_list_nr);
02163
02164 for(list = _ds_list; list!= NULL; list= list->next)
02165 {
02166 for(j=0; j<list->nr; j++)
02167 {
02168 fprintf(fout, "\n set #%d\n", list->id);
02169
02170 if (list->dlist[j].flags&DS_DISABLED_DST)
02171 fprintf(fout, " Disabled ");
02172 else if (list->dlist[j].flags&DS_INACTIVE_DST)
02173 fprintf(fout, " Inactive ");
02174 else if (list->dlist[j].flags&DS_TRYING_DST) {
02175 fprintf(fout, " Trying");
02176
02177 if (list->dlist[j].failure_count > 0) {
02178 fprintf(fout, " (Fail %d/%d)",
02179 list->dlist[j].failure_count,
02180 probing_threshhold);
02181 } else {
02182 fprintf(fout, " ");
02183 }
02184
02185 } else {
02186 fprintf(fout, " Active ");
02187 }
02188 if (list->dlist[j].flags&DS_PROBING_DST)
02189 fprintf(fout, "(P)");
02190 else
02191 fprintf(fout, "(*)");
02192
02193 fprintf(fout, " %.*s\n",
02194 list->dlist[j].uri.len, list->dlist[j].uri.s);
02195 }
02196 }
02197 return 0;
02198 }
02199
02200
02201
02202
02203
02204 int ds_is_from_list(struct sip_msg *_m, int group)
02205 {
02206 pv_value_t val;
02207 ds_set_t *list;
02208 int j;
02209
02210 memset(&val, 0, sizeof(pv_value_t));
02211 val.flags = PV_VAL_INT|PV_TYPE_INT;
02212
02213 for(list = _ds_list; list!= NULL; list= list->next)
02214 {
02215
02216 if ((group == -1) || (group == list->id))
02217 {
02218 for(j=0; j<list->nr; j++)
02219 {
02220
02221 if (ip_addr_cmp(&_m->rcv.src_ip, &list->dlist[j].ip_address)
02222 && (list->dlist[j].port==0
02223 || _m->rcv.src_port == list->dlist[j].port))
02224 {
02225 if(group==-1 && ds_setid_pvname.s!=0)
02226 {
02227 val.ri = list->id;
02228 if(ds_setid_pv.setf(_m, &ds_setid_pv.pvp,
02229 (int)EQ_T, &val)<0)
02230 {
02231 LM_ERR("setting PV failed\n");
02232 return -2;
02233 }
02234 }
02235 return 1;
02236 }
02237 }
02238 }
02239 }
02240 return -1;
02241 }
02242
02243
02244 int ds_print_mi_list(struct mi_node* rpl)
02245 {
02246 int len, j;
02247 char* p;
02248 char c[3];
02249 str data;
02250 ds_set_t *list;
02251 struct mi_node* node = NULL;
02252 struct mi_node* set_node = NULL;
02253 struct mi_attr* attr = NULL;
02254
02255 if(_ds_list==NULL || _ds_list_nr<=0)
02256 {
02257 LM_ERR("no destination sets\n");
02258 return 0;
02259 }
02260
02261 p= int2str(_ds_list_nr, &len);
02262 node = add_mi_node_child(rpl, MI_DUP_VALUE, "SET_NO",6, p, len);
02263 if(node== NULL)
02264 return -1;
02265
02266 for(list = _ds_list; list!= NULL; list= list->next)
02267 {
02268 p = int2str(list->id, &len);
02269 set_node= add_mi_node_child(rpl, MI_DUP_VALUE,"SET", 3, p, len);
02270 if(set_node == NULL)
02271 return -1;
02272
02273 for(j=0; j<list->nr; j++)
02274 {
02275 node= add_mi_node_child(set_node, 0, "URI", 3,
02276 list->dlist[j].uri.s, list->dlist[j].uri.len);
02277 if(node == NULL)
02278 return -1;
02279
02280 memset(&c, 0, sizeof(c));
02281 if (list->dlist[j].flags & DS_INACTIVE_DST)
02282 c[0] = 'I';
02283 else if (list->dlist[j].flags & DS_DISABLED_DST)
02284 c[0] = 'D';
02285 else if (list->dlist[j].flags & DS_TRYING_DST)
02286 c[0] = 'T';
02287 else
02288 c[0] = 'A';
02289
02290 if (list->dlist[j].flags & DS_PROBING_DST)
02291 c[1] = 'P';
02292 else
02293 c[1] = 'X';
02294
02295 attr = add_mi_attr (node, MI_DUP_VALUE, "flags", 5, c, 2);
02296 if(attr == 0)
02297 return -1;
02298
02299 data.s = int2str(list->dlist[j].priority, &data.len);
02300 attr = add_mi_attr (node, MI_DUP_VALUE, "priority", 8,
02301 data.s, data.len);
02302 if(attr == 0)
02303 return -1;
02304 attr = add_mi_attr (node, MI_DUP_VALUE, "attrs", 5,
02305 (list->dlist[j].attrs.body.s)?list->dlist[j].attrs.body.s:"",
02306 list->dlist[j].attrs.body.len);
02307 if(attr == 0)
02308 return -1;
02309 }
02310 }
02311
02312 return 0;
02313 }
02314
02320 static void ds_options_callback( struct cell *t, int type,
02321 struct tmcb_params *ps )
02322 {
02323 int group = 0;
02324 str uri = {0, 0};
02325 sip_msg_t *fmsg;
02326 int state;
02327
02328
02329
02330 if (!*ps->param)
02331 {
02332 LM_DBG("No parameter provided, OPTIONS-Request was finished"
02333 " with code %d\n", ps->code);
02334 return;
02335 }
02336
02337 fmsg = NULL;
02338
02339
02340
02341 group = (int)(long)(*ps->param);
02342
02343
02344
02345 uri.s = t->to.s + 4;
02346 uri.len = t->to.len - 6;
02347 LM_DBG("OPTIONS-Request was finished with code %d (to %.*s, group %d)\n",
02348 ps->code, uri.len, uri.s, group);
02349
02350
02351
02352 if((ps->code>=200 && ps->code<=299) || ds_ping_check_rplcode(ps->code))
02353 {
02354
02355 state = 0;
02356 if (ds_probing_mode==DS_PROBE_ALL)
02357 state |= DS_PROBING_DST;
02358 if (ds_update_state(fmsg, group, &uri, state) != 0)
02359 {
02360 LM_ERR("Setting the state failed (%.*s, group %d)\n", uri.len,
02361 uri.s, group);
02362 }
02363 } else {
02364 state = DS_TRYING_DST;
02365 if (ds_probing_mode!=DS_PROBE_NONE)
02366 state |= DS_PROBING_DST;
02367
02368 if (ds_update_state(fmsg, group, &uri, state) != 0)
02369 {
02370 LM_ERR("Setting the probing state failed (%.*s, group %d)\n",
02371 uri.len, uri.s, group);
02372 }
02373 }
02374
02375 return;
02376 }
02377
02383 void ds_check_timer(unsigned int ticks, void* param)
02384 {
02385 int j;
02386 ds_set_t *list;
02387 uac_req_t uac_r;
02388
02389
02390 if(_ds_list==NULL || _ds_list_nr<=0)
02391 {
02392 LM_DBG("no destination sets\n");
02393 return;
02394 }
02395
02396
02397 for(list = _ds_list; list!= NULL; list= list->next)
02398 {
02399 for(j=0; j<list->nr; j++)
02400 {
02401
02402 if((list->dlist[j].flags&DS_DISABLED_DST) != 0)
02403 continue;
02404
02405 if (ds_probing_mode==DS_PROBE_ALL ||
02406 (list->dlist[j].flags&DS_PROBING_DST) != 0)
02407 {
02408 LM_DBG("probing set #%d, URI %.*s\n", list->id,
02409 list->dlist[j].uri.len, list->dlist[j].uri.s);
02410
02411
02412
02413
02414
02415 set_uac_req(&uac_r, &ds_ping_method, 0, 0, 0,
02416 TMCB_LOCAL_COMPLETED, ds_options_callback,
02417 (void*)(long)list->id);
02418 if (tmb.t_request(&uac_r,
02419 &list->dlist[j].uri,
02420 &list->dlist[j].uri,
02421 &ds_ping_from,
02422 &ds_outbound_proxy) < 0) {
02423 LM_ERR("unable to ping [%.*s]\n",
02424 list->dlist[j].uri.len, list->dlist[j].uri.s);
02425 }
02426 }
02427 }
02428 }
02429 }
02430
02436 void ds_ht_timer(unsigned int ticks, void *param)
02437 {
02438 ds_cell_t *it;
02439 ds_cell_t *it0;
02440 time_t now;
02441 int i;
02442
02443 if(_dsht_load==NULL)
02444 return;
02445
02446 now = time(NULL);
02447
02448 for(i=0; i<_dsht_load->htsize; i++)
02449 {
02450
02451 lock_get(&_dsht_load->entries[i].lock);
02452 it = _dsht_load->entries[i].first;
02453 while(it)
02454 {
02455 it0 = it->next;
02456 if((it->expire!=0 && it->expire<now)
02457 || (it->state==DS_LOAD_INIT
02458 && it->initexpire!=0 && it->initexpire<now))
02459 {
02460
02461 if(it->prev==NULL)
02462 _dsht_load->entries[i].first = it->next;
02463 else
02464 it->prev->next = it->next;
02465 if(it->next)
02466 it->next->prev = it->prev;
02467 _dsht_load->entries[i].esize--;
02468
02469
02470 ds_load_remove_byid(it->dset, &it->duid);
02471
02472 ds_cell_free(it);
02473 }
02474 it = it0;
02475 }
02476 lock_release(&_dsht_load->entries[i].lock);
02477 }
02478 return;
02479 }
02480
02481 int bind_dispatcher(dispatcher_api_t* api)
02482 {
02483 if (!api) {
02484 ERR("Invalid parameter value\n");
02485 return -1;
02486 }
02487 api->select = ds_select_dst;
02488 api->next = ds_next_dst;
02489 api->mark = ds_mark_dst;
02490 api->is_from = ds_is_from_list;
02491 return 0;
02492 }
02493
02494
02495 ds_set_t *ds_get_list(void)
02496 {
02497 return _ds_list;
02498 }
02499
02500 int ds_get_list_nr(void)
02501 {
02502 return _ds_list_nr;
02503 }