- 浏览: 130580 次
- 性别:
- 来自: ...
文章分类
最新评论
分析了I/O事件的存储,下面看看多个Worker同时工作时I/O事件的取得过程。首先看看有序的Worker的实现:
Worker的run()上来就是个无限循环,如果工人多了,则当前的Worker被就地裁员;如果没有可以处理的IOSession的事件了,则这个工人可以跳出循环然后不等休息就被裁员,这段代码的实现基本上体现了资本主义世界下公司的作风。开始干活的第一件事就是fetchSession()获取可用的IOSession,然后就是runTasks(getSessionTasksQueue(session))了--获取IOSession对应的I/O事件然后一个个的runTask()处理任务。有序的奥妙在这里完全的暴露出来:每个Worker都是先得到IOSession,session = waitingSessions.poll(waitTime, TimeUnit.MILLISECONDS)采用了阻塞一定时间的方式获取可用的session,每一个Worker的session都是唯一的,当然除了EXIT_SIGNAL。这样每一个Worker按照对应的session的事件队列tasksQueue的事件顺序执行每一个事件,保证了有序性。再看看无序的实现:
从源码一眼就能看出差别,这个线程池的Worker是吃大锅饭的。开始干活的第一件事是fetchTask()取得I/O事件,然后就是runTask()处理事件。后者与有序的实现基本一致,而fetchTask()则暴露了所有的Worker都是从同一个队列取事件,而不像有序实现那样每一个Worker都有自己的一个专有的锅。fetchTask()的实现使得不同的Worker可能取得同一个IOSession的I/O事件,而这些事件的处理则完全听天由命的取决于Woker的快准狠!从而可能造成sessionClosed事件在messageReceived事件之前被处理。
另外就是有序的实现中虽然每个IOSession对应的队列是ConcurrentLinkedQueue的实例,支持无锁得并发访问。但是在入队和出队的操作时都是使用了synchronized的机制进行访问,主要原因我想一方面是要保证一系列操作的原子性,另一方面其本身就是无锁的实现,所以保证前者的情况下并不会使性能下降多少。
至此简单的分析了一下mina内部线程池有序和无序的实现,不得不说这个设计还是很精妙的。当然设计是简单的具体的实现要充分的考虑多线程的访问,还是有一定的复杂性的。
private class Worker implements Runnable { private volatile long completedTaskCount; private Thread thread; public void run() { thread = Thread.currentThread(); try { for (;;) { IoSession session = fetchSession(); idleWorkers.decrementAndGet(); if (session == null) { synchronized (workers) { if (workers.size() > getCorePoolSize()) { // Remove now to prevent duplicate exit. workers.remove(this); break; } } } if (session == EXIT_SIGNAL) { break; } try { if (session != null) { runTasks(getSessionTasksQueue(session)); } } finally { idleWorkers.incrementAndGet(); } } } finally { synchronized (workers) { workers.remove(this); OrderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount; workers.notifyAll(); } } } private IoSession fetchSession() { IoSession session = null; long currentTime = System.currentTimeMillis(); long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS); for (;;) { try { long waitTime = deadline - currentTime; if (waitTime <= 0) { break; } try { session = waitingSessions.poll(waitTime, TimeUnit.MILLISECONDS); break; } finally { if (session == null) { currentTime = System.currentTimeMillis(); } } } catch (InterruptedException e) { // Ignore. continue; } } return session; } private void runTasks(SessionTasksQueue sessionTasksQueue) { for (;;) { Runnable task; Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue; synchronized (tasksQueue) { task = tasksQueue.poll(); if (task == null) { sessionTasksQueue.processingCompleted = true; break; } } eventQueueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task); runTask(task); } } private void runTask(Runnable task) { beforeExecute(thread, task); boolean ran = false; try { task.run(); ran = true; afterExecute(task, null); completedTaskCount ++; } catch (RuntimeException e) { if (!ran) { afterExecute(task, e); } throw e; } } }
Worker的run()上来就是个无限循环,如果工人多了,则当前的Worker被就地裁员;如果没有可以处理的IOSession的事件了,则这个工人可以跳出循环然后不等休息就被裁员,这段代码的实现基本上体现了资本主义世界下公司的作风。开始干活的第一件事就是fetchSession()获取可用的IOSession,然后就是runTasks(getSessionTasksQueue(session))了--获取IOSession对应的I/O事件然后一个个的runTask()处理任务。有序的奥妙在这里完全的暴露出来:每个Worker都是先得到IOSession,session = waitingSessions.poll(waitTime, TimeUnit.MILLISECONDS)采用了阻塞一定时间的方式获取可用的session,每一个Worker的session都是唯一的,当然除了EXIT_SIGNAL。这样每一个Worker按照对应的session的事件队列tasksQueue的事件顺序执行每一个事件,保证了有序性。再看看无序的实现:
private class Worker implements Runnable { private volatile long completedTaskCount; private Thread thread; public void run() { thread = Thread.currentThread(); try { for (;;) { Runnable task = fetchTask(); idleWorkers.decrementAndGet(); if (task == null) { synchronized (workers) { if (workers.size() > corePoolSize) { // Remove now to prevent duplicate exit. workers.remove(this); break; } } } if (task == EXIT_SIGNAL) { break; } try { if (task != null) { queueHandler.polled(UnorderedThreadPoolExecutor.this, (IoEvent) task); runTask(task); } } finally { idleWorkers.incrementAndGet(); } } } finally { synchronized (workers) { workers.remove(this); UnorderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount; workers.notifyAll(); } } } private Runnable fetchTask() { Runnable task = null; long currentTime = System.currentTimeMillis(); long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS); for (;;) { try { long waitTime = deadline - currentTime; if (waitTime <= 0) { break; } try { task = getQueue().poll(waitTime, TimeUnit.MILLISECONDS); break; } finally { if (task == null) { currentTime = System.currentTimeMillis(); } } } catch (InterruptedException e) { // Ignore. continue; } } return task; } private void runTask(Runnable task) { beforeExecute(thread, task); boolean ran = false; try { task.run(); ran = true; afterExecute(task, null); completedTaskCount ++; } catch (RuntimeException e) { if (!ran) { afterExecute(task, e); } throw e; } } }
从源码一眼就能看出差别,这个线程池的Worker是吃大锅饭的。开始干活的第一件事是fetchTask()取得I/O事件,然后就是runTask()处理事件。后者与有序的实现基本一致,而fetchTask()则暴露了所有的Worker都是从同一个队列取事件,而不像有序实现那样每一个Worker都有自己的一个专有的锅。fetchTask()的实现使得不同的Worker可能取得同一个IOSession的I/O事件,而这些事件的处理则完全听天由命的取决于Woker的快准狠!从而可能造成sessionClosed事件在messageReceived事件之前被处理。
另外就是有序的实现中虽然每个IOSession对应的队列是ConcurrentLinkedQueue的实例,支持无锁得并发访问。但是在入队和出队的操作时都是使用了synchronized的机制进行访问,主要原因我想一方面是要保证一系列操作的原子性,另一方面其本身就是无锁的实现,所以保证前者的情况下并不会使性能下降多少。
至此简单的分析了一下mina内部线程池有序和无序的实现,不得不说这个设计还是很精妙的。当然设计是简单的具体的实现要充分的考虑多线程的访问,还是有一定的复杂性的。
发表评论
文章已被作者锁定,不允许评论。
-
一道位操作的趣味编程题
2010-03-14 10:50 2084看到一道很有意思的编程题:大厅里有64盏灯,每盏灯都编 ... -
一道字符串截取的编程题
2010-03-11 10:52 2276最近接触到一道字符串截取的编程题:编写一个截取字符串的 ... -
一道多线程趣味热身题
2010-02-28 18:01 1915保持对知识点或者技术的熟悉度对于程序员至关重要,要学会 ... -
疑似Google多线程面试题的Java实现
2010-02-24 17:39 4906来到一个完全陌生的地方,即将一切从新开始,内心兴奋又忐 ... -
Mina的线程池实现分析(1)
2010-02-10 17:28 11575线程池是并发应用中,为了减少每个任务调用的开销增强性能 ... -
多线程基础总结十一--ConcurrentLinkedQueue
2010-02-03 17:52 12840ConcurrentLinkedQueue充分使用了a ... -
LinkedBlockingQueue应用--生产消费模型简单实现
2010-01-29 20:45 8137之前介绍时LinkedBlockingQueue提到了 ... -
多线程基础总结十--LinkedBlockingQueue
2010-01-28 14:33 15372随着多线程基础总结的增多,却明显的感觉知道的越来越少, ... -
号称放倒一片的一道J2SE基础题的个人理解
2010-01-23 14:07 2793近日无意中看到一道Java基础题,号称在接受测试的10 ... -
多线程基础总结九--Mina窥探(1)
2010-01-21 23:46 5401一直以来的多线程的基础总结都是脱离应用的,但是要说多线 ... -
多线程基础总结八--ReentrantReadWriteLock
2010-01-15 23:22 7510说到ReentrantReadWriteLock,首先 ... -
多线程基础总结七--ReentrantLock
2010-01-09 23:17 7678之前总结了部分无锁机制的多线程基础,理想的状态当然是利 ... -
关于atomic问题的一点理解
2009-12-30 16:42 2440之前看到一个帖子是关于atomic使用的,当时没有仔细 ... -
多线程基础总结六--synchronized(2)
2009-12-18 18:45 1869早在总结一时,我就尽量的把synchronized的重点 ... -
多线程基础总结五--atomic
2009-12-17 19:46 3550在简单介绍java.util.c ... -
多线程基础总结四--ThreadLocal
2009-12-16 19:48 2718说到ThreadLocal,首先 ... -
多线程基础总结三--volatile
2009-12-15 20:09 2525前面的两篇总结简 ... -
多线程基础总结二--Thread
2009-12-12 23:27 2668对于Thread来说 ... -
多线程基础总结一--synchronized(1)
2009-12-12 23:23 3068最近写关于并发的小应 ... -
由destory-method引发的IOC容器设计的思考
2009-12-07 16:51 1682第一次读Spring的源 ...
相关推荐
线程池是并发应用中,为了减少...在mina中大量的使用这一技术,除了Executors的工厂方法构建线程池之外,它还继承自ThreadPoolExecutor提供自己的线程池的实现OrderedThreadPoolExecutor和UnorderedThreadPoolExecutor
Mina2源码分析,学习mina不可多得的文档资料
我自己写的使用mina框架实现cmpp2.0服务端,经过一段使用解决了几个bug现在比较稳定。
本源码是《NIO框架入门(三):iOS与MINA2、Netty4的跨平台UDP双向通信实战》一文的服务端实现(MINA2版),详见:http://www.52im.net/thread-378-1-1.html
mina 通讯 实现server端与基于Android系统的client端通讯
mina带心跳长链接,可实现服务间通信。socket长连接实现客户端与服务端的通信。对于通信技术学习是非常好的资料。改造后可实现企业应用
Mina2源码分析,mina2框架。mina nio框架。mina用于webgame游戏开发很多。
实现了mina 的简单通信通信,内部配置了累积协议编解码器、工具类和客户端与服务端的端口配置。能够实现基本功能,下载完成需要四个基本jar包才能实现功能。我的博客上有相应资源支持下载。
java Mina2源码分析
Mina框架研究与实现 Mina框架研究与实现
MINA入门实例,实现长连接,短连接通讯。
通讯层使用Mina实现一服务器多客户端的通信,可以修改成一个群体聊天室。Mina是手游开发常用的nio通讯框架,长连接优先使用Mina。希望对你有所帮助!
mina实现简单的登录功能,详细见博客:http://blog.csdn.net/guozeming122/article/details/18605937
基于Mina的网络通讯,分为服务端和客户端。 研究selector NIO实现时,发现了这个架构。...Mina的底层实现实际就是selector和SocketChannel。所以如果对Mina源码感兴趣的可以先去看下selector相关的例子。
使用MINA实现长连接
这是一个有关Mina在Java通信中运用的简单的入门实例,MIna自带一种触发机制,无需再开线程等待收发数据。这个实例中有客户端和服务端,与软件TCPUDPDbg进行文字通话测试。用的时候注意添加Mina包,此实例中用到的是...
使用mina实现rpc调用.使用参考http://blog.csdn.net/stevexk/archive/2008/07/23/2697907.aspx
基于Apache Mina实现的TCP长连接和短连接实例.doc
此demo利用springmvc整合mina,实现客户端主动发送消息到服务端,并且以http接口的方式实现,亲测可用。
mina框架 运用封装socket 行一系列的