Skip to content

File MPSCQueue.h

File List > Amplitude > Core > MPSCQueue.h

Go to the documentation of this file

// Copyright (c) 2026-present Sparky Studios. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef _AM_CORE_MPSCQUEUE_H
#define _AM_CORE_MPSCQUEUE_H

#include <atomic>

#include <SparkyStudios/Audio/Amplitude/Core/Common.h>

namespace SparkyStudios::Audio::Amplitude
{
    template<typename T, AmSize Capacity = 256>
    class MPSCQueue
    {
        static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be a power of 2");
        static_assert(Capacity > 0, "Capacity must be greater than 0");

    public:
        MPSCQueue()
            : _tail(0)
            , _head(0)
        {
            for (AmSize i = 0; i < Capacity; ++i)
                _buffer[i].sequence.store(i, std::memory_order_relaxed);
        }

        ~MPSCQueue() = default;

        MPSCQueue(const MPSCQueue&) = delete;
        MPSCQueue& operator=(const MPSCQueue&) = delete;

        bool TryEnqueue(const T& item)
        {
            Cell* cell;
            AmSize pos = _tail.load(std::memory_order_relaxed);

            for (;;)
            {
                cell = &_buffer[pos & kMask];
                const AmSize seq = cell->sequence.load(std::memory_order_acquire);
                const auto diff = static_cast<std::ptrdiff_t>(seq) - static_cast<std::ptrdiff_t>(pos);

                if (diff == 0)
                {
                    // Slot is free — try to claim it
                    if (_tail.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
                        break;
                }
                else if (diff < 0)
                {
                    // Queue is full
                    return false;
                }
                else
                {
                    // Another producer claimed this slot, reload tail
                    pos = _tail.load(std::memory_order_relaxed);
                }
            }

            cell->data = item;
            cell->sequence.store(pos + 1, std::memory_order_release);
            return true;
        }

        bool TryEnqueue(T&& item)
        {
            Cell* cell;
            AmSize pos = _tail.load(std::memory_order_relaxed);

            for (;;)
            {
                cell = &_buffer[pos & kMask];
                const AmSize seq = cell->sequence.load(std::memory_order_acquire);
                const auto diff = static_cast<std::ptrdiff_t>(seq) - static_cast<std::ptrdiff_t>(pos);

                if (diff == 0)
                {
                    if (_tail.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
                        break;
                }
                else if (diff < 0)
                {
                    return false;
                }
                else
                {
                    pos = _tail.load(std::memory_order_relaxed);
                }
            }

            cell->data = std::move(item);
            cell->sequence.store(pos + 1, std::memory_order_release);
            return true;
        }

        bool TryDequeue(T& item)
        {
            Cell* cell = &_buffer[_head & kMask];
            const AmSize seq = cell->sequence.load(std::memory_order_acquire);
            const auto diff = static_cast<std::ptrdiff_t>(seq) - static_cast<std::ptrdiff_t>(_head + 1);

            if (diff < 0)
                return false; // Queue is empty

            item = std::move(cell->data);
            cell->sequence.store(_head + Capacity, std::memory_order_release);
            ++_head;
            return true;
        }

        [[nodiscard]] bool IsEmpty() const
        {
            const Cell& cell = _buffer[_head & kMask];
            const AmSize seq = cell.sequence.load(std::memory_order_acquire);
            const auto diff = static_cast<std::ptrdiff_t>(seq) - static_cast<std::ptrdiff_t>(_head + 1);
            return diff < 0;
        }

        [[nodiscard]] static constexpr AmSize GetCapacity()
        {
            return Capacity;
        }

    private:
        static constexpr AmSize kCacheLineSize = 64;
        static constexpr AmSize kMask = Capacity - 1;

        struct Cell
        {
            std::atomic<AmSize> sequence;
            T data;
        };

        alignas(kCacheLineSize) Cell _buffer[Capacity];
        alignas(kCacheLineSize) std::atomic<AmSize> _tail;
        alignas(kCacheLineSize) AmSize _head; // Consumer-only, no atomic needed
    };

} // namespace SparkyStudios::Audio::Amplitude

#endif // _AM_CORE_MPSCQUEUE_H