jabber_k/jabber.c

00001 /*
00002  * $Id$
00003  *
00004  * XJAB module
00005  *
00006  * Copyright (C) 2001-2003 FhG Fokus
00007  *
00008  * This file is part of Kamailio, a free SIP server.
00009  *
00010  * Kamailio is free software; you can redistribute it and/or modify
00011  * it under the terms of the GNU General Public License as published by
00012  * the Free Software Foundation; either version 2 of the License, or
00013  * (at your option) any later version
00014  *
00015  * Kamailio is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00018  * GNU General Public License for more details.
00019  *
00020  * You should have received a copy of the GNU General Public License 
00021  * along with this program; if not, write to the Free Software 
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00023  *
00024  * ---
00025  *
00026  * History
00027  * -------
00028  * 2003-02-28 connection management with ihttp implemented (dcm)
00029  * 2003-02-24 first version of callback functions for ihttp (dcm)
00030  * 2003-02-13 lot of comments enclosed in #ifdef XJ_EXTRA_DEBUG (dcm)
00031  * 2003-03-11 New module interface (janakj)
00032  * 2003-03-16 flags export parameter added (janakj)
00033  * 2003-04-06 rank 0 changed to 1 in child_init (janakj)
00034  * 2003-06-19 fixed too many Jabber workers bug (mostly on RH9.0) (dcm)
00035  * 2003-08-05 adapted to the new parse_content_type_hdr function (bogdan)
00036  * 2004-06-07 db API update (andrei)
00037  */
00038 
00039 
00040 #include <stdio.h>
00041 #include <string.h>
00042 #include <stdlib.h>
00043 #include <sys/types.h>
00044 #include <sys/wait.h>
00045 #include <sys/ipc.h>
00046 #include <unistd.h>
00047 #include <fcntl.h>
00048 #include <errno.h>
00049 
00050 #include "../../sr_module.h"
00051 #include "../../error.h"
00052 #include "../../ut.h"
00053 #include "../../mem/shm_mem.h"
00054 #include "../../mem/mem.h"
00055 #include "../../globals.h"
00056 #include "../../timer.h"
00057 #include "../../parser/parse_uri.h"
00058 #include "../../parser/parse_content.h"
00059 #include "../../parser/parse_from.h"
00060 #include "../../lib/srdb1/db.h"
00061 
00062 #include "../../modules/tm/tm_load.h"
00063 
00064 #ifdef HAVE_IHTTP
00065 #include "../ihttp/ih_load.h"
00066 #endif
00067 
00068 #include "xjab_load.h"
00069 #include "xjab_worker.h"
00070 #include "xjab_util.h"
00071 
00072 
00073 MODULE_VERSION
00074 
/* TM module API bindings; filled in by load_tm_api() in mod_init(). */
00076 struct tm_binds tmb;
00077 
00078 #ifdef HAVE_IHTTP
00079 
/* iHTTP module API bindings; filled in by the imported load_ih function in
 * mod_init(). */
00080 struct ih_binds ihb;
/* iHTTP page callbacks; registered through ihb.reg_f() in child_init(). */
00082 int xjab_mod_info(ih_req_p _irp, void *_p, char *_bb, int *_bl, 
00083                 char *_hb, int *_hl);
00084 int xjab_connections(ih_req_p _irp, void *_p, char *_bb, int *_bl, 
00085                 char *_hb, int *_hl);
00086 #endif
00087 
/* Workers list; created by xj_wlist_init() in mod_init(). */
00089 xj_wlist jwl = NULL;
00090 
/* Array of nrw database connections (allocated with shm_malloc in
 * mod_init()); entry i is handed to worker i. */
00092 static db1_con_t** db_con;
/* Database API functions bound by db_bind_mod() in mod_init(). */
00093 static db_func_t jabber_dbf;
00094 
/* Database URL ("db_url" parameter) and table selected with use_table().
 * Note: db_table is not listed in the params table in this file. */
00097 static str db_url   = str_init("mysql://root@127.0.0.1/sip_jab");
00098 static str db_table = str_init("jusers");
/* "registrar" parameter. */
00099 char *registrar=NULL; /*"sip:registrar@example.org";*/
00100 
/* Number of worker processes ("workers" parameter). */
00101 int nrw = 2;
/* "max_jobs" parameter; passed to xj_wlist_init(). */
00102 int max_jobs = 10;
00103 
/* Jabber server address and port; passed to each xj_worker_process(). */
00104 char *jaddress = "127.0.0.1";
00105 int jport = 5222;
00106 
/* Aliases, Jabber domain and outbound proxy; passed to
 * xj_wlist_set_aliases(). mod_init() fails when jdomain is NULL. */
00107 char *jaliases = NULL;
00108 char *jdomain  = NULL;
00109 char *proxy        = NULL;
00110 
/* "priority" parameter; passed to each xj_worker_process(). */
00111 char* priority = "9";
00112 
/* delay_time, sleep_time and cache_time are passed to xj_wlist_init();
 * check_time is the sleep() interval, in seconds, between two
 * xjab_check_workers() runs of the manager process. */
00113 int delay_time = 90;
00114 int sleep_time = 20;
00115 int cache_time = 600;
00116 int check_time = 20;
00117 
/* nrw pipe descriptor pairs (pkg memory), one pair per worker; created in
 * mod_init(). */
00118 int **pipes = NULL;
00119 
/* Module life-cycle callbacks referenced from the exports structure. */
00120 static int mod_init(void);
00121 static int child_init(int rank);
00122 
/* Common handler behind all script functions; type is one of the XJ_*
 * request types. */
00123 int xjab_manage_sipmsg(struct sip_msg *msg, int type);
/* Periodic worker supervision, called from the manager process loop. */
00124 void xjab_check_workers(int mpid);
00125 
/* Script functions exported through cmds[]; each forwards to
 * xjab_manage_sipmsg() with its own request type. */
00126 static int xj_send_message(struct sip_msg*, char*, char*);
00127 static int xj_join_jconf(struct sip_msg*, char*, char*);
00128 static int xj_exit_jconf(struct sip_msg*, char*, char*);
00129 static int xj_go_online(struct sip_msg*, char*, char*);
00130 static int xj_go_offline(struct sip_msg*, char*, char*);
00131 
/* Module shutdown callback. */
00132 static void destroy(void);
00133 
00134 /*
00135  * Exported functions
00136  */
/* Table of functions exported by the module, terminated by an all-zero
 * entry. NOTE(review): field order is presumably name, function, parameter
 * count, fixup, free-fixup, flags - confirm against cmd_export_t. */
00137 static cmd_export_t cmds[] = {
        /* functions flagged REQUEST_ROUTE; all of them end up in
         * xjab_manage_sipmsg() */
00138         {"jab_send_message",       (cmd_function)xj_send_message,
00139                         0, 0, 0, REQUEST_ROUTE},
00140         {"jab_join_jconf",         (cmd_function)xj_join_jconf,
00141                         0, 0, 0, REQUEST_ROUTE},
00142         {"jab_exit_jconf",         (cmd_function)xj_exit_jconf,
00143                         0, 0, 0, REQUEST_ROUTE},
00144         {"jab_go_online",          (cmd_function)xj_go_online,
00145                         0, 0, 0, REQUEST_ROUTE},
00146         {"jab_go_offline",         (cmd_function)xj_go_offline,
00147                         0, 0, 0, REQUEST_ROUTE},
        /* entries marked XJ_NO_SCRIPT_F with no route flags; presumably an
         * API for other modules rather than script functions - TODO confirm */
00148         {"jab_register_watcher",   (cmd_function)xj_register_watcher,
00149                         XJ_NO_SCRIPT_F, 0, 0, 0            },
00150         {"jab_unregister_watcher", (cmd_function)xj_unregister_watcher,
00151                         XJ_NO_SCRIPT_F, 0, 0, 0            },
00152         {"load_xjab",              (cmd_function)load_xjab,
00153                         XJ_NO_SCRIPT_F, 0, 0, 0            },
        /* terminator */
00154         {0, 0, 0, 0, 0, 0}
00155 };
00156 
00157 
00158 /*
00159  * Exported parameters 
00160  */
/* Table mapping configuration parameter names to the module variables
 * they set, terminated by an all-zero entry. */
00161 static param_export_t params[] = {
        /* string parameters; db_url stores only the char pointer, its length
         * is recomputed with strlen() in mod_init() */
00162         {"db_url",     STR_PARAM, &db_url.s  },
00163         {"jaddress",   STR_PARAM, &jaddress  },
00164         {"aliases",    STR_PARAM, &jaliases  },
00165         {"proxy",      STR_PARAM, &proxy     },
00166         {"jdomain",    STR_PARAM, &jdomain   },
00167         {"registrar",  STR_PARAM, &registrar },
00168         {"priority",   STR_PARAM, &priority  },
        /* integer parameters */
00169         {"jport",      INT_PARAM, &jport     },
00170         {"workers",    INT_PARAM, &nrw       },
00171         {"max_jobs",   INT_PARAM, &max_jobs  },
00172         {"cache_time", INT_PARAM, &cache_time},
00173         {"delay_time", INT_PARAM, &delay_time},
00174         {"sleep_time", INT_PARAM, &sleep_time},
00175         {"check_time", INT_PARAM, &check_time},
        /* terminator */
00176         {0, 0, 0}
00177 };
00178 
00179 
/* Module interface structure: name, exported tables and life-cycle
 * callbacks of the "jabber" module. */
00180 struct module_exports exports= {
00181         "jabber",
00182         DEFAULT_DLFLAGS, /* dlopen flags */
00183         cmds,       /* Exported functions */
00184         params,     /* Exported parameters */
00185         0,          /* exported statistics */
00186         0,          /* exported MI functions */
00187         0,          /* exported pseudo-variables */
00188         0,          /* extra processes */
00189         mod_init,   /* module initialization function */
00190         0,          /* unused slot; presumably the reply handler - TODO confirm */
00191         (destroy_function) destroy, /* module destroy function */
00192         child_init  /* per-child init function */
00193 };
00194 
00198 static int mod_init(void)
00199 {
00200         LM_WARN("This module is deprecated and will be removed in the next release. Use the <purple> module.");
00201 #ifdef HAVE_IHTTP
00202         load_ih_f load_ih;
00203 #endif
00204         int  i;
00205         db_url.len = strlen(db_url.s);
00206 
00207         if(!jdomain)
00208         {
00209                 LM_ERR("jdomain is NULL\n");
00210                 return -1;
00211         }
00212 
00213         /* import mysql functions */
00214         if (db_bind_mod(&db_url, &jabber_dbf)<0)
00215         {
00216                 LM_ERR("database module not found\n");
00217                 return -1;
00218         }
00219 
00220         if (!DB_CAPABILITY(jabber_dbf, DB_CAP_QUERY)) {
00221                 LM_ERR("database module does not implement 'query' function\n");
00222                 return -1;
00223         }
00224 
00225         db_con = (db1_con_t**)shm_malloc(nrw*sizeof(db1_con_t*));
00226         if (db_con == NULL)
00227         {
00228                 LM_ERR("no more shm memory\n");
00229                 return -1;
00230         }
00231 
00232         /* load the TM API */
00233         if (load_tm_api(&tmb)!=0) {
00234                 LM_ERR("can't load TM API\n");
00235                 return -1;
00236         }
00237 
00238 #ifdef HAVE_IHTTP
00239         /* import the iHTTP auto-loading function */
00240         if ( !(load_ih=(load_ih_f)find_export("load_ih", IH_NO_SCRIPT_F, 0))) {
00241                 LM_ERR("can't import load_ih\n");
00242                 return -1;
00243         }
00244         /* let the auto-loading function load all TM stuff */
00245         if (load_ih( &ihb )==-1)
00246                 return -1;
00247 #endif
00248         
00249         pipes = (int**)pkg_malloc(nrw*sizeof(int*));
00250         if (pipes == NULL)
00251         {
00252                 LM_ERR("no more pkg memory (pipes)\n");
00253                 return -1;
00254         }
00255         
00256         for(i=0; i<nrw; i++)
00257         {
00258                 pipes[i] = (int*)pkg_malloc(2*sizeof(int));
00259                 if (!pipes[i])
00260                 {
00261                         LM_ERR("no more pkg memory (pipes)\n");
00262                         return -1;
00263                 }
00264         }
00265         
00266         for(i=0; i<nrw; i++)
00267         {       
00268                 db_con[i] = jabber_dbf.init(&db_url);
00269                 if (!db_con[i])
00270                 {
00271                         LM_ERR("failed to connect to the database\n");
00272                         return -1;
00273                 }
00274                 else
00275                 {
00276                         if (jabber_dbf.use_table(db_con[i], &db_table) < 0) {
00277                                 LM_ERR("use_table failed\n");
00278                                 return -1;
00279                         }
00280                         LM_DBG("database connection opened successfully\n");
00281                 }
00282         }
00283 
00284         
00287         for(i=0;i<nrw;i++)
00288         {
00289                 /* create the pipe*/
00290                 if (pipe(pipes[i])==-1) {
00291                         LM_ERR("cannot create pipe!\n");
00292                         return -1;
00293                 }
00294                 LM_DBG("pipe[%d] = <%d>-<%d>\n", i, pipes[i][0], pipes[i][1]);
00295         }
00296         
00297         if((jwl = xj_wlist_init(pipes,nrw,max_jobs,cache_time,sleep_time,
00298                                 delay_time)) == NULL)
00299         {
00300                 LM_ERR("failed to initialize workers list\n");
00301                 return -1;
00302         }
00303         
00304         if(xj_wlist_set_aliases(jwl, jaliases, jdomain, proxy) < 0)
00305         {
00306                 LM_ERR("failed to set aliases and outbound proxy\n");
00307                 return -1;
00308         }
00309 
00310         LM_DBG("initialized ...\n");    
00311         return 0;
00312 }
00313 
00314 /*
00315  * Initialize children
00316  */
00317 static int child_init(int rank)
00318 {
00319         int i, j, mpid, cpid;
00320         
00321         LM_DBG("initializing child <%d>\n", rank);
00322              /* Rank 0 is main process now - 1 is the first child (janakj) */
00323         if(rank == 1)
00324         {
00325 #ifdef HAVE_IHTTP
00326 
00327                 ihb.reg_f("xjab", "XMPP Gateway", IH_MENU_YES,
00328                                 xjab_mod_info, NULL);
00329                 ihb.reg_f("xjabc", "XMPP connections", IH_MENU_YES,
00330                                 xjab_connections, NULL);
00331 #endif
00332                 if((mpid=fork())<0 )
00333                 {
00334                         LM_ERR("cannot launch worker's manager\n");
00335                         return -1;
00336                 }
00337                 if(mpid == 0)
00338                 {
00340                         for(i=0;i<nrw;i++)
00341                         {
00342                                 if ( (cpid=fork())<0 )
00343                                 {
00344                                         LM_ERR("cannot launch worker\n");
00345                                         return -1;
00346                                 }
00347                                 if (cpid == 0)
00348                                 {
00349                                         for(j=0;j<nrw;j++)
00350                                                 if(j!=i) close(pipes[j][0]);
00351                                         close(pipes[i][1]);
00352                                         if(xj_wlist_set_pid(jwl, getpid(), i) < 0)
00353                                         {
00354                                                 LM_ERR("failed to set worker's pid\n");
00355                                                 return -1;
00356                                         }
00357                                         xj_worker_process(jwl,jaddress,jport, priority, i, 
00358                                                         db_con[i], &jabber_dbf);
00359                                         exit(0);
00360                                 }
00361                         }
00362 
00363                         mpid = getpid();
00364                         while(1)
00365                         {
00366                                 sleep(check_time);
00367                                 xjab_check_workers(mpid);
00368                         }
00369                 }
00370         }
00371         
00372         //if(pipes)
00373         //{
00374         //      for(i=0;i<nrw;i++)
00375         //              close(pipes[i][0]);
00376         //}
00377         return 0;
00378 }
00379 
00383 static int xj_send_message(struct sip_msg *msg, char* foo1, char * foo2)
00384 {
00385         LM_DBG("processing SIP MESSAGE\n");
00386         return xjab_manage_sipmsg(msg, XJ_SEND_MESSAGE);
00387 }
00388 
00392 static int xj_join_jconf(struct sip_msg *msg, char* foo1, char * foo2)
00393 {
00394         LM_DBG("join a Jabber conference\n");
00395         return xjab_manage_sipmsg(msg, XJ_JOIN_JCONF);
00396 }
00397 
00401 static int xj_exit_jconf(struct sip_msg *msg, char* foo1, char * foo2)
00402 {
00403         LM_DBG("exit from a Jabber conference\n");
00404         return xjab_manage_sipmsg(msg, XJ_EXIT_JCONF);
00405 }
00406 
00410 static int xj_go_online(struct sip_msg *msg, char* foo1, char * foo2)
00411 {
00412         LM_DBG("go online in Jabber network\n");
00413         return xjab_manage_sipmsg(msg, XJ_GO_ONLINE);
00414 }
00415 
00419 static int xj_go_offline(struct sip_msg *msg, char* foo1, char * foo2)
00420 {
00421         LM_DBG("go offline in Jabber network\n");
00422         return xjab_manage_sipmsg(msg, XJ_GO_OFFLINE);
00423 }
00424 
00428 int xjab_manage_sipmsg(struct sip_msg *msg, int type)
00429 {
00430         str body, dst, from_uri;
00431         xj_sipmsg jsmsg;
00432         int pipe, fl;
00433         t_xj_jkey jkey, *p;
00434         int mime;
00435 
00436         body.len = 0;
00437         body.s = 0;
00438 
00439         // extract message body - after that whole SIP MESSAGE is parsed
00440         if (type==XJ_SEND_MESSAGE)
00441         {
00442                 /* get the message's body */
00443                 body.s = get_body( msg );
00444                 if(body.s==0) 
00445                 {
00446                         LM_ERR("cannot extract body from msg\n");
00447                         goto error;
00448                 }
00449                 
00450                 /* content-length (if present) must be already parsed */
00451                 if(!msg->content_length)
00452                 {
00453                         LM_ERR("no Content-Length header found!\n");
00454                         goto error;
00455                 }
00456                 body.len = get_content_length(msg);
00457 
00458                 /* parse the content-type header */
00459                 if((mime=parse_content_type_hdr(msg))<1)
00460                 {
00461                         LM_ERR("cannot parse Content-Type header\n");
00462                         goto error;
00463                 }
00464 
00465                 /* check the content-type value */
00466                 if(mime!=(TYPE_TEXT<<16)+SUBTYPE_PLAIN
00467                         && mime!=(TYPE_MESSAGE<<16)+SUBTYPE_CPIM)
00468                 {
00469                         LM_ERR("invalid content-type for"
00470                                 " a message request! type found=%d\n", mime);
00471                         goto error;
00472                 }
00473         }
00474         
00475         // check for TO and FROM headers - if is not SIP MESSAGE 
00476         if(parse_headers(msg,HDR_TO_F|HDR_FROM_F,0)==-1 || !msg->to || !msg->from)
00477         {
00478                 LM_ERR("cannot find TO or FROM HEADERS!\n");
00479                 goto error;
00480         }
00481         
00482         /* parsing from header */
00483         if ( parse_from_header( msg )<0 || msg->from->parsed==NULL) 
00484         {
00485                 LM_DBG("cannot get FROM header\n");
00486                 goto error;
00487         }
00488         from_uri.s = ((struct to_body*)msg->from->parsed)->uri.s;
00489         from_uri.len = ((struct to_body*)msg->from->parsed)->uri.len;
00490         if(xj_extract_aor(&from_uri, 0))
00491         {
00492                 LM_DBG("cannot get AoR from FROM header\n");
00493                 goto error;
00494         }
00495 
00496         jkey.hash = xj_get_hash(&from_uri, NULL);
00497         jkey.id = &from_uri;
00498         // get the communication pipe with the worker
00499         switch(type)
00500         {
00501                 case XJ_SEND_MESSAGE:
00502                 case XJ_JOIN_JCONF:
00503                 case XJ_GO_ONLINE:
00504                         if((pipe = xj_wlist_get(jwl, &jkey, &p)) < 0)
00505                         {
00506                                 LM_DBG("cannot find pipe of the worker!\n");
00507                                 goto error;
00508                         }
00509                 break;
00510                 case XJ_EXIT_JCONF:
00511                 case XJ_GO_OFFLINE:
00512                         if((pipe = xj_wlist_check(jwl, &jkey, &p)) < 0)
00513                         {
00514                                 LM_DBG("no open Jabber session for"
00515                                                 " <%.*s>!\n", from_uri.len, from_uri.s);
00516                                 goto error;
00517                         }
00518                 break;
00519                 default:
00520                         LM_DBG("error, strange SIP msg type!\n");
00521                         goto error;
00522         }
00523 
00524         // if is for going ONLINE/OFFLINE we do not need the destination
00525         if(type==XJ_GO_ONLINE || type==XJ_GO_OFFLINE)
00526                 goto prepare_job;
00527         
00528         // determination of destination
00529         // - try to get it from new_uri, r-uri or to hdr, but check it against
00530         // jdomain and aliases
00531         dst.len = 0;
00532         if( msg->new_uri.len > 0)
00533         {
00534                 dst.s = msg->new_uri.s;
00535                 dst.len = msg->new_uri.len;
00536                 if(xj_wlist_check_aliases(jwl, &dst))
00537                         dst.len = 0;
00538 #ifdef XJ_EXTRA_DEBUG
00539                 else
00540                         LM_DBG("using NEW URI for destination\n");
00541 #endif
00542         }
00543         
00544         if (dst.len == 0 &&  msg->first_line.u.request.uri.s != NULL
00545                         && msg->first_line.u.request.uri.len > 0 )
00546         {
00547                 dst.s = msg->first_line.u.request.uri.s;
00548                 dst.len = msg->first_line.u.request.uri.len;
00549                 if(xj_wlist_check_aliases(jwl, &dst))
00550                         dst.len = 0;
00551 #ifdef XJ_EXTRA_DEBUG
00552                 else
00553                         LM_DBG("using R-URI for destination\n");
00554 #endif
00555         }
00556 
00557         if(dst.len == 0 && msg->to->parsed)
00558         {
00559                 dst.s = ((struct to_body*)msg->to->parsed)->uri.s;
00560                 dst.len = ((struct to_body*)msg->to->parsed)->uri.len;
00561                 if(dst.s == NULL || xj_wlist_check_aliases(jwl, &dst))
00562                         dst.len = 0;
00563 #ifdef XJ_EXTRA_DEBUG
00564                 else
00565                         LM_DBG("using TO-URI for destination\n");
00566 #endif
00567         }
00568         
00569         if(dst.len == 0)
00570         {
00571                 LM_DBG("destination not found in SIP message\n");
00572                 goto error;
00573         }
00574         
00576         if(xj_extract_aor(&dst, 1))
00577         {
00578                 LM_ERR("cannot get AoR for destination\n");
00579                 goto error;
00580         }
00581 #ifdef XJ_EXTRA_DEBUG
00582         LM_DBG("destination after correction [%.*s].\n", dst.len, dst.s);
00583 #endif
00584         
00585 prepare_job:
00586         //putting the SIP message parts in share memory to be accessible by workers
00587     jsmsg = (xj_sipmsg)shm_malloc(sizeof(t_xj_sipmsg));
00588         memset(jsmsg, 0, sizeof(t_xj_sipmsg));
00589     if(jsmsg == NULL)
00590         return -1;
00591         
00592         switch(type)
00593         {
00594                 case XJ_SEND_MESSAGE:
00595                         jsmsg->msg.len = body.len;
00596                         if((jsmsg->msg.s = (char*)shm_malloc(jsmsg->msg.len+1)) == NULL)
00597                         {
00598                                 shm_free(jsmsg);
00599                                 goto error;
00600                         }
00601                         strncpy(jsmsg->msg.s, body.s, jsmsg->msg.len);
00602                 break;
00603                 case XJ_GO_ONLINE:
00604                 case XJ_GO_OFFLINE:
00605                         dst.len = 0;
00606                         dst.s = 0;
00607                 case XJ_JOIN_JCONF:
00608                 case XJ_EXIT_JCONF:
00609                         jsmsg->msg.len = 0;
00610                         jsmsg->msg.s = NULL;
00611                 break;
00612                 default:
00613                         LM_DBG("this SHOULD NOT appear\n");
00614                         shm_free(jsmsg);
00615                         goto error;
00616         }
00617         if(dst.len>0)
00618         {
00619                 jsmsg->to.len = dst.len;
00620                 if((jsmsg->to.s = (char*)shm_malloc(jsmsg->to.len+1))==NULL)
00621                 {
00622                         if(type == XJ_SEND_MESSAGE)
00623                                 shm_free(jsmsg->msg.s);
00624                         shm_free(jsmsg);
00625                         goto error;
00626                 }
00627                 strncpy(jsmsg->to.s, dst.s, jsmsg->to.len);
00628         }
00629         else
00630         {
00631                 jsmsg->to.len = 0;
00632                 jsmsg->to.s   = 0;
00633         }
00634 
00635         jsmsg->jkey = p;
00636         jsmsg->type = type;
00637         //jsmsg->jkey->hash = jkey.hash;
00638 
00639         LM_DBG("sending <%p> to worker through <%d>\n", jsmsg, pipe);
00640         // sending the SHM pointer of SIP message to the worker
00641         fl = write(pipe, &jsmsg, sizeof(jsmsg));
00642         if(fl != sizeof(jsmsg))
00643         {
00644                 LM_ERR("failed to write to worker pipe!\n");
00645                 if(type == XJ_SEND_MESSAGE)
00646                         shm_free(jsmsg->msg.s);
00647                 shm_free(jsmsg->to.s);
00648                 shm_free(jsmsg);
00649                 goto error;
00650         }
00651         
00652         return 1;
00653 error:
00654         return -1;
00655 }
00656 
00660 static void destroy(void)
00661 {
00662         int i;
00663 #ifdef XJ_EXTRA_DEBUG
00664         LM_DBG("unloading module ...\n");
00665 #endif
00666         if(pipes)
00667         { // close the pipes
00668                 for(i = 0; i < nrw; i++)
00669                 {
00670                         if(pipes[i])
00671                         {
00672                                 close(pipes[i][0]);
00673                                 close(pipes[i][1]);
00674                         }
00675                         pkg_free(pipes[i]);
00676                 }
00677                 pkg_free(pipes);
00678         }
00679         // cleaning MySQL connections
00680         if(db_con != NULL)
00681         {
00682                 for(i = 0; i<nrw; i++)
00683                         jabber_dbf.close(db_con[i]);
00684                 shm_free(db_con);
00685         }
00686                         
00687         xj_wlist_free(jwl);
00688         LM_DBG("unloaded ...\n");
00689 }
00690 
00694 void xj_register_watcher(str *from, str *to, void *cbf, void *pp)
00695 {
00696         xj_sipmsg jsmsg = NULL;
00697         t_xj_jkey jkey, *jp;
00698         int pipe, fl;
00699         str from_uri, to_uri;
00700 
00701         if(!to || !from || !cbf)
00702                 return;
00703 
00704 #ifdef XJ_EXTRA_DEBUG
00705         LM_DBG("from=[%.*s] to=[%.*s]\n", from->len,
00706             from->s, to->len, to->s);
00707 #endif
00708         from_uri.s = from->s;
00709         from_uri.len = from->len;
00710         if(xj_extract_aor(&from_uri, 0))
00711         {
00712                 LM_ERR("cannot get AoR from FROM header\n");
00713                 goto error;
00714         }
00715 
00716         jkey.hash = xj_get_hash(&from_uri, NULL);
00717         jkey.id = &from_uri;
00718 
00719         if((pipe = xj_wlist_get(jwl, &jkey, &jp)) < 0)
00720         {
00721                 LM_DBG("cannot find pipe of the worker!\n");
00722                 goto error;
00723         }
00724         
00725         //putting the SIP message parts in share memory to be accessible by workers
00726         jsmsg = (xj_sipmsg)shm_malloc(sizeof(t_xj_sipmsg));
00727         memset(jsmsg, 0, sizeof(t_xj_sipmsg));
00728         if(jsmsg == NULL)
00729                 goto error;
00730         
00731         jsmsg->msg.len = 0;
00732         jsmsg->msg.s = NULL;
00733         
00734         to_uri.s = to->s;
00735         to_uri.len = to->len;
00737         if(xj_extract_aor(&to_uri, 1))
00738         {
00739                 LM_ERR("cannot get AoR for destination\n");
00740                 goto error;
00741         }
00742 #ifdef XJ_EXTRA_DEBUG
00743         LM_DBG("destination after correction [%.*s].\n", to_uri.len, to_uri.s);
00744 #endif
00745 
00746         jsmsg->to.len = to_uri.len;
00747         if((jsmsg->to.s = (char*)shm_malloc(jsmsg->to.len+1)) == NULL)
00748         {
00749                 if(jsmsg->msg.s)
00750                         shm_free(jsmsg->msg.s);
00751                 shm_free(jsmsg);
00752                 goto error;
00753         }
00754         strncpy(jsmsg->to.s, to_uri.s, jsmsg->to.len);
00755         jsmsg->to.s[jsmsg->to.len] = '\0';
00756 
00757         jsmsg->jkey = jp;
00758         jsmsg->type = XJ_REG_WATCHER;
00759         //jsmsg->jkey->hash = jkey.hash;
00760         
00761         jsmsg->cbf = (pa_callback_f)cbf;
00762         jsmsg->p = pp;
00763 
00764 #ifdef XJ_EXTRA_DEBUG
00765         LM_DBG("sending <%p> to worker through <%d>\n", jsmsg, pipe);
00766 #endif
00767         // sending the SHM pointer of SIP message to the worker
00768         fl = write(pipe, &jsmsg, sizeof(jsmsg));
00769         if(fl != sizeof(jsmsg))
00770         {
00771                 LM_ERR("failed to write to worker pipe!\n");
00772                 if(jsmsg->msg.s)
00773                         shm_free(jsmsg->msg.s);
00774                 shm_free(jsmsg->to.s);
00775                 shm_free(jsmsg);
00776                 goto error;
00777         }
00778         
00779  error:
00780         return;
00781 }
00782 
00786 void xj_unregister_watcher(str *from, str *to, void *cbf, void *pp)
00787 {
00788         if(!to || !from)
00789                 return;
00790 }
00791 
00796 void xjab_check_workers(int mpid)
00797 {
00798         int i, n, stat;
00799         //LM_DBG("time=%d\n", get_ticks());
00800         if(!jwl || jwl->len <= 0)
00801                 return;
00802         for(i=0; i < jwl->len; i++)
00803         {
00804                 if(jwl->workers[i].pid > 0)
00805                 {
00806                         stat = 0;
00807                         n = waitpid(jwl->workers[i].pid, &stat, WNOHANG);
00808                         if(n == 0 || n!=jwl->workers[i].pid)
00809                                 continue;
00810                 
00811                         LM_ERR("worker[%d][pid=%d] has exited - status=%d err=%d"
00812                                         "errno=%d\n", i, jwl->workers[i].pid, stat, n, errno);
00813                         xj_wlist_clean_jobs(jwl, i, 1);
00814                         xj_wlist_set_pid(jwl, -1, i);
00815                 }
00816                 
00817 #ifdef XJ_EXTRA_DEBUG
00818                 LM_DBG("create a new worker[%d]\n", i);
00819 #endif
00820                 if ( (stat=fork())<0 )
00821                 {
00822 #ifdef XJ_EXTRA_DEBUG
00823                         LM_DBG("cannot launch new worker[%d]\n", i);
00824 #endif
00825                         LM_ERR("worker[%d] lost forever \n", i);
00826                         return;
00827                 }
00828                 if (stat == 0)
00829                 {
00830                         if(xj_wlist_set_pid(jwl, getpid(), i) < 0)
00831                         {
00832                                 LM_ERR("failed to set new worker's pid - w[%d]\n", i);
00833                                 return;
00834                         }
00835                         xj_worker_process(jwl,jaddress,jport,priority, i,
00836                                         db_con[i], &jabber_dbf);
00837                         exit(0);
00838                 }
00839         }                       
00840 }
00841 
00842 #ifdef HAVE_IHTTP
00843 
00847 int xjab_mod_info(ih_req_p _irp, void *_p, char *_bb, int *_bl, 
00848                 char *_hb, int *_hl)
00849 {
00850         if(!_irp || !_bb || !_bl || *_bl <= 0 || !_hb || !_hl || *_hl <= 0)
00851                 return -1;
00852         *_hl = 0;
00853         *_hb = 0;
00854         
00855         strcpy(_bb, "<h4>SER2Jabber Gateway</h4>");
00856         strcat(_bb, "<br>Module parameters:<br>");
00857         strcat(_bb, "<br> -- db table = ");
00858         strcat(_bb, db_table);
00859         strcat(_bb, "<br> -- workers = ");
00860         strcat(_bb, int2str(nrw, NULL));
00861         strcat(_bb, "<br> -- max jobs per worker = ");
00862         strcat(_bb, int2str(max_jobs, NULL));
00863 
00864         strcat(_bb, "<br> -- jabber server address = ");
00865         strcat(_bb, jaddress);
00866         strcat(_bb, "<br> -- jabber server port = ");
00867         strcat(_bb, int2str(jport, NULL));
00868 
00869         strcat(_bb, "<br> -- aliases = ");
00870         strcat(_bb, (jaliases)?jaliases:"NULL");
00871         strcat(_bb, "<br> -- jabber domain = ");
00872         strcat(_bb, (jdomain)?jdomain:"NULL");
00873         strcat(_bb, "<br> -- proxy address = ");
00874         strcat(_bb, (proxy)?proxy:"NULL");
00875 
00876         strcat(_bb, "<br> -- delay time = ");
00877         strcat(_bb, int2str(delay_time, NULL));
00878         strcat(_bb, "<br> -- sleep time = ");
00879         strcat(_bb, int2str(sleep_time, NULL));
00880         strcat(_bb, "<br> -- cache time = ");
00881         strcat(_bb, int2str(cache_time, NULL));
00882         strcat(_bb, "<br> -- check time = ");
00883         strcat(_bb, int2str(check_time, NULL));
00884         
00885         *_bl = strlen(_bb);
00886 
00887         return 0;
00888 }
00889 
00895 int xjab_connections(ih_req_p _irp, void *_p, char *_bb, int *_bl, 
00896                 char *_hb, int *_hl)
00897 {
00898         t_xj_jkey jkey, *p;
00899         str _u;
00900         ih_param_p _ipp = NULL;
00901         int idx, i, maxcount;
00902         char *cp;
00903 
00904         if(!_irp || !_bb || !_bl || *_bl <= 0 || !_hb || !_hl || *_hl <= 0)
00905                 return -1;
00906         
00907         *_hl = 0;
00908         *_hb = 0;
00909         idx = -1;
00910         strcpy(_bb, "<h4>Active XMPP connections</h4>");
00911         
00912         if(_irp->params)
00913         {
00914                 strcat(_bb, "<br><b>Close action is alpha release!</b><br>");
00915                 _ipp = _irp->params;
00916                 i = 0;
00917                 while(_ipp)
00918                 {
00919                         switch(_ipp->name[0])
00920                         {
00921                                 case 'w':
00922                                         idx = 0;
00923                                         cp = _ipp->value;
00924                                         while(*cp && *cp>='0' && *cp<='9')
00925                                         {
00926                                                 idx = idx*10 + *cp-'0';
00927                                                 cp++;
00928                                         }
00929                                         i++;
00930                                 break;
00931                                 case 'u':
00932                                         _u.s = _ipp->value;
00933                                         _u.len = strlen(_ipp->value);
00934                                         jkey.id = &_u;
00935                                         i++;
00936                                 break;
00937                                 case 'i':
00938                                         jkey.hash = 0;
00939                                         cp = _ipp->value;
00940                                         while(*cp && *cp>='0' && *cp<='9')
00941                                         {
00942                                                 jkey.hash = jkey.hash*10 + *cp-'0';
00943                                                 cp++;
00944                                         }
00945                                         i++;
00946                                 break;
00947                                 
00948                         }
00949                         _ipp = _ipp->next;
00950                 }
00951                 if(i!=3 || idx < 0 || idx >= jwl->len)
00952                 {
00953                         strcat(_bb, "<br><b><i>Bad parameters!</i></b>\n");
00954                 }
00955                 else
00956                 {
00957                         strcat(_bb, "<br><b><i>The connection of [");
00958                         strcat(_bb, _u.s);
00959 
00960                         if(xj_wlist_set_flag(jwl, &jkey, XJ_FLAG_CLOSE) < 0)
00961                                 strcat(_bb, "] does not exist!</i></b>\n");
00962                         else
00963                                 strcat(_bb, "] was scheduled for closing!</i></b>\n");
00964                 }
00965                 *_bl = strlen(_bb);
00966 
00967                 return 0;
00968         }
00969         
00970         if(jwl!=NULL && jwl->len > 0 && jwl->workers!=NULL)
00971         {
00972                 for(idx=0; idx<jwl->len; idx++)
00973                 {
00974                         strcat(_bb, "<br><b><i>Worker[");
00975                         strcat(_bb, int2str(idx, NULL));
00976                         strcat(_bb, "]</i></b> &nbsp;&nbsp;pid=");
00977                         strcat(_bb, int2str(jwl->workers[idx].pid, NULL));
00978                         strcat(_bb, " &nbsp;&nbsp;nr of jobs=");
00979                         strcat(_bb, int2str(jwl->workers[idx].nr, NULL));
00980                         if(!jwl->workers[idx].sip_ids)
00981                                 continue;
00982                         lock_set_get(jwl->sems, idx);
00983                         maxcount = count234(jwl->workers[idx].sip_ids);
00984                         for (i = 0; i < maxcount; i++) 
00985                         {
00986                                 p = (xj_jkey)index234(jwl->workers[idx].sip_ids, i);
00987                                 if(p == NULL)
00988                                         continue;
00989                                 strcat(_bb, "<br>&nbsp;&nbsp;&nbsp;");
00990                                 strcat(_bb, int2str(i, NULL));
00991                                 strcat(_bb, ".&nbsp;&nbsp;&nbsp;");
00992                                 strcat(_bb, "<a href=\"xjabc?w=");
00993                                 strcat(_bb, int2str(idx, NULL));
00994                                 strcat(_bb, "&i=");
00995                                 strcat(_bb, int2str(p->hash, NULL));
00996                                 strcat(_bb, "&u=");
00997                                 strncat(_bb, p->id->s, p->id->len);
00998                                 strcat(_bb, "\">close</a>");
00999                                 strcat(_bb, "&nbsp;&nbsp;&nbsp;");
01000                                 strcat(_bb, int2str(p->hash, NULL));
01001                                 strcat(_bb, "&nbsp;&nbsp;&nbsp;");
01002                                 strncat(_bb, p->id->s, p->id->len);
01003                         }
01004                         lock_set_release(jwl->sems, idx);
01005                 }
01006         }
01007         
01008         *_bl = strlen(_bb);
01009 
01010         return 0;
01011 }
01012 
01013 #endif // HAVE_IHTTP