// Excerpt of a serve loop that starts listening, then accepts one client at a
// time and processes that client's requests inline before accepting the next.
try {
serverTransport_.listen();
} catch (TTransportException ttx) {
// Listening failed: return without entering the accept loop.
// NOTE(review): no logging is visible in this excerpt — confirm the failure is reported somewhere.
return;
}
...
// Keep accepting clients until stopped_ is set.
while (!stopped_) {
...
try {
client = serverTransport_.accept();
if (client != null) {
// Build the per-connection processor, transports and protocols from the configured factories.
processor = processorFactory_.getProcessor(client);
inputTransport = inputTransportFactory_.getTransport(client);
outputTransport = outputTransportFactory_.getTransport(client);
inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
...
// Requests from this client are handled right here; while this inner loop runs,
// the outer loop does not get back to accept().
while (true) {
if (eventHandler_ != null) {
eventHandler_.processContext(connectionContext, inputTransport, outputTransport);
}
// A false return from process() ends the loop for this client.
if(!processor.process(inputProtocol, outputProtocol)) {
break;
}
}
}
} catch (...) {
...
}
}
// Example: serve `processor` on port 7911 with a TSimpleServer.
final int port = 7911;
TServerSocket serverTransport = new TServerSocket(port);
TSimpleServer.Args serverArgs = new TSimpleServer.Args(serverTransport).processor(processor);
TServer server = new TSimpleServer(serverArgs);
server.serve();
// Excerpt of a serve loop that accepts clients here and hands each one to
// executorService_ wrapped in a WorkerProcess.
try {
serverTransport_.listen();
} catch (TTransportException ttx) {
// Listening failed: return without entering the accept loop.
return;
}
...
// Keep accepting clients until stopped_ is set.
while (!stopped_) {
try {
TTransport client = serverTransport_.accept();
// Wrap the accepted transport in a task for the executor.
WorkerProcess wp = new WorkerProcess(client);
...
// Keep trying to submit the task: the break is only reached when execute()
// returns without throwing (unless the elided handler below exits the loop).
while(true) {
try {
executorService_.execute(wp);
break;
} catch(Throwable t) {
...
}
}
} catch (...) {
...
}
}
// Example: serve `processor` on port 7911 with a TThreadPoolServer.
final int port = 7911;
TServerSocket serverTransport = new TServerSocket(port);
TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport).processor(processor);
TServer server = new TThreadPoolServer(serverArgs);
server.serve();
// Create and start the thread that handles accept, read, and write events.
selectAcceptThread_ = new SelectAcceptThread((TNonblockingServerTransport)serverTransport_);
selectAcceptThread_.start();
// SelectAcceptThread.run()
/**
 * Thread body: until stopped_ is set, alternates between select() and
 * processInterestChanges().
 */
public void run() {
try {
...
while (!stopped_) {
// Wait for and dispatch ready I/O events.
select();
// NOTE(review): presumably applies pending interest-op changes — the method body is not shown here.
processInterestChanges();
}
...
} catch (Throwable t) {
...
} finally {
...
}
}
/**
 * Waits on the selector and dispatches each selected key to the accept, read,
 * or write handler. Iteration stops early if stopped_ becomes true.
 */
private void select() {
try {
// wait for io events.
selector.select();
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (!stopped_ && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
// Remove the key from the selected set through the iterator once it has been taken.
selectedKeys.remove();
// skip if not valid
if (!key.isValid()) {
cleanupSelectionKey(key);
continue;
}
// Only one branch runs per key per pass: accept is checked first, then read, then write.
if (key.isAcceptable()) {
handleAccept();
} else if (key.isReadable()) {
// deal with reads
handleRead(key);
} else if (key.isWritable()) {
// deal with writes
handleWrite(key);
} else {
LOGGER.warn("Unexpected state in select! " + key.interestOps());
}
}
} catch (...) {
...
}
}
// Example: serve `processor` on port 7911 with a TNonblockingServer.
final int port = 7911;
TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(port);
TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverSocket).processor(processor);
TServer server = new TNonblockingServer(serverArgs);
server.serve();
// The basic implementation is the same as TNonblockingServer.
// Once a frame has been fully received, it is processed on a different thread.
/**
 * Handles a readable key: reads into the FrameBuffer attached to the key and,
 * once a complete frame has been read, passes it to requestInvoke. The key is
 * cleaned up when the read fails or when requestInvoke returns false.
 *
 * @param key the selection key whose attachment is the connection's FrameBuffer
 */
protected void handleRead(SelectionKey key) {
    final FrameBuffer frame = (FrameBuffer) key.attachment();

    // Tracks whether the key should stay registered after this read.
    boolean keepKey = frame.read();
    if (keepKey && frame.isFrameFullyRead()) {
        // A whole frame is available: invoke it, and drop the key if that is refused.
        keepKey = requestInvoke(frame);
    }

    if (!keepKey) {
        cleanupSelectionKey(key);
    }
}
...
/**
 * Submits the invocation for the given frame to the invoker.
 *
 * @param frameBuffer the frame whose invocation should be run
 * @return true if the invoker accepted the task, false if it rejected it
 */
@Override
protected boolean requestInvoke(FrameBuffer frameBuffer) {
    // invoker is an ExecutorService instance
    try {
        invoker.execute(getRunnable(frameBuffer));
    } catch (RejectedExecutionException rejected) {
        LOGGER.warn("ExecutorService rejected execution!", rejected);
        return false;
    }
    return true;
}
// Example: serve `processor` on port 7911 with a THsHaServer.
final int port = 7911;
TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(port);
THsHaServer.Args serverArgs = new THsHaServer.Args(serverSocket).processor(processor);
TServer server = new THsHaServer(serverArgs);
server.serve();
// AcceptThread
/** Thread body: calls select() repeatedly until stopped_ is set. */
public void run() {
...
while (!stopped_) {
select();
}
}
/**
 * Waits on acceptSelector and calls handleAccept() for each selected key that
 * is acceptable. Iteration stops early if stopped_ becomes true.
 */
private void select() {
try {
// wait for connect events.
acceptSelector.select();
// process the io events we received
Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();
while (!stopped_ && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
// Remove the key from the selected set through the iterator once it has been taken.
selectedKeys.remove();
...
if (key.isAcceptable()) {
handleAccept();
} else {
...
}
}
} catch (...) {
...
}
}
/**
 * Accepts a connection via doAccept() and hands it to the selector thread
 * returned by threadChooser.nextThread().
 */
private void handleAccept() {
final TNonblockingTransport client = doAccept();
// Nothing to hand off when doAccept() yields null.
if (client != null) {
// Pass this connection to a selector thread
final SelectorThread targetThread = threadChooser.nextThread();
...
doAddAccept(targetThread, client);
...
}
}
/**
 * Offers an accepted connection to the given selector thread and closes the
 * connection if the thread reports that it did not take it.
 *
 * @param thread the selector thread that should own the connection
 * @param client the newly accepted connection
 */
private void doAddAccept(SelectorThread thread, TNonblockingTransport client) {
    final boolean queued = thread.addAcceptedConnection(client);
    if (queued) {
        return;
    }
    // The thread refused the connection, so release it here.
    client.close();
}
// SelectorThread
/**
 * Thread body: each pass calls select(), then processAcceptedConnections(),
 * then processInterestChanges(), repeating until stopped_ is set.
 */
public void run() {
...
while (!stopped_) {
select();
processAcceptedConnections();
processInterestChanges();
}
}
/**
 * Drains acceptedQueue, registering each polled connection, until the queue is
 * empty or stopped_ is set.
 */
private void processAcceptedConnections() {
    TNonblockingTransport next;
    // stopped_ is checked before every poll, so nothing is dequeued once the thread is stopped.
    while (!stopped_ && (next = acceptedQueue.poll()) != null) {
        registerAccepted(next);
    }
}
/**
 * Waits on the selector and dispatches each selected key to the read or write
 * handler. Iteration stops early if stopped_ becomes true.
 */
private void select() {
try {
selector.select();
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (!stopped_ && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
// Remove the key from the selected set through the iterator once it has been taken.
selectedKeys.remove();
...
// Only one branch runs per key per pass: read is checked before write.
if (key.isReadable()) {
// deal with reads
handleRead(key);
} else if (key.isWritable()) {
// deal with writes
handleWrite(key);
} else {
LOGGER.warn("Unexpected state in select! " + key.interestOps());
}
}
} catch (...) {
...
}
}
// Example: serve `processor` on port 7911 with a TThreadedSelectorServer.
final int port = 7911;
TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(port);
TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(serverSocket).processor(processor);
TServer server = new TThreadedSelectorServer(serverArgs);
server.serve();