Apparent data missing bug in Ceph RGW

Hey all, we were testing Ceph against our product and found some behavior we want to run by you.

We are using Ceph's S3 interface.
Attached is a Python file using boto3 which, when run against two different Ceph deployments (Octopus on Ceph Nano and our production Nautilus 14.2.11 deployment), appears to reproduce a strange issue.
After running for a while, a recently uploaded file permanently disappears from list_objects results.  The file is still retrievable with get_object if you know its exact name, but it never shows up in list_objects again.
There are more details about the experiment in the attached python file.
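
The symptom described above (a key that get_object can still reach but that list_objects omits) can be checked for directly. The following is a minimal sketch, not part of the attached repro: the helper name find_unlisted_keys is hypothetical, it assumes `client` is a boto3 S3 client, and it uses head_object instead of get_object so the object body is not downloaded.

```python
def find_unlisted_keys(client, bucket, prefix, expected_keys):
    """Return expected keys that head_object can see but the listing omits.

    `client` is assumed to be a boto3 S3 client (hypothetical helper,
    not part of the attached file).
    """
    # Collect every key the listing returns under the prefix, following
    # pagination so a truncated response cannot hide keys.
    listed = set()
    paginator = client.get_paginator("list_objects")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            listed.add(obj["Key"])

    unlisted = []
    for key in sorted(set(expected_keys) - listed):
        try:
            client.head_object(Bucket=bucket, Key=key)
        except client.exceptions.ClientError:
            # Not retrievable either, so the key is genuinely absent.
            continue
        # Retrievable by name but missing from the listing.
        unlisted.append(key)
    return unlisted
```

With the names from the attached file, a call would look like find_unlisted_keys(ConnectToCeph(), BUCKET, prefix, keys), where prefix is one d_<iteration_num>/ directory and keys are the f_<file_id> names expected in it.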

We captured a run of this experiment with debug logging, in which we see the trace message

RGWRados::cls_bucket_list_ordered: skipping <filename>

in the same millisecond that the file was PUT.
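
To correlate that trace with uploads, one option is to pull every skipped name out of the RGW debug log. This is a rough sketch under stated assumptions: the function name skipped_keys is hypothetical, and it assumes only that each relevant log line contains the message text quoted above, with the skipped name being whatever follows "skipping " up to the end of the line. Nothing is assumed about the timestamp or thread prefix that comes before the message.

```python
SKIP_MARKER = "RGWRados::cls_bucket_list_ordered: skipping "

def skipped_keys(log_lines):
    """Yield (line_number, name) for each skip trace in an iterable of lines.

    Assumption: the skipped name is the rest of the line after the marker.
    """
    for lineno, line in enumerate(log_lines, start=1):
        idx = line.find(SKIP_MARKER)
        if idx == -1:
            continue  # unrelated log line
        name = line[idx + len(SKIP_MARKER):].strip()
        yield lineno, name
```

Passing an open log file (for example, for lineno, name in skipped_keys(open(path))) gives the line numbers to compare against the upload timestamps printed by the repro.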

Reading the code, this message is logged when a call to check_disk_state returns -ENOENT, which happens in this branch:

  if (!list_state.is_delete_marker() && !astate->exists) {
      /* object doesn't exist right now -- hopefully because it's
       * marked as !exists and got deleted */
    if (list_state.exists) {
      /* FIXME: what should happen now? Work out if there are any
       * non-bad ways this could happen (there probably are, but annoying
       * to handle!) */
    }
    // encode a suggested removal of that key
    list_state.ver.epoch = io_ctx.get_last_version();
    list_state.ver.pool = io_ctx.get_id();
    cls_rgw_encode_suggestion(CEPH_RGW_REMOVE, list_state, suggested_updates);
    return -ENOENT;
  }

It seems like this might be a race between PUT and list_objects in which some object metadata is apparently deleted... the FIXME is at least a little suspicious :).

I would love to know what's going on here, and whether there is a fix or workaround we can apply to prevent this behavior.  Let me know if there is any other information we can provide.

Thank you so much!

Best,
-Joseph Victor

import boto3
import time
import multiprocessing
import random
import uuid
import datetime

ENDPOINT_URL = "http://127.0.0.1:8000"
ACCESS_KEY = "accesskey"
SECRET_KEY = "secretkey"
BUCKET = "bottomless"

NUM_ITERS = 5000
NUM_UPLOAD_THREADS = 10
NUM_LIST_THREADS = 3

FILES_PER_DIR = 1000

g_num_finished = multiprocessing.Value('i', 0)
g_value_lock = multiprocessing.Lock()
g_failed = multiprocessing.Value('i', 0)

# Sorry if my python isn't the best!
#
# The idea of this repro is to have two sets of threads, one running uploads and
# the other running lists.  Each iteration gets a directory, d_<iteration_num>, and
# the upload threads and list threads more or less try to pound the same subdirectory at the same time.
# The list threads also verify that there are the right number of files in each iteration after all the
# uploaders are done.
#
# When I run this test (against Ceph Nano Octopus and a production deployment at Singlestore on Nautilus),
# it eventually hits the assert and prints out the name of a missing file.  That file doesn't seem to show
# up in any list queries after that, even though the uploader must have received a 200 back from Ceph's S3
# endpoint.  Curiously, it DOES show up in get_object as existing.

def timestamp():
    return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")

def ConnectToCeph():
    return boto3.client(
        's3',
        aws_access_key_id=ACCESS_KEY,
        aws_secret_access_key=SECRET_KEY,
        endpoint_url=ENDPOINT_URL)

def ListThread(thread_id, test_subdir):
    ceph_conn = ConnectToCeph()
    iter_verified = 0
    times_failed = 0
    time_of_last_fail = time.time()

    # This thread is done after verifying all directories.
    #
    while iter_verified < NUM_ITERS:
        assert not g_failed.value
        
        # Randomly pick between listing the directory currently being uploaded to or listing
        # the lowest un-verified directory.
        #
        num_finished = g_num_finished.value
        current_upload_dir = min(NUM_ITERS, num_finished // NUM_UPLOAD_THREADS)        
        dir_id = random.choice([current_upload_dir, iter_verified])

        # If num_finished is at least (dir_id + 1) times the number of upload threads,
        # every upload thread has finished this directory, so all files will have been uploaded.
        #
        expect_all_found = (dir_id + 1) * NUM_UPLOAD_THREADS <= num_finished

        dir = "%s/d_%d/" % (test_subdir, dir_id)
        seen = set()
        resp = ceph_conn.list_objects(Bucket=BUCKET, Prefix=dir)
        assert not resp['IsTruncated'], resp
        if 'Contents' in resp:
            for obj in resp['Contents']:
                assert obj['Key'][:len(dir) + 2] == (dir + "f_"), obj['Key']
                seen.add(int(obj['Key'][len(dir) + 2:]))

        # If dir_id is the same as the next iteration to verify, try verifying it.
        #
        if dir_id == iter_verified:
            if len(seen) == FILES_PER_DIR:
                print ("%s %d verified directory %d" % (timestamp(), thread_id, dir_id))
                iter_verified += 1
                times_failed = 0
                time_of_last_fail = time.time()
            elif expect_all_found:
                print ("%s %d only saw %d files in directory %d" % (timestamp(), thread_id, len(seen), dir_id))
                for fid in range(FILES_PER_DIR):
                    if fid not in seen:
                        missing_file = dir + "f_" + str(fid)
                        print ("%s %d Is this file '%s' missing?  Check if missing?" % (timestamp(), thread_id, missing_file))
                        try:
                            get_resp = ceph_conn.get_object(Key=missing_file, Bucket=BUCKET)
                            print ("%s %d Seems like its here! %s" % (timestamp(), thread_id, str(get_resp)))
                        except Exception as e:
                            print ("%s %d Seems like an error: %s" % (timestamp(), thread_id, str(e)))
                times_failed += 1
                time_of_last_fail = time.time()
                if times_failed > 10:
                    g_failed.value = 1
                    assert False
                
def UploadThread(thread_id, test_subdir):
    ceph_conn = ConnectToCeph()

    # Upload empty files d_<dir_id>/f_<file_id> where the file_id mod the number of threads is
    # this thread's id.
    #
    for i in range(NUM_ITERS):
        assert not g_failed.value
        for j in range(FILES_PER_DIR):
            if j % NUM_UPLOAD_THREADS == thread_id:
                filename = "%s/d_%d/f_%d" % (test_subdir, i, j)
                ceph_conn.upload_file("/tmp/empty_file", BUCKET, filename)

        print ("%s %d uploaded directory %d" % (timestamp(), thread_id, i))
                
        # Wait till all threads move on to the next directory
        #
        with g_value_lock:
            g_num_finished.value += 1
        while g_num_finished.value < (i + 1) * NUM_UPLOAD_THREADS:
            time.sleep(0.001)
            
if __name__ == "__main__":
    test_subdir = str(uuid.uuid1())
    print ("Test subdir is %s" % test_subdir)

    # Create an empty file to upload
    #
    with open("/tmp/empty_file", "w"):
        pass

    threads = []
    for i in range(NUM_LIST_THREADS):
        threads.append(multiprocessing.Process(target=ListThread, args=(i, test_subdir)))
    for i in range(NUM_UPLOAD_THREADS):
        threads.append(multiprocessing.Process(target=UploadThread, args=(i, test_subdir)))
    for t in threads:
        t.start()
    for t in threads:
        t.join()
            


                
_______________________________________________
Dev mailing list -- dev@xxxxxxx
To unsubscribe send an email to dev-leave@xxxxxxx
