00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046 #include <stdio.h>
00047 #include <string.h>
00048 #include <stdlib.h>
00049 #include <sys/types.h>
00050 #include <sys/wait.h>
00051 #include <sys/ipc.h>
00052 #include <unistd.h>
00053 #include <fcntl.h>
00054 #include <errno.h>
00055
00056 #include "../../sr_module.h"
00057 #include "../../error.h"
00058 #include "../../ut.h"
00059 #include "../../mem/shm_mem.h"
00060 #include "../../mem/mem.h"
00061 #include "../../globals.h"
00062 #include "../../timer.h"
00063 #include "../../parser/parse_uri.h"
00064 #include "../../parser/parse_content.h"
00065 #include "../../parser/parse_from.h"
00066 #include "../../lib/srdb2/db.h"
00067 #include "../../cfg/cfg_struct.h"
00068
00069 #include "../../modules/tm/tm_load.h"
00070
00071 #ifdef HAVE_IHTTP
00072 #include "../ihttp/ih_load.h"
00073 #endif
00074
00075 #include "xjab_load.h"
00076 #include "xjab_worker.h"
00077 #include "xjab_util.h"
00078
00079
00080 MODULE_VERSION
00081
00083 struct tm_binds tmb;
00084
00085 #ifdef HAVE_IHTTP
00086
00087 struct ih_binds ihb;
00089 int xjab_mod_info(ih_req_p _irp, void *_p, char *_bb, int *_bl,
00090 char *_hb, int *_hl);
00091 int xjab_connections(ih_req_p _irp, void *_p, char *_bb, int *_bl,
00092 char *_hb, int *_hl);
00093 #endif
00094
00096 xj_wlist jwl = NULL;
00097
00099 db_ctx_t* ctx = NULL;
00100 db_cmd_t* cmd = NULL;
00101
00102 db_fld_t db_params[] = {
00103 {.name = "sip_id", .type = DB_CSTR},
00104 {.name = "type", .type = DB_INT},
00105 {.name = 0}
00106 };
00107
00108 db_fld_t db_cols[] = {
00109 {.name = "jab_id", .type = DB_CSTR},
00110 {.name = "jab_passwd", .type = DB_CSTR},
00111 {.name = 0}
00112 };
00113
00114
00115
/*
 * Module parameters and their built-in defaults.
 * All of them except db_table are exposed through the params[] table.
 */

/* database access */
static char *db_url = "mysql://root@127.0.0.1/sip_jab";
char *db_table      = "jusers";
char *registrar     = NULL;

/* worker pool sizing */
int nrw      = 2;
int max_jobs = 10;

/* Jabber server endpoint */
char *jaddress = "127.0.0.1";
int   jport    = 5222;

/* alias / domain / outbound proxy settings given to xj_wlist_set_aliases() */
char *jaliases = NULL;
char *jdomain  = NULL;
char *proxy    = NULL;

/* timing parameters (delay/sleep/cache are handed to xj_wlist_init();
 * check_time is the manager's sleep() interval) */
int delay_time = 90;
int sleep_time = 20;
int cache_time = 600;
int check_time = 20;

/* one int[2] pipe pair per worker; allocated in mod_init() */
int **pipes = NULL;
00138
/* module life-cycle hooks */
static int mod_init(void);
static int child_init(int rank);
static void destroy(void);

/* shared implementation behind the script commands, and the worker watchdog */
int xjab_manage_sipmsg(struct sip_msg *msg, int type);
void xjab_check_workers(int mpid);

/* script commands exported through cmds[] */
static int xj_send_message(struct sip_msg *msg, char *foo1, char *foo2);
static int xj_join_jconf(struct sip_msg *msg, char *foo1, char *foo2);
static int xj_exit_jconf(struct sip_msg *msg, char *foo1, char *foo2);
static int xj_go_online(struct sip_msg *msg, char *foo1, char *foo2);
static int xj_go_offline(struct sip_msg *msg, char *foo1, char *foo2);
00152
00153
00154
00155
00156 static cmd_export_t cmds[] = {
00157 {"jab_send_message", xj_send_message, 0, 0, REQUEST_ROUTE},
00158 {"jab_join_jconf", xj_join_jconf, 0, 0, REQUEST_ROUTE},
00159 {"jab_exit_jconf", xj_exit_jconf, 0, 0, REQUEST_ROUTE},
00160 {"jab_go_online", xj_go_online, 0, 0, REQUEST_ROUTE},
00161 {"jab_go_offline", xj_go_offline, 0, 0, REQUEST_ROUTE},
00162 {"jab_register_watcher", (cmd_function)xj_register_watcher, XJ_NO_SCRIPT_F, 0, 0 },
00163 {"jab_unregister_watcher", (cmd_function)xj_unregister_watcher, XJ_NO_SCRIPT_F, 0, 0 },
00164 {"load_xjab", (cmd_function)load_xjab, XJ_NO_SCRIPT_F, 0, 0 },
00165 {0, 0, 0, 0, 0}
00166 };
00167
00168
00169
00170
00171
00172 static param_export_t params[] = {
00173 {"db_url", PARAM_STRING, &db_url },
00174 {"jaddress", PARAM_STRING, &jaddress },
00175 {"aliases", PARAM_STRING, &jaliases },
00176 {"proxy", PARAM_STRING, &proxy },
00177 {"jdomain", PARAM_STRING, &jdomain },
00178 {"registrar", PARAM_STRING, ®istrar },
00179 {"jport", PARAM_INT, &jport },
00180 {"workers", PARAM_INT, &nrw },
00181 {"max_jobs", PARAM_INT, &max_jobs },
00182 {"cache_time", PARAM_INT, &cache_time},
00183 {"delay_time", PARAM_INT, &delay_time},
00184 {"sleep_time", PARAM_INT, &sleep_time},
00185 {"check_time", PARAM_INT, &check_time},
00186 {0, 0, 0}
00187 };
00188
00189
00190 struct module_exports exports= {
00191 "jabber",
00192 cmds,
00193 0,
00194 params,
00195 mod_init,
00196 (response_function) 0,
00197 (destroy_function) destroy,
00198 0,
00199 child_init
00200 };
00201
00205 static int mod_init(void)
00206 {
00207 load_tm_f load_tm;
00208 #ifdef HAVE_IHTTP
00209 load_ih_f load_ih;
00210 #endif
00211 int i;
00212
00213 DBG("XJAB:mod_init: initializing ...\n");
00214 if(!jdomain)
00215 {
00216 LOG(L_ERR, "XJAB:mod_init: ERROR jdomain is NULL\n");
00217 return -1;
00218 }
00219
00220
00221 if ( !(load_tm=(load_tm_f)find_export("load_tm", NO_SCRIPT, 0))) {
00222 LOG(L_ERR, "ERROR: xjab:mod_init: can't import load_tm\n");
00223 return -1;
00224 }
00225
00226 if (load_tm( &tmb )==-1)
00227 return -1;
00228
00229 #ifdef HAVE_IHTTP
00230
00231 if ( !(load_ih=(load_ih_f)find_export("load_ih", IH_NO_SCRIPT_F, 0))) {
00232 LOG(L_ERR, "ERROR:xjab:mod_init: can't import load_ih\n");
00233 return -1;
00234 }
00235
00236 if (load_ih( &ihb )==-1)
00237 return -1;
00238 #endif
00239
00240 pipes = (int**)pkg_malloc(nrw*sizeof(int*));
00241 if (pipes == NULL)
00242 {
00243 LOG(L_ERR, "XJAB:mod_init:Error while allocating pipes\n");
00244 return -1;
00245 }
00246
00247 for(i=0; i<nrw; i++)
00248 {
00249 pipes[i] = (int*)pkg_malloc(2*sizeof(int));
00250 if (!pipes[i])
00251 {
00252 LOG(L_ERR, "XJAB:mod_init: Error while allocating pipes\n");
00253 return -1;
00254 }
00255 }
00256
00259 for(i=0;i<nrw;i++)
00260 {
00261
00262 if (pipe(pipes[i])==-1) {
00263 LOG(L_ERR, "XJAB:mod_init: error - cannot create pipe!\n");
00264 return -1;
00265 }
00266 DBG("XJAB:mod_init: pipe[%d] = <%d>-<%d>\n", i, pipes[i][0],
00267 pipes[i][1]);
00268 }
00269
00270 if((jwl = xj_wlist_init(pipes,nrw,max_jobs,cache_time,sleep_time,
00271 delay_time)) == NULL)
00272 {
00273 LOG(L_ERR, "XJAB:mod_init: error initializing workers list\n");
00274 return -1;
00275 }
00276
00277 if(xj_wlist_set_aliases(jwl, jaliases, jdomain, proxy) < 0)
00278 {
00279 LOG(L_ERR, "XJAB:mod_init: error setting aliases and outbound proxy\n");
00280 return -1;
00281 }
00282
00283
00284
00285 cfg_register_child(nrw + 1);
00286
00287 DBG("XJAB:mod_init: initialized ...\n");
00288 return 0;
00289 }
00290
00291
00292
00293
00294 static int child_init(int rank)
00295 {
00296 int i, j, mpid, cpid;
00297
00298 DBG("XJAB:child_init: initializing child <%d>\n", rank);
00299
00300 if(rank == 1)
00301 {
00302 #ifdef HAVE_IHTTP
00303
00304 ihb.reg_f("xjab", "XMPP Gateway", IH_MENU_YES,
00305 xjab_mod_info, NULL);
00306 ihb.reg_f("xjabc", "XMPP connections", IH_MENU_YES,
00307 xjab_connections, NULL);
00308 #endif
00309 if((mpid=fork())<0 )
00310 {
00311 LOG(L_ERR, "XJAB:child_init:error - cannot launch worker's"
00312 " manager\n");
00313 return -1;
00314 }
00315 if(mpid == 0)
00316 {
00318 for(i=0;i<nrw;i++)
00319 {
00320 if ( (cpid=fork())<0 )
00321 {
00322 LOG(L_ERR,"XJAB:child_init:error - cannot launch worker\n");
00323 return -1;
00324 }
00325 if (cpid == 0)
00326 {
00327 for(j=0;j<nrw;j++)
00328 if(j!=i) close(pipes[j][0]);
00329 close(pipes[i][1]);
00330 if(xj_wlist_set_pid(jwl, getpid(), i) < 0)
00331 {
00332 LOG(L_ERR, "XJAB:child_init:error setting worker's"
00333 " pid\n");
00334 return -1;
00335 }
00336
00337
00338 if (cfg_child_init()) return -1;
00339
00340 ctx = db_ctx("jabber");
00341 if (ctx == NULL) goto dberror;
00342 if (db_add_db(ctx, db_url) < 0) goto dberror;
00343 if (db_connect(ctx) < 0) goto dberror;
00344
00345 cmd = db_cmd(DB_GET, ctx, db_table, db_cols, db_params, NULL);
00346 if (!cmd) goto dberror;
00347
00348 xj_worker_process(jwl,jaddress,jport,i, cmd);
00349
00350 db_cmd_free(cmd);
00351 db_ctx_free(ctx);
00352 ctx = NULL;
00353
00354
00355 cfg_child_destroy();
00356
00357 exit(0);
00358 }
00359 }
00360
00361 mpid = getpid();
00362
00363
00364 if (cfg_child_init()) return -1;
00365
00366 while(1)
00367 {
00368 sleep(check_time);
00369
00370
00371 cfg_update();
00372
00373 xjab_check_workers(mpid);
00374 }
00375 }
00376 }
00377
00378
00379
00380
00381
00382
00383 return 0;
00384
00385 dberror:
00386 if (cmd) db_cmd_free(cmd);
00387 cmd = NULL;
00388 if (ctx) db_ctx_free(ctx);
00389 ctx = NULL;
00390 return -1;
00391 }
00392
00396 static int xj_send_message(struct sip_msg *msg, char* foo1, char * foo2)
00397 {
00398 DBG("XJAB: processing SIP MESSAGE\n");
00399 return xjab_manage_sipmsg(msg, XJ_SEND_MESSAGE);
00400 }
00401
00405 static int xj_join_jconf(struct sip_msg *msg, char* foo1, char * foo2)
00406 {
00407 DBG("XJAB: join a Jabber conference\n");
00408 return xjab_manage_sipmsg(msg, XJ_JOIN_JCONF);
00409 }
00410
00414 static int xj_exit_jconf(struct sip_msg *msg, char* foo1, char * foo2)
00415 {
00416 DBG("XJAB: exit from a Jabber conference\n");
00417 return xjab_manage_sipmsg(msg, XJ_EXIT_JCONF);
00418 }
00419
00423 static int xj_go_online(struct sip_msg *msg, char* foo1, char * foo2)
00424 {
00425 DBG("XJAB: go online in Jabber network\n");
00426 return xjab_manage_sipmsg(msg, XJ_GO_ONLINE);
00427 }
00428
00432 static int xj_go_offline(struct sip_msg *msg, char* foo1, char * foo2)
00433 {
00434 DBG("XJAB: go offline in Jabber network\n");
00435 return xjab_manage_sipmsg(msg, XJ_GO_OFFLINE);
00436 }
00437
00441 int xjab_manage_sipmsg(struct sip_msg *msg, int type)
00442 {
00443 str body, dst, from_uri;
00444 xj_sipmsg jsmsg;
00445 int pipe, fl;
00446 t_xj_jkey jkey, *p;
00447 int mime;
00448
00449 body.s=0;
00450 body.len=0;
00451
00452 if (type==XJ_SEND_MESSAGE)
00453 {
00454
00455 body.s = get_body( msg );
00456 if(body.s==0)
00457 {
00458 LOG(L_ERR,"XJAB:xjab_manage_sipmsg: ERROR cannot extract body from"
00459 " msg\n");
00460 goto error;
00461 }
00462
00463
00464 if(!msg->content_length)
00465 {
00466 LOG(L_ERR,"XJAB:xjab_manage_sipmsg: ERROR no Content-Length"
00467 " header found!\n");
00468 goto error;
00469 }
00470 body.len = get_content_length(msg);
00471
00472
00473 if((mime=parse_content_type_hdr(msg))<1)
00474 {
00475 LOG(L_ERR,"XJAB:xjab_manage_sipmsg: ERROR cannot parse"
00476 " Content-Type header\n");
00477 goto error;
00478 }
00479
00480
00481 if(mime!=(TYPE_TEXT<<16)+SUBTYPE_PLAIN
00482 && mime!=(TYPE_MESSAGE<<16)+SUBTYPE_CPIM)
00483 {
00484 LOG(L_ERR,"XJAB:xjab_manage_sipmsg: ERROR invalid content-type for"
00485 " a message request! type found=%d\n", mime);
00486 goto error;
00487 }
00488 }
00489
00490
00491 if(parse_headers( msg, HDR_TO_F|HDR_FROM_F, 0)==-1 || !msg->to
00492 || !msg->from)
00493 {
00494 LOG(L_ERR,"XJAB:xjab_manage_sipmsg: cannot find TO or FROM HEADERS!\n");
00495 goto error;
00496 }
00497
00498
00499 if ( parse_from_header( msg )==-1 || msg->from->parsed==NULL)
00500 {
00501 DBG("ERROR:xjab_manage_sipmsg: cannot get FROM header\n");
00502 goto error;
00503 }
00504 from_uri.s = ((struct to_body*)msg->from->parsed)->uri.s;
00505 from_uri.len = ((struct to_body*)msg->from->parsed)->uri.len;
00506 if(xj_extract_aor(&from_uri, 0))
00507 {
00508 DBG("ERROR:xjab_manage_sipmsg: cannot get AoR from FROM header\n");
00509 goto error;
00510 }
00511
00512 jkey.hash = xj_get_hash(&from_uri, NULL);
00513 jkey.id = &from_uri;
00514
00515 switch(type)
00516 {
00517 case XJ_SEND_MESSAGE:
00518 case XJ_JOIN_JCONF:
00519 case XJ_GO_ONLINE:
00520 if((pipe = xj_wlist_get(jwl, &jkey, &p)) < 0)
00521 {
00522 DBG("XJAB:xjab_manage_sipmsg: cannot find pipe of the worker!\n");
00523 goto error;
00524 }
00525 break;
00526 case XJ_EXIT_JCONF:
00527 case XJ_GO_OFFLINE:
00528 if((pipe = xj_wlist_check(jwl, &jkey, &p)) < 0)
00529 {
00530 DBG("XJAB:xjab_manage_sipmsg: no open Jabber session for"
00531 " <%.*s>!\n", from_uri.len, from_uri.s);
00532 goto error;
00533 }
00534 break;
00535 default:
00536 DBG("XJAB:xjab_manage_sipmsg: ERROR:strange SIP msg type!\n");
00537 goto error;
00538 }
00539
00540
00541 if(type==XJ_GO_ONLINE || type==XJ_GO_OFFLINE)
00542 goto prepare_job;
00543
00544
00545
00546
00547 dst.len = 0;
00548 if( msg->new_uri.len > 0)
00549 {
00550 dst.s = msg->new_uri.s;
00551 dst.len = msg->new_uri.len;
00552 if(xj_wlist_check_aliases(jwl, &dst))
00553 dst.len = 0;
00554 #ifdef XJ_EXTRA_DEBUG
00555 else
00556 DBG("XJAB:xjab_manage_sipmsg: using NEW URI for destination\n");
00557 #endif
00558 }
00559
00560 if (dst.len == 0 && msg->first_line.u.request.uri.s != NULL
00561 && msg->first_line.u.request.uri.len > 0 )
00562 {
00563 dst.s = msg->first_line.u.request.uri.s;
00564 dst.len = msg->first_line.u.request.uri.len;
00565 if(xj_wlist_check_aliases(jwl, &dst))
00566 dst.len = 0;
00567 #ifdef XJ_EXTRA_DEBUG
00568 else
00569 DBG("XJAB:xjab_manage_sipmsg: using R-URI for destination\n");
00570 #endif
00571 }
00572
00573 if(dst.len == 0 && msg->to->parsed)
00574 {
00575 dst.s = ((struct to_body*)msg->to->parsed)->uri.s;
00576 dst.len = ((struct to_body*)msg->to->parsed)->uri.len;
00577 if(dst.s == NULL || xj_wlist_check_aliases(jwl, &dst))
00578 dst.len = 0;
00579 #ifdef XJ_EXTRA_DEBUG
00580 else
00581 DBG("XJAB:xjab_manage_sipmsg: using TO-URI for destination\n");
00582 #endif
00583 }
00584
00585 if(dst.len == 0)
00586 {
00587 DBG("XJAB:xjab_manage_sipmsg: destination not found in SIP message\n");
00588 goto error;
00589 }
00590
00592 if(xj_extract_aor(&dst, 1))
00593 {
00594 DBG("ERROR:xjab_manage_sipmsg: cannot get AoR for destination\n");
00595 goto error;
00596 }
00597 #ifdef XJ_EXTRA_DEBUG
00598 DBG("XJAB:xjab_manage_sipmsg: DESTINATION after correction [%.*s].\n",
00599 dst.len, dst.s);
00600 #endif
00601
00602 prepare_job:
00603
00604 jsmsg = (xj_sipmsg)shm_malloc(sizeof(t_xj_sipmsg));
00605 memset(jsmsg, 0, sizeof(t_xj_sipmsg));
00606 if(jsmsg == NULL)
00607 return -1;
00608
00609 switch(type)
00610 {
00611 case XJ_SEND_MESSAGE:
00612 jsmsg->msg.len = body.len;
00613 if((jsmsg->msg.s = (char*)shm_malloc(jsmsg->msg.len+1)) == NULL)
00614 {
00615 shm_free(jsmsg);
00616 goto error;
00617 }
00618 strncpy(jsmsg->msg.s, body.s, jsmsg->msg.len);
00619 break;
00620 case XJ_GO_ONLINE:
00621 case XJ_GO_OFFLINE:
00622 dst.len = 0;
00623 dst.s = 0;
00624 case XJ_JOIN_JCONF:
00625 case XJ_EXIT_JCONF:
00626 jsmsg->msg.len = 0;
00627 jsmsg->msg.s = NULL;
00628 break;
00629 default:
00630 DBG("XJAB:xjab_manage_sipmsg: this SHOULD NOT appear\n");
00631 shm_free(jsmsg);
00632 goto error;
00633 }
00634 if(dst.len>0)
00635 {
00636 jsmsg->to.len = dst.len;
00637 if((jsmsg->to.s = (char*)shm_malloc(jsmsg->to.len+1))==NULL)
00638 {
00639 if(type == XJ_SEND_MESSAGE)
00640 shm_free(jsmsg->msg.s);
00641 shm_free(jsmsg);
00642 goto error;
00643 }
00644 strncpy(jsmsg->to.s, dst.s, jsmsg->to.len);
00645 }
00646 else
00647 {
00648 jsmsg->to.len = 0;
00649 jsmsg->to.s = 0;
00650 }
00651
00652 jsmsg->jkey = p;
00653 jsmsg->type = type;
00654
00655
00656 DBG("XJAB:xjab_manage_sipmsg:%d: sending <%p> to worker through <%d>\n",
00657 getpid(), jsmsg, pipe);
00658
00659 fl = write(pipe, &jsmsg, sizeof(jsmsg));
00660 if(fl != sizeof(jsmsg))
00661 {
00662 DBG("XJAB:xjab_manage_sipmsg: error when writing to worker pipe!\n");
00663 if(type == XJ_SEND_MESSAGE)
00664 shm_free(jsmsg->msg.s);
00665 shm_free(jsmsg->to.s);
00666 shm_free(jsmsg);
00667 goto error;
00668 }
00669
00670 return 1;
00671 error:
00672 return -1;
00673 }
00674
00678 static void destroy(void)
00679 {
00680 int i;
00681 #ifdef XJ_EXTRA_DEBUG
00682 DBG("XJAB: Unloading module ...\n");
00683 #endif
00684 if(pipes)
00685 {
00686 for(i = 0; i < nrw; i++)
00687 {
00688 if(pipes[i])
00689 {
00690 close(pipes[i][0]);
00691 close(pipes[i][1]);
00692 }
00693 pkg_free(pipes[i]);
00694 }
00695 pkg_free(pipes);
00696 }
00697
00698 if (ctx) db_ctx_free(ctx);
00699 ctx = NULL;
00700
00701 xj_wlist_free(jwl);
00702 DBG("XJAB: Unloaded ...\n");
00703 }
00704
00708 void xj_register_watcher(str *from, str *to, void *cbf, void *pp)
00709 {
00710 xj_sipmsg jsmsg = NULL;
00711 t_xj_jkey jkey, *jp;
00712 int pipe, fl;
00713 str from_uri, to_uri;
00714
00715 if(!to || !from || !cbf)
00716 return;
00717
00718 #ifdef XJ_EXTRA_DEBUG
00719 DBG("XJAB:xj_register_watcher: from=[%.*s] to=[%.*s]\n", from->len,
00720 from->s, to->len, to->s);
00721 #endif
00722 from_uri.s = from->s;
00723 from_uri.len = from->len;
00724 if(xj_extract_aor(&from_uri, 0))
00725 {
00726 DBG("ERROR:xjab_manage_sipmsg: cannot get AoR from FROM header\n");
00727 goto error;
00728 }
00729
00730 jkey.hash = xj_get_hash(&from_uri, NULL);
00731 jkey.id = &from_uri;
00732
00733 if((pipe = xj_wlist_get(jwl, &jkey, &jp)) < 0)
00734 {
00735 DBG("XJAB:xj_register_watcher: cannot find pipe of the worker!\n");
00736 goto error;
00737 }
00738
00739
00740 jsmsg = (xj_sipmsg)shm_malloc(sizeof(t_xj_sipmsg));
00741 memset(jsmsg, 0, sizeof(t_xj_sipmsg));
00742 if(jsmsg == NULL)
00743 goto error;
00744
00745 jsmsg->msg.len = 0;
00746 jsmsg->msg.s = NULL;
00747
00748 to_uri.s = to->s;
00749 to_uri.len = to->len;
00751 if(xj_extract_aor(&to_uri, 1))
00752 {
00753 DBG("ERROR:xjab_manage_sipmsg: cannot get AoR for destination\n");
00754 goto error;
00755 }
00756 #ifdef XJ_EXTRA_DEBUG
00757 DBG("XJAB:xj_register_watcher: DESTINATION after correction [%.*s].\n",
00758 to_uri.len, to_uri.s);
00759 #endif
00760
00761 jsmsg->to.len = to_uri.len;
00762 if((jsmsg->to.s = (char*)shm_malloc(jsmsg->to.len+1)) == NULL)
00763 {
00764 if(jsmsg->msg.s)
00765 shm_free(jsmsg->msg.s);
00766 shm_free(jsmsg);
00767 goto error;
00768 }
00769 strncpy(jsmsg->to.s, to_uri.s, jsmsg->to.len);
00770 jsmsg->to.s[jsmsg->to.len] = '\0';
00771
00772 jsmsg->jkey = jp;
00773 jsmsg->type = XJ_REG_WATCHER;
00774
00775
00776 jsmsg->cbf = (pa_callback_f)cbf;
00777 jsmsg->p = pp;
00778
00779 #ifdef XJ_EXTRA_DEBUG
00780 DBG("XJAB:xj_register_watcher:%d: sending <%p> to worker through <%d>\n",
00781 getpid(), jsmsg, pipe);
00782 #endif
00783
00784 fl = write(pipe, &jsmsg, sizeof(jsmsg));
00785 if(fl != sizeof(jsmsg))
00786 {
00787 DBG("XJAB:xj_register_watcher: error when writing to worker pipe!\n");
00788 if(jsmsg->msg.s)
00789 shm_free(jsmsg->msg.s);
00790 shm_free(jsmsg->to.s);
00791 shm_free(jsmsg);
00792 goto error;
00793 }
00794
00795 error:
00796 return;
00797 }
00798
00802 void xj_unregister_watcher(str *from, str *to, void *cbf, void *pp)
00803 {
00804 if(!to || !from)
00805 return;
00806 }
00807
00812 void xjab_check_workers(int mpid)
00813 {
00814 int i, n, stat;
00815
00816 if(!jwl || jwl->len <= 0)
00817 return;
00818 for(i=0; i < jwl->len; i++)
00819 {
00820 if(jwl->workers[i].pid > 0)
00821 {
00822 stat = 0;
00823 n = waitpid(jwl->workers[i].pid, &stat, WNOHANG);
00824 if(n == 0 || n!=jwl->workers[i].pid)
00825 continue;
00826
00827 LOG(L_ERR,"XJAB:xjab_check_workers: worker[%d][pid=%d] has exited"
00828 " - status=%d err=%d errno=%d\n", i, jwl->workers[i].pid,
00829 stat, n, errno);
00830 xj_wlist_clean_jobs(jwl, i, 1);
00831 xj_wlist_set_pid(jwl, -1, i);
00832 }
00833
00834 #ifdef XJ_EXTRA_DEBUG
00835 DBG("XJAB:%d:xjab_check_workers: create a new worker[%d]\n", mpid, i);
00836 #endif
00837 if ( (stat=fork())<0 )
00838 {
00839 #ifdef XJ_EXTRA_DEBUG
00840 DBG("XJAB:xjab_check_workers: error - cannot launch new"
00841 " worker[%d]\n", i);
00842 #endif
00843 LOG(L_ERR, "XJAB:xjab_check_workers: error - worker[%d] lost"
00844 " forever \n", i);
00845 return;
00846 }
00847 if (stat == 0)
00848 {
00849 if(xj_wlist_set_pid(jwl, getpid(), i) < 0)
00850 {
00851 LOG(L_ERR, "XJAB:xjab_check_workers: error setting new"
00852 " worker's pid - w[%d]\n", i);
00853 return;
00854 }
00855
00856
00857
00858
00859
00860
00861 if (cfg_late_child_init()) return;
00862
00863 ctx = db_ctx("jabber");
00864 if (ctx == NULL) goto dberror;
00865 if (db_add_db(ctx, db_url) < 0) goto dberror;
00866 if (db_connect(ctx) < 0) goto dberror;
00867
00868 cmd = db_cmd(DB_GET, ctx, db_table, db_cols, db_params, NULL);
00869 if (!cmd) goto dberror;
00870
00871 xj_worker_process(jwl,jaddress,jport,i, cmd);
00872
00873 db_cmd_free(cmd);
00874 db_ctx_free(ctx);
00875 ctx = NULL;
00876
00877
00878 cfg_child_destroy();
00879
00880 exit(0);
00881 }
00882 }
00883
00884 dberror:
00885 if (cmd) db_cmd_free(cmd);
00886 cmd = NULL;
00887 if (ctx) db_ctx_free(ctx);
00888 ctx = NULL;
00889 }
00890
00891 #ifdef HAVE_IHTTP
00892
00896 int xjab_mod_info(ih_req_p _irp, void *_p, char *_bb, int *_bl,
00897 char *_hb, int *_hl)
00898 {
00899 if(!_irp || !_bb || !_bl || *_bl <= 0 || !_hb || !_hl || *_hl <= 0)
00900 return -1;
00901 *_hl = 0;
00902 *_hb = 0;
00903
00904 strcpy(_bb, "<h4>SER2Jabber Gateway</h4>");
00905 strcat(_bb, "<br>Module parameters:<br>");
00906 strcat(_bb, "<br> -- db table = ");
00907 strcat(_bb, db_table);
00908 strcat(_bb, "<br> -- workers = ");
00909 strcat(_bb, int2str(nrw, NULL));
00910 strcat(_bb, "<br> -- max jobs per worker = ");
00911 strcat(_bb, int2str(max_jobs, NULL));
00912
00913 strcat(_bb, "<br> -- jabber server address = ");
00914 strcat(_bb, jaddress);
00915 strcat(_bb, "<br> -- jabber server port = ");
00916 strcat(_bb, int2str(jport, NULL));
00917
00918 strcat(_bb, "<br> -- aliases = ");
00919 strcat(_bb, (jaliases)?jaliases:"NULL");
00920 strcat(_bb, "<br> -- jabber domain = ");
00921 strcat(_bb, (jdomain)?jdomain:"NULL");
00922 strcat(_bb, "<br> -- proxy address = ");
00923 strcat(_bb, (proxy)?proxy:"NULL");
00924
00925 strcat(_bb, "<br> -- delay time = ");
00926 strcat(_bb, int2str(delay_time, NULL));
00927 strcat(_bb, "<br> -- sleep time = ");
00928 strcat(_bb, int2str(sleep_time, NULL));
00929 strcat(_bb, "<br> -- cache time = ");
00930 strcat(_bb, int2str(cache_time, NULL));
00931 strcat(_bb, "<br> -- check time = ");
00932 strcat(_bb, int2str(check_time, NULL));
00933
00934 *_bl = strlen(_bb);
00935
00936 return 0;
00937 }
00938
00944 int xjab_connections(ih_req_p _irp, void *_p, char *_bb, int *_bl,
00945 char *_hb, int *_hl)
00946 {
00947 t_xj_jkey jkey, *p;
00948 str _u;
00949 ih_param_p _ipp = NULL;
00950 int idx, i, maxcount;
00951 char *cp;
00952
00953 if(!_irp || !_bb || !_bl || *_bl <= 0 || !_hb || !_hl || *_hl <= 0)
00954 return -1;
00955
00956 *_hl = 0;
00957 *_hb = 0;
00958 idx = -1;
00959 strcpy(_bb, "<h4>Active XMPP connections</h4>");
00960
00961 if(_irp->params)
00962 {
00963 strcat(_bb, "<br><b>Close action is alpha release!</b><br>");
00964 _ipp = _irp->params;
00965 i = 0;
00966 while(_ipp)
00967 {
00968 switch(_ipp->name[0])
00969 {
00970 case 'w':
00971 idx = 0;
00972 cp = _ipp->value;
00973 while(*cp && *cp>='0' && *cp<='9')
00974 {
00975 idx = idx*10 + *cp-'0';
00976 cp++;
00977 }
00978 i++;
00979 break;
00980 case 'u':
00981 _u.s = _ipp->value;
00982 _u.len = strlen(_ipp->value);
00983 jkey.id = &_u;
00984 i++;
00985 break;
00986 case 'i':
00987 jkey.hash = 0;
00988 cp = _ipp->value;
00989 while(*cp && *cp>='0' && *cp<='9')
00990 {
00991 jkey.hash = jkey.hash*10 + *cp-'0';
00992 cp++;
00993 }
00994 i++;
00995 break;
00996
00997 }
00998 _ipp = _ipp->next;
00999 }
01000 if(i!=3 || idx < 0 || idx >= jwl->len)
01001 {
01002 strcat(_bb, "<br><b><i>Bad parameters!</i></b>\n");
01003 }
01004 else
01005 {
01006 strcat(_bb, "<br><b><i>The connection of [");
01007 strcat(_bb, _u.s);
01008
01009 if(xj_wlist_set_flag(jwl, &jkey, XJ_FLAG_CLOSE) < 0)
01010 strcat(_bb, "] does not exist!</i></b>\n");
01011 else
01012 strcat(_bb, "] was scheduled for closing!</i></b>\n");
01013 }
01014 *_bl = strlen(_bb);
01015
01016 return 0;
01017 }
01018
01019 if(jwl!=NULL && jwl->len > 0 && jwl->workers!=NULL)
01020 {
01021 for(idx=0; idx<jwl->len; idx++)
01022 {
01023 strcat(_bb, "<br><b><i>Worker[");
01024 strcat(_bb, int2str(idx, NULL));
01025 strcat(_bb, "]</i></b> pid=");
01026 strcat(_bb, int2str(jwl->workers[idx].pid, NULL));
01027 strcat(_bb, " nr of jobs=");
01028 strcat(_bb, int2str(jwl->workers[idx].nr, NULL));
01029 if(!jwl->workers[idx].sip_ids)
01030 continue;
01031 lock_set_get(jwl->sems, idx);
01032 maxcount = count234(jwl->workers[idx].sip_ids);
01033 for (i = 0; i < maxcount; i++)
01034 {
01035 p = (xj_jkey)index234(jwl->workers[idx].sip_ids, i);
01036 if(p == NULL)
01037 continue;
01038 strcat(_bb, "<br> ");
01039 strcat(_bb, int2str(i, NULL));
01040 strcat(_bb, ". ");
01041 strcat(_bb, "<a href=\"xjabc?w=");
01042 strcat(_bb, int2str(idx, NULL));
01043 strcat(_bb, "&i=");
01044 strcat(_bb, int2str(p->hash, NULL));
01045 strcat(_bb, "&u=");
01046 strncat(_bb, p->id->s, p->id->len);
01047 strcat(_bb, "\">close</a>");
01048 strcat(_bb, " ");
01049 strcat(_bb, int2str(p->hash, NULL));
01050 strcat(_bb, " ");
01051 strncat(_bb, p->id->s, p->id->len);
01052 }
01053 lock_set_release(jwl->sems, idx);
01054 }
01055 }
01056
01057 *_bl = strlen(_bb);
01058
01059 return 0;
01060 }
01061
01062 #endif // HAVE_IHTTP