[Feature] [ActiveSupport::Notifications] Support start and finish APIs with accurate start time detection for async tasks

Background

The start and finish APIs on the instrumenter are public, but they cannot be used to time overlapping work. Start times are kept on a stack, so finish pops the most recently pushed start time rather than the one belonging to the task being finished. With accurate start-time tracking, async tasks could be instrumented directly: fire the start event when a task is queued to record its start time, then send the finish event when it completes.

```ruby
ActiveSupport::Notifications.subscribe('foo') do |name, start, finish, id, payload|
  puts format(
    '[%<name>s] %<id>s [%<start>i -> %<finish>i = %<duration>.3f] %<payload>s',
    name: name,
    id: id,
    start: start.to_i,
    finish: finish.to_i,
    duration: finish - start,
    payload: payload
  )
end

instrumenter = ActiveSupport::Notifications.instrumenter
```

Assume the following five async tasks are queued, each firing start as it is enqueued:

```ruby
payloads = {}

1.upto(5) do |i|
  payloads[i] = { id: i, start_time: Time.now.to_i }
  instrumenter.start('foo', payloads[i])

  sleep(1)
end
```

Then simulate their completion in the order they were queued (first in, first out):

```ruby
1.upto(5) do |i|
  sleep(1)

  instrumenter.finish('foo', payloads[i].merge(end_time: Time.now.to_i))
end

# [foo] dd85abbba742e3982f02 [1642165418 -> 1642165426 = 7.654] {:id=>1, :start_time=>1642165414, :end_time=>1642165426}
# [foo] dd85abbba742e3982f02 [1642165417 -> 1642165427 = 9.659] {:id=>2, :start_time=>1642165415, :end_time=>1642165427}
# [foo] dd85abbba742e3982f02 [1642165416 -> 1642165428 = 11.666] {:id=>3, :start_time=>1642165416, :end_time=>1642165428}
# [foo] dd85abbba742e3982f02 [1642165415 -> 1642165429 = 13.674] {:id=>4, :start_time=>1642165417, :end_time=>1642165429}
# [foo] dd85abbba742e3982f02 [1642165414 -> 1642165430 = 15.683] {:id=>5, :start_time=>1642165418, :end_time=>1642165430}
```

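The reported start times are reversed: task 1 is reported with task 5's start time (1642165418), task 2 with task 4's, and so on. Because finish pops the most recently pushed start time off the stack (LIFO) while the tasks complete in FIFO order, the correct start time cannot be recovered unless start times are mapped to a unique identifier for each task.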

Proposal

Currently, start times are pushed onto a stack held in IsolatedExecutionState. Instead, we can store them in a Concurrent::Map, using a task identifier as the key and the start time as the value.
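The pairing problem, and why keying fixes it, can be reproduced without Rails. This is a minimal sketch under stated assumptions: fixed integers stand in for `Time.now`, and a `Mutex`-guarded `Hash` stands in for `Concurrent::Map` so no gems are required. Tasks start in order 1..5 and finish in the same order, as in the example above.

```ruby
# Hypothetical fixed "timestamps" stand in for Time.now so the pairing is
# deterministic: tasks 1..5 start at 100..104 and finish in the same order
# they started (FIFO), as in the async example above.
starts = { 1 => 100, 2 => 101, 3 => 102, 4 => 103, 5 => 104 }

# Current approach: a stack. finish pops the most recently pushed time (LIFO).
stack = []
starts.each_value { |t| stack.push(t) }
stack_pairs = starts.keys.to_h { |task| [task, stack.pop] }

# Proposed approach: start times keyed by task id. A Mutex-guarded Hash
# stands in for Concurrent::Map so the sketch needs no gems.
lock = Mutex.new
keyed = {}
starts.each { |task, t| lock.synchronize { keyed[task] = t } }
# delete both returns the start time and removes the entry, so nothing leaks.
keyed_pairs = starts.keys.to_h { |task| [task, lock.synchronize { keyed.delete(task) }] }

puts stack_pairs.map { |task, t| "#{task}:#{t}" }.join(" ") # prints "1:104 2:103 3:102 4:101 5:100"
puts keyed_pairs.map { |task, t| "#{task}:#{t}" }.join(" ") # prints "1:100 2:101 3:102 4:103 5:104"
```

The stack output mirrors the reversed start times in the log above; the keyed version returns each task's own start time regardless of completion order.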

Instrumenter#instrument injects the task id into the payload automatically. Callers that use start and finish directly must put the same :instrumented_task_id into the payload themselves.

```diff
diff --git a/activesupport/lib/active_support/notifications/instrumenter.rb b/activesupport/lib/active_support/notifications/instrumenter.rb
index c69e8cd8d4..2dd41dafac 100644
--- a/activesupport/lib/active_support/notifications/instrumenter.rb
+++ b/activesupport/lib/active_support/notifications/instrumenter.rb
@@ -18,6 +18,7 @@ def initialize(notifier)
       # notifier. Notice that events get sent even if an error occurs in the
       # passed-in block.
       def instrument(name, payload = {})
+        payload[:instrumented_task_id] = unique_id
         # some of the listeners might have state
         listeners_state = start name, payload
         begin
```
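The effect of the instrumenter change can be illustrated with a standalone toy. `ToyInstrumenter` and `RecordingListener` are hypothetical classes, not Rails APIs, and `SecureRandom.hex(10)` is assumed as the id source here. The sketch only shows that every `instrument` call stamps its payload with a fresh `:instrumented_task_id` that start and finish both see, including for nested calls and when the block raises.

```ruby
require "securerandom"

# Hypothetical, simplified stand-in for the instrumenter after the proposed
# change: instrument stamps the payload with a fresh :instrumented_task_id
# before start, so finish sees the same id.
class ToyInstrumenter
  def initialize(listener)
    @listener = listener
  end

  def instrument(name, payload = {})
    payload[:instrumented_task_id] = SecureRandom.hex(10)
    @listener.start(name, payload)
    begin
      yield payload if block_given?
    ensure
      # finish is sent even if the block raises.
      @listener.finish(name, payload)
    end
  end
end

# Hypothetical listener that records the event kind, name, and task id it saw.
class RecordingListener
  attr_reader :events

  def initialize
    @events = []
  end

  def start(name, payload)
    @events << [:start, name, payload[:instrumented_task_id]]
  end

  def finish(name, payload)
    @events << [:finish, name, payload[:instrumented_task_id]]
  end
end

listener = RecordingListener.new
instrumenter = ToyInstrumenter.new(listener)

# Nested calls get distinct ids; each finish carries its own start's id.
instrumenter.instrument("outer") do
  instrumenter.instrument("inner") { }
end

listener.events.each { |kind, name, id| puts "#{kind} #{name} #{id}" }
```

Since the id is written into the caller's payload hash, a caller driving start and finish directly can follow the same pattern by generating one id per task and reusing it in both payloads.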

In the fanout (the notifier), the timed subscribers keep a map of start times keyed by task id instead of a stack:

```diff
diff --git a/activesupport/lib/active_support/notifications/fanout.rb b/activesupport/lib/active_support/notifications/fanout.rb
index 0759d3a086..8b90fbc65c 100644
--- a/activesupport/lib/active_support/notifications/fanout.rb
+++ b/activesupport/lib/active_support/notifications/fanout.rb
@@ -226,13 +226,13 @@ def publish(name, *args)
           end

           def start(name, id, payload)
-            timestack = IsolatedExecutionState[:_timestack] ||= []
-            timestack.push Time.now
+            start_times = IsolatedExecutionState[:_timestack] ||= Concurrent::Map.new
+            start_times[payload[:instrumented_task_id]] = Time.now
           end

           def finish(name, id, payload)
-            timestack = IsolatedExecutionState[:_timestack]
-            started = timestack.pop
+            start_times = IsolatedExecutionState[:_timestack]
+            started = start_times.delete(payload[:instrumented_task_id])
             @delegate.call(name, started, Time.now, id, payload)
           end
         end
@@ -243,13 +243,13 @@ def publish(name, *args)
           end

           def start(name, id, payload)
-            timestack = IsolatedExecutionState[:_timestack_monotonic] ||= []
-            timestack.push Process.clock_gettime(Process::CLOCK_MONOTONIC)
+            start_times = IsolatedExecutionState[:_timestack_monotonic] ||= Concurrent::Map.new
+            start_times[payload[:instrumented_task_id]] = Process.clock_gettime(Process::CLOCK_MONOTONIC)
           end

           def finish(name, id, payload)
-            timestack = IsolatedExecutionState[:_timestack_monotonic]
-            started = timestack.pop
+            start_times = IsolatedExecutionState[:_timestack_monotonic]
+            started = start_times.delete(payload[:instrumented_task_id])
             @delegate.call(name, started, Process.clock_gettime(Process::CLOCK_MONOTONIC), id, payload)
           end
         end
```
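The monotonic subscriber's new start/finish pair can be exercised outside Rails with a toy class. `KeyedMonotonicTimer` is hypothetical, and it makes a few assumptions that are not part of the proposal: start times live in the subscriber instance rather than in IsolatedExecutionState, a `Mutex`-guarded `Hash` replaces `Concurrent::Map`, and finish raises `KeyError` when no start time was recorded instead of passing `nil` to the delegate. The usage drives three overlapping tasks that finish in FIFO order, the case the stack gets wrong.

```ruby
# Hypothetical, simplified stand-in for the monotonic timed subscriber after
# the proposed change: start times are keyed by :instrumented_task_id.
class KeyedMonotonicTimer
  def initialize(&delegate)
    @delegate = delegate
    @start_times = {}   # stands in for Concurrent::Map
    @lock = Mutex.new
  end

  def start(name, id, payload)
    now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    @lock.synchronize { @start_times[payload.fetch(:instrumented_task_id)] = now }
  end

  def finish(name, id, payload)
    finished = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    started = @lock.synchronize { @start_times.delete(payload.fetch(:instrumented_task_id)) }
    # Assumption of this sketch: fail loudly instead of handing nil to the delegate.
    raise KeyError, "no start recorded for #{payload[:instrumented_task_id]}" if started.nil?

    @delegate.call(name, started, finished, id, payload)
  end
end

results = {}
timer = KeyedMonotonicTimer.new do |_name, started, finished, _id, payload|
  results[payload[:task]] = { started: started, finished: finished }
end

# Each direct start/finish call carries the task's own id in its payload.
payloads = (1..3).map { |i| { task: i, instrumented_task_id: "task-#{i}" } }
payloads.each { |payload| timer.start("foo", "instrumenter-id", payload); sleep 0.01 }
payloads.each { |payload| timer.finish("foo", "instrumenter-id", payload); sleep 0.01 } # FIFO completion

results.each { |task, t| puts format("task %d: %.3fs", task, t[:finished] - t[:started]) }
```

Because the lookup is by id rather than by position, tasks may finish in any order, and nested `instrument` calls (which finish in LIFO order) still pair correctly since each carries its own id.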