00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00039 #include "pg_cmd.h"
00040 #include "pg_sql.h"
00041 #include "pg_fld.h"
00042 #include "pg_con.h"
00043 #include "pg_mod.h"
00044 #include "pg_uri.h"
00045 #include "pg_res.h"
00046
00047 #include "../../mem/mem.h"
00048 #include "../../dprint.h"
00049 #include "../../ut.h"
00050
00051 #include <string.h>
00052
/* Sequence number that gen_cmd_name() turns into the name of the next
 * prepared statement; it is incremented each time a name is generated. */
00057 static int server_query_no = 0;
00058
00059 static int upload_cmd(db_cmd_t* cmd);
00060 static void free_pg_params(struct pg_params* cmd);
00061
00062
00068 static void pg_cmd_free(db_cmd_t* cmd, struct pg_cmd* payload)
00069 {
00070 db_drv_free(&payload->gen);
00071 if (payload->sql_cmd.s) pkg_free(payload->sql_cmd.s);
00072 free_pg_params(&payload->params);
00073 if (payload->name) pkg_free(payload->name);
00074 if (payload->types) PQclear(payload->types);
00075 pkg_free(payload);
00076 }
00077
00078
00089 static int gen_cmd_name(db_cmd_t* cmd)
00090 {
00091 struct pg_cmd* pcmd;
00092 char* c;
00093 int len;
00094
00095 pcmd = DB_GET_PAYLOAD(cmd);
00096 c = int2str(server_query_no, &len);
00097
00098 pcmd->name = pkg_malloc(len + 1);
00099 if (pcmd->name == NULL) {
00100 ERR("postgres: No memory left\n");
00101 return -1;
00102 }
00103 memcpy(pcmd->name, c, len);
00104 pcmd->name[len] = '\0';
00105 server_query_no++;
00106 return 0;
00107 }
00108
00109
00119 static int create_pg_params(db_cmd_t* cmd)
00120 {
00121 int num;
00122 struct pg_cmd* pcmd;
00123
00124 pcmd = DB_GET_PAYLOAD(cmd);
00125
00126 num = cmd->match_count + cmd->vals_count;
00127
00128 if (num == 0) return 0;
00129 pcmd->params.val = (const char**)pkg_malloc(sizeof(const char*) * num);
00130 pcmd->params.len = (int*)pkg_malloc(sizeof(int) * num);
00131 pcmd->params.fmt = (int*)pkg_malloc(sizeof(int) * num);
00132
00133 if (!pcmd->params.val ||
00134 !pcmd->params.len || !pcmd->params.fmt) {
00135 ERR("postgres: No memory left\n");
00136 goto error;
00137 }
00138
00139 memset(pcmd->params.val, '\0', sizeof(const char*) * num);
00140 memset(pcmd->params.len, '\0', sizeof(int) * num);
00141 memset(pcmd->params.fmt, '\0', sizeof(int) * num);
00142 pcmd->params.n = num;
00143 return 0;
00144
00145 error:
00146 free_pg_params(&pcmd->params);
00147 return -1;
00148 }
00149
00150
00156 static void free_pg_params(struct pg_params* params)
00157 {
00158 if (params == NULL) return;
00159
00160 if (params->val) pkg_free(params->val);
00161 params->val = NULL;
00162
00163 if (params->len) pkg_free(params->len);
00164 params->len = NULL;
00165
00166 if (params->fmt) pkg_free(params->fmt);
00167 params->fmt = NULL;
00168 }
00169
00170
00181 static int check_types(db_cmd_t* cmd)
00182 {
00183 struct pg_cmd* pcmd;
00184 struct pg_con* pcon;
00185
00186 pcmd = DB_GET_PAYLOAD(cmd);
00187
00188 pcon = DB_GET_PAYLOAD(cmd->ctx->con[db_payload_idx]);
00189
00190 if (pg_check_fld2pg(cmd->match, pcon->oid)) return -1;
00191 if (pg_check_fld2pg(cmd->vals, pcon->oid)) return -1;
00192 if (pg_check_pg2fld(cmd->result, pcon->oid)) return -1;
00193 return 0;
00194 }
00195
00196
00197 static int get_types(db_cmd_t* cmd)
00198 {
00199 struct pg_cmd* pcmd;
00200 struct pg_con* pcon;
00201
00202 pcmd = DB_GET_PAYLOAD(cmd);
00203
00204 pcon = DB_GET_PAYLOAD(cmd->ctx->con[db_payload_idx]);
00205
00206 pcmd->types = PQdescribePrepared(pcon->con, pcmd->name);
00207
00208 if (PQresultStatus(pcmd->types) != PGRES_COMMAND_OK) {
00209 ERR("postgres: Error while obtaining description of prepared statement\n");
00210 return -1;
00211 }
00212
00213 return 0;
00214 }
00215
00216
00217 int pg_cmd(db_cmd_t* cmd)
00218 {
00219 struct pg_cmd* pcmd;
00220
00221 pcmd = (struct pg_cmd*)pkg_malloc(sizeof(struct pg_cmd));
00222 if (pcmd == NULL) {
00223 ERR("postgres: No memory left\n");
00224 goto error;
00225 }
00226 memset(pcmd, '\0', sizeof(struct pg_cmd));
00227 if (db_drv_init(&pcmd->gen, pg_cmd_free) < 0) goto error;
00228
00229 switch(cmd->type) {
00230 case DB_PUT:
00231 if (build_insert_sql(&pcmd->sql_cmd, cmd) < 0) goto error;
00232 break;
00233
00234 case DB_DEL:
00235 if (build_delete_sql(&pcmd->sql_cmd, cmd) < 0) goto error;
00236 break;
00237
00238 case DB_GET:
00239 if (build_select_sql(&pcmd->sql_cmd, cmd) < 0) goto error;
00240 break;
00241
00242 case DB_UPD:
00243 if (build_update_sql(&pcmd->sql_cmd, cmd) < 0) goto error;
00244 break;
00245
00246 case DB_SQL:
00247 pcmd->sql_cmd.s = (char*)pkg_malloc(cmd->table.len + 1);
00248 if (pcmd->sql_cmd.s == NULL) {
00249 ERR("postgres: Out of private memory\n");
00250 goto error;
00251 }
00252 memcpy(pcmd->sql_cmd.s,cmd->table.s, cmd->table.len);
00253 pcmd->sql_cmd.s[cmd->table.len] = '\0';
00254 pcmd->sql_cmd.len = cmd->table.len;
00255 break;
00256 }
00257
00258 DB_SET_PAYLOAD(cmd, pcmd);
00259
00260
00261 if (create_pg_params(cmd) < 0) goto error;
00262
00263
00264 if (gen_cmd_name(cmd) != 0) goto error;
00265
00266
00267 if (upload_cmd(cmd) != 0) goto error;
00268
00269
00270
00271 if (get_types(cmd) != 0) goto error;
00272
00273
00274 if (pg_resolve_param_oids(cmd->vals, cmd->match,
00275 cmd->vals_count, cmd->match_count,
00276 pcmd->types))
00277 goto error;
00278 if (pg_resolve_result_oids(cmd->result, cmd->result_count, pcmd->types))
00279 goto error;
00280
00281 if (check_types(cmd)) goto error;
00282
00283 return 0;
00284
00285 error:
00286 if (pcmd) {
00287 DB_SET_PAYLOAD(cmd, NULL);
00288 free_pg_params(&pcmd->params);
00289
00290 if (pcmd->types) PQclear(pcmd->types);
00291 if (pcmd->name) pkg_free(pcmd->name);
00292 if (pcmd->sql_cmd.s) pkg_free(pcmd->sql_cmd.s);
00293
00294 db_drv_free(&pcmd->gen);
00295 pkg_free(pcmd);
00296 }
00297 return -1;
00298 }
00299
00300
00301 int pg_getopt(db_cmd_t* cmd, char* optname, va_list ap)
00302 {
00303 struct pg_cmd* pcmd;
00304 long long* id;
00305
00306 pcmd = (struct pg_cmd*)DB_GET_PAYLOAD(cmd);
00307
00308 if (!strcasecmp("last_id", optname)) {
00309 id = va_arg(ap, long long*);
00310 if (id == NULL) {
00311 BUG("postgres: NULL pointer passed to 'last_id' option\n");
00312 goto error;
00313 }
00314 return -1;
00315 } else {
00316 return 1;
00317 }
00318 return 0;
00319
00320 error:
00321 return -1;
00322 }
00323
00324
00325 int pg_setopt(db_cmd_t* cmd, char* optname, va_list ap)
00326 {
00327 struct pg_cmd* pcmd;
00328
00329 pcmd = (struct pg_cmd*)DB_GET_PAYLOAD(cmd);
00330 return 1;
00331 }
00332
00333
00341 static int upload_cmd(db_cmd_t* cmd)
00342 {
00343 struct pg_cmd* pcmd;
00344 struct pg_con* pcon;
00345 PGresult* res;
00346 int st;
00347
00348 pcmd = DB_GET_PAYLOAD(cmd);
00349
00350 pcon = DB_GET_PAYLOAD(cmd->ctx->con[db_payload_idx]);
00351
00352 DBG("postgres: Uploading command '%s': '%s'\n", pcmd->name,
00353 pcmd->sql_cmd.s);
00354
00355 res = PQprepare(pcon->con, pcmd->name, pcmd->sql_cmd.s, 0, NULL);
00356
00357 st = PQresultStatus(res);
00358
00359 if (st != PGRES_COMMAND_OK && st != PGRES_NONFATAL_ERROR &&
00360 st != PGRES_TUPLES_OK) {
00361 ERR("postgres: Error while uploading command to server: %d, %s",
00362 st, PQresultErrorMessage(res));
00363 ERR("postgres: Command: '%s'\n", pcmd->sql_cmd.s);
00364 PQclear(res);
00365 return -1;
00366 }
00367
00368 PQclear(res);
00369 return 0;
00370 }
00371
00372
00373 int pg_cmd_exec(db_res_t* res, db_cmd_t* cmd)
00374 {
00375 PGresult* tmp;
00376 int i, err, stat;
00377 db_con_t* con;
00378 struct pg_cmd* pcmd;
00379 struct pg_con* pcon;
00380 struct pg_uri* puri;
00381 struct pg_res* pres;
00382
00383
00384
00385
00386 con = cmd->ctx->con[db_payload_idx];
00387 pcmd = DB_GET_PAYLOAD(cmd);
00388 pcon = DB_GET_PAYLOAD(con);
00389 puri = DB_GET_PAYLOAD(con->uri);
00390
00391 for(i = 0; i <= pg_retries; i++) {
00392
00393
00394 if (pg_fld2pg(&pcmd->params, 0, pcon->oid, cmd->match, pcon->flags) != 0)
00395 return 1;
00396
00397 if (pg_fld2pg(&pcmd->params, cmd->match_count,
00398 pcon->oid, cmd->vals, pcon->flags) != 0) return 1;
00399
00400
00401 tmp = PQexecPrepared(pcon->con, pcmd->name,
00402 pcmd->params.n,
00403 pcmd->params.val, pcmd->params.len,
00404 pcmd->params.fmt, 1);
00405 if (!tmp) {
00406 ERR("postgres: PQexecPrepared returned no result\n");
00407 continue;
00408 }
00409
00410 switch(PQresultStatus(tmp)) {
00411 case PGRES_COMMAND_OK:
00412 case PGRES_NONFATAL_ERROR:
00413 case PGRES_TUPLES_OK:
00414 if (res) {
00415 pres = DB_GET_PAYLOAD(res);
00416 pres->res = tmp;
00417 pres->rows = PQntuples(tmp);
00418 } else {
00419 PQclear(tmp);
00420 }
00421 return 0;
00422
00423 default:
00424 break;
00425 }
00426 ERR("postgres: Command on server %s failed: %s: %s\n",
00427 puri->host, PQresStatus(PQresultStatus(tmp)),
00428 PQresultErrorMessage(tmp));
00429 PQclear(tmp);
00430
00431
00432
00433 if (PQstatus(pcon->con) != CONNECTION_OK) {
00434 INFO("postgres: Connection to server %s disconnected, attempting reconnect\n",
00435 puri->host);
00436 pg_con_disconnect(con);
00437 if (pg_con_connect(con)) {
00438 INFO("postgres: Failed to reconnect server %s, giving up\n",
00439 puri->host);
00440 return -1;
00441 }
00442 INFO("postgres: Successfully reconnected server on %s\n",
00443 puri->host);
00444 }
00445
00446
00447
00448
00449 tmp = PQdescribePrepared(pcon->con, pcmd->name);
00450 if (tmp == NULL) {
00451 ERR("postgres: PQdescribePrepared returned no result\n");
00452 continue;
00453 }
00454 stat = PQresultStatus(tmp);
00455 PQclear(tmp);
00456 switch (stat) {
00457 case PGRES_COMMAND_OK:
00458 case PGRES_NONFATAL_ERROR:
00459 case PGRES_TUPLES_OK:
00460 INFO("postgres: Command %s on server %s still exists, reusing\n",
00461 pcmd->name, puri->host);
00462
00463 continue;
00464 default:
00465 break;
00466 }
00467
00468
00469 INFO("postgres: Command %s on server %s missing, uploading\n",
00470 pcmd->name, puri->host);
00471 err = upload_cmd(cmd);
00472 if (err < 0) {
00473 continue;
00474 } else if (err > 0) {
00475
00476
00477
00478 return 1;
00479 }
00480 }
00481
00482 INFO("postgres: Failed to execute command %s on server %s, giving up\n",
00483 pcmd->name, puri->host);
00484 return -1;
00485 }
00486
00487
00488 int pg_cmd_first(db_res_t* res)
00489 {
00490 struct pg_res* pres;
00491
00492 pres = DB_GET_PAYLOAD(res);
00493
00494 if (pres->rows <= 0) return 1;
00495 pres->row = 0;
00496 return pg_cmd_next(res);
00497 }
00498
00499
00500 int pg_cmd_next(db_res_t* res)
00501 {
00502 struct pg_res* pres;
00503 struct pg_con* pcon;
00504
00505 pres = DB_GET_PAYLOAD(res);
00506 pcon = DB_GET_PAYLOAD(res->cmd->ctx->con[db_payload_idx]);
00507
00508 if (pres->row >= pres->rows) return 1;
00509
00510 if (pg_pg2fld(res->cmd->result, pres->res, pres->row, pcon->oid, pcon->flags)) return -1;
00511 res->cur_rec->fld = res->cmd->result;
00512 pres->row++;
00513 return 0;
00514 }
00515