2017-11-11 2 views

Antwort

12

Hier ist eine Lösung mit der (Art von experimentellen) Base.Threads Threading-Modul.

Eine Lösung mit einer pmap (etc) für eine verteilte Parallelität wäre ähnlich. Obwohl ich denke, dass der Kommunikationsaufwand zwischen den Prozessen Sie verletzen würde.

Die Idee ist, es in Blöcke zu sortieren (einen pro Thread), so dass jeder Thread völlig unabhängig sein kann, nur um seine Blöcke kümmern.

Dann kommt, diese vorsortierten Blöcke zusammenzuführen.

Dies ist ein ziemlich bekanntes Problem beim Zusammenführen sortierter Listen. Siehe auch andere questions zu diesem Thema.

Und vergessen Sie nicht, das Multithreading einzurichten, indem Sie die Umgebungsvariable JULIA_NUM_THREADS vor dem Start einstellen.

Hier ist mein Code:

using Base.Threads 

function blockranges(nblocks, total_len) 
    rem = total_len % nblocks 
    main_len = div(total_len, nblocks) 

    starts=Int[1] 
    ends=Int[] 
    for ii in 1:nblocks 
     len = main_len 
     if rem>0 
      len+=1 
      rem-=1 
     end 
     push!(ends, starts[end]+len-1) 
     push!(starts, ends[end] + 1) 
    end 
    @assert ends[end] == total_len 
    starts[1:end-1], ends 
end 

function threadedsort!(data::Vector) 
    starts, ends = blockranges(nthreads(), length(data)) 

    # Sort each block 
    @threads for (ss, ee) in collect(zip(starts, ends)) 
     @inbounds sort!(@view data[ss:ee]) 
    end 


    # Go through each sorted block taking out the smallest item and putting it in the new array 
    # This code could maybe be optimised. see https://stackoverflow.com/a/22057372/179081 
    ret = similar(data) # main bit of allocation right here. avoiding it seems expensive. 
    # Need to not overwrite data we haven't read yet 
    @inbounds for ii in eachindex(ret) 
     minblock_id = 1 
     ret[ii]=data[starts[1]] 
     @inbounds for blockid in 2:endof(starts) # findmin allocates a lot for some reason, so do the find by hand. (maybe use findmin! ?) 
      ele = data[starts[blockid]] 
      if ret[ii] > ele 
       ret[ii] = ele 
       minblock_id = blockid 
      end 
     end 
     starts[minblock_id]+=1 # move the start point forward 
     if starts[minblock_id] > ends[minblock_id] 
      deleteat!(starts, minblock_id) 
      deleteat!(ends, minblock_id) 
     end 
    end 
    data.=ret # copy back into orignal as we said we would do it inplace 
    return data 
end 

ich einige Benchmarking getan haben:

using Plots 
function evaluate_timing(range) 
    sizes = Int[] 
    threadsort_times = Float64[] 
    sort_times = Float64[] 
     for sz in 2.^collect(range) 
      data_orig = rand(Int, sz) 
      push!(sizes, sz) 

      data = copy(data_orig) 
      push!(sort_times,  @elapsed sort!(data)) 

      data = copy(data_orig) 
      push!(threadsort_times, @elapsed threadedsort!(data)) 

      @show (sz, sort_times[end], threadsort_times[end]) 
    end 
    return sizes, threadsort_times, sort_times 
end 

sizes, threadsort_times, sort_times = evaluate_timing(0:28) 
plot(sizes, [threadsort_times sort_times]; title="Sorting Time", ylabel="time(s)", xlabel="number of elements", label=["threadsort!" "sort!"]) 
plot(sizes, [threadsort_times sort_times]; title="Sorting Time", ylabel="time(s)", xlabel="number of elements", label=["threadsort!" "sort!"], xscale=:log10, yscale=:log10) 

Meine Ergebnisse: mit 8 Threads.

plot normal scale plot loglog scale

fand ich den Kreuzungspunkt überraschend niedrig zu sein, etwas über 1024. stellt fest, dass die anfängliche genommen lange Zeit ignoriert werden kann - dass der Code ist JIT für die erste kompilierte Lauf.

Seltsamerweise reproduzieren diese Ergebnisse nicht, wenn Sie BenchmarkTools verwenden. Die Benchmark-Tools hätten dieses anfängliche Timing gestoppt. Sie reproduzieren jedoch sehr konsistent, wenn Sie den normalen Timing-Code verwenden, wie ich oben im Benchmark-Code angegeben habe. Ich denke, es etwas tut, das tötet die Multithreading einige, wie

Big dank @xiaodai, die meine Analyse Code einen Fehler in

1

ich getestet habe weiter, wenn es nur 1% der Artikel sind darauf hingewiesen, einzigartig und enter image description here auch Probenahme von 1:1_000_000. Die Ergebnisse sind unten

enter image description here Funktion evaluate_timing_w_repeats (Bereich) Größen = Int [] threadsort_times = Float64 [] sort_times = Float64 [] für sz in 2^sammeln (Bereich) data_orig = rand (rand (Int, sz 100), sz) drücken! (Größen, sz)

  data = copy(data_orig) 
      push!(sort_times,  @elapsed sort!(data)) 

      data = copy(data_orig) 
      push!(threadsort_times, @elapsed threadedsort!(data)) 

      @show (sz, sort_times[end], threadsort_times[end]) 
    end 
    return sizes, threadsort_times, sort_times 
end 

sizes, threadsort_times, sort_times = evaluate_timing_w_repeats(7:28) 
plot(sizes, [threadsort_times sort_times]; title="Sorting Time", ylabel="time(s)", xlabel="number of elements", label=["threadsort!" "sort!"]) 
plot(sizes, [threadsort_times sort_times]; title="Sorting Time", ylabel="time(s)", xlabel="number of elements", label=["threadsort!" "sort!"], xscale=:log10, yscale=:log10) 
savefig("sort_with_repeats.png") 

function evaluate_timing1m(range) 
    sizes = Int[] 
    threadsort_times = Float64[] 
    sort_times = Float64[] 
     for sz in 2.^collect(range) 
      data_orig = rand(1:1_000_000, sz) 
      push!(sizes, sz) 

      data = copy(data_orig) 
      push!(sort_times,  @elapsed sort!(data)) 

      data = copy(data_orig) 
      push!(threadsort_times, @elapsed threadedsort!(data)) 

      @show (sz, sort_times[end], threadsort_times[end]) 
    end 
    return sizes, threadsort_times, sort_times 
end 

sizes, threadsort_times, sort_times = evaluate_timing1m(7:28) 
plot(sizes, [threadsort_times sort_times]; title="Sorting Time", ylabel="time(s)", xlabel="number of elements", label=["threadsort!" "sort!"]) 
plot(sizes, [threadsort_times sort_times]; title="Sorting Time sampel from 1:1_000_000", ylabel="time(s)", xlabel="number of elements", label=["threadsort!" "sort!"], xscale=:log10, yscale=:log10) 
savefig("sort1m.png") 
Verwandte Themen