Ich möchte eine große Anzahl von Dingen sortieren.Wie kann ich die Sortierung parallelisieren?
Julias Standard-Bibliothekssortierung ist single-threaded. Wie kann ich meine Multicore-Maschine nutzen, um Dinge schneller zu sortieren?
Ich möchte eine große Anzahl von Dingen sortieren.Wie kann ich die Sortierung parallelisieren?
Julias Standard-Bibliothekssortierung ist single-threaded. Wie kann ich meine Multicore-Maschine nutzen, um Dinge schneller zu sortieren?
Hier ist eine Lösung mit der (Art von experimentellen) Base.Threads
Threading-Modul.
Eine Lösung mit einer pmap
(etc) für eine verteilte Parallelität wäre ähnlich. Obwohl ich denke, dass der Kommunikationsaufwand zwischen den Prozessen Sie verletzen würde.
Die Idee ist, es in Blöcke zu sortieren (einen pro Thread), so dass jeder Thread völlig unabhängig sein kann, nur um seine Blöcke kümmern.
Dann kommt, diese vorsortierten Blöcke zusammenzuführen.
Dies ist ein ziemlich bekanntes Problem beim Zusammenführen sortierter Listen. Siehe auch andere questions zu diesem Thema.
Und vergessen Sie nicht, das Multithreading einzurichten, indem Sie die Umgebungsvariable JULIA_NUM_THREADS
vor dem Start einstellen.
Hier ist mein Code:
using Base.Threads
function blockranges(nblocks, total_len)
rem = total_len % nblocks
main_len = div(total_len, nblocks)
starts=Int[1]
ends=Int[]
for ii in 1:nblocks
len = main_len
if rem>0
len+=1
rem-=1
end
push!(ends, starts[end]+len-1)
push!(starts, ends[end] + 1)
end
@assert ends[end] == total_len
starts[1:end-1], ends
end
function threadedsort!(data::Vector)
starts, ends = blockranges(nthreads(), length(data))
# Sort each block
@threads for (ss, ee) in collect(zip(starts, ends))
@inbounds sort!(@view data[ss:ee])
end
# Go through each sorted block taking out the smallest item and putting it in the new array
# This code could maybe be optimised. see https://stackoverflow.com/a/22057372/179081
ret = similar(data) # main bit of allocation right here. avoiding it seems expensive.
# Need to not overwrite data we haven't read yet
@inbounds for ii in eachindex(ret)
minblock_id = 1
ret[ii]=data[starts[1]]
@inbounds for blockid in 2:endof(starts) # findmin allocates a lot for some reason, so do the find by hand. (maybe use findmin! ?)
ele = data[starts[blockid]]
if ret[ii] > ele
ret[ii] = ele
minblock_id = blockid
end
end
starts[minblock_id]+=1 # move the start point forward
if starts[minblock_id] > ends[minblock_id]
deleteat!(starts, minblock_id)
deleteat!(ends, minblock_id)
end
end
data.=ret # copy back into orignal as we said we would do it inplace
return data
end
ich einige Benchmarking getan haben:
using Plots
function evaluate_timing(range)
sizes = Int[]
threadsort_times = Float64[]
sort_times = Float64[]
for sz in 2.^collect(range)
data_orig = rand(Int, sz)
push!(sizes, sz)
data = copy(data_orig)
push!(sort_times, @elapsed sort!(data))
data = copy(data_orig)
push!(threadsort_times, @elapsed threadedsort!(data))
@show (sz, sort_times[end], threadsort_times[end])
end
return sizes, threadsort_times, sort_times
end
sizes, threadsort_times, sort_times = evaluate_timing(0:28)
plot(sizes, [threadsort_times sort_times]; title="Sorting Time", ylabel="time(s)", xlabel="number of elements", label=["threadsort!" "sort!"])
plot(sizes, [threadsort_times sort_times]; title="Sorting Time", ylabel="time(s)", xlabel="number of elements", label=["threadsort!" "sort!"], xscale=:log10, yscale=:log10)
Meine Ergebnisse: mit 8 Threads.
fand ich den Kreuzungspunkt überraschend niedrig zu sein, etwas über 1024. stellt fest, dass die anfängliche genommen lange Zeit ignoriert werden kann - dass der Code ist JIT für die erste kompilierte Lauf.
Seltsamerweise reproduzieren diese Ergebnisse nicht, wenn Sie BenchmarkTools verwenden. Die Benchmark-Tools hätten dieses anfängliche Timing gestoppt. Sie reproduzieren jedoch sehr konsistent, wenn Sie den normalen Timing-Code verwenden, wie ich oben im Benchmark-Code angegeben habe. Ich denke, es etwas tut, das tötet die Multithreading einige, wie
Big dank @xiaodai, die meine Analyse Code einen Fehler in
ich getestet habe weiter, wenn es nur 1% der Artikel sind darauf hingewiesen, einzigartig und auch Probenahme von 1:1_000_000
. Die Ergebnisse sind unten
Funktion evaluate_timing_w_repeats (Bereich) Größen = Int [] threadsort_times = Float64 [] sort_times = Float64 [] für sz in 2^sammeln (Bereich) data_orig = rand (rand (Int, sz 100), sz) drücken! (Größen, sz)
data = copy(data_orig)
push!(sort_times, @elapsed sort!(data))
data = copy(data_orig)
push!(threadsort_times, @elapsed threadedsort!(data))
@show (sz, sort_times[end], threadsort_times[end])
end
return sizes, threadsort_times, sort_times
end
sizes, threadsort_times, sort_times = evaluate_timing_w_repeats(7:28)
plot(sizes, [threadsort_times sort_times]; title="Sorting Time", ylabel="time(s)", xlabel="number of elements", label=["threadsort!" "sort!"])
plot(sizes, [threadsort_times sort_times]; title="Sorting Time", ylabel="time(s)", xlabel="number of elements", label=["threadsort!" "sort!"], xscale=:log10, yscale=:log10)
savefig("sort_with_repeats.png")
function evaluate_timing1m(range)
sizes = Int[]
threadsort_times = Float64[]
sort_times = Float64[]
for sz in 2.^collect(range)
data_orig = rand(1:1_000_000, sz)
push!(sizes, sz)
data = copy(data_orig)
push!(sort_times, @elapsed sort!(data))
data = copy(data_orig)
push!(threadsort_times, @elapsed threadedsort!(data))
@show (sz, sort_times[end], threadsort_times[end])
end
return sizes, threadsort_times, sort_times
end
sizes, threadsort_times, sort_times = evaluate_timing1m(7:28)
plot(sizes, [threadsort_times sort_times]; title="Sorting Time", ylabel="time(s)", xlabel="number of elements", label=["threadsort!" "sort!"])
plot(sizes, [threadsort_times sort_times]; title="Sorting Time sampel from 1:1_000_000", ylabel="time(s)", xlabel="number of elements", label=["threadsort!" "sort!"], xscale=:log10, yscale=:log10)
savefig("sort1m.png")