Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 2 additions & 9 deletions ext/msgpack/buffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,17 @@ int msgpack_rb_encindex_ascii8bit;

ID s_uminus;

static msgpack_rmem_t s_rmem;

void msgpack_buffer_static_init(void)
{
s_uminus = rb_intern("-@");

msgpack_rb_encindex_utf8 = rb_utf8_encindex();
msgpack_rb_encindex_usascii = rb_usascii_encindex();
msgpack_rb_encindex_ascii8bit = rb_ascii8bit_encindex();

msgpack_rmem_init(&s_rmem);
}

void msgpack_buffer_static_destroy(void)
{
msgpack_rmem_destroy(&s_rmem);
}

void msgpack_buffer_init(msgpack_buffer_t* b)
Expand All @@ -59,9 +54,7 @@ static void _msgpack_buffer_chunk_destroy(msgpack_buffer_chunk_t* c)
{
if(c->mem != NULL) {
if(c->rmem) {
if(!msgpack_rmem_free(&s_rmem, c->mem)) {
rb_bug("Failed to free an rmem pointer, memory leak?");
}
msgpack_rmem_free(c->mem);
} else {
xfree(c->mem);
}
Expand Down Expand Up @@ -354,7 +347,7 @@ static inline void* _msgpack_buffer_chunk_malloc(
if((size_t)(b->rmem_end - b->rmem_last) < required_size) {
/* alloc new rmem page */
*allocated_size = MSGPACK_RMEM_PAGE_SIZE;
char* buffer = msgpack_rmem_alloc(&s_rmem);
char* buffer = msgpack_rmem_alloc();
c->mem = buffer;

/* update rmem owner */
Expand Down
1 change: 1 addition & 0 deletions ext/msgpack/extconf.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require 'mkmf'

have_func("rb_enc_interned_str", "ruby.h") # Ruby 3.0+
have_func("rb_ext_ractor_safe", "ruby.h") # Ruby 3.0+
have_func("rb_hash_new_capa", "ruby.h") # Ruby 3.2+
have_func("rb_proc_call_with_block", "ruby.h") # CRuby (TruffleRuby doesn't have it)
have_func("rb_gc_mark_locations", "ruby.h") # Missing on TruffleRuby
Expand Down
6 changes: 6 additions & 0 deletions ext/msgpack/rbinit.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@

RUBY_FUNC_EXPORTED void Init_msgpack(void)
{
/* No process-global mutable state, so packing/unpacking is safe per-Ractor.
* rb_ext_ractor_safe is Ruby 3.0+, so guard for older supported Rubies. */
#ifdef HAVE_RB_EXT_RACTOR_SAFE
rb_ext_ractor_safe(true);
#endif

VALUE mMessagePack = rb_define_module("MessagePack");

MessagePack_Buffer_module_init(mMessagePack);
Expand Down
93 changes: 0 additions & 93 deletions ext/msgpack/rmem.c

This file was deleted.

81 changes: 6 additions & 75 deletions ext/msgpack/rmem.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,91 +19,22 @@
#define MSGPACK_RUBY_RMEM_H__

#include "compat.h"
#include "sysdep.h"

#ifndef MSGPACK_RMEM_PAGE_SIZE
#define MSGPACK_RMEM_PAGE_SIZE (4*1024)
#endif

struct msgpack_rmem_t;
typedef struct msgpack_rmem_t msgpack_rmem_t;

struct msgpack_rmem_chunk_t;
typedef struct msgpack_rmem_chunk_t msgpack_rmem_chunk_t;

/*
* a chunk contains 32 pages.
* size of each buffer is MSGPACK_RMEM_PAGE_SIZE bytes.
*/
struct msgpack_rmem_chunk_t {
unsigned int mask;
char* pages;
};

struct msgpack_rmem_t {
msgpack_rmem_chunk_t head;
msgpack_rmem_chunk_t* array_first;
msgpack_rmem_chunk_t* array_last;
msgpack_rmem_chunk_t* array_end;
};

/* assert MSGPACK_RMEM_PAGE_SIZE % sysconf(_SC_PAGE_SIZE) == 0 */
void msgpack_rmem_init(msgpack_rmem_t* pm);

void msgpack_rmem_destroy(msgpack_rmem_t* pm);

void* _msgpack_rmem_alloc2(msgpack_rmem_t* pm);

#define _msgpack_rmem_chunk_available(c) ((c)->mask != 0)

static inline void* _msgpack_rmem_chunk_alloc(msgpack_rmem_chunk_t* c)
/* Fixed-size scratch pages for the buffer and unpacker stack, served straight
* from xmalloc/xfree so they can be allocated and freed from any Ractor. */
static inline void* msgpack_rmem_alloc(void)
{
_msgpack_bsp32(pos, c->mask);
(c)->mask &= ~(1 << pos);
return ((char*)(c)->pages) + (pos * (MSGPACK_RMEM_PAGE_SIZE));
return xmalloc(MSGPACK_RMEM_PAGE_SIZE);
}

static inline bool _msgpack_rmem_chunk_try_free(msgpack_rmem_chunk_t* c, void* mem)
static inline void msgpack_rmem_free(void* mem)
{
ptrdiff_t pdiff = ((char*)(mem)) - ((char*)(c)->pages);
if(0 <= pdiff && pdiff < MSGPACK_RMEM_PAGE_SIZE * 32) {
size_t pos = pdiff / MSGPACK_RMEM_PAGE_SIZE;
(c)->mask |= (1 << pos);
return true;
}
return false;
xfree(mem);
}

static inline void* msgpack_rmem_alloc(msgpack_rmem_t* pm)
{
if(_msgpack_rmem_chunk_available(&pm->head)) {
return _msgpack_rmem_chunk_alloc(&pm->head);
}
return _msgpack_rmem_alloc2(pm);
}

void _msgpack_rmem_chunk_free(msgpack_rmem_t* pm, msgpack_rmem_chunk_t* c);

static inline bool msgpack_rmem_free(msgpack_rmem_t* pm, void* mem)
{
if(_msgpack_rmem_chunk_try_free(&pm->head, mem)) {
return true;
}

/* search from last */
msgpack_rmem_chunk_t* c = pm->array_last - 1;
msgpack_rmem_chunk_t* before_first = pm->array_first - 1;
for(; c != before_first; c--) {
if(_msgpack_rmem_chunk_try_free(c, mem)) {
if(c != pm->array_first && c->mask == 0xffffffff) {
_msgpack_rmem_chunk_free(pm, c);
}
return true;
}
}
return false;
}


#endif

11 changes: 2 additions & 9 deletions ext/msgpack/unpacker.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ static int RAW_TYPE_STRING = 256;
static int RAW_TYPE_BINARY = 257;
static int16_t INITIAL_BUFFER_CAPACITY_MAX = SHRT_MAX;

static msgpack_rmem_t s_stack_rmem;

#if !defined(HAVE_RB_HASH_NEW_CAPA)
static inline VALUE rb_hash_new_capa_inline(long capa)
{
Expand All @@ -82,21 +80,18 @@ static inline int16_t initial_buffer_size(long size)
void msgpack_unpacker_static_init(void)
{
assert(sizeof(msgpack_unpacker_stack_entry_t) * MSGPACK_UNPACKER_STACK_CAPACITY <= MSGPACK_RMEM_PAGE_SIZE);

msgpack_rmem_init(&s_stack_rmem);
}

void msgpack_unpacker_static_destroy(void)
{
msgpack_rmem_destroy(&s_stack_rmem);
}

#define HEAD_BYTE_REQUIRED 0xc1

static inline bool _msgpack_unpacker_stack_init(msgpack_unpacker_stack_t *stack) {
if (!stack->data) {
stack->capacity = MSGPACK_UNPACKER_STACK_CAPACITY;
stack->data = msgpack_rmem_alloc(&s_stack_rmem);
stack->data = msgpack_rmem_alloc();
stack->depth = 0;
return true;
}
Expand All @@ -105,9 +100,7 @@ static inline bool _msgpack_unpacker_stack_init(msgpack_unpacker_stack_t *stack)

static inline void _msgpack_unpacker_free_stack(msgpack_unpacker_stack_t* stack) {
if (stack->data) {
if (!msgpack_rmem_free(&s_stack_rmem, stack->data)) {
rb_bug("Failed to free an rmem pointer, memory leak?");
}
msgpack_rmem_free(stack->data);
stack->data = NULL;
stack->depth = 0;
}
Expand Down
61 changes: 61 additions & 0 deletions spec/cruby/ractor_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
require 'spec_helper'

ractor_supported = defined?(Ractor) && RUBY_ENGINE == 'ruby'

describe 'Ractor safety', skip: (ractor_supported ? false : 'Ractor not supported on this Ruby') do
def ractor_value(ractor)
# Ractor#value replaced #take in newer rubies; support both.
ractor.respond_to?(:value) ? ractor.value : ractor.take
end

# Ruby prints a one-time "Ractor API is experimental" warning to stderr. Quiet
# it for this group, and restore afterwards so we don't suppress the warning
# for unrelated specs sharing the process.
before(:all) do
@experimental_warning = Warning[:experimental]
Warning[:experimental] = false
end

after(:all) do
Warning[:experimental] = @experimental_warning
end

it 'round-trips via a Factory inside a non-main Ractor' do
result = ractor_value(Ractor.new do
factory = MessagePack::Factory.new
factory.load(factory.dump([1, "two", 3.0, nil, true, {"k" => "v"}]))
end)
expect(result).to eq([1, "two", 3.0, nil, true, {"k" => "v"}])
end

it 'round-trips via a Packer and Unpacker inside a non-main Ractor' do
result = ractor_value(Ractor.new do
packed = MessagePack::Packer.new.write({"x" => [1, 2, 3]}).to_s
MessagePack::Unpacker.new.feed(packed).read
end)
expect(result).to eq({"x" => [1, 2, 3]})
end

it 'packs and unpacks concurrently across many Ractors without corruption' do
ractor_count = 8

ractors = ractor_count.times.map do |n|
Ractor.new(n) do |seed|
obj = {
"seed" => seed,
"nums" => (0..20).to_a,
"str" => "x" * 100,
"nested" => {"deep" => [seed] * 10},
}
ok = true
2_000.times do
packed = MessagePack::Packer.new.write(obj).to_s
ok &&= MessagePack::Unpacker.new.feed(packed).read == obj
end
ok
end
end

expect(ractors.map { |r| ractor_value(r) }).to all(be true)
end
end