jabber_s/jabber.c

00001 /*
00002  * $Id$
00003  *
00004  * XJAB module
00005  *
00006  *
00007  * Copyright (C) 2001-2003 FhG Fokus
00008  *
00009  * This file is part of ser, a free SIP server.
00010  *
00011  * ser is free software; you can redistribute it and/or modify
00012  * it under the terms of the GNU General Public License as published by
00013  * the Free Software Foundation; either version 2 of the License, or
00014  * (at your option) any later version
00015  *
00016  * For a license to use the ser software under conditions
00017  * other than those described here, or to purchase support for this
00018  * software, please contact iptel.org by e-mail at the following addresses:
00019  *    info@iptel.org
00020  *
00021  * ser is distributed in the hope that it will be useful,
00022  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00023  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00024  * GNU General Public License for more details.
00025  *
00026  * You should have received a copy of the GNU General Public License
00027  * along with this program; if not, write to the Free Software
00028  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00029  *
00030  * ---
00031  *
00032  * History
00033  * -------
00034  * 2003-02-28 connection management with ihttp implemented (dcm)
00035  * 2003-02-24 first version of callback functions for ihttp (dcm)
00036  * 2003-02-13 lot of comments enclosed in #ifdef XJ_EXTRA_DEBUG (dcm)
00037  * 2003-03-11 New module interface (janakj)
00038  * 2003-03-16 flags export parameter added (janakj)
00039  * 2003-04-06 rank 0 changed to 1 in child_init (janakj)
00040  * 2003-06-19 fixed too many Jabber workers bug (mostly on RH9.0) (dcm)
00041  * 2003-08-05 adapted to the new parse_content_type_hdr function (bogdan)
00042  * 2004-06-07 db API update (andrei)
00043  */
00044 
00045 
00046 #include <stdio.h>
00047 #include <string.h>
00048 #include <stdlib.h>
00049 #include <sys/types.h>
00050 #include <sys/wait.h>
00051 #include <sys/ipc.h>
00052 #include <unistd.h>
00053 #include <fcntl.h>
00054 #include <errno.h>
00055 
00056 #include "../../sr_module.h"
00057 #include "../../error.h"
00058 #include "../../ut.h"
00059 #include "../../mem/shm_mem.h"
00060 #include "../../mem/mem.h"
00061 #include "../../globals.h"
00062 #include "../../timer.h"
00063 #include "../../parser/parse_uri.h"
00064 #include "../../parser/parse_content.h"
00065 #include "../../parser/parse_from.h"
00066 #include "../../lib/srdb2/db.h"
00067 #include "../../cfg/cfg_struct.h"
00068 
00069 #include "../../modules/tm/tm_load.h"
00070 
00071 #ifdef HAVE_IHTTP
00072 #include "../ihttp/ih_load.h"
00073 #endif
00074 
00075 #include "xjab_load.h"
00076 #include "xjab_worker.h"
00077 #include "xjab_util.h"
00078 
00079 
00080 MODULE_VERSION
00081 
00083 struct tm_binds tmb;
00084 
00085 #ifdef HAVE_IHTTP
00086 
00087 struct ih_binds ihb;
00089 int xjab_mod_info(ih_req_p _irp, void *_p, char *_bb, int *_bl,
00090                 char *_hb, int *_hl);
00091 int xjab_connections(ih_req_p _irp, void *_p, char *_bb, int *_bl,
00092                 char *_hb, int *_hl);
00093 #endif
00094 
00096 xj_wlist jwl = NULL;
00097 
00099 db_ctx_t* ctx = NULL;
00100 db_cmd_t* cmd = NULL;
00101 
00102 db_fld_t db_params[] = {
00103         {.name = "sip_id", .type = DB_CSTR},
00104         {.name = "type", .type = DB_INT},
00105         {.name = 0}
00106 };
00107 
00108 db_fld_t db_cols[] = {
00109         {.name = "jab_id", .type = DB_CSTR},
00110         {.name = "jab_passwd", .type = DB_CSTR},
00111         {.name = 0}
00112 };
00113 
00114 
00115 
00118 static char *db_url   = "mysql://root@127.0.0.1/sip_jab";
00119 char *db_table = "jusers";
00120 char *registrar=NULL; //"sip:registrar@iptel.org";
00121 
00122 int nrw = 2;
00123 int max_jobs = 10;
00124 
00125 char *jaddress = "127.0.0.1";
00126 int jport = 5222;
00127 
00128 char *jaliases = NULL;
00129 char *jdomain  = NULL;
00130 char *proxy        = NULL;
00131 
00132 int delay_time = 90;
00133 int sleep_time = 20;
00134 int cache_time = 600;
00135 int check_time = 20;
00136 
00137 int **pipes = NULL;
00138 
/* module life-cycle callbacks referenced from the exports structure */
static int mod_init(void);
static int child_init(int rank);
static void destroy(void);

/* internal helpers defined later in this file */
int xjab_manage_sipmsg(struct sip_msg *msg, int type);
void xjab_check_workers(int mpid);

/* script functions exported through cmds[] */
static int xj_send_message(struct sip_msg *msg, char *foo1, char *foo2);
static int xj_join_jconf(struct sip_msg *msg, char *foo1, char *foo2);
static int xj_exit_jconf(struct sip_msg *msg, char *foo1, char *foo2);
static int xj_go_online(struct sip_msg *msg, char *foo1, char *foo2);
static int xj_go_offline(struct sip_msg *msg, char *foo1, char *foo2);
00152 
00153 /*
00154  * Exported functions
00155  */
00156 static cmd_export_t cmds[] = {
00157         {"jab_send_message",       xj_send_message,                     0,              0, REQUEST_ROUTE},
00158         {"jab_join_jconf",         xj_join_jconf,                       0,              0, REQUEST_ROUTE},
00159         {"jab_exit_jconf",         xj_exit_jconf,                       0,              0, REQUEST_ROUTE},
00160         {"jab_go_online",          xj_go_online,                        0,              0, REQUEST_ROUTE},
00161         {"jab_go_offline",         xj_go_offline,                       0,              0, REQUEST_ROUTE},
00162         {"jab_register_watcher",   (cmd_function)xj_register_watcher,   XJ_NO_SCRIPT_F, 0, 0            },
00163         {"jab_unregister_watcher", (cmd_function)xj_unregister_watcher, XJ_NO_SCRIPT_F, 0, 0            },
00164         {"load_xjab",              (cmd_function)load_xjab,             XJ_NO_SCRIPT_F, 0, 0            },
00165         {0, 0, 0, 0, 0}
00166 };
00167 
00168 
00169 /*
00170  * Exported parameters
00171  */
00172 static param_export_t params[] = {
00173         {"db_url",     PARAM_STRING, &db_url    },
00174         {"jaddress",   PARAM_STRING, &jaddress  },
00175         {"aliases",    PARAM_STRING, &jaliases  },
00176         {"proxy",      PARAM_STRING, &proxy     },
00177         {"jdomain",    PARAM_STRING, &jdomain   },
00178         {"registrar",  PARAM_STRING, &registrar },
00179         {"jport",      PARAM_INT,    &jport     },
00180         {"workers",    PARAM_INT,    &nrw       },
00181         {"max_jobs",   PARAM_INT,    &max_jobs  },
00182         {"cache_time", PARAM_INT,    &cache_time},
00183         {"delay_time", PARAM_INT,    &delay_time},
00184         {"sleep_time", PARAM_INT,    &sleep_time},
00185         {"check_time", PARAM_INT,    &check_time},
00186         {0, 0, 0}
00187 };
00188 
00189 
00190 struct module_exports exports= {
00191         "jabber",
00192         cmds,       /* Exported functions */
00193         0,          /* RPC methods */
00194         params,     /* Exported parameters */
00195         mod_init,   /* module initialization function */
00196         (response_function) 0,
00197         (destroy_function) destroy,
00198         0,
00199         child_init  /* per-child init function */
00200 };
00201 
00205 static int mod_init(void)
00206 {
00207         load_tm_f load_tm;
00208 #ifdef HAVE_IHTTP
00209         load_ih_f load_ih;
00210 #endif
00211         int  i;
00212 
00213         DBG("XJAB:mod_init: initializing ...\n");
00214         if(!jdomain)
00215         {
00216                 LOG(L_ERR, "XJAB:mod_init: ERROR jdomain is NULL\n");
00217                 return -1;
00218         }
00219 
00220         /* import the TM auto-loading function */
00221         if ( !(load_tm=(load_tm_f)find_export("load_tm", NO_SCRIPT, 0))) {
00222                 LOG(L_ERR, "ERROR: xjab:mod_init: can't import load_tm\n");
00223                 return -1;
00224         }
00225         /* let the auto-loading function load all TM stuff */
00226         if (load_tm( &tmb )==-1)
00227                 return -1;
00228 
00229 #ifdef HAVE_IHTTP
00230         /* import the iHTTP auto-loading function */
00231         if ( !(load_ih=(load_ih_f)find_export("load_ih", IH_NO_SCRIPT_F, 0))) {
00232                 LOG(L_ERR, "ERROR:xjab:mod_init: can't import load_ih\n");
00233                 return -1;
00234         }
00235         /* let the auto-loading function load all TM stuff */
00236         if (load_ih( &ihb )==-1)
00237                 return -1;
00238 #endif
00239 
00240         pipes = (int**)pkg_malloc(nrw*sizeof(int*));
00241         if (pipes == NULL)
00242         {
00243                 LOG(L_ERR, "XJAB:mod_init:Error while allocating pipes\n");
00244                 return -1;
00245         }
00246 
00247         for(i=0; i<nrw; i++)
00248         {
00249                 pipes[i] = (int*)pkg_malloc(2*sizeof(int));
00250                 if (!pipes[i])
00251                 {
00252                         LOG(L_ERR, "XJAB:mod_init: Error while allocating pipes\n");
00253                         return -1;
00254                 }
00255         }
00256 
00259         for(i=0;i<nrw;i++)
00260         {
00261                 /* create the pipe*/
00262                 if (pipe(pipes[i])==-1) {
00263                         LOG(L_ERR, "XJAB:mod_init: error - cannot create pipe!\n");
00264                         return -1;
00265                 }
00266                 DBG("XJAB:mod_init: pipe[%d] = <%d>-<%d>\n", i, pipes[i][0],
00267                         pipes[i][1]);
00268         }
00269 
00270         if((jwl = xj_wlist_init(pipes,nrw,max_jobs,cache_time,sleep_time,
00271                                 delay_time)) == NULL)
00272         {
00273                 LOG(L_ERR, "XJAB:mod_init: error initializing workers list\n");
00274                 return -1;
00275         }
00276 
00277         if(xj_wlist_set_aliases(jwl, jaliases, jdomain, proxy) < 0)
00278         {
00279                 LOG(L_ERR, "XJAB:mod_init: error setting aliases and outbound proxy\n");
00280                 return -1;
00281         }
00282 
00283         /* register nrw + 1 number of children that will keep
00284          * updating their local configuration */
00285         cfg_register_child(nrw + 1);
00286 
00287         DBG("XJAB:mod_init: initialized ...\n");
00288         return 0;
00289 }
00290 
00291 /*
00292  * Initialize children
00293  */
00294 static int child_init(int rank)
00295 {
00296         int i, j, mpid, cpid;
00297 
00298         DBG("XJAB:child_init: initializing child <%d>\n", rank);
00299              /* Rank 0 is main process now - 1 is the first child (janakj) */
00300         if(rank == 1)
00301         {
00302 #ifdef HAVE_IHTTP
00303 
00304                 ihb.reg_f("xjab", "XMPP Gateway", IH_MENU_YES,
00305                                 xjab_mod_info, NULL);
00306                 ihb.reg_f("xjabc", "XMPP connections", IH_MENU_YES,
00307                                 xjab_connections, NULL);
00308 #endif
00309                 if((mpid=fork())<0 )
00310                 {
00311                         LOG(L_ERR, "XJAB:child_init:error - cannot launch worker's"
00312                                         " manager\n");
00313                         return -1;
00314                 }
00315                 if(mpid == 0)
00316                 {
00318                         for(i=0;i<nrw;i++)
00319                         {
00320                                 if ( (cpid=fork())<0 )
00321                                 {
00322                                         LOG(L_ERR,"XJAB:child_init:error - cannot launch worker\n");
00323                                         return -1;
00324                                 }
00325                                 if (cpid == 0)
00326                                 {
00327                                         for(j=0;j<nrw;j++)
00328                                                 if(j!=i) close(pipes[j][0]);
00329                                         close(pipes[i][1]);
00330                                         if(xj_wlist_set_pid(jwl, getpid(), i) < 0)
00331                                         {
00332                                                 LOG(L_ERR, "XJAB:child_init:error setting worker's"
00333                                                                                 " pid\n");
00334                                                 return -1;
00335                                         }
00336 
00337                                         /* initialize the config framework */
00338                                         if (cfg_child_init()) return -1;
00339 
00340                                         ctx = db_ctx("jabber");
00341                                         if (ctx == NULL) goto dberror;
00342                                         if (db_add_db(ctx, db_url) < 0) goto dberror;
00343                                         if (db_connect(ctx) < 0) goto dberror;
00344 
00345                                         cmd = db_cmd(DB_GET, ctx, db_table, db_cols, db_params, NULL);
00346                                         if (!cmd) goto dberror;
00347 
00348                                         xj_worker_process(jwl,jaddress,jport,i, cmd);
00349 
00350                                         db_cmd_free(cmd);
00351                                         db_ctx_free(ctx);
00352                                         ctx = NULL;
00353 
00354                                         /* destroy the local config */
00355                                         cfg_child_destroy();
00356 
00357                                         exit(0);
00358                                 }
00359                         }
00360 
00361                         mpid = getpid();
00362 
00363                         /* initialize the config framework */
00364                         if (cfg_child_init()) return -1;
00365 
00366                         while(1)
00367                         {
00368                                 sleep(check_time);
00369 
00370                                 /* update the local config */
00371                                 cfg_update();
00372 
00373                                 xjab_check_workers(mpid);
00374                         }
00375                 }
00376         }
00377 
00378         //if(pipes)
00379         //{
00380         //      for(i=0;i<nrw;i++)
00381         //              close(pipes[i][0]);
00382         //}
00383         return 0;
00384 
00385  dberror:
00386         if (cmd) db_cmd_free(cmd);
00387         cmd = NULL;
00388         if (ctx) db_ctx_free(ctx);
00389         ctx = NULL;
00390         return -1;
00391 }
00392 
00396 static int xj_send_message(struct sip_msg *msg, char* foo1, char * foo2)
00397 {
00398         DBG("XJAB: processing SIP MESSAGE\n");
00399         return xjab_manage_sipmsg(msg, XJ_SEND_MESSAGE);
00400 }
00401 
00405 static int xj_join_jconf(struct sip_msg *msg, char* foo1, char * foo2)
00406 {
00407         DBG("XJAB: join a Jabber conference\n");
00408         return xjab_manage_sipmsg(msg, XJ_JOIN_JCONF);
00409 }
00410 
00414 static int xj_exit_jconf(struct sip_msg *msg, char* foo1, char * foo2)
00415 {
00416         DBG("XJAB: exit from a Jabber conference\n");
00417         return xjab_manage_sipmsg(msg, XJ_EXIT_JCONF);
00418 }
00419 
00423 static int xj_go_online(struct sip_msg *msg, char* foo1, char * foo2)
00424 {
00425         DBG("XJAB: go online in Jabber network\n");
00426         return xjab_manage_sipmsg(msg, XJ_GO_ONLINE);
00427 }
00428 
00432 static int xj_go_offline(struct sip_msg *msg, char* foo1, char * foo2)
00433 {
00434         DBG("XJAB: go offline in Jabber network\n");
00435         return xjab_manage_sipmsg(msg, XJ_GO_OFFLINE);
00436 }
00437 
00441 int xjab_manage_sipmsg(struct sip_msg *msg, int type)
00442 {
00443         str body, dst, from_uri;
00444         xj_sipmsg jsmsg;
00445         int pipe, fl;
00446         t_xj_jkey jkey, *p;
00447         int mime;
00448 
00449         body.s=0;  /* fixes gcc 4.0 warning */
00450         body.len=0;
00451         // extract message body - after that whole SIP MESSAGE is parsed
00452         if (type==XJ_SEND_MESSAGE)
00453         {
00454                 /* get the message's body */
00455                 body.s = get_body( msg );
00456                 if(body.s==0)
00457                 {
00458                         LOG(L_ERR,"XJAB:xjab_manage_sipmsg: ERROR cannot extract body from"
00459                                 " msg\n");
00460                         goto error;
00461                 }
00462 
00463                 /* content-length (if present) must be already parsed */
00464                 if(!msg->content_length)
00465                 {
00466                         LOG(L_ERR,"XJAB:xjab_manage_sipmsg: ERROR no Content-Length"
00467                                         " header found!\n");
00468                         goto error;
00469                 }
00470                 body.len = get_content_length(msg);
00471 
00472                 /* parse the content-type header */
00473                 if((mime=parse_content_type_hdr(msg))<1)
00474                 {
00475                         LOG(L_ERR,"XJAB:xjab_manage_sipmsg: ERROR cannot parse"
00476                                         " Content-Type header\n");
00477                         goto error;
00478                 }
00479 
00480                 /* check the content-type value */
00481                 if(mime!=(TYPE_TEXT<<16)+SUBTYPE_PLAIN
00482                         && mime!=(TYPE_MESSAGE<<16)+SUBTYPE_CPIM)
00483                 {
00484                         LOG(L_ERR,"XJAB:xjab_manage_sipmsg: ERROR invalid content-type for"
00485                                 " a message request! type found=%d\n", mime);
00486                         goto error;
00487                 }
00488         }
00489 
00490         // check for TO and FROM headers - if is not SIP MESSAGE
00491         if(parse_headers( msg, HDR_TO_F|HDR_FROM_F, 0)==-1 || !msg->to
00492                         || !msg->from)
00493         {
00494                 LOG(L_ERR,"XJAB:xjab_manage_sipmsg: cannot find TO or FROM HEADERS!\n");
00495                 goto error;
00496         }
00497 
00498         /* parsing from header */
00499         if ( parse_from_header( msg )==-1 || msg->from->parsed==NULL)
00500         {
00501                 DBG("ERROR:xjab_manage_sipmsg: cannot get FROM header\n");
00502                 goto error;
00503         }
00504         from_uri.s = ((struct to_body*)msg->from->parsed)->uri.s;
00505         from_uri.len = ((struct to_body*)msg->from->parsed)->uri.len;
00506         if(xj_extract_aor(&from_uri, 0))
00507         {
00508                 DBG("ERROR:xjab_manage_sipmsg: cannot get AoR from FROM header\n");
00509                 goto error;
00510         }
00511 
00512         jkey.hash = xj_get_hash(&from_uri, NULL);
00513         jkey.id = &from_uri;
00514         // get the communication pipe with the worker
00515         switch(type)
00516         {
00517                 case XJ_SEND_MESSAGE:
00518                 case XJ_JOIN_JCONF:
00519                 case XJ_GO_ONLINE:
00520                         if((pipe = xj_wlist_get(jwl, &jkey, &p)) < 0)
00521                         {
00522                                 DBG("XJAB:xjab_manage_sipmsg: cannot find pipe of the worker!\n");
00523                                 goto error;
00524                         }
00525                 break;
00526                 case XJ_EXIT_JCONF:
00527                 case XJ_GO_OFFLINE:
00528                         if((pipe = xj_wlist_check(jwl, &jkey, &p)) < 0)
00529                         {
00530                                 DBG("XJAB:xjab_manage_sipmsg: no open Jabber session for"
00531                                                 " <%.*s>!\n", from_uri.len, from_uri.s);
00532                                 goto error;
00533                         }
00534                 break;
00535                 default:
00536                         DBG("XJAB:xjab_manage_sipmsg: ERROR:strange SIP msg type!\n");
00537                         goto error;
00538         }
00539 
00540         // if is for going ONLINE/OFFLINE we do not need the destination
00541         if(type==XJ_GO_ONLINE || type==XJ_GO_OFFLINE)
00542                 goto prepare_job;
00543 
00544         // determination of destination
00545         // - try to get it from new_uri, r-uri or to hdr, but check it against
00546         // jdomain and aliases
00547         dst.len = 0;
00548         if( msg->new_uri.len > 0)
00549         {
00550                 dst.s = msg->new_uri.s;
00551                 dst.len = msg->new_uri.len;
00552                 if(xj_wlist_check_aliases(jwl, &dst))
00553                         dst.len = 0;
00554 #ifdef XJ_EXTRA_DEBUG
00555                 else
00556                         DBG("XJAB:xjab_manage_sipmsg: using NEW URI for destination\n");
00557 #endif
00558         }
00559 
00560         if (dst.len == 0 &&  msg->first_line.u.request.uri.s != NULL
00561                         && msg->first_line.u.request.uri.len > 0 )
00562         {
00563                 dst.s = msg->first_line.u.request.uri.s;
00564                 dst.len = msg->first_line.u.request.uri.len;
00565                 if(xj_wlist_check_aliases(jwl, &dst))
00566                         dst.len = 0;
00567 #ifdef XJ_EXTRA_DEBUG
00568                 else
00569                         DBG("XJAB:xjab_manage_sipmsg: using R-URI for destination\n");
00570 #endif
00571         }
00572 
00573         if(dst.len == 0 && msg->to->parsed)
00574         {
00575                 dst.s = ((struct to_body*)msg->to->parsed)->uri.s;
00576                 dst.len = ((struct to_body*)msg->to->parsed)->uri.len;
00577                 if(dst.s == NULL || xj_wlist_check_aliases(jwl, &dst))
00578                         dst.len = 0;
00579 #ifdef XJ_EXTRA_DEBUG
00580                 else
00581                         DBG("XJAB:xjab_manage_sipmsg: using TO-URI for destination\n");
00582 #endif
00583         }
00584 
00585         if(dst.len == 0)
00586         {
00587                 DBG("XJAB:xjab_manage_sipmsg: destination not found in SIP message\n");
00588                 goto error;
00589         }
00590 
00592         if(xj_extract_aor(&dst, 1))
00593         {
00594                 DBG("ERROR:xjab_manage_sipmsg: cannot get AoR for destination\n");
00595                 goto error;
00596         }
00597 #ifdef XJ_EXTRA_DEBUG
00598         DBG("XJAB:xjab_manage_sipmsg: DESTINATION after correction [%.*s].\n",
00599                                 dst.len, dst.s);
00600 #endif
00601 
00602 prepare_job:
00603         //putting the SIP message parts in share memory to be accessible by workers
00604     jsmsg = (xj_sipmsg)shm_malloc(sizeof(t_xj_sipmsg));
00605         memset(jsmsg, 0, sizeof(t_xj_sipmsg));
00606     if(jsmsg == NULL)
00607         return -1;
00608 
00609         switch(type)
00610         {
00611                 case XJ_SEND_MESSAGE:
00612                         jsmsg->msg.len = body.len;
00613                         if((jsmsg->msg.s = (char*)shm_malloc(jsmsg->msg.len+1)) == NULL)
00614                         {
00615                                 shm_free(jsmsg);
00616                                 goto error;
00617                         }
00618                         strncpy(jsmsg->msg.s, body.s, jsmsg->msg.len);
00619                 break;
00620                 case XJ_GO_ONLINE:
00621                 case XJ_GO_OFFLINE:
00622                         dst.len = 0;
00623                         dst.s = 0;
00624                 case XJ_JOIN_JCONF:
00625                 case XJ_EXIT_JCONF:
00626                         jsmsg->msg.len = 0;
00627                         jsmsg->msg.s = NULL;
00628                 break;
00629                 default:
00630                         DBG("XJAB:xjab_manage_sipmsg: this SHOULD NOT appear\n");
00631                         shm_free(jsmsg);
00632                         goto error;
00633         }
00634         if(dst.len>0)
00635         {
00636                 jsmsg->to.len = dst.len;
00637                 if((jsmsg->to.s = (char*)shm_malloc(jsmsg->to.len+1))==NULL)
00638                 {
00639                         if(type == XJ_SEND_MESSAGE)
00640                                 shm_free(jsmsg->msg.s);
00641                         shm_free(jsmsg);
00642                         goto error;
00643                 }
00644                 strncpy(jsmsg->to.s, dst.s, jsmsg->to.len);
00645         }
00646         else
00647         {
00648                 jsmsg->to.len = 0;
00649                 jsmsg->to.s   = 0;
00650         }
00651 
00652         jsmsg->jkey = p;
00653         jsmsg->type = type;
00654         //jsmsg->jkey->hash = jkey.hash;
00655 
00656         DBG("XJAB:xjab_manage_sipmsg:%d: sending <%p> to worker through <%d>\n",
00657                         getpid(), jsmsg, pipe);
00658         // sending the SHM pointer of SIP message to the worker
00659         fl = write(pipe, &jsmsg, sizeof(jsmsg));
00660         if(fl != sizeof(jsmsg))
00661         {
00662                 DBG("XJAB:xjab_manage_sipmsg: error when writing to worker pipe!\n");
00663                 if(type == XJ_SEND_MESSAGE)
00664                         shm_free(jsmsg->msg.s);
00665                 shm_free(jsmsg->to.s);
00666                 shm_free(jsmsg);
00667                 goto error;
00668         }
00669 
00670         return 1;
00671 error:
00672         return -1;
00673 }
00674 
00678 static void destroy(void)
00679 {
00680         int i;
00681 #ifdef XJ_EXTRA_DEBUG
00682         DBG("XJAB: Unloading module ...\n");
00683 #endif
00684         if(pipes)
00685         { // close the pipes
00686                 for(i = 0; i < nrw; i++)
00687                 {
00688                         if(pipes[i])
00689                         {
00690                                 close(pipes[i][0]);
00691                                 close(pipes[i][1]);
00692                         }
00693                         pkg_free(pipes[i]);
00694                 }
00695                 pkg_free(pipes);
00696         }
00697 
00698         if (ctx) db_ctx_free(ctx);
00699         ctx = NULL;
00700         
00701         xj_wlist_free(jwl);
00702         DBG("XJAB: Unloaded ...\n");
00703 }
00704 
00708 void xj_register_watcher(str *from, str *to, void *cbf, void *pp)
00709 {
00710         xj_sipmsg jsmsg = NULL;
00711         t_xj_jkey jkey, *jp;
00712         int pipe, fl;
00713         str from_uri, to_uri;
00714 
00715         if(!to || !from || !cbf)
00716                 return;
00717 
00718 #ifdef XJ_EXTRA_DEBUG
00719         DBG("XJAB:xj_register_watcher: from=[%.*s] to=[%.*s]\n", from->len,
00720             from->s, to->len, to->s);
00721 #endif
00722         from_uri.s = from->s;
00723         from_uri.len = from->len;
00724         if(xj_extract_aor(&from_uri, 0))
00725         {
00726                 DBG("ERROR:xjab_manage_sipmsg: cannot get AoR from FROM header\n");
00727                 goto error;
00728         }
00729 
00730         jkey.hash = xj_get_hash(&from_uri, NULL);
00731         jkey.id = &from_uri;
00732 
00733         if((pipe = xj_wlist_get(jwl, &jkey, &jp)) < 0)
00734         {
00735                 DBG("XJAB:xj_register_watcher: cannot find pipe of the worker!\n");
00736                 goto error;
00737         }
00738 
00739         //putting the SIP message parts in share memory to be accessible by workers
00740         jsmsg = (xj_sipmsg)shm_malloc(sizeof(t_xj_sipmsg));
00741         memset(jsmsg, 0, sizeof(t_xj_sipmsg));
00742         if(jsmsg == NULL)
00743                 goto error;
00744 
00745         jsmsg->msg.len = 0;
00746         jsmsg->msg.s = NULL;
00747 
00748         to_uri.s = to->s;
00749         to_uri.len = to->len;
00751         if(xj_extract_aor(&to_uri, 1))
00752         {
00753                 DBG("ERROR:xjab_manage_sipmsg: cannot get AoR for destination\n");
00754                 goto error;
00755         }
00756 #ifdef XJ_EXTRA_DEBUG
00757         DBG("XJAB:xj_register_watcher: DESTINATION after correction [%.*s].\n",
00758             to_uri.len, to_uri.s);
00759 #endif
00760 
00761         jsmsg->to.len = to_uri.len;
00762         if((jsmsg->to.s = (char*)shm_malloc(jsmsg->to.len+1)) == NULL)
00763         {
00764                 if(jsmsg->msg.s)
00765                         shm_free(jsmsg->msg.s);
00766                 shm_free(jsmsg);
00767                 goto error;
00768         }
00769         strncpy(jsmsg->to.s, to_uri.s, jsmsg->to.len);
00770         jsmsg->to.s[jsmsg->to.len] = '\0';
00771 
00772         jsmsg->jkey = jp;
00773         jsmsg->type = XJ_REG_WATCHER;
00774         //jsmsg->jkey->hash = jkey.hash;
00775 
00776         jsmsg->cbf = (pa_callback_f)cbf;
00777         jsmsg->p = pp;
00778 
00779 #ifdef XJ_EXTRA_DEBUG
00780         DBG("XJAB:xj_register_watcher:%d: sending <%p> to worker through <%d>\n",
00781             getpid(), jsmsg, pipe);
00782 #endif
00783         // sending the SHM pointer of SIP message to the worker
00784         fl = write(pipe, &jsmsg, sizeof(jsmsg));
00785         if(fl != sizeof(jsmsg))
00786         {
00787                 DBG("XJAB:xj_register_watcher: error when writing to worker pipe!\n");
00788                 if(jsmsg->msg.s)
00789                         shm_free(jsmsg->msg.s);
00790                 shm_free(jsmsg->to.s);
00791                 shm_free(jsmsg);
00792                 goto error;
00793         }
00794 
00795  error:
00796         return;
00797 }
00798 
00802 void xj_unregister_watcher(str *from, str *to, void *cbf, void *pp)
00803 {
00804         if(!to || !from)
00805                 return;
00806 }
00807 
00812 void xjab_check_workers(int mpid)
00813 {
00814         int i, n, stat;
00815         //DBG("XJAB:%d:xjab_check_workers: time=%d\n", mpid, get_ticks());
00816         if(!jwl || jwl->len <= 0)
00817                 return;
00818         for(i=0; i < jwl->len; i++)
00819         {
00820                 if(jwl->workers[i].pid > 0)
00821                 {
00822                         stat = 0;
00823                         n = waitpid(jwl->workers[i].pid, &stat, WNOHANG);
00824                         if(n == 0 || n!=jwl->workers[i].pid)
00825                                 continue;
00826 
00827                         LOG(L_ERR,"XJAB:xjab_check_workers: worker[%d][pid=%d] has exited"
00828                                 " - status=%d err=%d errno=%d\n", i, jwl->workers[i].pid,
00829                                 stat, n, errno);
00830                         xj_wlist_clean_jobs(jwl, i, 1);
00831                         xj_wlist_set_pid(jwl, -1, i);
00832                 }
00833 
00834 #ifdef XJ_EXTRA_DEBUG
00835                 DBG("XJAB:%d:xjab_check_workers: create a new worker[%d]\n", mpid, i);
00836 #endif
00837                 if ( (stat=fork())<0 )
00838                 {
00839 #ifdef XJ_EXTRA_DEBUG
00840                         DBG("XJAB:xjab_check_workers: error - cannot launch new"
00841                                 " worker[%d]\n", i);
00842 #endif
00843                         LOG(L_ERR, "XJAB:xjab_check_workers: error - worker[%d] lost"
00844                                 " forever \n", i);
00845                         return;
00846                 }
00847                 if (stat == 0)
00848                 {
00849                         if(xj_wlist_set_pid(jwl, getpid(), i) < 0)
00850                         {
00851                                 LOG(L_ERR, "XJAB:xjab_check_workers: error setting new"
00852                                         " worker's pid - w[%d]\n", i);
00853                                 return;
00854                         }
00855 
00856 
00857                         /* initialize the config framework
00858                          * The child process was not registered under
00859                          * the framework during mod_init, therefore the
00860                          * late version needs to be called. (Miklos) */
00861                         if (cfg_late_child_init()) return;
00862 
00863                         ctx = db_ctx("jabber");
00864                         if (ctx == NULL) goto dberror;
00865                         if (db_add_db(ctx, db_url) < 0) goto dberror;
00866                         if (db_connect(ctx) < 0) goto dberror;
00867 
00868                         cmd = db_cmd(DB_GET, ctx, db_table, db_cols, db_params, NULL);
00869                         if (!cmd) goto dberror;
00870                         
00871                         xj_worker_process(jwl,jaddress,jport,i, cmd);
00872 
00873                         db_cmd_free(cmd);
00874                         db_ctx_free(ctx);
00875                         ctx = NULL;
00876 
00877                         /* destroy the local config */
00878                         cfg_child_destroy();
00879 
00880                         exit(0);
00881                 }
00882         }
00883         
00884  dberror:
00885         if (cmd) db_cmd_free(cmd);
00886         cmd = NULL;
00887         if (ctx) db_ctx_free(ctx);
00888         ctx = NULL;
00889 }
00890 
00891 #ifdef HAVE_IHTTP
00892 
00896 int xjab_mod_info(ih_req_p _irp, void *_p, char *_bb, int *_bl,
00897                 char *_hb, int *_hl)
00898 {
00899         if(!_irp || !_bb || !_bl || *_bl <= 0 || !_hb || !_hl || *_hl <= 0)
00900                 return -1;
00901         *_hl = 0;
00902         *_hb = 0;
00903 
00904         strcpy(_bb, "<h4>SER2Jabber Gateway</h4>");
00905         strcat(_bb, "<br>Module parameters:<br>");
00906         strcat(_bb, "<br> -- db table = ");
00907         strcat(_bb, db_table);
00908         strcat(_bb, "<br> -- workers = ");
00909         strcat(_bb, int2str(nrw, NULL));
00910         strcat(_bb, "<br> -- max jobs per worker = ");
00911         strcat(_bb, int2str(max_jobs, NULL));
00912 
00913         strcat(_bb, "<br> -- jabber server address = ");
00914         strcat(_bb, jaddress);
00915         strcat(_bb, "<br> -- jabber server port = ");
00916         strcat(_bb, int2str(jport, NULL));
00917 
00918         strcat(_bb, "<br> -- aliases = ");
00919         strcat(_bb, (jaliases)?jaliases:"NULL");
00920         strcat(_bb, "<br> -- jabber domain = ");
00921         strcat(_bb, (jdomain)?jdomain:"NULL");
00922         strcat(_bb, "<br> -- proxy address = ");
00923         strcat(_bb, (proxy)?proxy:"NULL");
00924 
00925         strcat(_bb, "<br> -- delay time = ");
00926         strcat(_bb, int2str(delay_time, NULL));
00927         strcat(_bb, "<br> -- sleep time = ");
00928         strcat(_bb, int2str(sleep_time, NULL));
00929         strcat(_bb, "<br> -- cache time = ");
00930         strcat(_bb, int2str(cache_time, NULL));
00931         strcat(_bb, "<br> -- check time = ");
00932         strcat(_bb, int2str(check_time, NULL));
00933 
00934         *_bl = strlen(_bb);
00935 
00936         return 0;
00937 }
00938 
00944 int xjab_connections(ih_req_p _irp, void *_p, char *_bb, int *_bl,
00945                 char *_hb, int *_hl)
00946 {
00947         t_xj_jkey jkey, *p;
00948         str _u;
00949         ih_param_p _ipp = NULL;
00950         int idx, i, maxcount;
00951         char *cp;
00952 
00953         if(!_irp || !_bb || !_bl || *_bl <= 0 || !_hb || !_hl || *_hl <= 0)
00954                 return -1;
00955 
00956         *_hl = 0;
00957         *_hb = 0;
00958         idx = -1;
00959         strcpy(_bb, "<h4>Active XMPP connections</h4>");
00960 
00961         if(_irp->params)
00962         {
00963                 strcat(_bb, "<br><b>Close action is alpha release!</b><br>");
00964                 _ipp = _irp->params;
00965                 i = 0;
00966                 while(_ipp)
00967                 {
00968                         switch(_ipp->name[0])
00969                         {
00970                                 case 'w':
00971                                         idx = 0;
00972                                         cp = _ipp->value;
00973                                         while(*cp && *cp>='0' && *cp<='9')
00974                                         {
00975                                                 idx = idx*10 + *cp-'0';
00976                                                 cp++;
00977                                         }
00978                                         i++;
00979                                 break;
00980                                 case 'u':
00981                                         _u.s = _ipp->value;
00982                                         _u.len = strlen(_ipp->value);
00983                                         jkey.id = &_u;
00984                                         i++;
00985                                 break;
00986                                 case 'i':
00987                                         jkey.hash = 0;
00988                                         cp = _ipp->value;
00989                                         while(*cp && *cp>='0' && *cp<='9')
00990                                         {
00991                                                 jkey.hash = jkey.hash*10 + *cp-'0';
00992                                                 cp++;
00993                                         }
00994                                         i++;
00995                                 break;
00996 
00997                         }
00998                         _ipp = _ipp->next;
00999                 }
01000                 if(i!=3 || idx < 0 || idx >= jwl->len)
01001                 {
01002                         strcat(_bb, "<br><b><i>Bad parameters!</i></b>\n");
01003                 }
01004                 else
01005                 {
01006                         strcat(_bb, "<br><b><i>The connection of [");
01007                         strcat(_bb, _u.s);
01008 
01009                         if(xj_wlist_set_flag(jwl, &jkey, XJ_FLAG_CLOSE) < 0)
01010                                 strcat(_bb, "] does not exist!</i></b>\n");
01011                         else
01012                                 strcat(_bb, "] was scheduled for closing!</i></b>\n");
01013                 }
01014                 *_bl = strlen(_bb);
01015 
01016                 return 0;
01017         }
01018 
01019         if(jwl!=NULL && jwl->len > 0 && jwl->workers!=NULL)
01020         {
01021                 for(idx=0; idx<jwl->len; idx++)
01022                 {
01023                         strcat(_bb, "<br><b><i>Worker[");
01024                         strcat(_bb, int2str(idx, NULL));
01025                         strcat(_bb, "]</i></b> &nbsp;&nbsp;pid=");
01026                         strcat(_bb, int2str(jwl->workers[idx].pid, NULL));
01027                         strcat(_bb, " &nbsp;&nbsp;nr of jobs=");
01028                         strcat(_bb, int2str(jwl->workers[idx].nr, NULL));
01029                         if(!jwl->workers[idx].sip_ids)
01030                                 continue;
01031                         lock_set_get(jwl->sems, idx);
01032                         maxcount = count234(jwl->workers[idx].sip_ids);
01033                         for (i = 0; i < maxcount; i++)
01034                         {
01035                                 p = (xj_jkey)index234(jwl->workers[idx].sip_ids, i);
01036                                 if(p == NULL)
01037                                         continue;
01038                                 strcat(_bb, "<br>&nbsp;&nbsp;&nbsp;");
01039                                 strcat(_bb, int2str(i, NULL));
01040                                 strcat(_bb, ".&nbsp;&nbsp;&nbsp;");
01041                                 strcat(_bb, "<a href=\"xjabc?w=");
01042                                 strcat(_bb, int2str(idx, NULL));
01043                                 strcat(_bb, "&i=");
01044                                 strcat(_bb, int2str(p->hash, NULL));
01045                                 strcat(_bb, "&u=");
01046                                 strncat(_bb, p->id->s, p->id->len);
01047                                 strcat(_bb, "\">close</a>");
01048                                 strcat(_bb, "&nbsp;&nbsp;&nbsp;");
01049                                 strcat(_bb, int2str(p->hash, NULL));
01050                                 strcat(_bb, "&nbsp;&nbsp;&nbsp;");
01051                                 strncat(_bb, p->id->s, p->id->len);
01052                         }
01053                         lock_set_release(jwl->sems, idx);
01054                 }
01055         }
01056 
01057         *_bl = strlen(_bb);
01058 
01059         return 0;
01060 }
01061 
01062 #endif // HAVE_IHTTP