mqueue_mod.c

00001 
00025 #include <stdio.h>
00026 #include <unistd.h>
00027 #include <stdlib.h>
00028 #include <string.h>
00029 
00030 #include "../../sr_module.h"
00031 #include "../../dprint.h"
00032 #include "../../ut.h"
00033 #include "../../pvar.h"
00034 #include "../../mod_fix.h"
00035 #include "../../parser/parse_param.h"
00036 #include "../../shm_init.h"
00037 
00038 #include "mqueue_api.h"
00039 #include "api.h"
00040 
00041 MODULE_VERSION
00042 
00043 static int  mod_init(void);
00044 static void mod_destroy(void);
00045 
00046 static int w_mq_fetch(struct sip_msg* msg, char* mq, char* str2);
00047 static int w_mq_add(struct sip_msg* msg, char* mq, char* key, char* val);
00048 static int w_mq_pv_free(struct sip_msg* msg, char* mq, char* str2);
00049 int mq_param(modparam_t type, void *val);
00050 static int fixup_mq_add(void** param, int param_no);
00051 static int bind_mq(mq_api_t* api);
00052 
00053 static pv_export_t mod_pvs[] = {
00054         { {"mqk", sizeof("mqk")-1}, PVT_OTHER, pv_get_mqk, 0,
00055                 pv_parse_mq_name, 0, 0, 0 },
00056         { {"mqv", sizeof("mqv")-1}, PVT_OTHER, pv_get_mqv, 0,
00057                 pv_parse_mq_name, 0, 0, 0 },
00058         { {0, 0}, 0, 0, 0, 0, 0, 0, 0 }
00059 };
00060 
00061 
00062 static cmd_export_t cmds[]={
00063         {"mq_fetch", (cmd_function)w_mq_fetch, 1, fixup_spve_null,
00064                 0, ANY_ROUTE},
00065         {"mq_add", (cmd_function)w_mq_add, 3, fixup_mq_add,
00066                 0, ANY_ROUTE},
00067         {"mq_pv_free", (cmd_function)w_mq_pv_free, 1, fixup_spve_null,
00068                 0, ANY_ROUTE},
00069         {"bind_mq", (cmd_function)bind_mq, 1, 0,
00070                 0, ANY_ROUTE},
00071         {0, 0, 0, 0, 0, 0}
00072 };
00073 
00074 static param_export_t params[]={
00075         {"mqueue",          STR_PARAM|USE_FUNC_PARAM, (void*)mq_param},
00076         {0, 0, 0}
00077 };
00078 
00079 struct module_exports exports = {
00080         "mqueue",
00081         DEFAULT_DLFLAGS, /* dlopen flags */
00082         cmds,
00083         params,
00084         0,
00085         0,              /* exported MI functions */
00086         mod_pvs,        /* exported pseudo-variables */
00087         0,              /* extra processes */
00088         mod_init,       /* module initialization function */
00089         0,              /* response function */
00090         mod_destroy,    /* destroy function */
00091         0               /* per child init function */
00092 };
00093 
00094 
00095 
/**
 * Module initialization callback.
 *
 * Emits a warning when no queue has been defined; this is not treated
 * as an error.
 *
 * @return always 0
 */
static int mod_init(void)
{
	if (mq_head_defined()) {
		return 0;
	}

	LM_WARN("no mqueue defined\n");
	return 0;
}
00105 
/**
 * Module destroy callback: delegates the whole teardown to mq_destroy().
 */
static void mod_destroy(void)
{
	mq_destroy();
	return;
}
00113 
00114 static int w_mq_fetch(struct sip_msg* msg, char* mq, char* str2)
00115 {
00116         int ret;
00117         str q;
00118 
00119         if(fixup_get_svalue(msg, (gparam_t*)mq, &q)<0)
00120         {
00121                 LM_ERR("cannot get the queue\n");
00122                 return -1;
00123         }
00124         ret = mq_head_fetch(&q);
00125         if(ret<0)
00126                 return ret;
00127         return 1;
00128 }
00129 
00130 static int w_mq_add(struct sip_msg* msg, char* mq, char* key, char* val)
00131 {
00132         str q;
00133         str qkey;
00134         str qval;
00135 
00136         if(fixup_get_svalue(msg, (gparam_t*)mq, &q)<0)
00137         {
00138                 LM_ERR("cannot get the queue\n");
00139                 return -1;
00140         }
00141         if(fixup_get_svalue(msg, (gparam_t*)key, &qkey)<0)
00142         {
00143                 LM_ERR("cannot get the key\n");
00144                 return -1;
00145         }
00146         if(fixup_get_svalue(msg, (gparam_t*)val, &qval)<0)
00147         {
00148                 LM_ERR("cannot get the val\n");
00149                 return -1;
00150         }
00151         if(mq_item_add(&q, &qkey, &qval)<0)
00152                 return -1;
00153         return 1;
00154 }
00155 
00156 static int w_mq_pv_free(struct sip_msg* msg, char* mq, char* str2)
00157 {
00158         str q;
00159 
00160         if(fixup_get_svalue(msg, (gparam_t*)mq, &q)<0)
00161         {
00162                 LM_ERR("cannot get the queue\n");
00163                 return -1;
00164         }
00165         mq_pv_free(&q);
00166         return 1;
00167 }
00168 
00169 int mq_param(modparam_t type, void *val)
00170 {
00171         str mqs;
00172         param_t* params_list = NULL;
00173         param_hooks_t phooks;
00174         param_t *pit=NULL;
00175         str qname = {0, 0};
00176         int msize = 0;
00177 
00178         if(val==NULL)
00179                 return -1;
00180 
00181         if(!shm_initialized())
00182         {
00183                 LM_ERR("shm not intialized - cannot define mqueue now\n");
00184                 return 0;
00185         }
00186 
00187         mqs.s = (char*)val;
00188         mqs.len = strlen(mqs.s);
00189         if(mqs.s[mqs.len-1]==';')
00190                 mqs.len--;
00191         if (parse_params(&mqs, CLASS_ANY, &phooks, &params_list)<0)
00192                 return -1;
00193         for (pit = params_list; pit; pit=pit->next)
00194         {
00195                 if (pit->name.len==4
00196                                 && strncasecmp(pit->name.s, "name", 4)==0) {
00197                         qname = pit->body;
00198                 } else if(pit->name.len==4
00199                                 && strncasecmp(pit->name.s, "size", 4)==0) {
00200                         str2sint(&pit->body, &msize);
00201                 }  else {
00202                         LM_ERR("unknown param: %.*s\n", pit->name.len, pit->name.s);
00203                         free_params(params_list);
00204                         return -1;
00205                 }
00206         }
00207         if(qname.len<=0)
00208         {
00209                 LM_ERR("mqueue name not defined: %.*s\n", mqs.len, mqs.s);
00210                 free_params(params_list);
00211                 return -1;
00212         }
00213         if(mq_head_add(&qname, msize)<0)
00214         {
00215                 LM_ERR("cannot add mqueue: %.*s\n", mqs.len, mqs.s);
00216                 free_params(params_list);
00217                 return -1;
00218         }
00219         free_params(params_list);
00220         return 0;
00221 }
00222 
00223 static int fixup_mq_add(void** param, int param_no)
00224 {
00225     if(param_no==1 || param_no==2 || param_no==3) {
00226                 return fixup_spve_null(param, 1);
00227     }
00228 
00229     LM_ERR("invalid parameter number %d\n", param_no);
00230     return E_UNSPEC;
00231 }
00232 
00233 static int bind_mq(mq_api_t* api)
00234 {
00235         if (!api)
00236                 return -1;
00237         api->add = mq_item_add;
00238         return 0;
00239 }