00001
00025 #include <stdio.h>
00026 #include <unistd.h>
00027 #include <stdlib.h>
00028 #include <string.h>
00029
00030 #include "../../sr_module.h"
00031 #include "../../dprint.h"
00032 #include "../../ut.h"
00033 #include "../../pvar.h"
00034 #include "../../mod_fix.h"
00035 #include "../../parser/parse_param.h"
00036 #include "../../shm_init.h"
00037
00038 #include "mqueue_api.h"
00039 #include "api.h"
00040
00041 MODULE_VERSION
00042
00043 static int mod_init(void);
00044 static void mod_destroy(void);
00045
00046 static int w_mq_fetch(struct sip_msg* msg, char* mq, char* str2);
00047 static int w_mq_add(struct sip_msg* msg, char* mq, char* key, char* val);
00048 static int w_mq_pv_free(struct sip_msg* msg, char* mq, char* str2);
00049 int mq_param(modparam_t type, void *val);
00050 static int fixup_mq_add(void** param, int param_no);
00051 static int bind_mq(mq_api_t* api);
00052
00053 static pv_export_t mod_pvs[] = {
00054 { {"mqk", sizeof("mqk")-1}, PVT_OTHER, pv_get_mqk, 0,
00055 pv_parse_mq_name, 0, 0, 0 },
00056 { {"mqv", sizeof("mqv")-1}, PVT_OTHER, pv_get_mqv, 0,
00057 pv_parse_mq_name, 0, 0, 0 },
00058 { {0, 0}, 0, 0, 0, 0, 0, 0, 0 }
00059 };
00060
00061
00062 static cmd_export_t cmds[]={
00063 {"mq_fetch", (cmd_function)w_mq_fetch, 1, fixup_spve_null,
00064 0, ANY_ROUTE},
00065 {"mq_add", (cmd_function)w_mq_add, 3, fixup_mq_add,
00066 0, ANY_ROUTE},
00067 {"mq_pv_free", (cmd_function)w_mq_pv_free, 1, fixup_spve_null,
00068 0, ANY_ROUTE},
00069 {"bind_mq", (cmd_function)bind_mq, 1, 0,
00070 0, ANY_ROUTE},
00071 {0, 0, 0, 0, 0, 0}
00072 };
00073
00074 static param_export_t params[]={
00075 {"mqueue", STR_PARAM|USE_FUNC_PARAM, (void*)mq_param},
00076 {0, 0, 0}
00077 };
00078
00079 struct module_exports exports = {
00080 "mqueue",
00081 DEFAULT_DLFLAGS,
00082 cmds,
00083 params,
00084 0,
00085 0,
00086 mod_pvs,
00087 0,
00088 mod_init,
00089 0,
00090 mod_destroy,
00091 0
00092 };
00093
00094
00095
00099 static int mod_init(void)
00100 {
00101 if(!mq_head_defined())
00102 LM_WARN("no mqueue defined\n");
00103 return 0;
00104 }
00105
00109 static void mod_destroy(void)
00110 {
00111 mq_destroy();
00112 }
00113
00114 static int w_mq_fetch(struct sip_msg* msg, char* mq, char* str2)
00115 {
00116 int ret;
00117 str q;
00118
00119 if(fixup_get_svalue(msg, (gparam_t*)mq, &q)<0)
00120 {
00121 LM_ERR("cannot get the queue\n");
00122 return -1;
00123 }
00124 ret = mq_head_fetch(&q);
00125 if(ret<0)
00126 return ret;
00127 return 1;
00128 }
00129
00130 static int w_mq_add(struct sip_msg* msg, char* mq, char* key, char* val)
00131 {
00132 str q;
00133 str qkey;
00134 str qval;
00135
00136 if(fixup_get_svalue(msg, (gparam_t*)mq, &q)<0)
00137 {
00138 LM_ERR("cannot get the queue\n");
00139 return -1;
00140 }
00141 if(fixup_get_svalue(msg, (gparam_t*)key, &qkey)<0)
00142 {
00143 LM_ERR("cannot get the key\n");
00144 return -1;
00145 }
00146 if(fixup_get_svalue(msg, (gparam_t*)val, &qval)<0)
00147 {
00148 LM_ERR("cannot get the val\n");
00149 return -1;
00150 }
00151 if(mq_item_add(&q, &qkey, &qval)<0)
00152 return -1;
00153 return 1;
00154 }
00155
00156 static int w_mq_pv_free(struct sip_msg* msg, char* mq, char* str2)
00157 {
00158 str q;
00159
00160 if(fixup_get_svalue(msg, (gparam_t*)mq, &q)<0)
00161 {
00162 LM_ERR("cannot get the queue\n");
00163 return -1;
00164 }
00165 mq_pv_free(&q);
00166 return 1;
00167 }
00168
00169 int mq_param(modparam_t type, void *val)
00170 {
00171 str mqs;
00172 param_t* params_list = NULL;
00173 param_hooks_t phooks;
00174 param_t *pit=NULL;
00175 str qname = {0, 0};
00176 int msize = 0;
00177
00178 if(val==NULL)
00179 return -1;
00180
00181 if(!shm_initialized())
00182 {
00183 LM_ERR("shm not intialized - cannot define mqueue now\n");
00184 return 0;
00185 }
00186
00187 mqs.s = (char*)val;
00188 mqs.len = strlen(mqs.s);
00189 if(mqs.s[mqs.len-1]==';')
00190 mqs.len--;
00191 if (parse_params(&mqs, CLASS_ANY, &phooks, ¶ms_list)<0)
00192 return -1;
00193 for (pit = params_list; pit; pit=pit->next)
00194 {
00195 if (pit->name.len==4
00196 && strncasecmp(pit->name.s, "name", 4)==0) {
00197 qname = pit->body;
00198 } else if(pit->name.len==4
00199 && strncasecmp(pit->name.s, "size", 4)==0) {
00200 str2sint(&pit->body, &msize);
00201 } else {
00202 LM_ERR("unknown param: %.*s\n", pit->name.len, pit->name.s);
00203 free_params(params_list);
00204 return -1;
00205 }
00206 }
00207 if(qname.len<=0)
00208 {
00209 LM_ERR("mqueue name not defined: %.*s\n", mqs.len, mqs.s);
00210 free_params(params_list);
00211 return -1;
00212 }
00213 if(mq_head_add(&qname, msize)<0)
00214 {
00215 LM_ERR("cannot add mqueue: %.*s\n", mqs.len, mqs.s);
00216 free_params(params_list);
00217 return -1;
00218 }
00219 free_params(params_list);
00220 return 0;
00221 }
00222
00223 static int fixup_mq_add(void** param, int param_no)
00224 {
00225 if(param_no==1 || param_no==2 || param_no==3) {
00226 return fixup_spve_null(param, 1);
00227 }
00228
00229 LM_ERR("invalid parameter number %d\n", param_no);
00230 return E_UNSPEC;
00231 }
00232
00233 static int bind_mq(mq_api_t* api)
00234 {
00235 if (!api)
00236 return -1;
00237 api->add = mq_item_add;
00238 return 0;
00239 }