virtual_subscription.c

00001 #include "rl_subscription.h"
00002 #include "../../mem/mem.h"
00003 #include "../../mem/shm_mem.h"
00004 #include <xcap/resource_list.h>
00005 #include "rls_mod.h"
00006 #include "result_codes.h"
00007 #include <cds/dstring.h>
00008 #include <cds/logger.h>
00009 #include <presence/qsa.h>
00010 #include <presence/pres_doc.h>
00011 #include <presence/pidf.h>
00012 #include <cds/list.h>
00013 #include "rls_data.h"
00014 #include "rls_auth.h"
00015 
00016 /* shared structure holding the data */
00017 typedef struct {
00018         virtual_subscription_t *first;
00019         virtual_subscription_t *last;
00020         /* hash, ... */
00021         notifier_domain_t *domain;
00022         qsa_content_type_t *ct_presence_info;
00023         qsa_content_type_t *ct_raw;
00024 } vs_data_t;
00025 
00026 static vs_data_t *vsd = NULL;
00027 
00028 /******** global functions (initialization) ********/
00029 
00030 int vs_init()
00031 {
00032         static str presence_info = STR_STATIC_INIT(CT_PRESENCE_INFO);
00033         static str raw = STR_STATIC_INIT(CT_RAW);
00034         
00035         vsd = (vs_data_t*)mem_alloc(sizeof(vs_data_t));
00036         if (!vsd) {
00037                 LOG(L_ERR, "vs_init(): memory allocation error\n");
00038                 return -1;
00039         }
00040         vsd->first = NULL;
00041         vsd->last = NULL;
00042         vsd->domain = qsa_get_default_domain();
00043         if (!vsd->domain) {
00044                 LOG(L_ERR, "vs_init(): can't register notifier domain\n");
00045                 return -1;      
00046         }
00047         DEBUG_LOG("QSA (vs) domain: %p\n", vsd->domain);
00048         
00049         vsd->ct_presence_info = register_content_type(vsd->domain, 
00050                         &presence_info, (destroy_function_f)free_presentity_info);
00051         if (!vsd->ct_presence_info) {
00052                 ERR("can't register QSA content type\n");
00053                 return -1;
00054         }
00055         else TRACE("RLS_PRESENCE_INFO: %p\n", vsd->ct_presence_info);
00056         
00057         vsd->ct_raw = register_content_type(vsd->domain, 
00058                         &raw, (destroy_function_f)free_raw_presence_info);
00059         if (!vsd->ct_raw) {
00060                 ERR("can't register QSA content type\n");
00061                 return -1;
00062         }
00063         else TRACE("RLS_RAW: %p\n", vsd->ct_raw);
00064         
00065         return 0;
00066 }
00067 
00068 int vs_destroy()
00069 {
00070         /* virtual subscriptions are freed in rls_free */
00071         if (vsd) {
00072                 qsa_release_domain(vsd->domain);
00073                 vsd->domain = NULL;
00074                 mem_free(vsd);
00075                 vsd = NULL;
00076         }
00077         return 0;
00078 }
00079 
00080 /******** Helper functions ********/
00081 
00082 /* sets new documents (frees them if equal) */
00083 static void set_vs_document(virtual_subscription_t *vs,
00084                 str_t *new_doc, 
00085                 str_t *new_content_type)
00086 {
00087         if (str_case_equals(&vs->state_document, new_doc) == 0) {
00088                 /* DEBUG("new document is equal to the older one\n"); */
00089                 str_free_content(new_doc);
00090         }
00091         else {
00092                 str_free_content(&vs->state_document);
00093                 if (new_doc) vs->state_document = *new_doc;
00094                 else str_clear(&vs->state_document);
00095                 vs->changed = 1;
00096         }
00097         
00098         if (str_case_equals(&vs->content_type, new_content_type) == 0) {
00099                 /* DEBUG("new content-type is equal to the older one\n"); */
00100                 str_free_content(new_content_type);
00101         }
00102         else {
00103                 str_free_content(&vs->content_type);
00104                 if (new_content_type) vs->content_type = *new_content_type;
00105                 else str_clear(&vs->content_type);
00106                 vs->changed = 1;
00107         }
00108 }
00109 
00110 /* duplicates documents if changed */
00111 static int set_vs_document_dup(virtual_subscription_t *vs,
00112                 str_t *new_doc, 
00113                 str_t *new_content_type)
00114 {
00115         if (str_case_equals(&vs->state_document, new_doc) == 0) {
00116                 /* DEBUG("new document is equal to the older one\n"); */
00117         }
00118         else {
00119                 str_free_content(&vs->state_document);
00120                 str_dup(&vs->state_document, new_doc);
00121                 vs->changed = 1;
00122         }
00123         
00124         if (str_case_equals(&vs->content_type, new_content_type) == 0) {
00125                 /* DEBUG("new content-type is equal to the older one\n"); */
00126         }
00127         else {
00128                 str_free_content(&vs->content_type);
00129                 str_dup(&vs->content_type, new_content_type);
00130                 vs->changed = 1;
00131         }
00132         return 0;
00133 }
00134 
00135 static void propagate_change(virtual_subscription_t *vs)
00136 {
00137         if (vs->subscription->type == rls_internal_subscription) {
00138                 /* propagate change to higher level */
00139                 rls_generate_notify(vs->subscription, 1);
00140         }
00141         else {
00142                 /* external subscriptions will send NOTIFY only sometimes 
00143                  * => we mark it as changed now */
00144                 vs->subscription->changed++;
00145                 /* FIXME: put this subscription in some queue? (needs remove from it
00146                  * when freeing!) */
00147                 if (rls) rls->changed_subscriptions++; /* change indicator */
00148         }
00149 }
00150 
00151 void process_rls_notification(virtual_subscription_t *vs, client_notify_info_t *info) 
00152 {
00153         presentity_info_t *pinfo;
00154         raw_presence_info_t *raw;
00155         str_t new_doc = STR_NULL;
00156         str_t new_type = STR_NULL;
00157         subscription_status_t old_status;
00158         
00159         if ((!vs) || (!info)) return;
00160         
00161         DBG("Processing notification for VS %p\n", vs);
00162         /* FIXME: put information from more sources together ? */
00163 
00164         old_status = vs->status;
00165         switch (info->status) {
00166                 case qsa_subscription_active:
00167                         vs->status = subscription_active;
00168                         break;
00169                 case qsa_subscription_pending:
00170                         vs->status = subscription_pending;
00171                         break;
00172                 case qsa_subscription_rejected:
00173                         vs->status = subscription_terminated;
00174                         break;
00175                 case qsa_subscription_terminated:
00176                         vs->status = subscription_terminated;
00177                         break;
00178         }
00179         if (old_status != vs->status) vs->changed = 1;
00180 
00181         if (info->content_type == vsd->ct_raw) {
00182                 DEBUG("Processing raw notification\n");
00183                         
00184                 raw = (raw_presence_info_t*)info->data;
00185                 if (!raw) return;
00186         
00187                 /* document MUST be duplicated !!! */
00188                 if (set_vs_document_dup(vs, &raw->pres_doc, &raw->content_type) < 0) {
00189                         ERR("can't set new status document for VS %p\n", vs);
00190                         return;
00191                 }
00192         }
00193         else {  
00194                 if (info->content_type == vsd->ct_presence_info) {
00195                         DEBUG("Processing structured notification\n");
00196                         
00197                         pinfo = (presentity_info_t*)info->data;
00198                         if (!pinfo) {
00199                                 str_clear(&new_doc);
00200                                 str_clear(&new_type);
00201                         }
00202                         else {
00203                                 if (create_pidf_document(pinfo, &new_doc, &new_type) < 0) {
00204                                         ERR("can't create PIDF document\n");
00205                                         str_free_content(&vs->state_document);          
00206                                         str_free_content(&vs->content_type);
00207                                         return;
00208                                 }
00209                                 set_vs_document(vs, &new_doc, &new_type);
00210                         }
00211                 }
00212                 else {
00213                         if (info->content_type)
00214                                 ERR("received unacceptable notification (%.*s)\n", 
00215                                         FMT_STR(info->content_type->name));
00216                         else ERR("received unacceptable notification without content type\n");
00217                         str_free_content(&vs->state_document);          
00218                         str_free_content(&vs->content_type);
00219                         return;
00220                 }
00221         }
00222         
00223         if (vs->changed) propagate_change(vs);
00224 }
00225 
00226 void process_internal_notify(virtual_subscription_t *vs, 
00227                 str_t *new_state_document,
00228                 str_t *new_content_type) 
00229 {
00230         if (!vs) return;
00231         
00232         DBG("Processing internal notification for VS %p\n", vs);
00233         
00234         /* don't copy document - use it directly */
00235         set_vs_document(vs, new_state_document, new_content_type);
00236         if (vs->changed) propagate_change(vs);
00237 }
00238 
00239 #if 0
00240 
00241 static void mark_as_modified(virtual_subscription_t *vs)
00242 {
00243         rl_subscription_t *rls = vs->subscription;
00244 
00245         switch (rls->type) {
00246                 case rls_external_subscription:
00247                         if (sm_subscription_pending(&rls->u.external) == 0) {
00248                                 /* pending subscription will not be notified */
00249                                 return; 
00250                         }
00251                         break;
00252                 case rls_internal_subscription:
00253                         /* FIXME: something like above? */
00254                         break;
00255         }
00256                                 
00257         /* NOTIFY should be send only for nonpending subscriptions (or active?)*/
00258 
00259         vs->subscription->changed++;
00260         DEBUG_LOG("RL subscription status changed (%p, %d)\n", 
00261                 rls, rls->changed);
00262 }
00263 
00264 static void vs_timer_cb(unsigned int ticks, void *param)
00265 {
00266         virtual_subscription_t *vs;
00267         int changed = 0;
00268         int cntr = 0;
00269         time_t start, stop;
00270 
00271         start = time(NULL);
00272         rls_lock();
00273 
00274         /* process all messages for virtual subscriptions */
00275         vs = vsd->first;
00276         while (vs) {
00277                 if (process_vs_messages(vs) > 0) {
00278                         DEBUG_LOG("VS status changed\n");
00279                         mark_as_modified(vs);
00280                         changed = 1;
00281                 }
00282                 vs = vs->next;
00283                 cntr++; /* debugging purposes */
00284         }
00285 
00286         /* TRACE_LOG("processed messages for %d virtual subscription(s)\n", cntr); */
00287         
00288         if (changed) {
00289                 /* this could be called from some rli_timer ? */
00290                 notify_all_modified();  
00291         }
00292         
00293         rls_unlock();
00294         stop = time(NULL);
00295 
00296         if (stop - start > 1) WARN("vs_timer_cb took %d secs\n", (int) (stop - start));
00297 }
00298 
00299 #endif
00300 
00301 static int add_to_vs_list(virtual_subscription_t *vs)
00302 {
00303         if (!vs) return RES_INTERNAL_ERR;
00304         if (!vsd) {
00305                 LOG(L_ERR, "vs_add(): vsd not set!\n");
00306                 return RES_INTERNAL_ERR;
00307         }
00308         DOUBLE_LINKED_LIST_ADD(vsd->first, vsd->last, vs);
00309 
00310         return RES_OK;
00311 }
00312 
00313 static int remove_from_vs_list(virtual_subscription_t *vs)
00314 {
00315         if (!vs) return RES_INTERNAL_ERR;
00316         if (!vsd) {
00317                 LOG(L_ERR, "vs_remove(): vsd not set!\n");
00318                 return RES_INTERNAL_ERR;
00319         }
00320 
00321         DOUBLE_LINKED_LIST_REMOVE(vsd->first, vsd->last, vs);
00322         
00323         return RES_OK;
00324 }
00325 
00326 int xcap_query_rls_services(xcap_query_params_t *xcap_params,
00327                 const str *uri, const str *package, 
00328                 flat_list_t **dst)
00329 {
00330         if (dst) *dst = NULL;
00331         
00332         if (reduce_xcap_needs)
00333                 return get_rls_from_full_doc(uri, xcap_params, package, dst);
00334         else
00335                 return get_rls(uri, xcap_params, package, dst);
00336 }
00337 
00338 static int create_subscriptions(virtual_subscription_t *vs, int nesting_level)
00339 {
00340         /* create concrete local subscription */
00341         str *package = NULL;
00342         str *subscriber = NULL;
00343         flat_list_t *flat = NULL;
00344 
00345         package = rls_get_package(vs->subscription);
00346 
00347         DEBUG_LOG("creating local subscription to %.*s\n", FMT_STR(vs->uri));
00348 
00349         if ((nesting_level != 0) &&
00350                         (xcap_query_rls_services(&vs->subscription->xcap_params,
00351                                 &vs->uri, package, &flat) == 0)) {
00352                 if (nesting_level > 0) nesting_level--;
00353                 /* it is resource list -> do internal subscription to RLS */
00354                 if (rls_create_internal_subscription(vs,
00355                                         &vs->local_subscription_list, flat,
00356                                         nesting_level) != 0) {
00357                         ERR("can't create internal subscription\n");
00358                         free_flat_list(flat);
00359                         return -1;
00360                 }
00361                 free_flat_list(flat);
00362                 
00363                 vs->status = subscription_active;
00364                 /* FIXME: rls_authorize_subscription(vs->local_subscription_list); */
00365         }
00366         else {
00367                 /* fill QSA subscription data */
00368                 clear_subscription_data(&vs->local_subscription_pres_data);
00369                 vs->local_subscription_pres_data.dst = &rls->notify_mq;
00370                 vs->local_subscription_pres_data.record_id = vs->uri;
00371                 subscriber = rls_get_subscriber(vs->subscription);
00372                 vs->local_subscription_pres_data.subscriber_data = vs;
00373                 if (subscriber) 
00374                         vs->local_subscription_pres_data.subscriber_id = *subscriber;
00375 
00376                 /* not RLS record -> do QSA subscription to given package */
00377                 vs->local_subscription_pres = subscribe(vsd->domain, 
00378                                 package, &vs->local_subscription_pres_data);
00379                 if (!vs->local_subscription_pres) {
00380                         LOG(L_ERR, "can't create local subscription (pres)!\n");
00381                         return -1;
00382                 }
00383         }
00384         
00385         return 0;
00386 }
00387 
00388 /******** VS manipulation ********/
00389 
00390 int vs_create(str *uri, 
00391                 virtual_subscription_t **dst, 
00392                 display_name_t *dnames, 
00393                 rl_subscription_t *subscription,
00394                 int nesting_level)
00395 {
00396         int res;
00397         display_name_t *d;
00398 
00399         if (!dst) return RES_INTERNAL_ERR;
00400         *dst = NULL;
00401         if (!uri) {
00402                 LOG(L_ERR, "vs_create(): no uri given\n");
00403                 return RES_INTERNAL_ERR;
00404         }
00405         if ((!uri->s) || (uri->len < 1)) {
00406                 LOG(L_ERR, "vs_create(): no uri given\n");
00407                 return RES_INTERNAL_ERR;
00408         }
00409         
00410         *dst = (virtual_subscription_t*)mem_alloc(sizeof(virtual_subscription_t) + uri->len + 1);
00411         if (!(*dst)) {
00412                 LOG(L_ERR, "vs_create(): can't allocate memory\n");
00413                 return RES_MEMORY_ERR;
00414         }
00415 
00416         (*dst)->next = NULL;
00417         (*dst)->prev = NULL;
00418         vector_init(&(*dst)->display_names, sizeof(vs_display_name_t), 4);
00419         memcpy((*dst)->uri_str, uri->s, uri->len);
00420         (*dst)->uri.s = (*dst)->uri_str;
00421         (*dst)->uri.len = uri->len;
00422         (*dst)->state_document.len = 0;
00423         (*dst)->state_document.s = NULL;
00424         (*dst)->content_type.len = 0;
00425         (*dst)->content_type.s = NULL;
00426         (*dst)->status = subscription_pending;
00427         (*dst)->local_subscription_pres = NULL;
00428         (*dst)->local_subscription_list = NULL;
00429         (*dst)->subscription = subscription;
00430         (*dst)->changed = 0;
00431         generate_db_id(&(*dst)->dbid, *dst);
00432 
00433         add_to_vs_list(*dst);
00434 
00435         DBG("created VS %p to %.*s\n", *dst, uri->len, uri->s);
00436         
00437         res = create_subscriptions(*dst, nesting_level);
00438         if (res != 0) {
00439                 vs_free(*dst);
00440                 return res;
00441         }
00442         
00443         /* TODO: remember the list of Accept headers from client subscribe
00444          * it will be used for Back-End subscriptions */
00445                         
00446         /* add names */
00447         if (dnames) {
00448                 d = SEQUENCE_FIRST(dnames);
00449                 while (d) {
00450                         vs_add_display_name((*dst), d->name, d->lang);
00451                         d = SEQUENCE_NEXT(d);
00452                 }
00453         }
00454 
00455         return RES_OK;
00456 }
00457 
00458 int vs_add_display_name(virtual_subscription_t *vs, const char *name, const char *lang)
00459 {
00460         vs_display_name_t dn;
00461 
00462         if (name) {
00463                 dn.name.len = strlen(name);
00464                 if (dn.name.len > 0) {
00465                         dn.name.s = (char *)mem_alloc(dn.name.len);
00466                         if (!dn.name.s) dn.name.len = 0;
00467                         else memcpy(dn.name.s, name, dn.name.len);
00468                 }
00469         }
00470         else {
00471                 dn.name.len = 0;
00472                 dn.name.s = NULL;
00473         }
00474         
00475         if (lang) {
00476                 dn.lang.len = strlen(lang);
00477                 if (dn.lang.len > 0) {
00478                         dn.lang.s = (char *)mem_alloc(dn.lang.len);
00479                         if (!dn.lang.s) dn.lang.len = 0;
00480                         else memcpy(dn.lang.s, lang, dn.lang.len);
00481                 }
00482         }
00483         else {
00484                 dn.lang.len = 0;
00485                 dn.lang.s = NULL;
00486         }
00487 /*      TRACE_LOG("adding display name: %s\n", name);*/
00488         return vector_add(&vs->display_names, &dn);
00489 }
00490 
00491 void vs_free(virtual_subscription_t *vs)
00492 {
00493         int i, cnt;
00494         vs_display_name_t dn;
00495         
00496         if (vs) {
00497                 if (vs->local_subscription_pres)
00498                         unsubscribe(vsd->domain, vs->local_subscription_pres);
00499                 if (vs->local_subscription_list) 
00500                         rls_remove(vs->local_subscription_list);
00501         
00502                 /* remove notification messages for given subscription */
00503                 destroy_notifications(vs->local_subscription_pres);
00504                 
00505                 remove_from_vs_list(vs);
00506                 
00507                 str_free_content(&vs->state_document);
00508                 str_free_content(&vs->content_type);
00509 
00510                 /* if ( (vs->package.len > 0) && (vs->package.s) ) 
00511                 mem_free(vs->package.s); */
00512                 
00513                 cnt = vector_size(&vs->display_names);
00514                 for (i = 0; i < cnt; i++) {
00515                         if (vector_get(&vs->display_names, i, &dn) != 0) continue;
00516                         if (dn.name.s && (dn.name.len > 0)) mem_free(dn.name.s);
00517                         if (dn.lang.s && (dn.lang.len > 0)) mem_free(dn.lang.s);
00518                 }
00519                 vector_destroy(&vs->display_names);
00520 
00521                 mem_free(vs);
00522 /*              LOG(L_TRACE, "Virtual Subscription freed\n");*/
00523         }
00524 }
00525