On 09/24/2015 08:54 AM, Ahmad Alzaeem wrote:

> If I run it with no SMP and 10000 listening ports, it works OK with no problem
>
> If I run squid with 10000 listening ports with 2 workers -> kid
> registration timeout

> 2015/09/24 14:51:25 kid2| Closing HTTP port [::]:29995
> 2015/09/24 14:51:25 kid2| Closing HTTP port [::]:29996
> 2015/09/24 14:51:25 kid2| Closing HTTP port [::]:29997
> 2015/09/24 14:51:25 kid2| Closing HTTP port [::]:29998
> 2015/09/24 14:51:25 kid2| Closing HTTP port [::]:29999
> 2015/09/24 14:51:25 kid2| Closing HTTP port [::]:30000
...
> FATAL: kid2 registration timed out

> do we need to increase timeout ?? since it takes a long time to load
> the ips.

The existing SMP http_port sharing algorithm needs lots of UDS buffer
space to share lots of ports. You may be able to get your configuration
working by allocating lots of UDS buffer space (sysctl
net.local.dgram.recvspace and such), but it may turn out to be
impossible for 10K ports. If there is not enough UDS buffer space,
increasing the timeout will not help.

The attached patch for Squid v3.3.11 changes the port sharing algorithm
to minimize memory usage (at the expense of registration time). Please
see the patch preamble for technical details.

The patch worked with 3K ports (24 workers * 128 http_ports each); the
registration lasted less than 5 seconds. I do not recall whether we have
tested the patch with 10K ports -- you may need to increase the
hard-coded kid registration timeout to handle 10K ports with a patched
Squid.

Sorry, I do not have a patch for other Squid versions at this time.


HTH,

Alex.
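To make the buffer-space argument concrete, here is a rough back-of-the-envelope sketch. It uses the "about 4KB" response size and the 65536-byte net.local.dgram.recvspace default quoted in the patch preamble. Treating the receive buffer as a plain byte budget is an assumption (real kernels add per-datagram overhead), and the function names are made up for illustration; they are not Squid code.

```cpp
// Rough estimate only; not Squid code.
// Assumption: the UDS receive buffer behaves like a simple byte budget.
// The 4 KB and 65536 figures are taken from the patch preamble.
constexpr long responseBytes = 4 * 1024;   // approximate size of one response
constexpr long defaultRecvSpace = 65536;   // default net.local.dgram.recvspace

// How many unread responses fit into a buffer of the given size.
constexpr long responsesThatFit(long bufferBytes)
{
    return bufferBytes / responseBytes;
}

// Buffer bytes needed if the responses for all ports are queued at once.
constexpr long bytesForPorts(long ports)
{
    return ports * responseBytes;
}

// Compile-time checks of the arithmetic.
static_assert(responsesThatFit(defaultRecvSpace) == 16, "16 responses fit by default");
static_assert(bytesForPorts(128) == 524288, "128 ports need about 512 KB");
static_assert(bytesForPorts(10000) == 40960000, "10K ports need about 40 MB");
```

Under these assumptions, the default buffer holds only about 16 unread responses, while 10K ports requested nearly at once would want roughly 40 MB, which is why a longer timeout alone does not help.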
In SMP mode, limit concurrent worker registrations of shared
http[s]_ports to support many listening ports shared by many workers
with small UDS buffers.

Initial implementation allowed each worker to send all "give me a shared
listening port" UDS requests to Coordinator nearly at once. Each
successful response (carrying a listening FD) is about 4KB in size. With
many workers and/or many http[s]_ports, it was easy to run out of UDS
buffer space (especially on FreeBSD), leading to worker registration
timeouts and other related problems.

This implementation limits each worker to one listening request at a
time. This effectively limits concurrent responses to only a "few" (up
to the number of workers) at a time. The latter allows SMP Squid to
support at least 24 workers and 128 http_ports with default UDS receive
buffer space (net.local.dgram.recvspace=65536). The total startup delay
in that rather "high-end" configuration is still under 5 seconds.

=== modified file 'src/ipc/SharedListen.cc'
--- src/ipc/SharedListen.cc	2012-12-02 07:23:32 +0000
+++ src/ipc/SharedListen.cc	2013-12-16 21:35:07 +0000
@@ -1,51 +1,56 @@
 /*
  * DEBUG: section 54    Interprocess Communication
  */

 #include "squid.h"
 #include "comm.h"
 #include "base/TextException.h"
 #include "comm/Connection.h"
 #include "globals.h"
 #include "ipc/Port.h"
 #include "ipc/Messages.h"
 #include "ipc/Kids.h"
 #include "ipc/TypedMsgHdr.h"
 #include "ipc/StartListening.h"
 #include "ipc/SharedListen.h"
 #include "tools.h"

+#include <list>
 #include <map>

 /// holds information necessary to handle JoinListen response
 class PendingOpenRequest
 {
 public:
     Ipc::OpenListenerParams params; ///< actual comm_open_sharedListen() parameters
     AsyncCall::Pointer callback; // who to notify
 };

 /// maps ID assigned at request time to the response callback
 typedef std::map<int, PendingOpenRequest> SharedListenRequestMap;
 static SharedListenRequestMap TheSharedListenRequestMap;

+/// accumulates delayed requests until they are ready to be sent, in FIFO order
+typedef std::list<PendingOpenRequest> DelayedSharedListenRequests;
+static DelayedSharedListenRequests TheDelayedRequests;
+
 static int
 AddToMap(const PendingOpenRequest &por)
 {
     // find unused ID using linear seach; there should not be many entries
     for (int id = 0; true; ++id) {
         if (TheSharedListenRequestMap.find(id) == TheSharedListenRequestMap.end()) {
             TheSharedListenRequestMap[id] = por;
             return id;
         }
     }
     assert(false); // not reached
     return -1;
 }

 Ipc::OpenListenerParams::OpenListenerParams()
 {
     memset(this, 0, sizeof(*this));
 }

 bool
@@ -83,73 +88,103 @@ Ipc::SharedListenResponse::SharedListenR
         fd(aFd), errNo(anErrNo), mapId(aMapId)
 {
 }

 Ipc::SharedListenResponse::SharedListenResponse(const TypedMsgHdr &hdrMsg):
         fd(-1), errNo(0), mapId(-1)
 {
     hdrMsg.checkType(mtSharedListenResponse);
     hdrMsg.getPod(*this);
     fd = hdrMsg.getFd();
     // other conn details are passed in OpenListenerParams and filled out by SharedListenJoin()
 }

 void Ipc::SharedListenResponse::pack(TypedMsgHdr &hdrMsg) const
 {
     hdrMsg.setType(mtSharedListenResponse);
     hdrMsg.putPod(*this);
     hdrMsg.putFd(fd);
 }

-void Ipc::JoinSharedListen(const OpenListenerParams &params,
-                           AsyncCall::Pointer &callback)
-{
-    PendingOpenRequest por;
-    por.params = params;
-    por.callback = callback;

-    SharedListenRequest request;
+static void
+SendSharedListenRequest(const PendingOpenRequest &por)
+{
+    Ipc::SharedListenRequest request;
     request.requestorId = KidIdentifier;
     request.params = por.params;
     request.mapId = AddToMap(por);

     debugs(54, 3, HERE << "getting listening FD for " << request.params.addr <<
            " mapId=" << request.mapId);

-    TypedMsgHdr message;
+    Ipc::TypedMsgHdr message;
     request.pack(message);
-    SendMessage(coordinatorAddr, message);
+    SendMessage(Ipc::coordinatorAddr, message);
+}
+
+static void
+kickDelayedRequest() {
+    if (TheDelayedRequests.empty())
+        return; // no pending requests to resume
+
+    debugs(54, 3, "resuming with " << TheSharedListenRequestMap.size() <<
+           " active + " << TheDelayedRequests.size() << " delayed requests");
+
+    SendSharedListenRequest(*TheDelayedRequests.begin());
+    TheDelayedRequests.pop_front();
+}
+
+void Ipc::JoinSharedListen(const OpenListenerParams &params,
+                           AsyncCall::Pointer &callback)
+{
+    PendingOpenRequest por;
+    por.params = params;
+    por.callback = callback;
+
+    const DelayedSharedListenRequests::size_type concurrencyLimit = 1;
+    if (TheSharedListenRequestMap.size() >= concurrencyLimit) {
+        debugs(54, 3, "waiting for " << TheSharedListenRequestMap.size() <<
+               " active + " << TheDelayedRequests.size() << " delayed requests");
+        TheDelayedRequests.push_back(por);
+    } else {
+        SendSharedListenRequest(por);
+    }
 }

 void Ipc::SharedListenJoined(const SharedListenResponse &response)
 {
     // Dont debugs c fully since only FD is filled right now.
-    debugs(54, 3, HERE << "got listening FD " << response.fd << " errNo=" <<
-           response.errNo << " mapId=" << response.mapId);
+    debugs(54, 3, "got listening FD " << response.fd << " errNo=" <<
+           response.errNo << " mapId=" << response.mapId << " with " <<
+           TheSharedListenRequestMap.size() << " active + " <<
+           TheDelayedRequests.size() << " delayed requests");

     Must(TheSharedListenRequestMap.find(response.mapId) != TheSharedListenRequestMap.end());
     PendingOpenRequest por = TheSharedListenRequestMap[response.mapId];
     Must(por.callback != NULL);
     TheSharedListenRequestMap.erase(response.mapId);

     StartListeningCb *cbd = dynamic_cast<StartListeningCb*>(por.callback->getDialer());
     assert(cbd && cbd->conn != NULL);
     Must(cbd && cbd->conn != NULL);
     cbd->conn->fd = response.fd;

     if (Comm::IsConnOpen(cbd->conn)) {
         OpenListenerParams &p = por.params;
         cbd->conn->local = p.addr;
         cbd->conn->flags = p.flags;
         // XXX: leave the comm AI stuff to comm_import_opened()?
         struct addrinfo *AI = NULL;
         p.addr.GetAddrInfo(AI);
         AI->ai_socktype = p.sock_type;
         AI->ai_protocol = p.proto;
         comm_import_opened(cbd->conn, FdNote(p.fdNote), AI);
         p.addr.FreeAddrInfo(AI);
     }

     cbd->errNo = response.errNo;
     cbd->handlerSubscription = por.params.handlerSubscription;
     ScheduleCallHere(por.callback);
+
+    kickDelayedRequest();
 }
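
For readers who want to see the queueing idea without the Squid plumbing, the following is a minimal standalone sketch of the same pattern: at most a fixed number of requests are in flight, the rest wait in a FIFO list, and each response resumes one waiting request. The class ListenRequestLimiter and all of its members are hypothetical names invented for this illustration. It tracks ports as plain strings and does no IPC, so it is a model of the technique, not the patched code.

```cpp
#include <cassert>
#include <cstddef>
#include <list>
#include <set>
#include <string>
#include <vector>

// Hypothetical model of one worker's bookkeeping; not Squid code.
class ListenRequestLimiter
{
public:
    explicit ListenRequestLimiter(std::size_t limit): limit_(limit) {}

    // Sends the request now if under the limit; otherwise queues it (FIFO).
    void join(const std::string &port) {
        if (active_.size() >= limit_)
            delayed_.push_back(port);
        else
            send(port);
    }

    // Handles the response for an active request, then resumes one delayed
    // request, if any (the role kickDelayedRequest() plays in the patch).
    void joined(const std::string &port) {
        assert(active_.count(port) == 1); // response must match a sent request
        active_.erase(port);
        if (!delayed_.empty()) {
            const std::string next = delayed_.front();
            delayed_.pop_front();
            send(next);
        }
    }

    std::size_t activeCount() const { return active_.size(); }
    std::size_t delayedCount() const { return delayed_.size(); }
    const std::vector<std::string> &sentOrder() const { return sent_; }

private:
    // A real worker would message Coordinator here; this only records the order.
    void send(const std::string &port) {
        active_.insert(port);
        sent_.push_back(port);
    }

    std::size_t limit_;               // maximum number of requests in flight
    std::set<std::string> active_;    // sent, awaiting a response
    std::list<std::string> delayed_;  // waiting to be sent, in FIFO order
    std::vector<std::string> sent_;   // order in which requests were sent
};
```

With a limit of 1, joining three ports sends only the first one immediately; each joined() call then releases the next port in the order it was requested. That trades registration time for a bounded number of unread responses per worker, which is the trade-off the preamble describes.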