sbufq.h

Go to the documentation of this file.
00001 /* 
00002  * $Id$
00003  * 
00004  * Copyright (C) 2010 iptelorg GmbH
00005  *
00006  * Permission to use, copy, modify, and distribute this software for any
00007  * purpose with or without fee is hereby granted, provided that the above
00008  * copyright notice and this permission notice appear in all copies.
00009  *
00010  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
00011  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
00012  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
00013  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
00014  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
00015  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
00016  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
00017  */
00023 /*
00024  * History:
00025  * --------
00026  *  2010-03-31  initial version, based on tcp_conn.h tcp_wbuffer_queue (andrei)
00027 */
00028 
00029 #ifndef __sbufq_h
00030 #define __sbufq_h
00031 
00032 #include "../../compiler_opt.h"
00033 #include "../../ut.h"
00034 #include "../../mem/shm_mem.h"
00035 #include "../../timer_ticks.h"
00036 #include "../../timer.h"
00037 #include "../../dprint.h"
00038 #include <string.h>
00039 
00040 
/**
 * One block of the buffer queue: a small header immediately followed by the
 * data area. Blocks are allocated with
 * sizeof(struct sbuf_elem) + b_size - sizeof(buf) bytes, so that buf can hold
 * b_size bytes although it is declared with a single element.
 */
00041 struct sbuf_elem {
00042         struct sbuf_elem* next; /**< next block in the queue, 0 for the tail */
00043         unsigned int b_size; /**< usable size of buf, in bytes */
00044         char buf[1]; /**< first byte of the data area (b_size bytes long) */
00045 };
00046 
/**
 * Queue of data blocks: bytes are appended at the tail (sbufq_add()),
 * optionally prepended at the head (sbufq_insert()) and consumed from the
 * head (sbufq_flush()).
 * An all-zero structure is an empty queue (sbufq_destroy() leaves it so).
 */
00047 struct sbuffer_queue {
00048         struct sbuf_elem* first; /**< head block (flushed first), 0 if empty */
00049         struct sbuf_elem* last; /**< tail block (new data is appended here) */
00050         ticks_t last_chg; /**< raw ticks stored by sbufq_add() when it creates
                                 the first block of an empty queue */
00051         unsigned int queued; /**< number of bytes currently queued */
00052         unsigned int offset; /**< offset of the first not yet flushed byte
                                    inside the first block */
00054         unsigned int last_used; /**< number of bytes used in the last block */
00055 };
00056 
00057 
00058 /* sbufq_flush() output flags (bits OR-ed into *flags by sbufq_flush()) */
00059 #define F_BUFQ_EMPTY 1 /* the queue holds no more blocks on return */
00060 #define F_BUFQ_ERROR_FLUSH 2 /* the flush callback returned a value < 0 */
00061 
00062 
/* Emptiness tests: a queue is empty when it has no first block.
 * bq is a pointer to a struct sbuffer_queue and is evaluated once. */
00063 #define sbufq_empty(bq) ((bq)->first==0)
00064 #define sbufq_non_empty(bq) ((bq)->first!=0)
00065 
00066 
00067 
00077 inline static int sbufq_add(struct sbuffer_queue* q, const void* data,
00078                                                         unsigned int size, unsigned int min_buf_size)
00079 {
00080         struct sbuf_elem* b;
00081         unsigned int last_free;
00082         unsigned int b_size;
00083         unsigned int crt_size;
00084         ticks_t t;
00085         
00086         t=get_ticks_raw();
00087         
00088         if (likely(q->last==0)) {
00089                 b_size=MAX_unsigned(min_buf_size, size);
00090                 b=shm_malloc(sizeof(*b)+b_size-sizeof(b->buf));
00091                 if (unlikely(b==0))
00092                         goto error;
00093                 b->b_size=b_size;
00094                 b->next=0;
00095                 q->last=b;
00096                 q->first=b;
00097                 q->last_used=0;
00098                 q->offset=0;
00099                 q->last_chg=get_ticks_raw();
00100                 last_free=b_size;
00101                 crt_size=size;
00102                 goto data_cpy;
00103         }else{
00104                 b=q->last;
00105         }
00106         
00107         while(size){
00108                 last_free=b->b_size-q->last_used;
00109                 if (last_free==0){
00110                         b_size=MAX_unsigned(min_buf_size, size);
00111                         b=shm_malloc(sizeof(*b)+b_size-sizeof(b->buf));
00112                         if (unlikely(b==0))
00113                                 goto error;
00114                         b->b_size=b_size;
00115                         b->next=0;
00116                         q->last->next=b;
00117                         q->last=b;
00118                         q->last_used=0;
00119                         last_free=b->b_size;
00120                 }
00121                 crt_size=MIN_unsigned(last_free, size);
00122 data_cpy:
00123                 memcpy(b->buf+q->last_used, data, crt_size);
00124                 q->last_used+=crt_size;
00125                 size-=crt_size;
00126                 data+=crt_size;
00127                 q->queued+=crt_size;
00128         }
00129         return 0;
00130 error:
00131         return -1;
00132 }
00133 
00134 
00135 
00146 inline static int sbufq_insert(struct sbuffer_queue* q, const void* data, 
00147                                                         unsigned int size, unsigned int min_buf_size)
00148 {
00149         struct sbuf_elem* b;
00150         
00151         if (likely(q->first==0)) /* if empty, use sbufq_add */
00152                 return sbufq_add(q, data, size, min_buf_size);
00153         
00154         if (unlikely(q->offset)){
00155                 LOG(L_CRIT, "BUG: non-null offset %d (bad call, should"
00156                                 "never be called after sbufq_run())\n", q->offset);
00157                 goto error;
00158         }
00159         if ((q->first==q->last) && ((q->last->b_size-q->last_used)>=size)){
00160                 /* one block with enough space in it for size bytes */
00161                 memmove(q->first->buf+size, q->first->buf, size);
00162                 memcpy(q->first->buf, data, size);
00163                 q->last_used+=size;
00164         }else{
00165                 /* create a size bytes block directly */
00166                 b=shm_malloc(sizeof(*b)+size-sizeof(b->buf));
00167                 if (unlikely(b==0))
00168                         goto error;
00169                 b->b_size=size;
00170                 /* insert it */
00171                 b->next=q->first;
00172                 q->first=b;
00173                 memcpy(b->buf, data, size);
00174         }
00175         
00176         q->queued+=size;
00177         return 0;
00178 error:
00179         return -1;
00180 }
00181 
00182 
00191 inline static unsigned int sbufq_destroy(struct  sbuffer_queue* q)
00192 {
00193         struct sbuf_elem* b;
00194         struct sbuf_elem* next_b;
00195         int unqueued;
00196         
00197         unqueued=0;
00198         if (likely(q->first)){
00199                 b=q->first;
00200                 do{
00201                         next_b=b->next;
00202                         unqueued+=(b==q->last)?q->last_used:b->b_size;
00203                         if (b==q->first)
00204                                 unqueued-=q->offset;
00205                         shm_free(b);
00206                         b=next_b;
00207                 }while(b);
00208         }
00209         memset(q, 0, sizeof(*q));
00210         return unqueued;
00211 }
00212 
00213 
00214 
00238 inline static int sbufq_flush(struct sbuffer_queue* q, int* flags,
00239                                                                 int (*flush_f)(void* p1, void* p2,
00240                                                                                                 const void* buf,
00241                                                                                                 unsigned size),
00242                                                                 void* flush_p1, void* flush_p2)
00243 {
00244         struct sbuf_elem *b;
00245         int n;
00246         int ret;
00247         int block_size;
00248         char* buf;
00249         
00250         *flags=0;
00251         ret=0;
00252         while(q->first){
00253                 block_size=((q->first==q->last)?q->last_used:q->first->b_size)-
00254                                                 q->offset;
00255                 buf=q->first->buf+q->offset;
00256                 n=flush_f(flush_p1, flush_p2, buf, block_size);
00257                 if (likely(n>0)){
00258                         ret+=n;
00259                         if (likely(n==block_size)){
00260                                 b=q->first;
00261                                 q->first=q->first->next; 
00262                                 shm_free(b);
00263                                 q->offset=0;
00264                                 q->queued-=block_size;
00265                         }else{
00266                                 q->offset+=n;
00267                                 q->queued-=n;
00268                                 /* no break: if we are here n < block_size => partial write
00269                                    => the write should be retried */
00270                         }
00271                 }else{
00272                         if (unlikely(n<0))
00273                                 *flags|=F_BUFQ_ERROR_FLUSH;
00274                         break;
00275                 }
00276         }
00277         if (likely(q->first==0)){
00278                 q->last=0;
00279                 q->last_used=0;
00280                 q->offset=0;
00281                 *flags|=F_BUFQ_EMPTY;
00282         }
00283         return ret;
00284 }
00285 
00286 
00287 
00288 
00289 #endif /*__sbufq_h*/
00290 
00291 /* vi: set ts=4 sw=4 tw=79:ai:cindent: */