00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00035 #include <stdio.h>
00036 #include <stdlib.h>
00037 #include <string.h>
00038 #include <libxml/parser.h>
00039
00040 #include "../../trim.h"
00041 #include "../../ut.h"
00042 #include "../../globals.h"
00043 #include "../../str.h"
00044 #include "../../lib/srdb1/db.h"
00045 #include "../../lib/srdb1/db_val.h"
00046 #include "../../hashes.h"
00047 #include "../../socket_info.h"
00048 #include "../../modules/tm/tm_load.h"
00049 #include "../pua/hash.h"
00050 #include "presentity.h"
00051 #include "presence.h"
00052 #include "notify.h"
00053 #include "utils_func.h"
00054
00055 #define ALLOC_SIZE 3000
00056 #define MAX_FORWARD 70
00057
00058 c_back_param* shm_dup_cbparam(subs_t*);
00059 void free_cbparam(c_back_param* cb_param);
00060
00061 void p_tm_callback( struct cell *t, int type, struct tmcb_params *ps);
00062 int add_waiting_watchers(watcher_t *watchers, str pres_uri, str event);
00063 int add_watcher_list(subs_t *s, watcher_t *watchers);
00064 str* create_winfo_xml(watcher_t* watchers, char* version,
00065 str resource, str event, int STATE_FLAG);
00066 void free_watcher_list(watcher_t* watchers);
00067
00068 str str_to_user_col = str_init("to_user");
00069 str str_username_col = str_init("username");
00070 str str_domain_col = str_init("domain");
00071 str str_body_col = str_init("body");
00072 str str_to_domain_col = str_init("to_domain");
00073 str str_from_user_col = str_init("from_user");
00074 str str_from_domain_col = str_init("from_domain");
00075 str str_watcher_username_col = str_init("watcher_username");
00076 str str_watcher_domain_col = str_init("watcher_domain");
00077 str str_event_id_col = str_init("event_id");
00078 str str_event_col = str_init("event");
00079 str str_etag_col = str_init("etag");
00080 str str_from_tag_col = str_init("from_tag");
00081 str str_to_tag_col = str_init("to_tag");
00082 str str_callid_col = str_init("callid");
00083 str str_local_cseq_col = str_init("local_cseq");
00084 str str_remote_cseq_col = str_init("remote_cseq");
00085 str str_record_route_col = str_init("record_route");
00086 str str_contact_col = str_init("contact");
00087 str str_expires_col = str_init("expires");
00088 str str_status_col = str_init("status");
00089 str str_reason_col = str_init("reason");
00090 str str_socket_info_col = str_init("socket_info");
00091 str str_local_contact_col = str_init("local_contact");
00092 str str_version_col = str_init("version");
00093 str str_presentity_uri_col = str_init("presentity_uri");
00094 str str_inserted_time_col = str_init("inserted_time");
00095 str str_received_time_col = str_init("received_time");
00096 str str_id_col = str_init("id");
00097 str str_sender_col = str_init("sender");
00098 str str_updated_col = str_init("updated");
00099 str str_updated_winfo_col = str_init("updated_winfo");
00100
00101 int subset=0;
00102
00103 char* get_status_str(int status_flag)
00104 {
00105 switch(status_flag)
00106 {
00107 case ACTIVE_STATUS: return "active";
00108 case PENDING_STATUS: return "pending";
00109 case TERMINATED_STATUS: return "terminated";
00110 case WAITING_STATUS: return "waiting";
00111 }
00112 return NULL;
00113 }
00114
00115 void printf_subs(subs_t* subs)
00116 {
00117 LM_DBG("pres_uri: %.*s\n", subs->pres_uri.len, subs->pres_uri.s);
00118 LM_DBG("watcher_user@watcher_domain: %.*s@%.*s\n", subs->watcher_user.len, subs->watcher_user.s, subs->watcher_domain.len, subs->watcher_domain.s);
00119 LM_DBG("to_user@to_domain: %.*s@%.*s\n", subs->to_user.len, subs->to_user.s, subs->to_domain.len, subs->to_domain.s);
00120 LM_DBG("from_user@from_domain: %.*s@%.*s\n", subs->from_user.len, subs->from_user.s, subs->from_domain.len, subs->from_domain.s);
00121 LM_DBG("callid/from_tag/to_tag: %.*s/%.*s/%.*s\n", subs->callid.len, subs->callid.s, subs->from_tag.len, subs->from_tag.s, subs->to_tag.len, subs->to_tag.s);
00122 LM_DBG("local_cseq/remote_cseq: %u/%u\n", subs->local_cseq, subs->remote_cseq);
00123 LM_DBG("local_contact/contact: %.*s/%.*s\n", subs->local_contact.len, subs->local_contact.s, subs->contact.len, subs->contact.s);
00124 LM_DBG("record_route: %.*s\n", subs->record_route.len, subs->record_route.s);
00125 LM_DBG("sockinfo_str: %.*s\n", subs->sockinfo_str.len, subs->sockinfo_str.s);
00126
00127 LM_DBG("event: %.*s\n", subs->event->name.len, subs->event->name.s);
00128 LM_DBG("status: %s\n", get_status_str(subs->status));
00129 LM_DBG("reason: %.*s\n", subs->reason.len, subs->reason.s);
00130 LM_DBG("version: %u\n", subs->version);
00131 LM_DBG("expires: %u\n", subs->expires);
00132
00133 LM_DBG("updated/updated_winfo: %d/%d\n", subs->updated, subs->updated_winfo);
00134 }
00135
00136 int build_str_hdr(subs_t* subs, int is_body, str* hdr)
00137 {
00138 pres_ev_t* event= subs->event;
00139 str expires = {0, 0};
00140 str status = {0, 0};
00141 char* p;
00142 str trans = {";transport=", 11};
00143
00144 if(hdr == NULL) {
00145 LM_ERR("bad parameter\n");
00146 return -1;
00147 }
00148 expires.s = int2str(subs->expires, &expires.len);
00149
00150 status.s= get_status_str(subs->status);
00151 if(status.s == NULL) {
00152 LM_ERR("bad status %d\n", subs->status);
00153 return -1;
00154 }
00155 status.len = strlen(status.s);
00156
00157 hdr->len = 18 + CRLF_LEN +
00158 7 + subs->event->name.len +4 + subs->event_id.len+
00159 CRLF_LEN + 10 + subs->local_contact.len + 1 +
00160 15 + CRLF_LEN + 20 +
00161 status.len + 10
00162 + (subs->reason.len>expires.len?subs->reason.len:expires.len)
00163 + CRLF_LEN + (is_body?
00164 (14 +subs->event->content_type.len + CRLF_LEN):0) + 1;
00165
00166 hdr->s = (char*)pkg_malloc(hdr->len);
00167 if(hdr->s == NULL) {
00168 LM_ERR("no more pkg memory\n");
00169 return -1;
00170 }
00171
00172 p = hdr->s;
00173 p += sprintf(p, "Max-Forwards: %d\r\n", MAX_FORWARD);
00174
00175 p += sprintf(p ,"Event: %.*s", event->name.len, event->name.s);
00176 if(subs->event_id.len && subs->event_id.s) {
00177 p += sprintf(p, ";id=%.*s", subs->event_id.len, subs->event_id.s);
00178 }
00179 memcpy(p, CRLF, CRLF_LEN);
00180 p += CRLF_LEN;
00181
00182 p += sprintf(p, "Contact: <%.*s", subs->local_contact.len, subs->local_contact.s);
00183 if(subs->sockinfo_str.s!=NULL
00184 && str_search(&subs->local_contact, &trans)==0) {
00185
00186 switch(subs->sockinfo_str.s[0]) {
00187 case 's':
00188 case 'S':
00189 memcpy(p, ";transport=sctp", 15);
00190 p += 15;
00191 break;
00192 case 't':
00193 case 'T':
00194 switch(subs->sockinfo_str.s[1]) {
00195 case 'c':
00196 case 'C':
00197 memcpy(p, ";transport=tcp", 14);
00198 p += 14;
00199 break;
00200 case 'l':
00201 case 'L':
00202 memcpy(p, ";transport=tls", 14);
00203 p += 14;
00204 break;
00205 }
00206 break;
00207 }
00208 }
00209 *p = '>';
00210 p++;
00211 memcpy(p, CRLF, CRLF_LEN);
00212 p += CRLF_LEN;
00213
00214 p += sprintf(p, "Subscription-State: %.*s", status.len, status.s);
00215
00216 if(subs->status == TERMINATED_STATUS) {
00217 LM_DBG("state = terminated\n");
00218 p += sprintf(p, ";reason=%.*s", subs->reason.len, subs->reason.s);
00219 } else {
00220 p += sprintf(p, ";expires=%.*s", expires.len, expires.s);
00221 }
00222 memcpy(p, CRLF, CRLF_LEN);
00223 p += CRLF_LEN;
00224
00225 if(is_body) {
00226 p += sprintf(p,"Content-Type: %.*s\r\n", event->content_type.len,
00227 event->content_type.s);
00228 }
00229
00230 *p = '\0';
00231 hdr->len = p - hdr->s;
00232
00233 return 0;
00234 }
00235
00236 int get_wi_subs_db(subs_t* subs, watcher_t* watchers)
00237 {
00238 subs_t sb;
00239 db_key_t query_cols[3];
00240 db_op_t query_ops[3];
00241 db_val_t query_vals[3];
00242 db_key_t result_cols[5];
00243 db1_res_t *result = NULL;
00244 db_row_t *row = NULL ;
00245 db_val_t *row_vals = NULL;
00246 int n_result_cols = 0;
00247 int n_query_cols = 0;
00248 int i;
00249 int status_col, expires_col, watcher_user_col, watcher_domain_col, callid_col;
00250
00251 query_cols[n_query_cols] = &str_presentity_uri_col;
00252 query_ops[n_query_cols] = OP_EQ;
00253 query_vals[n_query_cols].type = DB1_STR;
00254 query_vals[n_query_cols].nul = 0;
00255 query_vals[n_query_cols].val.str_val= subs->pres_uri;
00256 n_query_cols++;
00257
00258 query_cols[n_query_cols] = &str_event_col;
00259 query_ops[n_query_cols] = OP_EQ;
00260 query_vals[n_query_cols].type = DB1_STR;
00261 query_vals[n_query_cols].nul = 0;
00262 query_vals[n_query_cols].val.str_val = subs->event->wipeer->name;
00263 n_query_cols++;
00264
00265 query_cols[n_query_cols] = &str_expires_col;
00266 query_ops[n_query_cols] = OP_GT;
00267 query_vals[n_query_cols].type = DB1_INT;
00268 query_vals[n_query_cols].nul = 0;
00269 query_vals[n_query_cols].val.int_val = (int)time(NULL) - expires_offset;
00270 n_query_cols++;
00271
00272 result_cols[status_col=n_result_cols++] = &str_status_col;
00273 result_cols[expires_col=n_result_cols++] = &str_expires_col;
00274 result_cols[watcher_user_col=n_result_cols++] = &str_watcher_username_col;
00275 result_cols[watcher_domain_col=n_result_cols++] = &str_watcher_domain_col;
00276 result_cols[callid_col=n_result_cols++] = &str_callid_col;
00277
00278 if (pa_dbf.use_table(pa_db, &active_watchers_table) < 0)
00279 {
00280 LM_ERR("in use_table\n");
00281 goto error;
00282 }
00283
00284 if (pa_dbf.query (pa_db, query_cols, query_ops, query_vals,
00285 result_cols, n_query_cols, n_result_cols, 0, &result) < 0)
00286 {
00287 LM_ERR("querying active_watchers db table\n");
00288 goto error;
00289 }
00290
00291 if(result== NULL )
00292 {
00293 goto error;
00294 }
00295
00296 if(result->n <= 0)
00297 {
00298 LM_DBG("The query in db table for active subscription"
00299 " returned no result\n");
00300 pa_dbf.free_result(pa_db, result);
00301 return 0;
00302 }
00303
00304 for(i=0; i<result->n; i++)
00305 {
00306 row = &result->rows[i];
00307 row_vals = ROW_VALUES(row);
00308
00309 sb.watcher_user.s= (char*)row_vals[watcher_user_col].val.string_val;
00310 sb.watcher_user.len= strlen(sb.watcher_user.s);
00311
00312 sb.watcher_domain.s= (char*)row_vals[watcher_domain_col].val.string_val;
00313 sb.watcher_domain.len= strlen(sb.watcher_domain.s);
00314
00315 sb.callid.s= (char*)row_vals[callid_col].val.string_val;
00316 sb.callid.len= strlen(sb.callid.s);
00317
00318 sb.event =subs->event->wipeer;
00319 sb.status= row_vals[status_col].val.int_val;
00320
00321 if(add_watcher_list(&sb, watchers)<0)
00322 goto error;
00323 }
00324
00325 pa_dbf.free_result(pa_db, result);
00326 return 0;
00327
00328 error:
00329 if(result)
00330 pa_dbf.free_result(pa_db, result);
00331 return -1;
00332 }
00333
00334 str* get_wi_notify_body(subs_t* subs, subs_t* watcher_subs)
00335 {
00336 str* notify_body = NULL;
00337 char* version_str;
00338 watcher_t *watchers = NULL;
00339 int len = 0;
00340 unsigned int hash_code;
00341 subs_t* s= NULL;
00342 int state = FULL_STATE_FLAG;
00343 unsigned int now = (int)time(NULL);
00344
00345 hash_code = 0;
00346 version_str = int2str(subs->version, &len);
00347 if(version_str ==NULL)
00348 {
00349 LM_ERR("converting int to str\n ");
00350 goto error;
00351 }
00352
00353 watchers= (watcher_t*)pkg_malloc(sizeof(watcher_t));
00354 if(watchers== NULL)
00355 {
00356 ERR_MEM(PKG_MEM_STR);
00357 }
00358 memset(watchers, 0, sizeof(watcher_t));
00359
00360 if(watcher_subs != NULL)
00361 {
00362 if(add_watcher_list(watcher_subs, watchers)< 0)
00363 goto error;
00364 state = PARTIAL_STATE_FLAG;
00365
00366 goto done;
00367 }
00368
00369 if(subs_dbmode == DB_ONLY)
00370 {
00371 if(get_wi_subs_db(subs, watchers)< 0)
00372 {
00373 LM_ERR("getting watchers from database\n");
00374 goto error;
00375 }
00376 } else {
00377 hash_code= core_hash(&subs->pres_uri, &subs->event->wipeer->name,
00378 shtable_size);
00379 lock_get(&subs_htable[hash_code].lock);
00380 s= subs_htable[hash_code].entries;
00381 while(s->next)
00382 {
00383 s= s->next;
00384
00385 if(s->expires< now)
00386 {
00387 LM_DBG("expired record\n");
00388 continue;
00389 }
00390
00391 if(s->event== subs->event->wipeer &&
00392 s->pres_uri.len== subs->pres_uri.len &&
00393 strncmp(s->pres_uri.s, subs->pres_uri.s,subs->pres_uri.len)== 0)
00394 {
00395 if(add_watcher_list(s, watchers)< 0)
00396 {
00397 lock_release(&subs_htable[hash_code].lock);
00398 goto error;
00399 }
00400 }
00401 }
00402 lock_release(&subs_htable[hash_code].lock);
00403
00404 if(add_waiting_watchers(watchers, subs->pres_uri,
00405 subs->event->wipeer->name)< 0 )
00406 {
00407 LM_ERR("failed to add waiting watchers\n");
00408 goto error;
00409 }
00410 }
00411
00412 done:
00413 notify_body = create_winfo_xml(watchers,version_str,subs->pres_uri,
00414 subs->event->wipeer->name, state);
00415 if(notify_body== NULL)
00416 {
00417 LM_ERR("in function create_winfo_xml\n");
00418 goto error;
00419 }
00420 free_watcher_list(watchers);
00421 return notify_body;
00422
00423 error:
00424 free_watcher_list(watchers);
00425 return NULL;
00426 }
00427
00428 void free_watcher_list(watcher_t* watchers)
00429 {
00430 watcher_t* w;
00431 while(watchers)
00432 {
00433 w= watchers;
00434 if(w->uri.s !=NULL)
00435 pkg_free(w->uri.s);
00436 if(w->id.s !=NULL)
00437 pkg_free(w->id.s);
00438 watchers= watchers->next;
00439 pkg_free(w);
00440 }
00441
00442 watchers = NULL;
00443 }
00444
00445 int add_watcher_list(subs_t *s, watcher_t *watchers)
00446 {
00447 watcher_t* w;
00448
00449 w= (watcher_t*)pkg_malloc(sizeof(watcher_t));
00450 if(w== NULL)
00451 {
00452 LM_ERR("No more private memory\n");
00453 return -1;
00454 }
00455 w->status= s->status;
00456 if(uandd_to_uri(s->watcher_user, s->watcher_domain, &w->uri)<0)
00457 {
00458 LM_ERR("failed to create uri\n");
00459 goto error;
00460 }
00461 w->id.s = (char*)pkg_malloc(s->callid.len+ 1);
00462 if(w->id.s == NULL)
00463 {
00464 LM_ERR("no more memory\n");
00465 goto error;
00466 }
00467 memcpy(w->id.s, s->callid.s, s->callid.len);
00468 w->id.len = s->callid.len;
00469 w->id.s[w->id.len] = '\0';
00470
00471 w->next= watchers->next;
00472 watchers->next= w;
00473
00474 return 0;
00475
00476 error:
00477 if(w)
00478 {
00479 if(w->uri.s)
00480 pkg_free(w->uri.s);
00481 pkg_free(w);
00482 }
00483 return -1;
00484 }
00485
00486 str* build_empty_bla_body(str pres_uri)
00487 {
00488 xmlDocPtr doc;
00489 xmlNodePtr node;
00490 xmlAttrPtr attr;
00491 str* body= NULL;
00492 char* text;
00493 int len;
00494 char* entity= NULL;
00495
00496 doc = xmlNewDoc(BAD_CAST "1.0");
00497 if(doc== NULL)
00498 {
00499 LM_ERR("failed to construct xml document\n");
00500 return NULL;
00501 }
00502
00503 node = xmlNewNode(NULL, BAD_CAST "dialog-info");
00504 if(node== NULL)
00505 {
00506 LM_ERR("failed to initialize node\n");
00507 goto error;
00508 }
00509 xmlDocSetRootElement(doc, node);
00510
00511 attr = xmlNewProp(node, BAD_CAST "xmlns",BAD_CAST "urn:ietf:params:xml:ns:dialog-info");
00512 if(attr== NULL)
00513 {
00514 LM_ERR("failed to initialize node attribute\n");
00515 goto error;
00516 }
00517 attr = xmlNewProp(node, BAD_CAST "version", BAD_CAST "1");
00518 if(attr== NULL)
00519 {
00520 LM_ERR("failed to initialize node attribute\n");
00521 goto error;
00522 }
00523
00524 attr = xmlNewProp(node, BAD_CAST "state", BAD_CAST "full");
00525 if(attr== NULL)
00526 {
00527 LM_ERR("failed to initialize node attribute\n");
00528 goto error;
00529 }
00530
00531 entity = (char*)pkg_malloc(pres_uri.len+1);
00532 if(entity== NULL)
00533 {
00534 LM_ERR("no more memory\n");
00535 goto error;
00536 }
00537 memcpy(entity, pres_uri.s, pres_uri.len);
00538 entity[pres_uri.len]= '\0';
00539
00540 attr = xmlNewProp(node, BAD_CAST "entity", BAD_CAST entity);
00541 if(attr== NULL)
00542 {
00543 LM_ERR("failed to initialize node attribute\n");
00544 pkg_free(entity);
00545 goto error;
00546 }
00547
00548 body = (str*) pkg_malloc(sizeof(str));
00549 if(body== NULL)
00550 {
00551 LM_ERR("no more private memory");
00552 pkg_free(entity);
00553 goto error;
00554 }
00555
00556 xmlDocDumpFormatMemory(doc,(xmlChar**)(void*)&text, &len, 1);
00557 body->s = (char*) pkg_malloc(len);
00558 if(body->s == NULL)
00559 {
00560 LM_ERR("no more private memory");
00561 pkg_free(body);
00562 pkg_free(entity);
00563 goto error;
00564 }
00565 memcpy(body->s, text, len);
00566 body->len= len;
00567
00568
00569 pkg_free(entity);
00570 xmlFreeDoc(doc);
00571 xmlFree(text);
00572
00573 return body;
00574
00575 error:
00576 xmlFreeDoc(doc);
00577 return NULL;
00578
00579 }
00580
00581 str* get_p_notify_body(str pres_uri, pres_ev_t* event, str* etag,
00582 str* contact)
00583 {
00584 db_key_t query_cols[6];
00585 db_val_t query_vals[6];
00586 db_key_t result_cols[6];
00587 db1_res_t *result = NULL;
00588 int body_col, expires_col, etag_col= 0, sender_col;
00589 str** body_array= NULL;
00590 str* notify_body= NULL;
00591 db_row_t *row= NULL ;
00592 db_val_t *row_vals;
00593 int n_result_cols = 0;
00594 int n_query_cols = 0;
00595 int i, n= 0, len;
00596 int build_off_n= -1;
00597 str etags;
00598 str* body;
00599 int size= 0;
00600 struct sip_uri uri;
00601 unsigned int hash_code;
00602 str sender;
00603
00604 if(parse_uri(pres_uri.s, pres_uri.len, &uri)< 0)
00605 {
00606 LM_ERR("while parsing uri\n");
00607 return NULL;
00608 }
00609
00610
00611 if( publ_cache_enabled )
00612 {
00613
00614 hash_code= core_hash(&pres_uri, NULL, phtable_size);
00615 if(search_phtable(&pres_uri, event->evp->type, hash_code)== NULL)
00616 {
00617 LM_DBG("No record exists in hash_table\n");
00618
00619
00620 if(event->agg_nbody)
00621 {
00622 notify_body = event->agg_nbody(&uri.user, &uri.host, NULL, 0, -1);
00623 if(notify_body)
00624 goto done;
00625 }
00626 return NULL;
00627 }
00628 }
00629
00630 query_cols[n_query_cols] = &str_domain_col;
00631 query_vals[n_query_cols].type = DB1_STR;
00632 query_vals[n_query_cols].nul = 0;
00633 query_vals[n_query_cols].val.str_val = uri.host;
00634 n_query_cols++;
00635
00636 query_cols[n_query_cols] = &str_username_col;
00637 query_vals[n_query_cols].type = DB1_STR;
00638 query_vals[n_query_cols].nul = 0;
00639 query_vals[n_query_cols].val.str_val = uri.user;
00640 n_query_cols++;
00641
00642 query_cols[n_query_cols] = &str_event_col;
00643 query_vals[n_query_cols].type = DB1_STR;
00644 query_vals[n_query_cols].nul = 0;
00645 query_vals[n_query_cols].val.str_val= event->name;
00646 n_query_cols++;
00647
00648 result_cols[body_col=n_result_cols++] = &str_body_col;
00649 result_cols[expires_col=n_result_cols++] = &str_expires_col;
00650 result_cols[etag_col=n_result_cols++] = &str_etag_col;
00651 result_cols[sender_col=n_result_cols++] = &str_sender_col;
00652
00653 if (pa_dbf.use_table(pa_db, &presentity_table) < 0)
00654 {
00655 LM_ERR("in use_table\n");
00656 return NULL;
00657 }
00658
00659 static str query_str = str_init("received_time");
00660 if (pa_dbf.query (pa_db, query_cols, 0, query_vals,
00661 result_cols, n_query_cols, n_result_cols, &query_str , &result) < 0)
00662 {
00663 LM_ERR("failed to query %.*s table\n", presentity_table.len, presentity_table.s);
00664 if(result)
00665 pa_dbf.free_result(pa_db, result);
00666 return NULL;
00667 }
00668
00669 if(result== NULL)
00670 return NULL;
00671
00672 if (result->n<=0 )
00673 {
00674 LM_DBG("The query returned no result\n[username]= %.*s"
00675 "\t[domain]= %.*s\t[event]= %.*s\n",uri.user.len, uri.user.s,
00676 uri.host.len, uri.host.s, event->name.len, event->name.s);
00677
00678 pa_dbf.free_result(pa_db, result);
00679 result= NULL;
00680
00681 if(event->agg_nbody)
00682 {
00683 notify_body = event->agg_nbody(&uri.user, &uri.host, NULL, 0, -1);
00684 if(notify_body)
00685 goto done;
00686 }
00687 return NULL;
00688 }
00689 else
00690 {
00691 n= result->n;
00692 if(event->agg_nbody== NULL )
00693 {
00694 LM_DBG("Event does not require aggregation\n");
00695 row = &result->rows[n-1];
00696 row_vals = ROW_VALUES(row);
00697
00698
00699
00700 if( EVENT_DIALOG_SLA(event->evp) && contact ) {
00701 sender.s = (char*)row_vals[sender_col].val.string_val;
00702 if(sender.s== NULL || strlen(sender.s)==0)
00703 goto after_sender_check;
00704 sender.len= strlen(sender.s);
00705
00706 if(sender.len== contact->len &&
00707 strncmp(sender.s, contact->s, sender.len)== 0)
00708 {
00709 notify_body= build_empty_bla_body(pres_uri);
00710 pa_dbf.free_result(pa_db, result);
00711 return notify_body;
00712 }
00713 }
00714
00715 after_sender_check:
00716 if(row_vals[body_col].val.string_val== NULL)
00717 {
00718 LM_ERR("NULL notify body record\n");
00719 goto error;
00720 }
00721 len= strlen(row_vals[body_col].val.string_val);
00722 if(len== 0)
00723 {
00724 LM_ERR("Empty notify body record\n");
00725 goto error;
00726 }
00727 notify_body= (str*)pkg_malloc(sizeof(str));
00728 if(notify_body== NULL)
00729 {
00730 ERR_MEM(PKG_MEM_STR);
00731 }
00732 memset(notify_body, 0, sizeof(str));
00733 notify_body->s= (char*)pkg_malloc( len* sizeof(char));
00734 if(notify_body->s== NULL)
00735 {
00736 pkg_free(notify_body);
00737 ERR_MEM(PKG_MEM_STR);
00738 }
00739 memcpy(notify_body->s, row_vals[body_col].val.string_val, len);
00740 notify_body->len= len;
00741 pa_dbf.free_result(pa_db, result);
00742
00743 return notify_body;
00744 }
00745
00746 LM_DBG("Event requires aggregation\n");
00747
00748 body_array =(str**)pkg_malloc( (n+2) *sizeof(str*));
00749 if(body_array == NULL)
00750 {
00751 ERR_MEM(PKG_MEM_STR);
00752 }
00753 memset(body_array, 0, (n+2) *sizeof(str*));
00754
00755 if(etag!= NULL)
00756 {
00757 LM_DBG("searched etag = %.*s len= %d\n",
00758 etag->len, etag->s, etag->len);
00759 LM_DBG("etag not NULL\n");
00760 for(i= 0; i< n; i++)
00761 {
00762 row = &result->rows[i];
00763 row_vals = ROW_VALUES(row);
00764 etags.s = (char*)row_vals[etag_col].val.string_val;
00765 etags.len = strlen(etags.s);
00766
00767 LM_DBG("etag = %.*s len= %d\n", etags.len, etags.s, etags.len);
00768 if( (etags.len == etag->len) && (strncmp(etags.s,
00769 etag->s,etags.len)==0 ) )
00770 {
00771 LM_DBG("found etag\n");
00772 build_off_n= i;
00773 }
00774 len= strlen((char*)row_vals[body_col].val.string_val);
00775 if(len== 0)
00776 {
00777 LM_ERR("Empty notify body record\n");
00778 goto error;
00779 }
00780
00781 size= sizeof(str)+ len* sizeof(char);
00782 body= (str*)pkg_malloc(size);
00783 if(body== NULL)
00784 {
00785 ERR_MEM(PKG_MEM_STR);
00786 }
00787 memset(body, 0, size);
00788 size= sizeof(str);
00789 body->s= (char*)body+ size;
00790 memcpy(body->s, (char*)row_vals[body_col].val.string_val, len);
00791 body->len= len;
00792
00793 body_array[i]= body;
00794 }
00795 }
00796 else
00797 {
00798 for(i=0; i< n; i++)
00799 {
00800 row = &result->rows[i];
00801 row_vals = ROW_VALUES(row);
00802
00803 len= strlen((char*)row_vals[body_col].val.string_val);
00804 if(len== 0)
00805 {
00806 LM_ERR("Empty notify body record\n");
00807 goto error;
00808 }
00809
00810 size= sizeof(str)+ len* sizeof(char);
00811 body= (str*)pkg_malloc(size);
00812 if(body== NULL)
00813 {
00814 ERR_MEM(PKG_MEM_STR);
00815 }
00816 memset(body, 0, size);
00817 size= sizeof(str);
00818 body->s= (char*)body+ size;
00819 memcpy(body->s, row_vals[body_col].val.string_val, len);
00820 body->len= len;
00821
00822 body_array[i]= body;
00823 }
00824 }
00825 pa_dbf.free_result(pa_db, result);
00826 result= NULL;
00827
00828 notify_body = event->agg_nbody(&uri.user, &uri.host, body_array, n, build_off_n);
00829 }
00830
00831 done:
00832 if(body_array!=NULL)
00833 {
00834 for(i= 0; i< n; i++)
00835 {
00836 if(body_array[i])
00837 pkg_free(body_array[i]);
00838 }
00839 pkg_free(body_array);
00840 }
00841 return notify_body;
00842
00843 error:
00844 if(result!=NULL)
00845 pa_dbf.free_result(pa_db, result);
00846
00847 if(body_array!=NULL)
00848 {
00849 for(i= 0; i< n; i++)
00850 {
00851 if(body_array[i])
00852 pkg_free(body_array[i]);
00853 else
00854 break;
00855
00856 }
00857
00858 pkg_free(body_array);
00859 }
00860 return NULL;
00861 }
00862
00863 void free_notify_body(str *body, pres_ev_t *ev)
00864 {
00865 if (body != NULL)
00866 {
00867 if (body->s != NULL)
00868 {
00869 if (ev->type & WINFO_TYPE)
00870 xmlFree(body->s);
00871 else if (ev->agg_nbody== NULL && ev->apply_auth_nbody== NULL)
00872 pkg_free(body->s);
00873 else
00874 ev->free_body(body->s);
00875 }
00876 pkg_free(body);
00877 }
00878 }
00879
00880 static int ps_free_tm_dlg(dlg_t *td)
00881 {
00882 if(td)
00883 {
00884 if(td->loc_uri.s)
00885 pkg_free(td->loc_uri.s);
00886 if(td->rem_uri.s)
00887 pkg_free(td->rem_uri.s);
00888
00889 if(td->route_set)
00890 free_rr(&td->route_set);
00891 pkg_free(td);
00892 }
00893 return 0;
00894 }
00895
00896 dlg_t* ps_build_dlg_t(subs_t* subs)
00897 {
00898 dlg_t* td =NULL;
00899 int found_contact = 1;
00900
00901 td = (dlg_t*)pkg_malloc(sizeof(dlg_t));
00902 if(td == NULL)
00903 {
00904 ERR_MEM(PKG_MEM_STR);
00905 }
00906 memset(td, 0, sizeof(dlg_t));
00907
00908 td->loc_seq.value = subs->local_cseq;
00909 td->loc_seq.is_set = 1;
00910
00911 td->id.call_id = subs->callid;
00912 td->id.rem_tag = subs->from_tag;
00913 td->id.loc_tag =subs->to_tag;
00914
00915 uandd_to_uri(subs->to_user, subs->to_domain, &td->loc_uri);
00916 if(td->loc_uri.s== NULL)
00917 {
00918 LM_ERR("while creating uri\n");
00919 goto error;
00920 }
00921
00922 if(subs->contact.len ==0 || subs->contact.s == NULL )
00923 {
00924 found_contact = 0;
00925 }
00926 else
00927 {
00928 LM_DBG("CONTACT = %.*s\n", subs->contact.len , subs->contact.s);
00929 td->rem_target = subs->contact;
00930 }
00931
00932 uandd_to_uri(subs->from_user, subs->from_domain, &td->rem_uri);
00933 if(td->rem_uri.s ==NULL)
00934 {
00935 LM_ERR("while creating uri\n");
00936 goto error;
00937 }
00938
00939 if(found_contact == 0)
00940 {
00941 td->rem_target = td->rem_uri;
00942 }
00943 if(subs->record_route.s && subs->record_route.len)
00944 {
00945 if(parse_rr_body(subs->record_route.s, subs->record_route.len,
00946 &td->route_set)< 0)
00947 {
00948 LM_ERR("in function parse_rr_body\n");
00949 goto error;
00950 }
00951 }
00952 td->state= DLG_CONFIRMED ;
00953
00954 if (subs->sockinfo_str.len) {
00955 int port, proto;
00956 str host;
00957 char* tmp;
00958 if ((tmp = as_asciiz(&subs->sockinfo_str)) == NULL) {
00959 LM_ERR("no pkg memory left\n");
00960 goto error;
00961 }
00962 if (parse_phostport (tmp,&host.s,
00963 &host.len,&port, &proto )) {
00964 LM_ERR("bad sockinfo string\n");
00965 pkg_free(tmp);
00966 goto error;
00967 }
00968 pkg_free(tmp);
00969 td->send_sock = grep_sock_info (
00970 &host, (unsigned short) port, (unsigned short) proto);
00971 }
00972
00973 return td;
00974
00975 error:
00976 ps_free_tm_dlg(td);
00977 return NULL;
00978 }
00979
00980 int get_subs_db(str* pres_uri, pres_ev_t* event, str* sender,
00981 subs_t** s_array, int* n)
00982 {
00983 db_key_t query_cols[7];
00984 db_op_t query_ops[7];
00985 db_val_t query_vals[7];
00986 db_key_t result_cols[19];
00987 int n_result_cols = 0, n_query_cols = 0;
00988 db_row_t *row ;
00989 db_val_t *row_vals ;
00990 db1_res_t *result = NULL;
00991 int from_user_col, from_domain_col, from_tag_col;
00992 int to_user_col, to_domain_col, to_tag_col;
00993 int expires_col= 0,callid_col, cseq_col, i, reason_col;
00994 int version_col= 0, record_route_col = 0, contact_col = 0;
00995 int sockinfo_col= 0, local_contact_col= 0, event_id_col = 0;
00996 int watcher_user_col= 0, watcher_domain_col= 0;
00997 subs_t s, *s_new;
00998 int inc= 0;
00999
01000 if (pa_dbf.use_table(pa_db, &active_watchers_table) < 0)
01001 {
01002 LM_ERR("in use_table\n");
01003 return -1;
01004 }
01005
01006 LM_DBG("querying database table = active_watchers\n");
01007 query_cols[n_query_cols] = &str_presentity_uri_col;
01008 query_ops[n_query_cols] = OP_EQ;
01009 query_vals[n_query_cols].type = DB1_STR;
01010 query_vals[n_query_cols].nul = 0;
01011 query_vals[n_query_cols].val.str_val = *pres_uri;
01012 n_query_cols++;
01013
01014 query_cols[n_query_cols] = &str_event_col;
01015 query_ops[n_query_cols] = OP_EQ;
01016 query_vals[n_query_cols].type = DB1_STR;
01017 query_vals[n_query_cols].nul = 0;
01018 query_vals[n_query_cols].val.str_val = event->name;
01019 n_query_cols++;
01020
01021 query_cols[n_query_cols] = &str_status_col;
01022 query_ops[n_query_cols] = OP_EQ;
01023 query_vals[n_query_cols].type = DB1_INT;
01024 query_vals[n_query_cols].nul = 0;
01025 query_vals[n_query_cols].val.int_val = ACTIVE_STATUS;
01026 n_query_cols++;
01027
01028 query_cols[n_query_cols] = &str_contact_col;
01029 query_ops[n_query_cols] = OP_NEQ;
01030 query_vals[n_query_cols].type = DB1_STR;
01031 query_vals[n_query_cols].nul = 0;
01032 if(sender)
01033 {
01034 LM_DBG("Do not send Notify to:[uri]= %.*s\n",sender->len,sender->s);
01035 query_vals[n_query_cols].val.str_val = *sender;
01036 } else {
01037 query_vals[n_query_cols].val.str_val.s = "";
01038 query_vals[n_query_cols].val.str_val.len = 0;
01039 }
01040 n_query_cols++;
01041
01042 result_cols[to_user_col=n_result_cols++] = &str_to_user_col;
01043 result_cols[to_domain_col=n_result_cols++] = &str_to_domain_col;
01044 result_cols[from_user_col=n_result_cols++] = &str_from_user_col;
01045 result_cols[from_domain_col=n_result_cols++] = &str_from_domain_col;
01046 result_cols[watcher_user_col=n_result_cols++] = &str_watcher_username_col;
01047 result_cols[watcher_domain_col=n_result_cols++]= &str_watcher_domain_col;
01048 result_cols[event_id_col=n_result_cols++] = &str_event_id_col;
01049 result_cols[from_tag_col=n_result_cols++] = &str_from_tag_col;
01050 result_cols[to_tag_col=n_result_cols++] = &str_to_tag_col;
01051 result_cols[callid_col=n_result_cols++] = &str_callid_col;
01052 result_cols[cseq_col=n_result_cols++] = &str_local_cseq_col;
01053 result_cols[record_route_col=n_result_cols++] = &str_record_route_col;
01054 result_cols[contact_col=n_result_cols++] = &str_contact_col;
01055 result_cols[expires_col=n_result_cols++] = &str_expires_col;
01056 result_cols[reason_col=n_result_cols++] = &str_reason_col;
01057 result_cols[sockinfo_col=n_result_cols++] = &str_socket_info_col;
01058 result_cols[local_contact_col=n_result_cols++]= &str_local_contact_col;
01059 result_cols[version_col=n_result_cols++] = &str_version_col;
01060
01061 if (pa_dbf.query(pa_db, query_cols, query_ops, query_vals,result_cols,
01062 n_query_cols, n_result_cols, 0, &result) < 0)
01063 {
01064 LM_ERR("while querying database\n");
01065 if(result)
01066 {
01067 pa_dbf.free_result(pa_db, result);
01068 }
01069 return -1;
01070 }
01071
01072 if(result== NULL)
01073 return -1;
01074
01075 if(result->n <=0 )
01076 {
01077 LM_DBG("The query for subscribtion for [uri]= %.*s for [event]= %.*s"
01078 " returned no result\n",pres_uri->len, pres_uri->s,
01079 event->name.len, event->name.s);
01080 pa_dbf.free_result(pa_db, result);
01081 return 0;
01082 }
01083 LM_DBG("found %d dialogs\n", result->n);
01084
01085 for(i=0; i<result->n; i++)
01086 {
01087 row = &result->rows[i];
01088 row_vals = ROW_VALUES(row);
01089
01090
01091
01092
01093 if(row_vals[reason_col].val.string_val) {
01094 if(strlen(row_vals[reason_col].val.string_val) != 0)
01095 continue;
01096 }
01097
01098
01099
01100 memset(&s, 0, sizeof(subs_t));
01101 s.status= ACTIVE_STATUS;
01102
01103 s.pres_uri= *pres_uri;
01104 s.to_user.s= (char*)row_vals[to_user_col].val.string_val;
01105 s.to_user.len= strlen(s.to_user.s);
01106
01107 s.to_domain.s= (char*)row_vals[to_domain_col].val.string_val;
01108 s.to_domain.len= strlen(s.to_domain.s);
01109
01110 s.from_user.s= (char*)row_vals[from_user_col].val.string_val;
01111 s.from_user.len= strlen(s.from_user.s);
01112
01113 s.from_domain.s= (char*)row_vals[from_domain_col].val.string_val;
01114 s.from_domain.len= strlen(s.from_domain.s);
01115
01116 s.watcher_user.s= (char*)row_vals[watcher_user_col].val.string_val;
01117 s.watcher_user.len= strlen(s.watcher_user.s);
01118
01119 s.watcher_domain.s= (char*)row_vals[watcher_domain_col].val.string_val;
01120 s.watcher_domain.len= strlen(s.watcher_domain.s);
01121
01122 s.event_id.s=(char*)row_vals[event_id_col].val.string_val;
01123 s.event_id.len= (s.event_id.s)?strlen(s.event_id.s):0;
01124
01125 s.to_tag.s= (char*)row_vals[to_tag_col].val.string_val;
01126 s.to_tag.len= strlen(s.to_tag.s);
01127
01128 s.from_tag.s= (char*)row_vals[from_tag_col].val.string_val;
01129 s.from_tag.len= strlen(s.from_tag.s);
01130
01131 s.callid.s= (char*)row_vals[callid_col].val.string_val;
01132 s.callid.len= strlen(s.callid.s);
01133
01134 s.record_route.s= (char*)row_vals[record_route_col].val.string_val;
01135 s.record_route.len= (s.record_route.s)?strlen(s.record_route.s):0;
01136
01137 s.contact.s= (char*)row_vals[contact_col].val.string_val;
01138 s.contact.len= strlen(s.contact.s);
01139
01140 s.sockinfo_str.s = (char*)row_vals[sockinfo_col].val.string_val;
01141 s.sockinfo_str.len = s.sockinfo_str.s?strlen(s.sockinfo_str.s):0;
01142
01143 s.local_contact.s = (char*)row_vals[local_contact_col].val.string_val;
01144 s.local_contact.len = s.local_contact.s?strlen(s.local_contact.s):0;
01145
01146 s.event= event;
01147 s.local_cseq = row_vals[cseq_col].val.int_val +1;
01148 if(row_vals[expires_col].val.int_val < (int)time(NULL))
01149 s.expires = 0;
01150 else
01151 s.expires = row_vals[expires_col].val.int_val -
01152 (int)time(NULL);
01153 s.version = row_vals[version_col].val.int_val +1;
01154
01155 s_new= mem_copy_subs(&s, PKG_MEM_TYPE);
01156 if(s_new== NULL)
01157 {
01158 LM_ERR("while copying subs_t structure\n");
01159 goto error;
01160 }
01161 s_new->next= (*s_array);
01162 (*s_array)= s_new;
01163 printf_subs(s_new);
01164 inc++;
01165
01166 }
01167 pa_dbf.free_result(pa_db, result);
01168 *n= inc;
01169
01170 return 0;
01171
01172 error:
01173 if(result)
01174 pa_dbf.free_result(pa_db, result);
01175
01176 return -1;
01177 }
01178
01179 subs_t* get_subs_dialog(str* pres_uri, pres_ev_t* event, str* sender)
01180 {
01181 unsigned int hash_code;
01182 subs_t* s= NULL, *s_new;
01183 subs_t* s_array= NULL;
01184 int n= 0;
01185
01186
01187
01188
01189
01190 if(subs_dbmode == DB_ONLY)
01191 {
01192 if(get_subs_db(pres_uri, event, sender, &s_array, &n)< 0)
01193 {
01194 LM_ERR("getting dialogs from database\n");
01195 goto error;
01196 }
01197 }else {
01198 hash_code= core_hash(pres_uri, &event->name, shtable_size);
01199
01200 lock_get(&subs_htable[hash_code].lock);
01201
01202 s= subs_htable[hash_code].entries;
01203
01204 while(s->next)
01205 {
01206 s= s->next;
01207
01208 printf_subs(s);
01209
01210 if(s->expires< (int)time(NULL))
01211 {
01212 LM_DBG("expired subs\n");
01213 continue;
01214 }
01215
01216 if((!(s->status== ACTIVE_STATUS &&
01217 s->reason.len== 0 &&
01218 s->event== event && s->pres_uri.len== pres_uri->len &&
01219 strncmp(s->pres_uri.s, pres_uri->s, pres_uri->len)== 0)) ||
01220 (sender && sender->len== s->contact.len &&
01221 strncmp(sender->s, s->contact.s, sender->len)== 0))
01222 continue;
01223
01224 s_new= mem_copy_subs(s, PKG_MEM_TYPE);
01225 if(s_new== NULL)
01226 {
01227 LM_ERR("copying subs_t structure\n");
01228 lock_release(&subs_htable[hash_code].lock);
01229 goto error;
01230 }
01231 s_new->expires-= (int)time(NULL);
01232 s_new->next= s_array;
01233 s_array= s_new;
01234 }
01235 lock_release(&subs_htable[hash_code].lock);
01236 }
01237
01238 return s_array;
01239
01240 error:
01241 free_subs_list(s_array, PKG_MEM_TYPE, 0);
01242 return NULL;
01243
01244 }
01245
01246 int publ_notify(presentity_t* p, str pres_uri, str* body, str* offline_etag, str* rules_doc)
01247 {
01248 str *notify_body = NULL, *aux_body = NULL;
01249 subs_t* subs_array= NULL, *s= NULL;
01250 int ret_code= -1;
01251
01252 subs_array= get_subs_dialog(&pres_uri, p->event , p->sender);
01253 if(subs_array == NULL)
01254 {
01255 LM_DBG("Could not find subs_dialog\n");
01256 ret_code= 0;
01257 goto done;
01258 }
01259
01260
01261 if(p->event->agg_nbody)
01262 {
01263 notify_body = get_p_notify_body(pres_uri, p->event , offline_etag, NULL);
01264 if(notify_body == NULL)
01265 {
01266 LM_DBG("Could not get the notify_body\n");
01267
01268 }
01269 }
01270
01271 s= subs_array;
01272 while(s)
01273 {
01274 s->auth_rules_doc= rules_doc;
01275 if (p->event->aux_body_processing) {
01276 aux_body = p->event->aux_body_processing(s, notify_body?notify_body:body);
01277 }
01278
01279 if(notify(s, NULL, aux_body?aux_body:(notify_body?notify_body:body), 0)< 0 )
01280 {
01281 LM_ERR("Could not send notify for %.*s\n",
01282 p->event->name.len, p->event->name.s);
01283 }
01284
01285 if(aux_body!=NULL) {
01286 if(aux_body->s) {
01287 p->event->aux_free_body(aux_body->s);
01288 }
01289 pkg_free(aux_body);
01290 }
01291 s= s->next;
01292 }
01293 ret_code= 0;
01294
01295 done:
01296 free_subs_list(subs_array, PKG_MEM_TYPE, 0);
01297 free_notify_body(notify_body, p->event);
01298 return ret_code;
01299 }
01300
01301 int publ_notify_notifier(str pres_uri, pres_ev_t *event)
01302 {
01303 db_key_t query_cols[2], result_cols[3];
01304 db_val_t query_vals[2], *values;
01305 db_row_t *rows;
01306 db1_res_t *result = NULL;
01307 int n_query_cols = 0, n_result_cols = 0;
01308 int r_callid_col = 0, r_to_tag_col = 0, r_from_tag_col = 0;
01309 int i;
01310 int ret = -1;
01311 subs_t subs;
01312
01313 if(pa_db == NULL)
01314 {
01315 LM_ERR("null database connection\n");
01316 goto error;
01317 }
01318
01319 if(pa_dbf.use_table(pa_db, &active_watchers_table)< 0)
01320 {
01321 LM_ERR("use table failed\n");
01322 goto error;
01323 }
01324
01325 query_cols[n_query_cols]= &str_presentity_uri_col;
01326 query_vals[n_query_cols].type = DB1_STR;
01327 query_vals[n_query_cols].nul = 0;
01328 query_vals[n_query_cols].val.str_val = pres_uri;
01329 n_query_cols++;
01330
01331 query_cols[n_query_cols]= &str_event_col;
01332 query_vals[n_query_cols].type = DB1_STR;
01333 query_vals[n_query_cols].nul = 0;
01334 query_vals[n_query_cols].val.str_val = event->name;
01335 n_query_cols++;
01336
01337 result_cols[r_callid_col=n_result_cols++] = &str_callid_col;
01338 result_cols[r_to_tag_col=n_result_cols++] = &str_to_tag_col;
01339 result_cols[r_from_tag_col=n_result_cols++] = &str_from_tag_col;
01340
01341 if(pa_dbf.query(pa_db, query_cols, 0, query_vals, result_cols,
01342 n_query_cols, n_result_cols, 0, &result )< 0)
01343 {
01344 LM_ERR("Can't query db\n");
01345 goto error;
01346 }
01347
01348 if(result == NULL)
01349 {
01350 LM_ERR("bad result\n");
01351 goto error;
01352 }
01353
01354 rows = RES_ROWS(result);
01355 for (i = 0; i <RES_ROW_N(result); i++)
01356 {
01357 values = ROW_VALUES(&rows[i]);
01358
01359 subs.callid.s = (char *) VAL_STRING(&values[r_callid_col]);
01360 subs.callid.len = strlen(subs.callid.s);
01361 subs.to_tag.s = (char *) VAL_STRING(&values[r_to_tag_col]);
01362 subs.to_tag.len = strlen(subs.to_tag.s);
01363 subs.from_tag.s = (char *) VAL_STRING(&values[r_from_tag_col]);
01364 subs.from_tag.len = strlen(subs.from_tag.s);
01365
01366 set_updated(&subs);
01367 }
01368
01369 ret = RES_ROW_N(result);
01370
01371 error:
01372 if (result) pa_dbf.free_result(pa_db, result);
01373 return ret;
01374 }
01375
01376 int query_db_notify(str* pres_uri, pres_ev_t* event, subs_t* watcher_subs )
01377 {
01378 subs_t* subs_array = NULL, *s= NULL;
01379 str* notify_body = NULL, *aux_body = NULL;
01380 int ret_code= -1;
01381
01382 subs_array= get_subs_dialog(pres_uri, event , NULL);
01383 if(subs_array == NULL)
01384 {
01385 LM_DBG("Could not get subscription dialog\n");
01386 ret_code= 1;
01387 goto done;
01388 }
01389
01390 if(event->type & PUBL_TYPE)
01391 {
01392 notify_body = get_p_notify_body(*pres_uri, event, NULL, NULL);
01393 if(notify_body == NULL)
01394 {
01395 LM_DBG("Could not get the notify_body\n");
01396
01397 }
01398 }
01399
01400 s= subs_array;
01401
01402 while(s)
01403 {
01404
01405 if (event->aux_body_processing) {
01406 aux_body = event->aux_body_processing(s, notify_body);
01407 }
01408
01409 if(notify(s, watcher_subs, aux_body?aux_body:notify_body, 0)< 0 )
01410 {
01411 LM_ERR("Could not send notify for [event]=%.*s\n",
01412 event->name.len, event->name.s);
01413 goto done;
01414 }
01415
01416 if(aux_body!=NULL) {
01417 if(aux_body->s) {
01418 event->aux_free_body(aux_body->s);
01419 }
01420 pkg_free(aux_body);
01421 }
01422 s= s->next;
01423 }
01424
01425 ret_code= 1;
01426
01427 done:
01428 free_subs_list(subs_array, PKG_MEM_TYPE, 0);
01429 free_notify_body(notify_body, event);
01430
01431 return ret_code;
01432 }
01433
01434 int send_notify_request(subs_t* subs, subs_t * watcher_subs,
01435 str* n_body,int force_null_body)
01436 {
01437 dlg_t* td = NULL;
01438 str met = {"NOTIFY", 6};
01439 str str_hdr = {0, 0};
01440 str* notify_body = NULL;
01441 int result= 0;
01442 c_back_param *cb_param= NULL;
01443 str* final_body= NULL;
01444 uac_req_t uac_r;
01445
01446 LM_DBG("dialog info:\n");
01447 printf_subs(subs);
01448
01449
01450
01451 if(force_null_body)
01452 {
01453 goto jump_over_body;
01454 }
01455
01456 if(n_body!= NULL && subs->status== ACTIVE_STATUS)
01457 {
01458 if( subs->event->req_auth)
01459 {
01460
01461 if(subs->auth_rules_doc && subs->event->apply_auth_nbody)
01462 {
01463 if(subs->event->apply_auth_nbody(n_body, subs, ¬ify_body)< 0)
01464 {
01465 LM_ERR("in function apply_auth_nbody\n");
01466 goto error;
01467 }
01468 }
01469 if(notify_body== NULL)
01470 notify_body= n_body;
01471 }
01472 else
01473 notify_body= n_body;
01474 }
01475 else
01476 {
01477 if(subs->status== TERMINATED_STATUS ||
01478 subs->status== PENDING_STATUS)
01479 {
01480 LM_DBG("state terminated or pending- notify body NULL\n");
01481 notify_body = NULL;
01482 }
01483 else
01484 {
01485 if(subs->event->type & WINFO_TYPE)
01486 {
01487 notify_body = get_wi_notify_body(subs, watcher_subs);
01488 if(notify_body == NULL)
01489 {
01490 LM_DBG("Could not get notify_body\n");
01491 goto error;
01492 }
01493 }
01494 else
01495 {
01496 notify_body = get_p_notify_body(subs->pres_uri,
01497 subs->event, NULL, (subs->contact.s)?&subs->contact:NULL);
01498 if(notify_body == NULL || notify_body->s== NULL)
01499 {
01500 LM_DBG("Could not get the notify_body\n");
01501 }
01502 else
01503 if(subs->event->req_auth)
01504 {
01505 if(subs->auth_rules_doc && subs->event->apply_auth_nbody
01506 && subs->event->apply_auth_nbody(notify_body,
01507 subs,&final_body)<0)
01508 {
01509 LM_ERR("in function apply_auth\n");
01510 goto error;
01511 }
01512 if(final_body)
01513 {
01514 xmlFree(notify_body->s);
01515 pkg_free(notify_body);
01516 notify_body= final_body;
01517 }
01518 }
01519 }
01520 }
01521 }
01522
01523 jump_over_body:
01524
01525 if(subs->expires<= 0)
01526 {
01527 subs->expires= 0;
01528 subs->status= TERMINATED_STATUS;
01529 subs->reason.s= "timeout";
01530 subs->reason.len= 7;
01531 }
01532
01533
01534 if( build_str_hdr( subs, notify_body?1:0, &str_hdr)< 0 )
01535 {
01536 LM_ERR("while building headers\n");
01537 goto error;
01538 }
01539 LM_DBG("headers:\n%.*s\n", str_hdr.len, str_hdr.s);
01540
01541
01542 td = ps_build_dlg_t(subs);
01543 if(td ==NULL)
01544 {
01545 LM_ERR("while building dlg_t structure\n");
01546 goto error;
01547 }
01548
01549 LM_DBG("expires %d status %d\n", subs->expires, subs->status);
01550
01551 if(subs->status != TERMINATED_STATUS)
01552 {
01553 cb_param = shm_dup_cbparam(subs);
01554 if(cb_param == NULL)
01555 {
01556 LM_ERR("while duplicating cb_param in share memory\n");
01557 goto error;
01558 }
01559 }
01560
01561 set_uac_req(&uac_r, &met, &str_hdr, notify_body, td, TMCB_LOCAL_COMPLETED,
01562 p_tm_callback, (void*)cb_param);
01563 result = tmb.t_request_within(&uac_r);
01564 if(result< 0)
01565 {
01566 LM_ERR("in function tmb.t_request_within\n");
01567 if(cb_param)
01568 free_cbparam(cb_param);
01569 goto error;
01570 }
01571
01572 LM_INFO("NOTIFY %.*s via %.*s on behalf of %.*s for event %.*s\n",
01573 td->rem_uri.len, td->rem_uri.s, td->hooks.next_hop->len,
01574 td->hooks.next_hop->s,
01575 td->loc_uri.len, td->loc_uri.s, subs->event->name.len,
01576 subs->event->name.s);
01577
01578 ps_free_tm_dlg(td);
01579
01580 if(str_hdr.s) pkg_free(str_hdr.s);
01581
01582 if((int)(long)n_body!= (int)(long)notify_body)
01583 free_notify_body(notify_body, subs->event);
01584
01585 return 0;
01586
01587 error:
01588 ps_free_tm_dlg(td);
01589 if(str_hdr.s!=NULL)
01590 pkg_free(str_hdr.s);
01591 if((int)(long)n_body!= (int)(long)notify_body)
01592 {
01593 if(notify_body!=NULL)
01594 {
01595 if(notify_body->s!=NULL)
01596 {
01597 if(subs->event->type& WINFO_TYPE)
01598 xmlFree(notify_body->s);
01599 else
01600 if(subs->event->apply_auth_nbody== NULL
01601 && subs->event->agg_nbody== NULL)
01602 pkg_free(notify_body->s);
01603 else
01604 subs->event->free_body(notify_body->s);
01605 }
01606 pkg_free(notify_body);
01607 }
01608 }
01609 return -1;
01610 }
01611
01612
01613 int notify(subs_t* subs, subs_t * watcher_subs,str* n_body,int force_null_body)
01614 {
01615
01616 if(subs->expires!= 0 && subs->status != TERMINATED_STATUS)
01617 {
01618 unsigned int hash_code;
01619 hash_code= core_hash(&subs->pres_uri, &subs->event->name, shtable_size);
01620
01621
01622 if(subs_dbmode != DB_ONLY)
01623 {
01624 if(update_shtable(subs_htable, hash_code, subs, LOCAL_TYPE) < 0)
01625 {
01626
01627 LM_ERR("updating subscription record in hash table\n");
01628 return -1;
01629 }
01630 }
01631
01632 if(subs->recv_event!=PRES_SUBSCRIBE_RECV
01633 && ((subs_dbmode == DB_ONLY && pres_notifier_processes == 0)
01634 || subs_dbmode == WRITE_THROUGH))
01635 {
01636 LM_DBG("updating subscription to database\n");
01637 if(update_subs_db(subs, LOCAL_TYPE)< 0)
01638 {
01639 LM_ERR("updating subscription in database\n");
01640 return -1;
01641 }
01642 }
01643 }
01644
01645 if(subs->reason.s && subs->status== ACTIVE_STATUS &&
01646 subs->reason.len== 12 && strncmp(subs->reason.s, "polite-block", 12)== 0)
01647 {
01648 force_null_body = 1;
01649 }
01650
01651 if(send_notify_request(subs, watcher_subs, n_body, force_null_body)< 0)
01652 {
01653 LM_ERR("sending Notify not successful\n");
01654 return -1;
01655 }
01656 return 0;
01657 }
01658
01659 void p_tm_callback( struct cell *t, int type, struct tmcb_params *ps)
01660 {
01661 c_back_param* cb;
01662
01663 if(ps->param==NULL || *ps->param==NULL ||
01664 ((c_back_param*)(*ps->param))->callid.s == NULL ||
01665 ((c_back_param*)(*ps->param))->to_tag.s== NULL ||
01666 ((c_back_param*)(*ps->param))->from_tag.s== NULL)
01667 {
01668 LM_DBG("message id not received, probably a timeout notify\n");
01669 if(ps->param != NULL && *ps->param !=NULL)
01670 free_cbparam((c_back_param*)(*ps->param));
01671 return;
01672 }
01673
01674 cb= (c_back_param*)(*ps->param);
01675 LM_DBG("completed with status %d [to_tag:%.*s]\n",
01676 ps->code, cb->to_tag.len, cb->to_tag.s);
01677
01678 if(ps->code == 481 || (ps->code == 408 && timeout_rm_subs))
01679 delete_subs(&cb->pres_uri, &cb->ev_name,
01680 &cb->to_tag, &cb->from_tag, &cb->callid);
01681
01682 free_cbparam(cb);
01683 }
01684
01685 void free_cbparam(c_back_param* cb_param)
01686 {
01687 if(cb_param!= NULL)
01688 shm_free(cb_param);
01689 }
01690
01691 c_back_param* shm_dup_cbparam(subs_t* subs)
01692 {
01693 int size;
01694 c_back_param* cb_param = NULL;
01695
01696 size = sizeof(c_back_param) + subs->pres_uri.len +
01697 subs->event->name.len + subs->to_tag.len +
01698 subs->from_tag.len + subs->callid.len;
01699
01700 cb_param= (c_back_param*)shm_malloc(size);
01701 LM_DBG("=== %d/%d/%d\n", subs->pres_uri.len,
01702 subs->event->name.len, subs->to_tag.len);
01703 if(cb_param==NULL)
01704 {
01705 LM_ERR("no more shared memory\n");
01706 return NULL;
01707 }
01708 memset(cb_param, 0, size);
01709
01710 cb_param->pres_uri.s = (char*)cb_param + sizeof(c_back_param);
01711 memcpy(cb_param->pres_uri.s, subs->pres_uri.s, subs->pres_uri.len);
01712 cb_param->pres_uri.len = subs->pres_uri.len;
01713 cb_param->ev_name.s = (char*)(cb_param->pres_uri.s) + cb_param->pres_uri.len;
01714 memcpy(cb_param->ev_name.s, subs->event->name.s, subs->event->name.len);
01715 cb_param->ev_name.len = subs->event->name.len;
01716 cb_param->to_tag.s = (char*)(cb_param->ev_name.s) + cb_param->ev_name.len;
01717 memcpy(cb_param->to_tag.s, subs->to_tag.s, subs->to_tag.len);
01718 cb_param->to_tag.len = subs->to_tag.len;
01719
01720 cb_param->from_tag.s = (char*)(cb_param->to_tag.s) + cb_param->to_tag.len;
01721 memcpy(cb_param->from_tag.s, subs->from_tag.s, subs->from_tag.len);
01722 cb_param->from_tag.len = subs->from_tag.len;
01723
01724 cb_param->callid.s = (char*)(cb_param->from_tag.s) + cb_param->from_tag.len;
01725 memcpy(cb_param->callid.s, subs->callid.s, subs->callid.len);
01726 cb_param->callid.len = subs->callid.len;
01727
01728 return cb_param;
01729 }
01730
01731
01732 str* create_winfo_xml(watcher_t* watchers, char* version,
01733 str resource, str event, int STATE_FLAG)
01734 {
01735 xmlDocPtr doc = NULL;
01736 xmlNodePtr root_node = NULL, node = NULL;
01737 xmlNodePtr w_list_node = NULL;
01738 char content[200];
01739 str *body= NULL;
01740 char* res= NULL;
01741 watcher_t* w;
01742
01743 LIBXML_TEST_VERSION;
01744
01745 doc = xmlNewDoc(BAD_CAST "1.0");
01746 root_node = xmlNewNode(NULL, BAD_CAST "watcherinfo");
01747 xmlDocSetRootElement(doc, root_node);
01748
01749 xmlNewProp(root_node, BAD_CAST "xmlns",
01750 BAD_CAST "urn:ietf:params:xml:ns:watcherinfo");
01751 xmlNewProp(root_node, BAD_CAST "version", BAD_CAST version );
01752
01753 if(STATE_FLAG & FULL_STATE_FLAG)
01754 {
01755 if( xmlNewProp(root_node, BAD_CAST "state", BAD_CAST "full") == NULL)
01756 {
01757 LM_ERR("while adding new attribute\n");
01758 goto error;
01759 }
01760 }
01761 else
01762 {
01763 if( xmlNewProp(root_node, BAD_CAST "state",
01764 BAD_CAST "partial")== NULL)
01765 {
01766 LM_ERR("while adding new attribute\n");
01767 goto error;
01768 }
01769 }
01770
01771 w_list_node =xmlNewChild(root_node, NULL, BAD_CAST "watcher-list",NULL);
01772 if( w_list_node == NULL)
01773 {
01774 LM_ERR("while adding child\n");
01775 goto error;
01776 }
01777 res= (char*)pkg_malloc(MAX_unsigned(resource.len, event.len) + 1);
01778 if(res== NULL)
01779 {
01780 ERR_MEM(PKG_MEM_STR);
01781 }
01782 memcpy(res, resource.s, resource.len);
01783 res[resource.len]= '\0';
01784 xmlNewProp(w_list_node, BAD_CAST "resource", BAD_CAST res);
01785 memcpy(res, event.s, event.len);
01786 res[event.len]= '\0';
01787 xmlNewProp(w_list_node, BAD_CAST "package", BAD_CAST res);
01788 pkg_free(res);
01789
01790
01791 w= watchers->next;
01792 while(w)
01793 {
01794 strncpy( content,w->uri.s, w->uri.len);
01795 content[ w->uri.len ]='\0';
01796 node = xmlNewChild(w_list_node, NULL, BAD_CAST "watcher",
01797 BAD_CAST content) ;
01798 if( node ==NULL)
01799 {
01800 LM_ERR("while adding child\n");
01801 goto error;
01802 }
01803 if(xmlNewProp(node, BAD_CAST "id", BAD_CAST w->id.s)== NULL)
01804 {
01805 LM_ERR("while adding new attribute\n");
01806 goto error;
01807 }
01808
01809 if(xmlNewProp(node, BAD_CAST "event", BAD_CAST "subscribe")== NULL)
01810 {
01811 LM_ERR("while adding new attribute\n");
01812 goto error;
01813 }
01814
01815 if(xmlNewProp(node, BAD_CAST "status",
01816 BAD_CAST get_status_str(w->status))== NULL)
01817 {
01818 LM_ERR("while adding new attribute\n");
01819 goto error;
01820 }
01821 w= w->next;
01822 }
01823 body = (str*)pkg_malloc(sizeof(str));
01824 if(body == NULL)
01825 {
01826 ERR_MEM(PKG_MEM_STR);
01827 }
01828 memset(body, 0, sizeof(str));
01829
01830 xmlDocDumpFormatMemory(doc,(xmlChar**)(void*)&body->s, &body->len, 1);
01831
01832 xmlFreeDoc(doc);
01833
01834 xmlCleanupParser();
01835
01836 xmlMemoryDump();
01837
01838 return body;
01839
01840 error:
01841 if(doc)
01842 xmlFreeDoc(doc);
01843 return NULL;
01844 }
01845
01846 int watcher_found_in_list(watcher_t * watchers, str wuri)
01847 {
01848 watcher_t * w;
01849
01850 w = watchers->next;
01851
01852 while(w)
01853 {
01854 if(w->uri.len == wuri.len && strncmp(w->uri.s, wuri.s, wuri.len)== 0)
01855 return 1;
01856 w= w->next;
01857 }
01858
01859 return 0;
01860 }
01861
01862 int add_waiting_watchers(watcher_t *watchers, str pres_uri, str event)
01863 {
01864 watcher_t * w;
01865 db_key_t query_cols[3];
01866 db_val_t query_vals[3];
01867 db_key_t result_cols[2];
01868 db1_res_t *result = NULL;
01869 db_row_t *row= NULL ;
01870 db_val_t *row_vals;
01871 int n_result_cols = 0;
01872 int n_query_cols = 0;
01873 int wuser_col, wdomain_col;
01874 str wuser, wdomain, wuri;
01875 int i;
01876
01877
01878
01879
01880 query_cols[n_query_cols] = &str_presentity_uri_col;
01881 query_vals[n_query_cols].type = DB1_STR;
01882 query_vals[n_query_cols].nul = 0;
01883 query_vals[n_query_cols].val.str_val = pres_uri;
01884 n_query_cols++;
01885
01886 query_cols[n_query_cols] = &str_event_col;
01887 query_vals[n_query_cols].type = DB1_STR;
01888 query_vals[n_query_cols].nul = 0;
01889 query_vals[n_query_cols].val.str_val = event;
01890 n_query_cols++;
01891
01892 query_cols[n_query_cols] = &str_status_col;
01893 query_vals[n_query_cols].type = DB1_INT;
01894 query_vals[n_query_cols].nul = 0;
01895 query_vals[n_query_cols].val.int_val = PENDING_STATUS;
01896 n_query_cols++;
01897
01898 result_cols[wuser_col=n_result_cols++] = &str_watcher_username_col;
01899 result_cols[wdomain_col=n_result_cols++] = &str_watcher_domain_col;
01900
01901 if (pa_dbf.use_table(pa_db, &watchers_table) < 0)
01902 {
01903 LM_ERR("sql use table 'watchers_table' failed\n");
01904 return -1;
01905 }
01906
01907 if (pa_dbf.query (pa_db, query_cols, 0, query_vals,
01908 result_cols, n_query_cols, n_result_cols, 0, &result) < 0)
01909 {
01910 LM_ERR("failed to query %.*s table\n",
01911 watchers_table.len, watchers_table.s);
01912 if(result)
01913 pa_dbf.free_result(pa_db, result);
01914 return -1;
01915 }
01916
01917 if(result== NULL)
01918 {
01919 LM_ERR("mysql query failed - null result\n");
01920 return -1;
01921 }
01922
01923 if (result->n<=0 )
01924 {
01925 LM_DBG("The query returned no result\n");
01926 pa_dbf.free_result(pa_db, result);
01927 return 0;
01928 }
01929
01930 for(i=0; i< result->n; i++)
01931 {
01932 row = &result->rows[i];
01933 row_vals = ROW_VALUES(row);
01934
01935 wuser.s = (char*)row_vals[wuser_col].val.string_val;
01936 wuser.len = strlen(wuser.s);
01937
01938 wdomain.s = (char*)row_vals[wdomain_col].val.string_val;
01939 wdomain.len = strlen(wdomain.s);
01940
01941 if(uandd_to_uri(wuser, wdomain, &wuri)<0)
01942 {
01943 LM_ERR("creating uri from username and domain\n");
01944 goto error;
01945 }
01946
01947 if(watcher_found_in_list(watchers, wuri))
01948 {
01949 pkg_free(wuri.s);
01950 continue;
01951 }
01952
01953 w= (watcher_t*)pkg_malloc(sizeof(watcher_t));
01954 if(w== NULL)
01955 {
01956 ERR_MEM(PKG_MEM_STR);
01957 }
01958 memset(w, 0, sizeof(watcher_t));
01959
01960 w->status= WAITING_STATUS;
01961 w->uri = wuri;
01962 w->id.s = (char*)pkg_malloc(w->uri.len*2 +1);
01963 if(w->id.s== NULL)
01964 {
01965 pkg_free(w->uri.s);
01966 pkg_free(w);
01967 ERR_MEM(PKG_MEM_STR);
01968 }
01969
01970 to64frombits((unsigned char *)w->id.s,
01971 (const unsigned char*)w->uri.s, w->uri.len);
01972 w->id.len = strlen(w->id.s);
01973 w->event= event;
01974
01975 w->next= watchers->next;
01976 watchers->next= w;
01977
01978 }
01979
01980 pa_dbf.free_result(pa_db, result);
01981 return 0;
01982
01983 error:
01984 if(result)
01985 pa_dbf.free_result(pa_db, result);
01986 return -1;
01987
01988 }
01989
01990 #define EXTRACT_STRING(strng, chars)\
01991 do {\
01992 strng.s = (char *) chars;\
01993 strng.len = strlen(strng.s);\
01994 } while(0);
01995
01996 static int unset_watchers_updated_winfo(str *pres_uri)
01997 {
01998 db_key_t query_cols[3], result_cols[1], update_cols[1];
01999 db_val_t query_vals[3], update_vals[1];
02000 db_op_t query_ops[2];
02001 db1_res_t *result = NULL;
02002 int n_query_cols = 0;
02003 int ret = -1;
02004 str winfo = str_init("presence.winfo");
02005
02006
02007
02008
02009
02010 query_cols[n_query_cols] = &str_presentity_uri_col;
02011 query_vals[n_query_cols].type = DB1_STR;
02012 query_vals[n_query_cols].nul = 0;
02013 query_vals[n_query_cols].val.str_val.s = pres_uri->s;
02014 query_vals[n_query_cols].val.str_val.len = pres_uri->len;
02015 n_query_cols++;
02016
02017 query_cols[n_query_cols] = &str_event_col;
02018 query_vals[n_query_cols].type = DB1_STR;
02019 query_vals[n_query_cols].nul = 0;
02020 query_vals[n_query_cols].val.str_val = winfo;
02021 n_query_cols++;
02022
02023 query_cols[n_query_cols] = &str_updated_col;
02024 query_vals[n_query_cols].type = DB1_INT;
02025 query_vals[n_query_cols].nul = 0;
02026 query_vals[n_query_cols].val.int_val = UPDATED_TYPE;
02027 n_query_cols++;
02028
02029 result_cols[0] = &str_id_col;
02030
02031 update_cols[0] = &str_updated_winfo_col;
02032 update_vals[0].type = DB1_INT;
02033 update_vals[0].nul = 0;
02034 update_vals[0].val.int_val = NO_UPDATE_TYPE;
02035
02036 if (pa_dbf.use_table(pa_db, &active_watchers_table) < 0)
02037 {
02038 LM_ERR("use table failed\n");
02039 goto error;
02040 }
02041
02042 if (pa_dbf.query(pa_db, query_cols, 0, query_vals, result_cols,
02043 n_query_cols, 1, 0, &result) < 0)
02044 {
02045 LM_ERR("in sql query\n");
02046 goto error;
02047 }
02048
02049 if (result == NULL)
02050 {
02051 LM_ERR("bad result\n");
02052 goto error;
02053 }
02054
02055 if (RES_ROW_N(result) <= 0)
02056 {
02057 query_ops[0] = OP_EQ;
02058 query_ops[1] = OP_NEQ;
02059
02060 if (pa_dbf.update(pa_db, query_cols, query_ops,
02061 query_vals, update_cols,
02062 update_vals, 2, 1) < 0)
02063 {
02064 LM_ERR("in sql query\n");
02065 goto error;
02066 }
02067
02068 if (pa_dbf.affected_rows)
02069 ret = pa_dbf.affected_rows(pa_db);
02070 else
02071 ret = 0;
02072 }
02073 else
02074 ret = 0;
02075
02076 error:
02077 if (result) pa_dbf.free_result(pa_db, result);
02078 return ret;
02079 }
02080
02081 static int dialogs_awaiting_update(str *pres_uri, str event)
02082 {
02083 db_key_t query_cols[3], result_cols[1];
02084 db_val_t query_vals[3];
02085 db_op_t query_ops[3];
02086 db1_res_t *result = NULL;
02087 int n_query_cols = 0;
02088 int ret = -1;
02089
02090 query_cols[n_query_cols] = &str_presentity_uri_col;
02091 query_vals[n_query_cols].type = DB1_STR;
02092 query_vals[n_query_cols].nul = 0;
02093 query_vals[n_query_cols].val.str_val.s = pres_uri->s;
02094 query_vals[n_query_cols].val.str_val.len = pres_uri->len;
02095 query_ops[n_query_cols] = OP_EQ;
02096 n_query_cols++;
02097
02098 query_cols[n_query_cols] = &str_event_col;
02099 query_vals[n_query_cols].type = DB1_STR;
02100 query_vals[n_query_cols].nul = 0;
02101 query_vals[n_query_cols].val.str_val = event;
02102 query_ops[n_query_cols] = OP_EQ;
02103 n_query_cols++;
02104
02105 query_cols[n_query_cols] = &str_updated_col;
02106 query_vals[n_query_cols].type = DB1_INT;
02107 query_vals[n_query_cols].nul = 0;
02108 query_vals[n_query_cols].val.int_val = NO_UPDATE_TYPE;
02109 query_ops[n_query_cols] = OP_NEQ;
02110 n_query_cols++;
02111
02112 result_cols[0] = &str_id_col;
02113
02114 if (pa_dbf.use_table(pa_db, &active_watchers_table) < 0)
02115 {
02116 LM_ERR("use table failed\n");
02117 goto error;
02118 }
02119
02120 if (pa_dbf.query(pa_db, query_cols, query_ops, query_vals,
02121 result_cols, n_query_cols, 1, 0, &result) < 0)
02122 {
02123 LM_ERR("in sql query\n");
02124 goto error;
02125 }
02126
02127 if (result == NULL)
02128 {
02129 LM_ERR("bad result\n");
02130 goto error;
02131 }
02132 else
02133 ret = RES_ROW_N(result);
02134
02135 error:
02136 if (result) pa_dbf.free_result(pa_db, result);
02137 return ret;
02138 }
02139
02140 int set_wipeer_subs_updated(str *pres_uri, pres_ev_t *event, int full)
02141 {
02142 db_key_t query_cols[3], result_cols[3], update_cols[2];
02143 db_val_t query_vals[3], update_vals[2], *values;
02144 db_row_t *rows;
02145 db1_res_t *result = NULL;
02146 int n_query_cols = 0, n_result_cols = 0, n_update_cols = 0;
02147 int callid_col, from_tag_col, to_tag_col;
02148 int i, ret = -1, count;
02149 str callid, from_tag, to_tag;
02150
02151 query_cols[n_query_cols] = &str_presentity_uri_col;
02152 query_vals[n_query_cols].type = DB1_STR;
02153 query_vals[n_query_cols].nul = 0;
02154 query_vals[n_query_cols].val.str_val.s = pres_uri->s;
02155 query_vals[n_query_cols].val.str_val.len = pres_uri->len;
02156 n_query_cols++;
02157
02158 query_cols[n_query_cols] = &str_event_col;
02159 query_vals[n_query_cols].type = DB1_STR;
02160 query_vals[n_query_cols].nul = 0;
02161 query_vals[n_query_cols].val.str_val = event->name;
02162 n_query_cols++;
02163
02164 result_cols[callid_col = n_result_cols++] = &str_callid_col;
02165 result_cols[from_tag_col = n_result_cols++] = &str_from_tag_col;
02166 result_cols[to_tag_col = n_result_cols++] = &str_to_tag_col;
02167
02168 if (pa_dbf.use_table(pa_db, &active_watchers_table) < 0)
02169 {
02170 LM_ERR("use table failed\n");
02171 goto error;
02172 }
02173
02174 if (pa_dbf.query (pa_db, query_cols, 0, query_vals, result_cols,
02175 n_query_cols, n_result_cols, 0, &result) < 0)
02176 {
02177 LM_ERR("in sql query\n");
02178 goto error;
02179 }
02180
02181 if (result == NULL)
02182 {
02183 LM_ERR("bad result\n");
02184 goto error;
02185 }
02186
02187 if (RES_ROW_N(result) <= 0)
02188 {
02189 ret = 0;
02190 goto done;
02191 }
02192
02193 rows = RES_ROWS(result);
02194 count = RES_ROW_N(result);
02195 for (i = 0; i < RES_ROW_N(result); i++)
02196 {
02197 values = ROW_VALUES(&rows[i]);
02198
02199 EXTRACT_STRING(callid, VAL_STRING(&values[callid_col]));
02200 EXTRACT_STRING(from_tag, VAL_STRING(&values[from_tag_col]));
02201 EXTRACT_STRING(to_tag, VAL_STRING(&values[to_tag_col]));
02202
02203 n_query_cols = 0;
02204 n_update_cols = 0;
02205
02206 query_cols[n_query_cols] = &str_callid_col;
02207 query_vals[n_query_cols].type = DB1_STR;
02208 query_vals[n_query_cols].nul = 0;
02209 query_vals[n_query_cols].val.str_val = callid;
02210 n_query_cols++;
02211
02212 query_cols[n_query_cols] = &str_to_tag_col;
02213 query_vals[n_query_cols].type = DB1_STR;
02214 query_vals[n_query_cols].nul = 0;
02215 query_vals[n_query_cols].val.str_val = to_tag;
02216 n_query_cols++;
02217
02218 query_cols[n_query_cols] = &str_from_tag_col;
02219 query_vals[n_query_cols].type = DB1_STR;
02220 query_vals[n_query_cols].nul = 0;
02221 query_vals[n_query_cols].val.str_val = from_tag;
02222 n_query_cols++;
02223
02224 update_cols[n_update_cols] = &str_updated_col;
02225 update_vals[n_update_cols].type = DB1_INT;
02226 update_vals[n_update_cols].nul = 0;
02227 update_vals[n_update_cols].val.int_val = core_hash(&callid,
02228 &from_tag, (pres_waitn_time * pres_notifier_poll_rate
02229 * pres_notifier_processes) - 1);
02230 n_update_cols++;
02231
02232 if (full)
02233 {
02234 update_cols[n_update_cols] = &str_updated_winfo_col;
02235 update_vals[n_update_cols].type = DB1_INT;
02236 update_vals[n_update_cols].nul = 0;
02237 update_vals[n_update_cols].val.int_val = UPDATED_TYPE;
02238 n_update_cols++;
02239 }
02240
02241 if (pa_dbf.update(pa_db, query_cols, 0, query_vals, update_cols,
02242 update_vals, n_query_cols, n_update_cols) < 0)
02243 {
02244 LM_ERR("in sql query\n");
02245 goto error;
02246 }
02247
02248 if (pa_dbf.affected_rows)
02249 if (pa_dbf.affected_rows(pa_db) == 0) count--;
02250 }
02251
02252 ret = count;
02253
02254 done:
02255 error:
02256 if (result) pa_dbf.free_result(pa_db, result);
02257 return ret;
02258 }
02259
02260 int set_updated(subs_t *sub)
02261 {
02262 db_key_t query_cols[3], update_cols[1];
02263 db_val_t query_vals[3], update_vals[1];
02264 int n_query_cols = 0;
02265
02266 query_cols[n_query_cols] = &str_callid_col;
02267 query_vals[n_query_cols].type = DB1_STR;
02268 query_vals[n_query_cols].nul = 0;
02269 query_vals[n_query_cols].val.str_val = sub->callid;
02270 n_query_cols++;
02271
02272 query_cols[n_query_cols] = &str_to_tag_col;
02273 query_vals[n_query_cols].type = DB1_STR;
02274 query_vals[n_query_cols].nul = 0;
02275 query_vals[n_query_cols].val.str_val = sub->to_tag;
02276 n_query_cols++;
02277
02278 query_cols[n_query_cols] = &str_from_tag_col;
02279 query_vals[n_query_cols].type = DB1_STR;
02280 query_vals[n_query_cols].nul = 0;
02281 query_vals[n_query_cols].val.str_val = sub->from_tag;
02282 n_query_cols++;
02283
02284 update_cols[0] = &str_updated_col;
02285 update_vals[0].type = DB1_INT;
02286 update_vals[0].nul = 0;
02287 update_vals[0].val.int_val = core_hash(&sub->callid, &sub->from_tag,
02288 (pres_waitn_time * pres_notifier_poll_rate
02289 * pres_notifier_processes) - 1);
02290
02291 if (pa_dbf.use_table(pa_db, &active_watchers_table) < 0)
02292 {
02293 LM_ERR("use table failed\n");
02294 return -1;
02295 }
02296
02297 if (pa_dbf.update(pa_db, query_cols, 0, query_vals, update_cols,
02298 update_vals, n_query_cols, 1) < 0)
02299 {
02300 LM_ERR("in sql query\n");
02301 return -1;
02302 }
02303
02304 if (pa_dbf.affected_rows)
02305 return pa_dbf.affected_rows(pa_db);
02306 else
02307 return 0;
02308 }
02309
02310 static watcher_t *build_watchers_list(subs_t *sub)
02311 {
02312 db_key_t query_cols[3], result_cols[4];
02313 db_val_t query_vals[3], *values;
02314 db_row_t *rows;
02315 db1_res_t *result;
02316 int n_query_cols = 0, n_result_cols = 0;
02317 int wuser_col, wdomain_col, callid_col, status_col;
02318 int i;
02319 subs_t sb;
02320 watcher_t *watchers = NULL;
02321
02322 watchers = (watcher_t*) pkg_malloc(sizeof(watcher_t));
02323 if (watchers == NULL)
02324 {
02325 ERR_MEM(PKG_MEM_STR);
02326 }
02327 memset(watchers, 0, sizeof(watcher_t));
02328
02329 query_cols[n_query_cols] = &str_presentity_uri_col;
02330 query_vals[n_query_cols].type = DB1_STR;
02331 query_vals[n_query_cols].nul = 0;
02332 query_vals[n_query_cols].val.str_val= sub->pres_uri;
02333 n_query_cols++;
02334
02335 query_cols[n_query_cols] = &str_event_col;
02336 query_vals[n_query_cols].type = DB1_STR;
02337 query_vals[n_query_cols].nul = 0;
02338 query_vals[n_query_cols].val.str_val = sub->event->wipeer->name;
02339 n_query_cols++;
02340
02341 query_cols[n_query_cols] = &str_updated_winfo_col;
02342 query_vals[n_query_cols].type = DB1_INT;
02343 query_vals[n_query_cols].nul = 0;
02344 query_vals[n_query_cols].val.int_val = UPDATED_TYPE;
02345 n_query_cols++;
02346
02347 result_cols[wuser_col = n_result_cols++] = &str_watcher_username_col;
02348 result_cols[wdomain_col = n_result_cols++] = &str_watcher_domain_col;
02349 result_cols[callid_col = n_result_cols++] = &str_callid_col;
02350 result_cols[status_col = n_result_cols++] = &str_status_col;
02351
02352 if (pa_dbf.use_table(pa_db, &active_watchers_table) < 0)
02353 {
02354 LM_ERR("use table failed\n");
02355 goto error;
02356 }
02357
02358 if (pa_dbf.query (pa_db, query_cols, 0, query_vals, result_cols,
02359 n_query_cols, n_result_cols, 0, &result) < 0)
02360 {
02361 LM_ERR("in sql query\n");
02362 goto error;
02363 }
02364
02365 if (result == NULL)
02366 {
02367 LM_ERR("bad result\n");
02368 goto error;
02369 }
02370
02371 if (RES_ROW_N(result) <= 0)
02372 goto done;
02373
02374 rows = RES_ROWS(result);
02375 for (i = 0; i < RES_ROW_N(result); i++)
02376 {
02377 memset(&sb, 0, sizeof(subs_t));
02378 values = ROW_VALUES(&rows[i]);
02379
02380 EXTRACT_STRING(sb.watcher_user, VAL_STRING(&values[wuser_col]));
02381 EXTRACT_STRING(sb.watcher_domain, VAL_STRING(&values[wdomain_col]));
02382 EXTRACT_STRING(sb.callid, VAL_STRING(&values[callid_col]));
02383 sb.status = VAL_INT(&values[status_col]);
02384
02385 sb.event = sub->event->wipeer;
02386
02387 if (add_watcher_list(&sb, watchers) < 0)
02388 goto error;
02389 }
02390
02391 done:
02392 pa_dbf.free_result(pa_db, result);
02393 return watchers;
02394
02395 error:
02396 if (result) pa_dbf.free_result(pa_db, result);
02397 free_watcher_list(watchers);
02398 return NULL;
02399 }
02400
02401 static int cleanup_missing_dialog(subs_t *sub)
02402 {
02403 int ret = -1, num_other_watchers = 0;
02404
02405 if (sub->event->type & WINFO_TYPE)
02406 {
02407 if (unset_watchers_updated_winfo(&sub->pres_uri) < 0)
02408 {
02409 LM_ERR("resetting updated_winfo flags\n");
02410 goto error;
02411 }
02412 }
02413 else if (sub->event->type & PUBL_TYPE)
02414 {
02415 if ((num_other_watchers = dialogs_awaiting_update(
02416 &sub->pres_uri, sub->event->name)) < 0)
02417 {
02418 LM_ERR("checking watchers\n");
02419 goto error;
02420 }
02421 else if (num_other_watchers == 0)
02422 {
02423 if (delete_offline_presentities(&sub->pres_uri,
02424 sub->event) < 0)
02425 {
02426 LM_ERR("deleting presentity\n");
02427 goto error;
02428 }
02429 }
02430 }
02431
02432 ret = 0;
02433
02434 error:
02435 return ret;
02436 }
02437
02438 static int notifier_notify(subs_t *sub, int *updated, int *end_transaction)
02439 {
02440 str *nbody = NULL;
02441 watcher_t *watchers = NULL;
02442 int ret = 0, attempt_delete_presentities = 0;
02443
02444 *updated = 0;
02445
02446
02447 if (sub->expires == 0 || sub->status == TERMINATED_STATUS)
02448 {
02449 sub->status = TERMINATED_STATUS;
02450
02451 if (sub->event->type & WINFO_TYPE)
02452 {
02453 if (unset_watchers_updated_winfo(&sub->pres_uri) < 0)
02454 {
02455 LM_WARN("resetting updated_winfo flags\n");
02456
02457 if (pa_dbf.abort_transaction)
02458 {
02459 if (pa_dbf.abort_transaction(pa_db) < 0)
02460 {
02461 LM_ERR("in abort_transaction\n");
02462 goto error;
02463 }
02464 }
02465 *end_transaction = 0;
02466
02467
02468 *updated = 1;
02469 goto done;
02470 }
02471 }
02472 else
02473 {
02474 str winfo = str_init("presence.winfo");
02475 int num_other_watchers, num_winfos;
02476
02477 if (sub->event->type & PUBL_TYPE)
02478 {
02479 if ((num_other_watchers = dialogs_awaiting_update(
02480 &sub->pres_uri, sub->event->name)) < 0)
02481 {
02482 LM_ERR("checking watchers\n");
02483 goto error;
02484 }
02485 else if (num_other_watchers == 0)
02486 attempt_delete_presentities = 1;
02487 }
02488
02489 if (sub->event->wipeer)
02490 {
02491 if ((num_winfos = dialogs_awaiting_update(
02492 &sub->pres_uri,
02493 winfo)) < 0)
02494 {
02495 LM_ERR("checking winfos\n");
02496 goto error;
02497 }
02498
02499 if (sub->updated_winfo == UPDATED_TYPE && num_winfos > 0)
02500 {
02501 *updated = 1;
02502 goto done;
02503 }
02504 }
02505 }
02506 }
02507 else
02508 {
02509 if (sub->event->type & WINFO_TYPE)
02510 {
02511 if (unset_watchers_updated_winfo(&sub->pres_uri) < 0)
02512 {
02513 LM_WARN("resetting updated_winfo flags\n");
02514
02515 if (pa_dbf.abort_transaction)
02516 {
02517 if (pa_dbf.abort_transaction(pa_db) < 0)
02518 {
02519 LM_ERR("in abort_transaction\n");
02520 goto error;
02521 }
02522 }
02523 *end_transaction = 0;
02524
02525
02526 *updated = 1;
02527 goto done;
02528 }
02529
02530 if (sub->updated_winfo == NO_UPDATE_TYPE)
02531 {
02532
02533
02534 int len = 0;
02535 char *version_str = int2str(sub->version,
02536 &len);
02537 if(version_str ==NULL)
02538 {
02539 LM_ERR("converting int to str\n");
02540 goto error;
02541 }
02542
02543 watchers = build_watchers_list(sub);
02544 if(watchers == NULL)
02545 {
02546 LM_ERR("in build_watchers_list\n");
02547 goto error;
02548 }
02549
02550 nbody = create_winfo_xml(watchers,version_str,
02551 sub->pres_uri,
02552 sub->event->wipeer->name,
02553 PARTIAL_STATE_FLAG);
02554 if(nbody == NULL)
02555 {
02556 LM_ERR("in create_winfo_xml\n");
02557 goto error;
02558 }
02559
02560 }
02561 else
02562 sub->updated_winfo = NO_UPDATE_TYPE;
02563 }
02564 else if (sub->event->type & PUBL_TYPE)
02565 {
02566 int num_other_watchers;
02567
02568 if ((num_other_watchers = dialogs_awaiting_update(
02569 &sub->pres_uri, sub->event->name)) < 0)
02570 {
02571 LM_ERR("checking watchers\n");
02572 goto error;
02573 }
02574 else if (num_other_watchers == 0)
02575 attempt_delete_presentities = 1;
02576 }
02577 else if (!send_fast_notify)
02578 goto done;
02579 }
02580
02581 if (notify(sub, NULL, nbody, 0) < 0)
02582 {
02583 LM_ERR("could not send notify\n");
02584 goto error;
02585 }
02586
02587 ret = 1;
02588
02589 done:
02590 if (attempt_delete_presentities)
02591 {
02592 if (delete_offline_presentities(&sub->pres_uri, sub->event) < 0)
02593 {
02594 LM_ERR("deleting presentity\n");
02595 goto error;
02596 }
02597 }
02598
02599 free_notify_body(nbody, sub->event);
02600 free_watcher_list(watchers);
02601
02602 return ret;
02603
02604 error:
02605 free_notify_body(nbody, sub->event);
02606 free_watcher_list(watchers);
02607
02608 if (pa_dbf.abort_transaction)
02609 {
02610 if (pa_dbf.abort_transaction(pa_db) < 0)
02611 LM_ERR("in abort_transaction\n");
02612 }
02613 *end_transaction = 0;
02614
02615 return -1;
02616 }
02617
02618 int process_dialogs(int round, int presence_winfo)
02619 {
02620 db_key_t query_cols[3], result_cols[18], update_cols[4];
02621 db_val_t query_vals[3], update_vals[4], *values, *dvalues;
02622 db_op_t query_ops[2];
02623 db_row_t *rows, *drows;
02624 db1_res_t *dialog_list = NULL, *dialog = NULL;
02625 int n_query_cols = 0, n_result_cols = 0, n_update_cols = 0;
02626 int callid_col, to_tag_col, from_tag_col;
02627 int pres_uri_col, tuser_col, tdomain_col, fuser_col, fdomain_col;
02628 int wuser_col, wdomain_col, sockinfo_col, lcontact_col, contact_col;
02629 int rroute_col, event_id_col, reason_col, event_col, lcseq_col;
02630 int rcseq_col, status_col, version_col, updated_winfo_col, expires_col;
02631 int i, notify_sent = 0, cached_updated_winfo, ret = -1;
02632 int end_transaction = 0;
02633 subs_t sub;
02634 str ev_sname, winfo = str_init("presence.winfo");
02635 int now = (int)time(NULL);
02636 int updated = 0;
02637
02638 if (++subset > (pres_waitn_time * pres_notifier_poll_rate) -1)
02639 subset = 0;
02640
02641 query_cols[n_query_cols] = &str_updated_col;
02642 query_vals[n_query_cols].type = DB1_INT;
02643 query_vals[n_query_cols].nul = 0;
02644 query_vals[n_query_cols].val.int_val = round;
02645 query_ops[n_query_cols] = OP_EQ;
02646 n_query_cols++;
02647
02648 query_cols[n_query_cols] = &str_event_col;
02649 query_vals[n_query_cols].type = DB1_STR;
02650 query_vals[n_query_cols].nul = 0;
02651 query_vals[n_query_cols].val.str_val = winfo;
02652 query_ops[n_query_cols] = presence_winfo ? OP_EQ : OP_NEQ;
02653 n_query_cols++;
02654
02655 result_cols[pres_uri_col = n_result_cols++] = &str_presentity_uri_col;
02656 result_cols[callid_col = n_result_cols++] = &str_callid_col;
02657 result_cols[to_tag_col = n_result_cols++] = &str_to_tag_col;
02658 result_cols[from_tag_col = n_result_cols++] = &str_from_tag_col;
02659 result_cols[event_col = n_result_cols++] = &str_event_col;
02660
02661 update_cols[n_update_cols] = &str_updated_col;
02662 update_vals[n_update_cols].type = DB1_INT;
02663 update_vals[n_update_cols].nul = 0;
02664 update_vals[n_update_cols].val.int_val = NO_UPDATE_TYPE;
02665 n_update_cols++;
02666
02667 if (pa_dbf.use_table(pa_db, &active_watchers_table) < 0)
02668 {
02669 LM_ERR("use table failed\n");
02670 goto error;
02671 }
02672
02673 if (pa_dbf.start_transaction)
02674 {
02675 if (pa_dbf.start_transaction(pa_db) < 0)
02676 {
02677 LM_ERR("in start_transaction\n");
02678 goto error;
02679 }
02680 }
02681
02682
02683 if (pa_dbf.query(pa_db, query_cols, query_ops, query_vals, result_cols,
02684 n_query_cols, n_result_cols, 0, &dialog_list) < 0)
02685 {
02686 LM_ERR("in sql query\n");
02687 goto error;
02688 }
02689 if(dialog_list == NULL)
02690 {
02691 LM_ERR("bad result\n");
02692 goto error;
02693 }
02694
02695 if(dialog_list->n <= 0)
02696 goto done;
02697
02698
02699 if (pa_dbf.update(pa_db, query_cols, query_ops, query_vals, update_cols,
02700 update_vals, n_query_cols, n_update_cols) < 0)
02701 {
02702 LM_ERR("in sql update\n");
02703 goto error;
02704 }
02705
02706 if (pa_dbf.end_transaction)
02707 {
02708 if (pa_dbf.end_transaction(pa_db) < 0)
02709 {
02710 LM_ERR("in end_transaction\n");
02711 goto error;
02712 }
02713 }
02714
02715
02716 rows = RES_ROWS(dialog_list);
02717 for (i = 0; i < RES_ROW_N(dialog_list); i++)
02718 {
02719 n_query_cols = 0;
02720 n_result_cols = 0;
02721 n_update_cols = 0;
02722 memset(&sub, 0, sizeof(subs_t));
02723 values = ROW_VALUES(&rows[i]);
02724
02725 EXTRACT_STRING(sub.pres_uri, VAL_STRING(&values[pres_uri_col]));
02726 EXTRACT_STRING(sub.callid, VAL_STRING(&values[callid_col]));
02727 EXTRACT_STRING(sub.to_tag, VAL_STRING(&values[to_tag_col]));
02728 EXTRACT_STRING(sub.from_tag, VAL_STRING(&values[from_tag_col]));
02729 EXTRACT_STRING(ev_sname, VAL_STRING(&values[event_col]));
02730 sub.event = contains_event(&ev_sname, NULL);
02731 if (sub.event == NULL)
02732 {
02733 LM_ERR("event not found and set to NULL\n");
02734 goto delete_dialog;
02735 }
02736
02737 query_cols[n_query_cols] = &str_callid_col;
02738 query_vals[n_query_cols].type = DB1_STR;
02739 query_vals[n_query_cols].nul = 0;
02740 query_vals[n_query_cols].val.str_val = sub.callid;
02741 n_query_cols++;
02742
02743 query_cols[n_query_cols] = &str_to_tag_col;
02744 query_vals[n_query_cols].type = DB1_STR;
02745 query_vals[n_query_cols].nul = 0;
02746 query_vals[n_query_cols].val.str_val = sub.to_tag;
02747 n_query_cols++;
02748
02749 query_cols[n_query_cols] = &str_from_tag_col;
02750 query_vals[n_query_cols].type = DB1_STR;
02751 query_vals[n_query_cols].nul = 0;
02752 query_vals[n_query_cols].val.str_val = sub.from_tag;
02753 n_query_cols++;
02754
02755 result_cols[tuser_col = n_result_cols++] = &str_to_user_col;
02756 result_cols[tdomain_col = n_result_cols++] = &str_to_domain_col;
02757 result_cols[fuser_col = n_result_cols++] = &str_from_user_col;
02758 result_cols[fdomain_col = n_result_cols++] = &str_from_domain_col;
02759 result_cols[wuser_col = n_result_cols++] = &str_watcher_username_col;
02760 result_cols[wdomain_col = n_result_cols++] = &str_watcher_domain_col;
02761 result_cols[sockinfo_col = n_result_cols++] = &str_socket_info_col;
02762 result_cols[lcontact_col = n_result_cols++] = &str_local_contact_col;
02763 result_cols[contact_col = n_result_cols++] = &str_contact_col;
02764 result_cols[rroute_col = n_result_cols++] = &str_record_route_col;
02765 result_cols[event_id_col = n_result_cols++] = &str_event_id_col;
02766 result_cols[reason_col = n_result_cols++] = &str_reason_col;
02767 result_cols[lcseq_col = n_result_cols++] = &str_local_cseq_col;
02768 result_cols[rcseq_col = n_result_cols++] = &str_remote_cseq_col;
02769 result_cols[status_col = n_result_cols++] = &str_status_col;
02770 result_cols[version_col = n_result_cols++] = &str_version_col;
02771 result_cols[updated_winfo_col = n_result_cols++] = &str_updated_winfo_col;
02772 result_cols[expires_col = n_result_cols++] = &str_expires_col;
02773
02774 if (pa_dbf.start_transaction)
02775 {
02776 if (pa_dbf.start_transaction(pa_db) < 0)
02777 {
02778 LM_ERR("in start_transaction\n");
02779 goto error;
02780 }
02781 }
02782 end_transaction = 1;
02783
02784 if (pa_dbf.query(pa_db, query_cols, 0, query_vals, result_cols,
02785 n_query_cols, n_result_cols, 0, &dialog) < 0)
02786 {
02787 LM_ERR("in sql query\n");
02788 goto error;
02789 }
02790
02791 if (dialog == NULL)
02792 {
02793 LM_ERR("bad result\n");
02794 goto error;
02795 }
02796
02797 if (dialog->n <= 0)
02798 {
02799 LM_WARN("record not found\n");
02800 if (cleanup_missing_dialog(&sub) < 0)
02801 LM_ERR("cleaning up after missing record\n");
02802 goto next_dialog;
02803 }
02804
02805 if (dialog->n > 1)
02806 {
02807 LM_ERR("multiple records found\n");
02808 goto delete_dialog;
02809 }
02810
02811 drows = RES_ROWS(dialog);
02812 dvalues = ROW_VALUES(drows);
02813
02814 EXTRACT_STRING(sub.to_user, VAL_STRING(&dvalues[tuser_col]));
02815 EXTRACT_STRING(sub.to_domain, VAL_STRING(&dvalues[tdomain_col]));
02816 EXTRACT_STRING(sub.from_user, VAL_STRING(&dvalues[fuser_col]));
02817 EXTRACT_STRING(sub.from_domain, VAL_STRING(&dvalues[fdomain_col]));
02818 EXTRACT_STRING(sub.watcher_user, VAL_STRING(&dvalues[wuser_col]));
02819 EXTRACT_STRING(sub.watcher_domain, VAL_STRING(&dvalues[wdomain_col]));
02820 EXTRACT_STRING(sub.sockinfo_str, VAL_STRING(&dvalues[sockinfo_col]));
02821 EXTRACT_STRING(sub.local_contact, VAL_STRING(&dvalues[lcontact_col]));
02822 EXTRACT_STRING(sub.contact, VAL_STRING(&dvalues[contact_col]));
02823 EXTRACT_STRING(sub.record_route, VAL_STRING(&dvalues[rroute_col]));
02824 EXTRACT_STRING(sub.event_id, VAL_STRING(&dvalues[event_id_col]));
02825 EXTRACT_STRING(sub.reason, VAL_STRING(&dvalues[reason_col]));
02826
02827 sub.local_cseq = VAL_INT(&dvalues[lcseq_col]) + 1;
02828 sub.remote_cseq = VAL_INT(&dvalues[rcseq_col]);
02829 sub.status = VAL_INT(&dvalues[status_col]);
02830 sub.version = VAL_INT(&dvalues[version_col]) + 1;
02831 cached_updated_winfo = sub.updated_winfo
02832 = VAL_INT(&dvalues[updated_winfo_col]);
02833
02834 if (VAL_INT(&dvalues[expires_col]) > now)
02835 sub.expires = VAL_INT(&dvalues[expires_col]) - now;
02836 else
02837 sub.expires = 0;
02838
02839 if (sub.expires < expires_offset) sub.expires = 0;
02840
02841 sub.updated = round;
02842
02843 if ((notify_sent = notifier_notify(&sub, &updated, &end_transaction)) < 0)
02844 {
02845 LM_ERR("sending NOTIFY request\n");
02846
02847 if (cleanup_missing_dialog(&sub) < 0)
02848 LM_ERR("cleaning up after error sending NOTIFY"
02849 "request\n");
02850
02851
02852 goto delete_dialog;
02853 }
02854
02855 if (pa_dbf.use_table(pa_db, &active_watchers_table) < 0)
02856 {
02857 LM_ERR("use table failed\n");
02858 goto error;
02859 }
02860
02861 if ((sub.expires > 0 && sub.status != TERMINATED_STATUS) || updated)
02862 {
02863 if (sub.updated_winfo != cached_updated_winfo)
02864 {
02865 update_cols[n_update_cols] = &str_updated_winfo_col;
02866 update_vals[n_update_cols].type = DB1_INT;
02867 update_vals[n_update_cols].nul = 0;
02868 update_vals[n_update_cols].val.int_val = sub.updated_winfo;
02869 n_update_cols++;
02870 }
02871
02872 if (updated)
02873 {
02874 update_cols[n_update_cols] = &str_updated_col;
02875 update_vals[n_update_cols].type = DB1_INT;
02876 update_vals[n_update_cols].nul = 0;
02877 update_vals[n_update_cols].val.int_val = round;
02878 n_update_cols++;
02879 }
02880
02881 if (notify_sent)
02882 {
02883 update_cols[n_update_cols] = &str_local_cseq_col;
02884 update_vals[n_update_cols].type = DB1_INT;
02885 update_vals[n_update_cols].nul = 0;
02886 update_vals[n_update_cols].val.int_val = sub.local_cseq;
02887 n_update_cols++;
02888
02889 update_cols[n_update_cols] = &str_version_col;
02890 update_vals[n_update_cols].type = DB1_INT;
02891 update_vals[n_update_cols].nul = 0;
02892 update_vals[n_update_cols].val.int_val = sub.version;
02893 n_update_cols++;
02894 }
02895
02896 if (n_update_cols > 0)
02897 {
02898 if (pa_dbf.update(pa_db,
02899 query_cols,
02900 0,
02901 query_vals,
02902 update_cols,
02903 update_vals,
02904 n_query_cols,
02905 n_update_cols) < 0)
02906 {
02907 LM_ERR("in sql update\n");
02908 goto error;
02909 }
02910 }
02911
02912 }
02913 else if (notify_sent)
02914 {
02915 delete_dialog:
02916 if (pa_dbf.use_table(pa_db, &active_watchers_table) < 0)
02917 {
02918 LM_ERR("use table failed\n");
02919 goto error;
02920 }
02921
02922 if (pa_dbf.delete(pa_db, query_cols, 0, query_vals,
02923 n_query_cols) < 0)
02924 {
02925 LM_ERR("in sql delete");
02926 goto error;
02927 }
02928 }
02929
02930 next_dialog:
02931 if (pa_dbf.end_transaction && end_transaction)
02932 {
02933 if (pa_dbf.end_transaction(pa_db) < 0)
02934 {
02935 LM_ERR("in end_transaction\n");
02936 goto error;
02937 }
02938 }
02939
02940 pa_dbf.free_result(pa_db, dialog);
02941 dialog = NULL;
02942 }
02943
02944 done:
02945 ret = 0;
02946 error:
02947 if (dialog_list) pa_dbf.free_result(pa_db, dialog_list);
02948 if (dialog) pa_dbf.free_result(pa_db, dialog);
02949
02950 if (pa_dbf.abort_transaction)
02951 {
02952 if (pa_dbf.abort_transaction(pa_db) < 0)
02953 LM_ERR("in abort_transaction\n");
02954 }
02955
02956 return ret;
02957 }
02958
02959 void pres_timer_send_notify(unsigned int ticks, void *param)
02960 {
02961 int process_num = *((int *) param);
02962 int round = subset + (pres_waitn_time * pres_notifier_poll_rate
02963 * process_num);
02964
02965 if (process_dialogs(round, 0) < 0)
02966 {
02967 LM_ERR("Handling non presence.winfo dialogs\n");
02968 return;
02969 }
02970 if (process_dialogs(round, 1) < 0)
02971 {
02972 LM_ERR("Handling presence.winfo dialogs\n");
02973 return;
02974 }
02975 }