自底向上理解Julia中的并行计算

最近看到一些有关Julia并行计算的提问,所以这里不妨开个头,介绍下Julia中并行计算的实现,希望能有更多人能参与进来一起讨论。在Julia文档中,有专门的一部分讲解Parallel Computing(中文翻译见并行计算),采用的是一种Top-Down的方式分别介绍了协程、多线程及分布式处理。这里我打算采用一种Bottom-Up的方式来介绍下Julia中的并行计算,建议先读完官方文档后继续往下看。

几个基本概念

Task

顾名思义,Task就是构造一段执行任务,Task的定义在task.c文件中,不过作为使用者,我们更关心的是调用接口:

julia> methods(Task)
# 1 method for generic function "(::Type)":
[1] Task(f) in Core at boot.jl:377

julia> methodswith(Task)
[1] bind(c::Channel, task::Task) in Base at channels.jl:191
[2] serialize(s::Serialization.AbstractSerializer, t::Task) in Serialization at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.0/Serialization/src/Serialization.jl:427
[3] fetch(t::Task) in Base at task.jl:202
[4] istaskdone(t::Task) in Base at task.jl:117
[5] istaskstarted(t::Task) in Base at task.jl:134
[6] schedule(t::Task) in Base at event.jl:95
[7] schedule(t::Task, arg) in Base at event.jl:129
[8] show(io::IO, ::MIME{Symbol("text/plain")}, t::Task) in Base at show.jl:150
[9] show(io::IO, t::Task) in Base at task.jl:58
[10] wait(t::Task) in Base at task.jl:182
[11] yield(t::Task) in Base at event.jl:166
[12] yield(t::Task, x) in Base at event.jl:166
[13] yieldto(t::Task) in Base at event.jl:181
[14] yieldto(t::Task, x) in Base at event.jl:181

julia> fieldnames(Task)
(:parent, :storage, :state, :donenotify, :result, :exception, :backtrace, :logstate, :code)

task的构造函数只有一个Task(f),其唯一的一个参数f必须是不带参数的函数,如果传一个带参数的函数,会在真正执行时触发MethodError

julia> t = Task((x) -> x + 1)
Task (runnable) @0x00007f600e180d30

julia> schedule(t)
Task (failed) @0x00007f600e180d30
MethodError: no method matching (::getfield(Main, Symbol("##11#12")))()
Closest candidates are:
  #11(::Any) at REPL[29]:1

当然,每次都要记得构造一个闭包很傻,有一个@task宏可以用于简化这个过程:

julia> t = @task println("Hi")
Task (runnable) @0x00007f600f25aa10

julia> schedule(t)
Hi
Task (queued) @0x00007f600f25aa10

julia> t
Task (done) @0x00007f600f25aa10

上面为了看到一个task的执行结果,我们使用了schedule函数,其作用是将这个runnable的task加入到一个全局的task队列中,然后将task的状态置成:queued,系统在空闲时会执行该task(TODO:调度的逻辑),执行结果存在:result字段下,并根据执行结果修改其:state状态(:failed,:done)。不过上面的例子似乎给人一种错觉,在执行完schedule(t)之后,task t立即就执行了,并没有感受到所谓的等待系统空闲。下面这个例子用一个计算密集型的任务来验证下:

t = @task begin 
    println("begin task")
    inv(rand(2000, 2000))
    println("end task") 
    end

begin 
    schedule(t)
    println(length(Base.Workqueue))
    println(t.state)
    println("begin computing")
    println(sum(inv(randn(1500, 1500))))
    println("end computing")
    println(length(Base.Workqueue))
    println(t.state)
end

# 1
# queued
# begin computing
# 97.12983082590253
# end computing
# 1
# queued
# begin task
# end task

可以看到,在schedule(t)之后,t并没有立即被执行,而是被添加到了Base.Workqueue中一直处于queued状态,主流程继续执行,先进行了求逆计算,结束之后,系统再进行task切换,执行t。以上,就是所谓的并发(Concurrency)。对于单一进程来说,并发执行计算密集型任务并没有太大收益,不过,对IO密集型任务来说,则非常有用,在等待的过程中,可以切换到其它任务,一旦条件满足,再切回来就执行,这样看起来,似乎是在同时执行多个任务(并发)。Julia对这里所谓的条件提供了一个统一的概念,称为Condition():

julia> fieldnames(Condition)
(:waitq,)

Condition()只有一个类型为Vector的字段:waitq用于记录在等待该条件的所有task,在一个task内部,可以通过执行wait(c::Condition),声明其正在等待某个条件,然后将自己添加到Base.Workqueue尾部,同时从中取出第一个task并做切换。当条件满足时,通过执行notify(c::Condition)再将这些task重新加入到Base.Workqueue中等待执行。

julia> c = Condition()
Condition(Any[])

julia> t = @task begin
           println("waiting condition")
           wait(c)
           println("condition meet")
           end
Task (runnable) @0x00007f2d954c07f0

julia> schedule(t)
waiting condition
Task (queued) @0x00007f2d954c07f0

julia> notify(c)
condition meet
1

除了通过执行wait进行task切换之外,还可以通过执行yield()主动进行task的切换(其实也是调用了wait()函数)。

yield() = (enq_work(current_task()); wait())

下面看一个yield的例子:

julia> t1 = @task begin
       println("task1 begin")
       yield()
       println("task1 resumed")
       end
Task (runnable) @0x00007f2d954c2f50

julia> t2 = @task begin
       println("task2 begin")
       yield()
       println("task2 resumed")
       end
Task (runnable) @0x00007f2d954c31f0

julia> begin
       schedule(t1)
       schedule(t2)
       yield()
       end
task1 begin
task2 begin
task1 resumed
task2 resumed

关于task,理解这些基本够用了。一个典型的应用是Timer,其中有个字段:cond就是一个Condition(),每当设定的时间周期到了的时候,就会notify挂在该:cond上的task。另外经常用到的@async宏其实就是先构造了一个task,然后执行了schedule(二合一了)

下面我们再深入理解一个更有意思的例子。

Channel

Channel就是一个通道,不同的task可以从一端往其中写入数据,而另外一些task则可以从另外一端读取数据。Channel的结构很简单:

mutable struct Channel{T} <: AbstractChannel{T}
    cond_take::Condition                 # waiting for data to become available
    cond_put::Condition                  # waiting for a writeable slot
    state::Symbol
    excp::Union{Exception, Nothing}         # exception to be thrown when state != :open

    data::Vector{T}
    sz_max::Int                          # maximum size of channel

    # Used when sz_max == 0, i.e., an unbuffered channel.
    waiters::Int
    takers::Vector{Task}
    putters::Vector{Task}
end

其中state字段表示当前channel的状态(:open, :closed),sz_max则表示channel的长度(该长度可以设为0,即无限大)。

对于长度有限的channel来说,执行put!(c, v)写入数据时,如果当前data的长度已经达到了sz_max,则会调用wait()将当前task阻塞,然后每个事件周期都会检查data的长度是否已经小于sz_max,一旦该条件满足,就会往data中写入v,同时通知所有挂在cond_take字段上的task。而执行take!(c)读取数据时,如果当前data中有数据,则取出来,同时通知挂在cond_put上的task,否则,将当前task挂起到cond_take中,等待新的数据。

对于无限长的channel而言,需要用到takersputters字段。在写入数据时,如果takers为空,就将当前task写入到putters中(然后还会通知cond_take上的task,这类task是通过wait(c)挂在在无限长channel上的),否则,从takers中取一个出来重新执行(这里用的是yield(t, v)操作)。取数据时,先将自己加入到waiters中,然后判断putters是否为空,若空,则调用wait()将自己挂起,否则从putters中取出一个执行。

此外,关于Channel有个挺好用的函数Channel(func::Function; ctype=Any, csize=0, taskref=nothing)。关于Channel的例子实在太多了,手册中的那个生产者消费者的例子就挺不错的,这里不列举了。

多线程

这里暂时先不深入介绍多线程,主要是这个Julia中老大难的问题了,目前的接口仍然是实验性的,此外也有一些PR正在做这方面的事情,建议subscribe一些multithreading的PR,了解下最新的进展(比如这个

多进程

前面提到的都还是并发,要实现真正的并行,需要充分利用多核/多台机器。手册里有提到,Julia实现的并行机制有点类似MPI,不过是单向的(也就是说,有一个master进程负责给其它进程分配执行任务)。所有分布式相关的代码都在Julia源码的stdlib/Distributedpackage下,接下来我们一步步展开介绍(如果你想在REPL中测试下面的示例代码,记得先执行using Distributed)。

首先讨论单机多进程的情况。在Julia中,一个工作进程称作一个worker,管理这些worker的进程是LocalProcess(也就是打开REPL后进入的进程)。每个进程都有自己的pid,LocalProcess的pid是1(为了表述方便,以下称其为master)。接下来先回答几个问题:

1. 如何表示一个work中的对象?

对于master而言,worker中的对象有两种表示,一个是Future,另一个是RemoteChannel

mutable struct Future <: AbstractRemoteRef
    where::Int
    whence::Int
    id::Int
    v::Union{Some{Any}, Nothing}
end

其中,where表示v所在的pid,whenceid一般通过RRID生成,分别表示生成该Future对象的进程的pid,而id则是从1开始自增的id。RemoteChannel也类似:

mutable struct RemoteChannel{T<:AbstractChannel} <: AbstractRemoteRef
    where::Int
    whence::Int
    id::Int
end

2. 怎么发起远程调用?

Julia中,提供了一个底层函数remotecall来实现远程调用,执行后会立即返回一个Future对象,然后可以通过fetch将value写入到Futurev字段中(此时会发生数据转移,也就是导致并行计算性能瓶颈的地方)。例如:

julia> using Distributed

julia> addprocs()
4-element Array{Int64,1}:
 2
 3
 4
 5

julia> m = remotecall(rand, 5, 2, 2)
Future(5, 1, 6, nothing)

julia> fetch(m)
2×2 Array{Float64,2}:
 0.109123  0.304667
 0.454125  0.197551

此外,Distributed中还提供了一些工具函数和有用的宏,这里不深入介绍,我们更关心的是:

3. 什么时候会发生GC?

Distributed中有一个clear!函数用于将worker中的变量置成nothing,不过,如果不引入全局变量的话,大多时候并不需要手动进行该操作。fetch会自动执行send_del_client函数,并通知gc.此外手册里也提到,由于对master来说,一个RemoteReference的内存占用很小,并不会马上被gc,因而可以调用finalize,从而会立即执行send_del_client向worker发送gc信号。

TODO: 一个分布式并行计算的实例