[PATCH 3/4] hadoop: return all replica hostnames

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



From: Noah Watkins <noahwatkins@xxxxxxxxx>

Updates CephFileSystem to return the locations of all replicas
of a block rather than only the first one, and additionally
attempts to use reverse DNS to convert the OSD IP addresses into
hostnames. Hadoop does not reliably match IP addresses against
hostnames, so without this conversion data locality is lost.

Signed-off-by: Noah Watkins <noahwatkins@xxxxxxxxx>
---
 src/client/hadoop/ceph/CephFileSystem.java |   72 +++++++++++++++-------------
 1 files changed, 38 insertions(+), 34 deletions(-)

diff --git a/src/client/hadoop/ceph/CephFileSystem.java b/src/client/hadoop/ceph/CephFileSystem.java
index 6889b02..00dc64b 100644
--- a/src/client/hadoop/ceph/CephFileSystem.java
+++ b/src/client/hadoop/ceph/CephFileSystem.java
@@ -25,8 +25,10 @@ import java.io.IOException;
 import java.io.FileNotFoundException;
 import java.io.OutputStream;
 import java.net.URI;
+import java.net.InetAddress;
 import java.util.EnumSet;
 import java.lang.Math;
+import java.util.ArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -41,6 +43,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.net.DNS;
 
 
 /**
@@ -72,6 +75,10 @@ public class CephFileSystem extends FileSystem {
   private final Path root;
   private CephFS ceph = null;
 
+  private static String CEPH_NAMESERVER;
+  private static final String CEPH_NAMESERVER_KEY = "fs.ceph.nameserver";
+  private static final String CEPH_NAMESERVER_DEFAULT = "localhost";
+
   /**
    * Create a new CephFileSystem.
    */
@@ -115,6 +122,9 @@ public class CephFileSystem extends FileSystem {
     if (ceph == null) {
       ceph = new CephTalker(conf, LOG);
     }
+
+    CEPH_NAMESERVER = conf.get(CEPH_NAMESERVER_KEY, CEPH_NAMESERVER_DEFAULT);
+
     // build up the arguments for Ceph
     String arguments = "CephFSInterface";
 
@@ -580,6 +590,25 @@ public class CephFileSystem extends FileSystem {
     return result;
   }
 
+  /*
+   * Attempt to convert each IP into its hostname; failed lookups are logged and skipped
+   */
+  private String[] ips2Hosts(String[] ips) {
+    ArrayList<String> hosts = new ArrayList<String>();
+    for (String ip : ips) {
+      try {
+        String host = DNS.reverseDns(InetAddress.getByName(ip), CEPH_NAMESERVER);
+        if (host.charAt(host.length()-1) == '.') {
+          host = host.substring(0, host.length()-1);
+        }
+        hosts.add(host); /* append */
+      } catch (Exception e) {
+        LOG.error("reverseDns ["+ip+"] failed: "+ e);
+      }
+    }
+    return hosts.toArray(new String[hosts.size()]);
+  }
+
   /**
    * Get a BlockLocation object for each block in a file.
    *
@@ -595,52 +624,27 @@ public class CephFileSystem extends FileSystem {
    */
   @Override
   public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
-    LOG.debug(
-        "getFileBlockLocations:enter with path " + file.getPath()
-        + ", start pos " + start + ", length " + len);
-    // sanitize and get the filehandle
     Path abs_path = makeAbsolute(file.getPath());
 
-    LOG.trace("getFileBlockLocations:call ceph_open_for_read from Java");
     int fh = ceph.ceph_open_for_read(getCephPath(abs_path));
-
-    LOG.trace(
-        "getFileBlockLocations:return from ceph_open_for_read to Java with fh "
-            + fh);
     if (fh < 0) {
-      LOG.error(
-          "getFileBlockLocations:got error " + fh
-          + ", exiting and returning null!");
+      LOG.error("getFileBlockLocations:got error " + fh + ", exiting and returning null!");
       return null;
     }
-    // get the block size
-    LOG.trace("getFileBlockLocations:call ceph_getblocksize from Java");
-    long blockSize = ceph.ceph_getblocksize(getCephPath(abs_path));
 
-    LOG.trace("getFileBlockLocations:return from ceph_getblocksize");
+    long blockSize = ceph.ceph_getblocksize(getCephPath(abs_path));
     BlockLocation[] locations = new BlockLocation[(int) Math.ceil(len / (float) blockSize)];
-    long offset;
 
     for (int i = 0; i < locations.length; ++i) {
-      offset = start + i * blockSize;
-      LOG.trace(
-          "getFileBlockLocations:call ceph_hosts from Java on fh " + fh
-          + " and offset " + offset);
-      String host[] = ceph.ceph_hosts(fh, offset);
-
-      LOG.trace(
-          "getFileBlockLocations:return from ceph_hosts to Java with host "
-              + host[0]);
-      String[] hostArray = new String[1];
-
-      hostArray[0] = host[0];
-      locations[i] = new BlockLocation(hostArray, hostArray,
-          start + i * blockSize - (start % blockSize), blockSize);
+      long offset = start + i * blockSize;
+      long blockStart = start + i * blockSize - (start % blockSize);
+      String ips[] = ceph.ceph_hosts(fh, offset);
+      String hosts[] = ips2Hosts(ips);
+      locations[i] = new BlockLocation(null, hosts, blockStart, blockSize);
+      LOG.debug("getFileBlockLocations: location[" + i + "]: " + locations[i]);
     }
-    LOG.trace("getFileBlockLocations:call ceph_close from Java on fh " + fh);
+
     ceph.ceph_close(fh);
-    LOG.debug(
-        "getFileBlockLocations:return with " + locations.length + " locations");
     return locations;
   }
 
-- 
1.7.1

--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html


[Index of Archives]     [CEPH Users]     [Ceph Large]     [Information on CEPH]     [Linux BTRFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]
  Powered by Linux