// Copyright (c) 2006-2013, Andrey N. Sabelnikov, www.sabelnikov.net // All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are met: // * Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // * Redistributions in binary form must reproduce the above copyright // notice, this list of conditions and the following disclaimer in the // documentation and/or other materials provided with the distribution. // * Neither the name of the Andrey N. Sabelnikov nor the // names of its contributors may be used to endorse or promote products // derived from this software without specific prior written permission. // // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE // DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER BE LIABLE FOR ANY // DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // #ifndef _LEVIN_CP_SERVER_H_ #define _LEVIN_CP_SERVER_H_ #include <winsock2.h> #include <rpc.h> #include <string> #include <map> #include <boost/shared_ptr.hpp> #include "misc_log_ex.h" //#include "threads_helper.h" #include "syncobj.h" #define ENABLE_PROFILING #include "profile_tools.h" #include "net_utils_base.h" #include "pragma_comp_defs.h" #define LEVIN_DEFAULT_DATA_BUFF_SIZE 2000 namespace epee { namespace net_utils { template<class TProtocol> class cp_server_impl//: public abstract_handler { public: cp_server_impl(/*abstract_handler* phandler = NULL*/); virtual ~cp_server_impl(); bool init_server(int port_no); bool deinit_server(); bool run_server(int threads_count = 0); bool send_stop_signal(); bool is_stop_signal(); virtual bool on_net_idle(){return true;} size_t get_active_connections_num(); typename TProtocol::config_type& get_config_object(){return m_config;} private: enum overlapped_operation_type { op_type_recv, op_type_send, op_type_stop }; struct io_data_base { OVERLAPPED m_overlapped; WSABUF DataBuf; overlapped_operation_type m_op_type; DWORD TotalBuffBytes; volatile LONG m_is_in_use; char Buffer[1]; }; PRAGMA_WARNING_PUSH PRAGMA_WARNING_DISABLE_VS(4355) template<class TProtocol> struct connection: public net_utils::i_service_endpoint { connection(typename TProtocol::config_type& ref_config):m_sock(INVALID_SOCKET), m_tprotocol_handler(this, ref_config, context), m_psend_data(NULL), m_precv_data(NULL), m_asked_to_shutdown(0), m_connection_shutwoned(0) { } //connection():m_sock(INVALID_SOCKET), m_tprotocol_handler(this, m_dummy_config, context), m_psend_data(NULL), m_precv_data(NULL), m_asked_to_shutdown(0), m_connection_shutwoned(0) //{ //} connection<TProtocol>& operator=(const connection<TProtocol>& obj) { return *this; } bool init_buffers() { m_psend_data = (io_data_base*)new char[sizeof(io_data_base) + LEVIN_DEFAULT_DATA_BUFF_SIZE-1]; m_psend_data->TotalBuffBytes = LEVIN_DEFAULT_DATA_BUFF_SIZE; m_precv_data = (io_data_base*)new char[sizeof(io_data_base) + LEVIN_DEFAULT_DATA_BUFF_SIZE-1]; m_precv_data->TotalBuffBytes = LEVIN_DEFAULT_DATA_BUFF_SIZE; return true; } bool query_shutdown() { if(!::InterlockedCompareExchange(&m_asked_to_shutdown, 1, 0)) { m_psend_data->m_op_type = op_type_stop; ::PostQueuedCompletionStatus(m_completion_port, 0, (ULONG_PTR)this, &m_psend_data->m_overlapped); } return true; } //bool set_config(typename TProtocol::config_type& config) //{ // this->~connection(); // new(this) connection<TProtocol>(config); // return true; //} ~connection() { if(m_psend_data) delete m_psend_data; if(m_precv_data) delete m_precv_data; } virtual bool handle_send(const void* ptr, size_t cb) { PROFILE_FUNC("[handle_send]"); if(m_psend_data->TotalBuffBytes < cb) resize_send_buff((DWORD)cb); ZeroMemory(&m_psend_data->m_overlapped, sizeof(OVERLAPPED)); m_psend_data->DataBuf.len = (u_long)cb;//m_psend_data->TotalBuffBytes; m_psend_data->DataBuf.buf = m_psend_data->Buffer; memcpy(m_psend_data->DataBuf.buf, ptr, cb); m_psend_data->m_op_type = op_type_send; InterlockedExchange(&m_psend_data->m_is_in_use, 1); DWORD bytes_sent = 0; DWORD flags = 0; int res = 0; { PROFILE_FUNC("[handle_send] ::WSASend"); res = ::WSASend(m_sock, &(m_psend_data->DataBuf), 1, &bytes_sent, flags, &(m_psend_data->m_overlapped), NULL); } if(res == SOCKET_ERROR ) { int err = ::WSAGetLastError(); if(WSA_IO_PENDING == err ) return true; } LOG_ERROR("BIG FAIL: WSASend error code not correct, res=" << res << " last_err=" << err); ::InterlockedExchange(&m_psend_data->m_is_in_use, 0); query_shutdown(); //closesocket(m_psend_data); return false; }else if(0 == res) { ::InterlockedExchange(&m_psend_data->m_is_in_use, 0); if(!bytes_sent || bytes_sent != cb) { int err = ::WSAGetLastError(); LOG_ERROR("BIG FAIL: WSASend immediatly complete? but bad results, res=" << res << " last_err=" << err); query_shutdown(); return false; }else { return true; } } return true; } bool resize_send_buff(DWORD new_size) { if(m_psend_data->TotalBuffBytes >= new_size) return true; delete m_psend_data; m_psend_data = (io_data_base*)new char[sizeof(io_data_base) + new_size-1]; m_psend_data->TotalBuffBytes = new_size; LOG_PRINT("Connection buffer resized up to " << new_size, LOG_LEVEL_3); return true; } SOCKET m_sock; net_utils::connection_context_base context; TProtocol m_tprotocol_handler; typename TProtocol::config_type m_dummy_config; io_data_base* m_precv_data; io_data_base* m_psend_data; HANDLE m_completion_port; volatile LONG m_asked_to_shutdown; volatile LONG m_connection_shutwoned; }; PRAGMA_WARNING_POP bool worker_thread_member(); static unsigned CALLBACK worker_thread(void* param); bool add_new_connection(SOCKET new_sock, long ip_from, int port_from); bool shutdown_connection(connection<TProtocol>* pconn); typedef std::map<SOCKET, boost::shared_ptr<connection<TProtocol> > > connections_container; SOCKET m_listen_socket; HANDLE m_completion_port; connections_container m_connections; critical_section m_connections_lock; int m_port; volatile LONG m_stop; //abstract_handler* m_phandler; bool m_initialized; volatile LONG m_worker_thread_counter; typename TProtocol::config_type m_config; }; } } #include "abstract_tcp_server_cp.inl" #endif //_LEVIN_SERVER_H_