00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
/* Upper bound on the number of columns.
 * NOTE(review): not referenced anywhere in the visible part of this file -
 * confirm whether it is still needed. */
#define MAXCOLUMNS 512
00069
00070 #include <string.h>
00071 #include <stdio.h>
00072 #include <stdlib.h>
00073 #include "../../dprint.h"
00074 #include "../../mem/mem.h"
00075 #include "../../lib/srdb1/db.h"
00076 #include "../../lib/srdb1/db_ut.h"
00077 #include "../../lib/srdb1/db_query.h"
00078 #include "../../locking.h"
00079 #include "../../hashes.h"
00080 #include "km_dbase.h"
00081 #include "km_pg_con.h"
00082 #include "km_val.h"
00083 #include "km_res.h"
00084 #include "pg_mod.h"
00085
00086 static gen_lock_set_t *_pg_lock_set = NULL;
00087 static unsigned int _pg_lock_size = 0;
00088
00094 int pg_init_lock_set(int sz)
00095 {
00096 if(sz>0 && sz<=10)
00097 {
00098 _pg_lock_size = 1<<sz;
00099 } else {
00100 _pg_lock_size = 1<<4;
00101 }
00102 _pg_lock_set = lock_set_alloc(_pg_lock_size);
00103 if(_pg_lock_set==NULL || lock_set_init(_pg_lock_set)==NULL)
00104 {
00105 LM_ERR("cannot initiate lock set\n");
00106 return -1;
00107 }
00108 return 0;
00109 }
00110
00111 void pg_destroy_lock_set(void)
00112 {
00113 if(_pg_lock_set!=NULL)
00114 {
00115 lock_set_destroy(_pg_lock_set);
00116 lock_set_dealloc(_pg_lock_set);
00117 _pg_lock_set = NULL;
00118 _pg_lock_size = 0;
00119 }
00120 }
00121
00122 static void db_postgres_free_query(const db1_con_t* _con);
00123
00124
00131 db1_con_t *db_postgres_init(const str* _url)
00132 {
00133 return db_do_init(_url, (void*) db_postgres_new_connection);
00134 }
00135
00142 db1_con_t *db_postgres_init2(const str* _url, db_pooling_t pooling)
00143 {
00144 return db_do_init2(_url, (void*) db_postgres_new_connection, pooling);
00145 }
00146
00152 void db_postgres_close(db1_con_t* _h)
00153 {
00154 db_do_close(_h, db_postgres_free_connection);
00155 }
00156
00157
00164 static int db_postgres_submit_query(const db1_con_t* _con, const str* _s)
00165 {
00166 int i, retries;
00167 ExecStatusType pqresult;
00168
00169 if(! _con || !_s || !_s->s)
00170 {
00171 LM_ERR("invalid parameter value\n");
00172 return(-1);
00173 }
00174
00175
00176 switch(PQstatus(CON_CONNECTION(_con)))
00177 {
00178 case CONNECTION_OK:
00179 break;
00180 case CONNECTION_BAD:
00181 LM_DBG("connection reset\n");
00182 PQreset(CON_CONNECTION(_con));
00183 break;
00184 case CONNECTION_STARTED:
00185 case CONNECTION_MADE:
00186 case CONNECTION_AWAITING_RESPONSE:
00187 case CONNECTION_AUTH_OK:
00188 case CONNECTION_SETENV:
00189 case CONNECTION_SSL_STARTUP:
00190 case CONNECTION_NEEDED:
00191 default:
00192 LM_ERR("%p PQstatus(%s) invalid: %.*s\n", _con,
00193 PQerrorMessage(CON_CONNECTION(_con)), _s->len, _s->s);
00194 return -1;
00195 }
00196
00197 if (CON_TRANSACTION(_con) == 1)
00198 retries = 0;
00199 else
00200 retries = pg_retries;
00201
00202 for(i = 0; i <= retries; i++) {
00203
00204 db_postgres_free_query(_con);
00205
00206 if (PQsendQuery(CON_CONNECTION(_con), _s->s)) {
00207 pqresult = PQresultStatus(CON_RESULT(_con));
00208 if((pqresult!=PGRES_FATAL_ERROR)
00209 || (PQstatus(CON_CONNECTION(_con))==CONNECTION_OK))
00210 {
00211 LM_DBG("sending query ok: %p (%d) - [%.*s]\n",
00212 _con, pqresult, _s->len, _s->s);
00213 return 0;
00214 }
00215 LM_WARN("postgres result check failed with code %d (%s)\n",
00216 pqresult, PQresStatus(pqresult));
00217 }
00218 LM_WARN("postgres query command failed, connection status %d,"
00219 " error [%s]\n", PQstatus(CON_CONNECTION(_con)),
00220 PQerrorMessage(CON_CONNECTION(_con)));
00221 if(PQstatus(CON_CONNECTION(_con))!=CONNECTION_OK)
00222 {
00223 LM_DBG("reseting the connection to postgress server\n");
00224 PQreset(CON_CONNECTION(_con));
00225 }
00226 }
00227 LM_ERR("%p PQsendQuery Error: %s Query: %.*s\n", _con,
00228 PQerrorMessage(CON_CONNECTION(_con)), _s->len, _s->s);
00229 return -1;
00230 }
00231
00232
00248 int db_postgres_fetch_result(const db1_con_t* _con, db1_res_t** _res, const int nrows)
00249 {
00250 int rows;
00251 PGresult *res = NULL;
00252 ExecStatusType pqresult;
00253
00254 if (!_con || !_res || nrows < 0) {
00255 LM_ERR("invalid parameter value\n");
00256 return -1;
00257 }
00258
00259
00260 if (nrows == 0) {
00261 if (*_res)
00262 db_free_result(*_res);
00263
00264 *_res = 0;
00265 return 0;
00266 }
00267
00268 if (*_res == NULL) {
00269
00270 *_res = db_new_result();
00271
00272
00273 while (1) {
00274 if ((res = PQgetResult(CON_CONNECTION(_con)))) {
00275 CON_RESULT(_con) = res;
00276 } else {
00277 break;
00278 }
00279 }
00280 pqresult = PQresultStatus(CON_RESULT(_con));
00281 LM_DBG("%p PQresultStatus(%s) PQgetResult(%p)\n", _con,
00282 PQresStatus(pqresult), CON_RESULT(_con));
00283
00284 switch(pqresult) {
00285 case PGRES_COMMAND_OK:
00286
00287
00288 return 0;
00289
00290 case PGRES_TUPLES_OK:
00291
00292
00293 if (db_postgres_get_columns(_con, *_res) < 0) {
00294 LM_ERR("failed to get column names\n");
00295 return -2;
00296 }
00297 break;
00298
00299 case PGRES_FATAL_ERROR:
00300 LM_ERR("%p - invalid query, execution aborted\n", _con);
00301 LM_ERR("%p - PQresultStatus(%s)\n", _con,
00302 PQresStatus(pqresult));
00303 LM_ERR("%p: %s\n", _con,
00304 PQresultErrorMessage(CON_RESULT(_con)));
00305 if (*_res)
00306 db_free_result(*_res);
00307 *_res = 0;
00308 return -3;
00309
00310 case PGRES_EMPTY_QUERY:
00311
00312 case PGRES_NONFATAL_ERROR:
00313
00314 case PGRES_COPY_OUT:
00315 case PGRES_COPY_IN:
00316
00317 case PGRES_BAD_RESPONSE:
00318 default:
00319 LM_ERR("%p - probable invalid query\n", _con);
00320 LM_ERR("%p - PQresultStatus(%s)\n", _con, PQresStatus(pqresult));
00321 LM_ERR("%p: %s\n", _con, PQresultErrorMessage(CON_RESULT(_con)));
00322 if (*_res)
00323 db_free_result(*_res);
00324 *_res = 0;
00325 return -4;
00326 }
00327
00328 } else {
00329 if(RES_ROWS(*_res) != NULL) {
00330 db_free_rows(*_res);
00331 }
00332 RES_ROWS(*_res) = 0;
00333 RES_ROW_N(*_res) = 0;
00334 }
00335
00336
00337 RES_NUM_ROWS(*_res) = PQntuples(CON_RESULT(_con));
00338
00339
00340 rows = RES_NUM_ROWS(*_res) - RES_LAST_ROW(*_res);
00341
00342
00343 if (rows <= 0)
00344 return 0;
00345
00346
00347
00348 if (nrows < rows)
00349 rows = nrows;
00350
00351 RES_ROW_N(*_res) = rows;
00352
00353 LM_DBG("converting row %d of %d count %d\n", RES_LAST_ROW(*_res),
00354 RES_NUM_ROWS(*_res), RES_ROW_N(*_res));
00355
00356 if (db_postgres_convert_rows(_con, *_res) < 0) {
00357 LM_ERR("failed to convert rows\n");
00358 if (*_res)
00359 db_free_result(*_res);
00360
00361 *_res = 0;
00362 return -3;
00363 }
00364
00365
00366 RES_LAST_ROW(*_res) += rows;
00367 return 0;
00368 }
00369
00370
00375 static void db_postgres_free_query(const db1_con_t* _con)
00376 {
00377 if(CON_RESULT(_con))
00378 {
00379 LM_DBG("PQclear(%p) result set\n", CON_RESULT(_con));
00380 PQclear(CON_RESULT(_con));
00381 CON_RESULT(_con) = 0;
00382 }
00383 }
00384
00385
00392 int db_postgres_free_result(db1_con_t* _con, db1_res_t* _r)
00393 {
00394 if ((!_con) || (!_r)) {
00395 LM_ERR("invalid parameter value\n");
00396 return -1;
00397 }
00398 if (db_free_result(_r) < 0) {
00399 LM_ERR("unable to free result structure\n");
00400 return -1;
00401 }
00402 db_postgres_free_query(_con);
00403 return 0;
00404 }
00405
00406
00420 int db_postgres_query(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _op,
00421 const db_val_t* _v, const db_key_t* _c, const int _n, const int _nc,
00422 const db_key_t _o, db1_res_t** _r)
00423 {
00424 return db_do_query(_h, _k, _op, _v, _c, _n, _nc, _o, _r, db_postgres_val2str,
00425 db_postgres_submit_query, db_postgres_store_result);
00426 }
00427
00428
00436 int db_postgres_raw_query(const db1_con_t* _h, const str* _s, db1_res_t** _r)
00437 {
00438 return db_do_raw_query(_h, _s, _r, db_postgres_submit_query,
00439 db_postgres_store_result);
00440 }
00441
00442
00456 int db_postgres_store_result(const db1_con_t* _con, db1_res_t** _r)
00457 {
00458 PGresult *res = NULL;
00459 ExecStatusType pqresult;
00460 int rc = 0;
00461
00462 *_r = db_new_result();
00463 if (*_r==NULL) {
00464 LM_ERR("failed to init new result\n");
00465 rc = -1;
00466 goto done;
00467 }
00468
00469 while (1) {
00470 if ((res = PQgetResult(CON_CONNECTION(_con)))) {
00471 CON_RESULT(_con) = res;
00472 } else {
00473 break;
00474 }
00475 }
00476
00477 pqresult = PQresultStatus(CON_RESULT(_con));
00478
00479 LM_DBG("%p PQresultStatus(%s) PQgetResult(%p)\n", _con,
00480 PQresStatus(pqresult), CON_RESULT(_con));
00481
00482 CON_AFFECTED(_con) = 0;
00483
00484 switch(pqresult) {
00485 case PGRES_COMMAND_OK:
00486
00487
00488 rc = 0;
00489 CON_AFFECTED(_con) = atoi(PQcmdTuples(CON_RESULT(_con)));
00490 break;
00491
00492 case PGRES_TUPLES_OK:
00493
00494
00495 if (db_postgres_convert_result(_con, *_r) < 0) {
00496 LM_ERR("error while converting result\n");
00497 LM_DBG("freeing result set at %p\n", _r);
00498 pkg_free(*_r);
00499 *_r = 0;
00500 rc = -4;
00501 break;
00502 }
00503 rc = 0;
00504 CON_AFFECTED(_con) = atoi(PQcmdTuples(CON_RESULT(_con)));
00505 break;
00506
00507 case PGRES_FATAL_ERROR:
00508 LM_ERR("invalid query, execution aborted\n");
00509 LM_ERR("driver error: %s, %s\n", PQresStatus(pqresult), PQresultErrorMessage(CON_RESULT(_con)));
00510 db_free_result(*_r);
00511 *_r = 0;
00512 rc = -3;
00513 break;
00514
00515 case PGRES_EMPTY_QUERY:
00516
00517 case PGRES_NONFATAL_ERROR:
00518
00519 case PGRES_COPY_OUT:
00520 case PGRES_COPY_IN:
00521
00522 case PGRES_BAD_RESPONSE:
00523 default:
00524 LM_ERR("probable invalid query, execution aborted\n");
00525 LM_ERR("driver message: %s, %s\n", PQresStatus(pqresult), PQresultErrorMessage(CON_RESULT(_con)));
00526 db_free_result(*_r);
00527 *_r = 0;
00528 rc = -4;
00529 break;
00530 }
00531
00532 done:
00533 db_postgres_free_query(_con);
00534 return (rc);
00535 }
00536
00537
00546 int db_postgres_insert(const db1_con_t* _h, const db_key_t* _k, const db_val_t* _v,
00547 const int _n)
00548 {
00549 db1_res_t* _r = NULL;
00550
00551 int ret = db_do_insert(_h, _k, _v, _n, db_postgres_val2str, db_postgres_submit_query);
00552
00553 int tmp = db_postgres_store_result(_h, &_r);
00554
00555 if (tmp < 0) {
00556 LM_WARN("unexpected result returned");
00557 ret = tmp;
00558 }
00559
00560 if (_r)
00561 db_free_result(_r);
00562
00563 return ret;
00564 }
00565
00566
00576 int db_postgres_delete(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _o,
00577 const db_val_t* _v, const int _n)
00578 {
00579 db1_res_t* _r = NULL;
00580 int ret = db_do_delete(_h, _k, _o, _v, _n, db_postgres_val2str,
00581 db_postgres_submit_query);
00582 int tmp = db_postgres_store_result(_h, &_r);
00583
00584 if (tmp < 0) {
00585 LM_WARN("unexpected result returned");
00586 ret = tmp;
00587 }
00588
00589 if (_r)
00590 db_free_result(_r);
00591
00592 return ret;
00593 }
00594
00595
00608 int db_postgres_update(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _o,
00609 const db_val_t* _v, const db_key_t* _uk, const db_val_t* _uv, const int _n,
00610 const int _un)
00611 {
00612 db1_res_t* _r = NULL;
00613 int ret = db_do_update(_h, _k, _o, _v, _uk, _uv, _n, _un, db_postgres_val2str,
00614 db_postgres_submit_query);
00615 int tmp = db_postgres_store_result(_h, &_r);
00616
00617 if (tmp < 0) {
00618 LM_WARN("unexpected result returned");
00619 ret = tmp;
00620 }
00621
00622 if (_r)
00623 db_free_result(_r);
00624
00625 return ret;
00626 }
00627
00633 int db_postgres_affected_rows(const db1_con_t* _h)
00634 {
00635 if (!_h) {
00636 LM_ERR("invalid parameter value\n");
00637 return -1;
00638 }
00639 return CON_AFFECTED(_h);
00640 }
00641
00647 int db_postgres_start_transaction(db1_con_t* _h)
00648 {
00649 db1_res_t *res = NULL;
00650 str query_str = str_init("BEGIN");
00651
00652 if (!_h) {
00653 LM_ERR("invalid parameter value\n");
00654 return -1;
00655 }
00656
00657 if (CON_TRANSACTION(_h) == 1) {
00658 LM_ERR("transaction already started\n");
00659 return -1;
00660 }
00661
00662 if (db_postgres_raw_query(_h, &query_str, &res) < 0)
00663 {
00664 LM_ERR("executing raw_query\n");
00665 return -1;
00666 }
00667
00668 if (res) db_postgres_free_result(_h, res);
00669
00670 CON_TRANSACTION(_h) = 1;
00671 return 0;
00672 }
00673
00679 int db_postgres_end_transaction(db1_con_t* _h)
00680 {
00681 db1_res_t *res = NULL;
00682 str query_str = str_init("COMMIT");
00683
00684 if (!_h) {
00685 LM_ERR("invalid parameter value\n");
00686 return -1;
00687 }
00688
00689 if (CON_TRANSACTION(_h) == 0) {
00690 LM_ERR("transaction not in progress\n");
00691 return -1;
00692 }
00693
00694 if (db_postgres_raw_query(_h, &query_str, &res) < 0)
00695 {
00696 LM_ERR("executing raw_query\n");
00697 return -1;
00698 }
00699
00700 if (res) db_postgres_free_result(_h, res);
00701
00702
00703
00704
00705 CON_TRANSACTION(_h) = 0;
00706 return 0;
00707 }
00708
00714 int db_postgres_abort_transaction(db1_con_t* _h)
00715 {
00716 db1_res_t *res = NULL;
00717 str query_str = str_init("ROLLBACK");
00718
00719 if (!_h) {
00720 LM_ERR("invalid parameter value\n");
00721 return -1;
00722 }
00723
00724 if (CON_TRANSACTION(_h) == 0) {
00725 LM_DBG("nothing to rollback\n");
00726 return 0;
00727 }
00728
00729
00730
00731 CON_TRANSACTION(_h) = 0;
00732
00733 if (db_postgres_raw_query(_h, &query_str, &res) < 0)
00734 {
00735 LM_ERR("executing raw_query\n");
00736 return -1;
00737 }
00738
00739 if (res) db_postgres_free_result(_h, res);
00740
00741 return 1;
00742 }
00743
00750 int db_postgres_use_table(db1_con_t* _con, const str* _t)
00751 {
00752 return db_use_table(_con, _t);
00753 }
00754
00755
00766 int db_postgres_replace(const db1_con_t* _h, const db_key_t* _k,
00767 const db_val_t* _v, const int _n, const int _un, const int _m)
00768 {
00769 unsigned int pos = 0;
00770 int i;
00771
00772 if(_un > _n)
00773 {
00774 LM_ERR("number of columns for unique key is too high\n");
00775 return -1;
00776 }
00777
00778 if(_un > 0)
00779 {
00780 for(i=0; i<_un; i++)
00781 {
00782 if(!VAL_NULL(&_v[i]))
00783 {
00784 switch(VAL_TYPE(&_v[i]))
00785 {
00786 case DB1_INT:
00787 pos += VAL_UINT(&_v[i]);
00788 break;
00789 case DB1_STR:
00790 pos += get_hash1_raw((VAL_STR(&_v[i])).s,
00791 (VAL_STR(&_v[i])).len);
00792 break;
00793 case DB1_STRING:
00794 pos += get_hash1_raw(VAL_STRING(&_v[i]),
00795 strlen(VAL_STRING(&_v[i])));
00796 break;
00797 default:
00798 break;
00799 }
00800 }
00801 }
00802 pos &= (_pg_lock_size-1);
00803 lock_set_get(_pg_lock_set, pos);
00804 if(db_postgres_update(_h, _k, 0, _v, _k + _un,
00805 _v + _un, _un, _n -_un)< 0)
00806 {
00807 LM_ERR("update failed\n");
00808 lock_set_release(_pg_lock_set, pos);
00809 return -1;
00810 }
00811
00812 if (db_postgres_affected_rows(_h) <= 0)
00813 {
00814 if(db_postgres_insert(_h, _k, _v, _n)< 0)
00815 {
00816 LM_ERR("insert failed\n");
00817 lock_set_release(_pg_lock_set, pos);
00818 return -1;
00819 }
00820 LM_DBG("inserted new record in database table\n");
00821 } else {
00822 LM_DBG("updated record in database table\n");
00823 }
00824 lock_set_release(_pg_lock_set, pos);
00825 } else {
00826 if(db_postgres_insert(_h, _k, _v, _n)< 0)
00827 {
00828 LM_ERR("direct insert failed\n");
00829 return -1;
00830 }
00831 LM_DBG("directly inserted new record in database table\n");
00832 }
00833 return 0;
00834 }
00835