dbcassa_base.cpp

00001 /*
00002  * $Id$
00003  *
00004  * CASSANDRA module interface
00005  *
00006  * Copyright (C) 2012 1&1 Internet AG
00007  *
00008  * This file is part of Kamailio, a free SIP server.
00009  *
00010  * Kamailio is free software; you can redistribute it and/or modify
00011  * it under the terms of the GNU General Public License as published by
00012  * the Free Software Foundation; either version 2 of the License, or
00013  * (at your option) any later version
00014  *
00015  * Kamailio is distributed in the hope that it will be useful,
00016  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00018  * GNU General Public License for more details.
00019  *
00020  * You should have received a copy of the GNU General Public License 
00021  * along with this program; if not, write to the Free Software 
00022  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00023  *
00024  * History:
00025  * --------
00026  * 2012-01  first version (Anca Vamanu)
00027  */
00028 
00029 #include <stdio.h>
00030 #include <stdlib.h>
00031 #include <unistd.h>
00032 #include <sys/time.h>
00033 #include <poll.h>
00034 #include <iostream>
00035 #include <boost/lexical_cast.hpp>
00036 #include <protocol/TBinaryProtocol.h>
00037 #include <transport/TSocket.h>
00038 #include <transport/TTransportUtils.h>
00039 
00040 extern "C" {
00041 #include "../../timer.h"
00042 #include "../../mem/mem.h"
00043 #include "dbcassa_table.h"
00044 }
00045 
00046 #include "Cassandra.h"
00047 #include "dbcassa_base.h"
00048 
00049 namespace at  = apache::thrift;
00050 namespace att = apache::thrift::transport;
00051 namespace atp = apache::thrift::protocol;
00052 namespace oac = org::apache::cassandra;
00053 
/* Delimiter written between the components of a composed key and between a
 * secondary key and the column name that follows it. */
00054 static const char cassa_key_delim  = ' ';
/* Size, in bytes, of the stack buffers this file uses to hold a composed key. */
00055 static const int  cassa_max_key_len= 512;
00056 
/* Number of entries in row_slices. */
00057 #define MAX_ROWS_NO        128     /* TODO: make this configurable or dynamic */
/* Describes how a flat result vector is split into rows: row_slices[r][0] is
 * the index one past the last column of row r (row r starts where row r-1
 * ends), row_slices[r][1] is the length of the prefix to skip in the column
 * names of row r. */
00058 int row_slices[MAX_ROWS_NO][2];
00059 
00060 /*
00061  * ----         Cassandra Connection Section               ----
00062  *  */
00063 
/*
 * State kept for one connection to a Cassandra keyspace.
 * db_cassa_free_connection() receives this object as a struct pool_con*, so
 * the leading members presumably have to mirror that structure - confirm
 * before reordering them.
 */
00064 struct cassa_con {
00065         struct db_id* id;           /* identifier the connection was opened with; reused to reconnect */
00066         unsigned int ref;           /* reference counter, set to 1 when the connection is created */
00067         struct pool_con* next;      /* not used in this file - presumably the pool's list link */
00069         str db_name;                /* keyspace name; points at id->database, it is not a copy */
00070         oac::CassandraClient* con;  /* Thrift client, released with dbcassa_close(); 0 when not connected */
00071 };
00072 
/* Access the cassa_con stored in the tail of a DB API connection handle. */
00073 #define CON_CASSA(db_con)    ((struct cassa_con*)db_con->tail)
00074 
00079 oac::CassandraClient* dbcassa_open(struct db_id* id)
00080 {
00081         try {
00082                 boost::shared_ptr<att::TSocket> socket(new att::TSocket(id->host, id->port));
00083                 boost::shared_ptr<att::TTransport> transport(new att::TFramedTransport (socket));
00084                 boost::shared_ptr<atp::TProtocol> protocol(new atp::TBinaryProtocol(transport));
00085 
00086                 socket->setConnTimeout(cassa_conn_timeout);
00087                 socket->setSendTimeout(cassa_send_timeout);
00088                 socket->setRecvTimeout(cassa_recv_timeout);
00089 
00090                 std::auto_ptr<oac::CassandraClient> cassa_client(new oac::CassandraClient(protocol));
00091 
00092                 transport->open();
00093                 if (!transport->isOpen()) {
00094                         LM_ERR("Failed to open transport to Cassandra\n");
00095                         return 0;
00096                 }
00097 
00098                 /* database name ->  keyspace */
00099 
00100                 cassa_client->set_keyspace(id->database);
00101                 if(id->username && id->password) {
00102                         oac::AuthenticationRequest au_req;
00103                         std::map<std::string, std::string>  cred;
00104                         cred.insert(std::pair<std::string, std::string>("username", id->username));
00105                         cred.insert(std::pair<std::string, std::string>("password", id->password));
00106                         au_req.credentials = cred;
00107                         try {
00108                                 cassa_client->login(au_req);
00109                         } catch (const oac::AuthenticationException& autx) {
00110                                 LM_ERR("Authentication failure: Credentials not valid, %s\n", autx.why.c_str());
00111                         } catch (const oac::AuthorizationException & auzx) {
00112                                 LM_ERR("Authentication failure: Credentials not valid for the selected database, %s\n", auzx.why.c_str());
00113                         }
00114                 }
00115 
00116                 LM_DBG("Opened connection to Cassandra cluster  %s:%d\n", id->host, id->port);
00117                 return cassa_client.release();
00118 
00119         } catch (const oac::InvalidRequestException &irx) {
00120                 LM_ERR("Database does not exist %s, %s\n", id->database, irx.why.c_str());
00121         } catch (const at::TException &tx) {
00122                 LM_ERR("Failed to open connection to Cassandra cluster %s:%d, %s\n",
00123                                 id->database, id->port, tx.what());
00124         } catch (const std::exception &ex) {
00125                 LM_ERR("Failed: %s\n", ex.what());
00126         } catch (...) {
00127                 LM_ERR("Failed to open connection to Cassandra cluster\n");
00128         }
00129 
00130         return 0;
00131 }
00132 
00137 void* db_cassa_new_connection(struct db_id* id)
00138 {
00139         struct cassa_con* ptr;
00140 
00141         if (!id) {
00142                 LM_ERR("invalid db_id parameter value\n");
00143                 return 0;
00144         }
00145 
00146         if (id->port) {
00147                 LM_DBG("opening connection: cassa://xxxx:xxxx@%s:%d/%s\n", ZSW(id->host),
00148                         id->port, ZSW(id->database));
00149         } else {
00150                 LM_DBG("opening connection: cassa://xxxx:xxxx@%s/%s\n", ZSW(id->host),
00151                         ZSW(id->database));
00152         }
00153 
00154         ptr = (struct cassa_con*)pkg_malloc(sizeof(struct cassa_con));
00155         if (!ptr) {
00156                 LM_ERR("failed trying to allocated %lu bytes for connection structure."
00157                                 "\n", (unsigned long)sizeof(struct cassa_con));
00158                 return 0;
00159         }
00160         LM_DBG("%p=pkg_malloc(%lu)\n", ptr, (unsigned long)sizeof(struct cassa_con));
00161 
00162         memset(ptr, 0, sizeof(struct cassa_con));
00163 
00164         ptr->db_name.s = id->database;
00165         ptr->db_name.len = strlen(id->database);
00166         ptr->id = id;
00167         ptr->ref = 1;
00168 
00169         ptr->con = dbcassa_open(id);
00170         if(!ptr->con) {
00171                 LM_ERR("Failed to open connection to Cassandra cluster\n");
00172                 pkg_free(ptr);
00173                 return 0;
00174         }
00175         return ptr;
00176 }
00177 
00178 
00183 void dbcassa_close(oac::CassandraClient* con)
00184 {
00185         if(! con) return;
00186 
00187         delete con;
00188 }
00189 
00194 void db_cassa_free_connection(struct pool_con* con)
00195 {
00196         struct cassa_con * _c;
00197 
00198         if (!con) return;
00199 
00200         _c = (struct cassa_con*) con;
00201         dbcassa_close(_c->con);
00202         pkg_free(_c);
00203 }
00204 
00209 void dbcassa_reconnect(struct cassa_con* con)
00210 {
00211         dbcassa_close(con->con);
00212         con->con = dbcassa_open(con->id);
00213 }
00214 
00215 
00216 /*
00217  * ----              DB Operations Section                          ----
00218  * */
00219 
00220 /*
00221  *      Util functions
00222  * */
00223 static int cassa_get_res_col(std::vector<oac::ColumnOrSuperColumn> result, int r_si, int r_fi, int prefix_len, db_key_t qcol)
00224 {
00225         str res_col_name;
00226 
00227         for (int i = r_si; i< r_fi; i++) {
00228                 res_col_name.s = (char*)result[i].column.name.c_str()+prefix_len;
00229                 res_col_name.len = (int)result[i].column.name.size() - prefix_len;
00230 
00231                 if(res_col_name.len == qcol->len &&
00232                                 strncmp(res_col_name.s, qcol->s, qcol->len )==0)
00233                         return i;
00234         }
00235         return -1;
00236 }
00237 
00238 static int cassa_convert_result(db_key_t qcol, std::vector<oac::ColumnOrSuperColumn> result,
00239                 int r_si, int r_fi, int prefix_len, db_val_t* sr_cell)
00240 {
00241         str col_val;
00242         int idx_rescol;
00243         oac::Column res_col;
00244 
00245         idx_rescol = cassa_get_res_col(result, r_si, r_fi, prefix_len, qcol);
00246         if(idx_rescol< 0) {
00247                 LM_DBG("Column not found in result %.*s\n", qcol->len, qcol->s);
00248                 sr_cell->nul  = 1;
00249                 return 0;
00250         }
00251         res_col = result[idx_rescol].column;
00252 
00253         col_val.s = (char*)res_col.value.c_str();
00254         if(!col_val.s) {
00255                 LM_DBG("Column not found in result %.*s- NULL\n", qcol->len, qcol->s);
00256                 sr_cell->nul  = 1;
00257                 return 0;
00258         }
00259         col_val.len = strlen(col_val.s);
00260 
00261         sr_cell->nul  = 0;
00262         sr_cell->free  = 0;
00263 
00264         switch (sr_cell->type) {
00265                 case DB1_INT:
00266                         if(str2int(&col_val, (unsigned int*)&sr_cell->val.int_val) < 0) {
00267                                 LM_ERR("Wrong value [%s] - len=%d, expected integer\n", col_val.s, col_val.len);
00268                                 return -1;
00269                         }
00270                         break;
00271                 case DB1_BIGINT:
00272                         if(sscanf(col_val.s, "%lld", &sr_cell->val.ll_val) < 0) {
00273                                 LM_ERR("Wrong value [%s], expected integer\n", col_val.s);
00274                                 return -1;
00275                         }
00276                         break;
00277                 case DB1_DOUBLE:
00278                         if(sscanf(col_val.s, "%lf", &sr_cell->val.double_val) < 0) {
00279                                 LM_ERR("Wrong value [%s], expected integer\n", col_val.s);
00280                                 return -1;
00281                         }
00282                         break;
00283                 case DB1_STR:
00284                         pkg_str_dup(&sr_cell->val.str_val, &col_val);
00285                         sr_cell->free  = 1;
00286                         break;
00287                 case DB1_STRING:
00288                         col_val.len++;
00289                         pkg_str_dup(&sr_cell->val.str_val, &col_val);
00290                         sr_cell->val.str_val.len--;
00291                         sr_cell->val.str_val.s[col_val.len-1]='\0';
00292                         sr_cell->free  = 1;
00293                         break;
00294                 case DB1_BLOB:
00295                         pkg_str_dup(&sr_cell->val.blob_val, &col_val);
00296                         sr_cell->free  = 1;
00297                         break;
00298                 case DB1_BITMAP:
00299                         if(str2int(&col_val, &sr_cell->val.bitmap_val) < 0) {
00300                                 LM_ERR("Wrong value [%s], expected integer\n", col_val.s);
00301                                 return -1;
00302                         }
00303                         break;
00304                 case DB1_DATETIME:
00305                         if(sscanf(col_val.s, "%ld", (long int*)&sr_cell->val.time_val) < 0) {
00306                                 LM_ERR("Wrong value [%s], expected integer\n", col_val.s);
00307                                 return -1;
00308                         }
00309                         break;
00310         }
00311         return 0;
00312 }
00313 
00314 static char* dbval_to_string(db_val_t dbval, char* pk)
00315 {
00316         switch(dbval.type) {
00317                 case DB1_STRING: strcpy(pk, dbval.val.string_val);
00318                                                    pk+= strlen(dbval.val.string_val);
00319                                                    break;
00320                 case DB1_STR:    memcpy(pk, dbval.val.str_val.s, dbval.val.str_val.len);
00321                                                    pk+= dbval.val.str_val.len;
00322                                                    break;
00323                 case DB1_INT:    pk+= sprintf(pk, "%d", dbval.val.int_val);
00324                                                    break;
00325                 case DB1_BIGINT: pk+= sprintf(pk, "%lld", dbval.val.ll_val);
00326                                                    break;
00327                 case DB1_DOUBLE: pk+= sprintf(pk, "%lf", dbval.val.double_val);
00328                                                    break;
00329                 case DB1_BLOB:   pk+= sprintf(pk, "%.*s", dbval.val.blob_val.len, dbval.val.blob_val.s);
00330                                                    break;
00331                 case DB1_BITMAP: pk+= sprintf(pk, "%u", dbval.val.bitmap_val);
00332                                                    break;
00333                 case DB1_DATETIME:pk+= sprintf(pk, "%ld", (long int)dbval.val.time_val);
00334                                                   break;
00335         }
00336         return pk;
00337 }
00338 
00339 
00340 int cassa_constr_key( const db_key_t* _k, const db_val_t* _v,
00341                 int _n, int key_len, dbcassa_column_p* key_array, int *no_kc, char* key)
00342 {
00343         int i, j;
00344         char* pk = key;
00345 
00346         if(!key_array)
00347                 return 0;
00348 
00349         for(j = 0; j< _n; j++) {
00350                 LM_DBG("query col = %.*s\n",  _k[j]->len,  _k[j]->s);
00351         }
00352 
00353         for(i = 0; i< key_len; i++) {
00354                 /* look in the received columns to search the key column */
00355                 for(j = 0; j< _n; j++) {
00356                         if(_k[j]->len == key_array[i]->name.len &&
00357                                         !strncmp(_k[j]->s, key_array[i]->name.s, _k[j]->len))
00358                                 break;
00359                 }
00360                 if(j == _n) {
00361                         LM_ERR("The key column with name [%.*s] not found in values\n", key_array[i]->name.len, key_array[i]->name.s);
00362                         break;
00363                 }
00364                 pk= dbval_to_string(_v[j], pk);
00365                 *(pk++) = cassa_key_delim;
00366         }
00367         if(pk > key)
00368                 *(--pk) = '\0';
00369         else
00370                 *key = '\0';
00371 
00372         if(no_kc)
00373                 *no_kc = i;
00374 
00375         LM_DBG("key = %s\n", key);
00376 
00377         return pk - key;
00378 }
00379 
00380 
00381 int cassa_result_separate_rows(std::vector<oac::ColumnOrSuperColumn> result) {
00382         int rows_no =0, i = 0;
00383         int res_size = result.size();
00384 
00385         while(i< res_size) {
00386                 size_t found;
00387                 std::string curr_seckey;
00388 
00389                 found = result[i].column.name.find(cassa_key_delim);
00390                 if(found< 0) {
00391                         LM_ERR("Wrong formated column name - secondary key part not found [%s]\n",
00392                                         result[i].column.name.c_str());
00393                         return -1;
00394                 }
00395                 curr_seckey = result[i].column.name.substr(0, found);
00396 
00397                 while(++i < res_size) {
00398                         if(result[i].column.name.compare(0, found, curr_seckey)) {
00399                                 LM_DBG("Encountered a new secondary key %s - %s\n", result[i].column.name.c_str(), curr_seckey.c_str());
00400                                 break;
00401                         }
00402                 }
00403                 /* the current row stretches until index 'i' and the corresponding key prefix has length 'found' */
00404                 row_slices[rows_no][0] = i;
00405                 row_slices[rows_no][1] = found +1;
00406                 rows_no++;
00407         }
00408 
00409         /* debug messages */
00410         for(int i = 0; i< rows_no; i++) {
00411                 LM_DBG("Row %d until index %d with prefix len %d\n", i, row_slices[i][0], row_slices[i][1]);
00412         }
00413 
00414         return rows_no;
00415 }
00416 
00417 dbcassa_column_p cassa_search_col(dbcassa_table_p tbc, db_key_t col_name)
00418 {
00419         dbcassa_column_p colp;
00420 
00421         colp = tbc->cols;
00422         while(colp) {
00423                 if(colp->name.len == col_name->len && !strncmp(colp->name.s, col_name->s, col_name->len))
00424                         return colp;
00425                 colp = colp->next;
00426         }
00427         return 0;
00428 }
00429 
00430 typedef std::vector<oac::ColumnOrSuperColumn>  ColumnVec;
00431 typedef std::auto_ptr<ColumnVec>  ColumnVecPtr;
00432 
00433 ColumnVecPtr cassa_translate_query(const db1_con_t* _h, const db_key_t* _k,
00434                 const db_val_t* _v, const db_key_t* _c, int _n, int _nc, int* ret_rows_no)
00435 {
00436         char row_key[cassa_max_key_len];
00437         char sec_key[cassa_max_key_len];
00438         int key_len=0, seckey_len = 0;
00439         int no_kc, no_sec_kc;
00440         dbcassa_table_p tbc;
00441 
00443         if(_k) {
00444                 tbc = dbcassa_db_get_table(&CON_CASSA(_h)->db_name, CON_TABLE(_h));
00445                 if(!tbc) {
00446                         LM_ERR("table %.*s does not exist!\n", CON_TABLE(_h)->len, CON_TABLE(_h)->s);
00447                         return ColumnVecPtr(NULL);
00448                 }
00449                 cassa_constr_key(_k, _v, _n, tbc->key_len, tbc->key, &no_kc, row_key);
00450 
00451                 if(no_kc != tbc->key_len) {/* was not able to construct the whole key */
00452                         LM_ERR("Query not supported - key not provided\n");
00453                         dbcassa_lock_release(tbc);
00454                         return ColumnVecPtr(NULL);
00455                 }
00456                 key_len = tbc->key_len;
00457 
00458                 cassa_constr_key(_k, _v, _n, tbc->seckey_len, tbc->sec_key, &no_sec_kc, sec_key);
00459                 seckey_len = tbc->seckey_len;
00460 
00461                 dbcassa_lock_release(tbc);
00462         }
00463 
00464         try {
00465                 oac::SlicePredicate sp;
00466                 if(seckey_len) { // seckey defined for this table
00467                         if(no_sec_kc == seckey_len) { // was able to build the complete secondary key
00468                                 if(_c) { /* if queried for specific columns */
00469                                         /* query for the specific columns */
00470                                         for(int i=0; i< _nc; i++) {
00471                                                 std::string col_name = sec_key;
00472                                                 col_name.push_back(cassa_key_delim);
00473                                                 col_name.append(_c[i]->s);
00474                                                 sp.column_names.push_back(col_name);
00475                                                 LM_DBG("Query col: %s\n", col_name.c_str());
00476                                         }
00477                                         sp.__isset.column_names = true; // set
00478                                 } else { /* query for columns starting with this secondary key */
00479                                         oac::SliceRange sr;
00480                                         sr.start = sec_key;
00481                                         sr.start.push_back(cassa_key_delim);
00482                                         sr.finish = sec_key;
00483                                         sr.finish.push_back(cassa_key_delim +1);
00484                                         sp.slice_range = sr;
00485                                         sp.__isset.slice_range = true; // set
00486                                 }
00487                         } else {  /* query all columns */
00488                                 oac::SliceRange sr;
00489                                 sr.start = "";
00490                                 sr.finish = "";
00491                                 sp.slice_range = sr;
00492                                 sp.__isset.slice_range = true; // set
00493                         }
00494                 } else { /* the table doesn't have any secondary key defined */
00495                         if(_c) {
00496                                 for(int i=0; i< _nc; i++) {
00497                                         sp.column_names.push_back(_c[i]->s);
00498                                         LM_DBG("Query col: %s\n", _c[i]->s);
00499                                 }
00500                                 LM_DBG("get %d columns\n", _nc);
00501                                 sp.__isset.column_names = true; // set
00502                         } else {
00503                                 /* return all columns */
00504                                 oac::SliceRange sr;
00505                                 sr.start = "";
00506                                 sr.finish = "";
00507                                 sp.slice_range = sr;
00508                                 sp.__isset.slice_range = true; // set
00509                                 LM_DBG("get all columns\n");
00510                         }
00511                 }
00512 
00513                 unsigned int retr = 0;
00514                 oac::ColumnParent cparent;
00515                 cparent.column_family = _h->table->s;
00516                 ColumnVecPtr cassa_result(new std::vector<oac::ColumnOrSuperColumn>);
00517                 do {
00518                         if(CON_CASSA(_h)->con) {
00519                                 try {
00520 
00521                                         if(_k) {
00522                                                 CON_CASSA(_h)->con->get_slice(*cassa_result, row_key, cparent, sp, oac::ConsistencyLevel::ONE);
00523                                                 *ret_rows_no = 1;
00524                                         } else {
00525                                                 oac::KeyRange keyRange;
00526                                                 keyRange.start_key = "";
00527                                                 keyRange.start_key = "";
00528                                                 std::vector<oac::KeySlice> key_slice_vect;
00529                                                 keyRange.__isset.start_key = 1;
00530                                                 keyRange.__isset.end_key = 1;
00531                                                 ColumnVec::iterator it = cassa_result->begin();
00532 
00533                                                 /* get in a loop 100 records at a time */
00534                                                 int rows_no =0;
00535                                                 while(1) {
00536                                                         CON_CASSA(_h)->con->get_range_slices(key_slice_vect, cparent, sp, keyRange, oac::ConsistencyLevel::ONE);
00537                                                         /* construct cassa_result */
00538                                                         LM_DBG("Retuned %d key slices\n", key_slice_vect.size());
00539                                                         for(unsigned int i = 0; i< key_slice_vect.size(); i++) {
00540                                                                 if(key_slice_vect[i].columns.size()==0) {
00541                                                                         continue;
00542                                                                 }
00543                                                                 cassa_result->insert(it, key_slice_vect[i].columns.begin(), key_slice_vect[i].columns.end());
00544                                                                 it = cassa_result->begin();
00545                                                                 row_slices[rows_no][0] = cassa_result->size();
00546                                                                 row_slices[rows_no][1] = 0;
00547                                                                 rows_no++;
00548                                                         }
00549                                                         if(key_slice_vect.size() < (unsigned int)keyRange.count)
00550                                                                 break;
00551                                                 }
00552 
00553                                                 *ret_rows_no = rows_no;
00554                                         }
00555 
00556                                         return cassa_result;
00557                                 } catch (const att::TTransportException &tx) {
00558                                         LM_ERR("Failed to query: %s\n", tx.what());
00559                                 }
00560                         }
00561                         dbcassa_reconnect(CON_CASSA(_h));
00562                 } while(cassa_auto_reconnect && retr++ < cassa_retries);
00563 
00564         } catch (const oac::InvalidRequestException ir) {
00565                 LM_ERR("Failed Invalid query request: %s\n", ir.why.c_str());
00566         } catch (const at::TException &tx) {
00567                 LM_ERR("Failed generic Thrift error: %s\n", tx.what());
00568         } catch (const std::exception &ex) {
00569                 LM_ERR("Failed std error: %s\n", ex.what());
00570         } catch (...) {
00571                 LM_ERR("Failed generic error\n");
00572         }
00573 
00574         LM_DBG("Query with get slice no_kc=%d tbc->key_len=%d  _n=%d\n", no_kc, key_len,_n);
00575         return ColumnVecPtr(NULL);
00576 }
00577 
00578 
00579 /*
00580  *      The functions for the DB Operations: query, delete, update.
00581  * */
00582 
00583 /*
00584  * Query table for specified rows
00585  * _h: structure representing database connection
00586  * _k: key names
00587  * _op: operators
00588  * _v: values of the keys that must match
00589  * _c: column names to return
00590  * _n: number of key=values pairs to compare
00591  * _nc: number of columns to return
00592  * _o: order by the specified column
00593  */
00594 int db_cassa_query(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _op,
00595                 const db_val_t* _v, const db_key_t* _c, int _n, int _nc,
00596                 const db_key_t _o, db1_res_t** _r)
00597 {
00598         db1_res_t* db_res = 0;
00599         int rows_no;
00600         ColumnVecPtr cassa_result;
00601         dbcassa_table_p tbc;
00602         int seckey_len;
00603 
00604         if (!_h || !CON_TABLE(_h) || !_r) {
00605                 LM_ERR("invalid parameter value\n");
00606                 return -1;
00607         }
00608         LM_DBG("query table=%s\n", _h->table->s);
00609 
00612         cassa_result = cassa_translate_query(_h, _k, _v, _c, _n, _nc, &rows_no);
00613 
00614         if(cassa_result.get() == NULL) {
00615                 LM_ERR("Failed to query Cassandra cluster\n");
00616                 return -1;
00617         }
00618 
00619         /* compare the number of queried cols with the key cols*/
00620 //      if(no_kc + no_sec_kc < _n) { /* TODO */
00621                 /* filter manually for the rest of the values */
00622 //      }
00623 
00624         db_res = db_new_result();
00625         if (!db_res) {
00626                 LM_ERR("no memory left\n");
00627                 goto error;
00628         }
00629         RES_COL_N(db_res)= _nc;
00630         if(!db_allocate_columns(db_res, _nc) < 0) {
00631                 LM_ERR("no more memory\n");
00632                 goto error;
00633         }
00634 
00635         tbc = dbcassa_db_get_table(&CON_CASSA(_h)->db_name, CON_TABLE(_h));
00636         if(!tbc) {
00637                 LM_ERR("table %.*s does not exist!\n", CON_TABLE(_h)->len, CON_TABLE(_h)->s);
00638                 return -1;
00639         }
00640 
00642         /* fill in the columns name and type */
00643         for(int col = 0; col < _nc; col++) {
00644                 RES_NAMES(db_res)[col] = (str*)pkg_malloc(sizeof(str));
00645                 if (! RES_NAMES(db_res)[col]) {
00646                         LM_ERR("no private memory left\n");
00647                         dbcassa_lock_release(tbc);
00648                         db_free_columns(db_res);
00649                         goto error;
00650                 }
00651 
00652                 *RES_NAMES(db_res)[col]   = *_c[col];
00653 
00654                 /* search the column in table schema to get the type */
00655                 dbcassa_column_p colp = cassa_search_col(tbc, _c[col]);
00656                 if(!colp) {
00657                         LM_ERR("No column with name [%.*s] found\n", _c[col]->len, _c[col]->s);
00658                         dbcassa_lock_release(tbc);
00659                         db_free_columns(db_res);
00660                         goto error;
00661                 }
00662                 RES_TYPES(db_res)[col] = colp->type;
00663 
00664                 LM_DBG("RES_NAMES(%p)[%d]=[%.*s]\n", RES_NAMES(db_res)[col], col,
00665                                 RES_NAMES(db_res)[col]->len, RES_NAMES(db_res)[col]->s);
00666         }
00667         /* TODO  if all columns asked - take from table schema */
00668         seckey_len = tbc->seckey_len;
00669         dbcassa_lock_release(tbc);
00670 
00671         if(!cassa_result->size()) {
00672                 LM_DBG("The query returned no result\n");
00673                 RES_ROW_N(db_res) = 0;
00674                 goto done;
00675         }
00676 
00677         /* Initialize the row_slices vector for the case with one column and no secondary key */
00678         if(rows_no == 1) {
00679                 row_slices[0][0]= cassa_result->size();
00680                 row_slices[0][1]= 0;
00681 
00682                 if(seckey_len) { /* if the table has a secondary key defined */
00683                         /* pass through the result once to see how many rows there are */
00684                         rows_no = cassa_result_separate_rows(*cassa_result);
00685                         if(rows_no < 0) {
00686                                 LM_ERR("Wrong formated column names\n");
00687                                 goto error;
00688                         }
00689                 }
00690         }
00691 
00692         RES_ROW_N(db_res) = rows_no;
00693 
00694         if (db_allocate_rows(db_res) < 0) {
00695                 LM_ERR("could not allocate rows");
00696                 goto error;
00697         }
00698 
00699         for(int ri=0; ri < rows_no; ri++) {
00700                 if (db_allocate_row(db_res, &(RES_ROWS(db_res)[ri])) != 0) {
00701                         LM_ERR("could not allocate row");
00702                         goto error;
00703                 }
00704 
00705                 /* complete the row with the columns */
00706                 for(int col = 0; col< _nc; col++) {
00707                         RES_ROWS(db_res)[ri].values[col].type = RES_TYPES(db_res)[col];
00708                         cassa_convert_result(_c[col], *cassa_result, (ri>0?row_slices[ri-1][0]:0),  row_slices[ri][0],
00709                                         row_slices[ri][1], &RES_ROWS(db_res)[ri].values[col]);
00710                 }
00711         }
00712 
00713 done:
00714         *_r = db_res;
00715         LM_DBG("Exited with success\n");
00716         return 1;
00717 
00718 error:
00719         if(db_res)
00720                 db_free_result(db_res);
00721         return -1;
00722 }
00723 
00724 /*
00725  * Insert or update the table for specified row key
00726  * _h: structure representing database connection
00727  * _k: key names
00728  * _op: operators
00729  * _v: values of the keys that must match
00730  * _uk: column names to update
00731  * _uv: values for the columns to update
00732  * _n: number of key=values pairs to compare
00733  * _un: number of columns to update
00734  */
00735 int db_cassa_modify(const db1_con_t* _h, const db_key_t* _k, const db_val_t* _v,
00736                 const db_key_t* _uk, const db_val_t* _uv, int _n, int _un)
00737 {
00738         dbcassa_table_p tbc;
00739         char row_key[cassa_max_key_len];
00740         char sec_key[cassa_max_key_len];
00741         int64_t ts = 0;
00742         str ts_col_name={0, 0};
00743         int seckey_len;
00744         unsigned int curr_time = time(NULL);
00745 
00746         if (!_h || !CON_TABLE(_h) || !_k || !_v) {
00747                 LM_ERR("invalid parameter value\n");
00748                 return -1;
00749         }
00750 
00751         LM_DBG("modify table=%s\n", _h->table->s);
00752 
00754         tbc = dbcassa_db_get_table(&CON_CASSA(_h)->db_name, CON_TABLE(_h));
00755         if(!tbc) {
00756                 LM_ERR("table %.*s does not exist!\n", CON_TABLE(_h)->len, CON_TABLE(_h)->s);
00757                 return -1;
00758         }
00759         if(tbc->ts_col)
00760                 pkg_str_dup(&ts_col_name, (const str*)&tbc->ts_col->name);
00761 
00762         cassa_constr_key(_k, _v, _n, tbc->key_len, tbc->key, 0, row_key);
00763         cassa_constr_key(_k, _v, _n, tbc->seckey_len, tbc->sec_key, 0, sec_key);
00764         seckey_len = tbc->seckey_len;
00765 
00766         dbcassa_lock_release(tbc);
00767 
00769         try {
00770                 /* Set the columns */
00771                 std::vector<oac::Mutation> mutations;
00772                 for(int i=0; i< _un; i++) {
00773                         if(_uv[i].nul)
00774                                 continue;
00775 
00776                         std::stringstream out;
00777                         std::string value;
00778                         int cont = 0;
00779 
00780                         switch(_uv[i].type) {
00781                                 case DB1_INT:   out << _uv[i].val.int_val;
00782                                                                 value = out.str();
00783                                                                 break;
00784                                 case DB1_BIGINT:out << _uv[i].val.ll_val;
00785                                                                 value = out.str();
00786                                                                 break;
00787                                 case DB1_DOUBLE:out << _uv[i].val.double_val;
00788                                                                 value = out.str();
00789                                                                 break;
00790                                 case DB1_BITMAP:out << _uv[i].val.bitmap_val;
00791                                                                 value = out.str();
00792                                                                 break;
00793                                 case DB1_STRING:value = _uv[i].val.string_val;
00794                                                                 break;
00795                                 case DB1_STR:   if(!_uv[i].val.str_val.s) {
00796                                                                         cont = 1;
00797                                                                         break;
00798                                                                 }
00799                                                                 value = std::string(_uv[i].val.str_val.s, _uv[i].val.str_val.len);
00800                                                                 break;
00801                                 case DB1_BLOB:  value = std::string(_uv[i].val.blob_val.s, _uv[i].val.blob_val.len);
00802                                                                 break;
00803                                 case DB1_DATETIME:      unsigned int exp_time = (unsigned int)_uv[i].val.time_val;
00804                                                                         out << exp_time;
00805                                                                         value = out.str();
00806                                                                         if(ts_col_name.s && ts_col_name.len==_uk[i]->len &&
00807                                                                                         strncmp(ts_col_name.s, _uk[i]->s, ts_col_name.len)==0) {
00808                                                                                 ts = exp_time;
00809                                                                                 LM_DBG("Found timestamp col [%.*s]\n", ts_col_name.len, ts_col_name.s);
00810                                                                         }
00811                                                                         break;
00812                         }
00813                         if (cont)
00814                                 continue;
00815 
00816                         LM_DBG("ADDED column [%.*s] type [%d], value [%s]\n", _uk[i]->len, _uk[i]->s,
00817                                 _uv[i].type, value.c_str());
00818 
00819                         oac::Mutation mut;
00820                         oac::ColumnOrSuperColumn col;
00821                         if(seckey_len) {
00822                                 col.column.name = sec_key;
00823                                 col.column.name.push_back(cassa_key_delim);
00824                                 col.column.name.append(_uk[i]->s);
00825                         }
00826                         else
00827                                 col.column.name = _uk[i]->s;
00828                         col.column.value = value;
00829                         col.column.__isset.value = true;
00830                         col.__isset.column = true;
00831                         col.column.timestamp = curr_time;
00832                         col.column.__isset.timestamp = true;
00833                         mut.column_or_supercolumn = col;
00834                         mut.__isset.column_or_supercolumn = true;
00835                         mutations.push_back(mut);
00836                 }
00837                 if(ts_col_name.s)
00838                         pkg_free(ts_col_name.s);
00839                 ts_col_name.s = 0;
00840 
00841                 if(ts) {
00842                         int32_t ttl = ts - curr_time;
00843                         LM_DBG("Set expires to %d seconds\n", ttl);
00844                         for(size_t mi=0; mi< mutations.size(); mi++) {
00845                                 mutations[mi].column_or_supercolumn.column.ttl = ttl;
00846                                 mutations[mi].column_or_supercolumn.column.__isset.ttl = true;
00847                         }
00848                 }
00849 
00850                 LM_DBG("Perform the mutation, add [%d] columns\n", (int)mutations.size());
00851 
00852                 std::map<std::string, std::vector<oac::Mutation> > innerMap;
00853                 innerMap.insert(std::pair<std::string, std::vector<oac::Mutation> > (_h->table->s, mutations));
00854                 std::map <std::string, std::map<std::string, std::vector<oac::Mutation> > > CFMap;
00855                 CFMap.insert(std::pair<std::string, std::map<std::string, std::vector<oac::Mutation> > >(row_key, innerMap));
00856                 unsigned int retr = 0;
00857 
00858                 do {
00859                         if(CON_CASSA(_h)->con) {
00860                                 try{
00861                                         CON_CASSA(_h)->con->batch_mutate(CFMap, oac::ConsistencyLevel::ONE);
00862                                         return 1;
00863                                 }  catch (const att::TTransportException &tx) {
00864                                         LM_ERR("Failed to query: %s\n", tx.what());
00865                                 }
00866                         }
00867                         dbcassa_reconnect(CON_CASSA(_h));
00868                 } while (cassa_auto_reconnect && retr++ < cassa_retries);
00869 
00870         } catch (const oac::InvalidRequestException ir) {
00871                 LM_ERR("Failed Invalid query request: %s\n", ir.why.c_str());
00872         } catch (const at::TException &tx) {
00873                 LM_ERR("Failed generic Thrift error: %s\n", tx.what());
00874         } catch (const std::exception &ex) {
00875                 LM_ERR("Failed std error: %s\n", ex.what());
00876         } catch (...) {
00877                 LM_ERR("Failed generic error\n");
00878         }
00879 
00880         LM_ERR("Insert/Update query failed\n");
00881         return -1;
00882 }
00883 
00884 
00885 int db_cassa_insert(const db1_con_t* _h, const db_key_t* _k, const db_val_t* _v,
00886                 int _n)
00887 {
00888         LM_DBG("db_cassa_insert:\n");
00889         return db_cassa_modify(_h, _k, _v, _k, _v, _n, _n);
00890 }
00891 
00892 
00893 int db_cassa_update(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _o,
00894                 const db_val_t* _v, const db_key_t* _uk, const db_val_t* _uv,
00895                 int _n, int _un)
00896 {
00897         LM_DBG("db_cassa_update:\n");
00898         return db_cassa_modify(_h, _k, _v, _uk, _uv, _n, _un);
00899 }
00900 
00901 
00902 int db_cassa_free_result(db1_con_t* _h, db1_res_t* _r)
00903 {
00904         return db_free_result(_r);
00905 }
00906 
00907 /*
00908  * Delete after primary or primary and secondary key
00909  * _h: structure representing database connection
00910  * _k: key names
00911  * _op: operators
00912  * _v: values of the keys that must match
00913  * _n: number of key=values pairs to compare
00914  */
00915 int db_cassa_delete(const db1_con_t* _h, const db_key_t* _k, const db_op_t* _o,
00916                 const db_val_t* _v, int _n)
00917 {
00918         oac::CassandraClient* cassa_client = CON_CASSA(_h)->con;
00919         char row_key[cassa_max_key_len];
00920         char sec_key[cassa_max_key_len];
00921         dbcassa_table_p tbc;
00922         int no_kc, no_sec_kc;
00923         unsigned int retr = 0;
00924         int seckey_len;
00925         oac::Mutation m;
00926 
00927         if (!_h || !CON_TABLE(_h) || !_k || !_v) {
00928                 LM_ERR("invalid parameter value\n");
00929                 return -1;
00930         }
00931 
00932         LM_DBG("query table=%s\n", _h->table->s);
00933 
00934         /* get the table schema and construct primary and secondary key */
00935         tbc = dbcassa_db_get_table(&CON_CASSA(_h)->db_name, CON_TABLE(_h));
00936         if(!tbc)
00937         {
00938                 LM_ERR("table %.*s does not exist!\n", CON_TABLE(_h)->len, CON_TABLE(_h)->s);
00939                 return -1;
00940         }
00941 
00942         cassa_constr_key(_k, _v, _n, tbc->key_len, tbc->key, &no_kc,   row_key);
00943         cassa_constr_key(_k, _v, _n, tbc->seckey_len, tbc->sec_key, &no_sec_kc, sec_key);
00944         seckey_len = tbc->seckey_len;
00945 
00946         if (_n != no_kc && no_sec_kc == seckey_len) {
00947                 /* if the conditions are also for secondary key */
00948                 LM_DBG("Delete after primary and secondary key %s %s\n", row_key, sec_key);
00949                 dbcassa_column_p colp = tbc->cols;
00950                 try {
00951                         while(colp) {
00952                                 std::string col_name = sec_key;
00953                                 col_name.push_back(cassa_key_delim);
00954                                 col_name.append(colp->name.s);
00955                                 m.deletion.predicate.column_names.push_back(col_name);
00956                                 colp = colp->next;
00957                         }
00958                 } catch (...) {
00959                         LM_ERR("Failed to construct the list of column names\n");
00960                         dbcassa_lock_release(tbc);
00961                         return -1;
00962                 }
00963         }
00964 
00965         dbcassa_lock_release(tbc);
00966 
00967         for(int i=0; i < _n; i++)
00968                 LM_DBG("delete query col = %.*s\n", _k[i]->len, _k[i]->s);
00969 
00970         if(no_kc == 0 ) {
00971                 LM_DBG("Delete operation not supported\n");
00972                 return -1;
00973         }
00974 
00975         try {
00976                 if (_n == no_kc) {
00977                         LM_DBG("Delete after row key %s\n", row_key);
00978                         oac::ColumnPath cp;
00979                         cp.column_family = _h->table->s;
00980                         do {
00981                                 if(CON_CASSA(_h)->con) {
00982                                         try {
00983                                                 cassa_client->remove(row_key, cp, (int64_t)time(0), oac::ConsistencyLevel::ONE);
00984                                                 return 1;
00985                                         } catch  (const att::TTransportException &tx) {
00986                                                         LM_ERR("Failed to query: %s\n", tx.what());
00987                                         }
00988                                 }
00989                                 dbcassa_reconnect(CON_CASSA(_h));
00990                         } while(cassa_auto_reconnect && retr++ < cassa_retries);
00991                 } else {
00992 
00993                         if(!seckey_len) {
00994                                 LM_ERR("Delete operation not supported\n");
00995                                 return -1;
00996                         }
00997 
00998 //                      oac::Mutation m;
00999                         m.deletion.timestamp = (int64_t)time(0);
01000                         m.deletion.__isset.timestamp = true;
01001                         m.__isset.deletion = true;
01002 
01003 #if 0
01004                         /* push all columns for the corresponding secondary key */
01005                         tbc = dbcassa_db_get_table(&CON_CASSA(_h)->db_name, CON_TABLE(_h));
01006                         if(!tbc)
01007                         {
01008                                 LM_ERR("table %.*s does not exist!\n", CON_TABLE(_h)->len, CON_TABLE(_h)->s);
01009                                 return -1;
01010                         }
01011                         dbcassa_column_p colp = tbc->cols;
01012                         try {
01013                                 while(colp) {
01014                                         std::string col_name = sec_key;
01015                                         col_name.push_back(cassa_key_delim);
01016                                         col_name.append(colp->name.s);
01017                                         m.deletion.predicate.column_names.push_back(col_name);
01018                                         colp = colp->next;
01019                                 }
01020                         } catch (...) {
01021                                 LM_ERR("Failed to construct the list of column names\n");
01022                                 dbcassa_lock_release(tbc);
01023                                 return -1;
01024                         }
01025                         dbcassa_lock_release(tbc);
01026 #endif
01027                         m.deletion.__isset.predicate = true;
01028                         m.deletion.predicate.__isset.column_names = true; // set
01029 
01030                         std::vector<oac::Mutation> mutations;
01031                         mutations.push_back(m);
01032 
01033                         /* innerMap - column_family + mutations vector */
01034                         std::map<std::string, std::vector<oac::Mutation> > innerMap;
01035                         innerMap.insert(std::pair<std::string, std::vector<oac::Mutation> > (_h->table->s, mutations));
01036                         std::map <std::string, std::map<std::string, std::vector<oac::Mutation> > > CFMap;
01037                         CFMap.insert(std::pair<std::string, std::map<std::string, std::vector<oac::Mutation> > >(row_key, innerMap));
01038 
01039                         do {
01040                                 if(CON_CASSA(_h)->con) {
01041                                         try {
01042                                                 cassa_client->batch_mutate(CFMap, oac::ConsistencyLevel::ONE);
01043                                                 return 1;
01044                                         } catch  (const att::TTransportException &tx) {
01045                                                         LM_ERR("Failed to query: %s\n", tx.what());
01046                                         }
01047                                 }
01048                                 dbcassa_reconnect(CON_CASSA(_h));
01049                         } while(cassa_auto_reconnect && retr++ < cassa_retries);
01050                 }
01051                 return 1;
01052         } catch (const oac::InvalidRequestException ir) {
01053                 LM_ERR("Invalid query: %s\n", ir.why.c_str());
01054         } catch (const at::TException &tx) {
01055                 LM_ERR("Failed TException: %s\n", tx.what());
01056         } catch (std::exception &e) {
01057                 LM_ERR("Failed: %s\n", e.what());
01058         } catch (...) {
01059                 LM_ERR("Failed generic error\n");
01060         }
01061 
01062         return -1;
01063 }
01064