Project

riverqueue

0.0
The project is in a healthy, maintained state
River is a fast job queue for Go. Use this gem in conjunction with gems riverqueue-activerecord or riverqueue-sequel to insert jobs in Ruby which will be worked from Go.
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
 Dependencies
 Project Readme

River client for Ruby Build Status Gem Version

An insert-only Ruby client for River packaged in the riverqueue gem. Allows jobs to be inserted in Ruby and run by a Go worker, but doesn't support working jobs in Ruby.

Basic usage

Your project's Gemfile should contain the riverqueue gem and a driver like riverqueue-sequel (see drivers):

gem "riverqueue"
gem "riverqueue-sequel"

Initialize a client with:

DB = Sequel.connect("postgres://...")
client = River::Client.new(River::Driver::Sequel.new(DB))

Define a job and insert it:

class SortArgs
  attr_accessor :strings

  def initialize(strings:)
    self.strings = strings
  end

  def kind = "sort"

  def to_json = JSON.dump({strings: strings})
end

insert_res = client.insert(SimpleArgs.new(strings: ["whale", "tiger", "bear"]))
insert_res.job # inserted job row

Job args should:

  • Respond to #kind with a unique string that identifies them in the database, and which a Go worker will recognize.
  • Response to #to_json with a JSON serialization that'll be parseable as an object in Go.

They may also respond to #insert_opts with an instance of InsertOpts to define insertion options that'll be used for all jobs of the kind.

Insertion options

Inserts take an insert_opts parameter to customize features of the inserted job:

insert_res = client.insert(
  SimpleArgs.new(strings: ["whale", "tiger", "bear"]),
  insert_opts: River::InsertOpts.new(
    max_attempts: 17,
    priority: 3,
    queue: "my_queue",
    tags: ["custom"]
  )
)

Inserting unique jobs

Unique jobs are supported through InsertOpts#unique_opts, and can be made unique by args, period, queue, and state. If a job matching unique properties is found on insert, the insert is skipped and the existing job returned.

insert_res = client.insert(args, insert_opts: River::InsertOpts.new(
  unique_opts: River::UniqueOpts.new(
    by_args: true,
    by_period: 15 * 60,
    by_queue: true,
    by_state: [River::JOB_STATE_AVAILABLE]
  )
)

# contains either a newly inserted job, or an existing one if insertion was skipped
insert_res.job

# true if insertion was skipped
insert_res.unique_skipped_as_duplicated

Custom advisory lock prefix

Unique job insertion takes a Postgres advisory lock to make sure that it's uniqueness check still works even if two conflicting insert operations are occurring in parallel. Postgres advisory locks share a global 64-bit namespace, which is a large enough space that it's unlikely for two advisory locks to ever conflict, but to guarantee that River's advisory locks never interfere with an application's, River can be configured with a 32-bit advisory lock prefix which it will use for all its locks:

client = River::Client.new(mock_driver, advisory_lock_prefix: 123456)

Doing so has the downside of leaving only 32 bits for River's locks (64 bits total - 32-bit prefix), making them somewhat more likely to conflict with each other.

Inserting jobs in bulk

Use #insert_many to bulk insert jobs as a single operation for improved efficiency:

num_inserted = client.insert_many([
  SimpleArgs.new(job_num: 1),
  SimpleArgs.new(job_num: 2)
])

Or with InsertManyParams, which may include insertion options:

num_inserted = client.insert_many([
  River::InsertManyParams.new(SimpleArgs.new(job_num: 1), insert_opts: River::InsertOpts.new(max_attempts: 5)),
  River::InsertManyParams.new(SimpleArgs.new(job_num: 2), insert_opts: River::InsertOpts.new(queue: "high_priority"))
])

Inserting in a transaction

No extra code is needed to insert jobs from inside a transaction. Just make sure that one is open from your ORM of choice, call the normal #insert or #insert_many methods, and insertions will take part in it.

ActiveRecord::Base.transaction do
  client.insert(SimpleArgs.new(strings: ["whale", "tiger", "bear"]))
end
DB.transaction do
  client.insert(SimpleArgs.new(strings: ["whale", "tiger", "bear"]))
end

Inserting with a Ruby hash

JobArgsHash can be used to insert with a kind and JSON hash so that it's not necessary to define a class:

insert_res = client.insert(River::JobArgsHash.new("hash_kind", {
    job_num: 1
}))

RBS and type checking

The gem bundles RBS files containing type annotations for its API to support type checking in Ruby through a tool like Sorbet or Steep.

Drivers

ActiveRecord

Use River with ActiveRecord by putting the riverqueue-activerecord driver in your Gemfile:

gem "riverqueue"
gem "riverqueue-activerecord"

Then initialize driver and client:

ActiveRecord::Base.establish_connection("postgres://...")
client = River::Client.new(River::Driver::ActiveRecord.new)

Sequel

Use River with Sequel by putting the riverqueue-sequel driver in your Gemfile:

gem "riverqueue"
gem "riverqueue-sequel"

Then initialize driver and client:

DB = Sequel.connect("postgres://...")
client = River::Client.new(River::Driver::Sequel.new(DB))

Development

See development.