Browse Source

get tests working

update-for-1.6
Dave Thomas 3 years ago
parent
commit
1e17064982
6 changed files with 69 additions and 66 deletions
  1. +3
    -3
      lib/work_queue.ex
  2. +20
    -20
      lib/work_queue/options.ex
  3. +4
    -4
      lib/work_queue/worker.ex
  4. +2
    -2
      lib/work_queue/worker_supervisor.ex
  5. +10
    -7
      mix.exs
  6. +30
    -30
      test/options_test.exs

+ 3
- 3
lib/work_queue.ex View File

@ -44,7 +44,7 @@ defmodule WorkQueue do
before_returning results do
results -> params.opts.report_progress_to.({:finished, results})
end
end
end
defp loop_with_ticker(params, running, max) do
@ -80,7 +80,7 @@ defmodule WorkQueue do
:tick ->
params.opts.report_progress_to.({:progress, length(params.results)})
wait_for_answers(params, running, max)
{ :processed, worker, { :ok, result } } ->
if worker in running do
params = update_in(params.results, &[result|&1])
@ -93,6 +93,6 @@ defmodule WorkQueue do
defp get_next_item(params) do
{status, item, new_state} = params.opts.get_next_item.(params.item_source)
{status, Dict.put(params, :item_source, new_state), item}
{status, Map.put(params, :item_source, new_state), item}
end
end

+ 20
- 20
lib/work_queue/options.ex View File

@ -1,7 +1,7 @@
defmodule WorkQueue.Options do
def analyze(params, defaults \\ default_options) do
opts = Dict.get(params, :opts, [])
def analyze(params, defaults \\ default_options()) do
opts = Map.get(params, :opts, [])
case Enum.reduce(opts, {:ok, defaults}, &option/2) do
error = {:error,_} ->
error
@ -9,26 +9,26 @@ defmodule WorkQueue.Options do
{:ok, setup_get_next_item(new_options, params) }
end
end
defp default_options do
%{
worker_count: round(processing_units*0.667),
report_each_result_to: fn _ -> end,
report_progress_to: fn _ -> end,
worker_count: round(processing_units()*0.667),
report_each_result_to: fn _ -> nil end,
report_progress_to: fn _ -> nil end,
report_progress_interval: false,
worker_args: [],
item_source: [],
get_next_item: false
}
end
# Once we have an error, ignore further options
defp option(_, result = {:error, _}), do: result
defp option({:report_progress_interval, n}, result)
when is_integer(n),
do: update(result, :report_progress_interval, n)
defp option({:report_progress_to, func}, result)
when is_function(func),
do: update(result, :report_progress_to, func)
@ -40,24 +40,24 @@ defmodule WorkQueue.Options do
defp option({:worker_args, args}, result)
when is_list(args),
do: update(result, :worker_args, args)
defp option({:worker_args, arg}, result),
do: option({:worker_args, [arg]}, result)
defp option({:worker_count, n}, result)
when is_integer(n) and n > 0,
do: update(result, :worker_count, n)
defp option({:worker_count, ratio}, result)
when is_float(ratio) and ratio >= 0.5,
do: option({:worker_count, round(ratio*processing_units)}, result)
do: option({:worker_count, round(ratio*processing_units())}, result)
defp option({:worker_count, :cpu_bound}, result),
do: option({:worker_count, processing_units}, result)
do: option({:worker_count, processing_units()}, result)
defp option({:worker_count, :io_bound}, result),
do: option({:worker_count, 10.0}, result)
defp option({option, value}, _result) do
{ :error, "Invalid option [ #{option}: #{inspect value} ] to #{__MODULE__}" }
end
@ -68,7 +68,7 @@ defmodule WorkQueue.Options do
# 1. Don't override if set by caller
defp setup_get_next_item(options = %{ get_next_item: get_next_item}, params)
when get_next_item do
Dict.put(params, :opts, options)
Map.put(params, :opts, options)
end
# 2. Handle lists as a special case
@ -80,20 +80,20 @@ defmodule WorkQueue.Options do
[h|t] ->
{:ok, h, t}
end
options = Dict.put(options, :get_next_item, handler)
Dict.put(params, :opts, options)
options = Map.put(options, :get_next_item, handler)
Map.put(params, :opts, options)
end
# and the rest
# and the rest
defp setup_get_next_item(options, params = %{ item_source: item_source }) do
params = %{params | item_source: Enum.to_list(item_source)}
setup_get_next_item(options, params)
end
defp update({:ok, result}, key, val) do
{ :ok, Dict.put(result, key, val) }
{ :ok, Map.put(result, key, val) }
end
def processing_units, do: :erlang.system_info(:logical_processors)
end

+ 4
- 4
lib/work_queue/worker.ex View File

@ -1,12 +1,12 @@
defmodule WorkQueue.Worker do
use GenServer
require Logger
#use GenServer
#require Logger
#######
# API #
#######
def process(params, scheduler_pid, item) do
Task.Supervisor.start_child(params.supervisor_pid, __MODULE__, :do_process,
[params, scheduler_pid, item])
@ -18,7 +18,7 @@ defmodule WorkQueue.Worker do
def do_process(params, scheduler_pid, item) do
{:ok, result} = params.worker_fn.(item, params.opts.worker_args)
send(scheduler_pid, {:processed, self, {:ok, {item, result}}})
send(scheduler_pid, {:processed, self(), {:ok, {item, result}}})
end
end

+ 2
- 2
lib/work_queue/worker_supervisor.ex View File

@ -1,8 +1,8 @@
defmodule WorkQueue.WorkerSupervisor do
require Logger
def start_link(params) do
{ :ok, supervisor_pid } = Task.Supervisor.start_link()
{ :ok, Dict.put(params, :supervisor_pid, supervisor_pid) }
{ :ok, Map.put(params, :supervisor_pid, supervisor_pid) }
end
end

+ 10
- 7
mix.exs View File

@ -2,17 +2,20 @@ defmodule WorkQueue.Mixfile do
use Mix.Project
def project do
[app: :work_queue,
version: "0.0.3",
elixir: ">= 1.0.0",
deps: deps,
description: description,
package: package,
[
app: :work_queue,
version: "0.0.3",
elixir: ">= 1.0.0",
deps: deps(),
description: description(),
package: package(),
]
end
def application do
[applications: [:logger]]
[
applications: [:logger]
]
end
defp deps do

+ 30
- 30
test/options_test.exs View File

@ -10,100 +10,100 @@ defmodule OptionsTest do
report_progress_interval: 123,
worker_args: [],
}
|> Dict.merge(overrides)
|> Map.merge(Enum.into(overrides, %{}))
end
defp params(overrides \\ []) do
%{
item_source: [ 333, 444, 555 ]
} |> Dict.merge(overrides)
} |> Map.merge(Enum.into(overrides, %{}))
end
test "no options returns opts" do
given = params(opts: [])
assert {:ok, %{ opts: opts }} = analyze(given, default_options)
assert Dict.delete(opts, :get_next_item) == default_options
assert {:ok, %{ opts: opts }} = analyze(given, default_options())
assert Map.delete(opts, :get_next_item) == default_options()
end
test "can override report_each_result_to" do
callback = fn _ -> 0 end
given = params(opts: [ report_each_result_to: callback ])
assert {:ok, %{opts: opts}} = analyze(given, default_options)
assert Dict.delete(opts, :get_next_item) == default_options(report_each_result_to: callback)
assert {:ok, %{opts: opts}} = analyze(given, default_options())
assert Map.delete(opts, :get_next_item) == default_options(report_each_result_to: callback)
end
test "can override report_progress_to" do
callback = fn _ -> 0 end
given = params(opts: [ report_progress_to: callback ])
assert {:ok, %{opts: opts}} = analyze(given, default_options)
assert Dict.delete(opts, :get_next_item) == default_options(report_progress_to: callback)
assert {:ok, %{opts: opts}} = analyze(given, default_options())
assert Map.delete(opts, :get_next_item) == default_options(report_progress_to: callback)
end
test "can override report_progress_interval" do
given = params(opts: [ report_progress_interval: 1234 ])
assert {:ok, %{opts: opts}} = analyze(given, default_options)
assert Dict.delete(opts, :get_next_item) == default_options(report_progress_interval: 1234)
assert {:ok, %{opts: opts}} = analyze(given, default_options())
assert Map.delete(opts, :get_next_item) == default_options(report_progress_interval: 1234)
end
test "worker_args accepts list" do
given = params(opts: [ worker_args: [1,2,3] ])
assert {:ok, %{opts: opts}} = analyze(given, default_options)
assert Dict.delete(opts, :get_next_item) == default_options(worker_args: [1,2,3])
assert {:ok, %{opts: opts}} = analyze(given, default_options())
assert Map.delete(opts, :get_next_item) == default_options(worker_args: [1,2,3])
end
test "worker_args accepts single term" do
given = params(opts: [ worker_args: 123 ])
assert {:ok, %{opts: opts}} = analyze(given, default_options)
assert Dict.delete(opts, :get_next_item) == default_options(worker_args: [123])
assert {:ok, %{opts: opts}} = analyze(given, default_options())
assert Map.delete(opts, :get_next_item) == default_options(worker_args: [123])
end
test "absolute worker count can be set" do
given = params(opts: [ worker_count: 12 ])
assert {:ok, %{opts: opts}} = analyze(given, default_options)
assert Dict.delete(opts, :get_next_item) == default_options(worker_count: 12)
assert {:ok, %{opts: opts}} = analyze(given, default_options())
assert Map.delete(opts, :get_next_item) == default_options(worker_count: 12)
end
test "ratio worker count can be given" do
given = params(opts: [ worker_count: 2.0 ])
assert {:ok, %{opts: opts}} = analyze(given, default_options)
assert Dict.delete(opts, :get_next_item) == default_options(worker_count: 2 * processing_units)
assert {:ok, %{opts: opts}} = analyze(given, default_options())
assert Map.delete(opts, :get_next_item) == default_options(worker_count: 2 * processing_units())
end
test "cpu_bound returns full house of workers" do
given = params(opts: [ worker_count: :cpu_bound ])
assert {:ok, %{opts: opts}} = analyze(given, default_options)
assert Dict.delete(opts, :get_next_item) == default_options(worker_count: processing_units)
assert {:ok, %{opts: opts}} = analyze(given, default_options())
assert Map.delete(opts, :get_next_item) == default_options(worker_count: processing_units())
end
test "io_bound returns overcommits workers by a factor of 10" do
given = params(opts: [ worker_count: :io_bound ])
assert {:ok, %{opts: opts}} = analyze(given, default_options)
assert Dict.delete(opts, :get_next_item) == default_options(worker_count: 10*processing_units)
assert {:ok, %{opts: opts}} = analyze(given, default_options())
assert Map.delete(opts, :get_next_item) == default_options(worker_count: 10*processing_units())
end
test "invalid option is rejected" do
given = params(opts: [ invalid: 1234 ])
expect = { :error, "Invalid option [ invalid: 1234 ] to Elixir.WorkQueue.Options"}
assert expect == analyze(given, default_options)
assert expect == analyze(given, default_options())
end
test "zero worker count is rejected" do
given = params(opts: [ worker_count: 0 ])
expect = { :error, "Invalid option [ worker_count: 0 ] to Elixir.WorkQueue.Options"}
assert expect == analyze(given, default_options)
assert expect == analyze(given, default_options())
end
test "negative worker count is rejected" do
given = params(opts: [ worker_count: -1.5 ])
expect = { :error, "Invalid option [ worker_count: -1.5 ] to Elixir.WorkQueue.Options"}
assert expect == analyze(given, default_options)
assert expect == analyze(given, default_options())
end
test "automatically supplied next item function" do
result = analyze(params, default_options)
result = analyze(params(), default_options())
assert {:ok, %{opts: %{ get_next_item: get_next_item }}} = result
work = params.item_source
work = params().item_source
assert [333, 444, 555] == work
assert {:ok, 333, work=[444, 555]} = get_next_item.(work)
assert {:ok, 444, work=[555]} = get_next_item.(work)

Loading…
Cancel
Save