ПРОЕКТЫ 


  АРХИВ 


Apache-Talk @lexa.ru 

Inet-Admins @info.east.ru 

Filmscanners @halftone.co.uk 

Security-alerts @yandex-team.ru 

nginx-ru @sysoev.ru 


  СТАТЬИ 


  ПЕРСОНАЛЬНОЕ 


  ПРОГРАММЫ 



ПИШИТЕ
ПИСЬМА












     АРХИВ :: nginx-ru
Nginx-ru mailing list archive (nginx-ru@sysoev.ru)

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: Выполнение блокируемых вызовов в модуле



Invar пишет:
Valery Kholodkov Wrote:
-------------------------------------------------------
Принцип создания неблокирующего модуля для хождения в MySQL долго
объяснять, проще показать исходник. Проблема в том, что в моей версии
не решена одна достаточно нетривиальная задача. Но её можно попытаться
решить, введя некоторые ограничения.

Поэтому неплохо бы узнать, зачем Вы хотите ходить в MySQL?

Модуль ограничения прав доступа: в базе хранятся данные, кэширование через
memcache. Можно посмотреть ваш исходник?

См. вложение.

--
Best regards,
Valery Kholodkov
# nginx add-on "config" fragment: registers the MySQL module and the XML
# encoder module with the nginx build.

# NOTE(review): presumably asks the nginx build to provide SHA-1 support
# (the C source includes a SHA-1 header) - confirm against auto/ scripts.
USE_SHA1=YES

ngx_addon_name=ngx_http_mysql_module

# Both modules are appended to the list of HTTP modules.
HTTP_MODULES="$HTTP_MODULES ngx_http_mysql_module ngx_http_xml_encoder_module"

# Source list kept on ONE line: the mail archive had wrapped it, which put a
# literal newline inside the quoted value.
NGX_ADDON_SRCS="$NGX_ADDON_SRCS $ngx_addon_dir/ngx_http_mysql_module.c $ngx_addon_dir/ngx_http_xml_encoder_module.c"

# Make the add-on directory visible to #include.
HTTP_INCS="$HTTP_INCS $ngx_addon_dir"
/*
 * Copyright (C) 2010 Valery Kholodkov
 */


#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>
#include <nginx.h>

#if (NGX_HAVE_OPENSSL_SHA1_H)
#include <openssl/sha.h>
#else
#include <sha.h>
#endif

/* Sizes used by the packet parser and the authentication code. */
#define PACKET_HEADER_LEN               4
#define SCRAMBLE_LEN                    20
#define SCRAMBLE_323_LEN                8
#define MIN_PACKET_LEN                  8

/* Packet type markers. */
#define OK_PACKET                       0x00
#define EOF_PACKET                      0xfe
#define ERROR_PACKET                    0xff

/* Client capability flag bits. */
#define CLIENT_LONG_PASSWORD            1   /* new more secure passwords */
#define CLIENT_FOUND_ROWS               2   /* Found instead of affected rows */
#define CLIENT_LONG_FLAG                4   /* Get all column flags */
#define CLIENT_CONNECT_WITH_DB          8   /* One can specify db on connect */
#define CLIENT_NO_SCHEMA                16  /* Don't allow database.table.column */
#define CLIENT_COMPRESS                 32  /* Can use compression protocol */
#define CLIENT_ODBC                     64  /* Odbc client */
#define CLIENT_LOCAL_FILES              128 /* Can use LOAD DATA LOCAL */
#define CLIENT_IGNORE_SPACE             256 /* Ignore spaces before '(' */
#define CLIENT_PROTOCOL_41              512 /* New 4.1 protocol */
#define CLIENT_INTERACTIVE              1024    /* This is an interactive client */
#define CLIENT_SSL                      2048    /* Switch to SSL after handshake */
#define CLIENT_IGNORE_SIGPIPE           4096    /* IGNORE sigpipes */
#define CLIENT_TRANSACTIONS             8192    /* Client knows about transactions */
#define CLIENT_RESERVED                 16384   /* Old flag for 4.1 protocol  */
#define CLIENT_SECURE_CONNECTION        32768   /* New 4.1 authentication */
#define CLIENT_MULTI_STATEMENTS         (1UL << 16) /* Enable/disable multi-stmt support */
#define CLIENT_MULTI_RESULTS            (1UL << 17) /* Enable/disable multi-results */

/*
 * Capability set built from the flags above.  The definition spans
 * several lines, so every line but the last must end in a backslash;
 * without the continuation the macro body is cut short and the rest
 * lands at file scope.
 */
#define CLIENT_CAPABILITIES                                                   \
    (CLIENT_LONG_PASSWORD | CLIENT_LONG_FLAG | CLIENT_TRANSACTIONS            \
     | CLIENT_PROTOCOL_41 | CLIENT_MULTI_STATEMENTS | CLIENT_MULTI_RESULTS)

/*
 * Integer (de)serialisation helpers for the MySQL wire format, in which
 * multi-byte integers are sent least significant byte first.
 *
 * The value is assembled (or split) one byte at a time with shifts, so the
 * result does not depend on the byte order or alignment rules of the host.
 * For that reason there is a single set of definitions instead of the
 * former NGX_HAVE_LITTLE_ENDIAN / NGX_HAVE_NONALIGNED split, whose fallback
 * branch assembled the bytes in the opposite order.
 *
 * EXTRACT_INTn(x): x points to at least n readable bytes; x is not advanced.
 * Each byte is widened before shifting so that a byte >= 0x80 in the top
 * position, or a shift by 32 and more, stays well defined.
 */
#define EXTRACT_INT2(x)                                                       \
    ((uint16_t) ((uint16_t) *(x) | (uint16_t) *((x)+1) << 8))

#define EXTRACT_INT3(x)                                                       \
    ((uint32_t) ((uint32_t) *(x)                                              \
                 | (uint32_t) *((x)+1) << 8                                   \
                 | (uint32_t) *((x)+2) << 16))

#define EXTRACT_INT4(x)                                                       \
    ((uint32_t) ((uint32_t) *(x)                                              \
                 | (uint32_t) *((x)+1) << 8                                   \
                 | (uint32_t) *((x)+2) << 16                                  \
                 | (uint32_t) *((x)+3) << 24))

#define EXTRACT_INT8(x)                                                       \
    ((uint64_t) ((uint64_t) *(x)                                              \
                 | (uint64_t) *((x)+1) << 8                                   \
                 | (uint64_t) *((x)+2) << 16                                  \
                 | (uint64_t) *((x)+3) << 24                                  \
                 | (uint64_t) *((x)+4) << 32                                  \
                 | (uint64_t) *((x)+5) << 40                                  \
                 | (uint64_t) *((x)+6) << 48                                  \
                 | (uint64_t) *((x)+7) << 56))

/*
 * STORE_INTn(x,v): write the low n bytes of v at x and ADVANCE x by n.
 * x must be a modifiable byte pointer; v is evaluated several times, so it
 * must not have side effects.
 *
 * The trailing ';' after while(0) is kept on purpose: existing call sites
 * may rely on it, and a doubled ';' is harmless.  Because of it these
 * macros must not be used as the unbraced body of an if/else.
 */
#define STORE_INT2(x,v) do {                                                  \
        *(x) = (v) & 0xff;                                                    \
        *((x)+1) = ((v) >> 8) & 0xff;                                         \
        (x) += 2;                                                             \
    } while(0);

#define STORE_INT3(x,v) do {                                                  \
        *(x) = (v) & 0xff;                                                    \
        *((x)+1) = ((v) >> 8) & 0xff;                                         \
        *((x)+2) = ((v) >> 16) & 0xff;                                        \
        (x) += 3;                                                             \
    } while(0);

#define STORE_INT4(x,v) do {                                                  \
        *(x) = (v) & 0xff;                                                    \
        *((x)+1) = ((v) >> 8) & 0xff;                                         \
        *((x)+2) = ((v) >> 16) & 0xff;                                        \
        *((x)+3) = ((v) >> 24) & 0xff;                                        \
        (x) += 4;                                                             \
    } while(0);

/*
 * Bits for ngx_mysql_field_t.flags, as used by the field tables below:
 *   NGX_MY_FIELD_LCS       set on entries handled by ngx_mysql_lcs_handler;
 *   NGX_MY_FIELD_ZEROT     set on the entry handled by ngx_mysql_strz_handler;
 *   NGX_MY_FIELD_EOP       set on entries handled by ngx_mysql_stre_handler;
 *   NGX_MY_FIELD_RELEVANT  set on most entries that name a destination offset.
 * NOTE(review): the exact effect of each bit is decided by code outside this
 * part of the file - confirm there before relying on the summary above.
 */
#define NGX_MY_FIELD_LCS        0x00000002
#define NGX_MY_FIELD_ZEROT      0x00000004
#define NGX_MY_FIELD_EOP        0x00000008
#define NGX_MY_FIELD_RELEVANT   0x00000010

/* Command codes for the prepared-statement commands used by this module. */
typedef enum {
    COM_STMT_PREPARE = 0x16,
    COM_STMT_EXECUTE = 0x17,
} ngx_http_mysql_command_t;

/*
 * Conversation stages: authenticate, prepare, execute.
 * NOTE(review): no user of this enum is visible in this part of the file.
 */
typedef enum {
    mysql_stage_authenticate = 0,
    mysql_stage_prepare,
    mysql_stage_execute,
} ngx_mysql_stage_e;

/*
 * Receiver state kept in ngx_http_mysql_ctx_t.state.  A new context starts
 * in mysql_state_packet_header (see ngx_http_mysql_handler()).
 */
typedef enum {
    mysql_state_packet_header,
    mysql_state_field_count,
    mysql_state_handshake,
    mysql_state_ok,
    mysql_state_error,
    mysql_state_result,
    mysql_state_ignore
} ngx_mysql_reception_state_e;

/* Forward declaration so the handler typedefs can mention the context. */
struct ngx_http_mysql_ctx_s;

/* Field handler: receives only the per-request module context. */
typedef void (*ngx_mysql_field_handler_p)(struct ngx_http_mysql_ctx_s *ctx);
/* Packet handler: receives the request and returns an ngx_int_t status. */
typedef ngx_int_t (*ngx_mysql_packet_handler_p)(ngx_http_request_t *r);
/* Row handler: same signature as a packet handler. */
typedef ngx_mysql_packet_handler_p ngx_mysql_row_handler_p;

/*
 * Size/handler pair; an array of these hangs off
 * ngx_http_mysql_encoder_t.types.
 */
typedef struct {
    size_t                      size;
    ngx_mysql_field_handler_p   handler;
} ngx_mysql_type_t;

/*
 * One entry of a field table.  The tables in this file initialise the first
 * four members positionally (size, offset, flags, handler) and end with an
 * all-zero entry whose handler is NULL.
 */
typedef struct {
    size_t                      size;
    off_t                       offset;     /* offsetof() into the target struct */
    ngx_uint_t                  flags;      /* NGX_MY_FIELD_* bits */
    ngx_mysql_field_handler_p   handler;

    /* left zero by every table in this file */
    ngx_uint_t                  byte;
    ngx_uint_t                  mask;
} ngx_mysql_field_t;

/* Packet descriptor: a field table paired with a packet handler. */
typedef struct {
    ngx_mysql_field_t          *fields;
    ngx_mysql_packet_handler_p  handler;
} ngx_mysql_packet_t;

/*
 * Values taken from the server handshake; every member is a destination
 * in ngx_mysql_handshake_fields.
 */
typedef struct {
    u_char                      protocol_version;
    ngx_uint_t                  server_capabilities;
    ngx_uint_t                  thread_id;
    u_char                      server_lang;
    ngx_uint_t                  server_status;
} ngx_mysql_handshake_data_t;

/*
 * Destination of ngx_mysql_ok_fields_41 (all four members) and of
 * ngx_mysql_eof_fields (warning_count and server_status only).
 */
typedef struct {
    ngx_uint_t                  affected_rows;
    ngx_uint_t                  insert_id;
    ngx_uint_t                  server_status;
    ngx_uint_t                  warning_count;
} ngx_mysql_ok_data_t;

/*
 * Destination of ngx_mysql_prepare_ok_fields; column_count is also the
 * destination named by ngx_mysql_rsheader_fields.
 */
typedef struct {
    ngx_uint_t                  statement_id;
    ngx_uint_t                  column_count;
    ngx_uint_t                  param_count;
} ngx_mysql_prepare_data_t;

/* Destination of ngx_mysql_param_fields (offsets are relative to this struct). */
typedef struct {
    ngx_uint_t                  type;
    ngx_uint_t                  flags;
    u_char                      decimals;
    ngx_uint_t                  length;
} ngx_mysql_param_data_t;

/*
 * Destination of ngx_mysql_error_fields.  The array sizes here (5 and 128)
 * are repeated as literal sizes in that table; keep them in sync.
 */
typedef struct {
    ngx_uint_t                  _errno;
    u_char                      marker;
    u_char                      sql_state[5];
    u_char                      message[128];
} ngx_mysql_error_data_t;

/*
 * Description of one result column.  ngx_mysql_column_fields_41 targets the
 * six strings catalog..org_name and fixed_length_part; the members between
 * charset_number and _default are not referenced by any table visible in
 * this part of the file.
 */
typedef struct {
    ngx_str_t                   catalog;
    ngx_str_t                   db;
    ngx_str_t                   table;
    ngx_str_t                   org_table;
    ngx_str_t                   name;
    ngx_str_t                   org_name;
    ngx_uint_t                  charset_number;
    ngx_uint_t                  length;
    ngx_uint_t                  type;
    ngx_uint_t                  flags;
    u_char                      decimals;
    ngx_str_t                   _default;

    /* read as a length-prefixed blob by the last two column table entries */
    ngx_str_t                   fixed_length_part;
} ngx_mysql_column_t;

/*
 * One query parameter template.
 * NOTE(review): lengths/values presumably hold compiled nginx script code
 * for `source`, like connector_lengths/connector_values in the location
 * configuration - confirm in ngx_http_mysql_params_command().
 */
typedef struct {
    ngx_str_t                   source;
    ngx_array_t                *lengths;
    ngx_array_t                *values;
} ngx_http_mysql_param_template_t;

/*
 * A named connection description.  ngx_http_mysql_eval_connector() looks a
 * connector up by `name` and copies `host` into u->resolved->host; the
 * remaining members are not used in this part of the file.
 */
typedef struct {
    ngx_str_t                   name;
    ngx_str_t                   host;
    ngx_str_t                   database;
    ngx_str_t                   username;
    ngx_str_t                   password;
} ngx_http_mysql_connector_t;

/*
 * A named result encoder: a table of per-type handlers, a buffer tag and a
 * row handler.  The location configuration points to one encoder, which
 * ngx_http_mysql_handler() copies into the request context.
 */
typedef struct {
    ngx_str_t                   name;
    ngx_mysql_type_t           *types;
    ngx_buf_tag_t               tag;
    ngx_mysql_row_handler_p     row_handler;
} ngx_http_mysql_encoder_t;

/* Main (http{}) configuration of the module. */
typedef struct {
    /* elements are ngx_http_mysql_connector_t * (see eval_connector) */
    ngx_array_t                 connectors;
    ngx_array_t                 encoders;
} ngx_http_mysql_main_conf_t;

/* Location configuration of the module. */
typedef struct {
    /* becomes u->conf of the request's upstream */
    ngx_http_upstream_conf_t    upstream;

    ngx_str_t                   query;
    ngx_array_t                *param_templates;

    /*
     * Connector name.  When both arrays below are non-NULL the name is
     * produced by ngx_http_script_run(); otherwise connector_source is
     * used verbatim.
     */
    ngx_str_t                   connector_source;
    ngx_array_t                *connector_lengths;
    ngx_array_t                *connector_values;

    ngx_http_mysql_encoder_t   *encoder;
} ngx_http_mysql_loc_conf_t;

/*
 * Per-request module context, allocated zero-filled by
 * ngx_http_mysql_handler().  Several field tables address members of this
 * struct through offsetof().
 */
typedef struct ngx_http_mysql_ctx_s {
    /* set by ngx_http_mysql_eval_connector() */
    ngx_http_mysql_connector_t *connector;   
    /* copied from the location configuration */
    ngx_http_mysql_encoder_t   *encoder;

    /* starts as mysql_state_packet_header */
    ngx_mysql_reception_state_e state;

    ngx_uint_t                  packet_number;
    size_t                      packet_pos;
    size_t                      packet_len;

    ngx_mysql_handshake_data_t  handshake_data;

    ngx_mysql_prepare_data_t    prepare_data;

    /* anonymous union: these members share storage */
    union {
        ngx_mysql_param_data_t      param_data;
        ngx_mysql_ok_data_t         ok_data;
        ngx_mysql_error_data_t      error_data;
        ngx_mysql_column_t         *current_column;
    };

    /* filled in two pieces by ngx_mysql_handshake_fields (8 + 12 bytes) */
    u_char                      scramble[SCRAMBLE_LEN];
    ngx_uint_t                  client_flags;

    ngx_uint_t                  non_null_param_count;

    off_t                       current_field_len;
    off_t                       current_field_pos;
    u_char                      accumulator[100];

    u_char                     *current_field;
    u_char                     *current_field_ptr;

    ngx_mysql_field_t          *field_descriptor;
    ngx_mysql_packet_t         *packet_descriptor;

    /*
     * NOTE(review): presumably the base address that field offsets are
     * applied to (some tables use offsets relative to other structs) -
     * confirm in the field handlers.
     */
    void                       *field_ctx;

    u_char                     *null_bitmap;
    ngx_array_t                 columns;

    ngx_buf_t                  *in_buf;

    ngx_chain_t                *in, *out, *free, *busy;

    /* one-bit status flags */
    unsigned int                handshake_is_done:1;
    unsigned int                zerot:1;
    unsigned int                ignore_until_eof:1;
    unsigned int                eof:1;
    unsigned int                end_of_packet:1;
    unsigned int                done:1;
} ngx_http_mysql_ctx_t;

/* --- connector lookup ---------------------------------------------------- */

static ngx_int_t ngx_http_mysql_eval_connector(ngx_http_request_t *r,
    ngx_http_mysql_ctx_t *ctx);

/* --- upstream plumbing --------------------------------------------------- */

static void ngx_http_mysql_upstream_init(ngx_http_request_t *r);
static void ngx_http_mysql_upstream_connect(ngx_http_request_t *r,
    ngx_http_upstream_t *u);
static void ngx_http_mysql_upstream_resolve_handler(ngx_resolver_ctx_t *ctx);
static void ngx_http_mysql_upstream_handler(ngx_event_t *ev);
static void ngx_http_mysql_upstream_finalize_request(ngx_http_request_t *r,
    ngx_http_upstream_t *u, ngx_int_t rc);
static void ngx_http_mysql_upstream_next(ngx_http_request_t *r,
    ngx_http_upstream_t *u, ngx_uint_t ft_type);
static void ngx_http_mysql_write_handler(ngx_http_request_t *r,
    ngx_http_upstream_t *u);
static void ngx_http_mysql_read_handler(ngx_http_request_t *r,
    ngx_http_upstream_t *u);

/* --- packet parsing ------------------------------------------------------ */

static ngx_int_t ngx_http_mysql_process_buf(ngx_http_request_t *r,
    ngx_buf_t *buf);
static ngx_int_t ngx_http_mysql_output_filter(ngx_http_request_t *r,
    ngx_chain_t *cl);
static void ngx_http_mysql_begin_packet(ngx_http_mysql_ctx_t *ctx,
    ngx_mysql_packet_t *pd);
static ngx_int_t ngx_http_mysql_next_field(ngx_http_request_t *r,
    ngx_http_mysql_ctx_t *ctx);

/* --- packet handlers (referenced by the packet descriptors) -------------- */

static void ngx_http_mysql_scramble(u_char *p, ngx_str_t *authenticator,
    ngx_str_t *password);
static ngx_int_t ngx_http_mysql_authenticate(ngx_http_request_t *r);
static ngx_int_t ngx_http_mysql_prepare(ngx_http_request_t *r);
static ngx_int_t ngx_http_mysql_prepare_ok_handler(ngx_http_request_t *r);
static ngx_int_t ngx_http_mysql_column(ngx_http_request_t *r);
static ngx_int_t ngx_http_mysql_param(ngx_http_request_t *r);
static ngx_int_t ngx_http_mysql_execute(ngx_http_request_t *r);
static ngx_int_t ngx_http_mysql_execute_ok_handler(ngx_http_request_t *r);
static ngx_int_t ngx_http_mysql_rsheader_handler(ngx_http_request_t *r);
static ngx_int_t ngx_http_mysql_error(ngx_http_request_t *r);
static ngx_int_t ngx_http_mysql_init_pipe(ngx_http_request_t *r);

/* --- data pumping -------------------------------------------------------- */

static void ngx_http_mysql_process_upstream(ngx_http_request_t *r,
    ngx_http_upstream_t *u);
static void ngx_http_mysql_process_downstream(ngx_http_request_t *r);
static void ngx_http_mysql_upstream_process_request(ngx_http_request_t *r);

/* --- field handlers (referenced by the field tables) --------------------- */

static void ngx_mysql_uchar_handler(ngx_http_mysql_ctx_t *ctx);
static void ngx_mysql_int2_handler(ngx_http_mysql_ctx_t *ctx);
static void ngx_mysql_int4_handler(ngx_http_mysql_ctx_t *ctx);
static void ngx_mysql_str_handler(ngx_http_mysql_ctx_t *ctx);
static void ngx_mysql_strz_handler(ngx_http_mysql_ctx_t *ctx);
static void ngx_mysql_stre_handler(ngx_http_mysql_ctx_t *ctx);
static void ngx_mysql_lcb_handler(ngx_http_mysql_ctx_t *ctx);
static void ngx_mysql_lcs_handler(ngx_http_mysql_ctx_t *ctx);
static void ngx_mysql_null_bitmap_handler(ngx_http_mysql_ctx_t *ctx);

/* --- configuration directives -------------------------------------------- */

static char *ngx_http_mysql_query_command(ngx_conf_t *cf, ngx_command_t *cmd,
    void *conf);
static char *ngx_http_mysql_params_command(ngx_conf_t *cf, ngx_command_t *cmd,
    void *conf);
static char *ngx_http_mysql_connector_block(ngx_conf_t *cf, ngx_command_t *cmd,
    void *conf);
static char *ngx_http_mysql_connector(ngx_conf_t *cf, ngx_command_t *cmd,
    void *conf);
static char *ngx_http_mysql_encoder_command(ngx_conf_t *cf, ngx_command_t *cmd,
    void *conf);

/* --- configuration lifecycle --------------------------------------------- */

static void *ngx_http_mysql_create_main_conf(ngx_conf_t *cf);
static void *ngx_http_mysql_create_loc_conf(ngx_conf_t *cf);
static char *ngx_http_mysql_merge_loc_conf(ngx_conf_t *cf, void *parent,
    void *child);

/*
 * Length-coded binary length in bytes.
 *
 * NOTE(review): how this table is indexed is decided by code outside this
 * part of the file (presumably ngx_mysql_lcb_handler) - confirm there
 * before changing any entry.
 */
static ngx_uint_t ngx_mysql_lcb_length[] = {0, 0, 1, 3, 4, 9, 0};

/*
 * Configuration directives of the module.  Each entry lists the directive
 * name, the contexts and argument count it accepts, its setter, the
 * configuration level it writes to, the member offset and post data.
 */
static ngx_command_t  ngx_http_mysql_commands[] = {

    /* mysql_query: two arguments; http, server or location level */
    { ngx_string("mysql_query"),
      NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE2,
      ngx_http_mysql_query_command,
      NGX_HTTP_LOC_CONF_OFFSET,
      0,
      NULL },

    /* mysql_params: one or more arguments */
    { ngx_string("mysql_params"),
      NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_1MORE,
      ngx_http_mysql_params_command,
      NGX_HTTP_LOC_CONF_OFFSET,
      0,
      NULL },

    /* mysql_connector <name> { ... }: block directive, http level only */
    { ngx_string("mysql_connector"),
      NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1|NGX_CONF_BLOCK,
      ngx_http_mysql_connector_block,
      NGX_HTTP_MAIN_CONF_OFFSET,
      0,
      NULL },

    /* mysql_encoder: one argument */
    { ngx_string("mysql_encoder"),
      NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
      ngx_http_mysql_encoder_command,
      NGX_HTTP_LOC_CONF_OFFSET,
      0,
      NULL },

    /* mysql_buffers: two arguments, stored in upstream.bufs */
    { ngx_string("mysql_buffers"),
      NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE2,
      ngx_conf_set_bufs_slot,
      NGX_HTTP_LOC_CONF_OFFSET,
      offsetof(ngx_http_mysql_loc_conf_t, upstream.bufs),
      NULL },

    /* mysql_busy_buffers_size: one size, stored in upstream.busy_buffers_size_conf */
    { ngx_string("mysql_busy_buffers_size"),
      NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
      ngx_conf_set_size_slot,
      NGX_HTTP_LOC_CONF_OFFSET,
      offsetof(ngx_http_mysql_loc_conf_t, upstream.busy_buffers_size_conf),
      NULL },

      ngx_null_command
};

/*
 * HTTP module hooks.  The module creates a main configuration and creates
 * and merges a location configuration; it has no server configuration and
 * no pre/post-configuration step.
 */
static ngx_http_module_t  ngx_http_mysql_module_ctx = {
    NULL,                                  /* preconfiguration */
    NULL,                                  /* postconfiguration */

    ngx_http_mysql_create_main_conf,       /* create main configuration */
    NULL,                                  /* init main configuration */

    NULL,                                  /* create server configuration */
    NULL,                                  /* merge server configuration */

    ngx_http_mysql_create_loc_conf,        /* create location configuration */
    ngx_http_mysql_merge_loc_conf          /* merge location configuration */
};

/*
 * Module definition.  The only non-static object of the file; the name is
 * the one listed in HTTP_MODULES by the add-on config.  No process or
 * thread lifecycle hooks are installed.
 */
ngx_module_t  ngx_http_mysql_module = {
    NGX_MODULE_V1,
    &ngx_http_mysql_module_ctx,            /* module context */
    ngx_http_mysql_commands,               /* module directives */
    NGX_HTTP_MODULE,                       /* module type */
    NULL,                                  /* init master */
    NULL,                                  /* init module */
    NULL,                                  /* init process */
    NULL,                                  /* init thread */
    NULL,                                  /* exit thread */
    NULL,                                  /* exit process */
    NULL,                                  /* exit master */
    NGX_MODULE_V1_PADDING
};

/*
 * Field layout of the server handshake packet, in wire order.  Offsets are
 * relative to ngx_http_mysql_ctx_t.  Entries with offset 0 and without
 * NGX_MY_FIELD_RELEVANT name no destination.
 */
static ngx_mysql_field_t ngx_mysql_handshake_fields[] = {

    /* one byte -> handshake_data.protocol_version */
    { sizeof(u_char),
      offsetof(ngx_http_mysql_ctx_t, handshake_data.protocol_version),
      NGX_MY_FIELD_RELEVANT, ngx_mysql_uchar_handler },

    /* zero-terminated string, no destination */
    { sizeof(u_char), 0,
      NGX_MY_FIELD_ZEROT, ngx_mysql_strz_handler },

    /* -> handshake_data.thread_id */
    { sizeof(ngx_uint_t),
      offsetof(ngx_http_mysql_ctx_t, handshake_data.thread_id),
      NGX_MY_FIELD_RELEVANT, ngx_mysql_int4_handler },

    /* first SCRAMBLE_323_LEN bytes of the scramble */
    { SCRAMBLE_323_LEN,
      offsetof(ngx_http_mysql_ctx_t, scramble),
      NGX_MY_FIELD_RELEVANT, ngx_mysql_str_handler },

    /* one byte, no destination */
    { sizeof(u_char), 0,
      0, ngx_mysql_uchar_handler },

    /* -> handshake_data.server_capabilities */
    { sizeof(ngx_uint_t),
      offsetof(ngx_http_mysql_ctx_t, handshake_data.server_capabilities),
      NGX_MY_FIELD_RELEVANT, ngx_mysql_int2_handler },

    /* one byte -> handshake_data.server_lang */
    { sizeof(u_char),
      offsetof(ngx_http_mysql_ctx_t, handshake_data.server_lang),
      NGX_MY_FIELD_RELEVANT, ngx_mysql_uchar_handler },

    /* -> handshake_data.server_status */
    { sizeof(ngx_uint_t),
      offsetof(ngx_http_mysql_ctx_t, handshake_data.server_status),
      NGX_MY_FIELD_RELEVANT, ngx_mysql_int2_handler },

    /* 13 bytes, no destination */
    { 13, 0,
      0, ngx_mysql_str_handler },

    /* remaining scramble bytes, stored right after the first part */
    { SCRAMBLE_LEN - SCRAMBLE_323_LEN,
      offsetof(ngx_http_mysql_ctx_t, scramble) + SCRAMBLE_323_LEN,
      NGX_MY_FIELD_RELEVANT, ngx_mysql_str_handler },

    /* one byte, no destination */
    { sizeof(u_char), 0,
      0, ngx_mysql_uchar_handler },

    /* terminator */
    { 0, 0, 0, NULL }
};

/*
 * Field layout of an OK packet; destinations are the ok_data members of
 * ngx_http_mysql_ctx_t.  Shared by the auth-ok and execute-ok packet
 * descriptors.
 */
static ngx_mysql_field_t ngx_mysql_ok_fields_41[] = {

    { sizeof(ngx_uint_t), offsetof(ngx_http_mysql_ctx_t, ok_data.affected_rows),
      NGX_MY_FIELD_RELEVANT, ngx_mysql_lcb_handler },

    { sizeof(ngx_uint_t), offsetof(ngx_http_mysql_ctx_t, ok_data.insert_id),
      NGX_MY_FIELD_RELEVANT, ngx_mysql_lcb_handler },

    { sizeof(ngx_uint_t), offsetof(ngx_http_mysql_ctx_t, ok_data.server_status),
      NGX_MY_FIELD_RELEVANT, ngx_mysql_int2_handler },

    { sizeof(ngx_uint_t), offsetof(ngx_http_mysql_ctx_t, ok_data.warning_count),
      NGX_MY_FIELD_RELEVANT, ngx_mysql_int2_handler },

    /* trailing part flagged end-of-packet, no destination */
    { 0, 0,
      NGX_MY_FIELD_EOP, ngx_mysql_stre_handler },

    /* terminator */
    { 0, 0, 0, NULL }
};

/*
 * Field layout of the reply to a prepare command; destinations are the
 * prepare_data members of ngx_http_mysql_ctx_t.
 */
static ngx_mysql_field_t ngx_mysql_prepare_ok_fields[] = {

    /* -> prepare_data.statement_id */
    { sizeof(ngx_uint_t),
      offsetof(ngx_http_mysql_ctx_t, prepare_data.statement_id),
      NGX_MY_FIELD_RELEVANT, ngx_mysql_int4_handler },

    /* -> prepare_data.column_count */
    { sizeof(ngx_uint_t),
      offsetof(ngx_http_mysql_ctx_t, prepare_data.column_count),
      NGX_MY_FIELD_RELEVANT, ngx_mysql_int2_handler },

    /* -> prepare_data.param_count */
    { sizeof(ngx_uint_t),
      offsetof(ngx_http_mysql_ctx_t, prepare_data.param_count),
      NGX_MY_FIELD_RELEVANT, ngx_mysql_int2_handler },

    /* 3 bytes, no destination */
    { 3, 0,
      0, ngx_mysql_str_handler },

    /* terminator */
    { 0, 0, 0, NULL }
};

/*
 * Field layout of a column description packet.  Every string is described
 * by a pair of entries: a length-coded binary stored in the string's .len,
 * then the string itself flagged NGX_MY_FIELD_LCS.
 *
 * Offsets here are relative to ngx_mysql_column_t, not to the context.
 * The last pair reads the tail of the packet as a length-prefixed blob
 * into fixed_length_part.
 */
static ngx_mysql_field_t ngx_mysql_column_fields_41[] = {

    { sizeof(size_t), offsetof(ngx_mysql_column_t, catalog.len),
      NGX_MY_FIELD_RELEVANT, ngx_mysql_lcb_handler },

    { 0, offsetof(ngx_mysql_column_t, catalog),
      NGX_MY_FIELD_RELEVANT|NGX_MY_FIELD_LCS, ngx_mysql_lcs_handler },

    { sizeof(size_t), offsetof(ngx_mysql_column_t, db.len),
      NGX_MY_FIELD_RELEVANT, ngx_mysql_lcb_handler },

    { 0, offsetof(ngx_mysql_column_t, db),
      NGX_MY_FIELD_RELEVANT|NGX_MY_FIELD_LCS, ngx_mysql_lcs_handler },

    { sizeof(size_t), offsetof(ngx_mysql_column_t, table.len),
      NGX_MY_FIELD_RELEVANT, ngx_mysql_lcb_handler },

    { 0, offsetof(ngx_mysql_column_t, table),
      NGX_MY_FIELD_RELEVANT|NGX_MY_FIELD_LCS, ngx_mysql_lcs_handler },

    { sizeof(size_t), offsetof(ngx_mysql_column_t, org_table.len),
      NGX_MY_FIELD_RELEVANT, ngx_mysql_lcb_handler },

    { 0, offsetof(ngx_mysql_column_t, org_table),
      NGX_MY_FIELD_RELEVANT|NGX_MY_FIELD_LCS, ngx_mysql_lcs_handler },

    { sizeof(size_t), offsetof(ngx_mysql_column_t, name.len),
      NGX_MY_FIELD_RELEVANT, ngx_mysql_lcb_handler },

    { 0, offsetof(ngx_mysql_column_t, name),
      NGX_MY_FIELD_RELEVANT|NGX_MY_FIELD_LCS, ngx_mysql_lcs_handler },

    { sizeof(size_t), offsetof(ngx_mysql_column_t, org_name.len),
      NGX_MY_FIELD_RELEVANT, ngx_mysql_lcb_handler },

    { 0, offsetof(ngx_mysql_column_t, org_name),
      NGX_MY_FIELD_RELEVANT|NGX_MY_FIELD_LCS, ngx_mysql_lcs_handler },

    { sizeof(size_t), offsetof(ngx_mysql_column_t, fixed_length_part.len),
      NGX_MY_FIELD_RELEVANT, ngx_mysql_lcb_handler },

    { 0, offsetof(ngx_mysql_column_t, fixed_length_part),
      NGX_MY_FIELD_RELEVANT|NGX_MY_FIELD_LCS, ngx_mysql_lcs_handler },

    /* terminator */
    { 0, 0, 0, NULL }
};

/*
 * Field layout of a parameter description packet.
 *
 * Offsets here are relative to ngx_mysql_param_data_t, not to the context.
 * NOTE(review): the base address they are applied to is chosen outside this
 * part of the file - confirm it is the param_data member of the context.
 */
static ngx_mysql_field_t ngx_mysql_param_fields[] = {

    { sizeof(ngx_uint_t), offsetof(ngx_mysql_param_data_t, type),
      NGX_MY_FIELD_RELEVANT, ngx_mysql_int2_handler },

    { sizeof(ngx_uint_t), offsetof(ngx_mysql_param_data_t, flags),
      NGX_MY_FIELD_RELEVANT, ngx_mysql_int2_handler },

    { sizeof(u_char), offsetof(ngx_mysql_param_data_t, decimals),
      NGX_MY_FIELD_RELEVANT, ngx_mysql_uchar_handler },

    { sizeof(ngx_uint_t), offsetof(ngx_mysql_param_data_t, length),
      NGX_MY_FIELD_RELEVANT, ngx_mysql_int4_handler },

    /* terminator */
    { 0, 0, 0, NULL }
};

/*
 * Field layout of a result set header: a single length-coded binary.
 * The entry names prepare_data.column_count as its destination but, unlike
 * the other tables, does not set NGX_MY_FIELD_RELEVANT.
 */
static ngx_mysql_field_t ngx_mysql_rsheader_fields[] = {

    { sizeof(ngx_uint_t),
      offsetof(ngx_http_mysql_ctx_t, prepare_data.column_count),
      0, ngx_mysql_lcb_handler },

    /* terminator */
    { 0, 0, 0, NULL }
};

/*
 * Field layout of an EOF packet: warning count first, then server status.
 * Both go to the ok_data members of ngx_http_mysql_ctx_t.
 */
static ngx_mysql_field_t ngx_mysql_eof_fields[] = {

    { sizeof(ngx_uint_t), offsetof(ngx_http_mysql_ctx_t, ok_data.warning_count),
      NGX_MY_FIELD_RELEVANT, ngx_mysql_int2_handler },

    { sizeof(ngx_uint_t), offsetof(ngx_http_mysql_ctx_t, ok_data.server_status),
      NGX_MY_FIELD_RELEVANT, ngx_mysql_int2_handler },

    /* terminator */
    { 0, 0, 0, NULL }
};

/*
 * Field layout of an error packet; destinations are the error_data members
 * of ngx_http_mysql_ctx_t.  The literal sizes 5 and 128 repeat the array
 * sizes declared in ngx_mysql_error_data_t.
 */
static ngx_mysql_field_t ngx_mysql_error_fields[] = {

    { sizeof(ngx_uint_t), offsetof(ngx_http_mysql_ctx_t, error_data._errno),
      NGX_MY_FIELD_RELEVANT, ngx_mysql_int2_handler },

    { sizeof(u_char), offsetof(ngx_http_mysql_ctx_t, error_data.marker),
      NGX_MY_FIELD_RELEVANT, ngx_mysql_uchar_handler },

    { 5, offsetof(ngx_http_mysql_ctx_t, error_data.sql_state),
      NGX_MY_FIELD_RELEVANT, ngx_mysql_str_handler },

    /* message: flagged end-of-packet, destination holds 128 bytes */
    { 128, offsetof(ngx_http_mysql_ctx_t, error_data.message),
      NGX_MY_FIELD_RELEVANT|NGX_MY_FIELD_EOP, ngx_mysql_stre_handler },

    /* terminator */
    { 0, 0, 0, NULL }
};

/*
 * Packet descriptors: each pairs a field table with a packet handler.
 * NOTE(review): when the handler is invoked is decided outside this part
 * of the file - presumably once the packet's fields have been consumed.
 */

/* server handshake -> ngx_http_mysql_authenticate() */
static ngx_mysql_packet_t ngx_mysql_handshake_packet = {
    ngx_mysql_handshake_fields,
    ngx_http_mysql_authenticate
};

/* OK fields -> ngx_http_mysql_prepare() */
static ngx_mysql_packet_t ngx_mysql_auth_ok_packet = {
    ngx_mysql_ok_fields_41,
    ngx_http_mysql_prepare
};

/* prepare-ok fields -> ngx_http_mysql_prepare_ok_handler() */
static ngx_mysql_packet_t ngx_mysql_prepare_ok_packet = {
    ngx_mysql_prepare_ok_fields,
    ngx_http_mysql_prepare_ok_handler
};

/* column description -> ngx_http_mysql_column() */
static ngx_mysql_packet_t ngx_mysql_column_packet = {
    ngx_mysql_column_fields_41,
    ngx_http_mysql_column
};

/* parameter description -> ngx_http_mysql_param() */
static ngx_mysql_packet_t ngx_mysql_param_packet = {
    ngx_mysql_param_fields,
    ngx_http_mysql_param
};

/* EOF fields -> ngx_http_mysql_execute() */
static ngx_mysql_packet_t ngx_mysql_eof_packet = {
    ngx_mysql_eof_fields,
    ngx_http_mysql_execute
};

/* result set header -> ngx_http_mysql_rsheader_handler() */
static ngx_mysql_packet_t ngx_mysql_rsheader_packet = {
    ngx_mysql_rsheader_fields,
    ngx_http_mysql_rsheader_handler
};

/* OK fields (same table as auth-ok) -> ngx_http_mysql_execute_ok_handler() */
static ngx_mysql_packet_t ngx_mysql_execute_ok_packet = {
    ngx_mysql_ok_fields_41,
    ngx_http_mysql_execute_ok_handler
};

/* error fields -> ngx_http_mysql_error() */
static ngx_mysql_packet_t ngx_mysql_error_packet = {
    ngx_mysql_error_fields,
    ngx_http_mysql_error
};

/*
 * Request handler of the module.
 *
 * Only requests whose method has the GET bit set are served; anything else
 * is declined.  The handler allocates the upstream object, creates the
 * module context if the request has none, selects the connector and then
 * starts upstream processing.
 *
 * Returns NGX_DECLINED (non-GET request or no connectors configured),
 * NGX_HTTP_INTERNAL_SERVER_ERROR (allocation or connector lookup failure)
 * or NGX_DONE once upstream processing has been started.
 */
static ngx_int_t
ngx_http_mysql_handler(ngx_http_request_t *r)
{
    ngx_int_t                   status;
    ngx_http_upstream_t        *upstream;
    ngx_http_mysql_ctx_t       *mctx;
    ngx_http_mysql_loc_conf_t  *lcf;

    if ((r->method & NGX_HTTP_GET) == 0) {
        return NGX_DECLINED;
    }

    lcf = ngx_http_get_module_loc_conf(r, ngx_http_mysql_module);

    upstream = ngx_pcalloc(r->pool, sizeof(ngx_http_upstream_t));
    if (upstream == NULL) {
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }

    upstream->peer.log = r->connection->log;
    upstream->peer.log_error = NGX_ERROR_ERR;
#if (NGX_THREADS)
    upstream->peer.lock = &r->connection->lock;
#endif
    upstream->conf = &lcf->upstream;

    r->upstream = upstream;

    /* reuse an existing context, otherwise create a fresh one */
    mctx = ngx_http_get_module_ctx(r, ngx_http_mysql_module);

    if (mctx == NULL) {
        mctx = ngx_pcalloc(r->pool, sizeof(ngx_http_mysql_ctx_t));
        if (mctx == NULL) {
            return NGX_HTTP_INTERNAL_SERVER_ERROR;
        }

        mctx->encoder = lcf->encoder;
        mctx->state = mysql_state_packet_header;
        mctx->packet_pos = 0;

        ngx_http_set_ctx(r, mctx, ngx_http_mysql_module);
    }

    status = ngx_http_mysql_eval_connector(r, mctx);

    switch (status) {

    case NGX_ERROR:
        return NGX_HTTP_INTERNAL_SERVER_ERROR;

    case NGX_DECLINED:
        /* no connectors configured */
        return NGX_DECLINED;

    default:
        break;
    }

    upstream->pipe = ngx_pcalloc(r->pool, sizeof(ngx_event_pipe_t));
    if (upstream->pipe == NULL) {
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }

    upstream->pipe->input_filter = ngx_event_pipe_copy_input_filter;

#if defined nginx_version && nginx_version >= 8011
    /* take a reference on the main request for the upstream processing */
    r->main->count++;
#endif

    ngx_http_mysql_upstream_init(r);

    return NGX_DONE;
}

/*
 * Work out which connector the request uses and prepare u->resolved.
 *
 * The connector name is either produced by running the compiled script
 * from the location configuration (when both script arrays exist) or taken
 * verbatim from connector_source.  The name is then matched, by length and
 * bytes, against the connectors of the main configuration.
 *
 * r->upstream must already be set.
 *
 * Returns NGX_OK on success (ctx->connector and r->upstream->resolved are
 * set), NGX_DECLINED when no connectors are configured at all, NGX_ERROR
 * when the script or an allocation fails or no connector has that name.
 */
static ngx_int_t
ngx_http_mysql_eval_connector(ngx_http_request_t *r, ngx_http_mysql_ctx_t *ctx)
{
    ngx_str_t                     wanted;
    ngx_uint_t                    n, total;
    ngx_http_upstream_t          *upstream;
    ngx_http_mysql_loc_conf_t    *lcf;
    ngx_http_mysql_main_conf_t   *mcf;
    ngx_http_mysql_connector_t  **list, *match;

    mcf = ngx_http_get_module_main_conf(r, ngx_http_mysql_module);
    lcf = ngx_http_get_module_loc_conf(r, ngx_http_mysql_module);

    if (lcf->connector_lengths == NULL || lcf->connector_values == NULL) {
        /* plain name, nothing to evaluate */
        wanted = lcf->connector_source;

    } else if (ngx_http_script_run(r, &wanted, lcf->connector_lengths->elts,
                                   0, lcf->connector_values->elts)
               == NULL)
    {
        return NGX_ERROR;
    }

    total = mcf->connectors.nelts;

    if (total == 0) {
        return NGX_DECLINED;
    }

    list = mcf->connectors.elts;
    match = NULL;

    for (n = 0; n < total; n++) {

        if (list[n]->name.len != wanted.len) {
            continue;
        }

        if (ngx_memcmp(list[n]->name.data, wanted.data, wanted.len) == 0) {
            match = list[n];
            break;
        }
    }

    if (match == NULL) {
        return NGX_ERROR;
    }

    ctx->connector = match;

    upstream = r->upstream;

    upstream->resolved = ngx_pcalloc(r->pool,
                                     sizeof(ngx_http_upstream_resolved_t));
    if (upstream->resolved == NULL) {
        return NGX_ERROR;
    }

    /* only the host is known here; no port is supplied */
    upstream->resolved->host = ctx->connector->host;
    upstream->resolved->no_port = 1;

    return NGX_OK;
}

/*
 * Start upstream processing for a request whose r->upstream and
 * r->upstream->resolved have already been prepared.
 *
 *   1. Remove the client read timer and set up the output chain and the
 *      chain writer.
 *   2. Register a request cleanup slot (its handler is not installed yet,
 *      see the TODO below).
 *   3. Choose the peer:
 *        - u->resolved->sockaddr is set: build a round-robin peer from the
 *          resolved data and connect;
 *        - the host matches a configured upstream: initialise that
 *          upstream's peer and connect;
 *        - otherwise start a name resolution; the connection is then made
 *          by ngx_http_mysql_upstream_resolve_handler().
 *
 * Every failure finalizes the request: NGX_HTTP_BAD_GATEWAY when no resolver
 * is configured, NGX_HTTP_INTERNAL_SERVER_ERROR otherwise.
 */
static void ngx_http_mysql_upstream_init(ngx_http_request_t *r)
{
    ngx_str_t                      *host;
    ngx_uint_t                      i;
    ngx_connection_t               *c;
    ngx_resolver_ctx_t             *ctx, temp;
    ngx_http_cleanup_t             *cln;
    ngx_http_upstream_t            *u;
    ngx_http_core_loc_conf_t       *clcf;
    ngx_http_upstream_srv_conf_t   *uscf, **uscfp;
    ngx_http_upstream_main_conf_t  *umcf;

    c = r->connection;

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "mysql init upstream, client timer: %d", c->read->timer_set);

    if (c->read->timer_set) {
        ngx_del_timer(c->read);
    }

    u = r->upstream;

    clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);

    u->output.pool = r->pool;
    u->output.bufs.num = 1;
    u->output.bufs.size = clcf->client_body_buffer_size;
    u->output.output_filter = ngx_chain_writer;
    u->output.filter_ctx = &u->writer;

    u->writer.pool = r->pool;

    cln = ngx_http_cleanup_add(r, 0);
    if (cln == NULL) {
        ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    /* TODO:    cln->handler = ngx_http_mysql_upstream_cleanup; */
    cln->data = r;
    u->cleanup = &cln->handler;

    /*
     * If the address is already known, no upstream configuration is
     * involved: create the peer from the resolved data and connect
     * straight away.  Jumping to "found" from here would dereference
     * uscf, which has no value on this path.
     */
    if (u->resolved->sockaddr) {

        if (ngx_http_upstream_create_round_robin_peer(r, u->resolved)
            != NGX_OK)
        {
            ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        ngx_http_mysql_upstream_connect(r, u);
        return;
    }

    host = &u->resolved->host;

    umcf = ngx_http_get_module_main_conf(r, ngx_http_upstream_module);

    uscfp = umcf->upstreams.elts;
    uscf = NULL;

    /* look for a configured upstream with the same host (and port) */
    for (i = 0; i < umcf->upstreams.nelts; i++) {

        uscf = uscfp[i];

        if (uscf->host.len == host->len
            && ((uscf->port == 0 && u->resolved->no_port)
                 || uscf->port == u->resolved->port)
            && ngx_memcmp(uscf->host.data, host->data, host->len) == 0)
        {
            goto found;
        }
    }

    /* not a configured upstream: resolve the host name */

    temp.name = *host;

    ctx = ngx_resolve_start(clcf->resolver, &temp);
    if (ctx == NULL) {
        ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    if (ctx == NGX_NO_RESOLVER) {
        ngx_log_error(NGX_LOG_ERR, c->log, 0,
                      "no resolver defined to resolve %V", host);

        ngx_http_finalize_request(r, NGX_HTTP_BAD_GATEWAY);
        return;
    }

    ctx->name = *host;
    ctx->type = NGX_RESOLVE_A;
    ctx->handler = ngx_http_mysql_upstream_resolve_handler;
    ctx->data = r;
    ctx->timeout = clcf->resolver_timeout;

    u->resolved->ctx = ctx;

    if (ngx_resolve_name(ctx) != NGX_OK) {
        u->resolved->ctx = NULL;
        ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    /* ngx_http_mysql_upstream_resolve_handler() continues from here */
    return;

found:

    /* reached only from the loop above, so uscf is a matched upstream */
    if(uscf->peer.init(r, uscf) != NGX_OK) {
        ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    ngx_http_mysql_upstream_connect(r, u);
}

/*
 * Open the connection to the selected peer and attach the module's event
 * handlers to it.
 *
 * Connect results:
 *   NGX_ERROR    - the request is finalized with a 500;
 *   NGX_BUSY     - logged as "no live upstreams", then handed to
 *                  ngx_http_mysql_upstream_next() with FT_NOLIVE;
 *   NGX_DECLINED - handed to ngx_http_mysql_upstream_next() with FT_ERROR;
 *   NGX_OK / NGX_AGAIN - handlers, pool and logs are wired up; for
 *                  NGX_AGAIN a connect timeout is armed on the write event.
 */
static void
ngx_http_mysql_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
    ngx_int_t          status;
    ngx_connection_t  *peer;

    r->connection->log->action = "connecting to mysql";

    r->connection->single_connection = 0;

    status = ngx_event_connect_peer(&u->peer);

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "mysql connect: %i", status);

    /* the stock "u->state->peer = u->peer.name" bookkeeping is not done */

    switch (status) {

    case NGX_ERROR:
        ngx_http_mysql_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;

    case NGX_BUSY:
        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no live upstreams");
        ngx_http_mysql_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_NOLIVE);
        return;

    case NGX_DECLINED:
        ngx_http_mysql_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
        return;

    default:
        /* NGX_OK or NGX_AGAIN */
        break;
    }

    peer = u->peer.connection;

    peer->data = r;

    /* both events go through one dispatcher, which picks the u handler */
    peer->read->handler = ngx_http_mysql_upstream_handler;
    peer->write->handler = ngx_http_mysql_upstream_handler;
    u->write_event_handler = ngx_http_mysql_write_handler;
    u->read_event_handler = ngx_http_mysql_read_handler;

    peer->pool = r->pool;
    peer->log = r->connection->log;
    peer->read->log = peer->log;
    peer->write->log = peer->log;

    /* init or reinit the ngx_output_chain() and ngx_chain_writer() contexts */

    u->writer.out = NULL;
    u->writer.last = &u->writer.out;
    u->writer.connection = peer;
    u->writer.limit = 0;

    if (status == NGX_AGAIN) {
        /* connect still in progress: bound it with the connect timeout */
        ngx_add_timer(peer->write, u->conf->connect_timeout);
    }
}

/*
 * Resolver completion callback.
 *
 * On a resolver error the request is finalized with 502.  Otherwise the
 * resolved addresses are stored in the upstream's "resolved" structure, a
 * round-robin peer is created from them, the resolver context is released
 * and the connection attempt is started.
 */
static void
ngx_http_mysql_upstream_resolve_handler(ngx_resolver_ctx_t *ctx)
{
    ngx_int_t                      rc;
    ngx_http_request_t            *r;
    ngx_http_upstream_t           *u;
    ngx_http_upstream_resolved_t  *resolved;

    r = ctx->data;
    u = r->upstream;
    resolved = u->resolved;

    /* the resolver context is released on every path below */
    resolved->ctx = NULL;

    if (ctx->state) {
        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                      "%V could not be resolved (%i: %s)",
                      &ctx->name, ctx->state,
                      ngx_resolver_strerror(ctx->state));

        ngx_resolve_name_done(ctx);
        ngx_http_finalize_request(r, NGX_HTTP_BAD_GATEWAY);
        return;
    }

    resolved->naddrs = ctx->naddrs;
    resolved->addrs = ctx->addrs;

    /* the peer is built while ctx->addrs is still owned by the resolver */
    rc = ngx_http_upstream_create_round_robin_peer(r, resolved);

    ngx_resolve_name_done(ctx);

    if (rc != NGX_OK) {
        ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    ngx_http_mysql_upstream_connect(r, u);
}

/*
 * Common event handler for the MySQL peer connection.
 *
 * Recovers the request from the connection the event belongs to, records it
 * as the current request in the client connection's log context, dispatches
 * to u->write_event_handler or u->read_event_handler depending on ev->write,
 * and finally runs the posted requests of the client connection.
 */
static void
ngx_http_mysql_upstream_handler(ngx_event_t *ev)
{
    ngx_connection_t     *peer, *client;
    ngx_http_request_t   *r;
    ngx_http_log_ctx_t   *log_ctx;
    ngx_http_upstream_t  *u;

    peer = ev->data;
    r = peer->data;

    u = r->upstream;
    client = r->connection;

    log_ctx = client->log->data;
    log_ctx->current_request = r;

    ngx_log_debug2(NGX_LOG_DEBUG_HTTP, client->log, 0,
                   "mysql upstream handler: \"%V?%V\"", &r->uri, &r->args);

    if (!ev->write) {
        u->read_event_handler(r, u);

    } else {
        u->write_event_handler(r, u);
    }

    /* "client" was saved above; r is not touched after the handler ran */
    ngx_http_run_posted_requests(client);
}

/*
 * Tear down the upstream side of the request and finalize the request with
 * the given code.
 *
 * The cleanup slot is cleared, the peer is released via u->peer.free (when
 * set), the peer connection is closed and forgotten.  If the response header
 * was already sent, NGX_ERROR and special-response codes are replaced by 0.
 * NGX_DECLINED returns without finalizing the request.
 */
static void
ngx_http_mysql_upstream_finalize_request(ngx_http_request_t *r,
    ngx_http_upstream_t *u, ngx_int_t rc)
{
    ngx_connection_t  *pc;

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "finalize mysql upstream request: %i", rc);

    if (u->cleanup != NULL) {
        *u->cleanup = NULL;
    }

    if (u->peer.free != NULL) {
        u->peer.free(&u->peer, u->peer.data, 0);
    }

    pc = u->peer.connection;

    if (pc != NULL) {

        ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                       "close mysql upstream connection: %d",
                       pc->fd);

        ngx_close_connection(pc);
    }

    u->peer.connection = NULL;

    if (u->pipe != NULL && u->pipe->temp_file != NULL) {
        ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                       "mysql upstream temp fd: %d",
                       u->pipe->temp_file->file.fd);
    }

    /* an error status can no longer be reported once the header is out */
    if (u->header_sent) {
        if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
            rc = 0;
        }
    }

    if (rc == NGX_DECLINED) {
        return;
    }

    r->connection->log->action = "sending to client";

    /* on success of a main request, emit the last-buffer marker */
    if (rc == 0 && r == r->main && !r->post_action) {
        rc = ngx_http_send_special(r, NGX_HTTP_LAST);
    }

    ngx_http_finalize_request(r, rc);
}

/*
 * Handle a failure of the current MySQL peer: release the peer, decide
 * whether the request must be finalized, and otherwise close the current
 * peer connection and try to connect again.
 *
 * r        the request being served
 * u        its upstream state
 * ft_type  one of the NGX_HTTP_UPSTREAM_FT_* failure types
 *
 * The request is finalized when the client connection is in error, or when
 * a status was selected and either no tries are left or ft_type is not
 * enabled in u->conf->next_upstream.
 */
static void
ngx_http_mysql_upstream_next(ngx_http_request_t *r, ngx_http_upstream_t *u,
    ngx_uint_t ft_type)
{
    ngx_uint_t  status, state;

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "mysql next upstream, %xi", ft_type);

    if (ft_type == NGX_HTTP_UPSTREAM_FT_HTTP_404) {
        state = NGX_PEER_NEXT;
    } else {
        state = NGX_PEER_FAILED;
    }

    /* with "no live upstreams" no peer was obtained, so none is released */
    if (ft_type != NGX_HTTP_UPSTREAM_FT_NOLIVE) {
        u->peer.free(&u->peer, u->peer.data, state);
    }

    if (ft_type == NGX_HTTP_UPSTREAM_FT_TIMEOUT) {
        ngx_log_error(NGX_LOG_ERR, r->connection->log, NGX_ETIMEDOUT,
                      "mysql connection timed out");
    }

    if (u->peer.cached && ft_type == NGX_HTTP_UPSTREAM_FT_ERROR) {
        /* status 0 skips the tries/next_upstream check below */
        status = 0;

    } else {
        switch(ft_type) {

        case NGX_HTTP_UPSTREAM_FT_TIMEOUT:
            status = NGX_HTTP_GATEWAY_TIME_OUT;
            break;

        case NGX_HTTP_UPSTREAM_FT_HTTP_500:
            status = NGX_HTTP_INTERNAL_SERVER_ERROR;
            break;

        case NGX_HTTP_UPSTREAM_FT_HTTP_404:
            status = NGX_HTTP_NOT_FOUND;
            break;

        default:
            status = NGX_HTTP_BAD_GATEWAY;
            break;
        }
    }

    if (r->connection->error) {
        ngx_http_mysql_upstream_finalize_request(r, u,
                                           NGX_HTTP_CLIENT_CLOSED_REQUEST);
        return;
    }

    if (status) {
        if (u->peer.tries == 0 || !(u->conf->next_upstream & ft_type)) {

            ngx_http_mysql_upstream_finalize_request(r, u, status);
            return;
        }
    }

    if (u->peer.connection) {
        ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                       "close mysql connection: %d",
                       u->peer.connection->fd);

        ngx_close_connection(u->peer.connection);

        /*
         * Forget the closed connection.  The reconnect below may fail
         * before a new connection is assigned, in which case
         * ngx_http_mysql_upstream_finalize_request() would otherwise close
         * this stale pointer a second time.
         */
        u->peer.connection = NULL;
    }

    ngx_http_mysql_upstream_connect(r, u);
}

/*
 * Send a chain to the MySQL peer through the upstream output chain.
 *
 * A NULL chain continues sending whatever the output chain still holds.
 * Returns NGX_OK when everything was passed on, NGX_AGAIN when the write has
 * to be continued later (a send timeout is armed), NGX_ERROR on failure.
 */
static ngx_int_t
ngx_http_mysql_send_chain(ngx_http_request_t *r, ngx_http_upstream_t *u,
    ngx_chain_t *chain)
{
    ngx_int_t          status;
    ngx_event_t       *wev;
    ngx_connection_t  *pc;

    pc = u->peer.connection;
    wev = pc->write;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0, "mysql send chain");

    /* clear the TCP "nopush" state if it is currently set */

    if (pc->tcp_nopush == NGX_TCP_NOPUSH_SET) {

        if (ngx_tcp_push(pc->fd) == NGX_ERROR) {
            ngx_log_error(NGX_LOG_CRIT, pc->log, ngx_socket_errno,
                          ngx_tcp_push_n " failed");
            return NGX_ERROR;
        }

        pc->tcp_nopush = NGX_TCP_NOPUSH_UNSET;
    }

    /* make sure the write event is registered */

    if (!wev->active && ngx_add_event(wev, NGX_WRITE_EVENT, 0) != NGX_OK) {
        return NGX_ERROR;
    }

    status = ngx_output_chain(&u->output, chain);

    if (status == NGX_ERROR) {
        return NGX_ERROR;
    }

    if (wev->timer_set) {
        ngx_del_timer(wev);
    }

    if (status != NGX_AGAIN) {

        /* treated as NGX_OK: nothing is left to write */

        if (ngx_handle_write_event(wev, 0) != NGX_OK) {
            return NGX_ERROR;
        }

        return NGX_OK;
    }

    /* partial write: wait for the socket to become writable again */

    ngx_add_timer(wev, u->conf->send_timeout);

    if (ngx_handle_write_event(wev, u->conf->send_lowat) != NGX_OK) {
        return NGX_ERROR;
    }

    return NGX_AGAIN;
}

/*
 * Write-event handler for the peer connection: continues sending pending
 * output (NULL chain) and finalizes the request if that fails or yields a
 * special-response code.
 */
static void
ngx_http_mysql_write_handler(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
    ngx_int_t  status;

    status = ngx_http_mysql_send_chain(r, u, NULL);

    /* a plain error is reported to the client as a 500 */
    if (status == NGX_ERROR) {
        status = NGX_HTTP_INTERNAL_SERVER_ERROR;
    }

    if (status >= NGX_HTTP_SPECIAL_RESPONSE) {
        ngx_http_mysql_upstream_finalize_request(r, u, status);
    }
}

/*
 * Read-event handler for the peer connection.
 *
 * Lazily allocates u->buffer (u->conf->buffer_size bytes), then repeatedly
 * receives data into it and feeds it to ngx_http_mysql_process_buf() for as
 * long as the parser returns NGX_AGAIN.
 *
 * Outcome by parser result:
 *   NGX_BUSY                          the read event is removed (a write is
 *                                     in progress);
 *   NGX_HTTP_UPSTREAM_INVALID_HEADER  next upstream is tried;
 *   NGX_ERROR                         request finalized with 500;
 *   >= NGX_HTTP_SPECIAL_RESPONSE      request finalized with that code.
 *
 * A receive error or a closed connection switches to the next upstream.
 */
static void
ngx_http_mysql_read_handler(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
    ssize_t            n;
    ngx_int_t          rc;
    ngx_connection_t  *c;

    c = u->peer.connection;

    /*
     * Allocate input buffer if it is not allocated yet
     */
    if (u->buffer.start == NULL) {
        u->buffer.start = ngx_palloc(r->pool, u->conf->buffer_size);

        if (u->buffer.start == NULL) {
            ngx_http_mysql_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        u->buffer.pos = u->buffer.start;
        u->buffer.last = u->buffer.start;
        u->buffer.end = u->buffer.start + u->conf->buffer_size;
        u->buffer.temporary = 1;

        u->buffer.tag = u->output.tag;
    }

    do {

        n = c->recv(c, u->buffer.last, u->buffer.end - u->buffer.last);

        if (n == NGX_AGAIN) {
            if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
                ngx_http_mysql_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            }

            return;
        }

        if (n == 0) {
            ngx_log_error(NGX_LOG_ERR, c->log, 0,
                          "server has prematurely closed the connection");
        }

        if (n == NGX_ERROR || n == 0) {
            ngx_http_mysql_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
            return;
        }

        u->buffer.last += n;

        rc = ngx_http_mysql_process_buf(r, &u->buffer);

        /*
         * Rewind the buffer whenever the parser has consumed every byte,
         * not only when a packet happened to end exactly at the end of the
         * received data.  Without this a packet boundary in the middle of a
         * read (NGX_AGAIN with pos == last) leaves the consumed bytes in
         * place, the buffer eventually fills up, and the next recv() is
         * called with a zero size, which is then misreported as a premature
         * close.
         */
        if (u->buffer.pos == u->buffer.last || rc == NGX_BUSY) {
            u->buffer.pos = u->buffer.last = u->buffer.start;
        }

    } while (rc == NGX_AGAIN);

    if (rc == NGX_BUSY) {
        /*
         * Write is in progress, must wait with reading
         *
         * NOTE(review): the event passed here is c->write although the
         * filter is NGX_READ_EVENT; left unchanged because nothing visible
         * here re-arms reading afterwards -- confirm the intended event.
         */
        if (ngx_del_event(c->write, NGX_READ_EVENT, 0) != NGX_OK) {
            ngx_http_mysql_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
        }

        return;
    }

    if (rc == NGX_HTTP_UPSTREAM_INVALID_HEADER) {
        ngx_http_mysql_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
        return;
    }

    if (rc == NGX_ERROR) {
        ngx_http_mysql_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
        ngx_http_mysql_upstream_finalize_request(r, u, rc);
        return;
    }
}

/*
 * Swap NGX_AGAIN and NGX_BUSY in a result code; every other value is
 * returned unchanged.
 */
static ngx_int_t
ngx_http_mysql_change_pipe_direction(ngx_int_t rc)
{
    switch (rc) {

    case NGX_AGAIN:
        return NGX_BUSY;

    case NGX_BUSY:
        return NGX_AGAIN;

    default:
        return rc;
    }
}

/*
 * Feed the unread bytes of buf (buf->pos .. buf->last) into the MySQL packet
 * parser, one byte per loop iteration, advancing buf->pos as bytes are
 * consumed.
 *
 * Returns NGX_OK when the buffer is exhausted and ctx->end_of_packet is set,
 * NGX_AGAIN when the buffer is exhausted otherwise.  Returns NGX_ERROR on a
 * length inconsistency or allocation failure, or any non-NGX_OK code
 * returned by ngx_http_mysql_next_field().  On these early returns buf->pos
 * still points at the byte being processed (the loop increment has not run).
 */
static ngx_int_t
ngx_http_mysql_process_buf(ngx_http_request_t *r, ngx_buf_t *buf) {
    ngx_int_t             rc;
    ngx_http_mysql_ctx_t *ctx = ngx_http_get_module_ctx(r, 
ngx_http_mysql_module);

    ctx->in_buf = buf;

    for(; buf->pos != buf->last ; buf->pos++) {
        switch(ctx->state) {
            /*
             * Collect the PACKET_HEADER_LEN-byte packet header into the
             * accumulator: 3 bytes of payload length, 1 byte packet number.
             */
            case mysql_state_packet_header: /* {{{ */
                if(ctx->current_field_pos == 0) {
                    ctx->current_field_len = PACKET_HEADER_LEN;
                    ctx->current_field_ptr = ctx->current_field = 
ctx->accumulator;

                    /* a new packet starts: clear the completion flag */
                    ctx->end_of_packet = 0;
                }

                *ctx->current_field_ptr++ = *buf->pos;
                ctx->current_field_pos++;
                
                if(ctx->current_field_pos == ctx->current_field_len) {
                    ctx->current_field_pos = 0;

                    ctx->packet_pos = 0;
                    ctx->packet_len = EXTRACT_INT3(ctx->current_field);
                    ctx->packet_number = ctx->current_field[3];

                    /*
                     * Until the handshake is done every packet is parsed as
                     * a handshake packet; afterwards the payload is either
                     * skipped (ignore_until_eof) or dispatched on its first
                     * byte.
                     */
                    if(!ctx->handshake_is_done) {
                        ngx_http_mysql_begin_packet(ctx, 
&ngx_mysql_handshake_packet);
                        ctx->state = mysql_state_handshake;
                    }
                    else {
                        ctx->state = ctx->ignore_until_eof ? mysql_state_ignore 
: mysql_state_field_count;
                    }
                }
                break; /* }}} */
            /*
             * First payload byte: OK, ERROR and (short) EOF markers are
             * consumed here without being stored; any other value switches
             * to the result state and is processed as field data below.
             * For the OK marker the packet descriptor is not changed here.
             */
            case mysql_state_field_count: /* {{{ */
                if(*buf->pos == OK_PACKET) {
                    ctx->state = mysql_state_ok;
                    ctx->packet_pos++;
                    break;
                } else if(*buf->pos == ERROR_PACKET) {
                    ngx_http_mysql_begin_packet(ctx, &ngx_mysql_error_packet);
                    ctx->state = mysql_state_error;
                    ctx->packet_pos++;
                    break;
                } else if(*buf->pos == EOF_PACKET && ctx->packet_len <= 
MIN_PACKET_LEN) {
                    ngx_http_mysql_begin_packet(ctx, &ngx_mysql_eof_packet);
                    ctx->state = mysql_state_ok;
                    ctx->packet_pos++;
                    ctx->eof = 1;
                    break;
                } else {
                    ctx->state = mysql_state_result;
                    /*
                     * Fall through
                     */
                } /* }}} */
            /*
             * Generic field parsing, driven by the current field descriptor.
             */
            case mysql_state_ok:
            case mysql_state_error:
            case mysql_state_result:
            case mysql_state_handshake: /* {{{ */
                /* first byte of a field: set up length and destination */
                if(ctx->current_field_pos == 0)
                {
                    if(!(ctx->field_descriptor->flags & NGX_MY_FIELD_ZEROT)) {
                        /*
                         * Call field handler first time to set up the length
                         */
                        ctx->field_descriptor->handler(ctx);

                        if(ctx->current_field_len == 0) {
                            return NGX_ERROR;
                        }

                        /* the field must fit into the rest of the packet */
                        if(ctx->packet_pos + ctx->current_field_len > 
ctx->packet_len) {
                            return NGX_ERROR;
                        }
                    }

                    ctx->zerot = ctx->field_descriptor->flags & 
NGX_MY_FIELD_ZEROT ? 1 : 0;

                    /*
                     * LCS fields get their own pool allocation, everything
                     * else is collected in the shared accumulator.
                     *
                     * NOTE(review): for a ZEROT field the handler above is
                     * not called, so current_field_len is not set here --
                     * confirm ZEROT and LCS are never combined.
                     * NOTE(review): no bound on bytes written to the
                     * accumulator is visible in this function; verify its
                     * size against the longest possible field.
                     */
                    if(ctx->field_descriptor->flags & NGX_MY_FIELD_LCS) {
                        ctx->current_field_ptr = ctx->current_field = 
ngx_palloc(r->pool, ctx->current_field_len);

                        if(ctx->current_field == NULL) {
                            return NGX_ERROR;
                        }
                    }
                    else {
                        ctx->current_field_ptr = ctx->current_field = 
ctx->accumulator;
                    }
                }

                *ctx->current_field_ptr++ = *buf->pos;
                ctx->current_field_pos++;
                ctx->packet_pos++;

                if(ctx->packet_pos > ctx->packet_len) {
                    return NGX_ERROR;
                }
                
                /*
                 * A field ends after current_field_len bytes, or at the
                 * terminating NUL for zero-terminated fields.
                 */
                if((!ctx->zerot && ctx->current_field_pos == 
ctx->current_field_len)
                    || (ctx->zerot && *buf->pos == '\0'))
                {
                    rc = ngx_http_mysql_next_field(r, ctx);

                    if(rc != NGX_OK) {
                        return rc;
                    }
                }
                break; /* }}} */
            /*
             * Skip the packet payload, only noting whether the packet is an
             * EOF packet; the EOF packet ends the ignore mode.
             */
            case mysql_state_ignore: /* {{{ */
                if(ctx->packet_pos == 0) {
                    ctx->eof = (*buf->pos == EOF_PACKET && ctx->packet_len <= 
MIN_PACKET_LEN);
                }

                ctx->packet_pos++;

                if(ctx->packet_pos == ctx->packet_len) {
                    ctx->current_field_pos = 0;
                    ctx->state = mysql_state_packet_header;
                    if(ctx->eof) {
                        ctx->ignore_until_eof = ctx->eof = 0;
                    }
                }
                break; /* }}} */
        }
    }

    return ctx->end_of_packet ? NGX_OK : NGX_AGAIN;
}

/*
 * Output filter that runs incoming buffers through the MySQL packet parser
 * and passes the produced output (ctx->out) to the HTTP output filter chain.
 *
 * The input chain, if any, is appended to ctx->in.  Each round first parses
 * as much of ctx->in as possible, then writes ctx->out and recycles the
 * chains; rounds repeat until ctx->done is set.  A parser error returns
 * NGX_ERROR; an output error marks the context done and returns NGX_OK.
 *
 * NOTE(review): the loop only ends through ctx->done or an error -- confirm
 * it cannot spin when the downstream filter keeps returning NGX_AGAIN.
 */
static ngx_int_t
ngx_http_mysql_output_filter(ngx_http_request_t *r, ngx_chain_t *in)
{
    ngx_int_t              status;
    ngx_http_mysql_ctx_t  *ctx;

    ctx = ngx_http_get_module_ctx(r, ngx_http_mysql_module);

    if (in != NULL
        && ngx_chain_add_copy(r->pool, &ctx->in, in) == NGX_ERROR)
    {
        return NGX_ERROR;
    }

    do {
        /* parse pending input, producing as much output as possible */

        while (ctx->in != NULL) {
            status = ngx_http_mysql_process_buf(r, ctx->in->buf);

            if (status == NGX_ERROR || status >= NGX_HTTP_SPECIAL_RESPONSE) {
                return NGX_ERROR;
            }

            if (status == NGX_BUSY) {
                break;
            }

            /* drop fully consumed buffers from the input chain */
            if (ctx->in->buf->pos == ctx->in->buf->last) {
                ctx->in = ctx->in->next;
            }
        }

        /* hand the output over to the next filter */

        status = ngx_http_output_filter(r, ctx->out);

        if (status == NGX_ERROR) {
            ctx->done = 1;
            return NGX_OK;
        }

        ngx_chain_update_chains(&ctx->free, &ctx->busy, &ctx->out,
                                ctx->encoder->tag);

    } while (!ctx->done);

    return status;
}

/*
 * Select the descriptor used to parse the next packet: the field cursor is
 * placed on the descriptor's first field and field_ctx is reset to the
 * module context itself.
 */
static void
ngx_http_mysql_begin_packet(ngx_http_mysql_ctx_t *ctx, ngx_mysql_packet_t *pd)
{
    ctx->field_ctx = ctx;
    ctx->packet_descriptor = pd;
    ctx->field_descriptor = pd->fields;
}

/*
 * Complete the current packet: verify that exactly packet_len bytes were
 * consumed, switch the parser back to header state, invoke the packet
 * handler and mark the packet as finished.
 *
 * Returns NGX_ERROR on a length mismatch, otherwise the handler's result.
 */
static ngx_int_t
ngx_http_mysql_finish_packet(ngx_http_request_t *r, ngx_http_mysql_ctx_t *ctx)
{
    ngx_int_t  status;

    if (ctx->packet_len != ctx->packet_pos) {
        return NGX_ERROR;
    }

    /* the state is reset before the handler runs */
    ctx->state = mysql_state_packet_header;

    status = ctx->packet_descriptor->handler(r);

    ctx->end_of_packet = 1;

    return status;
}

/*
 * Finish the current field and move the parser to the next field
 * descriptor of the current packet.
 *
 * The field handler is called (a second time, for fields whose length it
 * set up) to extract the value.  When a NULL bitmap is present, descriptors
 * whose bit is set in it are skipped.  If the packet payload is exhausted,
 * trailing descriptors flagged NGX_MY_FIELD_EOP are skipped as well.
 * Reaching the terminating descriptor (handler == NULL) finishes the packet.
 *
 * Returns NGX_OK, or the non-NGX_OK result of ngx_http_mysql_finish_packet().
 */
static ngx_int_t
ngx_http_mysql_next_field(ngx_http_request_t *r, ngx_http_mysql_ctx_t *ctx) {
    ngx_int_t                rc;
    ngx_mysql_field_t       *fd = ctx->field_descriptor;

    /*
     * Call field handler second time to extract field value
     */
    fd->handler(ctx);

    /*
     * Step to the following descriptor exactly once, then skip the
     * descriptors of NULL fields.  The terminator is tested before the
     * bitmap so its byte/mask members are never used as an index.
     * (Previously the descriptor was advanced twice whenever a NULL bitmap
     * was present, which dropped one field.)
     */
    fd++;

    if(ctx->null_bitmap) {
        while(fd->handler != NULL
              && (ctx->null_bitmap[fd->byte] & fd->mask))
        {
            fd++;
        }
    }

    ctx->current_field_pos = 0;

    /*
     * If we are at the end of the packet, skip all remaining
     * fields which span until the end of packet, since they are
     * all empty
     */
    if(ctx->packet_pos == ctx->packet_len) {
        while(fd->flags & NGX_MY_FIELD_EOP) {
            fd++;
        }
    }

    ctx->field_descriptor = fd;

    if(fd->handler == NULL) {
        rc = ngx_http_mysql_finish_packet(r, ctx);

        if(rc != NGX_OK) {
            return rc;
        }

        /* an EOF packet was fully processed */
        if(ctx->eof) {
            ctx->done = 1;
        }
    }

    return NGX_OK;
}

/*
 * Compute the authentication token written to p (SCRAMBLE_LEN bytes):
 *
 *     SHA1(password) XOR SHA1(authenticator, SHA1(SHA1(password)))
 *
 * See scramble() in libmysql/password.c
 */
static void
ngx_http_mysql_scramble(u_char *p, ngx_str_t *authenticator,
    ngx_str_t *password)
{
    size_t   i;
    SHA_CTX  sha;
    u_char   pass_hash[SHA_DIGEST_LENGTH];
    u_char   double_hash[SHA_DIGEST_LENGTH];

    /* pass_hash = SHA1(password) */
    SHA1_Init(&sha);
    SHA1_Update(&sha, password->data, password->len);
    SHA1_Final(pass_hash, &sha);

    /* double_hash = SHA1(pass_hash) */
    SHA1_Init(&sha);
    SHA1_Update(&sha, pass_hash, SHA_DIGEST_LENGTH);
    SHA1_Final(double_hash, &sha);

    /* p = SHA1(authenticator, double_hash) */
    SHA1_Init(&sha);
    SHA1_Update(&sha, authenticator->data, authenticator->len);
    SHA1_Update(&sha, double_hash, SHA_DIGEST_LENGTH);
    SHA1_Final(p, &sha);

    /* mix the first-stage hash into the result in place */
    for (i = 0; i < SCRAMBLE_LEN; i++) {
        p[i] = p[i] ^ pass_hash[i];
    }
}

/*
 * Packet handler run after the server handshake has been parsed: builds the
 * client authentication packet and sends it to the server.
 *
 * The packet consists of the packet header, client flags, a maximum packet
 * size, the server language byte, 23 zero bytes, the NUL-terminated user
 * name, the length-prefixed scrambled password and, when a database is
 * configured and the server advertises CLIENT_CONNECT_WITH_DB, the
 * NUL-terminated database name.
 *
 * Returns the result of ngx_http_mysql_send_chain() with NGX_AGAIN and
 * NGX_BUSY swapped, NGX_ERROR on allocation failure, or
 * NGX_HTTP_INTERNAL_SERVER_ERROR when CLIENT_PROTOCOL_41 is not among the
 * client flags.
 */
static ngx_int_t
ngx_http_mysql_authenticate(ngx_http_request_t *r)
{
    size_t                          len;
    ngx_buf_t                      *b;
    ngx_uint_t                      send_db;
    ngx_chain_t                    *cl;
    ngx_http_mysql_ctx_t           *ctx;
    ngx_http_mysql_connector_t     *c;
    ngx_str_t                       authenticator;

    ctx = ngx_http_get_module_ctx(r, ngx_http_mysql_module);

    r->connection->log->action = "loggin on to mysql";

    ngx_log_debug6(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
        "mysql authenticate[%d]: protcol_version=%d, tid=%d, sc=%xi, sl=%d, ss=%d",
        ctx->packet_number,
        ctx->handshake_data.protocol_version,
        ctx->handshake_data.thread_id,
        ctx->handshake_data.server_capabilities,
        ctx->handshake_data.server_lang,
        ctx->handshake_data.server_status
        );

    ctx->client_flags = ctx->handshake_data.server_capabilities
        & ~(CLIENT_SSL|CLIENT_COMPRESS);

    ctx->client_flags |= CLIENT_CAPABILITIES;

    ctx->handshake_is_done = 1;

    ngx_http_mysql_begin_packet(ctx, &ngx_mysql_auth_ok_packet);

    /*
     * Only the 4.1 protocol is implemented.  The test must apply '!' to the
     * masked value; "!flags & X" negates the flags first and never fires.
     *
     * NOTE(review): client_flags already has CLIENT_CAPABILITIES or-ed in;
     * if that mask contains CLIENT_PROTOCOL_41 this should probably test
     * handshake_data.server_capabilities instead -- confirm.
     */
    if(!(ctx->client_flags & CLIENT_PROTOCOL_41)) {
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }

    c = ctx->connector;

    authenticator.data = ctx->scramble;
    authenticator.len = SCRAMBLE_LEN;

    send_db = c->database.len
              && (ctx->handshake_data.server_capabilities
                  & CLIENT_CONNECT_WITH_DB);

    /*
     * The length stored in the packet header must match the bytes actually
     * written, so the database name and its NUL are only counted when they
     * are sent.
     */
    len = PACKET_HEADER_LEN + 4 + 4 + 1 + 23 + c->username.len + 1 + 1
          + SCRAMBLE_LEN;

    if(send_db) {
        len += c->database.len + 1;
    }

    b = ngx_create_temp_buf(r->pool, len);
    if (b == NULL) {
        return NGX_ERROR;
    }

    cl = ngx_alloc_chain_link(r->pool);
    if (cl == NULL) {
        return NGX_ERROR;
    }

    cl->buf = b;
    cl->next = NULL;

    /* packet header: payload length and sequence number */
    STORE_INT3(b->last, len - PACKET_HEADER_LEN);

    *b->last++ = ++ctx->packet_number;

    /* client flags and maximum packet size */
    STORE_INT4(b->last, ctx->client_flags);
    STORE_INT4(b->last, 1024L*1024L*1024L);

    *b->last++ = ctx->handshake_data.server_lang;

    /* 23 filler bytes */
    ngx_memzero(b->last, 23);

    b->last += 23;

    b->last = ngx_copy(b->last, c->username.data, c->username.len);

    *b->last++ = '\0';

    /* scrambled password, prefixed with its length */
    *b->last++ = SCRAMBLE_LEN;

    ngx_http_mysql_scramble(b->last, &authenticator, &c->password);

    b->last += SCRAMBLE_LEN;

    if(send_db) {
        b->last = ngx_copy(b->last, c->database.data, c->database.len);

        *b->last++ = '\0';
    }

    return ngx_http_mysql_change_pipe_direction(
        ngx_http_mysql_send_chain(r, r->upstream, cl));
}

/*
 * Packet handler that sends COM_STMT_PREPARE with the configured query.
 *
 * The request is a two-link chain: a small header buffer (packet length,
 * packet number 0, command byte) followed by a memory buffer that points
 * directly at the configured query text.  The reply is parsed with the
 * "prepare ok" packet descriptor.
 *
 * NOTE(review): the chain links live on this function's stack; presumably
 * the output chain copies them before returning -- verify.
 */
static ngx_int_t
ngx_http_mysql_prepare(ngx_http_request_t *r)
{
    ngx_buf_t                  *hdr, *query;
    ngx_chain_t                 links[2];
    ngx_http_mysql_ctx_t       *ctx;
    ngx_http_mysql_loc_conf_t  *mycf;

    ctx = ngx_http_get_module_ctx(r, ngx_http_mysql_module);
    mycf = ngx_http_get_module_loc_conf(r, ngx_http_mysql_module);

    r->connection->log->action = "preparing SQL statement";

    ngx_log_debug5(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
        "mysql prepare[%d]: affected_rows=%d, insert_id=%d, ss=%d, wc=%d",
        ctx->packet_number, ctx->ok_data.affected_rows,
        ctx->ok_data.insert_id, ctx->ok_data.server_status,
        ctx->ok_data.warning_count);

    /* a new command restarts packet numbering */
    ctx->packet_number = 0;

    ngx_http_mysql_begin_packet(ctx, &ngx_mysql_prepare_ok_packet);

    /* first link: packet header plus the command byte */

    hdr = ngx_create_temp_buf(r->pool,
                              PACKET_HEADER_LEN + 1 + mycf->query.len);
    if (hdr == NULL) {
        return NGX_ERROR;
    }

    links[0].buf = hdr;
    links[0].next = &links[1];

    STORE_INT3(hdr->last, 1 + mycf->query.len);

    *hdr->last++ = ctx->packet_number;

    *hdr->last++ = COM_STMT_PREPARE;

    /* second link: the query text, referenced in place */

    query = ngx_calloc_buf(r->pool);
    if (query == NULL) {
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }

    links[1].buf = query;
    links[1].next = NULL;

    query->memory = 1;
    query->start = mycf->query.data;
    query->pos = query->start;
    query->end = mycf->query.data + mycf->query.len;
    query->last = query->end;

    return ngx_http_mysql_change_pipe_direction(
        ngx_http_mysql_send_chain(r, r->upstream, links));
}

/*
 * Packet handler for the reply to COM_STMT_PREPARE.
 *
 * With parameters, parameter packets are expected next (parsed into
 * ctx->param_data).  Otherwise, with columns, the column array is created
 * and the first column packet is expected.  With neither, the statement is
 * executed right away.
 */
static ngx_int_t
ngx_http_mysql_prepare_ok_handler(ngx_http_request_t *r)
{
    ngx_mysql_column_t    *first;
    ngx_http_mysql_ctx_t  *ctx;

    ctx = ngx_http_get_module_ctx(r, ngx_http_mysql_module);

    ngx_log_debug4(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
        "mysql prepared[%d]: statement_id=%d, column_count=%d, param_count=%d",
        ctx->packet_number, ctx->prepare_data.statement_id,
        ctx->prepare_data.column_count, ctx->prepare_data.param_count);

    if (ctx->prepare_data.param_count != 0) {
        ngx_http_mysql_begin_packet(ctx, &ngx_mysql_param_packet);
        ctx->field_ctx = &ctx->param_data;
        return NGX_OK;
    }

    if (ctx->prepare_data.column_count == 0) {
        /* nothing to describe: run the statement */
        return ngx_http_mysql_execute(r);
    }

    /* reserve storage for the column descriptions */

    if (ngx_array_init(&ctx->columns, r->pool,
                       ctx->prepare_data.column_count,
                       sizeof(ngx_mysql_column_t))
        != NGX_OK)
    {
        return NGX_ERROR;
    }

    first = ngx_array_push(&ctx->columns);
    if (first == NULL) {
        return NGX_ERROR;
    }

    ngx_http_mysql_begin_packet(ctx, &ngx_mysql_column_packet);

    /* column packet fields are parsed into the new array element */
    ctx->current_column = first;
    ctx->field_ctx = first;

    return NGX_OK;
}

/*
 * Packet handler for a parameter description packet: logs the parsed
 * parameter data and expects another parameter packet next.
 */
static ngx_int_t
ngx_http_mysql_param(ngx_http_request_t *r)
{
    ngx_http_mysql_ctx_t  *ctx = ngx_http_get_module_ctx(r,
                                                   ngx_http_mysql_module);

    ngx_log_debug5(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
        "mysql param[%d]: type=%xu, flags=%xu, decimals=%d, length=%d",
        ctx->packet_number, ctx->param_data.type, ctx->param_data.flags,
        ctx->param_data.decimals, ctx->param_data.length);

    ngx_http_mysql_begin_packet(ctx, &ngx_mysql_param_packet);

    return NGX_OK;
}

/*
 * Packet handler for a column description packet.
 *
 * Decodes the 12-byte fixed-length part of the current column (charset,
 * length, type, flags, decimals).  While fewer columns than announced by
 * the prepare reply have been collected, a new column element is pushed and
 * another column packet is expected; afterwards, if the statement has
 * parameters, a parameter packet is expected.
 *
 * Returns NGX_OK, or NGX_ERROR when the fixed-length part has an unexpected
 * size or the column array cannot grow.
 */
static ngx_int_t
ngx_http_mysql_column(ngx_http_request_t *r)
{
    ngx_http_mysql_ctx_t           *ctx;
    ngx_mysql_column_t             *c;
    u_char                         *p;

    ctx = ngx_http_get_module_ctx(r, ngx_http_mysql_module);

    c = ctx->current_column;

    /*
     * Ensure fixed length part is of expected length
     */
    if(c->fixed_length_part.len != 12) {
        /* %V takes a pointer to the string, as in the debug log below */
        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
            "fixed length part has unexpected length: %ui, column_name=%V",
            c->fixed_length_part.len,
            &c->name);
        return NGX_ERROR;
    }

    /*
     * Decode fixed length part
     */
    p = c->fixed_length_part.data;

    c->charset_number = EXTRACT_INT2(p);
    c->length = EXTRACT_INT4(p + 2);
    c->type = *(p+6);
    c->flags = EXTRACT_INT2(p+7);
    c->decimals = *(p+9);

    ngx_log_debug5(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
        "mysql column[%d]: idx=%d, name=\"%V\", type=%d, length=%d",
        ctx->packet_number,
        ctx->columns.nelts,
        &c->name,
        c->type,
        c->length
        );

    /* more columns announced than collected so far: expect another one */
    if(ctx->columns.nelts < ctx->prepare_data.column_count) {

        c = ngx_array_push(&ctx->columns);

        if(c == NULL) {
            return NGX_ERROR;
        }

        ngx_http_mysql_begin_packet(ctx, &ngx_mysql_column_packet);

        ctx->field_ctx = ctx->current_column = c;

        return NGX_OK;
    }

    if(ctx->prepare_data.param_count != 0) {
        ngx_http_mysql_begin_packet(ctx, &ngx_mysql_param_packet);
        return NGX_OK;
    }

    return NGX_OK;
}

/*
 * Send COM_STMT_EXECUTE for the prepared statement.
 *
 * The reply is parsed as a result set header when columns were described,
 * otherwise as an "execute ok" packet.  The request is a two-link chain:
 * header buffer (packet length, packet number 0, command byte) followed by
 * the body (statement id, flags, iteration count, new-parameter-bound flag).
 *
 * NOTE(review): the computed length reserves room for a parameter NULL
 * bitmap and parameter types, but the calls that would serialize them
 * (ngx_http_mysql_store_null_bitmap, ngx_http_mysql_store_param_types,
 * ngx_http_mysql_store_params) are disabled -- confirm behaviour for
 * statements with parameters.
 * NOTE(review): the chain links live on this function's stack; presumably
 * the output chain copies them before returning -- verify.
 */
static ngx_int_t
ngx_http_mysql_execute(ngx_http_request_t *r)
{
    size_t                      body_len;
    ngx_buf_t                  *hdr, *body;
    ngx_chain_t                 links[2];
    ngx_http_mysql_ctx_t       *ctx;
    ngx_http_mysql_loc_conf_t  *mycf;

    ctx = ngx_http_get_module_ctx(r, ngx_http_mysql_module);
    mycf = ngx_http_get_module_loc_conf(r, ngx_http_mysql_module);

    r->connection->log->action = "executing SQL statement";

    if (ctx->columns.nelts > 0) {
        ngx_http_mysql_begin_packet(ctx, &ngx_mysql_rsheader_packet);

    } else {
        ngx_http_mysql_begin_packet(ctx, &ngx_mysql_execute_ok_packet);
    }

    /* a new command restarts packet numbering */
    ctx->packet_number = 0;

    body_len = 4 + 1 + 4 + (ctx->prepare_data.param_count+7)/8 + 1
               + ctx->non_null_param_count * 2;

    /* second link: command body */

    body = ngx_create_temp_buf(r->pool, body_len);
    if (body == NULL) {
        return NGX_ERROR;
    }

    links[1].buf = body;
    links[1].next = NULL;

    /* statement_id */
    STORE_INT4(body->last, ctx->prepare_data.statement_id);

    /* flags */
    *body->last++ = 0;

    /* iteration_count */
    STORE_INT4(body->last, 1);

    /* (parameter NULL bitmap is not written here) */

    /* new_parameter_bound_flag */
    *body->last++ = ctx->prepare_data.param_count > 0 ? 1 : 0;

    /* (parameter types and values are not written here) */

    /* first link: packet header plus the command byte */

    hdr = ngx_create_temp_buf(r->pool, PACKET_HEADER_LEN + 1);
    if (hdr == NULL) {
        return NGX_ERROR;
    }

    links[0].buf = hdr;
    links[0].next = &links[1];

    STORE_INT3(hdr->last, body_len + 1);

    *hdr->last++ = ctx->packet_number;

    *hdr->last++ = COM_STMT_EXECUTE;

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
        "mysql execute[%d]", ctx->packet_number);

    return ngx_http_mysql_change_pipe_direction(
        ngx_http_mysql_send_chain(r, r->upstream, links));
}

/*
 * Packet handler for the OK reply to COM_STMT_EXECUTE: logs the parsed OK
 * data and returns NGX_OK.
 */
static ngx_int_t
ngx_http_mysql_execute_ok_handler(ngx_http_request_t *r)
{
    ngx_http_mysql_ctx_t       *ctx;
    ngx_http_mysql_loc_conf_t  *mycf;

    ctx = ngx_http_get_module_ctx(r, ngx_http_mysql_module);
    mycf = ngx_http_get_module_loc_conf(r, ngx_http_mysql_module);

    ngx_log_debug5(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
        "mysql ok[%d]: affected_rows=%d, insert_id=%d, ss=%d, wc=%d",
        ctx->packet_number, ctx->ok_data.affected_rows,
        ctx->ok_data.insert_id, ctx->ok_data.server_status,
        ctx->ok_data.warning_count);

    return NGX_OK;
}

/*
 * Packet handler for the result set header.
 *
 * Builds, from the collected column descriptions, a packet descriptor for
 * row packets: one descriptor for the NULL bitmap, one per column (size and
 * handler taken from the encoder's type table), and a terminating descriptor
 * with a NULL handler.  Then sends the response header with status 200,
 * switches the parser to ignore packets up to the next EOF packet, and
 * initializes the event pipe.
 *
 * Returns NGX_ERROR on allocation failure, the result of
 * ngx_http_send_header() when it is NGX_ERROR or greater than NGX_OK, and
 * otherwise the result of ngx_http_mysql_init_pipe().
 */
static ngx_int_t
ngx_http_mysql_rsheader_handler(ngx_http_request_t *r)
{
    ngx_http_mysql_ctx_t           *ctx;
    ngx_mysql_type_t               *type;
    ngx_mysql_field_t              *fields, *f;
    ngx_mysql_packet_t             *packet;
    ngx_mysql_column_t             *column;
    ngx_uint_t                      i, bit;
    ngx_int_t                       rc;

    ctx = ngx_http_get_module_ctx(r, ngx_http_mysql_module);

    ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
        "mysql result set header[%d]: column_count=%d",
        ctx->packet_number,
        ctx->prepare_data.column_count
        );

    /*
     * Create a custom definition for row packet on the fly.
     *
     * One slot for the NULL bitmap, one per column and one terminator:
     * ngx_http_mysql_next_field() stops at the descriptor whose handler is
     * NULL, so the array must end with one.  The array is zero-filled, which
     * leaves the terminator with a NULL handler and no flags.
     */
    fields = ngx_pcalloc(r->pool,
                         (2 + ctx->columns.nelts) * sizeof(ngx_mysql_field_t));

    if(fields == NULL) {
        return NGX_ERROR;
    }

    packet = ngx_palloc(r->pool, sizeof(ngx_mysql_packet_t));

    if(packet == NULL) {
        return NGX_ERROR;
    }

    /* (columns + 7 + 2) / 8 bytes: the bitmap carries two leading bits */
    ctx->null_bitmap = ngx_palloc(r->pool,
                            ((ctx->columns.nelts + 9) / 8) * sizeof(u_char));

    if(ctx->null_bitmap == NULL) {
        return NGX_ERROR;
    }

    f = fields;

    /*
     * A special field descriptor for NULL bitmap; its mask is zero so it is
     * never treated as a NULL field itself
     */
    f->flags = NGX_MY_FIELD_RELEVANT;
    f->size = sizeof(u_char);
    f->offset = 0;
    f->byte = 0;
    f->mask = 0;
    f->handler = ngx_mysql_null_bitmap_handler;
    f++;

    /*
     * The rest of the columns
     */
    column = ctx->columns.elts;

    for(i=0;i<ctx->columns.nelts;i++) {
        /*
         * NOTE(review): column[i].type comes from the server and is used as
         * an index unchecked -- verify the size of encoder->types.
         */
        type = ctx->encoder->types + column[i].type;

        /*
         * Position of this column's bit in the NULL bitmap, matching the
         * (columns + 9) / 8 bitmap size above: bit (i + 2).
         */
        bit = i + 2;

        f->flags = NGX_MY_FIELD_RELEVANT;
        f->size = type->size;
        f->offset = 0;
        f->byte = bit / 8;
        f->mask = 1 << (bit % 8);
        f->handler = type->handler;
        f++;
    }

    /* f now points at the zero-filled terminator */

    packet->fields = fields;
    packet->handler = ctx->encoder->row_handler;

    ngx_http_mysql_begin_packet(ctx, packet);

    r->headers_out.status = 200;

    rc = ngx_http_send_header(r);

    if (rc == NGX_ERROR || rc > NGX_OK) {
        return rc;
    }

    /*
     * Ignore column definition and then start
     * processing the rows
     */
    ctx->ignore_until_eof = 1;

    return ngx_http_mysql_init_pipe(r);
}

/*
 * Configure the upstream event pipe of the request so that data read from
 * the MySQL connection is passed through ngx_http_output_filter() to the
 * client, then run the upstream handler once.
 *
 * Returns NGX_ERROR if a pool allocation fails, NGX_DONE otherwise.
 */
static ngx_int_t
ngx_http_mysql_init_pipe(ngx_http_request_t *r)
{
    ngx_http_upstream_t       *u = r->upstream;
    ngx_event_pipe_t          *ep;
    ngx_temp_file_t           *tf;
    ngx_connection_t          *client;
    ngx_http_core_loc_conf_t  *clcf;

    client = r->connection;
    clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);

    ep = u->pipe;

    /* endpoints and output path of the pipe */
    ep->output_filter =
        (ngx_event_pipe_output_filter_pt) ngx_http_output_filter;
    ep->output_ctx = r;
    ep->tag = u->output.tag;
    ep->bufs = u->conf->bufs;
    ep->busy_size = u->conf->busy_buffers_size;
    ep->upstream = u->peer.connection;
    ep->downstream = client;
    ep->pool = r->pool;
    ep->log = client->log;

    ep->cacheable = 0;

    /* temporary file descriptor block; no file is opened here */
    tf = ngx_pcalloc(r->pool, sizeof(ngx_temp_file_t));
    ep->temp_file = tf;

    if (tf == NULL) {
        return NGX_ERROR;
    }

    tf->file.fd = NGX_INVALID_FILE;
    tf->file.log = client->log;
    tf->path = u->conf->temp_path;
    tf->pool = r->pool;
    tf->log_level = NGX_LOG_WARN;
    tf->warn = "mysql result set is buffered "
               "into a temporary file";

    ep->max_temp_file_size = u->conf->max_temp_file_size;
    ep->temp_file_write_size = u->conf->temp_file_write_size;

    /* hand the already-read part of u->buffer to the pipe as preread data */
    ep->preread_bufs = ngx_alloc_chain_link(r->pool);

    if (ep->preread_bufs == NULL) {
        return NGX_ERROR;
    }

    ep->preread_bufs->buf = &u->buffer;
    ep->preread_bufs->next = NULL;
    u->buffer.recycled = 1;

    ep->preread_size = u->buffer.last - u->buffer.pos;

    if (ngx_event_flags & NGX_USE_AIO_EVENT) {
        /* the posted aio operation may corrupt a shadow buffer */
        ep->single_buf = 1;
    }

    /* TODO: ep->free_bufs = 0 if use ngx_create_chain_of_bufs() */
    ep->free_bufs = 1;

    /*
     * the event pipe will do u->buffer.last += ep->preread_size
     * as if these bytes had just been read
     */
    u->buffer.last = u->buffer.pos;

    ep->cyclic_temp_file = 0;

    ep->read_timeout = u->conf->read_timeout;
    ep->send_timeout = clcf->send_timeout;
    ep->send_lowat = clcf->send_lowat;

    u->read_event_handler = ngx_http_mysql_process_upstream;
    r->write_event_handler = ngx_http_mysql_process_downstream;

    ngx_http_mysql_process_upstream(r, u);

    return NGX_DONE;
}

/*
 * Read handler for the MySQL connection while a result set is streamed.
 *
 * A read timeout marks the pipe as failed; otherwise the event pipe is run
 * in read mode.  NGX_ABORT from the pipe finalizes the request unless the
 * client connection is already destroyed.
 */
static void
ngx_http_mysql_process_upstream(ngx_http_request_t *r,
    ngx_http_upstream_t *u)
{
    ngx_connection_t  *peer, *client;

    peer = u->peer.connection;
    client = r->connection;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, peer->log, 0,
                   "http mysql process upstream");

    peer->log->action = "reading mysql result set";

    if (peer->read->timedout) {
        u->pipe->upstream_error = 1;
        ngx_connection_error(peer, NGX_ETIMEDOUT, "upstream timed out");

    } else if (ngx_event_pipe(u->pipe, 0) == NGX_ABORT) {

        if (!client->destroyed) {
            ngx_http_mysql_upstream_finalize_request(r, u, 0);
        }

        return;
    }

    ngx_http_mysql_upstream_process_request(r);
}

/*
 * Write handler for the client connection while a result set is streamed.
 *
 * Three situations are told apart by the write event flags:
 *   - timedout without delayed: the client timed out, flag a downstream
 *     error;
 *   - timedout with delayed: clear both flags and either re-arm the event
 *     (not ready) or run the pipe;
 *   - delayed only: just re-register the write event.
 * In every other case the pipe is run in write mode.
 */
static void
ngx_http_mysql_process_downstream(ngx_http_request_t *r)
{
    ngx_int_t             do_write;
    ngx_event_t          *wev;
    ngx_connection_t     *client;
    ngx_event_pipe_t     *ep;
    ngx_http_upstream_t  *u;

    client = r->connection;
    u = r->upstream;
    ep = u->pipe;
    wev = client->write;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, client->log, 0,
                   "http mysql process downstream");

    client->log->action = "encoding and sending to client";

    if (wev->timedout && !wev->delayed) {
        ep->downstream_error = 1;
        client->timedout = 1;
        ngx_connection_error(client, NGX_ETIMEDOUT, "client timed out");

        ngx_http_mysql_upstream_process_request(r);
        return;
    }

    if (wev->timedout) {

        /* the timer fired while the event was flagged as delayed */
        wev->timedout = 0;
        wev->delayed = 0;

        if (!wev->ready) {
            ngx_add_timer(wev, ep->send_timeout);

            if (ngx_handle_write_event(wev, ep->send_lowat) != NGX_OK) {
                ngx_http_mysql_upstream_finalize_request(r, u, 0);
            }

            return;
        }

        do_write = wev->write;

    } else {

        if (wev->delayed) {

            ngx_log_debug0(NGX_LOG_DEBUG_HTTP, client->log, 0,
                           "http downstream delayed");

            if (ngx_handle_write_event(wev, ep->send_lowat) != NGX_OK) {
                ngx_http_mysql_upstream_finalize_request(r, u, 0);
            }

            return;
        }

        do_write = 1;
    }

    if (ngx_event_pipe(ep, do_write) == NGX_ABORT) {

        if (client->destroyed) {
            return;
        }

        ngx_http_mysql_upstream_finalize_request(r, u, 0);
        return;
    }

    ngx_http_mysql_upstream_process_request(r);
}

/*
 * Inspect the pipe state after it has run and react to it:
 *   - upstream done: switch the upstream read handler to
 *     ngx_http_mysql_read_handler;
 *   - upstream EOF or error: finalize the request;
 *   - downstream error: finalize the request if a peer connection exists.
 * The first two cases only apply while a peer connection is present.
 */
static void
ngx_http_mysql_upstream_process_request(ngx_http_request_t *r)
{
    int                   have_peer;
    ngx_event_pipe_t     *ep;
    ngx_http_upstream_t  *u;

    u = r->upstream;
    ep = u->pipe;
    have_peer = (u->peer.connection != NULL);

    if (have_peer && ep->upstream_done) {
        u->read_event_handler = ngx_http_mysql_read_handler;
        return;
    }

    if (have_peer && (ep->upstream_eof || ep->upstream_error)) {
        ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                       "mysql upstream exit: %p", ep->out);
        ngx_http_mysql_upstream_finalize_request(r, u, 0);
        return;
    }

    if (!ep->downstream_error) {
        return;
    }

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "http mysql downstream error");

    if (have_peer) {
        ngx_http_mysql_upstream_finalize_request(r, u, 0);
    }
}

/*
 * Packet handler for a MySQL error packet: log the message stored in the
 * request context and answer the request with NGX_HTTP_BAD_REQUEST.
 */
static ngx_int_t
ngx_http_mysql_error(ngx_http_request_t *r)
{
    ngx_http_mysql_ctx_t  *my;

    my = ngx_http_get_module_ctx(r, ngx_http_mysql_module);

    ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                  "mysql error: %s", my->error_data.message);

    return NGX_HTTP_BAD_REQUEST;
}

/*
 * Field handler for a one-byte field.
 *
 * Called with current_field_pos == 0 it announces the field length (1).
 * On any other call, and only for fields flagged NGX_MY_FIELD_RELEVANT, it
 * stores the byte at field_ctx + descriptor offset.
 */
static void
ngx_mysql_uchar_handler(ngx_http_mysql_ctx_t *ctx)
{
    u_char  *dst;

    if (ctx->current_field_pos == 0) {
        ctx->current_field_len = 1;
        return;
    }

    if (!(ctx->field_descriptor->flags & NGX_MY_FIELD_RELEVANT)) {
        return;
    }

    dst = (u_char *) ctx->field_ctx + ctx->field_descriptor->offset;
    *dst = ctx->current_field[0];
}

/*
 * Field handler for a two-byte integer.
 *
 * Called with current_field_pos == 0 it announces the field length (2).
 * On any other call, and only for fields flagged NGX_MY_FIELD_RELEVANT, it
 * decodes the value with EXTRACT_INT2 and stores it as an ngx_uint_t at
 * field_ctx + descriptor offset.
 */
static void
ngx_mysql_int2_handler(ngx_http_mysql_ctx_t *ctx)
{
    ngx_uint_t  *dst;

    if (ctx->current_field_pos == 0) {
        ctx->current_field_len = 2;
        return;
    }

    if (!(ctx->field_descriptor->flags & NGX_MY_FIELD_RELEVANT)) {
        return;
    }

    dst = (ngx_uint_t *) ((u_char *) ctx->field_ctx
                          + ctx->field_descriptor->offset);
    *dst = EXTRACT_INT2(ctx->current_field);
}

/*
 * Field handler for a four-byte integer.
 *
 * Called with current_field_pos == 0 it announces the field length (4).
 * On any other call, and only for fields flagged NGX_MY_FIELD_RELEVANT, it
 * decodes the value with EXTRACT_INT4 and stores it as an ngx_uint_t at
 * field_ctx + descriptor offset.
 */
static void
ngx_mysql_int4_handler(ngx_http_mysql_ctx_t *ctx)
{
    ngx_uint_t  *dst;

    if (ctx->current_field_pos == 0) {
        ctx->current_field_len = 4;
        return;
    }

    if (!(ctx->field_descriptor->flags & NGX_MY_FIELD_RELEVANT)) {
        return;
    }

    dst = (ngx_uint_t *) ((u_char *) ctx->field_ctx
                          + ctx->field_descriptor->offset);
    *dst = EXTRACT_INT4(ctx->current_field);
}

/*
 * Field handler for a fixed-length byte string whose length is the
 * descriptor's size.
 *
 * Called with current_field_pos == 0 it announces that length.  On any
 * other call, and only for fields flagged NGX_MY_FIELD_RELEVANT, it copies
 * size bytes to field_ctx + descriptor offset.
 */
static void
ngx_mysql_str_handler(ngx_http_mysql_ctx_t *ctx)
{
    u_char  *dst;

    if (ctx->current_field_pos == 0) {
        ctx->current_field_len = ctx->field_descriptor->size;
        return;
    }

    if (!(ctx->field_descriptor->flags & NGX_MY_FIELD_RELEVANT)) {
        return;
    }

    dst = (u_char *) ctx->field_ctx + ctx->field_descriptor->offset;

    /* the end pointer returned by ngx_copy() is not needed */
    (void) ngx_copy(dst, ctx->current_field, ctx->field_descriptor->size);
}

/*
 * Field handler slot for zero-terminated strings.  Deliberately a no-op:
 * this module does not need zero-terminated strings.
 */
static void
ngx_mysql_strz_handler(ngx_http_mysql_ctx_t *ctx)
{
    (void) ctx;
}

/*
 * Field handler for a string that extends to the end of the packet.
 *
 * Called with current_field_pos == 0 it announces the field length as the
 * number of bytes left in the packet.  On any other call, and only for
 * fields flagged NGX_MY_FIELD_RELEVANT, it copies the field to
 * field_ctx + descriptor offset.
 *
 * The copy is limited to the smaller of the received field length and the
 * descriptor's size.  The descriptor's size alone is not a safe length: the
 * field is as long as the rest of the packet, so copying size bytes would
 * read past the received data whenever the field is shorter.
 */
static void
ngx_mysql_stre_handler(ngx_http_mysql_ctx_t *ctx)
{
    u_char  *dst;
    size_t   n;

    if (ctx->current_field_pos == 0) {
        ctx->current_field_len = ctx->packet_len - ctx->packet_pos;
        return;
    }

    if (!(ctx->field_descriptor->flags & NGX_MY_FIELD_RELEVANT)) {
        return;
    }

    n = ctx->field_descriptor->size;

    if ((size_t) ctx->current_field_len < n) {
        n = (size_t) ctx->current_field_len;
    }

    dst = (u_char *) ctx->field_ctx + ctx->field_descriptor->offset;

    (void) ngx_copy(dst, ctx->current_field, n);
}

/*
 * Field handler for a length-coded binary number.
 *
 * Called with current_field_pos == 0 it looks at the next input byte: a
 * value above 250 selects the total field length from
 * ngx_mysql_lcb_length[], anything else is a one-byte field.
 *
 * On any other call, and only for fields flagged NGX_MY_FIELD_RELEVANT, it
 * decodes the value into an ngx_uint_t at field_ctx + descriptor offset.
 * Lengths 1, 3 and 4 are decoded; every other length stores 0.
 */
static void
ngx_mysql_lcb_handler(ngx_http_mysql_ctx_t *ctx)
{
    u_char       lead;
    ngx_uint_t  *dst;

    if (ctx->current_field_pos == 0) {
        lead = *ctx->in_buf->pos;

        if (lead > 250) {
            ctx->current_field_len = ngx_mysql_lcb_length[lead & 0x7];

        } else {
            ctx->current_field_len = 1;
        }

        return;
    }

    if (!(ctx->field_descriptor->flags & NGX_MY_FIELD_RELEVANT)) {
        return;
    }

    dst = (ngx_uint_t *) ((u_char *) ctx->field_ctx
                          + ctx->field_descriptor->offset);

    if (ctx->current_field_len == 1) {
        *dst = ctx->current_field[0];

    } else if (ctx->current_field_len == 3) {
        /* skip the marker byte, two value bytes follow */
        *dst = EXTRACT_INT2(ctx->current_field + 1);

    } else if (ctx->current_field_len == 4) {
        /* skip the marker byte, three value bytes follow */
        *dst = EXTRACT_INT3(ctx->current_field + 1);

    } else {
        *dst = 0;
    }
}

/*
 * Field handler for a length-coded string stored as an ngx_str_t at
 * field_ctx + descriptor offset.
 *
 * Called with current_field_pos == 0 it announces the field length taken
 * from the destination's len member (presumably filled in by a preceding
 * length field - TODO confirm against the packet descriptors).  On any
 * other call it points the destination's data at the current field; no
 * bytes are copied.
 */
static void
ngx_mysql_lcs_handler(ngx_http_mysql_ctx_t *ctx)
{
    ngx_str_t  *str;

    str = (ngx_str_t *) ((u_char *) ctx->field_ctx
                         + ctx->field_descriptor->offset);

    if (ctx->current_field_pos != 0) {
        str->data = ctx->current_field;
        return;
    }

    ctx->current_field_len = str->len;
}

/*
 * Handler installed on the special NULL-bitmap field descriptor that is
 * placed in front of the column descriptors of a row packet.  It does
 * nothing.
 */
static void
ngx_mysql_null_bitmap_handler(ngx_http_mysql_ctx_t *ctx)
{
    (void) ctx;
}

/*
 * Allocate the main configuration of the module and initialise its two
 * registries.
 *
 * Both arrays hold POINTERS: connectors are pushed as
 * ngx_http_mysql_connector_t * by ngx_http_mysql_connector_block(), and
 * ngx_http_mysql_encoder_command() walks encoders.elts as
 * ngx_http_mysql_encoder_t **.  The element size of the encoders array
 * therefore has to be the size of a pointer; with the size of the structure
 * the second and later elements would be read at the wrong stride.
 *
 * Returns the configuration, or NULL on allocation failure.
 */
static void *
ngx_http_mysql_create_main_conf(ngx_conf_t *cf)
{
    ngx_http_mysql_main_conf_t  *mmcf;

    mmcf = ngx_pcalloc(cf->pool, sizeof(ngx_http_mysql_main_conf_t));
    if (mmcf == NULL) {
        return NULL;
    }

    if (ngx_array_init(&mmcf->connectors, cf->pool, 5,
                       sizeof(ngx_http_mysql_connector_t *))
        != NGX_OK)
    {
        return NULL;
    }

    if (ngx_array_init(&mmcf->encoders, cf->pool, 2,
                       sizeof(ngx_http_mysql_encoder_t *))
        != NGX_OK)
    {
        return NULL;
    }

    return mmcf;
}

/*
 * Allocate a location configuration.
 *
 * Mergeable settings are marked as unset so that
 * ngx_http_mysql_merge_loc_conf() can inherit or default them; the
 * remaining upstream settings are fixed by the module.  Members not listed
 * here stay zero because the structure comes from ngx_pcalloc().
 *
 * Returns the configuration, or NULL on allocation failure.  NULL rather
 * than NGX_CONF_ERROR is returned because the result of a create-conf
 * callback is a configuration pointer and nginx checks it against NULL.
 */
static void *
ngx_http_mysql_create_loc_conf(ngx_conf_t *cf)
{
    ngx_http_mysql_loc_conf_t  *conf;

    conf = ngx_pcalloc(cf->pool, sizeof(ngx_http_mysql_loc_conf_t));

    if (conf == NULL) {
        return NULL;
    }

    conf->upstream.connect_timeout = NGX_CONF_UNSET_MSEC;
    conf->upstream.send_timeout = NGX_CONF_UNSET_MSEC;
    conf->upstream.read_timeout = NGX_CONF_UNSET_MSEC;

    conf->upstream.buffer_size = NGX_CONF_UNSET_SIZE;

    conf->upstream.busy_buffers_size_conf = NGX_CONF_UNSET_SIZE;

    /*
     * the hardcoded values; send_lowat is fixed at 0 and never merged, so
     * it is not marked as unset first
     */
    conf->upstream.cyclic_temp_file = 0;
    conf->upstream.buffering = 0;
    conf->upstream.ignore_client_abort = 0;
    conf->upstream.send_lowat = 0;
    conf->upstream.max_temp_file_size = 0;
    conf->upstream.temp_file_write_size = 0;
    conf->upstream.intercept_errors = 1;
    conf->upstream.intercept_404 = 1;
    conf->upstream.pass_request_headers = 0;
    conf->upstream.pass_request_body = 0;

    return conf;
}

/*
 * Merge a location configuration with its parent.
 *
 * Timeouts default to 60000 ms, the buffer size to one page and the buffer
 * set to 8 buffers of one page.  Fewer than two buffers, or a busy-buffers
 * size smaller than the larger of the buffer size and one buffer of the
 * set, is a configuration error.  The upstream and the encoder are
 * inherited from the parent when the location has none.
 */
static char *
ngx_http_mysql_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
{
    ngx_http_mysql_loc_conf_t *prev = parent;
    ngx_http_mysql_loc_conf_t *conf = child;
    size_t                     largest;

    /* timeouts */

    ngx_conf_merge_msec_value(conf->upstream.connect_timeout,
                              prev->upstream.connect_timeout, 60000);

    ngx_conf_merge_msec_value(conf->upstream.send_timeout,
                              prev->upstream.send_timeout, 60000);

    ngx_conf_merge_msec_value(conf->upstream.read_timeout,
                              prev->upstream.read_timeout, 60000);

    /* buffers */

    ngx_conf_merge_size_value(conf->upstream.buffer_size,
                              prev->upstream.buffer_size,
                              (size_t) ngx_pagesize);

    ngx_conf_merge_bufs_value(conf->upstream.bufs, prev->upstream.bufs,
                              8, ngx_pagesize);

    if (conf->upstream.bufs.num < 2) {
        ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                           "there must be at least 2 \"mysql_buffers\"");
        return NGX_CONF_ERROR;
    }

    /* the larger of the single buffer and one buffer of the set */
    largest = (conf->upstream.buffer_size < conf->upstream.bufs.size)
              ? conf->upstream.bufs.size
              : conf->upstream.buffer_size;

    ngx_conf_merge_size_value(conf->upstream.busy_buffers_size_conf,
                              prev->upstream.busy_buffers_size_conf,
                              NGX_CONF_UNSET_SIZE);

    conf->upstream.busy_buffers_size =
        (conf->upstream.busy_buffers_size_conf == NGX_CONF_UNSET_SIZE)
        ? 2 * largest
        : conf->upstream.busy_buffers_size_conf;

    if (conf->upstream.busy_buffers_size < largest) {
        ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
             "\"mysql_busy_buffers_size\" must be equal or bigger than "
             "maximum of the value of \"mysql_buffer_size\" and "
             "one of the \"mysql_buffers\"");

        return NGX_CONF_ERROR;
    }

    /* fail-over policy */

    ngx_conf_merge_bitmask_value(conf->upstream.next_upstream,
                              prev->upstream.next_upstream,
                              (NGX_CONF_BITMASK_SET
                               |NGX_HTTP_UPSTREAM_FT_ERROR
                               |NGX_HTTP_UPSTREAM_FT_TIMEOUT));

    if (conf->upstream.next_upstream & NGX_HTTP_UPSTREAM_FT_OFF) {
        conf->upstream.next_upstream = NGX_CONF_BITMASK_SET
                                       |NGX_HTTP_UPSTREAM_FT_OFF;
    }

    /* plain inheritance */

    if (conf->upstream.upstream == NULL) {
        conf->upstream.upstream = prev->upstream.upstream;
    }

    if (conf->encoder == NULL) {
        conf->encoder = prev->encoder;
    }

    return NGX_CONF_OK;
}

/*
 * Directive handler taking a connector name and a query.
 *
 * value[1] is the connector name and may contain variables, in which case
 * it is compiled into connector_lengths / connector_values.  value[2] is
 * the query text.  The content handler of the location is set to
 * ngx_http_mysql_handler.
 *
 * The query and the content handler are set whether or not the connector
 * name contains variables; returning right after compiling the name would
 * leave a location with a variable connector without query and handler.
 *
 * Returns NGX_CONF_OK, or NGX_CONF_ERROR if script compilation fails.
 */
static char *
ngx_http_mysql_query_command(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    ngx_http_mysql_loc_conf_t *mycf = conf;
    ngx_http_core_loc_conf_t  *clcf;
    ngx_str_t                 *value;
    ngx_uint_t                 n;
    ngx_http_script_compile_t  sc;

    value = cf->args->elts;

    mycf->connector_source = value[1];

    n = ngx_http_script_variables_count(&value[1]);

    if (n) {
        ngx_memzero(&sc, sizeof(ngx_http_script_compile_t));

        sc.cf = cf;
        sc.source = &value[1];
        sc.lengths = &mycf->connector_lengths;
        sc.values = &mycf->connector_values;
        sc.variables = n;
        sc.complete_lengths = 1;
        sc.complete_values = 1;

        if (ngx_http_script_compile(&sc) != NGX_OK) {
            return NGX_CONF_ERROR;
        }
    }

    mycf->query = value[2];

    clcf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);
    clcf->handler = ngx_http_mysql_handler;

    return NGX_CONF_OK;
}

/*
 * Directive handler that records one parameter template per argument.
 *
 * Each argument is appended to mycf->param_templates; arguments containing
 * variables are additionally compiled into the template's lengths / values
 * scripts.
 *
 * Notes on behaviour:
 *   - iteration starts at value[1]; value[0] is the directive name and is
 *     not a parameter;
 *   - lengths and values are reset to NULL for every template, because
 *     ngx_array_push() returns uninitialised memory and the script compiler
 *     only creates the arrays when the pointers are NULL;
 *   - all arguments are processed; a parameter with variables does not end
 *     the loop.
 *
 * Returns NGX_CONF_OK, or NGX_CONF_ERROR on allocation or compilation
 * failure.
 */
static char *
ngx_http_mysql_params_command(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    ngx_http_mysql_loc_conf_t           *mycf = conf;
    ngx_http_mysql_param_template_t     *t;
    ngx_str_t                           *value;
    ngx_uint_t                           i, n;
    ngx_http_script_compile_t            sc;

    if (mycf->param_templates == NULL) {
        mycf->param_templates = ngx_array_create(cf->pool, cf->args->nelts,
            sizeof(ngx_http_mysql_param_template_t));

        if (mycf->param_templates == NULL) {
            return NGX_CONF_ERROR;
        }
    }

    value = cf->args->elts;

    for (i = 1; i < cf->args->nelts; i++) {
        t = ngx_array_push(mycf->param_templates);

        if (t == NULL) {
            return NGX_CONF_ERROR;
        }

        t->source = value[i];
        t->lengths = NULL;
        t->values = NULL;

        n = ngx_http_script_variables_count(&t->source);

        if (n == 0) {
            /* a literal parameter needs no script */
            continue;
        }

        ngx_memzero(&sc, sizeof(ngx_http_script_compile_t));

        sc.cf = cf;
        sc.source = &value[i];
        sc.lengths = &t->lengths;
        sc.values = &t->values;
        sc.variables = n;
        sc.complete_lengths = 1;
        sc.complete_values = 1;

        if (ngx_http_script_compile(&sc) != NGX_OK) {
            return NGX_CONF_ERROR;
        }
    }

    return NGX_CONF_OK;
}

/*
 * Handler of the mysql_connector block directive.
 *
 * Allocates a connector named after value[1], parses the block body with
 * ngx_http_mysql_connector() as the per-directive handler, checks that
 * host, database and username were given, and registers the connector in
 * the main configuration.
 *
 * Returns NGX_CONF_OK, the error returned by ngx_conf_parse(), or
 * NGX_CONF_ERROR on allocation failure or an incomplete block.
 */
static char *
ngx_http_mysql_connector_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    ngx_http_mysql_main_conf_t      *mmcf = conf;
    char                            *rv;
    ngx_str_t                       *value;
    ngx_http_mysql_connector_t      *c, **pc;
    ngx_conf_t                       save;

    value = cf->args->elts;

    c = ngx_pcalloc(cf->pool, sizeof(ngx_http_mysql_connector_t));

    if (c == NULL) {
        return NGX_CONF_ERROR;
    }

    c->name = value[1];

    /* parse the block body with a temporary handler, then restore cf */
    save = *cf;
    cf->ctx = c;
    cf->handler = ngx_http_mysql_connector;
    cf->handler_conf = conf;

    rv = ngx_conf_parse(cf, NULL);

    *cf = save;

    if (rv != NGX_CONF_OK) {
        return rv;
    }

    if (c->host.len == 0 || c->database.len == 0 || c->username.len == 0) {
        ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                           "at least hostname, database and user name "
                           "must be specified "
                           "inside mysql_connector block");
        return NGX_CONF_ERROR;
    }

    pc = ngx_array_push(&mmcf->connectors);

    if (pc == NULL) {
        return NGX_CONF_ERROR;
    }

    *pc = c;

    return NGX_CONF_OK;
}

/*
 * Per-directive handler used while parsing the body of a mysql_connector
 * block.  cf->ctx is the connector being filled in.
 *
 * Recognised directives, each taking its value as value[1]:
 * host, database, username, password.
 *
 * At least two tokens (name and value) are required, because value[1] is
 * read for every recognised directive.
 *
 * Returns NGX_CONF_OK, or NGX_CONF_ERROR for an empty, incomplete or
 * unknown directive.
 */
static char *
ngx_http_mysql_connector(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    ngx_str_t                       *value;
    ngx_http_mysql_connector_t      *c;

    c = cf->ctx;

    value = cf->args->elts;

    if (value[0].len == 0) {
        ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                           "invalid directive inside mysql_connector block");
        return NGX_CONF_ERROR;
    }

    if (cf->args->nelts < 2) {
        ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                           "too few arguments for a directive inside "
                           "mysql_connector block");
        return NGX_CONF_ERROR;
    }

    if (ngx_strcmp(value[0].data, "host") == 0) {
        c->host = value[1];
    }
    else if (ngx_strcmp(value[0].data, "database") == 0) {
        c->database = value[1];
    }
    else if (ngx_strcmp(value[0].data, "username") == 0) {
        c->username = value[1];
    }
    else if (ngx_strcmp(value[0].data, "password") == 0) {
        c->password = value[1];
    }
    else {
        ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                           "unknown directive inside mysql_connector block");
        return NGX_CONF_ERROR;
    }

    return NGX_CONF_OK;
}

/*
 * Directive handler that selects the encoder of a location by name.
 *
 * value[1] is looked up, case-insensitively, among the encoders registered
 * in the main configuration (an array of ngx_http_mysql_encoder_t *).
 * The lengths must be equal as well: comparing only value[1].len bytes
 * would accept any encoder whose name merely starts with the argument.
 *
 * Returns NGX_CONF_OK when an encoder was found, NGX_CONF_ERROR otherwise.
 */
static char *
ngx_http_mysql_encoder_command(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    ngx_uint_t                           i;
    ngx_http_mysql_loc_conf_t           *mycf = conf;
    ngx_http_mysql_main_conf_t          *mmcf;
    ngx_str_t                           *value;
    ngx_http_mysql_encoder_t           **e;

    mmcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_mysql_module);

    e = mmcf->encoders.elts;

    value = cf->args->elts;

    for (i = 0; i < mmcf->encoders.nelts; i++) {
        if (value[1].len == e[i]->name.len
            && ngx_strncasecmp(value[1].data, e[i]->name.data, value[1].len)
               == 0)
        {
            mycf->encoder = e[i];
            return NGX_CONF_OK;
        }
    }

    ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
        "encoder %V not found", &value[1]);

    return NGX_CONF_ERROR;
}

/*
 * Copyright (C) 2010 Valery Kholodkov
 */


#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>

#ifndef _NGX_HTTP_MYSQL_H_INCLUDED_
#define _NGX_HTTP_MYSQL_H_INCLUDED_

#if (NGX_HAVE_OPENSSL_SHA1_H)
#include <openssl/sha.h>
#else
#include <sha.h>
#endif

/* sizes, in bytes, used while parsing packets and authenticating */
#define PACKET_HEADER_LEN               4
#define SCRAMBLE_LEN                    20
#define SCRAMBLE_323_LEN                8
#define MIN_PACKET_LEN                  8

/* marker values of the packet kinds named below */
#define OK_PACKET                       0x00
#define EOF_PACKET                      0xfe
#define ERROR_PACKET                    0xff

/* client capability flags */
#define CLIENT_LONG_PASSWORD            1   /* new more secure passwords */
#define CLIENT_FOUND_ROWS               2   /* Found instead of affected rows */
#define CLIENT_LONG_FLAG                4   /* Get all column flags */
#define CLIENT_CONNECT_WITH_DB          8   /* One can specify db on connect */
#define CLIENT_NO_SCHEMA                16  /* Don't allow database.table.column */
#define CLIENT_COMPRESS                 32  /* Can use compression protocol */
#define CLIENT_ODBC                     64  /* Odbc client */
#define CLIENT_LOCAL_FILES              128 /* Can use LOAD DATA LOCAL */
#define CLIENT_IGNORE_SPACE             256 /* Ignore spaces before '(' */
#define CLIENT_PROTOCOL_41              512 /* New 4.1 protocol */
#define CLIENT_INTERACTIVE              1024    /* This is an interactive client */
#define CLIENT_SSL                      2048    /* Switch to SSL after handshake */
#define CLIENT_IGNORE_SIGPIPE           4096    /* IGNORE sigpipes */
#define CLIENT_TRANSACTIONS             8192    /* Client knows about transactions */
#define CLIENT_RESERVED                 16384   /* Old flag for 4.1 protocol  */
#define CLIENT_SECURE_CONNECTION        32768   /* New 4.1 authentication */
#define CLIENT_MULTI_STATEMENTS         (1UL << 16) /* Enable/disable multi-stmt support */
#define CLIENT_MULTI_RESULTS            (1UL << 17) /* Enable/disable multi-results */

/*
 * The capability set of this client.  The definition spans several lines,
 * so every line but the last ends in a continuation backslash.
 */
#define CLIENT_CAPABILITIES                                                   \
    (CLIENT_LONG_PASSWORD | CLIENT_LONG_FLAG | CLIENT_TRANSACTIONS            \
     | CLIENT_PROTOCOL_41 | CLIENT_MULTI_STATEMENTS | CLIENT_MULTI_RESULTS)

/*
 * Helpers for the fixed-length integers of the MySQL protocol, which are
 * little-endian on the wire.
 *
 * They work byte by byte, so the result does not depend on the byte order
 * or alignment rules of the host and a single definition serves every
 * platform.  Each byte is masked to an unsigned value before shifting,
 * which keeps "<< 24" out of signed-overflow territory and makes the macros
 * safe for plain char buffers too.
 *
 * The argument is evaluated more than once: do not pass expressions with
 * side effects.
 */
#define EXTRACT_INT2(x)                                                       \
    ((uint16_t) (((x)[0] & 0xffu) | ((x)[1] & 0xffu) << 8))

#define EXTRACT_INT3(x)                                                       \
    ((uint32_t) (((x)[0] & 0xffu) | ((x)[1] & 0xffu) << 8                     \
                 | ((x)[2] & 0xffu) << 16))

#define EXTRACT_INT4(x)                                                       \
    ((uint32_t) (((x)[0] & 0xffu) | ((x)[1] & 0xffu) << 8                     \
                 | ((x)[2] & 0xffu) << 16 | ((x)[3] & 0xffu) << 24))

/* the upper half is widened to 64 bits before it is shifted */
#define EXTRACT_INT8(x)                                                       \
    ((uint64_t) EXTRACT_INT4(x) | (uint64_t) EXTRACT_INT4((x) + 4) << 32)

/*
 * STORE_INTn(x, v) writes the n low-order bytes of v at x, least
 * significant byte first, and advances x by n.
 *
 * NOTE(review): the definitions keep their historical trailing semicolon so
 * that existing call sites written without one still compile.  Because of
 * it the macros cannot be the unbraced body of an if that has an else.
 */
#define STORE_INT2(x,v) do {                                                  \
        (x)[0] = (unsigned char) ((v) & 0xff);                                \
        (x)[1] = (unsigned char) (((v) >> 8) & 0xff);                         \
        (x) += 2;                                                             \
    } while(0);

#define STORE_INT3(x,v) do {                                                  \
        (x)[0] = (unsigned char) ((v) & 0xff);                                \
        (x)[1] = (unsigned char) (((v) >> 8) & 0xff);                         \
        (x)[2] = (unsigned char) (((v) >> 16) & 0xff);                        \
        (x) += 3;                                                             \
    } while(0);

#define STORE_INT4(x,v) do {                                                  \
        (x)[0] = (unsigned char) ((v) & 0xff);                                \
        (x)[1] = (unsigned char) (((v) >> 8) & 0xff);                         \
        (x)[2] = (unsigned char) (((v) >> 16) & 0xff);                        \
        (x)[3] = (unsigned char) (((v) >> 24) & 0xff);                        \
        (x) += 4;                                                             \
    } while(0);

/*
 * Bit flags for ngx_mysql_field_t.flags.
 *
 * NGX_MY_FIELD_RELEVANT is the one tested by the field handlers: a decoded
 * value is stored only when the flag is set on the field descriptor.
 * The other flags are not referenced in this header; going by their names
 * they presumably mark length-coded binary (LCB), length-coded string (LCS),
 * zero-terminated (ZEROT) and until-end-of-packet (EOP) fields - TODO
 * confirm against the packet parser.
 */
#define NGX_MY_FIELD_LCB        0x00000001
#define NGX_MY_FIELD_LCS        0x00000002
#define NGX_MY_FIELD_ZEROT      0x00000004
#define NGX_MY_FIELD_EOP        0x00000008
#define NGX_MY_FIELD_RELEVANT   0x00000010

/*
 * Command codes of the prepared-statement commands; the names and values
 * match COM_STMT_PREPARE / COM_STMT_EXECUTE of the MySQL protocol.
 */
typedef enum {
    COM_STMT_PREPARE = 0x16,
    COM_STMT_EXECUTE = 0x17,
} ngx_http_mysql_command_t;

/*
 * Stages of a session with the server, in declaration order: authenticate,
 * prepare, execute.  Not referenced elsewhere in this header.
 */
typedef enum {
    mysql_stage_authenticate = 0,
    mysql_stage_prepare,
    mysql_stage_execute,
} ngx_mysql_stage_e;

/*
 * Reception states; the type of ngx_http_mysql_ctx_t.state.
 * NOTE(review): the transitions between the states are implemented in the
 * .c file and are not visible from this header.
 */
typedef enum {
    mysql_state_packet_header,
    mysql_state_field_count,
    mysql_state_handshake,
    mysql_state_ok,
    mysql_state_error,
    mysql_state_result,
    mysql_state_ignore
} ngx_mysql_reception_state_e;

/* forward declaration; the structure is defined at the end of this header */
struct ngx_http_mysql_ctx_s;

/* callback invoked for a single field; it works on the request context */
typedef void (*ngx_mysql_field_handler_p)(struct ngx_http_mysql_ctx_s *ctx);
/* callback invoked for a packet; it receives the request */
typedef ngx_int_t (*ngx_mysql_packet_handler_p)(ngx_http_request_t *r);
/* a row handler has the same signature as a packet handler */
typedef ngx_mysql_packet_handler_p ngx_mysql_row_handler_p;

/*
 * Description of one column type as provided by an encoder (see
 * ngx_http_mysql_encoder_t.types): a size and the field handler to use.
 * Both members are copied into the field descriptor of a row packet.
 */
typedef struct {
    size_t                      size;
    ngx_mysql_field_handler_p   handler;
} ngx_mysql_type_t;

/*
 * Descriptor of one field of a packet.
 */
typedef struct {
    /* size of the field; fixed-length string handlers use it as byte count */
    size_t                      size;
    /* byte offset, relative to the context's field_ctx, of the destination */
    off_t                       offset;
    /* NGX_MY_FIELD_* bits */
    ngx_uint_t                  flags;
    /* handler that sizes and decodes the field */
    ngx_mysql_field_handler_p   handler;

    /* NOTE(review): byte and mask are not used in the visible code; */
    /* presumably a position inside the NULL bitmap - TODO confirm   */
    ngx_uint_t                  byte;
    ngx_uint_t                  mask;
} ngx_mysql_field_t;

/*
 * Descriptor of a packet: its field descriptors and the handler associated
 * with the packet.
 */
typedef struct {
    ngx_mysql_field_t          *fields;
    ngx_mysql_packet_handler_p  handler;
} ngx_mysql_packet_t;

/* values taken from the server handshake packet */
typedef struct {
    u_char                      protocol_version;
    ngx_uint_t                  server_capabilities;
    ngx_uint_t                  thread_id;
    u_char                      server_lang;
    ngx_uint_t                  server_status;
} ngx_mysql_handshake_data_t;

/* values taken from an OK packet */
typedef struct {
    ngx_uint_t                  affected_rows;
    ngx_uint_t                  insert_id;
    ngx_uint_t                  server_status;
    ngx_uint_t                  warning_count;
} ngx_mysql_ok_data_t;

/* values describing a prepared statement: its id, column and parameter counts */
typedef struct {
    ngx_uint_t                  statement_id;
    ngx_uint_t                  column_count;
    ngx_uint_t                  param_count;
} ngx_mysql_prepare_data_t;

/* values describing one statement parameter */
typedef struct {
    ngx_uint_t                  type;
    ngx_uint_t                  flags;
    u_char                      decimals;
    ngx_uint_t                  length;
} ngx_mysql_param_data_t;

/*
 * Values taken from an error packet.  message is a fixed 128-byte buffer
 * and is logged with "%s" by ngx_http_mysql_error().
 * NOTE(review): nothing in this header guarantees that message is
 * NUL-terminated - confirm against the code that fills it.
 */
typedef struct {
    ngx_uint_t                  _errno;
    u_char                      marker;
    u_char                      sql_state[5];
    u_char                      message[128];
} ngx_mysql_error_data_t;

/*
 * Definition of one result-set column.  The type member is used as an index
 * into the encoder's types table when a row packet descriptor is built.
 */
typedef struct {
    ngx_str_t                   catalog;
    ngx_str_t                   db;
    ngx_str_t                   table;
    ngx_str_t                   org_table;
    ngx_str_t                   name;
    ngx_str_t                   org_name;
    ngx_uint_t                  charset_number;
    ngx_uint_t                  length;
    ngx_uint_t                  type;
    ngx_uint_t                  flags;
    u_char                      decimals;
    ngx_str_t                   _default;

    /* NOTE(review): not used in the visible code - TODO document */
    ngx_str_t                   fixed_length_part;
} ngx_mysql_column_t;

/*
 * One statement parameter as written in the configuration: the source text
 * and, when the text contains variables, the compiled scripts for its
 * length and value.
 */
typedef struct {
    ngx_str_t                   source;
    ngx_array_t                *lengths;
    ngx_array_t                *values;
} ngx_http_mysql_param_template_t;

/*
 * A named connector declared by a mysql_connector block.  host, database
 * and username are mandatory; password may be left empty.
 */
typedef struct {
    ngx_str_t                   name;
    ngx_str_t                   host;
    ngx_str_t                   database;
    ngx_str_t                   username;
    ngx_str_t                   password;
} ngx_http_mysql_connector_t;

/*
 * A result-set encoder, selected per location by name.
 */
typedef struct {
    /* name matched, case-insensitively, by the encoder directive */
    ngx_str_t                   name;
    /* table of column types, indexed by the column's type value */
    ngx_mysql_type_t           *types;
    ngx_buf_tag_t               tag;
    /* installed as the packet handler of row packets */
    ngx_mysql_row_handler_p     row_handler;
} ngx_http_mysql_encoder_t;

/*
 * Main configuration: the registries of connectors and encoders.
 * connectors holds ngx_http_mysql_connector_t * elements; encoders is read
 * as an array of ngx_http_mysql_encoder_t * by the encoder directive.
 */
typedef struct {
    ngx_array_t                 connectors;
    ngx_array_t                 encoders;
} ngx_http_mysql_main_conf_t;

/*
 * Location configuration.
 */
typedef struct {
    /* settings of the upstream connection and its buffers */
    ngx_http_upstream_conf_t    upstream;

    /* query text and the templates of its parameters */
    ngx_str_t                   query;
    ngx_array_t                *param_templates;

    /* connector name; the scripts are set when it contains variables */
    ngx_str_t                   connector_source;
    ngx_array_t                *connector_lengths;
    ngx_array_t                *connector_values;

    /* encoder of the location, inherited from the parent when unset */
    ngx_http_mysql_encoder_t   *encoder;
} ngx_http_mysql_loc_conf_t;

/*
 * Per-request context of the module.
 *
 * NOTE(review): the field handlers in the .c file read ctx->in_buf, which
 * is not a member of this structure - the .c file presumably carries its
 * own definition; confirm which one is authoritative.
 */
typedef struct ngx_http_mysql_ctx_s {
    ngx_http_mysql_connector_t *connector;   
    ngx_http_mysql_encoder_t   *encoder;

    ngx_mysql_reception_state_e state;

    /* number, read position and length of the packet being received */
    ngx_uint_t                  packet_number;
    size_t                      packet_pos;
    size_t                      packet_len;

    ngx_mysql_handshake_data_t  handshake_data;

    ngx_mysql_prepare_data_t    prepare_data;

    /* anonymous union: the members share storage, one is valid at a time */
    union {
        ngx_mysql_param_data_t      param_data;
        ngx_mysql_ok_data_t         ok_data;
        ngx_mysql_error_data_t      error_data;
        ngx_mysql_column_t         *current_column;
    };

    u_char                      scramble[SCRAMBLE_LEN];
    ngx_uint_t                  client_flags;

    ngx_uint_t                  non_null_param_count;

    /*
     * Field being received.  A field handler called with
     * current_field_pos == 0 sets current_field_len; on a later call it
     * reads the field bytes through current_field.
     */
    off_t                       current_field_len;
    off_t                       current_field_pos;
    u_char                      accumulator[100];

    u_char                     *current_field;
    u_char                     *current_field_ptr;

    /* descriptors of the field and packet being received */
    ngx_mysql_field_t          *field_descriptor;
    ngx_mysql_packet_t         *packet_descriptor;

    /* base address to which a field descriptor's offset is added */
    void                       *field_ctx;

    /* NULL bitmap of a row and the array of column definitions */
    u_char                     *null_bitmap;
    ngx_array_t                 columns;

    ngx_chain_t                *in, *out, *free, *busy;

    /* one-bit state flags */
    unsigned int                handshake_is_done:1;
    unsigned int                zerot:1;
    unsigned int                ignore_until_eof:1;
    unsigned int                eof:1;
    unsigned int                end_of_packet:1;
    unsigned int                done:1;
} ngx_http_mysql_ctx_t;

extern ngx_module_t ngx_http_mysql_module;

#endif //_NGX_HTTP_MYSQL_H_INCLUDED_


_______________________________________________
nginx-ru mailing list
nginx-ru@xxxxxxxxx
http://nginx.org/mailman/listinfo/nginx-ru


 




Copyright © Lexa Software, 1996-2009.