modules_k/presence/notify.c

Go to the documentation of this file.
00001 /*
00002  * $Id$
00003  *
00004  * presence module- presence server implementation
00005  *
00006  * Copyright (C) 2006 Voice Sistem S.R.L.
00007  *
00008  * This file is part of Kamailio, a free SIP server.
00009  *
00010  * Kamailio is free software; you can redistribute it and/or modify
00011  * it under the terms of the GNU General Public License as published by
00012  * the Free Software Foundation; either version 2 of the License, or
00013  * (at your option) any later version
00014  *
00015  * Kamailio is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00018  * GNU General Public License for more details.
00019  *
00020  * You should have received a copy of the GNU General Public License 
00021  * along with this program; if not, write to the Free Software 
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00023  *
00024  * History:
00025  * --------
00026  *  2006-08-15  initial version (Anca Vamanu)
00027  */
00028 
00035 #include <stdio.h>
00036 #include <stdlib.h>
00037 #include <string.h>
00038 #include <libxml/parser.h>
00039 
00040 #include "../../trim.h"
00041 #include "../../ut.h"
00042 #include "../../globals.h"
00043 #include "../../str.h"
00044 #include "../../lib/srdb1/db.h"
00045 #include "../../lib/srdb1/db_val.h"
00046 #include "../../hashes.h"
00047 #include "../../socket_info.h"
00048 #include "../../modules/tm/tm_load.h"
00049 #include "../pua/hash.h"
00050 #include "presentity.h"
00051 #include "presence.h"
00052 #include "notify.h"
00053 #include "utils_func.h"
00054 
00055 #define ALLOC_SIZE 3000
00056 #define MAX_FORWARD 70
00057 
00058 c_back_param* shm_dup_cbparam(subs_t*);
00059 void free_cbparam(c_back_param* cb_param);
00060 
00061 void p_tm_callback( struct cell *t, int type, struct tmcb_params *ps);
00062 int add_waiting_watchers(watcher_t *watchers, str pres_uri, str event);
00063 int add_watcher_list(subs_t *s, watcher_t *watchers);
00064 str* create_winfo_xml(watcher_t* watchers, char* version,
00065                 str resource, str event, int STATE_FLAG);
00066 void free_watcher_list(watcher_t* watchers);
00067 
00068 str str_to_user_col = str_init("to_user");
00069 str str_username_col = str_init("username");
00070 str str_domain_col = str_init("domain");
00071 str str_body_col = str_init("body");
00072 str str_to_domain_col = str_init("to_domain");
00073 str str_from_user_col = str_init("from_user");
00074 str str_from_domain_col = str_init("from_domain");
00075 str str_watcher_username_col = str_init("watcher_username");
00076 str str_watcher_domain_col = str_init("watcher_domain");
00077 str str_event_id_col = str_init("event_id");
00078 str str_event_col = str_init("event");
00079 str str_etag_col = str_init("etag");
00080 str str_from_tag_col = str_init("from_tag");
00081 str str_to_tag_col = str_init("to_tag");
00082 str str_callid_col = str_init("callid");
00083 str str_local_cseq_col = str_init("local_cseq");
00084 str str_remote_cseq_col = str_init("remote_cseq");
00085 str str_record_route_col = str_init("record_route");
00086 str str_contact_col = str_init("contact");
00087 str str_expires_col = str_init("expires");
00088 str str_status_col = str_init("status");
00089 str str_reason_col = str_init("reason");
00090 str str_socket_info_col = str_init("socket_info");
00091 str str_local_contact_col = str_init("local_contact");
00092 str str_version_col = str_init("version");
00093 str str_presentity_uri_col = str_init("presentity_uri");
00094 str str_inserted_time_col = str_init("inserted_time");
00095 str str_received_time_col = str_init("received_time");
00096 str str_id_col = str_init("id");
00097 str str_sender_col = str_init("sender");
00098 str str_updated_col = str_init("updated");
00099 str str_updated_winfo_col = str_init("updated_winfo");
00100 
00101 int subset=0;
00102 
00103 char* get_status_str(int status_flag)
00104 {
00105         switch(status_flag)
00106         {
00107                 case ACTIVE_STATUS: return "active";
00108                 case PENDING_STATUS: return "pending";
00109                 case TERMINATED_STATUS: return "terminated";
00110                 case WAITING_STATUS: return "waiting";
00111         }
00112         return NULL;
00113 }
00114 
00115 void printf_subs(subs_t* subs)
00116 {
00117         LM_DBG("pres_uri: %.*s\n", subs->pres_uri.len, subs->pres_uri.s);
00118         LM_DBG("watcher_user@watcher_domain: %.*s@%.*s\n", subs->watcher_user.len, subs->watcher_user.s, subs->watcher_domain.len, subs->watcher_domain.s);
00119         LM_DBG("to_user@to_domain: %.*s@%.*s\n", subs->to_user.len, subs->to_user.s, subs->to_domain.len, subs->to_domain.s);
00120         LM_DBG("from_user@from_domain: %.*s@%.*s\n", subs->from_user.len, subs->from_user.s, subs->from_domain.len, subs->from_domain.s);
00121         LM_DBG("callid/from_tag/to_tag: %.*s/%.*s/%.*s\n", subs->callid.len, subs->callid.s, subs->from_tag.len, subs->from_tag.s, subs->to_tag.len, subs->to_tag.s);
00122         LM_DBG("local_cseq/remote_cseq: %u/%u\n", subs->local_cseq, subs->remote_cseq);
00123         LM_DBG("local_contact/contact: %.*s/%.*s\n", subs->local_contact.len, subs->local_contact.s, subs->contact.len, subs->contact.s);
00124         LM_DBG("record_route: %.*s\n", subs->record_route.len, subs->record_route.s);
00125         LM_DBG("sockinfo_str: %.*s\n", subs->sockinfo_str.len, subs->sockinfo_str.s);
00126         
00127         LM_DBG("event: %.*s\n", subs->event->name.len, subs->event->name.s);
00128         LM_DBG("status: %s\n", get_status_str(subs->status));
00129         LM_DBG("reason: %.*s\n", subs->reason.len, subs->reason.s);
00130         LM_DBG("version: %u\n", subs->version);
00131         LM_DBG("expires: %u\n", subs->expires);
00132 
00133         LM_DBG("updated/updated_winfo: %d/%d\n", subs->updated, subs->updated_winfo);
00134 }
00135 
00136 int build_str_hdr(subs_t* subs, int is_body, str* hdr)
00137 {
00138         pres_ev_t* event= subs->event;
00139         str expires = {0, 0};
00140         str status = {0, 0};
00141         char* p;
00142         str trans = {";transport=", 11};
00143 
00144         if(hdr == NULL) {
00145                 LM_ERR("bad parameter\n");
00146                 return -1;
00147         }
00148         expires.s = int2str(subs->expires, &expires.len);
00149         
00150         status.s= get_status_str(subs->status);
00151         if(status.s == NULL) {
00152                 LM_ERR("bad status %d\n", subs->status);
00153                 return -1;
00154         }
00155         status.len = strlen(status.s);
00156 
00157         hdr->len = 18 /*Max-Forwards:  + val*/ + CRLF_LEN + 
00158                 7 /*Event: */ + subs->event->name.len +4 /*;id=*/+ subs->event_id.len+
00159                 CRLF_LEN + 10 /*Contact: <*/ + subs->local_contact.len + 1/*>*/ +
00160                 15/*";transport=xxxx"*/ + CRLF_LEN + 20 /*Subscription-State: */ +
00161                 status.len + 10 /*reason/expires params*/
00162                 + (subs->reason.len>expires.len?subs->reason.len:expires.len)
00163                 + CRLF_LEN + (is_body?
00164                 (14 /*Content-Type: */+subs->event->content_type.len + CRLF_LEN):0) + 1;
00165 
00166         hdr->s = (char*)pkg_malloc(hdr->len);
00167         if(hdr->s == NULL) {
00168                 LM_ERR("no more pkg memory\n");
00169                 return -1;
00170         }
00171 
00172         p = hdr->s;
00173         p += sprintf(p, "Max-Forwards: %d\r\n", MAX_FORWARD);
00174 
00175         p += sprintf(p  ,"Event: %.*s", event->name.len, event->name.s);
00176         if(subs->event_id.len && subs->event_id.s) {
00177                 p += sprintf(p, ";id=%.*s", subs->event_id.len, subs->event_id.s);
00178         }
00179         memcpy(p, CRLF, CRLF_LEN);
00180         p += CRLF_LEN;
00181 
00182         p += sprintf(p, "Contact: <%.*s", subs->local_contact.len, subs->local_contact.s);
00183         if(subs->sockinfo_str.s!=NULL
00184                         && str_search(&subs->local_contact, &trans)==0) {
00185                 /* fix me */
00186                 switch(subs->sockinfo_str.s[0]) {
00187                         case 's':
00188                         case 'S':
00189                                 memcpy(p, ";transport=sctp", 15);
00190                                 p += 15;
00191                         break;
00192                         case 't':
00193                         case 'T':
00194                                 switch(subs->sockinfo_str.s[1]) {
00195                                         case 'c':
00196                                         case 'C':
00197                                                 memcpy(p, ";transport=tcp", 14);
00198                                                 p += 14;
00199                                         break;
00200                                         case 'l':
00201                                         case 'L':
00202                                                 memcpy(p, ";transport=tls", 14);
00203                                                 p += 14;
00204                                         break;
00205                                 }
00206                         break;
00207                 }
00208         }
00209         *p =  '>';
00210         p++;
00211         memcpy(p, CRLF, CRLF_LEN);
00212         p += CRLF_LEN;
00213 
00214         p += sprintf(p, "Subscription-State: %.*s", status.len, status.s);
00215 
00216         if(subs->status == TERMINATED_STATUS) {
00217                 LM_DBG("state = terminated\n");
00218                 p += sprintf(p, ";reason=%.*s", subs->reason.len, subs->reason.s);
00219         } else {
00220                 p += sprintf(p, ";expires=%.*s", expires.len, expires.s);
00221         }
00222         memcpy(p, CRLF, CRLF_LEN);
00223         p += CRLF_LEN;
00224 
00225         if(is_body) {
00226                 p += sprintf(p,"Content-Type: %.*s\r\n", event->content_type.len,
00227                                 event->content_type.s);
00228         }
00229         
00230         *p = '\0';
00231         hdr->len = p - hdr->s;
00232 
00233         return 0;
00234 }
00235 
00236 int get_wi_subs_db(subs_t* subs, watcher_t* watchers)
00237 {       
00238         subs_t sb;
00239         db_key_t query_cols[3];
00240         db_op_t  query_ops[3];
00241         db_val_t query_vals[3];
00242         db_key_t result_cols[5];
00243         db1_res_t *result = NULL;
00244         db_row_t *row = NULL ;  
00245         db_val_t *row_vals = NULL;
00246         int n_result_cols = 0;
00247         int n_query_cols = 0;
00248         int i;
00249         int status_col, expires_col, watcher_user_col, watcher_domain_col, callid_col;
00250 
00251         query_cols[n_query_cols] = &str_presentity_uri_col;
00252         query_ops[n_query_cols] = OP_EQ;
00253         query_vals[n_query_cols].type = DB1_STR;
00254         query_vals[n_query_cols].nul = 0;
00255         query_vals[n_query_cols].val.str_val= subs->pres_uri;
00256         n_query_cols++;
00257 
00258         query_cols[n_query_cols] = &str_event_col;
00259         query_ops[n_query_cols] = OP_EQ;
00260         query_vals[n_query_cols].type = DB1_STR;
00261         query_vals[n_query_cols].nul = 0;
00262         query_vals[n_query_cols].val.str_val = subs->event->wipeer->name;
00263         n_query_cols++;
00264 
00265         query_cols[n_query_cols] = &str_expires_col;
00266         query_ops[n_query_cols] = OP_GT;
00267         query_vals[n_query_cols].type = DB1_INT;
00268         query_vals[n_query_cols].nul = 0;
00269         query_vals[n_query_cols].val.int_val = (int)time(NULL) - expires_offset;
00270         n_query_cols++;
00271 
00272         result_cols[status_col=n_result_cols++] = &str_status_col;
00273         result_cols[expires_col=n_result_cols++] = &str_expires_col;
00274         result_cols[watcher_user_col=n_result_cols++] = &str_watcher_username_col;
00275         result_cols[watcher_domain_col=n_result_cols++] = &str_watcher_domain_col;
00276         result_cols[callid_col=n_result_cols++] = &str_callid_col;
00277 
00278         if (pa_dbf.use_table(pa_db, &active_watchers_table) < 0) 
00279         {
00280                 LM_ERR("in use_table\n");
00281                 goto error;
00282         }
00283 
00284         if (pa_dbf.query (pa_db, query_cols, query_ops, query_vals,
00285                  result_cols, n_query_cols, n_result_cols, 0,  &result) < 0) 
00286         {
00287                 LM_ERR("querying active_watchers db table\n");
00288                 goto error;
00289         }
00290 
00291         if(result== NULL )
00292         {
00293                 goto error;
00294         }
00295 
00296         if(result->n <= 0)
00297         {
00298                 LM_DBG("The query in db table for active subscription"
00299                                 " returned no result\n");
00300                 pa_dbf.free_result(pa_db, result);
00301                 return 0;
00302         }
00303         
00304         for(i=0; i<result->n; i++)
00305         {
00306                 row = &result->rows[i];
00307                 row_vals = ROW_VALUES(row);
00308                 
00309                 sb.watcher_user.s= (char*)row_vals[watcher_user_col].val.string_val;
00310                 sb.watcher_user.len= strlen(sb.watcher_user.s);
00311 
00312                 sb.watcher_domain.s= (char*)row_vals[watcher_domain_col].val.string_val;
00313                 sb.watcher_domain.len= strlen(sb.watcher_domain.s);
00314 
00315                 sb.callid.s= (char*)row_vals[callid_col].val.string_val;
00316                 sb.callid.len= strlen(sb.callid.s);
00317 
00318                 sb.event =subs->event->wipeer;
00319                 sb.status= row_vals[status_col].val.int_val;
00320                 
00321                 if(add_watcher_list(&sb, watchers)<0)
00322                         goto error;
00323         }
00324         
00325         pa_dbf.free_result(pa_db, result);
00326         return 0;
00327 
00328 error:
00329         if(result)
00330                 pa_dbf.free_result(pa_db, result);
00331         return -1;
00332 }
00333 
00334 str* get_wi_notify_body(subs_t* subs, subs_t* watcher_subs)
00335 {
00336         str* notify_body = NULL;
00337         char* version_str;
00338         watcher_t *watchers = NULL;
00339         int len = 0;
00340         unsigned int hash_code;
00341         subs_t* s= NULL;
00342         int state = FULL_STATE_FLAG;
00343         unsigned int now = (int)time(NULL);
00344 
00345         hash_code = 0;
00346         version_str = int2str(subs->version, &len);
00347         if(version_str ==NULL)
00348         {
00349                 LM_ERR("converting int to str\n ");
00350                 goto error;
00351         }
00352 
00353         watchers= (watcher_t*)pkg_malloc(sizeof(watcher_t));
00354         if(watchers== NULL)
00355         {
00356                 ERR_MEM(PKG_MEM_STR);
00357         }
00358         memset(watchers, 0, sizeof(watcher_t));
00359 
00360         if(watcher_subs != NULL)
00361         {
00362                 if(add_watcher_list(watcher_subs, watchers)< 0)
00363                         goto error;
00364                 state = PARTIAL_STATE_FLAG;
00365 
00366                 goto done;
00367         }
00368 
00369         if(subs_dbmode == DB_ONLY)
00370         {
00371                 if(get_wi_subs_db(subs, watchers)< 0)
00372                 {
00373                         LM_ERR("getting watchers from database\n");
00374                         goto error;
00375                 }
00376         } else {
00377                 hash_code= core_hash(&subs->pres_uri, &subs->event->wipeer->name,
00378                                 shtable_size);
00379                 lock_get(&subs_htable[hash_code].lock);
00380                 s= subs_htable[hash_code].entries;
00381                 while(s->next)
00382                 {
00383                         s= s->next;
00384 
00385                         if(s->expires< now)
00386                         {
00387                                 LM_DBG("expired record\n");
00388                                 continue;
00389                         }
00390 
00391                         if(s->event== subs->event->wipeer &&
00392                                 s->pres_uri.len== subs->pres_uri.len &&
00393                                 strncmp(s->pres_uri.s, subs->pres_uri.s,subs->pres_uri.len)== 0)
00394                         {
00395                                 if(add_watcher_list(s, watchers)< 0)
00396                                 {
00397                                         lock_release(&subs_htable[hash_code].lock);
00398                                         goto error;
00399                                 }
00400                         }
00401                 }
00402                 lock_release(&subs_htable[hash_code].lock);
00403 
00404                 if(add_waiting_watchers(watchers, subs->pres_uri,
00405                                         subs->event->wipeer->name)< 0 )
00406                 {
00407                         LM_ERR("failed to add waiting watchers\n");
00408                         goto error;
00409                 }
00410         }
00411 
00412 done:
00413         notify_body = create_winfo_xml(watchers,version_str,subs->pres_uri,
00414                         subs->event->wipeer->name, state);
00415         if(notify_body== NULL)
00416         {
00417                 LM_ERR("in function create_winfo_xml\n");
00418                 goto error;
00419         }
00420         free_watcher_list(watchers);
00421         return notify_body;
00422 
00423 error:
00424         free_watcher_list(watchers);
00425         return NULL;
00426 }
00427 
00428 void free_watcher_list(watcher_t* watchers)
00429 {
00430         watcher_t* w;
00431         while(watchers)
00432         {       
00433                 w= watchers;
00434                 if(w->uri.s !=NULL)
00435                         pkg_free(w->uri.s);
00436                 if(w->id.s !=NULL)
00437                         pkg_free(w->id.s);
00438                 watchers= watchers->next;
00439                 pkg_free(w);
00440         }
00441 
00442         watchers = NULL;
00443 }
00444 
00445 int add_watcher_list(subs_t *s, watcher_t *watchers)
00446 {
00447         watcher_t* w;
00448 
00449         w= (watcher_t*)pkg_malloc(sizeof(watcher_t));
00450         if(w== NULL)
00451         {
00452                 LM_ERR("No more private memory\n");
00453                 return -1;
00454         }
00455         w->status= s->status;
00456         if(uandd_to_uri(s->watcher_user, s->watcher_domain, &w->uri)<0)
00457         {
00458                 LM_ERR("failed to create uri\n");
00459                 goto error;
00460         }
00461         w->id.s = (char*)pkg_malloc(s->callid.len+ 1);
00462         if(w->id.s == NULL)
00463         {
00464                 LM_ERR("no more memory\n");
00465                 goto error;
00466         }
00467         memcpy(w->id.s, s->callid.s, s->callid.len);
00468         w->id.len = s->callid.len;
00469         w->id.s[w->id.len] = '\0';
00470 
00471         w->next= watchers->next;
00472         watchers->next= w;
00473 
00474         return 0;
00475 
00476 error:
00477         if(w)
00478         {
00479                 if(w->uri.s)
00480                         pkg_free(w->uri.s);
00481                 pkg_free(w);
00482         }
00483         return -1;
00484 }
00485 
00486 str* build_empty_bla_body(str pres_uri)
00487 {
00488         xmlDocPtr doc;
00489         xmlNodePtr node;
00490         xmlAttrPtr attr;
00491         str* body= NULL;
00492         char* text;
00493         int len;
00494         char* entity= NULL;
00495 
00496         doc = xmlNewDoc(BAD_CAST "1.0");
00497         if(doc== NULL)
00498         {
00499                 LM_ERR("failed to construct xml document\n");
00500                 return NULL;
00501         }
00502 
00503         node = xmlNewNode(NULL, BAD_CAST "dialog-info");
00504         if(node== NULL)
00505         {
00506                 LM_ERR("failed to initialize node\n");
00507                 goto error;
00508         }
00509         xmlDocSetRootElement(doc, node);
00510 
00511         attr =  xmlNewProp(node, BAD_CAST "xmlns",BAD_CAST  "urn:ietf:params:xml:ns:dialog-info");
00512         if(attr== NULL)
00513         {
00514                 LM_ERR("failed to initialize node attribute\n");
00515                 goto error;
00516         }
00517         attr = xmlNewProp(node, BAD_CAST "version", BAD_CAST "1");
00518         if(attr== NULL)
00519         {
00520                 LM_ERR("failed to initialize node attribute\n");
00521                 goto error;
00522         }
00523 
00524         attr = xmlNewProp(node, BAD_CAST "state", BAD_CAST "full");
00525         if(attr== NULL)
00526         {
00527                 LM_ERR("failed to initialize node attribute\n");
00528                 goto error;
00529         }
00530 
00531         entity = (char*)pkg_malloc(pres_uri.len+1);
00532         if(entity== NULL)
00533         {
00534                 LM_ERR("no more memory\n");
00535                 goto error;
00536         }
00537         memcpy(entity, pres_uri.s, pres_uri.len);
00538         entity[pres_uri.len]= '\0';
00539 
00540         attr = xmlNewProp(node, BAD_CAST "entity", BAD_CAST entity);
00541         if(attr== NULL)
00542         {
00543                 LM_ERR("failed to initialize node attribute\n");
00544                 pkg_free(entity);
00545                 goto error;
00546         }
00547         
00548         body = (str*) pkg_malloc(sizeof(str));
00549         if(body== NULL)
00550         {
00551                 LM_ERR("no more private memory");
00552                 pkg_free(entity);
00553                 goto error;
00554         }
00555 
00556         xmlDocDumpFormatMemory(doc,(xmlChar**)(void*)&text, &len, 1);
00557         body->s = (char*) pkg_malloc(len);
00558         if(body->s == NULL)
00559         {
00560                 LM_ERR("no more private memory");
00561                 pkg_free(body);
00562                 pkg_free(entity);
00563                 goto error;
00564         }
00565         memcpy(body->s, text, len);
00566         body->len= len;
00567 
00568         
00569         pkg_free(entity);
00570         xmlFreeDoc(doc);
00571         xmlFree(text);
00572 
00573         return body;
00574 
00575 error:
00576         xmlFreeDoc(doc);
00577         return NULL;
00578 
00579 }
00580 
00581 str* get_p_notify_body(str pres_uri, pres_ev_t* event, str* etag,
00582                 str* contact)
00583 {
00584         db_key_t query_cols[6];
00585         db_val_t query_vals[6];
00586         db_key_t result_cols[6];
00587         db1_res_t *result = NULL;
00588         int body_col, expires_col, etag_col= 0, sender_col;
00589         str** body_array= NULL;
00590         str* notify_body= NULL; 
00591         db_row_t *row= NULL ;   
00592         db_val_t *row_vals;
00593         int n_result_cols = 0;
00594         int n_query_cols = 0;
00595         int i, n= 0, len;
00596         int build_off_n= -1; 
00597         str etags;
00598         str* body;
00599         int size= 0;
00600         struct sip_uri uri;
00601         unsigned int hash_code;
00602         str sender;
00603 
00604         if(parse_uri(pres_uri.s, pres_uri.len, &uri)< 0)
00605         {
00606                 LM_ERR("while parsing uri\n");
00607                 return NULL;
00608         }
00609 
00610         /* if in db_only mode, get the presentity information from database - skip htable search */
00611         if( publ_cache_enabled )
00612         {
00613                 /* search in hash table if any record exists */
00614                 hash_code= core_hash(&pres_uri, NULL, phtable_size);
00615                 if(search_phtable(&pres_uri, event->evp->type, hash_code)== NULL)
00616                 {
00617                         LM_DBG("No record exists in hash_table\n");
00618 
00619                         /* for pidf manipulation */
00620                         if(event->agg_nbody)
00621                         {
00622                                 notify_body = event->agg_nbody(&uri.user, &uri.host, NULL, 0, -1);
00623                                 if(notify_body)
00624                                         goto done;
00625                         }
00626                         return NULL;
00627                 }
00628         }
00629 
00630         query_cols[n_query_cols] = &str_domain_col;
00631         query_vals[n_query_cols].type = DB1_STR;
00632         query_vals[n_query_cols].nul = 0;
00633         query_vals[n_query_cols].val.str_val = uri.host;
00634         n_query_cols++;
00635 
00636         query_cols[n_query_cols] = &str_username_col;
00637         query_vals[n_query_cols].type = DB1_STR;
00638         query_vals[n_query_cols].nul = 0;
00639         query_vals[n_query_cols].val.str_val = uri.user;
00640         n_query_cols++;
00641 
00642         query_cols[n_query_cols] = &str_event_col;
00643         query_vals[n_query_cols].type = DB1_STR;
00644         query_vals[n_query_cols].nul = 0;
00645         query_vals[n_query_cols].val.str_val= event->name;
00646         n_query_cols++;
00647 
00648         result_cols[body_col=n_result_cols++] = &str_body_col;
00649         result_cols[expires_col=n_result_cols++] = &str_expires_col;
00650         result_cols[etag_col=n_result_cols++] = &str_etag_col;
00651         result_cols[sender_col=n_result_cols++] = &str_sender_col;
00652         
00653         if (pa_dbf.use_table(pa_db, &presentity_table) < 0) 
00654         {
00655                 LM_ERR("in use_table\n");
00656                 return NULL;
00657         }
00658 
00659         static str query_str = str_init("received_time");
00660         if (pa_dbf.query (pa_db, query_cols, 0, query_vals,
00661                  result_cols, n_query_cols, n_result_cols, &query_str ,  &result) < 0) 
00662         {
00663                 LM_ERR("failed to query %.*s table\n", presentity_table.len, presentity_table.s);
00664                 if(result)
00665                         pa_dbf.free_result(pa_db, result);
00666                 return NULL;
00667         }
00668         
00669         if(result== NULL)
00670                 return NULL;
00671 
00672         if (result->n<=0 )
00673         {
00674                 LM_DBG("The query returned no result\n[username]= %.*s"
00675                         "\t[domain]= %.*s\t[event]= %.*s\n",uri.user.len, uri.user.s,
00676                         uri.host.len, uri.host.s, event->name.len, event->name.s);
00677                 
00678                 pa_dbf.free_result(pa_db, result);
00679                 result= NULL;
00680 
00681                 if(event->agg_nbody)
00682                 {
00683                         notify_body = event->agg_nbody(&uri.user, &uri.host, NULL, 0, -1);
00684                         if(notify_body)
00685                                 goto done;
00686                 }                       
00687                 return NULL;
00688         }
00689         else
00690         {
00691                 n= result->n;
00692                 if(event->agg_nbody== NULL )
00693                 {
00694                         LM_DBG("Event does not require aggregation\n");
00695                         row = &result->rows[n-1];
00696                         row_vals = ROW_VALUES(row);
00697                         
00698                         /* if event BLA - check if sender is the same as contact */
00699                         /* if so, send an empty dialog info document */
00700                         if( EVENT_DIALOG_SLA(event->evp) && contact ) {
00701                                 sender.s = (char*)row_vals[sender_col].val.string_val;
00702                                 if(sender.s== NULL || strlen(sender.s)==0)
00703                                         goto after_sender_check;
00704                                 sender.len= strlen(sender.s);
00705                         
00706                                 if(sender.len== contact->len &&
00707                                                 strncmp(sender.s, contact->s, sender.len)== 0)
00708                                 {
00709                                         notify_body= build_empty_bla_body(pres_uri);
00710                                         pa_dbf.free_result(pa_db, result);
00711                                         return notify_body;
00712                                 }
00713                         }
00714 
00715 after_sender_check:
00716                         if(row_vals[body_col].val.string_val== NULL)
00717                         {
00718                                 LM_ERR("NULL notify body record\n");
00719                                 goto error;
00720                         }
00721                         len= strlen(row_vals[body_col].val.string_val);
00722                         if(len== 0)
00723                         {
00724                                 LM_ERR("Empty notify body record\n");
00725                                 goto error;
00726                         }
00727                         notify_body= (str*)pkg_malloc(sizeof(str));
00728                         if(notify_body== NULL)
00729                         {
00730                                 ERR_MEM(PKG_MEM_STR);   
00731                         }
00732                         memset(notify_body, 0, sizeof(str));
00733                         notify_body->s= (char*)pkg_malloc( len* sizeof(char));
00734                         if(notify_body->s== NULL)
00735                         {
00736                                 pkg_free(notify_body);
00737                                 ERR_MEM(PKG_MEM_STR);
00738                         }
00739                         memcpy(notify_body->s, row_vals[body_col].val.string_val, len);
00740                         notify_body->len= len;
00741                         pa_dbf.free_result(pa_db, result);
00742                         
00743                         return notify_body;
00744                 }
00745                 
00746                 LM_DBG("Event requires aggregation\n");
00747                 
00748                 body_array =(str**)pkg_malloc( (n+2) *sizeof(str*));
00749                 if(body_array == NULL)
00750                 {
00751                         ERR_MEM(PKG_MEM_STR);
00752                 }
00753                 memset(body_array, 0, (n+2) *sizeof(str*));
00754 
00755                 if(etag!= NULL)
00756                 {
00757                         LM_DBG("searched etag = %.*s len= %d\n", 
00758                                         etag->len, etag->s, etag->len);
00759                         LM_DBG("etag not NULL\n");
00760                         for(i= 0; i< n; i++)
00761                         {
00762                                 row = &result->rows[i];
00763                                 row_vals = ROW_VALUES(row);
00764                                 etags.s = (char*)row_vals[etag_col].val.string_val;
00765                                 etags.len = strlen(etags.s);
00766 
00767                                 LM_DBG("etag = %.*s len= %d\n", etags.len, etags.s, etags.len);
00768                                 if( (etags.len == etag->len) && (strncmp(etags.s,
00769                                                                 etag->s,etags.len)==0 ) )
00770                                 {
00771                                         LM_DBG("found etag\n");
00772                                         build_off_n= i;
00773                                 }
00774                                 len= strlen((char*)row_vals[body_col].val.string_val);
00775                                 if(len== 0)
00776                                 {
00777                                         LM_ERR("Empty notify body record\n");
00778                                         goto error;
00779                                 }
00780                         
00781                                 size= sizeof(str)+ len* sizeof(char);
00782                                 body= (str*)pkg_malloc(size);
00783                                 if(body== NULL)
00784                                 {
00785                                         ERR_MEM(PKG_MEM_STR);
00786                                 }
00787                                 memset(body, 0, size);
00788                                 size= sizeof(str);
00789                                 body->s= (char*)body+ size;
00790                                 memcpy(body->s, (char*)row_vals[body_col].val.string_val, len);
00791                                 body->len= len;
00792 
00793                                 body_array[i]= body;
00794                         }
00795                 }       
00796                 else
00797                 {       
00798                         for(i=0; i< n; i++)
00799                         {
00800                                 row = &result->rows[i];
00801                                 row_vals = ROW_VALUES(row);
00802                                 
00803                                 len= strlen((char*)row_vals[body_col].val.string_val);
00804                                 if(len== 0)
00805                                 {
00806                                         LM_ERR("Empty notify body record\n");
00807                                         goto error;
00808                                 }
00809                                 
00810                                 size= sizeof(str)+ len* sizeof(char);
00811                                 body= (str*)pkg_malloc(size);
00812                                 if(body== NULL)
00813                                 {
00814                                         ERR_MEM(PKG_MEM_STR);
00815                                 }
00816                                 memset(body, 0, size);
00817                                 size= sizeof(str);
00818                                 body->s= (char*)body+ size;
00819                                 memcpy(body->s, row_vals[body_col].val.string_val, len);
00820                                 body->len= len;
00821 
00822                                 body_array[i]= body;
00823                         }                       
00824                 }
00825                 pa_dbf.free_result(pa_db, result);
00826                 result= NULL;
00827                 
00828                 notify_body = event->agg_nbody(&uri.user, &uri.host, body_array, n, build_off_n);
00829         }
00830 
00831 done:   
00832         if(body_array!=NULL)
00833         {
00834                 for(i= 0; i< n; i++)
00835                 {
00836                         if(body_array[i])
00837                                 pkg_free(body_array[i]);
00838                 }
00839                 pkg_free(body_array);
00840         }
00841         return notify_body;
00842 
00843 error:
00844         if(result!=NULL)
00845                 pa_dbf.free_result(pa_db, result);
00846 
00847         if(body_array!=NULL)
00848         {
00849                 for(i= 0; i< n; i++)
00850                 {
00851                         if(body_array[i])
00852                                 pkg_free(body_array[i]);
00853                         else
00854                                 break;
00855 
00856                 }
00857         
00858                 pkg_free(body_array);
00859         }
00860         return NULL;
00861 }
00862 
00863 void free_notify_body(str *body, pres_ev_t *ev)
00864 {
00865         if (body != NULL)
00866         {
00867                 if (body->s != NULL)
00868                 {
00869                         if (ev->type & WINFO_TYPE)
00870                                 xmlFree(body->s);
00871                         else if (ev->agg_nbody== NULL && ev->apply_auth_nbody== NULL)
00872                                 pkg_free(body->s);
00873                         else
00874                                 ev->free_body(body->s);
00875                 }
00876                 pkg_free(body);
00877         }
00878 }
00879 
00880 static int ps_free_tm_dlg(dlg_t *td)
00881 {
00882         if(td)
00883         {
00884                 if(td->loc_uri.s)
00885                         pkg_free(td->loc_uri.s);
00886                 if(td->rem_uri.s)
00887                         pkg_free(td->rem_uri.s);
00888 
00889                 if(td->route_set)
00890                         free_rr(&td->route_set);
00891                 pkg_free(td);
00892         }
00893         return 0;
00894 }
00895 
00896 dlg_t* ps_build_dlg_t(subs_t* subs)
00897 {
00898         dlg_t* td =NULL;
00899         int found_contact = 1;
00900 
00901         td = (dlg_t*)pkg_malloc(sizeof(dlg_t));
00902         if(td == NULL)
00903         {
00904                 ERR_MEM(PKG_MEM_STR);
00905         }
00906         memset(td, 0, sizeof(dlg_t));
00907 
00908         td->loc_seq.value = subs->local_cseq;
00909         td->loc_seq.is_set = 1;
00910 
00911         td->id.call_id = subs->callid;
00912         td->id.rem_tag = subs->from_tag;
00913         td->id.loc_tag =subs->to_tag;
00914         
00915         uandd_to_uri(subs->to_user, subs->to_domain, &td->loc_uri);
00916         if(td->loc_uri.s== NULL)
00917         {
00918                 LM_ERR("while creating uri\n");
00919                 goto error;
00920         }
00921 
00922         if(subs->contact.len ==0 || subs->contact.s == NULL )
00923         {
00924                 found_contact = 0;
00925         }
00926         else
00927         {
00928                 LM_DBG("CONTACT = %.*s\n", subs->contact.len , subs->contact.s);
00929                 td->rem_target = subs->contact;
00930         }
00931 
00932         uandd_to_uri(subs->from_user, subs->from_domain, &td->rem_uri);
00933         if(td->rem_uri.s ==NULL)
00934         {
00935                 LM_ERR("while creating uri\n");
00936                 goto error;
00937         }
00938         
00939         if(found_contact == 0)
00940         {
00941                 td->rem_target = td->rem_uri;
00942         }
00943         if(subs->record_route.s && subs->record_route.len)
00944         {
00945                 if(parse_rr_body(subs->record_route.s, subs->record_route.len,
00946                         &td->route_set)< 0)
00947                 {
00948                         LM_ERR("in function parse_rr_body\n");
00949                         goto error;
00950                 }
00951         }       
00952         td->state= DLG_CONFIRMED ;
00953 
00954         if (subs->sockinfo_str.len) {
00955                 int port, proto;
00956         str host;
00957                 char* tmp;
00958                 if ((tmp = as_asciiz(&subs->sockinfo_str)) == NULL) {
00959                         LM_ERR("no pkg memory left\n");
00960                         goto error;
00961                 }
00962                 if (parse_phostport (tmp,&host.s,
00963                                 &host.len,&port, &proto )) {
00964                         LM_ERR("bad sockinfo string\n");
00965                         pkg_free(tmp);
00966                         goto error;
00967                 }
00968                 pkg_free(tmp);
00969                 td->send_sock = grep_sock_info (
00970                         &host, (unsigned short) port, (unsigned short) proto);
00971         }
00972         
00973         return td;
00974 
00975 error:          
00976         ps_free_tm_dlg(td);
00977         return NULL;
00978 }
00979 
00980 int get_subs_db(str* pres_uri, pres_ev_t* event, str* sender,
00981                 subs_t** s_array, int* n)
00982 {
00983         db_key_t query_cols[7];
00984         db_op_t  query_ops[7];
00985         db_val_t query_vals[7];
00986         db_key_t result_cols[19];
00987         int n_result_cols = 0, n_query_cols = 0;
00988         db_row_t *row ; 
00989         db_val_t *row_vals ;
00990         db1_res_t *result = NULL;
00991         int from_user_col, from_domain_col, from_tag_col;
00992         int to_user_col, to_domain_col, to_tag_col;
00993         int expires_col= 0,callid_col, cseq_col, i, reason_col;
00994         int version_col= 0, record_route_col = 0, contact_col = 0;
00995         int sockinfo_col= 0, local_contact_col= 0, event_id_col = 0;
00996         int watcher_user_col= 0, watcher_domain_col= 0;
00997         subs_t s, *s_new;
00998         int inc= 0;
00999                 
01000         if (pa_dbf.use_table(pa_db, &active_watchers_table) < 0) 
01001         {
01002                 LM_ERR("in use_table\n");
01003                 return -1;
01004         }
01005 
01006         LM_DBG("querying database table = active_watchers\n");
01007         query_cols[n_query_cols] = &str_presentity_uri_col;
01008         query_ops[n_query_cols] = OP_EQ;
01009         query_vals[n_query_cols].type = DB1_STR;
01010         query_vals[n_query_cols].nul = 0;
01011         query_vals[n_query_cols].val.str_val = *pres_uri;
01012         n_query_cols++;
01013         
01014         query_cols[n_query_cols] = &str_event_col;
01015         query_ops[n_query_cols] = OP_EQ;
01016         query_vals[n_query_cols].type = DB1_STR;
01017         query_vals[n_query_cols].nul = 0;
01018         query_vals[n_query_cols].val.str_val = event->name;
01019         n_query_cols++;
01020 
01021         query_cols[n_query_cols] = &str_status_col;
01022         query_ops[n_query_cols] = OP_EQ;
01023         query_vals[n_query_cols].type = DB1_INT;
01024         query_vals[n_query_cols].nul = 0;
01025         query_vals[n_query_cols].val.int_val = ACTIVE_STATUS;
01026         n_query_cols++;
01027 
01028         query_cols[n_query_cols] = &str_contact_col;
01029         query_ops[n_query_cols] = OP_NEQ;
01030         query_vals[n_query_cols].type = DB1_STR;
01031         query_vals[n_query_cols].nul = 0;
01032         if(sender)
01033         {       
01034                 LM_DBG("Do not send Notify to:[uri]= %.*s\n",sender->len,sender->s);
01035                 query_vals[n_query_cols].val.str_val = *sender;
01036         } else {
01037                 query_vals[n_query_cols].val.str_val.s = "";
01038                 query_vals[n_query_cols].val.str_val.len = 0;
01039         }
01040         n_query_cols++;
01041 
01042         result_cols[to_user_col=n_result_cols++]      =   &str_to_user_col;
01043         result_cols[to_domain_col=n_result_cols++]    =   &str_to_domain_col;
01044         result_cols[from_user_col=n_result_cols++]    =   &str_from_user_col;
01045         result_cols[from_domain_col=n_result_cols++]  =   &str_from_domain_col;
01046         result_cols[watcher_user_col=n_result_cols++] =   &str_watcher_username_col;
01047         result_cols[watcher_domain_col=n_result_cols++]=   &str_watcher_domain_col;
01048         result_cols[event_id_col=n_result_cols++]     =   &str_event_id_col;
01049         result_cols[from_tag_col=n_result_cols++]     =   &str_from_tag_col;
01050         result_cols[to_tag_col=n_result_cols++]       =   &str_to_tag_col;
01051         result_cols[callid_col=n_result_cols++]       =   &str_callid_col;
01052         result_cols[cseq_col=n_result_cols++]         =   &str_local_cseq_col;
01053         result_cols[record_route_col=n_result_cols++] =   &str_record_route_col;
01054         result_cols[contact_col=n_result_cols++]      =   &str_contact_col;
01055         result_cols[expires_col=n_result_cols++]      =   &str_expires_col;
01056         result_cols[reason_col=n_result_cols++]       =   &str_reason_col;
01057         result_cols[sockinfo_col=n_result_cols++]     =   &str_socket_info_col;
01058         result_cols[local_contact_col=n_result_cols++]=   &str_local_contact_col;
01059         result_cols[version_col=n_result_cols++]      =   &str_version_col;
01060 
01061         if (pa_dbf.query(pa_db, query_cols, query_ops, query_vals,result_cols,
01062                                 n_query_cols, n_result_cols, 0, &result) < 0) 
01063         {
01064                 LM_ERR("while querying database\n");
01065                 if(result)
01066                 {
01067                         pa_dbf.free_result(pa_db, result);
01068                 }
01069                 return -1;
01070         }
01071 
01072         if(result== NULL)
01073                 return -1;
01074 
01075         if(result->n <=0 )
01076         {
01077                 LM_DBG("The query for subscribtion for [uri]= %.*s for [event]= %.*s"
01078                         " returned no result\n",pres_uri->len, pres_uri->s,
01079                         event->name.len, event->name.s);
01080                 pa_dbf.free_result(pa_db, result);
01081                 return 0;
01082         }
01083         LM_DBG("found %d dialogs\n", result->n);
01084         
01085         for(i=0; i<result->n; i++)
01086         {
01087                 row = &result->rows[i];
01088                 row_vals = ROW_VALUES(row);     
01089                 
01090                 //      if(row_vals[expires_col].val.int_val< (int)time(NULL))
01091                 //              continue;
01092 
01093                 if(row_vals[reason_col].val.string_val) {
01094                     if(strlen(row_vals[reason_col].val.string_val) != 0)
01095                         continue;
01096                 }
01097 
01098                 //      s.reason.len= strlen(s.reason.s);
01099 
01100                 memset(&s, 0, sizeof(subs_t));
01101                 s.status= ACTIVE_STATUS;
01102                 
01103                 s.pres_uri= *pres_uri;
01104                 s.to_user.s= (char*)row_vals[to_user_col].val.string_val;
01105                 s.to_user.len= strlen(s.to_user.s);
01106                 
01107                 s.to_domain.s= (char*)row_vals[to_domain_col].val.string_val;
01108                 s.to_domain.len= strlen(s.to_domain.s);
01109                 
01110                 s.from_user.s= (char*)row_vals[from_user_col].val.string_val;
01111                 s.from_user.len= strlen(s.from_user.s);
01112                 
01113                 s.from_domain.s= (char*)row_vals[from_domain_col].val.string_val;
01114                 s.from_domain.len= strlen(s.from_domain.s);
01115 
01116                 s.watcher_user.s= (char*)row_vals[watcher_user_col].val.string_val;
01117                 s.watcher_user.len= strlen(s.watcher_user.s);
01118                 
01119                 s.watcher_domain.s= (char*)row_vals[watcher_domain_col].val.string_val;
01120                 s.watcher_domain.len= strlen(s.watcher_domain.s);
01121                 
01122                 s.event_id.s=(char*)row_vals[event_id_col].val.string_val;
01123                 s.event_id.len= (s.event_id.s)?strlen(s.event_id.s):0;
01124                 
01125                 s.to_tag.s= (char*)row_vals[to_tag_col].val.string_val;
01126                 s.to_tag.len= strlen(s.to_tag.s);
01127                 
01128                 s.from_tag.s= (char*)row_vals[from_tag_col].val.string_val; 
01129                 s.from_tag.len= strlen(s.from_tag.s);
01130                 
01131                 s.callid.s= (char*)row_vals[callid_col].val.string_val;
01132                 s.callid.len= strlen(s.callid.s);
01133                 
01134                 s.record_route.s=  (char*)row_vals[record_route_col].val.string_val;
01135                 s.record_route.len= (s.record_route.s)?strlen(s.record_route.s):0;
01136 
01137                 s.contact.s= (char*)row_vals[contact_col].val.string_val;
01138                 s.contact.len= strlen(s.contact.s);
01139                 
01140                 s.sockinfo_str.s = (char*)row_vals[sockinfo_col].val.string_val;
01141                 s.sockinfo_str.len = s.sockinfo_str.s?strlen(s.sockinfo_str.s):0;
01142 
01143                 s.local_contact.s = (char*)row_vals[local_contact_col].val.string_val;
01144                 s.local_contact.len = s.local_contact.s?strlen(s.local_contact.s):0;
01145                 
01146                 s.event= event;
01147                 s.local_cseq = row_vals[cseq_col].val.int_val +1;
01148                 if(row_vals[expires_col].val.int_val < (int)time(NULL))
01149                     s.expires = 0;
01150                 else
01151                     s.expires = row_vals[expires_col].val.int_val -
01152                         (int)time(NULL);
01153                 s.version = row_vals[version_col].val.int_val +1;
01154 
01155                 s_new= mem_copy_subs(&s, PKG_MEM_TYPE);
01156                 if(s_new== NULL)
01157                 {
01158                         LM_ERR("while copying subs_t structure\n");
01159                         goto error;
01160                 }
01161                 s_new->next= (*s_array);
01162                 (*s_array)= s_new;
01163                 printf_subs(s_new);
01164                 inc++;
01165                 
01166         }
01167         pa_dbf.free_result(pa_db, result);
01168         *n= inc;
01169 
01170         return 0;
01171 
01172 error:
01173         if(result)
01174                 pa_dbf.free_result(pa_db, result);
01175         
01176         return -1;
01177 }
01178 
01179 subs_t* get_subs_dialog(str* pres_uri, pres_ev_t* event, str* sender)
01180 {
01181         unsigned int hash_code;
01182         subs_t* s= NULL, *s_new;
01183         subs_t* s_array= NULL;
01184         int n= 0;
01185         
01186         /* if subs_dbmode!=DB_ONLY, should take the subscriptions from the hashtable only
01187            in DB_ONLY mode should take all dialogs from db
01188         */
01189 
01190         if(subs_dbmode == DB_ONLY)
01191         {
01192                 if(get_subs_db(pres_uri, event, sender, &s_array, &n)< 0)
01193                 {
01194                         LM_ERR("getting dialogs from database\n");
01195                         goto error;
01196                 }
01197         }else {
01198                 hash_code= core_hash(pres_uri, &event->name, shtable_size);
01199                 
01200                 lock_get(&subs_htable[hash_code].lock);
01201 
01202                 s= subs_htable[hash_code].entries;
01203 
01204                 while(s->next)
01205                 {
01206                         s= s->next;
01207 
01208                         printf_subs(s);
01209 
01210                         if(s->expires< (int)time(NULL))
01211                         {
01212                                 LM_DBG("expired subs\n");
01213                                 continue;
01214                         }
01215 
01216                         if((!(s->status== ACTIVE_STATUS &&
01217                     s->reason.len== 0 &&
01218                                 s->event== event && s->pres_uri.len== pres_uri->len &&
01219                                 strncmp(s->pres_uri.s, pres_uri->s, pres_uri->len)== 0)) || 
01220                                 (sender && sender->len== s->contact.len && 
01221                                 strncmp(sender->s, s->contact.s, sender->len)== 0))
01222                                 continue;
01223 
01224                         s_new= mem_copy_subs(s, PKG_MEM_TYPE);
01225                         if(s_new== NULL)
01226                         {
01227                                 LM_ERR("copying subs_t structure\n");
01228                                 lock_release(&subs_htable[hash_code].lock);
01229                                 goto error;
01230                         }
01231                         s_new->expires-= (int)time(NULL);
01232                         s_new->next= s_array;
01233                         s_array= s_new;
01234                 }
01235                 lock_release(&subs_htable[hash_code].lock);
01236         }
01237 
01238         return s_array;
01239 
01240 error:
01241         free_subs_list(s_array, PKG_MEM_TYPE, 0);
01242         return NULL;
01243         
01244 }
01245 
01246 int publ_notify(presentity_t* p, str pres_uri, str* body, str* offline_etag, str* rules_doc)
01247 {
01248         str *notify_body = NULL, *aux_body = NULL;
01249         subs_t* subs_array= NULL, *s= NULL;
01250         int ret_code= -1;
01251 
01252         subs_array= get_subs_dialog(&pres_uri, p->event , p->sender);
01253         if(subs_array == NULL)
01254         {
01255                 LM_DBG("Could not find subs_dialog\n");
01256                 ret_code= 0;
01257                 goto done;
01258         }
01259 
01260         /* if the event does not require aggregation - we have the final body */
01261         if(p->event->agg_nbody)
01262         {       
01263                 notify_body = get_p_notify_body(pres_uri, p->event , offline_etag, NULL);
01264                 if(notify_body == NULL)
01265                 {
01266                         LM_DBG("Could not get the notify_body\n");
01267                         /* goto error; */
01268                 }
01269         }
01270 
01271         s= subs_array;
01272         while(s)
01273         {
01274                 s->auth_rules_doc= rules_doc;
01275                 if (p->event->aux_body_processing) {
01276                         aux_body = p->event->aux_body_processing(s, notify_body?notify_body:body);
01277                 }
01278 
01279                 if(notify(s, NULL, aux_body?aux_body:(notify_body?notify_body:body), 0)< 0 )
01280                 {
01281                         LM_ERR("Could not send notify for %.*s\n",
01282                                         p->event->name.len, p->event->name.s);
01283                 }
01284 
01285                 if(aux_body!=NULL) {
01286                         if(aux_body->s) {
01287                                 p->event->aux_free_body(aux_body->s);
01288                         }
01289                         pkg_free(aux_body);
01290                 }
01291                 s= s->next;
01292         }
01293         ret_code= 0;
01294 
01295 done:
01296         free_subs_list(subs_array, PKG_MEM_TYPE, 0);
01297         free_notify_body(notify_body, p->event);        
01298         return ret_code;
01299 }
01300 
01301 int publ_notify_notifier(str pres_uri, pres_ev_t *event)
01302 {
01303         db_key_t query_cols[2], result_cols[3];
01304         db_val_t query_vals[2], *values;
01305         db_row_t *rows;
01306         db1_res_t *result = NULL;
01307         int n_query_cols = 0, n_result_cols = 0;
01308         int r_callid_col = 0, r_to_tag_col = 0, r_from_tag_col = 0;
01309         int i;
01310         int ret = -1;
01311         subs_t subs;
01312 
01313         if(pa_db == NULL)
01314         {
01315                 LM_ERR("null database connection\n");
01316                 goto error;
01317         }
01318 
01319         if(pa_dbf.use_table(pa_db, &active_watchers_table)< 0)
01320         {
01321                 LM_ERR("use table failed\n");
01322                 goto error;
01323         }
01324 
01325         query_cols[n_query_cols]= &str_presentity_uri_col;
01326         query_vals[n_query_cols].type = DB1_STR;
01327         query_vals[n_query_cols].nul = 0;
01328         query_vals[n_query_cols].val.str_val = pres_uri;
01329         n_query_cols++;
01330 
01331         query_cols[n_query_cols]= &str_event_col;
01332         query_vals[n_query_cols].type = DB1_STR;
01333         query_vals[n_query_cols].nul = 0;
01334         query_vals[n_query_cols].val.str_val = event->name;
01335         n_query_cols++;
01336 
01337         result_cols[r_callid_col=n_result_cols++] = &str_callid_col;
01338         result_cols[r_to_tag_col=n_result_cols++] = &str_to_tag_col;
01339         result_cols[r_from_tag_col=n_result_cols++] = &str_from_tag_col;
01340 
01341         if(pa_dbf.query(pa_db, query_cols, 0, query_vals, result_cols, 
01342                                 n_query_cols, n_result_cols, 0, &result )< 0)
01343         {
01344                 LM_ERR("Can't query db\n");
01345                 goto error;
01346         }
01347 
01348         if(result == NULL)
01349         {
01350                 LM_ERR("bad result\n");
01351                 goto error;
01352         }
01353 
01354         rows = RES_ROWS(result);
01355         for (i = 0; i <RES_ROW_N(result); i++)
01356         {
01357                 values = ROW_VALUES(&rows[i]);
01358 
01359                 subs.callid.s = (char *) VAL_STRING(&values[r_callid_col]);
01360                 subs.callid.len = strlen(subs.callid.s);
01361                 subs.to_tag.s = (char *) VAL_STRING(&values[r_to_tag_col]);
01362                 subs.to_tag.len = strlen(subs.to_tag.s);
01363                 subs.from_tag.s = (char *) VAL_STRING(&values[r_from_tag_col]);
01364                 subs.from_tag.len = strlen(subs.from_tag.s);
01365 
01366                 set_updated(&subs);
01367         }
01368 
01369         ret = RES_ROW_N(result);
01370 
01371 error:
01372         if (result) pa_dbf.free_result(pa_db, result);
01373         return ret;
01374 }
01375 
01376 int query_db_notify(str* pres_uri, pres_ev_t* event, subs_t* watcher_subs )
01377 {
01378         subs_t* subs_array = NULL, *s= NULL;
01379         str* notify_body = NULL, *aux_body = NULL;
01380         int ret_code= -1;
01381 
01382         subs_array= get_subs_dialog(pres_uri, event , NULL);
01383         if(subs_array == NULL)
01384         {
01385                 LM_DBG("Could not get subscription dialog\n");
01386                 ret_code= 1;
01387                 goto done;
01388         }
01389         
01390         if(event->type & PUBL_TYPE)
01391         {
01392                 notify_body = get_p_notify_body(*pres_uri, event, NULL, NULL);
01393                 if(notify_body == NULL)
01394                 {
01395                         LM_DBG("Could not get the notify_body\n");
01396                         /* goto error; */
01397                 }
01398         }       
01399 
01400         s= subs_array;
01401         
01402         while(s)
01403         {
01404 
01405                 if (event->aux_body_processing) {
01406                         aux_body = event->aux_body_processing(s, notify_body);
01407                 }
01408 
01409                 if(notify(s, watcher_subs, aux_body?aux_body:notify_body, 0)< 0 )
01410                 {
01411                         LM_ERR("Could not send notify for [event]=%.*s\n",
01412                                         event->name.len, event->name.s);
01413                         goto done;
01414                 }
01415 
01416                 if(aux_body!=NULL) {
01417                         if(aux_body->s) {
01418                                 event->aux_free_body(aux_body->s);
01419                         }
01420                         pkg_free(aux_body);
01421                 }
01422                 s= s->next;
01423         }
01424 
01425         ret_code= 1;
01426 
01427 done:
01428         free_subs_list(subs_array, PKG_MEM_TYPE, 0);
01429         free_notify_body(notify_body, event);
01430 
01431         return ret_code;
01432 }
01433 
01434 int send_notify_request(subs_t* subs, subs_t * watcher_subs,
01435                 str* n_body,int force_null_body)
01436 {
01437         dlg_t* td = NULL;
01438         str met = {"NOTIFY", 6};
01439         str str_hdr = {0, 0};
01440         str* notify_body = NULL;
01441         int result= 0;
01442         c_back_param *cb_param= NULL;
01443         str* final_body= NULL;
01444         uac_req_t uac_r;
01445         
01446         LM_DBG("dialog info:\n");
01447         printf_subs(subs);
01448 
01449         /* getting the status of the subscription */
01450 
01451         if(force_null_body)
01452         {
01453                 goto jump_over_body;
01454         }
01455 
01456         if(n_body!= NULL && subs->status== ACTIVE_STATUS)
01457         {
01458                 if( subs->event->req_auth)
01459                 {
01460                         
01461                         if(subs->auth_rules_doc && subs->event->apply_auth_nbody)
01462                         {
01463                                 if(subs->event->apply_auth_nbody(n_body, subs, &notify_body)< 0)
01464                                 {
01465                                         LM_ERR("in function apply_auth_nbody\n");
01466                                         goto error;
01467                                 }
01468                         }
01469                         if(notify_body== NULL)
01470                                 notify_body= n_body;
01471                 }
01472                 else
01473                         notify_body= n_body;
01474         }
01475         else
01476         {
01477                 if(subs->status== TERMINATED_STATUS || 
01478                                 subs->status== PENDING_STATUS) 
01479                 {
01480                         LM_DBG("state terminated or pending- notify body NULL\n");
01481                         notify_body = NULL;
01482                 }
01483                 else 
01484                 {
01485                         if(subs->event->type & WINFO_TYPE)
01486                         {
01487                                 notify_body = get_wi_notify_body(subs, watcher_subs);
01488                                 if(notify_body == NULL)
01489                                 {
01490                                         LM_DBG("Could not get notify_body\n");
01491                                         goto error;
01492                                 }
01493                         }
01494                         else
01495                         {
01496                                 notify_body = get_p_notify_body(subs->pres_uri,
01497                                         subs->event, NULL, (subs->contact.s)?&subs->contact:NULL);
01498                                 if(notify_body == NULL || notify_body->s== NULL)
01499                                 {
01500                                         LM_DBG("Could not get the notify_body\n");
01501                                 }
01502                                 else            /* apply authorization rules if exists */
01503                                 if(subs->event->req_auth)
01504                                 {
01505                                         if(subs->auth_rules_doc && subs->event->apply_auth_nbody
01506                                                         && subs->event->apply_auth_nbody(notify_body,
01507                                                                 subs,&final_body)<0)
01508                                         {
01509                                                 LM_ERR("in function apply_auth\n");
01510                                                 goto error;
01511                                         }
01512                                         if(final_body)
01513                                         {
01514                                                 xmlFree(notify_body->s);
01515                                                 pkg_free(notify_body);
01516                                                 notify_body= final_body;
01517                                         }
01518                                 }
01519                         }
01520                 }
01521         }
01522 
01523 jump_over_body:
01524 
01525         if(subs->expires<= 0)
01526         {
01527                 subs->expires= 0;
01528                 subs->status= TERMINATED_STATUS;
01529                 subs->reason.s= "timeout";
01530                 subs->reason.len= 7;
01531         }
01532 
01533         /* build extra headers */
01534         if( build_str_hdr( subs, notify_body?1:0, &str_hdr)< 0 )
01535         {
01536                 LM_ERR("while building headers\n");
01537                 goto error;
01538         }
01539         LM_DBG("headers:\n%.*s\n", str_hdr.len, str_hdr.s);
01540 
01541         /* construct the dlg_t structure */
01542         td = ps_build_dlg_t(subs);
01543         if(td ==NULL)
01544         {
01545                 LM_ERR("while building dlg_t structure\n");
01546                 goto error;
01547         }
01548 
01549         LM_DBG("expires %d status %d\n", subs->expires, subs->status);
01550         /* if status is TERMINATED_STATUS, the subscription will be deleted so no need to send a parameter */
01551         if(subs->status != TERMINATED_STATUS)
01552         {
01553                 cb_param = shm_dup_cbparam(subs);
01554                 if(cb_param == NULL)
01555                 {
01556                         LM_ERR("while duplicating cb_param in share memory\n");
01557                         goto error;
01558                 }
01559         }
01560 
01561         set_uac_req(&uac_r, &met, &str_hdr, notify_body, td, TMCB_LOCAL_COMPLETED,
01562                         p_tm_callback, (void*)cb_param);
01563         result = tmb.t_request_within(&uac_r);
01564         if(result< 0)
01565         {
01566                 LM_ERR("in function tmb.t_request_within\n");
01567                 if(cb_param)
01568                         free_cbparam(cb_param);
01569                 goto error;
01570         }
01571 
01572         LM_INFO("NOTIFY %.*s via %.*s on behalf of %.*s for event %.*s\n",
01573                 td->rem_uri.len, td->rem_uri.s, td->hooks.next_hop->len,
01574                 td->hooks.next_hop->s,
01575                 td->loc_uri.len, td->loc_uri.s, subs->event->name.len,
01576                 subs->event->name.s);
01577 
01578         ps_free_tm_dlg(td);
01579         
01580         if(str_hdr.s) pkg_free(str_hdr.s);
01581         
01582         if((int)(long)n_body!= (int)(long)notify_body)
01583                 free_notify_body(notify_body, subs->event);
01584 
01585         return 0;
01586 
01587 error:
01588         ps_free_tm_dlg(td);
01589         if(str_hdr.s!=NULL)
01590                 pkg_free(str_hdr.s);
01591         if((int)(long)n_body!= (int)(long)notify_body)
01592         {
01593                 if(notify_body!=NULL)
01594                 {
01595                         if(notify_body->s!=NULL)
01596                         {
01597                                 if(subs->event->type& WINFO_TYPE)
01598                                         xmlFree(notify_body->s);
01599                                 else
01600                                 if(subs->event->apply_auth_nbody== NULL
01601                                                 && subs->event->agg_nbody== NULL)
01602                                         pkg_free(notify_body->s);
01603                                 else
01604                                 subs->event->free_body(notify_body->s);
01605                         }
01606                         pkg_free(notify_body);
01607                 }
01608         }
01609         return -1;
01610 }
01611 
01612 
01613 int notify(subs_t* subs, subs_t * watcher_subs,str* n_body,int force_null_body)
01614 {
01615         /* update first in hash table and the send Notify */
01616         if(subs->expires!= 0 && subs->status != TERMINATED_STATUS)
01617         {
01618                 unsigned int hash_code;
01619                 hash_code= core_hash(&subs->pres_uri, &subs->event->name, shtable_size);
01620 
01621                 /* if subscriptions are held also in memory, update the subscription hashtable */
01622                 if(subs_dbmode != DB_ONLY)
01623                 {
01624                         if(update_shtable(subs_htable, hash_code, subs, LOCAL_TYPE) < 0)
01625                         {
01626                                 /* subscriptions are held only in memory, and hashtable update failed */
01627                                 LM_ERR("updating subscription record in hash table\n");
01628                                 return -1;
01629                         }
01630                 }
01631                 /* if DB_ONLY mode or WRITE_THROUGH update in database */
01632                 if(subs->recv_event!=PRES_SUBSCRIBE_RECV
01633                                 && ((subs_dbmode == DB_ONLY && pres_notifier_processes == 0)
01634                                     || subs_dbmode == WRITE_THROUGH))
01635                 {
01636                         LM_DBG("updating subscription to database\n");
01637                         if(update_subs_db(subs, LOCAL_TYPE)< 0)
01638                         {
01639                                 LM_ERR("updating subscription in database\n");
01640                                 return -1;
01641                         }
01642                 }
01643         }
01644 
01645         if(subs->reason.s && subs->status== ACTIVE_STATUS && 
01646         subs->reason.len== 12 && strncmp(subs->reason.s, "polite-block", 12)== 0)
01647         {
01648                 force_null_body = 1;
01649         }
01650 
01651         if(send_notify_request(subs, watcher_subs, n_body, force_null_body)< 0)
01652         {
01653                 LM_ERR("sending Notify not successful\n");
01654                 return -1;
01655         }
01656         return 0;
01657 }
01658 
01659 void p_tm_callback( struct cell *t, int type, struct tmcb_params *ps)
01660 {
01661         c_back_param*  cb;
01662 
01663         if(ps->param==NULL || *ps->param==NULL ||
01664                         ((c_back_param*)(*ps->param))->callid.s == NULL ||
01665                         ((c_back_param*)(*ps->param))->to_tag.s== NULL ||
01666                         ((c_back_param*)(*ps->param))->from_tag.s== NULL)
01667         {
01668                 LM_DBG("message id not received, probably a timeout notify\n");
01669                 if(ps->param != NULL && *ps->param !=NULL)
01670                         free_cbparam((c_back_param*)(*ps->param));
01671                 return;
01672         }
01673 
01674         cb= (c_back_param*)(*ps->param);
01675         LM_DBG("completed with status %d [to_tag:%.*s]\n",
01676                         ps->code, cb->to_tag.len, cb->to_tag.s);
01677 
01678         if(ps->code == 481 || (ps->code == 408 && timeout_rm_subs))
01679                 delete_subs(&cb->pres_uri, &cb->ev_name,
01680                                 &cb->to_tag, &cb->from_tag, &cb->callid);
01681 
01682         free_cbparam(cb);
01683 }
01684 
01685 void free_cbparam(c_back_param* cb_param)
01686 {
01687         if(cb_param!= NULL)
01688                 shm_free(cb_param);
01689 }
01690 
01691 c_back_param* shm_dup_cbparam(subs_t* subs)
01692 {
01693         int size;
01694         c_back_param* cb_param = NULL;
01695         
01696         size = sizeof(c_back_param) + subs->pres_uri.len +
01697                         subs->event->name.len + subs->to_tag.len +
01698                         subs->from_tag.len + subs->callid.len;
01699 
01700         cb_param= (c_back_param*)shm_malloc(size);
01701         LM_DBG("=== %d/%d/%d\n", subs->pres_uri.len,
01702                         subs->event->name.len, subs->to_tag.len);
01703         if(cb_param==NULL)
01704         {
01705                 LM_ERR("no more shared memory\n");
01706                 return NULL;
01707         }
01708         memset(cb_param, 0, size);
01709 
01710         cb_param->pres_uri.s = (char*)cb_param + sizeof(c_back_param);
01711         memcpy(cb_param->pres_uri.s, subs->pres_uri.s, subs->pres_uri.len);
01712         cb_param->pres_uri.len = subs->pres_uri.len;
01713         cb_param->ev_name.s = (char*)(cb_param->pres_uri.s) + cb_param->pres_uri.len;
01714         memcpy(cb_param->ev_name.s, subs->event->name.s, subs->event->name.len);
01715         cb_param->ev_name.len = subs->event->name.len;
01716         cb_param->to_tag.s = (char*)(cb_param->ev_name.s) + cb_param->ev_name.len;
01717         memcpy(cb_param->to_tag.s, subs->to_tag.s, subs->to_tag.len);
01718         cb_param->to_tag.len = subs->to_tag.len;
01719 
01720         cb_param->from_tag.s = (char*)(cb_param->to_tag.s) + cb_param->to_tag.len;
01721         memcpy(cb_param->from_tag.s, subs->from_tag.s, subs->from_tag.len);
01722         cb_param->from_tag.len = subs->from_tag.len;
01723 
01724         cb_param->callid.s = (char*)(cb_param->from_tag.s) + cb_param->from_tag.len;
01725         memcpy(cb_param->callid.s, subs->callid.s, subs->callid.len);
01726         cb_param->callid.len = subs->callid.len;
01727 
01728         return cb_param;
01729 }
01730 
01731 
01732 str* create_winfo_xml(watcher_t* watchers, char* version,
01733                 str resource, str event, int STATE_FLAG)
01734 {
01735         xmlDocPtr doc = NULL;       
01736     xmlNodePtr root_node = NULL, node = NULL;
01737         xmlNodePtr w_list_node = NULL;  
01738         char content[200];
01739         str *body= NULL;
01740         char* res= NULL;
01741         watcher_t* w;
01742 
01743     LIBXML_TEST_VERSION;
01744     
01745         doc = xmlNewDoc(BAD_CAST "1.0");
01746     root_node = xmlNewNode(NULL, BAD_CAST "watcherinfo");
01747     xmlDocSetRootElement(doc, root_node);
01748 
01749     xmlNewProp(root_node, BAD_CAST "xmlns",
01750                         BAD_CAST "urn:ietf:params:xml:ns:watcherinfo");
01751     xmlNewProp(root_node, BAD_CAST "version", BAD_CAST version );
01752    
01753         if(STATE_FLAG & FULL_STATE_FLAG)
01754         {
01755                 if( xmlNewProp(root_node, BAD_CAST "state", BAD_CAST "full") == NULL)
01756                 {
01757                         LM_ERR("while adding new attribute\n");
01758                         goto error;
01759                 }
01760         }
01761         else    
01762         {       
01763                 if( xmlNewProp(root_node, BAD_CAST "state", 
01764                                         BAD_CAST "partial")== NULL) 
01765                 {
01766                         LM_ERR("while adding new attribute\n");
01767                         goto error;
01768                 }
01769         }
01770 
01771         w_list_node =xmlNewChild(root_node, NULL, BAD_CAST "watcher-list",NULL);
01772         if( w_list_node == NULL)
01773         {
01774                 LM_ERR("while adding child\n");
01775                 goto error;
01776         }
01777         res= (char*)pkg_malloc(MAX_unsigned(resource.len, event.len) + 1);
01778         if(res== NULL)
01779         {
01780                 ERR_MEM(PKG_MEM_STR);
01781         }
01782         memcpy(res, resource.s, resource.len);
01783         res[resource.len]= '\0';
01784         xmlNewProp(w_list_node, BAD_CAST "resource", BAD_CAST res);
01785         memcpy(res, event.s, event.len);
01786         res[event.len]= '\0';
01787         xmlNewProp(w_list_node, BAD_CAST "package", BAD_CAST res);
01788         pkg_free(res);
01789 
01790 
01791         w= watchers->next;
01792         while(w)
01793         {
01794                 strncpy( content,w->uri.s, w->uri.len);
01795                 content[ w->uri.len ]='\0';
01796                 node = xmlNewChild(w_list_node, NULL, BAD_CAST "watcher",
01797                                 BAD_CAST content) ;
01798                 if( node ==NULL)
01799                 {
01800                         LM_ERR("while adding child\n");
01801                         goto error;
01802                 }
01803                 if(xmlNewProp(node, BAD_CAST "id", BAD_CAST w->id.s)== NULL)
01804                 {
01805                         LM_ERR("while adding new attribute\n");
01806                         goto error;
01807                 }       
01808                 
01809                 if(xmlNewProp(node, BAD_CAST "event", BAD_CAST "subscribe")== NULL)
01810                 {
01811                         LM_ERR("while adding new attribute\n");
01812                         goto error;
01813                 }       
01814                 
01815                 if(xmlNewProp(node, BAD_CAST "status", 
01816                                         BAD_CAST get_status_str(w->status))== NULL)
01817                 {
01818                         LM_ERR("while adding new attribute\n");
01819                         goto error;
01820                 }
01821                 w= w->next;
01822         }
01823     body = (str*)pkg_malloc(sizeof(str));
01824         if(body == NULL)
01825         {
01826                 ERR_MEM(PKG_MEM_STR);   
01827         }
01828         memset(body, 0, sizeof(str));
01829 
01830         xmlDocDumpFormatMemory(doc,(xmlChar**)(void*)&body->s, &body->len, 1);
01831 
01832         xmlFreeDoc(doc);
01833 
01834         xmlCleanupParser();
01835 
01836     xmlMemoryDump();
01837 
01838     return body;
01839 
01840 error:
01841     if(doc)
01842                 xmlFreeDoc(doc);
01843         return NULL;
01844 }
01845 
01846 int watcher_found_in_list(watcher_t * watchers, str wuri)
01847 {
01848         watcher_t * w;
01849 
01850         w = watchers->next;
01851 
01852         while(w)
01853         {
01854                 if(w->uri.len == wuri.len && strncmp(w->uri.s, wuri.s, wuri.len)== 0)
01855                         return 1;
01856                 w= w->next;
01857         }
01858 
01859         return 0;
01860 }
01861 
01862 int add_waiting_watchers(watcher_t *watchers, str pres_uri, str event)
01863 {
01864         watcher_t * w;
01865         db_key_t query_cols[3];
01866         db_val_t query_vals[3];
01867         db_key_t result_cols[2];
01868         db1_res_t *result = NULL;
01869         db_row_t *row= NULL ;   
01870         db_val_t *row_vals;
01871         int n_result_cols = 0;
01872         int n_query_cols = 0;
01873         int wuser_col, wdomain_col;
01874         str wuser, wdomain, wuri;
01875         int i;
01876 
01877         /* select from watchers table the users that have subscribed
01878          * to the presentity and have status pending */
01879 
01880         query_cols[n_query_cols] = &str_presentity_uri_col;
01881         query_vals[n_query_cols].type = DB1_STR;
01882         query_vals[n_query_cols].nul = 0;
01883         query_vals[n_query_cols].val.str_val = pres_uri;
01884         n_query_cols++;
01885 
01886         query_cols[n_query_cols] = &str_event_col;
01887         query_vals[n_query_cols].type = DB1_STR;
01888         query_vals[n_query_cols].nul = 0;
01889         query_vals[n_query_cols].val.str_val = event;
01890         n_query_cols++;
01891 
01892         query_cols[n_query_cols] = &str_status_col;
01893         query_vals[n_query_cols].type = DB1_INT;
01894         query_vals[n_query_cols].nul = 0;
01895         query_vals[n_query_cols].val.int_val = PENDING_STATUS;
01896         n_query_cols++;
01897 
01898         result_cols[wuser_col=n_result_cols++] = &str_watcher_username_col;
01899         result_cols[wdomain_col=n_result_cols++] = &str_watcher_domain_col;
01900         
01901         if (pa_dbf.use_table(pa_db, &watchers_table) < 0) 
01902         {
01903                 LM_ERR("sql use table 'watchers_table' failed\n");
01904                 return -1;
01905         }
01906 
01907         if (pa_dbf.query (pa_db, query_cols, 0, query_vals,
01908                  result_cols, n_query_cols, n_result_cols, 0, &result) < 0) 
01909         {
01910                 LM_ERR("failed to query %.*s table\n",
01911                                 watchers_table.len, watchers_table.s);
01912                 if(result)
01913                         pa_dbf.free_result(pa_db, result);
01914                 return -1;
01915         }
01916         
01917         if(result== NULL)
01918         {
01919                 LM_ERR("mysql query failed - null result\n");
01920                 return -1;
01921         }
01922 
01923         if (result->n<=0 )
01924         {
01925                 LM_DBG("The query returned no result\n");
01926                 pa_dbf.free_result(pa_db, result);
01927                 return 0;
01928         }
01929 
01930         for(i=0; i< result->n; i++)
01931         {
01932                 row = &result->rows[i];
01933                 row_vals = ROW_VALUES(row);
01934 
01935                 wuser.s = (char*)row_vals[wuser_col].val.string_val;
01936                 wuser.len = strlen(wuser.s);
01937 
01938                 wdomain.s = (char*)row_vals[wdomain_col].val.string_val;
01939                 wdomain.len = strlen(wdomain.s);
01940 
01941                 if(uandd_to_uri(wuser, wdomain, &wuri)<0)
01942                 {
01943                         LM_ERR("creating uri from username and domain\n");
01944                         goto error;
01945                 }
01946 
01947                 if(watcher_found_in_list(watchers, wuri))
01948                 {
01949                         pkg_free(wuri.s);
01950                         continue;
01951                 }
01952                 
01953                 w= (watcher_t*)pkg_malloc(sizeof(watcher_t));
01954                 if(w== NULL)
01955                 {
01956                         ERR_MEM(PKG_MEM_STR);
01957                 }
01958                 memset(w, 0, sizeof(watcher_t));
01959 
01960                 w->status= WAITING_STATUS;
01961                 w->uri = wuri;
01962                 w->id.s = (char*)pkg_malloc(w->uri.len*2 +1);
01963                 if(w->id.s== NULL)
01964                 {
01965                         pkg_free(w->uri.s);
01966                         pkg_free(w);
01967                         ERR_MEM(PKG_MEM_STR);
01968                 }
01969 
01970                 to64frombits((unsigned char *)w->id.s,
01971                         (const unsigned char*)w->uri.s, w->uri.len);
01972                 w->id.len = strlen(w->id.s);
01973                 w->event= event;
01974         
01975                 w->next= watchers->next;
01976                 watchers->next= w;
01977 
01978         }
01979 
01980         pa_dbf.free_result(pa_db, result);
01981         return 0;
01982 
01983 error:
01984         if(result)
01985                 pa_dbf.free_result(pa_db, result);
01986         return -1;
01987 
01988 }
01989 
/* Point the str-like lvalue 'strng' at the NUL-terminated string 'chars'
 * and store its strlen() in strng.len. 'strng' is evaluated more than
 * once. The expansion deliberately has no trailing ';' so that
 * "EXTRACT_STRING(a, b);" is exactly one statement and can be used as
 * the body of an unbraced if/else. */
#define EXTRACT_STRING(strng, chars)\
			do {\
			(strng).s = (char *) (chars);\
			(strng).len = strlen((strng).s);\
			} while(0)
01995 
01996 static int unset_watchers_updated_winfo(str *pres_uri)
01997 {
01998         db_key_t query_cols[3], result_cols[1], update_cols[1];
01999         db_val_t query_vals[3], update_vals[1];
02000         db_op_t query_ops[2];
02001         db1_res_t *result = NULL;
02002         int n_query_cols = 0;
02003         int ret = -1;
02004         str winfo = str_init("presence.winfo");
02005 
02006         /* If this is the only presence.winfo dialog awaiting
02007            update for this presentity reset all of the watchers
02008            updated_winfo fields. */
02009 
02010         query_cols[n_query_cols] = &str_presentity_uri_col;
02011         query_vals[n_query_cols].type = DB1_STR;
02012         query_vals[n_query_cols].nul = 0;
02013         query_vals[n_query_cols].val.str_val.s = pres_uri->s;
02014         query_vals[n_query_cols].val.str_val.len = pres_uri->len;
02015         n_query_cols++;
02016 
02017         query_cols[n_query_cols] = &str_event_col;
02018         query_vals[n_query_cols].type = DB1_STR;
02019         query_vals[n_query_cols].nul = 0;
02020         query_vals[n_query_cols].val.str_val = winfo;
02021         n_query_cols++;
02022 
02023         query_cols[n_query_cols] = &str_updated_col;
02024         query_vals[n_query_cols].type = DB1_INT;
02025         query_vals[n_query_cols].nul = 0;
02026         query_vals[n_query_cols].val.int_val = UPDATED_TYPE;
02027         n_query_cols++;
02028 
02029         result_cols[0] = &str_id_col;
02030 
02031         update_cols[0] = &str_updated_winfo_col;
02032         update_vals[0].type = DB1_INT;
02033         update_vals[0].nul = 0;
02034         update_vals[0].val.int_val = NO_UPDATE_TYPE;
02035 
02036         if (pa_dbf.use_table(pa_db, &active_watchers_table) < 0)
02037         {
02038                 LM_ERR("use table failed\n");
02039                 goto error;
02040         }
02041 
02042         if (pa_dbf.query(pa_db, query_cols, 0, query_vals, result_cols,
02043                                         n_query_cols, 1, 0, &result) < 0)
02044         {
02045                 LM_ERR("in sql query\n");
02046                 goto error;
02047         }
02048 
02049         if (result == NULL)
02050         {
02051                 LM_ERR("bad result\n");
02052                 goto error;
02053         }
02054 
02055         if (RES_ROW_N(result) <= 0)
02056         {
02057                 query_ops[0] = OP_EQ;
02058                 query_ops[1] = OP_NEQ;
02059 
02060                 if (pa_dbf.update(pa_db, query_cols, query_ops,
02061                                   query_vals, update_cols,
02062                                   update_vals, 2, 1) < 0)
02063                 {
02064                         LM_ERR("in sql query\n");
02065                         goto error;
02066                 }
02067 
02068                 if (pa_dbf.affected_rows)
02069                         ret = pa_dbf.affected_rows(pa_db);
02070                 else
02071                         ret = 0;
02072         }
02073         else
02074                 ret = 0;
02075 
02076 error:
02077         if (result) pa_dbf.free_result(pa_db, result);
02078         return ret;
02079 }
02080 
02081 static int dialogs_awaiting_update(str *pres_uri, str event)
02082 {
02083         db_key_t query_cols[3], result_cols[1];
02084         db_val_t query_vals[3];
02085         db_op_t query_ops[3];
02086         db1_res_t *result = NULL;
02087         int n_query_cols = 0;
02088         int ret = -1;
02089 
02090         query_cols[n_query_cols] = &str_presentity_uri_col;
02091         query_vals[n_query_cols].type = DB1_STR;
02092         query_vals[n_query_cols].nul = 0;
02093         query_vals[n_query_cols].val.str_val.s = pres_uri->s;
02094         query_vals[n_query_cols].val.str_val.len = pres_uri->len;
02095         query_ops[n_query_cols] = OP_EQ;
02096         n_query_cols++;
02097 
02098         query_cols[n_query_cols] = &str_event_col;
02099         query_vals[n_query_cols].type = DB1_STR;
02100         query_vals[n_query_cols].nul = 0;
02101         query_vals[n_query_cols].val.str_val = event;
02102         query_ops[n_query_cols] = OP_EQ;
02103         n_query_cols++;
02104 
02105         query_cols[n_query_cols] = &str_updated_col;
02106         query_vals[n_query_cols].type = DB1_INT;
02107         query_vals[n_query_cols].nul = 0;
02108         query_vals[n_query_cols].val.int_val = NO_UPDATE_TYPE;
02109         query_ops[n_query_cols] = OP_NEQ;
02110         n_query_cols++;
02111 
02112         result_cols[0] = &str_id_col;
02113 
02114         if (pa_dbf.use_table(pa_db, &active_watchers_table) < 0)
02115         {
02116                 LM_ERR("use table failed\n");
02117                 goto error;
02118         }
02119 
02120         if (pa_dbf.query(pa_db, query_cols, query_ops, query_vals,
02121                                 result_cols, n_query_cols, 1, 0, &result) < 0)
02122         {
02123                 LM_ERR("in sql query\n");
02124                 goto error;
02125         }
02126 
02127         if (result == NULL)
02128         {
02129                 LM_ERR("bad result\n");
02130                 goto error;
02131         }
02132         else
02133                 ret = RES_ROW_N(result);
02134 
02135 error:
02136         if (result) pa_dbf.free_result(pa_db, result);
02137         return ret;
02138 }
02139 
02140 int set_wipeer_subs_updated(str *pres_uri, pres_ev_t *event, int full)
02141 {
02142         db_key_t query_cols[3], result_cols[3], update_cols[2];
02143         db_val_t query_vals[3], update_vals[2], *values;
02144         db_row_t *rows;
02145         db1_res_t *result = NULL;
02146         int n_query_cols = 0, n_result_cols = 0, n_update_cols = 0;
02147         int callid_col, from_tag_col, to_tag_col;
02148         int i, ret = -1, count;
02149         str callid, from_tag, to_tag;
02150 
02151         query_cols[n_query_cols] = &str_presentity_uri_col;
02152         query_vals[n_query_cols].type = DB1_STR;
02153         query_vals[n_query_cols].nul = 0;
02154         query_vals[n_query_cols].val.str_val.s = pres_uri->s;
02155         query_vals[n_query_cols].val.str_val.len = pres_uri->len;
02156         n_query_cols++;
02157 
02158         query_cols[n_query_cols] = &str_event_col;
02159         query_vals[n_query_cols].type = DB1_STR;
02160         query_vals[n_query_cols].nul = 0;
02161         query_vals[n_query_cols].val.str_val = event->name;
02162         n_query_cols++;
02163 
02164         result_cols[callid_col = n_result_cols++] = &str_callid_col;
02165         result_cols[from_tag_col = n_result_cols++] = &str_from_tag_col;
02166         result_cols[to_tag_col = n_result_cols++] = &str_to_tag_col;
02167 
02168         if (pa_dbf.use_table(pa_db, &active_watchers_table) < 0)
02169         {
02170                 LM_ERR("use table failed\n");
02171                 goto error;
02172         }
02173 
02174         if (pa_dbf.query (pa_db, query_cols, 0, query_vals, result_cols,
02175                                 n_query_cols, n_result_cols, 0,  &result) < 0) 
02176         {
02177                 LM_ERR("in sql query\n");
02178                 goto error;
02179         }
02180 
02181         if (result == NULL)
02182         {
02183                 LM_ERR("bad result\n");
02184                 goto error;
02185         }
02186 
02187         if (RES_ROW_N(result) <= 0)
02188         {
02189                 ret = 0;
02190                 goto done;
02191         }
02192 
02193         rows = RES_ROWS(result);
02194         count = RES_ROW_N(result);
02195         for (i = 0; i < RES_ROW_N(result); i++)
02196         {
02197                 values = ROW_VALUES(&rows[i]);
02198 
02199                 EXTRACT_STRING(callid, VAL_STRING(&values[callid_col]));
02200                 EXTRACT_STRING(from_tag, VAL_STRING(&values[from_tag_col]));
02201                 EXTRACT_STRING(to_tag, VAL_STRING(&values[to_tag_col]));
02202 
02203                 n_query_cols = 0;
02204                 n_update_cols = 0;
02205 
02206                 query_cols[n_query_cols] = &str_callid_col;
02207                 query_vals[n_query_cols].type = DB1_STR;
02208                 query_vals[n_query_cols].nul = 0;
02209                 query_vals[n_query_cols].val.str_val = callid;
02210                 n_query_cols++;
02211         
02212                 query_cols[n_query_cols] = &str_to_tag_col;
02213                 query_vals[n_query_cols].type = DB1_STR;
02214                 query_vals[n_query_cols].nul = 0;
02215                 query_vals[n_query_cols].val.str_val = to_tag;
02216                 n_query_cols++;
02217         
02218                 query_cols[n_query_cols] = &str_from_tag_col;
02219                 query_vals[n_query_cols].type = DB1_STR;
02220                 query_vals[n_query_cols].nul = 0;
02221                 query_vals[n_query_cols].val.str_val = from_tag;
02222                 n_query_cols++;
02223         
02224                 update_cols[n_update_cols] = &str_updated_col;
02225                 update_vals[n_update_cols].type = DB1_INT;
02226                 update_vals[n_update_cols].nul = 0;
02227                 update_vals[n_update_cols].val.int_val = core_hash(&callid,
02228                         &from_tag, (pres_waitn_time * pres_notifier_poll_rate
02229                                         * pres_notifier_processes) - 1);
02230                 n_update_cols++;
02231 
02232                 if (full)
02233                 {
02234                         update_cols[n_update_cols] = &str_updated_winfo_col;
02235                         update_vals[n_update_cols].type = DB1_INT;
02236                         update_vals[n_update_cols].nul = 0;
02237                         update_vals[n_update_cols].val.int_val = UPDATED_TYPE;
02238                         n_update_cols++;
02239                 }
02240 
02241                 if (pa_dbf.update(pa_db, query_cols, 0, query_vals, update_cols,
02242                                   update_vals, n_query_cols, n_update_cols) < 0)
02243                 {
02244                         LM_ERR("in sql query\n");
02245                         goto error;
02246                 }
02247 
02248                 if (pa_dbf.affected_rows)
02249                         if (pa_dbf.affected_rows(pa_db) == 0) count--;
02250         }
02251 
02252         ret = count; 
02253 
02254 done:
02255 error:
02256         if (result) pa_dbf.free_result(pa_db, result);
02257         return ret;
02258 }
02259 
02260 int set_updated(subs_t *sub)
02261 {
02262         db_key_t query_cols[3], update_cols[1];
02263         db_val_t query_vals[3], update_vals[1];
02264         int n_query_cols = 0;
02265 
02266         query_cols[n_query_cols] = &str_callid_col;
02267         query_vals[n_query_cols].type = DB1_STR;
02268         query_vals[n_query_cols].nul = 0;
02269         query_vals[n_query_cols].val.str_val = sub->callid;
02270         n_query_cols++;
02271 
02272         query_cols[n_query_cols] = &str_to_tag_col;
02273         query_vals[n_query_cols].type = DB1_STR;
02274         query_vals[n_query_cols].nul = 0;
02275         query_vals[n_query_cols].val.str_val = sub->to_tag;
02276         n_query_cols++;
02277 
02278         query_cols[n_query_cols] = &str_from_tag_col;
02279         query_vals[n_query_cols].type = DB1_STR;
02280         query_vals[n_query_cols].nul = 0;
02281         query_vals[n_query_cols].val.str_val = sub->from_tag;
02282         n_query_cols++;
02283 
02284         update_cols[0] = &str_updated_col;
02285         update_vals[0].type = DB1_INT;
02286         update_vals[0].nul = 0;
02287         update_vals[0].val.int_val = core_hash(&sub->callid, &sub->from_tag,
02288                                 (pres_waitn_time * pres_notifier_poll_rate
02289                                                 * pres_notifier_processes) - 1);
02290 
02291         if (pa_dbf.use_table(pa_db, &active_watchers_table) < 0)
02292         {
02293                 LM_ERR("use table failed\n");
02294                 return -1;
02295         }
02296 
02297         if (pa_dbf.update(pa_db, query_cols, 0, query_vals, update_cols,
02298                                         update_vals, n_query_cols, 1) < 0)
02299         {
02300                 LM_ERR("in sql query\n");
02301                 return -1;
02302         }
02303 
02304         if (pa_dbf.affected_rows)
02305                 return pa_dbf.affected_rows(pa_db);
02306         else
02307                 return 0;
02308 }
02309 
02310 static watcher_t *build_watchers_list(subs_t *sub)
02311 {
02312         db_key_t query_cols[3], result_cols[4];
02313         db_val_t query_vals[3], *values;
02314         db_row_t *rows;
02315         db1_res_t *result;
02316         int n_query_cols = 0, n_result_cols = 0;
02317         int wuser_col, wdomain_col, callid_col, status_col;
02318         int i;
02319         subs_t sb;
02320         watcher_t *watchers = NULL;
02321 
02322         watchers = (watcher_t*) pkg_malloc(sizeof(watcher_t));
02323         if (watchers == NULL)
02324         {
02325                 ERR_MEM(PKG_MEM_STR);
02326         }
02327         memset(watchers, 0, sizeof(watcher_t));
02328 
02329         query_cols[n_query_cols] = &str_presentity_uri_col;
02330         query_vals[n_query_cols].type = DB1_STR;
02331         query_vals[n_query_cols].nul = 0;
02332         query_vals[n_query_cols].val.str_val= sub->pres_uri;
02333         n_query_cols++;
02334 
02335         query_cols[n_query_cols] = &str_event_col;
02336         query_vals[n_query_cols].type = DB1_STR;
02337         query_vals[n_query_cols].nul = 0;
02338         query_vals[n_query_cols].val.str_val = sub->event->wipeer->name;
02339         n_query_cols++;
02340 
02341         query_cols[n_query_cols] = &str_updated_winfo_col;
02342         query_vals[n_query_cols].type = DB1_INT;
02343         query_vals[n_query_cols].nul = 0;
02344         query_vals[n_query_cols].val.int_val = UPDATED_TYPE;
02345         n_query_cols++;
02346 
02347         result_cols[wuser_col = n_result_cols++] = &str_watcher_username_col;
02348         result_cols[wdomain_col = n_result_cols++] = &str_watcher_domain_col;
02349         result_cols[callid_col = n_result_cols++] = &str_callid_col;
02350         result_cols[status_col = n_result_cols++] = &str_status_col;
02351 
02352         if (pa_dbf.use_table(pa_db, &active_watchers_table) < 0)
02353         {
02354                 LM_ERR("use table failed\n");
02355                 goto error;
02356         }
02357 
02358         if (pa_dbf.query (pa_db, query_cols, 0, query_vals, result_cols,
02359                                 n_query_cols, n_result_cols, 0,  &result) < 0) 
02360         {
02361                 LM_ERR("in sql query\n");
02362                 goto error;
02363         }
02364 
02365         if (result == NULL)
02366         {
02367                 LM_ERR("bad result\n");
02368                 goto error;
02369         }
02370 
02371         if (RES_ROW_N(result) <= 0)
02372                 goto done;
02373 
02374         rows = RES_ROWS(result);
02375         for (i = 0; i < RES_ROW_N(result); i++)
02376         {
02377                 memset(&sb, 0, sizeof(subs_t));
02378                 values = ROW_VALUES(&rows[i]);
02379 
02380                 EXTRACT_STRING(sb.watcher_user, VAL_STRING(&values[wuser_col]));
02381                 EXTRACT_STRING(sb.watcher_domain, VAL_STRING(&values[wdomain_col]));
02382                 EXTRACT_STRING(sb.callid, VAL_STRING(&values[callid_col]));
02383                 sb.status = VAL_INT(&values[status_col]);
02384 
02385                 sb.event = sub->event->wipeer;
02386                 
02387                 if (add_watcher_list(&sb, watchers) < 0)
02388                         goto error;
02389         }
02390         
02391 done:
02392         pa_dbf.free_result(pa_db, result);
02393         return watchers;
02394 
02395 error:
02396         if (result) pa_dbf.free_result(pa_db, result);
02397         free_watcher_list(watchers);
02398         return NULL;
02399 }
02400 
02401 static int cleanup_missing_dialog(subs_t *sub)
02402 {
02403         int ret = -1, num_other_watchers = 0;
02404 
02405         if (sub->event->type & WINFO_TYPE)
02406         {
02407                 if (unset_watchers_updated_winfo(&sub->pres_uri) < 0)
02408                 {
02409                         LM_ERR("resetting updated_winfo flags\n");
02410                         goto error;
02411                 }
02412         }
02413         else if (sub->event->type & PUBL_TYPE)
02414         {
02415                 if ((num_other_watchers = dialogs_awaiting_update(
02416                                         &sub->pres_uri, sub->event->name)) < 0)
02417                 {
02418                         LM_ERR("checking watchers\n");
02419                         goto error;
02420                 }
02421                 else if (num_other_watchers == 0)
02422                 {
02423                         if (delete_offline_presentities(&sub->pres_uri,
02424                                                         sub->event) < 0)
02425                         {
02426                                 LM_ERR("deleting presentity\n");
02427                                 goto error;
02428                         }
02429                 }
02430         }
02431 
02432         ret = 0;
02433 
02434 error:
02435         return ret;
02436 }
02437 
02438 static int notifier_notify(subs_t *sub, int *updated, int *end_transaction)
02439 {
02440         str *nbody = NULL;
02441         watcher_t *watchers = NULL;
02442         int ret = 0, attempt_delete_presentities = 0;
02443 
02444         *updated = 0;
02445 
02446         /* Terminating dialog NOTIFY */
02447         if (sub->expires == 0 || sub->status == TERMINATED_STATUS)
02448         {
02449                 sub->status = TERMINATED_STATUS;
02450 
02451                 if (sub->event->type & WINFO_TYPE)
02452                 {
02453                         if (unset_watchers_updated_winfo(&sub->pres_uri) < 0)
02454                         {
02455                                 LM_WARN("resetting updated_winfo flags\n");
02456 
02457                                 if (pa_dbf.abort_transaction)
02458                                 {
02459                                         if (pa_dbf.abort_transaction(pa_db) < 0)
02460                                         {
02461                                                 LM_ERR("in abort_transaction\n");
02462                                                 goto error;
02463                                         }
02464                                 }
02465                                 *end_transaction = 0;
02466 
02467                                 /* Make sure this gets tried again next time */
02468                                 *updated = 1;
02469                                 goto done;
02470                         }
02471                 }
02472                 else
02473                 {
02474                         str winfo = str_init("presence.winfo");
02475                         int num_other_watchers, num_winfos;
02476 
02477                         if (sub->event->type & PUBL_TYPE)
02478                         {
02479                                 if ((num_other_watchers = dialogs_awaiting_update(
02480                                                 &sub->pres_uri, sub->event->name)) < 0)
02481                                 {
02482                                         LM_ERR("checking watchers\n");
02483                                         goto error;
02484                                 }
02485                                 else if (num_other_watchers == 0)
02486                                         attempt_delete_presentities = 1;
02487                         }
02488 
02489                         if (sub->event->wipeer)
02490                         {
02491                                 if ((num_winfos = dialogs_awaiting_update(
02492                                                         &sub->pres_uri,
02493                                                         winfo)) < 0)
02494                                 {
02495                                         LM_ERR("checking winfos\n");
02496                                         goto error;
02497                                 }
02498 
02499                                 if (sub->updated_winfo == UPDATED_TYPE && num_winfos > 0)
02500                                 {
02501                                         *updated = 1;
02502                                         goto done;
02503                                 }
02504                         }
02505                 }
02506         }
02507         else /* Non-terminating dialog */
02508         {
02509                 if (sub->event->type & WINFO_TYPE) /* presence.winfo dialog */
02510                 {
02511                         if (unset_watchers_updated_winfo(&sub->pres_uri) < 0)
02512                         {
02513                                 LM_WARN("resetting updated_winfo flags\n");
02514 
02515                                 if (pa_dbf.abort_transaction)
02516                                 {
02517                                         if (pa_dbf.abort_transaction(pa_db) < 0)
02518                                         {
02519                                                 LM_ERR("in abort_transaction\n");
02520                                                 goto error;
02521                                         }
02522                                 }
02523                                 *end_transaction = 0;
02524 
02525                                 /* Make sure this gets tried again next time */
02526                                 *updated = 1;
02527                                 goto done;
02528                         }
02529 
02530                         if (sub->updated_winfo == NO_UPDATE_TYPE)
02531                         {
02532                                 /* Partial notify if
02533                                    updated_winfo == NO_UPDATE_TYPE */
02534                                 int len = 0;
02535                                 char *version_str = int2str(sub->version,
02536                                                                 &len);
02537                                 if(version_str ==NULL)
02538                                 {
02539                                         LM_ERR("converting int to str\n");
02540                                         goto error;
02541                                 }
02542 
02543                                 watchers = build_watchers_list(sub);
02544                                 if(watchers == NULL)
02545                                 {
02546                                         LM_ERR("in build_watchers_list\n");
02547                                         goto error;
02548                                 }
02549 
02550                                 nbody = create_winfo_xml(watchers,version_str,
02551                                                 sub->pres_uri,
02552                                                 sub->event->wipeer->name,
02553                                                 PARTIAL_STATE_FLAG);
02554                                 if(nbody == NULL)
02555                                 {
02556                                         LM_ERR("in create_winfo_xml\n");
02557                                         goto error;
02558                                 }
02559 
02560                         }
02561                         else    /* Full presence.winfo NOTIFY */
02562                                 sub->updated_winfo = NO_UPDATE_TYPE;
02563                 }
02564                 else if (sub->event->type & PUBL_TYPE)
02565                 {
02566                         int num_other_watchers;
02567 
02568                         if ((num_other_watchers = dialogs_awaiting_update(
02569                                         &sub->pres_uri, sub->event->name)) < 0)
02570                         {
02571                                 LM_ERR("checking watchers\n");
02572                                 goto error;
02573                         }
02574                         else if (num_other_watchers == 0)
02575                                 attempt_delete_presentities = 1;
02576                 }
02577                 else if (!send_fast_notify)
02578                         goto done;
02579         }
02580 
02581         if (notify(sub, NULL, nbody, 0) < 0)
02582         {
02583                 LM_ERR("could not send notify\n");
02584                 goto error;
02585         }
02586 
02587         ret = 1;
02588 
02589 done:
02590         if (attempt_delete_presentities)
02591         {
02592                 if (delete_offline_presentities(&sub->pres_uri, sub->event) < 0)
02593                 {
02594                         LM_ERR("deleting presentity\n");
02595                         goto error;
02596                 }
02597         }
02598 
02599         free_notify_body(nbody, sub->event);
02600         free_watcher_list(watchers);
02601 
02602         return ret;
02603 
02604 error:
02605         free_notify_body(nbody, sub->event);
02606         free_watcher_list(watchers);
02607 
02608         if (pa_dbf.abort_transaction)
02609         {
02610                 if (pa_dbf.abort_transaction(pa_db) < 0)
02611                         LM_ERR("in abort_transaction\n");
02612         }
02613         *end_transaction = 0;
02614 
02615         return -1;
02616 }
02617 
02618 int process_dialogs(int round, int presence_winfo)
02619 {
02620         db_key_t query_cols[3], result_cols[18], update_cols[4];
02621         db_val_t query_vals[3], update_vals[4], *values, *dvalues;
02622         db_op_t query_ops[2];
02623         db_row_t *rows, *drows;
02624         db1_res_t *dialog_list = NULL, *dialog = NULL;
02625         int n_query_cols = 0, n_result_cols = 0, n_update_cols = 0;
02626         int callid_col, to_tag_col, from_tag_col;
02627         int pres_uri_col, tuser_col, tdomain_col, fuser_col, fdomain_col;
02628         int wuser_col, wdomain_col, sockinfo_col, lcontact_col, contact_col;
02629         int rroute_col, event_id_col, reason_col, event_col, lcseq_col;
02630         int rcseq_col, status_col, version_col, updated_winfo_col, expires_col;
02631         int i, notify_sent = 0, cached_updated_winfo, ret = -1;
02632         int end_transaction = 0;
02633         subs_t sub;
02634         str ev_sname, winfo = str_init("presence.winfo");
02635         int now = (int)time(NULL);
02636         int updated = 0;
02637 
02638         if (++subset > (pres_waitn_time * pres_notifier_poll_rate) -1)
02639                 subset = 0;
02640 
02641         query_cols[n_query_cols] = &str_updated_col;
02642         query_vals[n_query_cols].type = DB1_INT;
02643         query_vals[n_query_cols].nul = 0;
02644         query_vals[n_query_cols].val.int_val = round;
02645         query_ops[n_query_cols] = OP_EQ;
02646         n_query_cols++;
02647 
02648         query_cols[n_query_cols] = &str_event_col;
02649         query_vals[n_query_cols].type = DB1_STR;
02650         query_vals[n_query_cols].nul = 0;
02651         query_vals[n_query_cols].val.str_val = winfo;
02652         query_ops[n_query_cols] = presence_winfo ? OP_EQ : OP_NEQ;
02653         n_query_cols++;
02654 
02655         result_cols[pres_uri_col = n_result_cols++] = &str_presentity_uri_col;
02656         result_cols[callid_col = n_result_cols++] = &str_callid_col;
02657         result_cols[to_tag_col = n_result_cols++] = &str_to_tag_col;
02658         result_cols[from_tag_col = n_result_cols++] = &str_from_tag_col;
02659         result_cols[event_col = n_result_cols++] = &str_event_col;
02660         
02661         update_cols[n_update_cols] = &str_updated_col;
02662         update_vals[n_update_cols].type = DB1_INT;
02663         update_vals[n_update_cols].nul = 0;
02664         update_vals[n_update_cols].val.int_val = NO_UPDATE_TYPE;
02665         n_update_cols++;
02666 
02667         if (pa_dbf.use_table(pa_db, &active_watchers_table) < 0)
02668         {
02669                 LM_ERR("use table failed\n");
02670                 goto error;
02671         }
02672 
02673         if (pa_dbf.start_transaction)
02674         {
02675                 if (pa_dbf.start_transaction(pa_db) < 0)
02676                 {
02677                         LM_ERR("in start_transaction\n");
02678                         goto error;
02679                 }
02680         }
02681 
02682         /* Step 1: Find active_watchers that require notification */
02683         if (pa_dbf.query(pa_db, query_cols, query_ops, query_vals, result_cols,
02684                          n_query_cols, n_result_cols, 0, &dialog_list) < 0)
02685         {
02686                 LM_ERR("in sql query\n");
02687                 goto error;
02688         }
02689         if(dialog_list == NULL)
02690         {
02691                 LM_ERR("bad result\n");
02692                 goto error;
02693         }
02694 
02695         if(dialog_list->n <= 0)
02696                 goto done;
02697 
02698         /* Step 2: Update the records so they are not notified again */
02699         if (pa_dbf.update(pa_db, query_cols, query_ops, query_vals, update_cols,
02700                           update_vals, n_query_cols, n_update_cols) < 0)
02701         {
02702                 LM_ERR("in sql update\n");
02703                 goto error;
02704         }
02705 
02706         if (pa_dbf.end_transaction)
02707         {
02708                 if (pa_dbf.end_transaction(pa_db) < 0)
02709                 {
02710                         LM_ERR("in end_transaction\n");
02711                         goto error;
02712                 }
02713         }
02714 
02715         /* Step 3: Notify each watcher we found */
02716         rows = RES_ROWS(dialog_list);
02717         for (i = 0; i < RES_ROW_N(dialog_list); i++)
02718         {
02719                 n_query_cols = 0;
02720                 n_result_cols = 0;
02721                 n_update_cols = 0;
02722                 memset(&sub, 0, sizeof(subs_t));
02723                 values = ROW_VALUES(&rows[i]);
02724 
02725                 EXTRACT_STRING(sub.pres_uri, VAL_STRING(&values[pres_uri_col]));
02726                 EXTRACT_STRING(sub.callid, VAL_STRING(&values[callid_col]));
02727                 EXTRACT_STRING(sub.to_tag, VAL_STRING(&values[to_tag_col]));
02728                 EXTRACT_STRING(sub.from_tag, VAL_STRING(&values[from_tag_col]));
02729                 EXTRACT_STRING(ev_sname, VAL_STRING(&values[event_col]));
02730                 sub.event = contains_event(&ev_sname, NULL);
02731                 if (sub.event == NULL)
02732                 {
02733                         LM_ERR("event not found and set to NULL\n");
02734                         goto delete_dialog;
02735                 }
02736 
02737                 query_cols[n_query_cols] = &str_callid_col;
02738                 query_vals[n_query_cols].type = DB1_STR;
02739                 query_vals[n_query_cols].nul = 0;
02740                 query_vals[n_query_cols].val.str_val = sub.callid;
02741                 n_query_cols++;
02742 
02743                 query_cols[n_query_cols] = &str_to_tag_col;
02744                 query_vals[n_query_cols].type = DB1_STR;
02745                 query_vals[n_query_cols].nul = 0;
02746                 query_vals[n_query_cols].val.str_val = sub.to_tag;
02747                 n_query_cols++;
02748 
02749                 query_cols[n_query_cols] = &str_from_tag_col;
02750                 query_vals[n_query_cols].type = DB1_STR;
02751                 query_vals[n_query_cols].nul = 0;
02752                 query_vals[n_query_cols].val.str_val = sub.from_tag;
02753                 n_query_cols++;
02754 
02755                 result_cols[tuser_col = n_result_cols++] = &str_to_user_col;
02756                 result_cols[tdomain_col = n_result_cols++] = &str_to_domain_col;
02757                 result_cols[fuser_col = n_result_cols++] = &str_from_user_col;
02758                 result_cols[fdomain_col = n_result_cols++] = &str_from_domain_col;
02759                 result_cols[wuser_col = n_result_cols++] = &str_watcher_username_col;
02760                 result_cols[wdomain_col = n_result_cols++] = &str_watcher_domain_col;
02761                 result_cols[sockinfo_col = n_result_cols++] = &str_socket_info_col;
02762                 result_cols[lcontact_col = n_result_cols++] = &str_local_contact_col;
02763                 result_cols[contact_col = n_result_cols++] = &str_contact_col;
02764                 result_cols[rroute_col = n_result_cols++] = &str_record_route_col;
02765                 result_cols[event_id_col = n_result_cols++] = &str_event_id_col;
02766                 result_cols[reason_col = n_result_cols++] = &str_reason_col;
02767                 result_cols[lcseq_col = n_result_cols++] = &str_local_cseq_col;
02768                 result_cols[rcseq_col = n_result_cols++] = &str_remote_cseq_col;
02769                 result_cols[status_col = n_result_cols++] = &str_status_col;
02770                 result_cols[version_col = n_result_cols++] = &str_version_col;
02771                 result_cols[updated_winfo_col = n_result_cols++] = &str_updated_winfo_col;
02772                 result_cols[expires_col = n_result_cols++] = &str_expires_col;
02773 
02774                 if (pa_dbf.start_transaction)
02775                 {
02776                         if (pa_dbf.start_transaction(pa_db) < 0)
02777                         {
02778                                 LM_ERR("in start_transaction\n");
02779                                 goto error;
02780                         }
02781                 }
02782                 end_transaction = 1;
02783 
02784                 if (pa_dbf.query(pa_db, query_cols, 0, query_vals, result_cols,
02785                                  n_query_cols, n_result_cols, 0, &dialog) < 0)
02786                 {
02787                         LM_ERR("in sql query\n");
02788                         goto error;
02789                 }
02790 
02791                 if (dialog == NULL)
02792                 {
02793                         LM_ERR("bad result\n");
02794                         goto error;
02795                 }
02796 
02797                 if (dialog->n <= 0)
02798                 {
02799                         LM_WARN("record not found\n");
02800                         if (cleanup_missing_dialog(&sub) < 0)
02801                                 LM_ERR("cleaning up after missing record\n");
02802                         goto next_dialog;
02803                 }
02804 
02805                 if (dialog->n > 1)
02806                 {
02807                         LM_ERR("multiple records found\n");
02808                         goto delete_dialog;
02809                 }
02810 
02811                 drows = RES_ROWS(dialog);
02812                 dvalues = ROW_VALUES(drows);
02813 
02814                 EXTRACT_STRING(sub.to_user, VAL_STRING(&dvalues[tuser_col]));
02815                 EXTRACT_STRING(sub.to_domain, VAL_STRING(&dvalues[tdomain_col]));
02816                 EXTRACT_STRING(sub.from_user, VAL_STRING(&dvalues[fuser_col]));
02817                 EXTRACT_STRING(sub.from_domain, VAL_STRING(&dvalues[fdomain_col]));
02818                 EXTRACT_STRING(sub.watcher_user, VAL_STRING(&dvalues[wuser_col]));
02819                 EXTRACT_STRING(sub.watcher_domain, VAL_STRING(&dvalues[wdomain_col]));
02820                 EXTRACT_STRING(sub.sockinfo_str, VAL_STRING(&dvalues[sockinfo_col]));
02821                 EXTRACT_STRING(sub.local_contact, VAL_STRING(&dvalues[lcontact_col]));
02822                 EXTRACT_STRING(sub.contact, VAL_STRING(&dvalues[contact_col]));
02823                 EXTRACT_STRING(sub.record_route, VAL_STRING(&dvalues[rroute_col]));
02824                 EXTRACT_STRING(sub.event_id, VAL_STRING(&dvalues[event_id_col]));
02825                 EXTRACT_STRING(sub.reason, VAL_STRING(&dvalues[reason_col]));
02826 
02827                 sub.local_cseq = VAL_INT(&dvalues[lcseq_col]) + 1;
02828                 sub.remote_cseq = VAL_INT(&dvalues[rcseq_col]);
02829                 sub.status = VAL_INT(&dvalues[status_col]);
02830                 sub.version = VAL_INT(&dvalues[version_col]) + 1;
02831                 cached_updated_winfo = sub.updated_winfo
02832                                         = VAL_INT(&dvalues[updated_winfo_col]);
02833                 
02834                 if (VAL_INT(&dvalues[expires_col]) > now)
02835                         sub.expires = VAL_INT(&dvalues[expires_col]) - now;
02836                 else
02837                         sub.expires = 0;
02838 
02839                 if (sub.expires < expires_offset) sub.expires = 0;
02840 
02841                 sub.updated = round;
02842 
02843                 if ((notify_sent = notifier_notify(&sub, &updated, &end_transaction)) < 0)
02844                 {
02845                         LM_ERR("sending NOTIFY request\n");
02846 
02847                         if (cleanup_missing_dialog(&sub) < 0)
02848                                 LM_ERR("cleaning up after error sending NOTIFY"
02849                                         "request\n");
02850         
02851                         /* remove the dialog and continue */
02852                         goto delete_dialog;
02853                 }
02854 
02855                 if (pa_dbf.use_table(pa_db, &active_watchers_table) < 0)
02856                 {
02857                         LM_ERR("use table failed\n");
02858                         goto error;
02859                 }
02860 
02861                 if ((sub.expires > 0 && sub.status != TERMINATED_STATUS) || updated)
02862                 {
02863                         if (sub.updated_winfo != cached_updated_winfo)
02864                         {
02865                                 update_cols[n_update_cols] = &str_updated_winfo_col;
02866                                 update_vals[n_update_cols].type = DB1_INT;
02867                                 update_vals[n_update_cols].nul = 0;
02868                                 update_vals[n_update_cols].val.int_val = sub.updated_winfo;
02869                                 n_update_cols++;
02870                         }
02871 
02872                         if (updated)
02873                         {
02874                                 update_cols[n_update_cols] = &str_updated_col;
02875                                 update_vals[n_update_cols].type = DB1_INT;
02876                                 update_vals[n_update_cols].nul = 0;
02877                                 update_vals[n_update_cols].val.int_val = round;
02878                                 n_update_cols++;
02879                         }
02880 
02881                         if (notify_sent)
02882                         {
02883                                 update_cols[n_update_cols] = &str_local_cseq_col;
02884                                 update_vals[n_update_cols].type = DB1_INT;
02885                                 update_vals[n_update_cols].nul = 0;
02886                                 update_vals[n_update_cols].val.int_val = sub.local_cseq;
02887                                 n_update_cols++;
02888 
02889                                 update_cols[n_update_cols] = &str_version_col;
02890                                 update_vals[n_update_cols].type = DB1_INT;
02891                                 update_vals[n_update_cols].nul = 0;
02892                                 update_vals[n_update_cols].val.int_val = sub.version;
02893                                 n_update_cols++;
02894                         }
02895 
02896                         if (n_update_cols > 0)
02897                         {
02898                                 if (pa_dbf.update(pa_db,
02899                                                   query_cols,
02900                                                   0,
02901                                                   query_vals,
02902                                                   update_cols,
02903                                                   update_vals,
02904                                                   n_query_cols,
02905                                                   n_update_cols) < 0)
02906                                 {
02907                                         LM_ERR("in sql update\n");
02908                                         goto error;
02909                                 }
02910                         }
02911 
02912                 }
02913                 else if (notify_sent)
02914                 {
02915 delete_dialog:
02916                         if (pa_dbf.use_table(pa_db, &active_watchers_table) < 0)
02917                         {
02918                                 LM_ERR("use table failed\n");
02919                                 goto error;
02920                         }
02921 
02922                         if (pa_dbf.delete(pa_db, query_cols, 0, query_vals,
02923                                                 n_query_cols) < 0)
02924                         {       
02925                                 LM_ERR("in sql delete");
02926                                 goto error;
02927                         }
02928                 }
02929 
02930 next_dialog:
02931                 if (pa_dbf.end_transaction && end_transaction)
02932                 {
02933                         if (pa_dbf.end_transaction(pa_db) < 0)
02934                         {
02935                                 LM_ERR("in end_transaction\n");
02936                                 goto error;
02937                         }
02938                 }
02939 
02940                 pa_dbf.free_result(pa_db, dialog);
02941                 dialog = NULL;
02942         }
02943 
02944 done:
02945         ret = 0;
02946 error:
02947         if (dialog_list) pa_dbf.free_result(pa_db, dialog_list);
02948         if (dialog) pa_dbf.free_result(pa_db, dialog);
02949 
02950         if (pa_dbf.abort_transaction)
02951         {
02952                 if (pa_dbf.abort_transaction(pa_db) < 0)
02953                         LM_ERR("in abort_transaction\n");
02954         }
02955 
02956         return ret;
02957 }
02958 
02959 void pres_timer_send_notify(unsigned int ticks, void *param)
02960 {
02961         int process_num = *((int *) param);
02962         int round = subset + (pres_waitn_time * pres_notifier_poll_rate
02963                                 * process_num);
02964 
02965         if (process_dialogs(round, 0) < 0)
02966         {
02967                 LM_ERR("Handling non presence.winfo dialogs\n");
02968                 return;
02969         }
02970         if (process_dialogs(round, 1) < 0)
02971         {
02972                 LM_ERR("Handling presence.winfo dialogs\n");
02973                 return;
02974         }
02975 }