`
ftj20003
  • 浏览: 130580 次
  • 性别: Icon_minigender_1
  • 来自: ...
社区版块
存档分类
最新评论

Mina的线程池实现分析(2)

    博客分类:
  • Java
阅读更多
    分析了I/O事件的存储,下面看看多个Worker同时工作时I/O事件的取得过程。首先看看有序的Worker的实现:
   private class Worker implements Runnable {

        private volatile long completedTaskCount;
        private Thread thread;
        
        public void run() {
            thread = Thread.currentThread();

            try {
                for (;;) {
                    IoSession session = fetchSession();

                    idleWorkers.decrementAndGet();

                    if (session == null) {
                        synchronized (workers) {
                            if (workers.size() > getCorePoolSize()) {
                                // Remove now to prevent duplicate exit.
                                workers.remove(this);
                                break;
                            }
                        }
                    }

                    if (session == EXIT_SIGNAL) {
                        break;
                    }

                    try {
                        if (session != null) {
                            runTasks(getSessionTasksQueue(session));
                        }
                    } finally {
                        idleWorkers.incrementAndGet();
                    }
                }
            } finally {
                synchronized (workers) {
                    workers.remove(this);
                    OrderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount;
                    workers.notifyAll();
                }
            }
        }

        private IoSession fetchSession() {
            IoSession session = null;
            long currentTime = System.currentTimeMillis();
            long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS);
            for (;;) {
                try {
                    long waitTime = deadline - currentTime;
                    if (waitTime <= 0) {
                        break;
                    }

                    try {
                        session = waitingSessions.poll(waitTime, TimeUnit.MILLISECONDS);
                        break;
                    } finally {
                        if (session == null) {
                            currentTime = System.currentTimeMillis();
                        }
                    }
                } catch (InterruptedException e) {
                    // Ignore.
                    continue;
                }
            }
            return session;
        }

        private void runTasks(SessionTasksQueue sessionTasksQueue) {
            for (;;) {
                Runnable task;
                Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
                
                synchronized (tasksQueue) {
                    task = tasksQueue.poll();
                    
                    if (task == null) {
                        sessionTasksQueue.processingCompleted = true;
                        break;
                    }
                }

                eventQueueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task);

                runTask(task);
            }
        }

        private void runTask(Runnable task) {
            beforeExecute(thread, task);
            boolean ran = false;
            try {
                task.run();
                ran = true;
                afterExecute(task, null);
                completedTaskCount ++;
            } catch (RuntimeException e) {
                if (!ran) {
                    afterExecute(task, e);
                }
                throw e;
            }
        }
    }

    Worker的run()上来就是个无限循环,如果工人多了,则当前的Worker被就地裁员;如果没有可以处理的IOSession的事件了,则这个工人可以跳出循环然后不等休息就被裁员,这段代码的实现基本上体现了资本主义世界下公司的作风。开始干活的第一件事就是fetchSession()获取可用的IOSession,然后就是runTasks(getSessionTasksQueue(session))了--获取IOSession对应的I/O事件然后一个个的runTask()处理任务。有序的奥妙在这里完全的暴露出来:每个Worker都是先得到IOSession,session = waitingSessions.poll(waitTime, TimeUnit.MILLISECONDS)采用了阻塞一定时间的方式获取可用的session,每一个Worker的session都是唯一的,当然除了EXIT_SIGNAL。这样每一个Worker按照对应的session的事件队列tasksQueue的事件顺序执行每一个事件,保证了有序性。再看看无序的实现:
    private class Worker implements Runnable {

        private volatile long completedTaskCount;
        private Thread thread;

        public void run() {
            thread = Thread.currentThread();

            try {
                for (;;) {
                    Runnable task = fetchTask();

                    idleWorkers.decrementAndGet();

                    if (task == null) {
                        synchronized (workers) {
                            if (workers.size() > corePoolSize) {
                                // Remove now to prevent duplicate exit.
                                workers.remove(this);
                                break;
                            }
                        }
                    }

                    if (task == EXIT_SIGNAL) {
                        break;
                    }

                    try {
                        if (task != null) {
                            queueHandler.polled(UnorderedThreadPoolExecutor.this, (IoEvent) task);
                            runTask(task);
                        }
                    } finally {
                        idleWorkers.incrementAndGet();
                    }
                }
            } finally {
                synchronized (workers) {
                    workers.remove(this);
                    UnorderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount;
                    workers.notifyAll();
                }
            }
        }

        private Runnable fetchTask() {
            Runnable task = null;
            long currentTime = System.currentTimeMillis();
            long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS);
            for (;;) {
                try {
                    long waitTime = deadline - currentTime;
                    if (waitTime <= 0) {
                        break;
                    }

                    try {
                        task = getQueue().poll(waitTime, TimeUnit.MILLISECONDS);
                        break;
                    } finally {
                        if (task == null) {
                            currentTime = System.currentTimeMillis();
                        }
                    }
                } catch (InterruptedException e) {
                    // Ignore.
                    continue;
                }
            }
            return task;
        }

        private void runTask(Runnable task) {
            beforeExecute(thread, task);
            boolean ran = false;
            try {
                task.run();
                ran = true;
                afterExecute(task, null);
                completedTaskCount ++;
            } catch (RuntimeException e) {
                if (!ran) {
                    afterExecute(task, e);
                }
                throw e;
            }
        }
    }

    从源码一眼就能看出差别,这个线程池的Worker是吃大锅饭的。开始干活的第一件事是fetchTask()取得I/O事件,然后就是runTask()处理事件。后者与有序的实现基本一致,而fetchTask()则暴露了所有的Worker都是从同一个队列取事件,而不像有序实现那样每一个Worker都有自己的一个专有的锅。fetchTask()的实现使得不同的Worker可能取得同一个IOSession的I/O事件,而这些事件的处理则完全听天由命的取决于Woker的快准狠!从而可能造成sessionClosed事件在messageReceived事件之前被处理。
   
    另外就是有序的实现中虽然每个IOSession对应的队列是ConcurrentLinkedQueue的实例,支持无锁得并发访问。但是在入队和出队的操作时都是使用了synchronized的机制进行访问,主要原因我想一方面是要保证一系列操作的原子性,另一方面其本身就是无锁的实现,所以保证前者的情况下并不会使性能下降多少。
   
    至此简单的分析了一下mina内部线程池有序和无序的实现,不得不说这个设计还是很精妙的。当然设计是简单的具体的实现要充分的考虑多线程的访问,还是有一定的复杂性的。
分享到:
评论
发表评论

文章已被作者锁定,不允许评论。

相关推荐

Global site tag (gtag.js) - Google Analytics