Browse Source

initial load (not working)

pull/1/head
pragdave 6 years ago
commit
541422c498
12 changed files with 407 additions and 0 deletions
  1. +5
    -0
      .gitignore
  2. +13
    -0
      Guardfile
  3. +25
    -0
      README.md
  4. +24
    -0
      config/config.exs
  5. +91
    -0
      lib/work_queue.ex
  6. +67
    -0
      lib/work_queue/options.ex
  7. +33
    -0
      lib/work_queue/worker.ex
  8. +16
    -0
      lib/work_queue/worker_supervisor.ex
  9. +20
    -0
      mix.exs
  10. +96
    -0
      test/options_test.exs
  11. +1
    -0
      test/test_helper.exs
  12. +16
    -0
      test/work_queue_test.exs

+ 5
- 0
.gitignore View File

@ -0,0 +1,5 @@
/_build
/deps
erl_crash.dump
*.ez
mix.lock

+ 13
- 0
Guardfile View File

@ -0,0 +1,13 @@
# -*- ruby -*-
guard :shell do
interactor :off
notification :emacs
watch(/^(lib|test).*\.exs?$/) do |f|
`mix test >/dev/tty`
if $?.success?
Notifier.notify "Success", type: "success"
else
Notifier.notify "Failed", type: "failed"
end
end
end

+ 25
- 0
README.md View File

@ -0,0 +1,25 @@
WorkQueue
=========
** TODO: Add description **
WorkQueue.start_link (
WorkerModule,
args,
work_source ( enum | fn ),
worker_count: n | float | :processor_bound | :io_bound,
report_results_to: fn | module,
report_progress_to: fn | module,
report_progress_interval: nnn mS,
)
WorkQueue.start_link(
SignatureGenerator,
[ batch ],
fn -> dir_walker.next end,
worker_count: 1.5,
report_results_to: signature_writer,
report_progress_to: report_function)

+ 24
- 0
config/config.exs View File

@ -0,0 +1,24 @@
# This file is responsible for configuring your application
# and its dependencies with the aid of the Mix.Config module.
use Mix.Config
# This configuration is loaded before any dependency and is restricted
# to this project. If another project depends on this project, this
# file won't be loaded nor affect the parent project. For this reason,
# if you want to provide default values for your application for third-
# party users, it should be done in your mix.exs file.
# Sample configuration:
#
# config :logger, :console,
# level: :info,
# format: "$date $time [$level] $metadata$message\n",
# metadata: [:user_id]
# It is also possible to import configuration files, relative to this
# directory. For example, you can emulate configuration per environment
# by uncommenting the line below and defining dev.exs, test.exs and such.
# Configuration from the imported file will override the ones defined
# here (which is why it is important to import them last).
#
# import_config "#{Mix.env}.exs"

+ 91
- 0
lib/work_queue.ex View File

@ -0,0 +1,91 @@
defmodule WorkQueue do
use PipeWhileOk
alias WorkQueue.Options
def start_link(worker_fn, work_to_process, get_next_item_fn, extra_opts \\ []) do
pipe_while_ok do
package_parameters(worker_fn, work_to_process, get_next_item_fn, extra_opts)
|> Options.analyze
|> start_workers
|> schedule_work
end
end
defp package_parameters(worker_fn, work_to_process, get_next_item_fn, extra_opts) do
{ :ok,
%{
worker_fn: worker_fn,
work_to_process: work_to_process,
get_next_item_fn: get_next_item_fn,
opts: extra_opts,
result_count: 0,
running_workers: 0,
}
}
end
defp start_workers(params) do
WorkerQueue.WorkerSupervisor.start_link(params)
end
defp schedule_work(params) do
params.report_progress_to.({:starting})
params = Dict.put(params, :running_workers, params.opts.worker_count)
count = if params.report_progress_interval do
loop_with_ticker(params)
else
loop(params)
end
params.report_progress_to.({:finished, count})
end
defp loop_with_ticker(params) do
{:ok, ticker} = :timer.send_interval(params.report_progress_interval, self, :tick)
count = loop(params)
:timer.cancel(ticker)
count
end
defp loop(params) do
require Logger
receive do
{ :send_work, worker } ->
{params, next_item} = get_next_item(params)
WorkQueue.Worker.process(worker, next_item)
loop(params)
{ :processed, worker, { :ok, result } } ->
params = update_in(params[:result_count], &(&1+1))
params.report_each_result_to.(result)
{params, next_item} = get_next_item(params)
WorkQueue.Worker.process(worker, next_item)
loop(params)
{ :shutdown, _worker } ->
if params.running_workers > 1 do
loop(%{params | running_workers: params.running_workers - 1})
else
params.result_count
end
:tick ->
params.report_progress_to.({:progress, params.result_count})
loop(params)
other ->
Logger.error("Unknown message in work queue scheduler: #{inspect other}")
end
end
defp get_next_item(params) do
{item, new_state} = params.get_next_item_fn(params.work_to_process)
{Dict.put(params, :work_to_process, new_state), item}
end
end

+ 67
- 0
lib/work_queue/options.ex View File

@ -0,0 +1,67 @@
defmodule WorkQueue.Options do
def analyze(params, defaults \\ default_options) do
Enum.reduce(params.opts, {:ok, defaults}, &option/2)
|> case do
error = {:error,_} -> error
{:ok, new_options} -> {:ok, %{ params | opts: new_options} }
end
end
defp default_options do
%{
worker_count: round(processing_units*0.667),
report_each_result_to: fn _ -> end,
report_progress_to: fn _ -> end,
report_progress_interval: false,
worker_args: [],
}
end
# Once we have an error, ignore further options
defp option(_, result = {:error, _}), do: result
defp option({:report_progress_interval, n}, result)
when is_integer(n),
do: update(result, :report_progress_interval, n)
defp option({:report_progress_to, func}, result)
when is_function(func),
do: update(result, :report_progress_to, func)
defp option({:report_each_result_to, func}, result)
when is_function(func),
do: update(result, :report_each_result_to, func)
defp option({:worker_args, args}, result)
when is_list(args),
do: update(result, :worker_args, args)
defp option({:worker_args, arg}, result),
do: option({:worker_args, [arg]}, result)
defp option({:worker_count, n}, result)
when is_integer(n) and n > 0,
do: update(result, :worker_count, n)
defp option({:worker_count, ratio}, result)
when is_float(ratio) and ratio >= 0.5,
do: option({:worker_count, round(ratio*processing_units)}, result)
defp option({:worker_count, :cpu_bound}, result),
do: option({:worker_count, processing_units}, result)
defp option({:worker_count, :io_bound}, result),
do: option({:worker_count, 10.0}, result)
defp option({option, value}, _result) do
{ :error, "Invalid option [ #{option}: #{inspect value} ] to #{__MODULE__}" }
end
defp update({:ok, result}, key, val) do
{ :ok, Dict.put(result, key, val) }
end
def processing_units, do: :erlang.system_info(:logical_processors)
end

+ 33
- 0
lib/work_queue/worker.ex View File

@ -0,0 +1,33 @@
defmodule WorkQueue.Worker do
use GenServer
#######
# API #
#######
def process(me, work_item) do
GenServer.cast(me, {:process, work_item})
end
##################
# Implementation #
##################
def init(state = [params, scheduler_pid]) do
send(scheduler_pid, {:send_work, self})
{ :ok, state }
end
def handle_cast({:process, nil}, state) do
send(state.scheduler_pid, { :shutdown, self })
{ :stop, :normal, state }
end
def handle_cast({:process, work_item}, state = %{ params: params }) do
result = params.worker_fn(work_item, params.opts.worker_args)
send(params.scheduler_pid, { :processed, self, result})
{ :noreply, state }
end
end

+ 16
- 0
lib/work_queue/worker_supervisor.ex View File

@ -0,0 +1,16 @@
defmodule WorkQueue.WorkerSupervisor do
use Supervisor
def start_link(params) do
{ :ok, pid } = Supervisor.start_link(__MODULE__, [params, self])
{ :ok, Dict.put(params, :supervisor_pid, pid) }
end
def init([params, scheduler_pid]) do
workers = [ worker(WorkQueue.Worker, [params, scheduler_pid], restart: :temporary) ]
{:ok, _} = supervise(workers, strategy: :simple_one_for_one)
1..params.opts.worker_count
|> Enum.each(&(Supervisor.start_child(self, [&1])))
end
end

+ 20
- 0
mix.exs View File

@ -0,0 +1,20 @@
defmodule WorkQueue.Mixfile do
use Mix.Project
def project do
[app: :work_queue,
version: "0.0.1",
elixir: ">= 1.0.0",
deps: deps]
end
def application do
[applications: [:logger]]
end
defp deps do
[
pipe_while_ok: ">0.0.0"
]
end
end

+ 96
- 0
test/options_test.exs View File

@ -0,0 +1,96 @@
defmodule OptionsTest do
use ExUnit.Case
import WorkQueue.Options, only: [analyze: 2, processing_units: 0]
defp defaults(overrides \\ %{}) do
%{
worker_count: 4,
report_each_result_to: :rrt,
report_progress_to: :rpt,
report_progress_interval: 123,
worker_args: [],
}
|> Dict.merge(overrides)
end
test "no options returns defaults" do
given = %{ opts: [] }
expect = { :ok, %{ opts: defaults } }
assert expect == analyze(given, defaults)
end
test "can override report_each_result_to" do
callback = fn _ -> 0 end
given = %{ opts: [ report_each_result_to: callback ] }
expect = { :ok, %{ opts: defaults(report_each_result_to: callback) } }
assert expect == analyze(given, defaults)
end
test "can override report_progress_to" do
callback = fn _ -> 0 end
given = %{ opts: [ report_progress_to: callback ] }
expect = { :ok, %{ opts: defaults(report_progress_to: callback) } }
assert expect == analyze(given, defaults)
end
test "can override report_progress_interval" do
given = %{ opts: [ report_progress_interval: 1234 ] }
expect = { :ok, %{ opts: defaults(report_progress_interval: 1234) } }
assert expect == analyze(given, defaults)
end
test "worker_args accepts list" do
given = %{ opts: [ worker_args: [1,2,3] ] }
expect = { :ok, %{ opts: defaults(worker_args: [1,2,3]) } }
assert expect == analyze(given, defaults)
end
test "worker_args accepts single term" do
given = %{ opts: [ worker_args: 123 ] }
expect = { :ok, %{ opts: defaults(worker_args: [123]) } }
assert expect == analyze(given, defaults)
end
test "absolute worker count can be set" do
given = %{ opts: [ worker_count: 12 ] }
expect = { :ok, %{ opts: defaults(worker_count: 12) } }
assert expect == analyze(given, defaults)
end
test "ratio worker count can be given" do
given = %{ opts: [ worker_count: 2.0 ] }
expect = { :ok, %{ opts: defaults(worker_count: 2 * processing_units) } }
assert expect == analyze(given, defaults)
end
test "cpu_bound returns full house of workers" do
given = %{ opts: [ worker_count: :cpu_bound ] }
expect = { :ok, %{ opts: defaults(worker_count: processing_units) } }
assert expect == analyze(given, defaults)
end
test "io_bound returns overcommits workers by a factor of 10" do
given = %{ opts: [ worker_count: :io_bound ] }
expect = { :ok, %{ opts: defaults(worker_count: 10*processing_units) } }
assert expect == analyze(given, defaults)
end
test "invalid option is rejected" do
given = %{ opts: [ invalid: 1234 ] }
expect = { :error, "Invalid option [ invalid: 1234 ] to Elixir.WorkQueue.Options"}
assert expect == analyze(given, defaults)
end
test "zero worker cout\nt is rejected" do
given = %{ opts: [ worker_count: 0 ] }
expect = { :error, "Invalid option [ worker_count: 0 ] to Elixir.WorkQueue.Options"}
assert expect == analyze(given, defaults)
end
test "negative worker cout\nt is rejected" do
given = %{ opts: [ worker_count: -1.5 ] }
expect = { :error, "Invalid option [ worker_count: -1.5 ] to Elixir.WorkQueue.Options"}
assert expect == analyze(given, defaults)
end
end

+ 1
- 0
test/test_helper.exs View File

@ -0,0 +1 @@
ExUnit.start()

+ 16
- 0
test/work_queue_test.exs View File

@ -0,0 +1,16 @@
defmodule WorkQueueTest do
use ExUnit.Case
test "doesn't crash!" do
WorkQueue.start_link(
&double/2, # worker
[ 1, 2, 3 ], # work items to process
&traverse_list/1) # func to get next work item
end
defp double(value, _), do: value * 2
defp traverse_list([]), do: {nil, []}
defp traverse_list([h|t]), do: {h, t}
end

Loading…
Cancel
Save