java 线程的几种状态

java thread的运行周期中, 有几种状态, 在 java.lang.Thread.State 中有详细定义和说明:

NEW 状态是指线程刚创建, 尚未启动

RUNNABLE 状态是线程正在正常运行中, 当然可能会有某种耗时计算/IO等待的操作/CPU时间片切换等, 这个状态下发生的等待一般是其他系统资源, 而不是锁, Sleep等

BLOCKED  这个状态下, 是在多个线程有同步操作的场景, 比如正在等待另一个线程的synchronized 块的执行释放, 或者可重入的 synchronized块里别人调用wait() 方法, 也就是这里是线程在等待进入临界区

WAITING  这个状态下是指线程拥有了某个锁之后, 调用了他的wait方法, 等待其他线程/锁拥有者调用 notify / notifyAll 一遍该线程可以继续下一步操作, 这里要区分 BLOCKED 和 WATING 的区别, 一个是在临界点外面等待进入, 一个是在理解点里面wait等待别人notify, 线程调用了join方法 join了另外的线程的时候, 也会进入WAITING状态, 等待被他join的线程执行结束

TIMED_WAITING  这个状态就是有限的(时间限制)的WAITING, 一般出现在调用wait(long), join(long)等情况下, 另外一个线程sleep后, 也会进入TIMED_WAITING状态

TERMINATED 这个状态下表示 该线程的run方法已经执行完毕了, 基本上就等于死亡了(当时如果线程被持久持有, 可能不会被回收)

下面谈谈如何让线程进入以上几种状态:

1. NEW, 这个最简单了,  
 
     static void NEW() {
          Thread t = new Thread ();
         System. out.println(t.getState());
    }
 
输出NEW
 
2. RUNNABLE, 也简单, 让一个thread start, 同时代码里面不要sleep或者wait等
 
   private static void RUNNABLE() {
         Thread t = new Thread(){
             
              public void run(){
                  for(int i=0; i<Integer.MAX_VALUE; i++){
                      System. out.println(i);
                 }
             }
             
         };
         
         t.start();
    }
 
 71e94764e28c7f8bbd3ef91c1c0088b4
 
3. BLOCKED, 这个就必须至少两个线程以上, 然后互相等待synchronized 块
          
     private static void BLOCKED() {
         
          final Object lock = new Object();
         
         Runnable run = new Runnable() {
             
              @Override
              public void run() {
                  for(int i=0; i<Integer.MAX_VALUE; i++){
                      
                       synchronized (lock) {
                          System. out.println(i);
                      }
                      
                 }
             }
         };
         
         Thread t1 = new Thread(run);
         t1.setName( “t1”);
         Thread t2 = new Thread(run);
         t2.setName( “t2”);
         
         t1.start();
         t2.start();
         
    }
 
8e9ad1eadf9d38c0b6c8cb024cb36c0c
这时候, 一个在RUNNABLE, 另一个就会在BLOCKED (等待另一个线程的 System.out.println.. 这是个IO操作, 属于系统资源, 不会造成WAITING等)
 
4. WAITING, 这个需要用到生产者消费者模型, 当生产者生产过慢的时候, 消费者就会等待生产者的下一次notify
 
     private static void WAITING() {
 
          final Object lock = new Object();
         Thread t1 = new Thread(){
              @Override
              public void run() {
                 
                  int i = 0;
                 
                  while(true ){
                       synchronized (lock) {
                           try {
                               lock.wait();
                          } catch (InterruptedException e) {
                          }
                          System. out.println(i++);
                      }
                 }
             }
         };
         
         Thread t2 = new Thread(){
              @Override
              public void run() {
                 
                  while(true ){
                       synchronized (lock) {
                           for(int i = 0; i< 10000000; i++){
                              System. out.println(i);
                          }
                          lock.notifyAll();
                      }
                      
                 }
             }
         };
         
         t1.setName( “^^t1^^”);
         t2.setName( “^^t2^^”);
         
         t1.start();
         t2.start();
    }
 
 b43a3d9b67bab266ffea4537fb043bba
 
5. TIMED_WAITING, 这个仅需要在4的基础上, 在wait方法加上一个时间参数进行限制就OK了.
 
把4中的synchronized 块改成如下就可以了.
 
synchronized (lock) {
   try {
      lock.wait(60 * 1000L);
   } catch (InterruptedException e) {
   }
   System. out .println(i++);
 }
 
 88d9047d8a709c2d63c695bcf58a0297
另外看stack的输出,  他叫 TIMED_WAITING(on  object monitor) , 说明括号后面还有其他的情况, 比如sleep, 我们直接把t2的for循环改成sleep试试:
 
synchronized (lock) {
    
    try {
          sleep(30*1000L);
    } catch (InterruptedException e) {
    }
    lock.notifyAll();
}
a37ef4c72c00e793f8b6c746d74fd4d9 
 
看到了吧, t2的state是 TIMED_WAITING( sleeping),  而t1依然是on object monitor , 因为t1还是wait在等待t2 notify, 而t2是自己sleep
 
另外, join操作也是进入 on object monitor
 
6. TERMINATED, 这个状态只要线程结束了run方法, 就会进入了…
 
    private static void TERMINATED() {
         Thread t1 = new Thread();
         t1.start();
         System. out.println(t1.getState());
          try {
             Thread. sleep(1000L);
         } catch (InterruptedException e) {
         }
         System. out.println(t1.getState());
    }
输出: 
RUNNABLE
TERMINATED
 
由于线程的start方法是异步启动的, 所以在其执行后立即获取状态有可能才刚进入RUN方法且还未执行完毕
 
 
废话了这么多, 了解线程的状态究竟有什么用?
所以说这是个钓鱼贴么…
 
好吧, 一句话, 在找到系统中的潜在性能瓶颈有作用.
 
当java系统运行慢的时候, 我们想到的应该先找到性能的瓶颈, 而jstack等工具, 通过jvm当前的stack可以看到当前整个vm所有线程的状态, 当我们看到一个线程状态经常处于
WAITING 或者 BLOCKED的时候, 要小心了, 他可能在等待资源经常没有得到释放(当然, 线程池的调度用的也是各种队列各种锁, 要区分一下, 比如下图)
6db341bbd7680bbc2e6ae37a66329397
这是个经典的并发包里面的线程池, 其调度队列用的是LinkedBlockingQueue, 执行take的时候会block住, 等待下一个任务进入队列中, 然后进入执行, 这种理论上不是系统的性能瓶颈, 找瓶颈一般先找自己的代码stack,再去排查那些开源的组件/JDK的问题
 
排查问题的几个思路:
 
0. 如何跟踪一个线程?
看到上面的stack输出没有, 第一行是内容是 threadName priority tid nid desc
更过跟踪tid, nid 都可以唯一找到该线程.
 
1. 发现有线程进入BLOCK, 而且持续好久, 这说明性能瓶颈存在于synchronized块中, 因为他一直block住, 进不去, 说明另一个线程一直没有处理好, 也就这个synchronized块中处理速度比较慢, 然后再深入查看. 当然也有可能同时block的线程太多, 排队太久造成.
 
2. 发现有线程进入WAITING, 而且持续好久, 说明性能瓶颈存在于触发notify的那段逻辑. 当然还有就是同时WAITING的线程过多, 老是等不到释放.
 
3. 线程进入TIME_WAITING 状态且持续好久的, 跟2的排查方式一样.
 
 
上面的黑底白字截图都是通过jstack打印出来的, 可以直接定位到你想知道的线程的执行栈, 这对java性能瓶颈的分析是有极大作用的.
 
NOTE: 上面所有代码都是为了跟踪线程的状态而写的, 千万不要在线上应用中这么写…

struts2 获取文件上传进度方法

一、上传机制

1. struts2 的 Dispacher 类中 wrapRequest 方法, 将http请求头中带有multipart/form-data 开头的请求都会包装成为 MultiPartRequest 的实例

 Dispacher.wrapRequest

这里 MultiPartRequestWrapper 构造方法就是用MultiPartRequest 再包装一层, 同时还会调用他的parse 方法, 进行文件的上传

2. 这里getContainer().getInstance 得到的对象, 是从struts的 ObjectFactory中得到的(可以在对应的struts配置文件的<bean>标签找到), 查看struts默认配置文件 struts-default.xml 可以看到下面这一句

<bean type=“org.apache.struts2.dispatcher.multipart.MultiPartRequest” 
name=“jakarta” class=“org.apache.struts2.dispatcher.multipart.JakartaMultiPartRequest” scope=“default” optional=“true” />

这里的定义, 可以理解为:  Interface obj = cls.newInstance() 写法变成xml而已

default里面定义了两个, name 分别是 struts 和 jakarata, 作用却是完全一样的, 支持文件的分块上传, 这样大文件的上传才不会出问题.

3. 进入JakartaMultiPartRequest 类, 查看 parse方法代码

可以看出, 这里用的是 commons-fileupload 包来做文件上传的.

4. commons-fileupload 包中, ServletFileUpload 类可以使用一个 progressListener 回调对象来监听上传进度, 而这个JakartaMultiPartRequest 类中已经写死了, 没有使用任何listener:

 ServletFileUpload upload = new ServletFileUpload(fac);
 upload.setSizeMax(maxSize);
 List items = upload.parseRequest(createRequestContext(servletRequest));

这时候因为没有注入到监听器, 而且在这里会parseRequest, 也就是把文件上传过来并保存在saveDir目录中, 最后走到action,交给业务去处理。

以上是整个上传的机制,struts2通过封装commons-fileupload,直接免去了文件上传代码的编写,需要做的仅仅是在action里面保存文件以及一些后续工作而已。

但是上面的第4点中提到,因为代码写死了,没有指定监听器,也不能通过注入的形式(因为ServletFileUpload对象是临时new出来的),所以导致文件上传的过程中我们无法知道进度。

二、解决方案

5. 先理解为什么struts2为什么会使用jakarta来包装文件上传的请求,其实是在

     struts.multipart.parser 这个常量中指定的,默认情况下

   <constant name=“struts.multipart.parser” value=“jakarta”></constant>

   这里的value,就是在struts-default.xml 中定义的bean的name  (见第2节)

6. 所以解决的方法是, 我们自己来实现MultiPartRequest  这个接口, 然后自己定义一个bean, 比如叫 jakartaExt, 然后把5中的parser常量的值设置为jakartaExt, 就可以替换系统默认的包装, 用我们自己定义的了, 这个时候我们就可以给ServletFileUpload对象注入listener了.

注入listener
配置

注: 这里的uploadId仅是为了区分不同的上传线程而已(同一个用户可以开多个页面同时上传)

尝试上传文件, 可以看到日志打出来的结果:

2013-03-06 11:48:15,DEBUG,file upload, url=http://localhost/struts2/upload.htm
2013-03-06 11:48:15,DEBUG,[1362541661052] upload progress:0%
2013-03-06 11:48:28,DEBUG,[1362541661052] upload progress:10%
2013-03-06 11:48:38,DEBUG,[1362541661052] upload progress:20%
2013-03-06 11:48:48,DEBUG,[1362541661052] upload progress:30%
2013-03-06 11:48:58,DEBUG,[1362541661052] upload progress:40%
2013-03-06 11:49:08,DEBUG,[1362541661052] upload progress:50%
2013-03-06 11:49:19,DEBUG,[1362541661052] upload progress:60%
2013-03-06 11:49:30,DEBUG,[1362541661052] upload progress:70%
2013-03-06 11:49:40,DEBUG,[1362541661052] upload progress:80%
2013-03-06 11:49:52,DEBUG,[1362541661052] upload progress:90%
2013-03-06 11:50:05,DEBUG,[1362541661052] upload progress:100%
2013-03-06 11:50:05,INFO,[1362541661052] contentLength:2869568 parse use 109772ms

这样, 我们就可以通过ajax的形式, 从session中获取到对应uploadId的上传进度, 页面也就可以做出进度条了.

7. 另一种解决方案

     就是采用servlet来上传, 自己也用commons-fileupload组件来操作, 要注意的是, 把web.xml中struts2的filter-mapping 不能是 /*, 而是应该对应的 /*.action /*.htm 之类的, 否则如果拦截了所有请求, 这时候会先经过struts2的 filter而导致request被wrap掉(wrap的过程就会有parse的操作),导致servlet有响应的时候, 实际上已经上传好了…

我个人还是倾向6的做法, 因为这时候技能做到上传与业务无关, 也能监控进度, 如果在servlet中做的话, 你每上传一种文件都要一段新的处理逻辑挤进去(因为每种文件的处理逻辑不一样)

Java 慎用方法级别的synchronized关键字

为什么要这么说呢, 因为笔者被这个坑过(其实是自己坑自己)╮(╯_╰)╭

先看一段synchronized 的详解:

synchronized 是 java语言的关键字,当它用来修饰一个方法或者一个代码块的时候,能够保证在同一时刻最多只有一个线程执行该段代码。

一、当两个并发线程访问同一个对象object中的这个synchronized(this)同步代码块时,一个时间内只能有一个线程得到执行。另一个线程必须等待当前线程执行完这个代码块以后才能执行该代码块。

二、然而,当一个线程访问object的一个synchronized(this)同步代码块时,另一个线程仍然可以访问该object中的非synchronized(this)同步代码块。

三、尤其关键的是,当一个线程访问object的一个synchronized(this)同步代码块时,其他线程对object中所有其它synchronized(this)同步代码块的访问将被阻塞。

四、第三个例子同样适用其它同步代码块。也就是说,当一个线程访问object的一个synchronized(this)同步代码块时,它就获得了这个object的对象锁。结果,其它线程对该object对象所有同步代码部分的访问都被暂时阻塞。

五、以上规则对其它对象锁同样适用.
简单来说, synchronized就是为当前的线程声明一个锁, 拥有这个锁的线程可以执行区块里面的指令, 其他的线程只能等待获取锁, 然后才能相同的操作.
这个很好用, 但是笔者遇到另一种比较奇葩的情况.
1. 在同一类中, 有两个方法是用了synchronized关键字声明
2. 在执行完其中一个方法的时候, 需要等待另一个方法(异步线程回调)也执行完, 所以用了一个countDownLatch来做等待
3. 代码解构如下:
synchronized void  a(){
  countDownLatch = new CountDownLatch(1);
  // do someing
  countDownLatch.await();
}

synchronized void b(){
     countDownLatch.countDown();
}
其中
a方法由主线程执行, b方法由异步线程执行后回调
执行结果是:
主线程执行 a方法后开始卡住, 不再往下做, 任你等多久都没用.
这是一个很经典的死锁问题
a等待b执行, 其实不要看b是回调的, b也在等待a执行. 为什么呢? synchronized 起了作用.
一般来说, 我们要synchronized一段代码块的时候, 我们需要使用一个共享变量来锁住, 比如:
byte[]  mutex = new byte[0];

void a1(){
     synchronized(mutex){
          //dosomething
     }
}

void b1(){

     synchronized(mutex){
          // dosomething
     }

}
如果把a方法和b方法的内容分别迁移到 a1和b1 方法的synchronized块里面, 就很好理解了.
a1执行完后会间接等待(countDownLatch)b1方法执行
然而由于 a1 中的mutex并没有释放, 就开始等待b1了, 这时候, 即使是异步的回调b1方法, 由于需要等待mutex释放锁, 所以b方法并不会执行
于是就引起了死锁
而这里的synchronized关键字放在方法前面, 起的作用就是一样的. 只是java语言帮你隐去了mutex的声明和使用而已. 同一个对象中的synchronized 方法用到的mutex是相同的, 所以即使是异步回调, 也会引起死锁, 所以要注意这个问题. 这种级别的错误是属于synchronized关键字使用不当. 不要乱用, 而且要用对.
那么这样的 隐形的mutex 对象究竟是 什么呢?
很容易想到的就是 实例本身. 因为这样就不用去定义新的对象了做锁了. 为了证明这个设想, 可以写一段程序来证明.
思路很简单, 定义一个类, 有两个方法, 一个方法声明为 synchronized, 一个在 方法体里面使用synchronized(this), 然后启动两个线程, 来分别调用这两个方法, 如果两个方法之间发生锁竞争(等待)的话, 就可以说明 方法声明的 synchronized 中的隐形的mutex其实就是 实例本身了.
public class MultiThreadSync {

    public synchronized void m1() throws InterruptedException{
         System. out.println("m1 call" );
         Thread. sleep(2000);
         System. out.println("m1 call done" );
    }

    public void m2() throws InterruptedException{
          synchronized (this ) {
             System. out.println("m2 call" );
             Thread. sleep(2000);
             System. out.println("m2 call done" );
         }
    }

    public static void main(String[] args) {
          final MultiThreadSync thisObj  = new MultiThreadSync();

         Thread t1 = new Thread(){
              @Override
              public void run() {
                  try {
                      thisObj.m1();
                 } catch (InterruptedException e) {
                      e.printStackTrace();
                 }
             }
         };

         Thread t2 = new Thread(){
              @Override
              public void run() {
                  try {
                      thisObj.m2();
                 } catch (InterruptedException e) {
                      e.printStackTrace();
                 }
             }
         };

         t1.start();
         t2.start();

    }

}
结果输出是:
m1 call
m1 call done
m2 call
m2 call done
说明方法m2的sync块等待了m1的执行. 这样就可以证实 上面的设想了.
另外需要说明的是, 当sync加在 static的方法上的时候, 由于是类级别的方法, 所以锁住的对象是当前类的class实例. 同样也可以写程序进行证明.这里略.
所以方法的synchronized 关键字, 在阅读的时候可以自动替换为synchronized(this){}就很好理解了.
                                        void method(){
void synchronized method(){                 synchronized(this){
      // biz code                               // biz code
}                             ------>>>      }
                                        }

Bloomfilter 原理与应用

前言: 本文只讲解原理, 不讲解BloomFilter中各项指标的公式的推算, 能让你知道有这么一个东西(what), 他能达到什么效果(why), 是如何做到的(how).
(What)

Bloom Filter 是用一个 位数组(数组的每个元素不是1就是0) 来表示一个大的元素集合, 而且通过这个数组就可以判断某个元素是不是属于这个集合

大大节省了空间, 但是有代价的, 就是有一定的错误率(为什么会有, 后面举个例说明一下)
假定我们有一个已知的集合,  S = {a1, a2, … , an} (这里的元素不仅仅是数字, 而是泛指所有对象), 我们要判断一个元素 x 是否属于这个集合, 普通的做法有两种形式:
     1. 遍历 比较,  性能和S的大小成反比     (时间问题)
     2. hash+链表(传说中的hashMap), 速度较快, 但是要把集合的所有数据都存进内存   (空间问题)
在性能要求高, 而且空间不足的情况下, BloomFilter就派上用场了
(Why)
BloomFilter能解决什么问题?
     以少量的内存空间判断一个 元素 是否属于这个集合, 代价是有一定的错误率
(How)
工作原理
     1. 初始化一个数组, 所有位标为0,  A={x1, x2, x3,…,xm}  (x1, x2, x3,…,xm 初始为0)
     2. 将已知集合S中的每一个数组, 按以下方式映射到A中
          2.0  选取n个互相独立的hash函数 h1, h2, … hk
          2.1  将元素通过以上hash函数得到一组索引值 h1(xi), h2(xi),…,hk(xi)
          2.2  将集合A中的上述索引值标记为1(如果不同元素有重复, 则重复覆盖为1, 这是一个觅等操作)
     3.  对于一个元素x, 将其根据2.0中选取的hash函数, 进行hash, 得到一组索引值 h1(x), h2(x), …,hk(x)
          如果集合A中的这些索引位置上的值都是1, 表示这个元素属于集合S, 否则则不属于S
几个前提
     1. hash函数的计算不能性能太差, 否则得不偿失
     2. 任意两个hash函数之间必须是独立的.
          即任意两个hash函数不存在单一相关性, 否则hash到其中一个索引上的元素也必定会hash到另一个相关的索引上, 这样多个hash没有意义
错误率
     工作原理的第3步, 的出来的结论, 一个是绝对靠谱的, 一个是不能100%靠谱的.
     如果集合A中的这些索引位置上的值都是1, 表示这个元素属于集合S, 否则则不属于S     标红的这句话是绝对靠谱的.
     至于错误率有多大, 我这里不想去推算, 后面会给出参考文章, 不在本文的讨论范围, 很简单就能举个例子说明错误率是存在的
         当A长度不是很大时, 很容易出现一种情况, 使得A上的元素全部被标记为1了, 这时所有的元素都会被认为是S里的元素, 所以, 错误率是存在的!
          (可以看出, 错误的大小跟A的长度以及hash函数的个数有关)
(In Action)
应用
1. 假如你有一个很大的商品库(亿级别), 然后你要做一个浏览型的网站, 这时候, 你不可能把所有的商品都丢给用户去浏览, 而是从商品中挑选出部分属于
     “精品”的商品来给用户浏览, 提高用户体验和转化率, 你对你的精品库建立一套搜索引擎
2. 由于互动需要, 你需要对你维护的精品库的商品数据实时更新动态, 比如XXX在某时间给了一个”好评”,  “购买了一笔”, “赞”,”喜欢”等等
3. 为了实现这种实时更新, 你通过MQ订阅了商品相关的消息(notify)(交易, 评价, SNS), 只要商品发生动态就会发送给你的系统.
4. 这时候, 由于全网的商品很多, 发生动态的消息很多情况下是跟你的精品库没有关系的, 这时候你需要挡掉这些消息, 不进行处理.
5. 此时, 不可能每来一个商品数据你先通过搜索引擎判断一下商品不是在你的精品库内(效率问题, 压力问题), 这时候, bloomFilter派送用场了.
6. 从上面错误率的点, 我们可以看到, 如果一个元素被BloomFilter判断为不属于原有的集合, 那么这个元素是肯定不属于这个集合的(被排除的准确率是100%的)
     通过bloomfilter的几项指标, 就可以挡掉大多数没有相关的数据, 而只处理有关系(虽然有部分无关)的数据了.
(Graphics)附图一张
参考文章:
     http://blog.csdn.net/jiaomeng/article/details/1495500
     http://en.wikipedia.org/wiki/Bloom_filter

mongodb 修改器学习

1. 执行update相关操作都可以使用修改器.
2. 修改器是为了不整个大文档来替换修改, 而是修改局部
3. 详情:
3.1 $inc (自增)
为增加某个字段的数值
例子:
原文档如下:

{
 "_id" : ObjectId("4b253b067525f35f94b60a31"),
 "url" : "www.example.com",
 "pageviews" : 52
}

执行修改器 $inc
db.test.update({url:’www.example.com’},{$inc:{pageviews: 1}})
表示为pageviews这个 key 的值 加上1. 后面是对应要加上的值.
注意, $inc中, 字段对应的值只能是 数字. 但是可以是 正数也可以是负数. 如果是负数, 就是执行一个减法操作.
3.2 $set (设置)

修改某个字段的值为指定的值. 这里的修改是与字段类型无关的. 你可以把一个string修改成long或者一个对象都行.
例子:
原文档如下:

{
"_id" : ObjectId("4b253b067525f35f94b60a31"),
"name" : "joe",
"age" : 30,
"sex" : "male",
"location" : "Wisconsin"
}

修改性别, 变成false (String -> Boolean)
执行修改器 $set

db.test.update({name:'joe'}, {$set:{sex:false}})

表示将 name为joe的这个对象的 sex修改为false
3.3 $unset (删除字段)
这个修改器是为了删除某个指定的字段及其值.
例子:
原文档如下:

{
 "_id" : ObjectId("4f9105377522000000006be0"), 
 "howlong" : 6, 
 "lovePerson" : "zhanying", 
 "person" : "jiacheo"
}

把howlong删除
执行修改器 $unset

db.test.update({person:'jiacheo'},{$unset:{howlong:1}})

后面的1没具体意义, 相当于确认
3.4 $push (把一个数据放到数组里面)
例子:
原文档如下:

{
 "_id" : ObjectId("4f9105377522000000006be0"), 
 "lovePerson" : "zhanying", 
 "person" : "jiacheo", 
 "supporters" : [ "gongjin" ]
}

执行修改器, 增加一个人到supporters里面

db.test.update({person:'jiacheo'},{$push:{supporters:'ziming'}})

这时候变成:

{ 
 "_id" : ObjectId("4f9105377522000000006be0"),
 "lovePerson" : "zhanying",
 "person" : "jiacheo",
 "supporters" : [
   "gongjin", "ziming" 
 ]
}

若果再执行一遍上面的代码, 那么数组还是会加上同样的名称, 如果你需要排同, 可以用后面这个修改器
3.5 $addToSet (将一个对象放到集合里面. 集合里面不会出现两个重复的一模一样的对象)
例子:
原文档如下:

{
"_id" : ObjectId("4f9105377522000000006be0"),
"lovePerson" : "zhanying",
"person" : "jiacheo",
"supporters" : [
"gongjin",
"ziming"
]
}

执行修改器 $addToSet

db.test.update({person:'jiacheo'},{$addToSet:{supporters:'ziming'}})

文档内容没有改变..
以上两个修改器还可以配合 $each, 把一个数据里的数据append到另一个数组上去(后者会排重)
比如:

db.test.update({person:'jiacheo'}, {$addToSet:{supporters:{$each: ["gongjin","ziming","liuxun","feidu"] }}})

执行后, 结果如下:

{
 "_id" : ObjectId("4f9105377522000000006be0"),
 "lovePerson" : "zhanying",
 "person" : "jiacheo",
 "supporters" : ["gongjin", "ziming", "feidu", "liuxun" 
 ]
}

3.6 $pop 队列出列 (按数组索引顺序出列(正序或者倒序))
例子:
原文档如下:

{
 "_id" : ObjectId("4f9105377522000000006be0"), 
 "lovePerson" : "zhanying", 
 "person" : "jiacheo",
 "supporters" : ["gongjin", "ziming", "feidu", "liuxun" 
 ]
}

执行修改器 $pop

db.test.update({person:'jiacheo'},{$pop:{supporters:1}})

修改后变为

{
 "_id" : ObjectId("4f9105377522000000006be0"),
 "lovePerson" : "zhanying",
 "person" : "jiacheo",
 "supporters" : [ "gongjin", "ziming", "feidu" ]
}

value 值为1 表示数组索引靠后的先出列, 也就是后进先出, 相当于出栈的概念. value值为-1表示数组索引考前的先出列, 也就是先进先出, 相当于一个FIFO的队列
3.7 $pull (符合条件的元素出列)
与上一个相比, 这个修改器会把符合条件的元素从数组中删除, 而不是简单的按照索引值的大小来处理.
原文档如下:

{
 "_id" : ObjectId("4f9105377522000000006be0"),
 "lovePerson" : "zhanying",
 "person" : "jiacheo",
 "supporters" : [ "gongjin", "ziming", "feidu" ]
}

执行修改器 $pull

db.test.update({person:'jiacheo'},{$pull:{supporters: 'gongjin'}})

修改后:

{
 "_id" : ObjectId("4f9105377522000000006be0"),
 "lovePerson" : "zhanying",
 "person" : "jiacheo",
 "supporters" : [ "ziming", "feidu" ]
}

可以见, gongjin从supporters中被remove掉了.
这里所有被匹配的元素都会被出列

tomcat thread dump 分析

  1. 前言

Java Thread Dump 是一个非常有用的应用诊断工具, 通过thread dump出来的信息, 可以定位到你需要了解的线程, 以及这个线程的调用栈. 如果配合linux的top命令, 可以找到你的系统中的最耗CPU的线程代码段, 这样才能有针对性地进行优化.

  1. 场景和实践

    2.1. 后台系统一直是在黑盒运行, 除了能暂停一部分任务的执行, 根本无法知道哪些任务耗CPU过多。所以一直以为是业务代码的问题, 经过各种优化(删减没必要的逻辑, 合并写操作)等等优化, 系统负载还是很高. 没什么访问量, 后台任务处理也就是每天几百万的级别, load还是达到了15以上. CPU只有4核,天天收到load告警却无从下手, 于是乎就被迫来分析一把线程.

   2.2 系统跑的是java tomcat, 要触发tomcat thread dump很简单, 先找到tomcat对应的进程id, 我们设置为PID
   【linux 命令】:  ps -ef | grep tomcat
   可以找到, 然后给这个进程发送一个QUIT的信号量, 让其触发线程的dump,  下面的操作先别急着动手, 等到看完2.3再动手不迟
    【linux 命令】: kill -3 $PID   /  kill -QUIT $PID
tomcat会把thread dump的内容输出到控制台
     【linux 命令】:cd $tomcathome/logs/
查看 catalina.out 文件, 把最后的跟thread相关的内容获取出来.
大致内容如下:
2012-04-13 16:30:41
Full thread dump OpenJDK 64-Bit Server VM (1.6.0-b09 mixed mode):
"TP-Processor12" daemon prio=10 tid=0x00000000045acc00 nid=0x7f19 in Object.wait() [0x00000000483d0000..0x00000000483d0a90]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00002aaab5bfce70> (a org.apache.tomcat.util.threads.ThreadPool$ControlRunnable)
at java.lang.Object.wait(Object.java:502)
at org.apache.tomcat.util.threads.ThreadPool$ControlRunnable.run(ThreadPool.java:662)
- locked <0x00002aaab5bfce70> (a org.apache.tomcat.util.threads.ThreadPool$ControlRunnable)
at java.lang.Thread.run(Thread.java:636)

"TP-Processor11" daemon prio=10 tid=0x00000000048e3c00 nid=0x7f18 in Object.wait() [0x00000000482cf000..0x00000000482cfd10]
java.lang.Thread.State: WAITING (on object monitor)
....
"VM Thread" prio=10 tid=0x00000000042ff400 nid=0x77de runnable"GC task thread#0 (ParallelGC)" prio=10 tid=0x000000000429c400 nid=0x77d9 runnable

"GC task thread#1 (ParallelGC)" prio=10 tid=0x000000000429d800 nid=0x77da runnable

"GC task thread#2 (ParallelGC)" prio=10 tid=0x000000000429ec00 nid=0x77db runnable

"GC task thread#3 (ParallelGC)" prio=10 tid=0x00000000042a0000 nid=0x77dc runnable

"VM Periodic Task Thread" prio=10 tid=0x0000000004348400 nid=0x77e5 waiting on condition

JNI global references: 815

Heap
PSYoungGen      total 320192K, used 178216K [0x00002aaadce00000, 0x00002aaaf1800000, 0x00002aaaf1800000)
eden space 303744K, 55% used [0x00002aaadce00000,0x00002aaae718e048,0x00002aaaef6a0000)
from space 16448K, 65% used [0x00002aaaf0690000,0x00002aaaf110c1b0,0x00002aaaf16a0000)
to   space 16320K, 0% used [0x00002aaaef6a0000,0x00002aaaef6a0000,0x00002aaaf0690000)
PSOldGen        total 460992K, used 425946K [0x00002aaab3a00000, 0x00002aaacfc30000, 0x00002aaadce00000)
object space 460992K, 92% used [0x00002aaab3a00000,0x00002aaacd9f6a30,0x00002aaacfc30000)
PSPermGen       total 56192K, used 55353K [0x00002aaaae600000, 0x00002aaab1ce0000, 0x00002aaab3a00000)
object space 56192K, 98% used [0x00002aaaae600000,0x00002aaab1c0e520,0x00002aaab1ce0000)
最后一段是系统的对内存的使用情况.
2.3. 要知道thread dump是不会告诉你每个线程的负载情况的, 需要知道每个线程的负载情况, 还得靠top命令来查看.
    【linux 命令】:top -H -p $PID
这时候, 可以看到java进程下各个线程的负载和内存等使用情况. 也不用全部搞下来, 只要top几个负载过高的记录即可(最好按下SHIFT+T 按CPU耗时总时间倒序排序,这样找到的top几个是最耗CPU时间的,而且系统启动时间应该持续15分钟以上,这样容易看出哪个线程耗时多。)
     大致内容如下:
Tasks: 118 total,   2 running, 116 sleeping,   0 stopped,   0 zombie
Cpu(s): 92.6%us,  2.3%sy,  0.0%ni,  3.8%id,  0.7%wa,  0.1%hi,  0.7%si,  0.0%st
Mem:   4054168k total,  3892212k used,   161956k free,   115816k buffers
Swap:  4192956k total,   294448k used,  3898508k free,  2156024k cachedPID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND
8091 admin     16   0 1522m 814m 9660 R 22.3 20.6   4:05.61 java
8038 admin     16   0 1522m 814m 9660 R 10.3 20.6   2:46.31 java
8043 admin     15   0 1522m 814m 9660 S  3.7 20.6   1:52.04 java
8039 admin     15   0 1522m 814m 9660 S  0.7 20.6   2:10.98 java
8041 admin     15   0 1522m 814m 9660 S  0.7 20.6   1:39.66 java
8009 admin     15   0 1522m 814m 9660 S  0.3 20.6   0:27.05 java
8040 admin     15   0 1522m 814m 9660 S  0.3 20.6   0:51.46 java
7978 admin     25   0 1522m 814m 9660 S  0.0 20.6   0:00.00 java
7980 admin     19   0 1522m 814m 9660 S  0.0 20.6   0:05.05 java
7981 admin     16   0 1522m 814m 9660 S  0.0 20.6   0:06.31 java
7982 admin     15   0 1522m 814m 9660 S  0.0 20.6   0:06.50 java
7983 admin     15   0 1522m 814m 9660 S  0.0 20.6   0:06.66 java
7984 admin     15   0 1522m 814m 9660 S  0.0 20.6   0:06.87 java
7985 admin     15   0 1522m 814m 9660 S  0.0 20.6   0:33.82 java
几个字段跟top的字段意思是一致的, 就是这里的 PID是 线程在系统里面的ID, 也就是进程每创建一个线程, 不仅进程自己会分配ID, 系统也会的. 接下来的问题排查就是主要根据这个PID来走的.
看到上面的部分数据, 当前正在跑的任务中, CPU占用最高的几个线程ID
2.4. 如果不借助工具, 自己分析的话, 可以把PID字段从10进制数改为 16进制, 然后到threaddump日志中去查找一把, 找对对应的线程上下文信息, 就可以知道哪段代码耗CPU最多了.
比如 8091  的16进制是 1F9B, 查找 thread dump 日志中, nid=0x1F9B 的线程( 这里的nid意思是nativeid, 也就是上面讲的系统为线程分配的ID), 然后找到相关的代码段, 进行优化即可.
比如
"链路检测" prio=10 tid=0x00002aaafa498000 nid=0x1F9B runnable [0x0000000045fac000..0x0000000045facd10]</div>

java.lang.Thread.State: RUNNABLE
at cn.emay.sdk.communication.socket.AsynSocket$CheckConnection.run(AsynSocket.java:112)
at java.lang.Thread.run(Thread.java:636)
可以看出, 这是一个 发短信的客户端的链路检测引擎的系统负载飙升. (实际上这个线程引起的负载绝不止这么一点.)
2.5 第三方的jar包, 我感到顿时泪奔. 接下来是反编译, 看详细的代码… 果然是有一段死循环监听的… 目前是像他们要一份SDK的源代码, 或者要他们进行优化。
2.6 使用工具的话, 可以看到更多一点的信息, java的tda工具就是专门分析thread dump的.
具体功能自己去挖掘啦.

Java如何等待子线程执行结束

工作中往往会遇到异步去执行某段逻辑, 然后先处理其他事情, 处理完后再把那段逻辑的处理结果进行汇总的产景, 这时候就需要使用线程了。
一个线程启动之后, 是异步的去执行需要执行的内容的, 不会影响主线程的流程,  往往需要让主线程指定后, 等待子线程的完成. 这里有几种方式.
站在 主线程的角度, 我们可以分为主动式和被动式.
主动式指主线主动去检测某个标志位, 判断子线程是否已经完成. 被动式指主线程被动的等待子线程的结束, 很明显, 比较符合人们的胃口. 就是你事情做完了, 你告诉我, 我汇总一下, 哈哈.
那么主线程如何等待子线程工作完成呢. 很简单, Thread 类给我们提供了join 系列的方法, 这些方法的目的就是等待当前线程的die. 举个例子:

public class Threads {
    public static void main(String[] args) {
        SubThread thread = new SubThread();
        thread.start();
        //主线程处理其他工作,让子线程异步去执行.
        mainThreadOtherWork();
        System.out.println("now waiting sub thread done.");
        //主线程其他工作完毕,等待子线程的结束, 调用join系列的方法即可(可以设置超时时间)
        try {
            thread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("now all done.");
    }

    private static void mainThreadOtherWork() {
        System.out.println("main thread work start");
        try {
            Thread.sleep(3000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("main thread work done.");
    }

    public static class SubThread extends Thread {
        @Override
        public void run() {
            working();
        }

        private void working() {
            System.out.println("sub thread start working.");
            busy();
            System.out.println("sub thread stop working.");
        }

        private void busy() {
            try {
                sleep(5000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}

本程序的数据有可能是如下:

main thread work start
sub thread start working.
main thread work done.
now waiting sub thread done.
sub thread stop working.
now all done.

忽略标号, 当然输出也有可能是1和2调换位置了. 这个我们是无法控制的. 我们看下线程的join操作, 究竟干了什么.

public final void join() throws InterruptedException {
   join(0);
}

这里是调用了

public final synchronized void join(long millis)
    throws InterruptedException

方法, 参数为0, 表示没有超时时间, 等到线程结束为止. join(millis)方法里面有这么一段代码:

        while (isAlive()) {
                wait(0);
        }

说明, 当线程处于活跃状态的时候, 会一直等待, 直到这里的isAlive方法返回false, 才会结束.isAlive方法是一个本地方法, 他的作用是判断线程是否已经执行结束. 注释是这么写的:

Tests if this thread is alive. A thread is alive if it has been started and has not yet died.

可见, join系列方法可以帮助我们等待一个子线程的结束.
那么要问, 有没有另外一种方法可以等待子线程结束? 当然有的, 我们可以使用并发包下面的Future模式.
Future是一个任务执行的结果, 他是一个将来时, 即一个任务执行, 立即异步返回一个Future对象, 等到任务结束的时候, 会把值返回给这个future对象里面. 我们可以使用ExecutorService接口来提交一个线程.

public class Threads {

    static ExecutorService executorService = Executors.newFixedThreadPool(1);

    @SuppressWarnings("rawtypes")
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        SubThread thread = new SubThread();
        // thread.start();
        Future future = executorService.submit(thread);
        mainThreadOtherWork();
        System.out.println("now waiting sub thread done.");
        future.get();
        // try {
        // thread.join();
        // } catch (InterruptedException e) {
        // e.printStackTrace();
        // }
        System.out.println("now all done.");
        executorService.shutdown();
    }

    private static void mainThreadOtherWork() {
        System.out.println("main thread work start");
        try {
            Thread.sleep(3000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("main thread work done.");
    }

    public static class SubThread extends Thread {
        @Override
        public void run() {
            working();
        }

        private void working() {
            System.out.println("sub thread start working.");
            busy();
            System.out.println("sub thread stop working.");
        }

        private void busy() {
            try {
                sleep(5000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

}

这里, ThreadPoolExecutor 是实现了 ExecutorService的方法, sumbit的过程就是把一个Runnable接口对象包装成一个 Callable接口对象, 然后放到 workQueue里等待调度执行. 当然, 执行的启动也是调用了thread的start来做到的, 只不过这里被包装掉了. 另外, 这里的thread是会被重复利用的, 所以这里要退出主线程, 需要执行以下shutdown方法以示退出使用线程池. 扯远了. 
这种方法是得益于Callable接口和Future模式, 调用future接口的get方法, 会同步等待该future执行结束, 然后获取到结果. Callbale接口的接口方法是 V call(); 是可以有返回结果的, 而Runnable的 void run(), 是没有返回结果的. 所以, 这里即使被包装成Callbale接口, future.get返回的结果也是null的.如果需要得到返回结果, 建议使用Callable接口.
通过队列来控制线程的进度, 是很好的一个理念. 我们完全可以自己搞个队列, 自己控制. 这样也可以实现. 不信看代码:

public class Threads {

    // static ExecutorService executorService = Executors.newFixedThreadPool(1);
    static final BlockingQueue queue = new ArrayBlockingQueue(1);

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        SubThread thread = new SubThread(queue);
        thread.start();
        // Future future = executorService.submit(thread);
        mainThreadOtherWork();
        System.out.println("now waiting sub thread done.");
        // future.get();
        queue.take();
        // try {
        // thread.join();
        // } catch (InterruptedException e) {
        // e.printStackTrace();
        // }
        System.out.println("now all done.");
        // executorService.shutdown();
    }

    private static void mainThreadOtherWork() {
        System.out.println("main thread work start");
        try {
            Thread.sleep(3000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("main thread work done.");
    }

    public static class SubThread extends Thread {

        private BlockingQueue queue;

        /**
         * @param queue
         */
        public SubThread(BlockingQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                working();
            } finally {
                try {
                    queue.put(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }

        private void working() {
            System.out.println("sub thread start working.");
            busy();
            System.out.println("sub thread stop working.");
        }

        private void busy() {
            try {
                sleep(5000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

}

这里是得益于我们用了一个阻塞队列, 他的put操作和take操作都会阻塞(同步), 在满足条件的情况下.当我们调用take()方法时, 由于子线程还没结束, 队列是空的, 所以这里的take操作会阻塞, 直到子线程结束的时候, 往队列里面put了个元素, 表明自己结束了. 这时候主线程的take()就会返回他拿到的数据. 当然, 他拿到什么我们是不必去关心的.
以上几种情况都是针对子线程只有1个的时候. 当子线程有多个的时候, 情况就不妙了.
第一种方法, 你要调用很多个线程的join, 特别是当你的线程不是for循环创建的, 而是一个一个创建的时候.
第二种方法, 要调用很多的future的get方法, 同第一种方法.
第三种方法, 比较方便一些, 只需要每个线程都在queue里面 put一个元素就好了.但是, 第三种方法, 这个队列里的对象, 对我们是毫无用处, 我们为了使用队列, 而要不明不白浪费一些内存, 那有没有更好的办法呢?
有的, concurrency包里面提供了好多有用的东东, 其中, CountDownLanch就是我们要用的.
CountDownLanch 是一个倒数计数器, 给一个初始值(>=0), 然后每countDown一次就会减1, 这很符合等待多个子线程结束的场景: 一个线程结束的时候, countDown一次, 直到所有都countDown了 , 那么所有子线程就都结束了.
先看看CountDownLanch有哪些方法:

CountDownLatch

await: 会阻塞等待计数器减少到0位置. 带参数的await是多了等待时间.
countDown: 将当前的技术减1
getCount(): 返回当前的计数
显而易见, 我们只需要在子线程执行之前, 赋予初始化countDownLanch, 并赋予线程数量为初始值.
每个线程执行完毕的时候, 就countDown一下.主线程只需要调用await方法, 可以等待所有子线程执行结束, 看代码:

public class Threads {
// static ExecutorService executorService = Executors.newFixedThreadPool(1);
    static final BlockingQueue queue = new ArrayBlockingQueue(1);
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        int threads = 5;
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        for(int i=0;i < threads;i++){
            SubThread thread = new SubThread(2000*(i+1), countDownLatch);
            thread.start();
        }
        mainThreadOtherWork();
        System.out.println("now waiting sub thread done.");
        countDownLatch.await();
        System.out.println("now all done.");
    }

    private static void mainThreadOtherWork() {
        System.out.println("main thread work start");
        try {
            Thread.sleep(3000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("main thread work done.");
    }

    public static class SubThread extends Thread{

        // private BlockingQueue queue;
        private CountDownLatch countDownLatch;
        private long work;


        public SubThread(long work, CountDownLatch countDownLatch) {
            // this.queue = queue;
            this.countDownLatch = countDownLatch;
            this.work = work;
        }

        @Override
        public void run() {
            try{
                working();
            }finally{
                countDownLatch.countDown();
            }
        }

        private void working() {
            System.out.println(getName()+" sub thread start working.");
            busy();
            System.out.println(getName()+" sub thread stop working.");
        }

        private void busy() {
            try {
                sleep(work);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

此种方法也适用于使用 ExecutorService summit 的任务的执行.
另外还有一个并发包的类CyclicBarrier, 这个是(子)线程之间的互相等待的利器. 栅栏, 就是把大家都在一个地方堵住, 就像水闸, 等大家都完成了之前的操作, 在一起继续下面的操作. 不过就不再本篇的讨论范围内了.

MapReduce编程(入门篇)

 一. MapReduce 编程模型

还是以一个经典的图片来说明问题.

map reduce thought

1. 首先, 我们能确定我们有一份输入, 而且他的数据量会很大

2. 通过split之后, 他变成了若干的分片, 每个分片交给一个Map处理

3. map处理完后, tasktracker会把数据进行复制和排序, 然后通过输出的key 和value进行 partition的划分, 并把partition相同的map输出, 合并为相同的reduce的输入.
4. ruducer通过处理, 把数据输出, 每个相同的key, 一定在一个reduce中处理完, 每一个reduce至少对应一份输出(可以通过扩展MultipleOutputFormat来得到多分输出)
5. 来看一个例子, 如下图:(来自 《hadoop权威指南》 一书)
实例
  说明几点:
  5.1 输入的数据可能就是一堆文本
  5.2 mapper会解析每行数据, 然后提取有效的数据, 作为输出. 这里的例子是 从日志文件中提取每一年每天的气温, 最后会计算每年的最高气温
  5.3 map的输出就是一条一条的 key-value
  5.4 通过shuffle之后, 变成reduce的输入, 这是相同的key对应的value被组合成了一个迭代器
  5.5 reduce的任务是提取每一年的最高气温, 然后输出

二. Mapper

1. mapper可以选择性地继承 MapreduceBase这个基类, 他只是把一些方法实现了而已, 即使方法体是空的.
2. mapper必须实现 Mapper 接口(0.20以前的版本), 这是一个泛型接口, 需要执行输入和输出的key-value的类型, 这些类型通常都是Wriable接口的实现类
3. 实现map方法, 方法有四个参数, 前面两个就是输入的 Key 和 value, 第三个参数是 OuputCollector, 用于收集输出的, 第四个是reporter,用来报告一些状态的,可以用于debug
  3.1 input 默认是一行一条记录, 每天记录都放在value里边
  3.2 output  每次搜集一条 K-V记录, 一个K可以对应多个value, 在reduce 里面体现为一个 iterator
4. 覆盖 configure方法可以得到JobConf的实例, 这个JobConf是在Job运行时传递过来的, 可以跟外部资源进行数据交互

三. Reducer

1. reduce也可以选择继承 MapreduceBase这个基类, 功能跟mapper一样.
2. reducer必须实现Reducer接口, 这个接口同样是泛型接口, 意义跟Mapper的类似
3. 实现reduce方法, 这个方法也有四个参数, 第一个是输入的key, 第二个是输入的 value的迭代器, 可以遍历所有的value,相当于一个列表, outputCollector跟map的一样, 是输出的搜集器, 每次搜集都是key-value的形式, report的作用跟map的相同.
4. 在新版本中, hadoop已经将后面两个参数合并到一个context对象里边了, 当然还会兼容就版本的 接口. >0.19.x
5. 覆盖configure方法, 作用跟map的相同
6. 覆盖close 方法,可以做一些reduce结束后的处理工作.(clean up)

四. Combiner

1. combiner的作用是, 将map的输出,先计算一遍,得到初步的合并结果, 减少reduce的计算压力.
2. combiner的编写方法跟reduce是一样的, 他本来就是一个Reducer的实现类
3. 当reducer符合函数  F(a,b) = F(F(a), F(b)) 时, combinner可以与reduce相同. 比如 sum(a,b,c,d,e,f,g) = sum(sum(a,b) ,sum(c,d,e,f) , sum(g)) 还有max, min等等.
4. 编写正确的combiner可以优化整个mapreduce程序的性能.(特别是当reduce是性能瓶颈的时候.)
5. combiner可以跟reducer不同.

五. Configuration

1. 后加的属性的值会覆盖前面定义的相同名称的属性的值.
2. 被定义为 final的属性(在属性定义中加上<final>true</final>标签)不会被后面的同名属性定义的值给覆盖.
3. 系统属性比通过资源定义的属性优先级高, 也就是通过System.setProperty()方法会覆盖在资源文件中定义的属性的值.
4. 系统属性定义必须在资源文件中有相应的定义才会生效.
5. 通过 -D 选项定义的属性, 比在资源文件中定义的属性优先级要高.

六. Run Jobs

1. 设置 inputs & output
    1.1 先判断输入是否存在 (不存在会导致出错,最好利用程序来判断.)
    1.2 判断输出是否已经存在(存在也会导致出错)
    1.3 养成一种好的习惯(先判断,再执行)
2. 设置 mapper、reducer、combiner. 各个实现类的class对象.  XXXX.class
3. 设置 inputformat & outputformat & types
    3.1 input和output format都有两种, 一种是 textfile, 一种是sequencefile. 简单理解, textfile是文本组织的形式,sequence file是 二进制组织的形式.
    3.2 Types的设置, 根据输入和输出的数据类型, 设置各种Writable接口的实现类的class对象.
4. 设置reduce count
    4.1 reduce count可以为0, 当你的数据无需reduce的时候.
    4.2 reduce数量最好稍微少于当前可用的slots的数量, 这样reduce就能在一波计算中算好. (一个slot可以理解为一个计算单元(资源).)

七. 其他的一些细节.

1. ChainMapper可以实现链式执行mapper 他本身就是一个Mapper的实现类. 提供了一个addMapper的方法.
2. ChainReducer 跟ChainMapper类似, 可以实现链式执行reducer, 他是Reducer的实现类.
3. 多个job先后运行, 可以通过先后执行 JobClient.runJob方法来实现先后顺序
4. 扩展MultipleOutputFormat接口, 可以实现一个reduce对应多份输出 (而且可以指定文件名哦)
5. Partitioner 接口用于将 Map的输出结果进行分区, 分区相同的key对应的数据会被同一个reducer处理
    5.1 提供了一个接口方法: public int getPartition(K2 key, V2 value, int numReduceTasks)
    5.2 可以自己定义, 根据key的某些特指来划分, 也可以根据value的某些特质来划分.
    5.3 numReduceTasks就是设置的reduce的个数.一般返回的partition的值应该都小于这个值.(%)
6. reporter的作用
    6.1 reporter.incrCounter(key, amount). 比如对数据计算是, 一些不合规范的脏数据, 我们可以通过counter来记录有多少
    6.2 reporter.setStatus(status); 方法可以设置一条状态消息, 当我们发现job运行出现这条消息是, 说明出现了我们预期的(错误或者正确)的情况, 用于debug.
    6.3 reporter.progress(), 像mapreduce框架报告当前运行进度. 这个progress可以起到心跳的作用. 一个task要是超过10分钟没有想mapreduce框架报告情况, 这个reduce会被kill掉. 当你的任务处理会比较旧是, 最好定时向mapreduce汇报你的状态.
7. 通过实现Wriable接口, 我们可以自定义key和value的类型, 使用起来就像pojo, 不需要每次都进行parse. 如果你的自定义类型是Key的类型, 则需要同时实现Comparable 接口, 用于排序. 比如MapWritable就是一个例子.

八. 实战.(简单篇)

简单篇:

1. 需求: 统计某个站点每天的PV

2. 数据输入: 以天为分区存放着的日志数据, 一条日志代表一个PV

3. 数据输出: 日期   PV

4. Mapper编写

主要的工作很简单, split每一条日志, 取出日期, 并对该日期的PV搜集一条记录, 记录的value为ONE(1, 一条记录代表一个PV)

5. Reducer编写

reduce的任务是将每天(key相同的为同一天) 的日志进行汇总(sum), 最后以天为key输出汇总结果.

6. 设置环境, 指定job(Run)

6.1 设置输入路径.

 

 

 

 

6.2 设置输出路径

6.3 设置Mapper/Reducer 和 输入数据的数据格式和数据类型

6.4  执行命令:

hadoop jar site-pv-job.jar org.jiacheo.SitePVSumSampleJob

6.5 查看hadoop的web 工具, 显示当前job进度.

可以看出, 此次输入产生了14292个map,和29个reduce. reduce数这么少是因为我的reduce的slots数只有30, 所以设置为29, 以防一个挂了, 还能在一波reduce中算好.

6.6 计算结果.

上面部分是hadoop cli客户端显示的进度, 中间是web工具显示的输入输出的一些数据的统计.可以看出, 此次输入数据总共有1.6TB大小, 设计的总记录数为69.6亿. 也就是这份数据记录了该站点的69.6亿的PV. 左下角可以看出, 执行时间比较长, 用了18分钟+46秒.这里慢的原因不在于reduce, 而是我的map的slots太少, 只有300个, 总共一万多个map, 那要分好几百波才能算完map, 所以瓶颈在map这里.右下角是统计的结果数据, 可以看出, 该站点的整体的PV是呈现上升趋势的.

至此, 一个简单的map/reduce程序就写好并运行了.

下面介绍复杂一点的实践. 当然, 还是等有时间再来介绍吧. 碎觉先.

 

 

linux screen初体验

由于公司的线上线下环境(除了本机)都是linux环境, 所以工作的时候, securecrt少不了, 往往还需要同时登陆多台机器进行操作. 在操作的时候, 有时候还需要等待一个命令执行完才能搞其他东西, 有点浪费时间. 当然, 开多个securecrt来连接到远程机器可以解决这个问题, 但是另一个问题, 这种做法是解决不了的, 就是如果我要保存上一次的工作状态, 等下次登陆之后可以立刻进入上一次的状态继续工作, 这个时候, 需要我们对这种状态做一下保存, 而securecrt终端是不维护这种状态的, 每次断开重新登陆后都要重新敲一遍命令回到上次的状态. 这时候, screen命令就非常有用了. screen提供了类似于linux中的桌面的功能, 而且可以同时有多个桌面一起工作, 称之为window(窗口). 通过一些快捷键, 可以很方便地从一个窗口切换到另一个窗口, 而且这些窗口的状态都是保存着的. 下面介绍screen命令如何使用, 以及使用效果.

1. screen有哪些选项, man一把就知道了, 自己看看.

2. screen配置文件, 这个可有可无, 有这个配置文件可以让screen看起来比较容易理解, 我们只需要新建一个文本文件, 在里面键入以下配置信息即可:


caption always "%{wb} %-w%{+b bw}%n %t%{-}%+w"      #蓝底白字

#caption always "%{=b kR} %-w%{-b bg}%n %t%{-}%+w"   #红色的,这两句选一句就可以了

defutf8 on    #如果有中文乱码就加上,否则可以不用

defencoding utf-8

encoding utf-8 utf-8

defscrollback 8192     #可以解决闪屏的问题

保存文件, 比如文件名为.screenrc

3. 接下来是启动screen

敲入命令


screen -D -RR youscrid -c ${yourdir}/.screenrc

这里, yousrcid是由你自己定义的, 就是一个screen的sessionid, 下次再回到这个screen时, 需要用到这个id, 可以取个容易记住的名称, 比如linux的用户名之类的.

-c后面的选项值就是第2步保存的配置文件.

4. 经过前面3步, screen以及建立了, 这时候你在这个screen中干啥都行了.

4.1 新建一个新的窗口:  ctrl+a c 这里是快捷键, 先按下ctrl+a两个键, 然后再敲一下c (create)键就可以创建一个新的窗口了.

4.2 为窗口命名, 默认窗口的名称都叫 bash, 为了工作方便, 你可以给每个窗口命名, 快捷键是 ctrl+a A  注意这里后面是大写的A,(你也可以按下shift+a(小写a), 就不用开启caps lock了)

4.3 切换窗口,  切到下一个窗口的快捷键是:  ctrl+a n (next)  , 切换到上一个窗口快捷键是:  ctrl+a p (pre) . 另外还可以  ctrl+a  num  num值数字, 每新建一个screen都会有一个数字作为他的tab下标的(默认从0开始, 这个你肉眼是看的到的, 有了上面的配置的话)

4.4 退出窗口,  直接 exit 就可以退出当前的一个window了.

4.5  退出到shell控制台, 而不退出窗口,  ctrl+a d  这时候会回到你创建这个screen的那个控制台, 当前的screen状态都会被保持着. 这时候即使你在screen里面远程连接到其他机器上, 还是可以保持的, 而且下次恢复到screen的适合, 这种连接不会断开.

4.6  回到原有的screen, 在linux控制台下敲入命令:


screen -r yourscrid

这里的yoursrcid就是你在第3步的时候创建screen时候指定的id, 这个时候就会很神奇地回到上一次的工作状态中了!

5. 其他选项有待各位看官自己挖掘了.

6. 使用截图

 

参考文档

http://www.ibm.com/developerworks/cn/linux/l-cn-screen/

http://www.ibm.com/developerworks/cn/aix/library/au-gnu_screen/

http://blog.chinaunix.net/u2/82938/showart_1821389.html

 

罗汉堂课程学习分享:【在合作中成长】

上周二下午一整个下午参加了罗汉堂举办的 《在合作中成长》课程, 讲师:劝天。

我觉得这是一堂非常有意义的课程,在课程我过程中, 我学到了很多, 也结合自己的实际情况, 做了一些调整。(下面是剧透)

课堂一开始, 老师就让我们通过组合梭哈牌的方法来组成队伍,每个人都只能看到别人的牌, 不能知道自己的牌, 最终组合的牌最大的一组胜出。我一想首先牌比较少, 而且我也不知道自己的牌是什么牌, 要跟别人组成同花顺不是很靠谱, 于是想当下应该是4条最能明确, 我搜寻了一下, 发现旁边4个女生是4个Q,我就直接叫他们跟我组成一队了, 没想到他们也这么信任我, 就稀里糊涂跟我走鸟. 最后我们组成的队伍牌果然是最大的, 4条Q+方片A, 完美的组合啊. 后来想想, 是不是因为几个女生都不是很懂得梭哈才跟了我走呀, 哈哈. 后面选组长, 我居然成了组长, 带领5个美女, 朝着我们的目标: TO BE NO.1 前行。