00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 #include <stdio.h>
00030 #include <stdlib.h>
00031 #include <unistd.h>
00032 #include <sys/time.h>
00033 #include <poll.h>
00034 #include <iostream>
00035 #include <boost/lexical_cast.hpp>
00036 #include <protocol/TBinaryProtocol.h>
00037 #include <transport/TSocket.h>
00038 #include <transport/TTransportUtils.h>
00039
00040 extern "C" {
00041 #include "../../timer.h"
00042 #include "../../mem/mem.h"
00043 #include "dbcassa_table.h"
00044 }
00045
00046 #include "Cassandra.h"
00047 #include "dbcassa_base.h"
00048
00049 namespace at = apache::thrift;
00050 namespace att = apache::thrift::transport;
00051 namespace atp = apache::thrift::protocol;
00052 namespace oac = org::apache::cassandra;
00053
00054 static const char cassa_key_delim = ' ';
00055 static const int cassa_max_key_len= 512;
00056
00057 #define MAX_ROWS_NO 128
00058 int row_slices[MAX_ROWS_NO][2];
00059
00060
00061
00062
00063
00064 struct cassa_con {
00065 struct db_id* id;
00066 unsigned int ref;
00067 struct pool_con* next;
00069 str db_name;
00070 oac::CassandraClient* con;
00071 };
00072
00073 #define CON_CASSA(db_con) ((struct cassa_con*)db_con->tail)
00074
00079 oac::CassandraClient* dbcassa_open(struct db_id* id)
00080 {
00081 try {
00082 boost::shared_ptr<att::TSocket> socket(new att::TSocket(id->host, id->port));
00083 boost::shared_ptr<att::TTransport> transport(new att::TFramedTransport (socket));
00084 boost::shared_ptr<atp::TProtocol> protocol(new atp::TBinaryProtocol(transport));
00085
00086 socket->setConnTimeout(cassa_conn_timeout);
00087 socket->setSendTimeout(cassa_send_timeout);
00088 socket->setRecvTimeout(cassa_recv_timeout);
00089
00090 std::auto_ptr<oac::CassandraClient> cassa_client(new oac::CassandraClient(protocol));
00091
00092 transport->open();
00093 if (!transport->isOpen()) {
00094 LM_ERR("Failed to open transport to Cassandra\n");
00095 return 0;
00096 }
00097
00098
00099
00100 cassa_client->set_keyspace(id->database);
00101 if(id->username && id->password) {
00102 oac::AuthenticationRequest au_req;
00103 std::map<std::string, std::string> cred;
00104 cred.insert(std::pair<std::string, std::string>("username", id->username));
00105 cred.insert(std::pair<std::string, std::string>("password", id->password));
00106 au_req.credentials = cred;
00107 try {
00108 cassa_client->login(au_req);
00109 } catch (const oac::AuthenticationException& autx) {
00110 LM_ERR("Authentication failure: Credentials not valid, %s\n", autx.why.c_str());
00111 } catch (const oac::AuthorizationException & auzx) {
00112 LM_ERR("Authentication failure: Credentials not valid for the selected database, %s\n", auzx.why.c_str());
00113 }
00114 }
00115
00116 LM_DBG("Opened connection to Cassandra cluster %s:%d\n", id->host, id->port);
00117 return cassa_client.release();
00118
00119 } catch (const oac::InvalidRequestException &irx) {
00120 LM_ERR("Database does not exist %s, %s\n", id->database, irx.why.c_str());
00121 } catch (const at::TException &tx) {
00122 LM_ERR("Failed to open connection to Cassandra cluster %s:%d, %s\n",
00123 id->database, id->port, tx.what());
00124 } catch (const std::exception &ex) {
00125 LM_ERR("Failed: %s\n", ex.what());
00126 } catch (...) {
00127 LM_ERR("Failed to open connection to Cassandra cluster\n");
00128 }
00129
00130 return 0;
00131 }
00132
00137 void* db_cassa_new_connection(struct db_id* id)
00138 {
00139 struct cassa_con* ptr;
00140
00141 if (!id) {
00142 LM_ERR("invalid db_id parameter value\n");
00143 return 0;
00144 }
00145
00146 if (id->port) {
00147 LM_DBG("opening connection: cassa://xxxx:xxxx@%s:%d/%s\n", ZSW(id->host),
00148 id->port, ZSW(id->database));
00149 } else {
00150 LM_DBG("opening connection: cassa://xxxx:xxxx@%s/%s\n", ZSW(id->host),
00151 ZSW(id->database));
00152 }
00153
00154 ptr = (struct cassa_con*)pkg_malloc(sizeof(struct cassa_con));
00155 if (!ptr) {
00156 LM_ERR("failed trying to allocated %lu bytes for connection structure."
00157 "\n", (unsigned long)sizeof(struct cassa_con));
00158 return 0;
00159 }
00160 LM_DBG("%p=pkg_malloc(%lu)\n", ptr, (unsigned long)sizeof(struct cassa_con));
00161
00162 memset(ptr, 0, sizeof(struct cassa_con));
00163
00164 ptr->db_name.s = id->database;
00165 ptr->db_name.len = strlen(id->database);
00166 ptr->id = id;
00167 ptr->ref = 1;
00168
00169 ptr->con = dbcassa_open(id);
00170 if(!ptr->con) {
00171 LM_ERR("Failed to open connection to Cassandra cluster\n");
00172 pkg_free(ptr);
00173 return 0;
00174 }
00175 return ptr;
00176 }
00177
00178
00183 void dbcassa_close(oac::CassandraClient* con)
00184 {
00185 if(! con) return;
00186
00187 delete con;
00188 }
00189
00194 void db_cassa_free_connection(struct pool_con* con)
00195 {
00196 struct cassa_con * _c;
00197
00198 if (!con) return;
00199
00200 _c = (struct cassa_con*) con;
00201 dbcassa_close(_c->con);
00202 pkg_free(_c);
00203 }
00204
00209 void dbcassa_reconnect(struct cassa_con* con)
00210 {
00211 dbcassa_close(con->con);
00212 con->con = dbcassa_open(con->id);
00213 }
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223 static int cassa_get_res_col(std::vector<oac::ColumnOrSuperColumn> result, int r_si, int r_fi, int prefix_len, db_key_t qcol)
00224 {
00225 str res_col_name;
00226
00227 for (int i = r_si; i< r_fi; i++) {
00228 res_col_name.s = (char*)result[i].column.name.c_str()+prefix_len;
00229 res_col_name.len = (int)result[i].column.name.size() - prefix_len;
00230
00231 if(res_col_name.len == qcol->len &&
00232 strncmp(res_col_name.s, qcol->s, qcol->len )==0)
00233 return i;
00234 }
00235 return -1;
00236 }
00237
00238 static int cassa_convert_result(db_key_t qcol, std::vector<oac::ColumnOrSuperColumn> result,
00239 int r_si, int r_fi, int prefix_len, db_val_t* sr_cell)
00240 {
00241 str col_val;
00242 int idx_rescol;
00243 oac::Column res_col;
00244
00245 idx_rescol = cassa_get_res_col(result, r_si, r_fi, prefix_len, qcol);
00246 if(idx_rescol< 0) {
00247 LM_DBG("Column not found in result %.*s\n", qcol->len, qcol->s);
00248 sr_cell->nul = 1;
00249 return 0;
00250 }
00251 res_col = result[idx_rescol].column;
00252
00253 col_val.s = (char*)res_col.value.c_str();
00254 if(!col_val.s) {
00255 LM_DBG("Column not found in result %.*s- NULL\n", qcol->len, qcol->s);
00256 sr_cell->nul = 1;
00257 return 0;
00258 }
00259 col_val.len = strlen(col_val.s);
00260
00261 sr_cell->nul = 0;
00262 sr_cell->free = 0;
00263
00264 switch (sr_cell->type) {
00265 case DB1_INT:
00266 if(str2int(&col_val, (unsigned int*)&sr_cell->val.int_val) < 0) {
00267 LM_ERR("Wrong value [%s] - len=%d, expected integer\n", col_val.s, col_val.len);
00268 return -1;
00269 }
00270 break;
00271 case DB1_BIGINT:
00272 if(sscanf(col_val.s, "%lld", &sr_cell->val.ll_val) < 0) {
00273 LM_ERR("Wrong value [%s], expected integer\n", col_val.s);
00274 return -1;
00275 }
00276 break;
00277 case DB1_DOUBLE:
00278 if(sscanf(col_val.s, "%lf", &sr_cell->val.double_val) < 0) {
00279 LM_ERR("Wrong value [%s], expected integer\n", col_val.s);
00280 return -1;
00281 }
00282 break;
00283 case DB1_STR:
00284 pkg_str_dup(&sr_cell->val.str_val, &col_val);
00285 sr_cell->free = 1;
00286 break;
00287 case DB1_STRING:
00288 col_val.len++;
00289 pkg_str_dup(&sr_cell->val.str_val, &col_val);
00290 sr_cell->val.str_val.len--;
00291 sr_cell->val.str_val.s[col_val.len-1]='\0';
00292 sr_cell->free = 1;
00293 break;
00294 case DB1_BLOB:
00295 pkg_str_dup(&sr_cell->val.blob_val, &col_val);
00296 sr_cell->free = 1;
00297 break;
00298 case DB1_BITMAP:
00299 if(str2int(&col_val, &sr_cell->val.bitmap_val) < 0) {
00300 LM_ERR("Wrong value [%s], expected integer\n", col_val.s);
00301 return -1;
00302 }
00303 break;
00304 case DB1_DATETIME:
00305 if(sscanf(col_val.s, "%ld", (long int*)&sr_cell->val.time_val) < 0) {
00306 LM_ERR("Wrong value [%s], expected integer\n", col_val.s);
00307 return -1;
00308 }
00309 break;
00310 }
00311 return 0;
00312 }
00313
00314 static char* dbval_to_string(db_val_t dbval, char* pk)
00315 {
00316 switch(dbval.type) {
00317 case DB1_STRING: strcpy(pk, dbval.val.string_val);
00318 pk+= strlen(dbval.val.string_val);
00319 break;
00320 case DB1_STR: memcpy(pk, dbval.val.str_val.s, dbval.val.str_val.len);
00321 pk+= dbval.val.str_val.len;
00322 break;
00323 case DB1_INT: pk+= sprintf(pk, "%d", dbval.val.int_val);
00324 break;
00325 case DB1_BIGINT: pk+= sprintf(pk, "%lld", dbval.val.ll_val);
00326 break;
00327 case DB1_DOUBLE: pk+= sprintf(pk, "%lf", dbval.val.double_val);
00328 break;
00329 case DB1_BLOB: pk+= sprintf(pk, "%.*s", dbval.val.blob_val.len, dbval.val.blob_val.s);
00330 break;
00331 case DB1_BITMAP: pk+= sprintf(pk, "%u", dbval.val.bitmap_val);
00332 break;
00333 case DB1_DATETIME:pk+= sprintf(pk, "%ld", (long int)dbval.val.time_val);
00334 break;
00335 }
00336 return pk;
00337 }
00338
00339
00340 int cassa_constr_key( const db_key_t* _k, const db_val_t* _v,
00341 int _n, int key_len, dbcassa_column_p* key_array, int *no_kc, char* key)
00342 {
00343 int i, j;
00344 char* pk = key;
00345
00346 if(!key_array)
00347 return 0;
00348
00349 for(j = 0; j< _n; j++) {
00350 LM_DBG("query col = %.*s\n", _k[j]->len, _k[j]->s);
00351 }
00352
00353 for(i = 0; i< key_len; i++) {
00354
00355 for(j = 0; j< _n; j++) {
00356 if(_k[j]->len == key_array[i]->name.len &&
00357 !strncmp(_k[j]->s, key_array[i]->name.s, _k[j]->len))
00358 break;
00359 }
00360 if(j == _n) {
00361 LM_ERR("The key column with name [%.*s] not found in values\n", key_array[i]->name.len, key_array[i]->name.s);
00362 break;
00363 }
00364 pk= dbval_to_string(_v[j], pk);
00365 *(pk++) = cassa_key_delim;
00366 }
00367 if(pk > key)
00368 *(--pk) = '\0';
00369 else
00370 *key = '\0';
00371
00372 if(no_kc)
00373 *no_kc = i;
00374
00375 LM_DBG("key = %s\n", key);
00376
00377 return pk - key;
00378 }
00379
00380
00381 int cassa_result_separate_rows(std::vector<oac::ColumnOrSuperColumn> result) {
00382 int rows_no =0, i = 0;
00383 int res_size = result.size();
00384
00385 while(i< res_size) {
00386 size_t found;
00387 std::string curr_seckey;
00388
00389 found = result[i].column.name.find(cassa_key_delim);
00390 if(found< 0) {
00391 LM_ERR("Wrong formated column name - secondary key part not found [%s]\n",
00392 result[i].column.name.c_str());
00393 return -1;
00394 }
00395 curr_seckey = result[i].column.name.substr(0, found);
00396
00397 while(++i < res_size) {
00398 if(result[i].column.name.compare(0, found, curr_seckey)) {
00399 LM_DBG("Encountered a new secondary key %s - %s\n", result[i].column.name.c_str(), curr_seckey.c_str());
00400 break;
00401 }
00402 }
00403
00404 row_slices[rows_no][0] = i;
00405 row_slices[rows_no][1] = found +1;
00406 rows_no++;
00407 }
00408
00409
00410 for(int i = 0; i< rows_no; i++) {
00411 LM_DBG("Row %d until index %d with prefix len %d\n", i, row_slices[i][0], row_slices[i][1]);
00412 }
00413
00414 return rows_no;
00415 }
00416
00417 dbcassa_column_p cassa_search_col(dbcassa_table_p tbc, db_key_t col_name)
00418 {
00419 dbcassa_column_p colp;
00420
00421 colp = tbc->cols;
00422 while(colp) {
00423 if(colp->name.len == col_name->len && !strncmp(colp->name.s, col_name->s, col_name->len))
00424 return colp;
00425 colp = colp->next;
00426 }
00427 return 0;
00428 }
00429
00430 typedef std::vector<oac::ColumnOrSuperColumn> ColumnVec;
00431 typedef std::auto_ptr<ColumnVec> ColumnVecPtr;
00432
00433 ColumnVecPtr cassa_translate_query(const db1_con_t* _h, const db_key_t* _k,
00434 const db_val_t* _v, const db_key_t* _c, int _n, int _nc, int* ret_rows_no)
00435 {
00436 char row_key[cassa_max_key_len];
00437 char sec_key[cassa_max_key_len];
00438 int key_len=0, seckey_len = 0;
00439 int no_kc, no_sec_kc;
00440 dbcassa_table_p tbc;
00441
00443 if(_k) {
00444 tbc = dbcassa_db_get_table(&CON_CASSA(_h)->db_name, CON_TABLE(_h));
00445 if(!tbc) {
00446 LM_ERR("table %.*s does not exist!\n", CON_TABLE(_h)->len, CON_TABLE(_h)->s);
00447 return ColumnVecPtr(NULL);
00448 }
00449 cassa_constr_key(_k, _v, _n, tbc->key_len, tbc->key, &no_kc, row_key);
00450
00451 if(no_kc != tbc->key_len) {
00452 LM_ERR("Query not supported - key not provided\n");
00453 dbcassa_lock_release(tbc);
00454 return ColumnVecPtr(NULL);
00455 }
00456 key_len = tbc->key_len;
00457
00458 cassa_constr_key(_k, _v, _n, tbc->seckey_len, tbc->sec_key, &no_sec_kc, sec_key);
00459 seckey_len = tbc->seckey_len;
00460
00461 dbcassa_lock_release(tbc);
00462 }
00463
00464 try {
00465 oac::SlicePredicate sp;
00466 if(seckey_len) {
00467 if(no_sec_kc == seckey_len) {
00468 if(_c) {
00469
00470 for(int i=0; i< _nc; i++) {
00471 std::string col_name = sec_key;
00472 col_name.push_back(cassa_key_delim);
00473 col_name.append(_c[i]->s);
00474 sp.column_names.push_back(col_name);
00475 LM_DBG("Query col: %s\n", col_name.c_str());
00476 }
00477 sp.__isset.column_names = true;
00478 } else {
00479 oac::SliceRange sr;
00480 sr.start = sec_key;
00481 sr.start.push_back(cassa_key_delim);
00482 sr.finish = sec_key;
00483 sr.finish.push_back(cassa_key_delim +1);
00484 sp.slice_range = sr;
00485 sp.__isset.slice_range = true;
00486 }
00487 } else {
00488 oac::SliceRange sr;
00489 sr.start = "";
00490 sr.finish = "";
00491 sp.slice_range = sr;
00492 sp.__isset.slice_range = true;
00493 }
00494 } else {
00495 if(_c) {
00496 for(int i=0; i< _nc; i++) {
00497 sp.column_names.push_back(_c[i]->s);
00498 LM_DBG("Query col: %s\n", _c[i]->s);
00499 }
00500 LM_DBG("get %d columns\n", _nc);
00501 sp.__isset.column_names = true;
00502 } else {
00503
00504 oac::SliceRange sr;
00505 sr.start = "";
00506 sr.finish = "";
00507 sp.slice_range = sr;
00508 sp.__isset.slice_range = true;
00509 LM_DBG("get all columns\n");
00510 }
00511 }
00512
00513 unsigned int retr = 0;
00514 oac::ColumnParent cparent;
00515 cparent.column_family = _h->table->s;
00516 ColumnVecPtr cassa_result(new std::vector<oac::ColumnOrSuperColumn>);
00517 do {
00518 if(CON_CASSA(_h)->con) {
00519 try {
00520
00521 if(_k) {
00522 CON_CASSA(_h)->con->get_slice(*cassa_result, row_key, cparent, sp, oac::ConsistencyLevel::ONE);
00523 *ret_rows_no = 1;
00524 } else {
00525 oac::KeyRange keyRange;
00526 keyRange.start_key = "";
00527 keyRange.start_key = "";
00528 std::vector<oac::KeySlice> key_slice_vect;
00529 keyRange.__isset.start_key = 1;
00530 keyRange.__isset.end_key = 1;
00531 ColumnVec::iterator it = cassa_result->begin();
00532
00533
00534 int rows_no =0;
00535 while(1) {
00536 CON_CASSA(_h)->con->get_range_slices(key_slice_vect, cparent, sp, keyRange, oac::ConsistencyLevel::ONE);
00537
00538 LM_DBG("Retuned %d key slices\n", key_slice_vect.size());
00539 for(unsigned int i = 0; i< key_slice_vect.size(); i++) {
00540 if(key_slice_vect[i].columns.size()==0) {
00541 continue;
00542 }
00543 cassa_result->insert(it, key_slice_vect[i].columns.begin(), key_slice_vect[i].columns.end());
00544 it = cassa_result->begin();
00545 row_slices[rows_no][0] = cassa_result->size();
00546 row_slices[rows_no][1] = 0;
00547 rows_no++;
00548 }
00549 if(key_slice_vect.size() < (unsigned int)keyRange.count)
00550 break;
00551 }
00552
00553 *ret_rows_no = rows_no;
00554 }
00555
00556 return cassa_result;
00557 } catch (const att::TTransportException &tx) {
00558 LM_ERR("Failed to query: %s\n", tx.what());
00559 }
00560 }
00561 dbcassa_reconnect(CON_CASSA(_h));
00562 } while(cassa_auto_reconnect && retr++ < cassa_retries);
00563
00564 } catch (const oac::InvalidRequestException ir) {
00565 LM_ERR("Failed Invalid query request: %s\n", ir.why.c_str());
00566 } catch (const at::TException &tx) {
00567 LM_ERR("Failed generic Thrift error: %s\n", tx.what());
00568 } catch (const std::exception &ex) {
00569 LM_ERR("Failed std error: %s\n", ex.what());
00570 } catch (...) {
00571 LM_ERR("Failed generic error\n");
00572 }
00573
00574 LM_DBG("Query with get slice no_kc=%d tbc->key_len=%d _n=%d\n", no_kc, key_len,_n);
00575 return ColumnVecPtr(NULL);
00576 }
00577
00578
00579
00580
00581
00582
00583
00584
00585
00586
00587
00588
00589
00590
00591
00592
00593
00594 int db_cassa_query(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _op,
00595 const db_val_t* _v, const db_key_t* _c, int _n, int _nc,
00596 const db_key_t _o, db1_res_t** _r)
00597 {
00598 db1_res_t* db_res = 0;
00599 int rows_no;
00600 ColumnVecPtr cassa_result;
00601 dbcassa_table_p tbc;
00602 int seckey_len;
00603
00604 if (!_h || !CON_TABLE(_h) || !_r) {
00605 LM_ERR("invalid parameter value\n");
00606 return -1;
00607 }
00608 LM_DBG("query table=%s\n", _h->table->s);
00609
00612 cassa_result = cassa_translate_query(_h, _k, _v, _c, _n, _nc, &rows_no);
00613
00614 if(cassa_result.get() == NULL) {
00615 LM_ERR("Failed to query Cassandra cluster\n");
00616 return -1;
00617 }
00618
00619
00620
00621
00622
00623
00624 db_res = db_new_result();
00625 if (!db_res) {
00626 LM_ERR("no memory left\n");
00627 goto error;
00628 }
00629 RES_COL_N(db_res)= _nc;
00630 if(!db_allocate_columns(db_res, _nc) < 0) {
00631 LM_ERR("no more memory\n");
00632 goto error;
00633 }
00634
00635 tbc = dbcassa_db_get_table(&CON_CASSA(_h)->db_name, CON_TABLE(_h));
00636 if(!tbc) {
00637 LM_ERR("table %.*s does not exist!\n", CON_TABLE(_h)->len, CON_TABLE(_h)->s);
00638 return -1;
00639 }
00640
00642
00643 for(int col = 0; col < _nc; col++) {
00644 RES_NAMES(db_res)[col] = (str*)pkg_malloc(sizeof(str));
00645 if (! RES_NAMES(db_res)[col]) {
00646 LM_ERR("no private memory left\n");
00647 dbcassa_lock_release(tbc);
00648 db_free_columns(db_res);
00649 goto error;
00650 }
00651
00652 *RES_NAMES(db_res)[col] = *_c[col];
00653
00654
00655 dbcassa_column_p colp = cassa_search_col(tbc, _c[col]);
00656 if(!colp) {
00657 LM_ERR("No column with name [%.*s] found\n", _c[col]->len, _c[col]->s);
00658 dbcassa_lock_release(tbc);
00659 db_free_columns(db_res);
00660 goto error;
00661 }
00662 RES_TYPES(db_res)[col] = colp->type;
00663
00664 LM_DBG("RES_NAMES(%p)[%d]=[%.*s]\n", RES_NAMES(db_res)[col], col,
00665 RES_NAMES(db_res)[col]->len, RES_NAMES(db_res)[col]->s);
00666 }
00667
00668 seckey_len = tbc->seckey_len;
00669 dbcassa_lock_release(tbc);
00670
00671 if(!cassa_result->size()) {
00672 LM_DBG("The query returned no result\n");
00673 RES_ROW_N(db_res) = 0;
00674 goto done;
00675 }
00676
00677
00678 if(rows_no == 1) {
00679 row_slices[0][0]= cassa_result->size();
00680 row_slices[0][1]= 0;
00681
00682 if(seckey_len) {
00683
00684 rows_no = cassa_result_separate_rows(*cassa_result);
00685 if(rows_no < 0) {
00686 LM_ERR("Wrong formated column names\n");
00687 goto error;
00688 }
00689 }
00690 }
00691
00692 RES_ROW_N(db_res) = rows_no;
00693
00694 if (db_allocate_rows(db_res) < 0) {
00695 LM_ERR("could not allocate rows");
00696 goto error;
00697 }
00698
00699 for(int ri=0; ri < rows_no; ri++) {
00700 if (db_allocate_row(db_res, &(RES_ROWS(db_res)[ri])) != 0) {
00701 LM_ERR("could not allocate row");
00702 goto error;
00703 }
00704
00705
00706 for(int col = 0; col< _nc; col++) {
00707 RES_ROWS(db_res)[ri].values[col].type = RES_TYPES(db_res)[col];
00708 cassa_convert_result(_c[col], *cassa_result, (ri>0?row_slices[ri-1][0]:0), row_slices[ri][0],
00709 row_slices[ri][1], &RES_ROWS(db_res)[ri].values[col]);
00710 }
00711 }
00712
00713 done:
00714 *_r = db_res;
00715 LM_DBG("Exited with success\n");
00716 return 1;
00717
00718 error:
00719 if(db_res)
00720 db_free_result(db_res);
00721 return -1;
00722 }
00723
00724
00725
00726
00727
00728
00729
00730
00731
00732
00733
00734
00735 int db_cassa_modify(const db1_con_t* _h, const db_key_t* _k, const db_val_t* _v,
00736 const db_key_t* _uk, const db_val_t* _uv, int _n, int _un)
00737 {
00738 dbcassa_table_p tbc;
00739 char row_key[cassa_max_key_len];
00740 char sec_key[cassa_max_key_len];
00741 int64_t ts = 0;
00742 str ts_col_name={0, 0};
00743 int seckey_len;
00744 unsigned int curr_time = time(NULL);
00745
00746 if (!_h || !CON_TABLE(_h) || !_k || !_v) {
00747 LM_ERR("invalid parameter value\n");
00748 return -1;
00749 }
00750
00751 LM_DBG("modify table=%s\n", _h->table->s);
00752
00754 tbc = dbcassa_db_get_table(&CON_CASSA(_h)->db_name, CON_TABLE(_h));
00755 if(!tbc) {
00756 LM_ERR("table %.*s does not exist!\n", CON_TABLE(_h)->len, CON_TABLE(_h)->s);
00757 return -1;
00758 }
00759 if(tbc->ts_col)
00760 pkg_str_dup(&ts_col_name, (const str*)&tbc->ts_col->name);
00761
00762 cassa_constr_key(_k, _v, _n, tbc->key_len, tbc->key, 0, row_key);
00763 cassa_constr_key(_k, _v, _n, tbc->seckey_len, tbc->sec_key, 0, sec_key);
00764 seckey_len = tbc->seckey_len;
00765
00766 dbcassa_lock_release(tbc);
00767
00769 try {
00770
00771 std::vector<oac::Mutation> mutations;
00772 for(int i=0; i< _un; i++) {
00773 if(_uv[i].nul)
00774 continue;
00775
00776 std::stringstream out;
00777 std::string value;
00778 int cont = 0;
00779
00780 switch(_uv[i].type) {
00781 case DB1_INT: out << _uv[i].val.int_val;
00782 value = out.str();
00783 break;
00784 case DB1_BIGINT:out << _uv[i].val.ll_val;
00785 value = out.str();
00786 break;
00787 case DB1_DOUBLE:out << _uv[i].val.double_val;
00788 value = out.str();
00789 break;
00790 case DB1_BITMAP:out << _uv[i].val.bitmap_val;
00791 value = out.str();
00792 break;
00793 case DB1_STRING:value = _uv[i].val.string_val;
00794 break;
00795 case DB1_STR: if(!_uv[i].val.str_val.s) {
00796 cont = 1;
00797 break;
00798 }
00799 value = std::string(_uv[i].val.str_val.s, _uv[i].val.str_val.len);
00800 break;
00801 case DB1_BLOB: value = std::string(_uv[i].val.blob_val.s, _uv[i].val.blob_val.len);
00802 break;
00803 case DB1_DATETIME: unsigned int exp_time = (unsigned int)_uv[i].val.time_val;
00804 out << exp_time;
00805 value = out.str();
00806 if(ts_col_name.s && ts_col_name.len==_uk[i]->len &&
00807 strncmp(ts_col_name.s, _uk[i]->s, ts_col_name.len)==0) {
00808 ts = exp_time;
00809 LM_DBG("Found timestamp col [%.*s]\n", ts_col_name.len, ts_col_name.s);
00810 }
00811 break;
00812 }
00813 if (cont)
00814 continue;
00815
00816 LM_DBG("ADDED column [%.*s] type [%d], value [%s]\n", _uk[i]->len, _uk[i]->s,
00817 _uv[i].type, value.c_str());
00818
00819 oac::Mutation mut;
00820 oac::ColumnOrSuperColumn col;
00821 if(seckey_len) {
00822 col.column.name = sec_key;
00823 col.column.name.push_back(cassa_key_delim);
00824 col.column.name.append(_uk[i]->s);
00825 }
00826 else
00827 col.column.name = _uk[i]->s;
00828 col.column.value = value;
00829 col.column.__isset.value = true;
00830 col.__isset.column = true;
00831 col.column.timestamp = curr_time;
00832 col.column.__isset.timestamp = true;
00833 mut.column_or_supercolumn = col;
00834 mut.__isset.column_or_supercolumn = true;
00835 mutations.push_back(mut);
00836 }
00837 if(ts_col_name.s)
00838 pkg_free(ts_col_name.s);
00839 ts_col_name.s = 0;
00840
00841 if(ts) {
00842 int32_t ttl = ts - curr_time;
00843 LM_DBG("Set expires to %d seconds\n", ttl);
00844 for(size_t mi=0; mi< mutations.size(); mi++) {
00845 mutations[mi].column_or_supercolumn.column.ttl = ttl;
00846 mutations[mi].column_or_supercolumn.column.__isset.ttl = true;
00847 }
00848 }
00849
00850 LM_DBG("Perform the mutation, add [%d] columns\n", (int)mutations.size());
00851
00852 std::map<std::string, std::vector<oac::Mutation> > innerMap;
00853 innerMap.insert(std::pair<std::string, std::vector<oac::Mutation> > (_h->table->s, mutations));
00854 std::map <std::string, std::map<std::string, std::vector<oac::Mutation> > > CFMap;
00855 CFMap.insert(std::pair<std::string, std::map<std::string, std::vector<oac::Mutation> > >(row_key, innerMap));
00856 unsigned int retr = 0;
00857
00858 do {
00859 if(CON_CASSA(_h)->con) {
00860 try{
00861 CON_CASSA(_h)->con->batch_mutate(CFMap, oac::ConsistencyLevel::ONE);
00862 return 1;
00863 } catch (const att::TTransportException &tx) {
00864 LM_ERR("Failed to query: %s\n", tx.what());
00865 }
00866 }
00867 dbcassa_reconnect(CON_CASSA(_h));
00868 } while (cassa_auto_reconnect && retr++ < cassa_retries);
00869
00870 } catch (const oac::InvalidRequestException ir) {
00871 LM_ERR("Failed Invalid query request: %s\n", ir.why.c_str());
00872 } catch (const at::TException &tx) {
00873 LM_ERR("Failed generic Thrift error: %s\n", tx.what());
00874 } catch (const std::exception &ex) {
00875 LM_ERR("Failed std error: %s\n", ex.what());
00876 } catch (...) {
00877 LM_ERR("Failed generic error\n");
00878 }
00879
00880 LM_ERR("Insert/Update query failed\n");
00881 return -1;
00882 }
00883
00884
00885 int db_cassa_insert(const db1_con_t* _h, const db_key_t* _k, const db_val_t* _v,
00886 int _n)
00887 {
00888 LM_DBG("db_cassa_insert:\n");
00889 return db_cassa_modify(_h, _k, _v, _k, _v, _n, _n);
00890 }
00891
00892
00893 int db_cassa_update(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _o,
00894 const db_val_t* _v, const db_key_t* _uk, const db_val_t* _uv,
00895 int _n, int _un)
00896 {
00897 LM_DBG("db_cassa_update:\n");
00898 return db_cassa_modify(_h, _k, _v, _uk, _uv, _n, _un);
00899 }
00900
00901
00902 int db_cassa_free_result(db1_con_t* _h, db1_res_t* _r)
00903 {
00904 return db_free_result(_r);
00905 }
00906
00907
00908
00909
00910
00911
00912
00913
00914
00915 int db_cassa_delete(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _o,
00916 const db_val_t* _v, int _n)
00917 {
00918 oac::CassandraClient* cassa_client = CON_CASSA(_h)->con;
00919 char row_key[cassa_max_key_len];
00920 char sec_key[cassa_max_key_len];
00921 dbcassa_table_p tbc;
00922 int no_kc, no_sec_kc;
00923 unsigned int retr = 0;
00924 int seckey_len;
00925 oac::Mutation m;
00926
00927 if (!_h || !CON_TABLE(_h) || !_k || !_v) {
00928 LM_ERR("invalid parameter value\n");
00929 return -1;
00930 }
00931
00932 LM_DBG("query table=%s\n", _h->table->s);
00933
00934
00935 tbc = dbcassa_db_get_table(&CON_CASSA(_h)->db_name, CON_TABLE(_h));
00936 if(!tbc)
00937 {
00938 LM_ERR("table %.*s does not exist!\n", CON_TABLE(_h)->len, CON_TABLE(_h)->s);
00939 return -1;
00940 }
00941
00942 cassa_constr_key(_k, _v, _n, tbc->key_len, tbc->key, &no_kc, row_key);
00943 cassa_constr_key(_k, _v, _n, tbc->seckey_len, tbc->sec_key, &no_sec_kc, sec_key);
00944 seckey_len = tbc->seckey_len;
00945
00946 if (_n != no_kc && no_sec_kc == seckey_len) {
00947
00948 LM_DBG("Delete after primary and secondary key %s %s\n", row_key, sec_key);
00949 dbcassa_column_p colp = tbc->cols;
00950 try {
00951 while(colp) {
00952 std::string col_name = sec_key;
00953 col_name.push_back(cassa_key_delim);
00954 col_name.append(colp->name.s);
00955 m.deletion.predicate.column_names.push_back(col_name);
00956 colp = colp->next;
00957 }
00958 } catch (...) {
00959 LM_ERR("Failed to construct the list of column names\n");
00960 dbcassa_lock_release(tbc);
00961 return -1;
00962 }
00963 }
00964
00965 dbcassa_lock_release(tbc);
00966
00967 for(int i=0; i < _n; i++)
00968 LM_DBG("delete query col = %.*s\n", _k[i]->len, _k[i]->s);
00969
00970 if(no_kc == 0 ) {
00971 LM_DBG("Delete operation not supported\n");
00972 return -1;
00973 }
00974
00975 try {
00976 if (_n == no_kc) {
00977 LM_DBG("Delete after row key %s\n", row_key);
00978 oac::ColumnPath cp;
00979 cp.column_family = _h->table->s;
00980 do {
00981 if(CON_CASSA(_h)->con) {
00982 try {
00983 cassa_client->remove(row_key, cp, (int64_t)time(0), oac::ConsistencyLevel::ONE);
00984 return 1;
00985 } catch (const att::TTransportException &tx) {
00986 LM_ERR("Failed to query: %s\n", tx.what());
00987 }
00988 }
00989 dbcassa_reconnect(CON_CASSA(_h));
00990 } while(cassa_auto_reconnect && retr++ < cassa_retries);
00991 } else {
00992
00993 if(!seckey_len) {
00994 LM_ERR("Delete operation not supported\n");
00995 return -1;
00996 }
00997
00998
00999 m.deletion.timestamp = (int64_t)time(0);
01000 m.deletion.__isset.timestamp = true;
01001 m.__isset.deletion = true;
01002
01003 #if 0
01004
01005 tbc = dbcassa_db_get_table(&CON_CASSA(_h)->db_name, CON_TABLE(_h));
01006 if(!tbc)
01007 {
01008 LM_ERR("table %.*s does not exist!\n", CON_TABLE(_h)->len, CON_TABLE(_h)->s);
01009 return -1;
01010 }
01011 dbcassa_column_p colp = tbc->cols;
01012 try {
01013 while(colp) {
01014 std::string col_name = sec_key;
01015 col_name.push_back(cassa_key_delim);
01016 col_name.append(colp->name.s);
01017 m.deletion.predicate.column_names.push_back(col_name);
01018 colp = colp->next;
01019 }
01020 } catch (...) {
01021 LM_ERR("Failed to construct the list of column names\n");
01022 dbcassa_lock_release(tbc);
01023 return -1;
01024 }
01025 dbcassa_lock_release(tbc);
01026 #endif
01027 m.deletion.__isset.predicate = true;
01028 m.deletion.predicate.__isset.column_names = true;
01029
01030 std::vector<oac::Mutation> mutations;
01031 mutations.push_back(m);
01032
01033
01034 std::map<std::string, std::vector<oac::Mutation> > innerMap;
01035 innerMap.insert(std::pair<std::string, std::vector<oac::Mutation> > (_h->table->s, mutations));
01036 std::map <std::string, std::map<std::string, std::vector<oac::Mutation> > > CFMap;
01037 CFMap.insert(std::pair<std::string, std::map<std::string, std::vector<oac::Mutation> > >(row_key, innerMap));
01038
01039 do {
01040 if(CON_CASSA(_h)->con) {
01041 try {
01042 cassa_client->batch_mutate(CFMap, oac::ConsistencyLevel::ONE);
01043 return 1;
01044 } catch (const att::TTransportException &tx) {
01045 LM_ERR("Failed to query: %s\n", tx.what());
01046 }
01047 }
01048 dbcassa_reconnect(CON_CASSA(_h));
01049 } while(cassa_auto_reconnect && retr++ < cassa_retries);
01050 }
01051 return 1;
01052 } catch (const oac::InvalidRequestException ir) {
01053 LM_ERR("Invalid query: %s\n", ir.why.c_str());
01054 } catch (const at::TException &tx) {
01055 LM_ERR("Failed TException: %s\n", tx.what());
01056 } catch (std::exception &e) {
01057 LM_ERR("Failed: %s\n", e.what());
01058 } catch (...) {
01059 LM_ERR("Failed generic error\n");
01060 }
01061
01062 return -1;
01063 }
01064