关于java:Java并发基石篇上

34次阅读

共计 10569 个字符,预计需要花费 27 分钟才能阅读完成。

概要

并行是这个时代的主旋律,也是很多古代操作系统须要提供的必备性能,在过来摩尔定律催生下,单个 CPU 外围计算的速度越来越快。然而随着产业的倒退,单个 CPU 外围的计算下限曾经难以冲破,传统的增强单核的思维模式曾经不能满足需要。在现代,人们须要弱小的战马来驱动战车,为了可能使得战斗力越来越强,人们驯化了越来越强劲的战马,然而单匹马的力量始终是无限的,因而人们创造了多马并驾的战车构造。同样地,在古代计算机领域,人们在单个 CPU 外围能力无限的状况下,应用多个外围的 CPU 进行并行计算以驱动弱小的算力。
然而,多 CPU 和多战马是远远不同的,在事实世界中的计算工作大多须要互相协调,其根本原因是人类的思维形式是线性串行的,设计一个齐全并行的计算逻辑体系还是有相当大难度的。

如何设计一个高并发的程序,不仅仅是工程界的难题,在计算机学术界也是一个须要一直冲破的钻研畛域。从学术实践提出,到算法设计,再到工程施行,再到长夜验证调优,整个流程都须要比拟长的工夫来进行迭代,究其基本,并行计算自身救赎非常复杂,不确定的,不可预测的逻辑系统。

多核零碎中的一致性

Java 号称一次编写,到处运行,其自身也是构建在不同的零碎之上的,以其运行时 JVM 来屏蔽零碎底层的差别。因而,在介绍 Java 并发体系之前,有必要简要介绍依稀计算机系统层面上的并发,以及面对的问题。

咱们的目标其实很简略,就是让计算机在同一时刻,可能运行更多的工作。而并行计算,提供了十分不错的解决方案。尽管这看起来很天然,但实际上面临着泛滥的问题,其中一个重大的问题就是绝大多数的计算不仅仅是 CPU 一个人的事,而是须要很多计算机系统部件独特参加。然而咱们晓得,计算机系统中运行速度最快的就是 CPU,其余部件例如:内存、磁盘、网络等等都是及其迟缓的,同时这些操作在目前的计算机体系中是很难打消的,因为咱们不可能仅仅靠寄存器就实现所有的计算工作。面对高速 CPU 和低速存储之间的鸿沟,如果想要实现高效数据通讯,一个良好的解决方案就是在他们之间台南佳一个 cache 层,这个 cache 层的速度和整体的速度关系如下:

CPU --> cache --> 存储 

通过 cache 这个缓冲地带,实现 CPU 和存储之间的高效沟通,这是计算机和软件畛域通用的一个问题解决问题:减少中间层,如果一个中间层解决不了,那就两层。在运算的时候,CPU 将须要应用到的数据复制到 cache 中,当前每次获取数据都较为疾速的从 cache 中获取,放慢访问速度。

所谓现实很饱满,事实很骨感。这种计算体系有一个重要的问题须要解决,那就是:缓存一致性(cache coherence)问题。在古代的计算机系统中,次要都是多核零碎为主。在这些计算机系统中,每一个 CPU 都领有本人独立的高速缓存,然而因为主存只有一个,因而他们之间只能共享,这种零碎也被称为:共享内存多核零碎(Shared-Menory multiprocessors System)。

同时为了保障 CPU 数据存储的一致性,须要定义一个对立的缓存一致性协定,这类协定有很多,例如:MSI、MESI、MOSI、Synapse、Firefly 以及 Dragon Protocol 等等。所以,通常状况下,共享内存多核零碎的架构如下:

除了应用高速 cache 来缓存 CPU 和存储设备之间的速度鸿沟,为了可能充分利用多核 CPU 的解决性能,解决在理论执行机器指令时并不一定会依照程序设定的指令程序执行,可能存在代码乱序执行(Out-Of_Order Execution)优化。然而,仅仅只是在代码层面上乱序执行,零碎会保障执行的后果逻辑正确,从宏观上看就如同是程序执行一样。

Java 内存模型

下面咱们探讨了共享内存多核零碎的内存模型,咱们提到了高速缓存以及缓存一致性问题,同时还介绍了指令乱序执行的问题。其实,这些概念在 Java 中也是存在的。因为 Java 的指标是:一次编写,到处运行,因而必须在 JVM 层面上将零碎之间的差别屏蔽掉。面对如此多的零碎,最好的形式就是定义一套 Java 本人的内存拜访模型,而后在不同的硬件平台和操作系统上别离利用本地接口来实现。这里的思维其实和减少 cache 是一样的,通过减少中间层来解决零碎差别带来的合作问题。

Java 工作内存和主存之间的一致性保障次要通过以下 4 种操作实现:

  1. read:Java 执行引擎拜访引擎拜访本地工作内存中的变量正本,如果变量正本有效(变量正本不存在也是有效的一种),那就去主存中获取,同时在本地工作内存中缓存一份
  2. write:Java 执行引擎将最新的变量值赋值给工作内存中的变量正本,同时须要判断是否须要将这个新的值立刻同步到主内存,如果须要同步的话,还须要配合 lock 操作
  3. lock:Java 执行引擎将主内存中的变量锁定,锁定的含意有:其余的线程在此之后不能拜访这个变量直到本线程 unlock;一旦锁定,其余线程对这个变量的操作必须期待
  4. unlock:Java 执行引擎将主内存中的变量解锁,解锁之后各个线程能力从新并发拜访这个变量,直到变量被某个线程再次锁定

Java Thread 创立

在 Java 中,咱们都晓得,一个线程间接对应了一个 Thread 对象。创立和启动一个线程是比拟容易的,咱们只须要创立一个 Thread 对象,而后调用对象的 start 办法即可。然而在创立一个 Thread 对象和启动线程 JVM 中到底产生了什么?本节咱们就来看下。

在创立一个 Thread 对象的时候,除了一些初始化设置之外就没有其余实质性的操作,真正的工作其实是在 start 办法调用中产生的。

Java 通过 registerNatives 办法将 Thread 类中的 java 办法和一个本地的 C /C++ 函数进行对应,同时 registerNatives 办法是类加载的时候调用的,因而在类首次加载的时候(Bootstarp 类加载)就会注册这些 native 办法。

/* Make sure registerNatives is the first thing <clinit> does. */
private static native void registerNatives();
static {registerNatives();
}
static JNINativeMethod methods[] = {{"start0",           "()V",        (void *)&JVM_StartThread},
    {"stop0",            "(" OBJ ")V", (void *)&JVM_StopThread},
    {"isAlive",          "()Z",        (void *)&JVM_IsThreadAlive},
    {"suspend0",         "()V",        (void *)&JVM_SuspendThread},
    {"resume0",          "()V",        (void *)&JVM_ResumeThread},
    {"setPriority0",     "(I)V",       (void *)&JVM_SetThreadPriority},
    {"yield",            "()V",        (void *)&JVM_Yield},
    {"sleep",            "(J)V",       (void *)&JVM_Sleep},
    {"currentThread",    "()" THD,     (void *)&JVM_CurrentThread},
    {"countStackFrames", "()I",        (void *)&JVM_CountStackFrames},
    {"interrupt0",       "()V",        (void *)&JVM_Interrupt},
    {"isInterrupted",    "(Z)Z",       (void *)&JVM_IsInterrupted},
    {"holdsLock",        "(" OBJ ")Z", (void *)&JVM_HoldsLock},
    {"getThreads",        "()[" THD,   (void *)&JVM_GetAllThreads},
    {"dumpThreads",      "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
    {"setNativeName",    "(" STR ")V", (void *)&JVM_SetNativeThreadName},
};
JNIEXPORT void JNICALL
Java_java_lang_Thread_registerNatives(JNIEnv *env, jclass cls)
{(*env)->RegisterNatives(env, cls, methods, ARRAY_LENGTH(methods));
}

再看看对应 JNI 的构造体

/*
 * used in RegisterNatives to describe native method name, signature, * and function pointer. */
typedef struct {
    char *name;
    char *signature;
    void *fnPtr;
} JNINativeMethod;

即第一列是 Java 中定义的 native 办法名称,第二列是 Java 办法签名,第三列是本地办法对应函数。因而,Java 中的 start 办法就是对应 native 的 JVM——StartThread 函数:

JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
  JVMWrapper("JVM_StartThread");
  JavaThread *native_thread = NULL;

  // We cannot hold the Threads_lock when we throw an exception,   // due to rank ordering issues. Example:  we might need to grab the   // Heap_lock while we construct the exception.   bool throw_illegal_thread_state = false;

  // We must release the Threads_lock before we can post a jvmti event   // in Thread::start.   {// Ensure that the C++ Thread and OSThread structures aren't freed before     // we operate.     MutexLocker mu(Threads_lock);

    // Since JDK 5 the java.lang.Thread threadStatus is used to prevent     // re-starting an already started thread, so we should usually find     // that the JavaThread is null. However for a JNI attached thread     // there is a small window between the Thread object being created     // (with its JavaThread set) and the update to its threadStatus, so we     // have to check for this     if (java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)) != NULL) {throw_illegal_thread_state = true;} else {
      // We could also check the stillborn flag to see if this thread was already stopped, but       // for historical reasons we let the thread detect that itself when it starts running 
      jlong size =
             java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread));
      // Allocate the C++ Thread structure and create the native thread.  The       // stack size retrieved from java is 64-bit signed, but the constructor takes       // size_t (an unsigned type), which may be 32 or 64-bit depending on the platform.       //  - Avoid truncating on 32-bit platforms if size is greater than UINT_MAX.       //  - Avoid passing negative values which would result in really large stacks.       NOT_LP64(if (size > SIZE_MAX) size = SIZE_MAX;)
      size_t sz = size > 0 ? (size_t) size : 0;
      // 重点看这里!!!native_thread = new JavaThread(&thread_entry, sz);

      // At this point it may be possible that no osthread was created for the       // JavaThread due to lack of memory. Check for this situation and throw       // an exception if necessary. Eventually we may want to change this so       // that we only grab the lock if the thread was created successfully -       // then we can also do this check and throw the exception in the       // JavaThread constructor.       if (native_thread->osthread() != NULL) {// Note: the current thread is not being used within "prepare".         native_thread->prepare(jthread);
      }
    }
  }

  if (throw_illegal_thread_state) {THROW(vmSymbols::java_lang_IllegalThreadStateException());
  }

  assert(native_thread != NULL, "Starting null thread?");

  if (native_thread->osthread() == NULL) {// No one should hold a reference to the 'native_thread'.     native_thread->smr_delete();
    if (JvmtiExport::should_post_resource_exhausted()) {
      JvmtiExport::post_resource_exhausted(
        JVMTI_RESOURCE_EXHAUSTED_OOM_ERROR | JVMTI_RESOURCE_EXHAUSTED_THREADS,
        os::native_thread_creation_failed_msg());
    }
    THROW_MSG(vmSymbols::java_lang_OutOfMemoryError(),
              os::native_thread_creation_failed_msg());
  }

  Thread::start(native_thread);

JVM_END

这段代码的次要作用是创立一个 JavaThread 对象并启动。咱们进入创立 JavaThread 构造函数

JavaThread::JavaThread(ThreadFunction entry_point, size_t stack_sz) :
                       Thread() {initialize();
  _jni_attach_state = _not_attaching_via_jni;
  set_entry_point(entry_point);
  // Create the native thread itself.   // %note runtime_23   os::ThreadType thr_type = os::java_thread;
  thr_type = entry_point == &compiler_thread_entry ? os::compiler_thread :
                                                     os::java_thread;
  // 通过 os 类的 create_thread 函数来创立一个线程   os::create_thread(this, thr_type, stack_sz);
  // The _osthread may be NULL here because we ran out of memory (too many threads active).   // We need to throw and OutOfMemoryError - however we cannot do this here because the caller   // may hold a lock and all locks must be unlocked before throwing the exception (throwing   // the exception consists of creating the exception object & initializing it, initialization   // will leave the VM via a JavaCall and then all locks must be unlocked).   //   // The thread is still suspended when we reach here. Thread must be explicit started   // by creator! Furthermore, the thread must also explicitly be added to the Threads list   // by calling Threads:add. The reason why this is not done here, is because the thread   // object must be fully initialized (take a look at JVM_Start) }

能够看到,重点是通过 os 类的 create_thread 函数来创立一个线程,因为 JVM 是跨平台的,并且不同操作系统上的线程实现机制可能不太一样,因而这里的 create_thread 必定会有多个针对不同平台的实现,咱们查看这个函数的实现就晓得了:

能够看到,HotSpot 提供了次要的操作系统上的实现,因为在服务器上,linux 的占比是很高的,因而咱们这里就看下 linux 上的实现:

bool os::create_thread(Thread* thread, ThreadType thr_type,
                       size_t req_stack_size) {
  ...
  // init thread attributes   pthread_attr_t attr;
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
  // Calculate stack size if it's not specified by caller.   size_t stack_size = os::Posix::get_initial_stack_size(thr_type, req_stack_size);
  // In the Linux NPTL pthread implementation the guard size mechanism   // is not implemented properly. The posix standard requires adding   // the size of the guard pages to the stack size, instead Linux   // takes the space out of 'stacksize'. Thus we adapt the requested   // stack_size by the size of the guard pages to mimick proper   // behaviour. However, be careful not to end up with a size   // of zero due to overflow. Don't add the guard page in that case.   size_t guard_size = os::Linux::default_guard_size(thr_type);
  if (stack_size <= SIZE_MAX - guard_size) {stack_size += guard_size;}
  assert(is_aligned(stack_size, os::vm_page_size()), "stack_size not aligned");

  int status = pthread_attr_setstacksize(&attr, stack_size);
  assert_status(status == 0, status, "pthread_attr_setstacksize");

  // Configure glibc guard page.   pthread_attr_setguardsize(&attr, os::Linux::default_guard_size(thr_type));
  ...
  pthread_t tid;
  // 创立并启动线程   int ret = pthread_create(&tid, &attr, (void* (*)(void*)) thread_native_entry, thread);
  ...
}

这个函数比拟长,这里就省略局部,只保留和线程创立启动相干的局部,能够看到,在 linux 平台上,JVM 的线程是通过赫赫有名的 pthread 库来创立启动线程的,这里须要留神的是,在指定线程栈大小的时候,并不是程序员指定多少就是多少,而是要依据零碎平台的限度来综合决定的。咱们也能够得出结论,Java Thread 在底层对应一个 pthread 线程。咱们看下 pthread 创立并启动线程的接口:

int thread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg);

第一个是 pthread_t 构造体数据指针,寄存线程信息,第二个是线程的属性,第三个是线程体,也就是线程理论执行的函数,第四个是线程体的参数列表。
下面调用这个接口的中央,咱们指定了线程体函数是 thread_native_entry,参数是 thread 指针。咱们先看下 thread_native_entry 这个函数的定义:

// Thread start routine for all newly created threads static void *thread_native_entry(Thread *thread) {
  ...
  // call one more level start routine   thread->run();
  ...
}

同样,这里只保留了重点代码,通过正文咱们能够晓得,thread->run() 这一行是最可能执行咱们 run 办法的中央。咱们看一下代码:

// The first routine called by a new Java thread void JavaThread::run() {
  ...
  // We call another function to do the rest so we are sure that the stack addresses used   // from there will be lower than the stack base just computed   thread_main_inner();}

这里重点是调用了 thread_main_inner 函数:

void JavaThread::thread_main_inner() {assert(JavaThread::current() == this, "sanity check");
  assert(this->threadObj() != NULL, "just checking");

  // Execute thread entry point unless this thread has a pending exception   // or has been stopped before starting.   // Note: Due to JVM_StopThread we can have pending exceptions already!   if (!this->has_pending_exception() &&
      !java_lang_Thread::is_stillborn(this->threadObj())) {
    {ResourceMark rm(this);
      this->set_native_thread_name(this->get_thread_name());
    }
    HandleMark hm(this);
    // 这里开始调用 java thread 的 run 办法啦~~~     this->entry_point()(this, this);
  }

  DTRACE_THREAD_PROBE(stop, this);

  // java 中的 run 办法执行结束了,这里须要退出线程并清理资源   this->exit(false);
  // delete cpp 的对象   this->smr_delete();}

能够看到,Java Thread 中的 run 办法就是在 this->entry_point()(this,this); 这里调用的。看这里的调用形式就晓得,entry_point() 返回的是一个函数指针,而后间接调用,entry_point 函数实现如下:

ThreadFunction entry_point() const             { return _entry_point;}

那么_entry_point 是哪里来的?咱们再看下面 JavaThread 的构造函数,咱们发现了一个办法 set_entry_point(entry_point),_entry_point 就是咱们创立 JavaThread 对象时传入的函数指针。

正文完
 0