>From db153855c800c81cdd6817f58be33905c49d61f4 Mon Sep 17 00:00:00 2001 From: Nathan Kinder <nkinder@xxxxxxxxxx> Date: Tue, 18 May 2010 14:49:26 -0700 Subject: [PATCH] Add replication session hooks This adds the ability to write a plug-in to register callbacks for controlling when replication is allowed to occur. For details, please see the design document at: http://directory.fedoraproject.org/wiki/Replication_Session_Hooks --- Makefile.am | 1 + Makefile.in | 15 + .../plugins/replication/repl-session-plugin.h | 119 +++++ ldap/servers/plugins/replication/repl5.h | 32 ++- ldap/servers/plugins/replication/repl5_agmt.c | 17 +- .../servers/plugins/replication/repl5_connection.c | 80 +++- .../plugins/replication/repl5_inc_protocol.c | 2 + ldap/servers/plugins/replication/repl5_init.c | 4 + .../plugins/replication/repl5_prot_private.h | 4 +- .../plugins/replication/repl5_protocol_util.c | 503 ++++++++++++-------- .../plugins/replication/repl5_tot_protocol.c | 1 + ldap/servers/plugins/replication/repl_extop.c | 225 +++++++-- .../plugins/replication/repl_session_plugin.c | 186 ++++++++ .../plugins/replication/test_repl_session_plugin.c | 335 +++++++++++++ 14 files changed, 1273 insertions(+), 251 deletions(-) create mode 100644 ldap/servers/plugins/replication/repl-session-plugin.h create mode 100644 ldap/servers/plugins/replication/repl_session_plugin.c create mode 100644 ldap/servers/plugins/replication/test_repl_session_plugin.c diff --git a/Makefile.am b/Makefile.am index 9680824..45125ad 100644 --- a/Makefile.am +++ b/Makefile.am @@ -990,6 +990,7 @@ libreplication_plugin_la_SOURCES = ldap/servers/plugins/replication/cl5_api.c \ ldap/servers/plugins/replication/repl_ops.c \ ldap/servers/plugins/replication/repl_rootdse.c \ ldap/servers/plugins/replication/repl_search.c \ + ldap/servers/plugins/replication/repl_session_plugin.c \ ldap/servers/plugins/replication/repl5_agmt.c \ ldap/servers/plugins/replication/repl5_agmtlist.c \ 
ldap/servers/plugins/replication/repl5_backoff.c \ diff --git a/Makefile.in b/Makefile.in index dbc367e..5607d37 100644 --- a/Makefile.in +++ b/Makefile.in @@ -495,6 +495,7 @@ am_libreplication_plugin_la_OBJECTS = ldap/servers/plugins/replication/libreplic ldap/servers/plugins/replication/libreplication_plugin_la-repl_ops.lo \ ldap/servers/plugins/replication/libreplication_plugin_la-repl_rootdse.lo \ ldap/servers/plugins/replication/libreplication_plugin_la-repl_search.lo \ + ldap/servers/plugins/replication/libreplication_plugin_la-repl_session_plugin.lo \ ldap/servers/plugins/replication/libreplication_plugin_la-repl5_agmt.lo \ ldap/servers/plugins/replication/libreplication_plugin_la-repl5_agmtlist.lo \ ldap/servers/plugins/replication/libreplication_plugin_la-repl5_backoff.lo \ @@ -2087,6 +2088,7 @@ libreplication_plugin_la_SOURCES = ldap/servers/plugins/replication/cl5_api.c \ ldap/servers/plugins/replication/repl_ops.c \ ldap/servers/plugins/replication/repl_rootdse.c \ ldap/servers/plugins/replication/repl_search.c \ + ldap/servers/plugins/replication/repl_session_plugin.c \ ldap/servers/plugins/replication/repl5_agmt.c \ ldap/servers/plugins/replication/repl5_agmtlist.c \ ldap/servers/plugins/replication/repl5_backoff.c \ @@ -3480,6 +3482,9 @@ ldap/servers/plugins/replication/libreplication_plugin_la-repl_rootdse.lo: \ ldap/servers/plugins/replication/libreplication_plugin_la-repl_search.lo: \ ldap/servers/plugins/replication/$(am__dirstamp) \ ldap/servers/plugins/replication/$(DEPDIR)/$(am__dirstamp) +ldap/servers/plugins/replication/libreplication_plugin_la-repl_session_plugin.lo: \ + ldap/servers/plugins/replication/$(am__dirstamp) \ + ldap/servers/plugins/replication/$(DEPDIR)/$(am__dirstamp) ldap/servers/plugins/replication/libreplication_plugin_la-repl5_agmt.lo: \ ldap/servers/plugins/replication/$(am__dirstamp) \ ldap/servers/plugins/replication/$(DEPDIR)/$(am__dirstamp) @@ -4784,6 +4789,8 @@ mostlyclean-compile: -rm -f 
ldap/servers/plugins/replication/libreplication_plugin_la-repl_rootdse.lo -rm -f ldap/servers/plugins/replication/libreplication_plugin_la-repl_search.$(OBJEXT) -rm -f ldap/servers/plugins/replication/libreplication_plugin_la-repl_search.lo + -rm -f ldap/servers/plugins/replication/libreplication_plugin_la-repl_session_plugin.$(OBJEXT) + -rm -f ldap/servers/plugins/replication/libreplication_plugin_la-repl_session_plugin.lo -rm -f ldap/servers/plugins/replication/libreplication_plugin_la-replutil.$(OBJEXT) -rm -f ldap/servers/plugins/replication/libreplication_plugin_la-replutil.lo -rm -f ldap/servers/plugins/replication/libreplication_plugin_la-urp.$(OBJEXT) @@ -5496,6 +5503,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@ldap/servers/plugins/replication/$(DEPDIR)/libreplication_plugin_la-repl_ops.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@ldap/servers/plugins/replication/$(DEPDIR)/libreplication_plugin_la-repl_rootdse.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@ldap/servers/plugins/replication/$(DEPDIR)/libreplication_plugin_la-repl_search.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@ldap/servers/plugins/replication/$(DEPDIR)/libreplication_plugin_la-repl_session_plugin.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@ldap/servers/plugins/replication/$(DEPDIR)/libreplication_plugin_la-replutil.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@ldap/servers/plugins/replication/$(DEPDIR)/libreplication_plugin_la-urp.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@ldap/servers/plugins/replication/$(DEPDIR)/libreplication_plugin_la-urp_glue.Plo@am__quote@ @@ -7267,6 +7275,13 @@ ldap/servers/plugins/replication/libreplication_plugin_la-repl_search.lo: ldap/s @AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ @am__fastdepCC_FALSE@ $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(libreplication_plugin_la_CPPFLAGS) 
$(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o ldap/servers/plugins/replication/libreplication_plugin_la-repl_search.lo `test -f 'ldap/servers/plugins/replication/repl_search.c' || echo '$(srcdir)/'`ldap/servers/plugins/replication/repl_search.c +ldap/servers/plugins/replication/libreplication_plugin_la-repl_session_plugin.lo: ldap/servers/plugins/replication/repl_session_plugin.c +@am__fastdepCC_TRUE@ if $(LIBTOOL) --tag=CC --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(libreplication_plugin_la_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT ldap/servers/plugins/replication/libreplication_plugin_la-repl_session_plugin.lo -MD -MP -MF "ldap/servers/plugins/replication/$(DEPDIR)/libreplication_plugin_la-repl_session_plugin.Tpo" -c -o ldap/servers/plugins/replication/libreplication_plugin_la-repl_session_plugin.lo `test -f 'ldap/servers/plugins/replication/repl_session_plugin.c' || echo '$(srcdir)/'`ldap/servers/plugins/replication/repl_session_plugin.c; \ +@am__fastdepCC_TRUE@ then mv -f "ldap/servers/plugins/replication/$(DEPDIR)/libreplication_plugin_la-repl_session_plugin.Tpo" "ldap/servers/plugins/replication/$(DEPDIR)/libreplication_plugin_la-repl_session_plugin.Plo"; else rm -f "ldap/servers/plugins/replication/$(DEPDIR)/libreplication_plugin_la-repl_session_plugin.Tpo"; exit 1; fi +@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='ldap/servers/plugins/replication/repl_session_plugin.c' object='ldap/servers/plugins/replication/libreplication_plugin_la-repl_session_plugin.lo' libtool=yes @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCC_FALSE@ $(LIBTOOL) --tag=CC --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(libreplication_plugin_la_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o ldap/servers/plugins/replication/libreplication_plugin_la-repl_session_plugin.lo `test -f 'ldap/servers/plugins/replication/repl_session_plugin.c' || echo 
'$(srcdir)/'`ldap/servers/plugins/replication/repl_session_plugin.c + ldap/servers/plugins/replication/libreplication_plugin_la-repl5_agmt.lo: ldap/servers/plugins/replication/repl5_agmt.c @am__fastdepCC_TRUE@ $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(libreplication_plugin_la_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT ldap/servers/plugins/replication/libreplication_plugin_la-repl5_agmt.lo -MD -MP -MF ldap/servers/plugins/replication/$(DEPDIR)/libreplication_plugin_la-repl5_agmt.Tpo -c -o ldap/servers/plugins/replication/libreplication_plugin_la-repl5_agmt.lo `test -f 'ldap/servers/plugins/replication/repl5_agmt.c' || echo '$(srcdir)/'`ldap/servers/plugins/replication/repl5_agmt.c @am__fastdepCC_TRUE@ $(am__mv) ldap/servers/plugins/replication/$(DEPDIR)/libreplication_plugin_la-repl5_agmt.Tpo ldap/servers/plugins/replication/$(DEPDIR)/libreplication_plugin_la-repl5_agmt.Plo diff --git a/ldap/servers/plugins/replication/repl-session-plugin.h b/ldap/servers/plugins/replication/repl-session-plugin.h new file mode 100644 index 0000000..1c684af --- /dev/null +++ b/ldap/servers/plugins/replication/repl-session-plugin.h @@ -0,0 +1,119 @@ +/** BEGIN COPYRIGHT BLOCK + * This Program is free software; you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software + * Foundation; version 2 of the License. + * + * This Program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along with + * this Program; if not, write to the Free Software Foundation, Inc., 59 Temple + * Place, Suite 330, Boston, MA 02111-1307 USA. + * + * In addition, as a special exception, Red Hat, Inc. 
gives You the additional + * right to link the code of this Program with code not covered under the GNU + * General Public License ("Non-GPL Code") and to distribute linked combinations + * including the two, subject to the limitations in this paragraph. Non-GPL Code + * permitted under this exception must only link to the code of this Program + * through those well defined interfaces identified in the file named EXCEPTION + * found in the source code files (the "Approved Interfaces"). The files of + * Non-GPL Code may instantiate templates or use macros or inline functions from + * the Approved Interfaces without causing the resulting work to be covered by + * the GNU General Public License. Only Red Hat, Inc. may make changes or + * additions to the list of Approved Interfaces. You must obey the GNU General + * Public License in all respects for all of the Program code and other code used + * in conjunction with the Program except the Non-GPL Code covered by this + * exception. If you modify this file, you may extend this exception to your + * version of the file, but you are not obligated to do so. If you do not wish to + * provide this exception without modification, you must delete this exception + * statement from your version and license this file solely under the GPL without + * exception. + * + * + * Copyright (C) 2010 Red Hat, Inc. + * All rights reserved. + * END COPYRIGHT BLOCK **/ +#ifndef REPL_SESSION_PLUGIN_PUBLIC_API +#define REPL_SESSION_PLUGIN_PUBLIC_API + +#ifdef HAVE_CONFIG_H +# include <config.h> +#endif + +#include "slapi-plugin.h" + +/* + * Replication Session plug-in API + */ +#define REPL_SESSION_v1_0_GUID "210D7559-566B-41C6-9B03-5523BDF30880" + +/* + * This callback is called when a replication agreement is created. + * The repl_subtree from the agreement is read-only. + * The callback can allocate some private data to return. 
If so + * the callback must define a repl_session_plugin_destroy_agmt_cb + * so that the private data can be freed. This private data is passed + * to other callback functions on a master as the void *cookie argument. + */ +typedef void * (*repl_session_plugin_agmt_init_cb)(const Slapi_DN *repl_subtree); +#define REPL_SESSION_PLUGIN_AGMT_INIT_CB 1 + +/* + * Callbacks called when acquiring a replica + * + * The pre and post callbacks are called on the sending (master) side. + * The receive and reply callbacks are called on the receiving (replica) + * side. + * + * Data can be exchanged between the sending and receiving sides using + * these callbacks by using the data_guid and data parameters. The data + * guid is used as an identifier to confirm the data type. Your callbacks + * that receive data must consult the data_guid before attempting to read + * the data parameter. This allows you to confirm that the same replication + * session plug-in is being used on both sides before making assumptions + * about the format of the data. The callbacks use these parameters as + * follows: + * + * pre - send data to replica + * recv - receive data from master + * reply - send data to master + * post - receive data from replica + * + * The memory used by data_guid and data should be allocated in the pre + * and reply callbacks. The replication plug-in is responsible for + * freeing this memory, so they should not be free'd in the callbacks. + * + * The return value of the callbacks should be 0 to allow replication + * to continue. A non-0 return value will cause the replication session + * to be abandoned, causing the master to go into incremental backoff + * mode. 
+ */ +typedef int (*repl_session_plugin_pre_acquire_cb)(void *cookie, const Slapi_DN *repl_subtree, + int is_total, char **data_guid, struct berval **data); +#define REPL_SESSION_PLUGIN_PRE_ACQUIRE_CB 2 + +typedef int (*repl_session_plugin_reply_acquire_cb)(const char *repl_subtree, int is_total, + char **data_guid, struct berval **data); +#define REPL_SESSION_PLUGIN_REPLY_ACQUIRE_CB 3 + +typedef int (*repl_session_plugin_post_acquire_cb)(void *cookie, const Slapi_DN *repl_subtree, + int is_total, const char *data_guid, const struct berval *data); +#define REPL_SESSION_PLUGIN_POST_ACQUIRE_CB 4 + +typedef int (*repl_session_plugin_recv_acquire_cb)(const char *repl_subtree, int is_total, + const char *data_guid, const struct berval *data); +#define REPL_SESSION_PLUGIN_RECV_ACQUIRE_CB 5 + +/* + * Callbacks called when the agreement is destroyed. + * + * The replication subtree from the agreement is passed in. + * This is read only. + * The plugin must define this function to free the cookie allocated + * in the init function, if any. + */ +typedef void (*repl_session_plugin_destroy_agmt_cb)(void *cookie, const Slapi_DN *repl_subtree); +#define REPL_SESSION_PLUGIN_DESTROY_AGMT_CB 6 + +#endif /* REPL_SESSION_PLUGIN_PUBLIC_API */ diff --git a/ldap/servers/plugins/replication/repl5.h b/ldap/servers/plugins/replication/repl5.h index 6be21ce..9b33ded 100644 --- a/ldap/servers/plugins/replication/repl5.h +++ b/ldap/servers/plugins/replication/repl5.h @@ -32,7 +32,7 @@ * * * Copyright (C) 2001 Sun Microsystems, Inc. Used by permission. - * Copyright (C) 2005 Red Hat, Inc. + * Copyright (C) 2010 Red Hat, Inc. * All rights reserved. 
* END COPYRIGHT BLOCK **/ @@ -88,6 +88,11 @@ * because we need a handy way to spot the difference between a pre-7.1 and post-7.0 * consumer at the supplier */ #define REPL_NSDS71_REPLICATION_ENTRY_REQUEST_OID "2.16.840.1.113730.3.5.9" +/* DS9.0 introduces replication session callbacks that can send/receive + * arbitrary data when starting a replication session. This requires a + * new set of start and response extops. */ +#define REPL_START_NSDS90_REPLICATION_REQUEST_OID "2.16.840.1.113730.3.5.12" +#define REPL_NSDS90_REPLICATION_RESPONSE_OID "2.16.840.1.113730.3.5.13" /* DS 5.0 replication protocol error codes */ @@ -105,6 +110,7 @@ #define NSDS50_REPL_REPLICAID_ERROR 0x0B /* replicaID doesn't seem to be unique */ #define NSDS50_REPL_DISABLED 0x0C /* replica suffix is disabled */ #define NSDS50_REPL_UPTODATE 0x0D /* replica is uptodate */ +#define NSDS50_REPL_BACKOFF 0x0E /* replica wants master to go into backoff mode */ #define NSDS50_REPL_REPLICA_NO_RESPONSE 0xff /* No response received */ /* Protocol status */ @@ -203,8 +209,11 @@ int extop_noop(Slapi_PBlock *pb); struct berval *NSDS50StartReplicationRequest_new(const char *protocol_oid, const char *repl_root, char **extra_referrals, CSN *csn); struct berval *NSDS50EndReplicationRequest_new(char *repl_root); -int decode_repl_ext_response(struct berval *data, int *response_code, - struct berval ***ruv_bervals); +int decode_repl_ext_response(struct berval *bvdata, int *response_code, + struct berval ***ruv_bervals, char **data_guid, struct berval **data); +struct berval *NSDS90StartReplicationRequest_new(const char *protocol_oid, + const char *repl_root, char **extra_referrals, CSN *csn, + const char *data_guid, const struct berval *data); /* In repl5_total.c */ int multimaster_extop_NSDS50ReplicationEntry(Slapi_PBlock *pb); @@ -365,7 +374,9 @@ typedef enum CONN_SUPPORTS_DIRSYNC, CONN_DOES_NOT_SUPPORT_DIRSYNC, CONN_IS_WIN2K3, - CONN_NOT_WIN2K3 + CONN_NOT_WIN2K3, + CONN_SUPPORTS_DS90_REPL, + 
CONN_DOES_NOT_SUPPORT_DS90_REPL } ConnResult; Repl_Connection *conn_new(Repl_Agmt *agmt); ConnResult conn_connect(Repl_Connection *conn); @@ -389,6 +400,7 @@ void conn_start_linger(Repl_Connection *conn); void conn_cancel_linger(Repl_Connection *conn); ConnResult conn_replica_supports_ds5_repl(Repl_Connection *conn); ConnResult conn_replica_supports_ds71_repl(Repl_Connection *conn); +ConnResult conn_replica_supports_ds90_repl(Repl_Connection *conn); ConnResult conn_replica_is_readonly(Repl_Connection *conn); ConnResult conn_read_entry_attribute(Repl_Connection *conn, const char *dn, char *type, @@ -588,5 +600,17 @@ int windows_handle_modify_agreement(Repl_Agmt *ra, const char *type, Slapi_Entry void windows_agreement_delete(Repl_Agmt *ra); Repl_Connection *windows_conn_new(Repl_Agmt *agmt); +/* repl_session_plugin.c */ +void repl_session_plugin_init(); +void repl_session_plugin_call_init_agmt_cb(Repl_Agmt *ra); +int repl_session_plugin_call_pre_acquire_cb(const Repl_Agmt *ra, int is_total, + char **data_guid, struct berval **data); +int repl_session_plugin_call_post_acquire_cb(const Repl_Agmt *ra, int is_total, + const char *data_guid, const struct berval *data); +int repl_session_plugin_call_recv_acquire_cb(const char *repl_area, int is_total, + const char *data_guid, const struct berval *data); +int repl_session_plugin_call_reply_acquire_cb(const char *repl_area, int is_total, + char **data_guid, struct berval **data); +void repl_session_plugin_call_destroy_agmt_cb(const Repl_Agmt *ra); #endif /* _REPL5_H_ */ diff --git a/ldap/servers/plugins/replication/repl5_agmt.c b/ldap/servers/plugins/replication/repl5_agmt.c index f60da02..890452d 100644 --- a/ldap/servers/plugins/replication/repl5_agmt.c +++ b/ldap/servers/plugins/replication/repl5_agmt.c @@ -133,7 +133,9 @@ typedef struct repl5agmt { to allow another supplier to send its updates - should be greater than busywaittime - if set to 0, this means do not pause */ - void *priv; /* private data, used for 
windows-specific agreement data */ + void *priv; /* private data, used for windows-specific agreement data + for sync agreements or for replication session plug-in + private data for normal replication agreements */ int agreement_type; } repl5agmt; @@ -381,6 +383,7 @@ agmt_new_from_entry(Slapi_Entry *e) else { ra->agreement_type = REPLICA_TYPE_MULTIMASTER; + repl_session_plugin_call_agmt_init_cb(ra); } @@ -487,6 +490,14 @@ agmt_delete(void **rap) LDAP_SCOPE_BASE, "(objectclass=*)", get_agmt_status); + /* + * Call the replication session cleanup callback. We + * need to do this before we free replarea. + */ + if (ra->agreement_type != REPLICA_TYPE_WINDOWS) { + repl_session_plugin_call_destroy_agmt_cb(ra); + } + /* slapi_ch_free accepts NULL pointer */ slapi_ch_free((void **)&(ra->hostname)); slapi_ch_free((void **)&(ra->binddn)); @@ -1929,13 +1940,13 @@ agmt_set_last_update_status (Repl_Agmt *ra, int ldaprc, int replrc, const char * } else if (replrc == NSDS50_REPL_DISABLED) { - PR_snprintf(ra->last_update_status, STATUS_LEN, "%d Total update aborted: " + PR_snprintf(ra->last_update_status, STATUS_LEN, "%d Incremental update aborted: " "Replication agreement for %s\n can not be updated while the replica is disabled.\n" "(If the suffix is disabled you must enable it then restart the server for replication to take place).", replrc, ra->long_name ? ra->long_name : "a replica"); /* Log into the errors log, as "ra->long_name" is not accessible from the caller */ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, - "Total update aborted: Replication agreement for \"%s\" " + "Incremental update aborted: Replication agreement for \"%s\" " "can not be updated while the replica is disabled\n", ra->long_name ? 
ra->long_name : "a replica"); slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "(If the suffix is disabled you must enable it then restart the server for replication to take place).\n"); diff --git a/ldap/servers/plugins/replication/repl5_connection.c b/ldap/servers/plugins/replication/repl5_connection.c index 927fb20..bd28518 100644 --- a/ldap/servers/plugins/replication/repl5_connection.c +++ b/ldap/servers/plugins/replication/repl5_connection.c @@ -79,6 +79,7 @@ typedef struct repl_connection int supports_ds50_repl; /* 1 if does, 0 if doesn't, -1 if not determined */ int supports_ds40_repl; /* 1 if does, 0 if doesn't, -1 if not determined */ int supports_ds71_repl; /* 1 if does, 0 if doesn't, -1 if not determined */ + int supports_ds90_repl; /* 1 if does, 0 if doesn't, -1 if not determined */ int linger_time; /* time in seconds to leave an idle connection open */ PRBool linger_active; Slapi_Eq_Context *linger_event; @@ -166,6 +167,7 @@ conn_new(Repl_Agmt *agmt) rpc->supports_ds40_repl = -1; rpc->supports_ds50_repl = -1; rpc->supports_ds71_repl = -1; + rpc->supports_ds90_repl = -1; rpc->linger_active = PR_FALSE; rpc->delete_after_linger = PR_FALSE; @@ -1170,6 +1172,7 @@ close_connection_internal(Repl_Connection *conn) conn->status = STATUS_DISCONNECTED; conn->supports_ds50_repl = -1; conn->supports_ds71_repl = -1; + conn->supports_ds90_repl = -1; /* do this last, to minimize the chance that another thread might read conn->state as not disconnected and attempt to use conn->ld */ @@ -1282,11 +1285,11 @@ conn_replica_supports_ds5_repl(Repl_Connection *conn) /* - * Determine if the remote replica supports DS 5.0 replication. + * Determine if the remote replica supports DS 7.1 replication. * Return codes: - * CONN_SUPPORTS_DS71_REPL - the remote replica suport DS5 replication + * CONN_SUPPORTS_DS71_REPL - the remote replica suport DS7.1 replication * CONN_DOES_NOT_SUPPORT_DS71_REPL - the remote replica does not - * support DS5 replication. 
+ * support DS7.1 replication. * CONN_OPERATION_FAILED - it could not be determined if the remote * replica supports DS5 replication. * CONN_NOT_CONNECTED - no connection was active. @@ -1351,6 +1354,77 @@ conn_replica_supports_ds71_repl(Repl_Connection *conn) return return_value; } +/* + * Determine if the remote replica supports DS 9.0 replication. + * Return codes: + * CONN_SUPPORTS_DS90_REPL - the remote replica suport DS5 replication + * CONN_DOES_NOT_SUPPORT_DS90_REPL - the remote replica does not + * support DS9.0 replication. + * CONN_OPERATION_FAILED - it could not be determined if the remote + * replica supports DS9.0 replication. + * CONN_NOT_CONNECTED - no connection was active. + */ +ConnResult +conn_replica_supports_ds90_repl(Repl_Connection *conn) +{ + ConnResult return_value; + int ldap_rc; + + if (conn_connected(conn)) + { + if (conn->supports_ds90_repl == -1) { + LDAPMessage *res = NULL; + LDAPMessage *entry = NULL; + char *attrs[] = {"supportedcontrol", "supportedextension", NULL}; + + conn->status = STATUS_SEARCHING; + ldap_rc = ldap_search_ext_s(conn->ld, "", LDAP_SCOPE_BASE, + "(objectclass=*)", attrs, 0 /* attrsonly */, + NULL /* server controls */, NULL /* client controls */, + &conn->timeout, LDAP_NO_LIMIT, &res); + if (LDAP_SUCCESS == ldap_rc) + { + conn->supports_ds90_repl = 0; + entry = ldap_first_entry(conn->ld, res); + if (!attribute_string_value_present(conn->ld, entry, "supportedextension", REPL_START_NSDS90_REPLICATION_REQUEST_OID)) + { + return_value = CONN_DOES_NOT_SUPPORT_DS90_REPL; + } + else + { + conn->supports_ds90_repl = 1; + return_value = CONN_SUPPORTS_DS90_REPL; + } + } + else + { + if (IS_DISCONNECT_ERROR(ldap_rc)) + { + conn->last_ldap_error = ldap_rc; /* specific reason */ + conn_disconnect(conn); + return_value = CONN_NOT_CONNECTED; + } + else + { + return_value = CONN_OPERATION_FAILED; + } + } + if (NULL != res) + ldap_msgfree(res); + } + else + { + return_value = conn->supports_ds90_repl ? 
CONN_SUPPORTS_DS90_REPL : CONN_DOES_NOT_SUPPORT_DS90_REPL; + } + } + else + { + /* Not connected */ + return_value = CONN_NOT_CONNECTED; + } + return return_value; +} + /* Determine if the replica is read-only */ ConnResult conn_replica_is_readonly(Repl_Connection *conn) diff --git a/ldap/servers/plugins/replication/repl5_inc_protocol.c b/ldap/servers/plugins/replication/repl5_inc_protocol.c index d999d3b..6475eb8 100644 --- a/ldap/servers/plugins/replication/repl5_inc_protocol.c +++ b/ldap/servers/plugins/replication/repl5_inc_protocol.c @@ -1145,6 +1145,7 @@ repl5_inc_run(Private_Repl_Protocol *prp) else { rc = send_updates(prp, ruv, &num_changes_sent); + if (rc == UPDATE_NO_MORE_UPDATES) { dev_debug("repl5_inc_run(STATE_SENDING_UPDATES) -> send_updates = UPDATE_NO_MORE_UPDATES -> STATE_WAIT_CHANGES"); @@ -1202,6 +1203,7 @@ repl5_inc_run(Private_Repl_Protocol *prp) if (rc == UPDATE_TIMEOUT) { conn_disconnect(prp->conn); } + if (rc == UPDATE_NO_MORE_UPDATES && num_changes_sent > 0) { if (pausetime > 0) diff --git a/ldap/servers/plugins/replication/repl5_init.c b/ldap/servers/plugins/replication/repl5_init.c index 2c4a581..9d8776a 100644 --- a/ldap/servers/plugins/replication/repl5_init.c +++ b/ldap/servers/plugins/replication/repl5_init.c @@ -86,6 +86,7 @@ void plugin_init_debug_level(int *level_ptr) static char *start_oid_list[] = { REPL_START_NSDS50_REPLICATION_REQUEST_OID, + REPL_START_NSDS90_REPLICATION_REQUEST_OID, NULL }; static char *start_name_list[] = { @@ -441,6 +442,9 @@ multimaster_start( Slapi_PBlock *pb ) if (!multimaster_started_flag) { + /* Get any registered replication session API */ + repl_session_plugin_init(); + /* Initialize thread private data for logging. 
Ignore if fails */ PR_NewThreadPrivateIndex (&thread_private_agmtname, NULL); PR_NewThreadPrivateIndex (&thread_private_cache, NULL); diff --git a/ldap/servers/plugins/replication/repl5_prot_private.h b/ldap/servers/plugins/replication/repl5_prot_private.h index f9be9de..10aa02b 100644 --- a/ldap/servers/plugins/replication/repl5_prot_private.h +++ b/ldap/servers/plugins/replication/repl5_prot_private.h @@ -71,8 +71,10 @@ typedef struct private_repl_protocol Repl_Agmt *agmt; Object *replica_object; void *private; - PRBool replica_acquired; + PRBool replica_acquired; int repl50consumer; /* Flag to tell us if this is a 5.0-style consumer we're talking to */ + int repl71consumer; /* Flag to tell us if this is a 7.1-style consumer we're talking to */ + int repl90consumer; /* Flag to tell us if this is a 9.0-style consumer we're talking to */ } Private_Repl_Protocol; extern Private_Repl_Protocol *Repl_5_Inc_Protocol_new(); diff --git a/ldap/servers/plugins/replication/repl5_protocol_util.c b/ldap/servers/plugins/replication/repl5_protocol_util.c index 3fbc978..8e34ad5 100644 --- a/ldap/servers/plugins/replication/repl5_protocol_util.c +++ b/ldap/servers/plugins/replication/repl5_protocol_util.c @@ -176,8 +176,11 @@ acquire_replica(Private_Repl_Protocol *prp, char *prot_oid, RUV **ruv) } else { + CSN *current_csn = NULL; + /* we don't want the timer to go off in the middle of an operation */ conn_cancel_linger(conn); + /* Does the remote replica support the 5.0 protocol? */ crc = conn_replica_supports_ds5_repl(conn); if (CONN_DOES_NOT_SUPPORT_DS5_REPL == crc) @@ -188,237 +191,337 @@ acquire_replica(Private_Repl_Protocol *prp, char *prot_oid, RUV **ruv) { /* We don't know anything about the remote replica. Try again later. */ return_value = ACQUIRE_TRANSIENT_ERROR; + goto error; } - else + + /* Find out what level of replication the replica supports. 
*/ + crc = conn_replica_supports_ds90_repl(conn); + if (CONN_DOES_NOT_SUPPORT_DS90_REPL == crc) { /* Does the remote replica support the 7.1 protocol? */ crc = conn_replica_supports_ds71_repl(conn); if (CONN_DOES_NOT_SUPPORT_DS71_REPL == crc) { + /* This is a pre-7.1 replica. */ prp->repl50consumer = 1; } - if (CONN_NOT_CONNECTED == crc || CONN_OPERATION_FAILED == crc) + else if (CONN_NOT_CONNECTED == crc || CONN_OPERATION_FAILED == crc) { /* We don't know anything about the remote replica. Try again later. */ return_value = ACQUIRE_TRANSIENT_ERROR; - } else + goto error; + } + else { - CSN *current_csn = NULL; + /* This replica is later than 7.1, but pre-9.0. */ + prp->repl71consumer = 1; + } + } + else if (CONN_NOT_CONNECTED == crc || CONN_OPERATION_FAILED == crc) + { + /* We don't know anything about the remote replica. Try again later. */ + return_value = ACQUIRE_TRANSIENT_ERROR; + goto error; + } + else + { + /* This replica is a 9.0 or later replica. */ + prp->repl90consumer = 1; + } - /* Good to go. Start the protocol. */ + /* Good to go. Start the protocol. */ + + /* Obtain a current CSN */ + replarea_sdn = agmt_get_replarea(prp->agmt); + current_csn = get_current_csn(replarea_sdn); + if (NULL != current_csn) + { + struct berval *payload = NULL; - /* Obtain a current CSN */ - replarea_sdn = agmt_get_replarea(prp->agmt); - current_csn = get_current_csn(replarea_sdn); - if (NULL != current_csn) + if (prp->repl90consumer) + { + int is_total = 0; + char *data_guid = NULL; + struct berval *data = NULL; + + /* Check if this is a total or incremental update. 
*/ + if (strcmp(REPL_NSDS50_TOTAL_PROTOCOL_OID, prot_oid) == 0) { - struct berval *payload = NSDS50StartReplicationRequest_new( - prot_oid, slapi_sdn_get_ndn(replarea_sdn), - NULL /* XXXggood need to provide referral(s) */, current_csn); - /* JCMREPL - Need to extract the referrals from the RUV */ - csn_free(&current_csn); - current_csn = NULL; - crc = conn_send_extended_operation(conn, - REPL_START_NSDS50_REPLICATION_REQUEST_OID, payload, NULL /* update control */, NULL /* Message ID */); - if (CONN_OPERATION_SUCCESS != crc) - { - int operation, error; - conn_get_error(conn, &operation, &error); + is_total = 1; + } - /* Couldn't send the extended operation */ - return_value = ACQUIRE_TRANSIENT_ERROR; /* XXX right return value? */ - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, - "%s: Unable to send a startReplication " - "extended operation to consumer (%s). Will retry later.\n", - agmt_get_long_name(prp->agmt), - error ? ldap_err2string(error) : "unknown error"); - } - /* Since the operation request is async, we need to wait for the response here */ - crc = conn_read_result_ex(conn,&retoid,&retdata,NULL,NULL,1); - ber_bvfree(payload); - payload = NULL; - /* Look at the response we got. */ - if (CONN_OPERATION_SUCCESS == crc) + /* Call pre-start replication session callback. This callback + * may have extra data to be sent to the replica.
*/ + if (repl_session_plugin_call_pre_acquire_cb(prp->agmt, is_total, + &data_guid, &data) == 0) { + payload = NSDS90StartReplicationRequest_new( + prot_oid, slapi_sdn_get_ndn(replarea_sdn), + NULL, current_csn, data_guid, data); + slapi_ch_free_string(&data_guid); + ber_bvfree(data); + data = NULL; + } else { + return_value = ACQUIRE_TRANSIENT_ERROR; + slapi_ch_free_string(&data_guid); + ber_bvfree(data); + data = NULL; + goto error; + } + } + else + { + payload = NSDS50StartReplicationRequest_new( + prot_oid, slapi_sdn_get_ndn(replarea_sdn), + NULL /* XXXggood need to provide referral(s) */, current_csn); + } + + /* JCMREPL - Need to extract the referrals from the RUV */ + csn_free(&current_csn); + current_csn = NULL; + crc = conn_send_extended_operation(conn, + prp->repl90consumer ? REPL_START_NSDS90_REPLICATION_REQUEST_OID : + REPL_START_NSDS50_REPLICATION_REQUEST_OID, payload, + NULL /* update control */, NULL /* Message ID */); + if (CONN_OPERATION_SUCCESS != crc) + { + int operation, error; + conn_get_error(conn, &operation, &error); + + /* Couldn't send the extended operation */ + return_value = ACQUIRE_TRANSIENT_ERROR; /* XXX right return value? */ + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "%s: Unable to send a startReplication " + "extended operation to consumer (%s). Will retry later.\n", + agmt_get_long_name(prp->agmt), + error ? ldap_err2string(error) : "unknown error"); + } + /* Since the operation request is async, we need to wait for the response here */ + crc = conn_read_result_ex(conn,&retoid,&retdata,NULL,NULL,1); + ber_bvfree(payload); + payload = NULL; + /* Look at the response we got. */ + if (CONN_OPERATION_SUCCESS == crc) + { + /* + * Extop was processed. Look at extop response to see if we're + * permitted to go ahead.
+ */ + int extop_result; + char *data_guid = NULL; + struct berval *data = NULL; + + int extop_rc = decode_repl_ext_response(retdata, &extop_result, + &ruv_bervals, &data_guid, + &data); + + if (0 == extop_rc) + { + prp->last_acquire_response_code = extop_result; + switch (extop_result) { - /* - * Extop was processed. Look at extop response to see if we're - * permitted to go ahead. - */ - int extop_result; - int extop_rc = decode_repl_ext_response(retdata, &extop_result, - &ruv_bervals); - if (0 == extop_rc) + /* XXXggood handle other error codes here */ + case NSDS50_REPL_INTERNAL_ERROR: + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "%s: Unable to acquire replica: " + "an internal error occurred on the remote replica. " + "Replication is aborting.\n", + agmt_get_long_name(prp->agmt)); + return_value = ACQUIRE_FATAL_ERROR; + break; + case NSDS50_REPL_PERMISSION_DENIED: + /* Not allowed to send updates */ + { + char *repl_binddn = agmt_get_binddn(prp->agmt); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "%s: Unable to acquire replica: permission denied. " + "The bind dn \"%s\" does not have permission to " + "supply replication updates to the replica. " + "Will retry later.\n", + agmt_get_long_name(prp->agmt), repl_binddn); + slapi_ch_free((void **)&repl_binddn); + return_value = ACQUIRE_TRANSIENT_ERROR; + break; + } + case NSDS50_REPL_NO_SUCH_REPLICA: + /* There is no such replica on the consumer */ + { + Slapi_DN *repl_root = agmt_get_replarea(prp->agmt); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "%s: Unable to acquire replica: there is no " + "replicated area \"%s\" on the consumer server. 
" + "Replication is aborting.\n", + agmt_get_long_name(prp->agmt), + slapi_sdn_get_dn(repl_root)); + slapi_sdn_free(&repl_root); + return_value = ACQUIRE_FATAL_ERROR; + break; + } + case NSDS50_REPL_EXCESSIVE_CLOCK_SKEW: + /* Large clock skew between the consumer and the supplier */ + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "%s: Unable to acquire replica: " + "Excessive clock skew between the supplier and " + "the consumer. Replication is aborting.\n", + agmt_get_long_name(prp->agmt)); + return_value = ACQUIRE_FATAL_ERROR; + break; + case NSDS50_REPL_DECODING_ERROR: + /* We sent something the replica couldn't understand. */ + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "%s: Unable to acquire replica: " + "the consumer was unable to decode the " + "startReplicationRequest extended operation sent by the " + "supplier. Replication is aborting.\n", + agmt_get_long_name(prp->agmt)); + return_value = ACQUIRE_FATAL_ERROR; + break; + case NSDS50_REPL_REPLICA_BUSY: + /* Someone else is updating the replica. Try later. */ + /* if acquire_replica is called for replica + initialization, log REPLICA_BUSY, too */ + if (strcmp(REPL_NSDS50_TOTAL_PROTOCOL_OID, + prot_oid) == 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "%s: Unable to acquire replica: " + "the replica is currently being updated" + "by another supplier.\n", + agmt_get_long_name(prp->agmt)); + } + else /* REPL_NSDS50_INCREMENTAL_PROTOCOL_OID */ + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "%s: Unable to acquire replica: " + "the replica is currently being updated" + "by another supplier. Will try later\n", + agmt_get_long_name(prp->agmt)); + } + return_value = ACQUIRE_REPLICA_BUSY; + break; + case NSDS50_REPL_LEGACY_CONSUMER: + /* remote replica is a legacy consumer */ + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "%s: Unable to acquire replica: the replica " + "is supplied by a legacy supplier. 
" + "Replication is aborting.\n", agmt_get_long_name(prp->agmt)); + return_value = ACQUIRE_FATAL_ERROR; + break; + case NSDS50_REPL_REPLICAID_ERROR: + /* remote replica detected a duplicate ReplicaID */ + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "%s: Unable to aquire replica: the replica " + "has the same Replica ID as this one. " + "Replication is aborting.\n", + agmt_get_long_name(prp->agmt)); + return_value = ACQUIRE_FATAL_ERROR; + break; + case NSDS50_REPL_BACKOFF: + /* A replication sesssion hook on the replica + * wants us to go into backoff mode. */ + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "%s: Unable to acquire replica: " + "the replica instructed us to go into " + "backoff mode. Will retry later.\n", + agmt_get_long_name(prp->agmt)); + return_value = ACQUIRE_TRANSIENT_ERROR; + break; + case NSDS50_REPL_REPLICA_READY: + /* Call any registered replication session post + * acquire callback if we are dealing with a 9.0 + * style replica. We want to bail on sending + * updates if the return value is non-0. */ + if (prp->repl90consumer) { - prp->last_acquire_response_code = extop_result; - switch (extop_result) + int is_total = 0; + + /* Check if this is a total or incremental update. */ + if (strcmp(REPL_NSDS50_TOTAL_PROTOCOL_OID, prot_oid) == 0) { - /* XXXggood handle other error codes here */ - case NSDS50_REPL_INTERNAL_ERROR: - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, - "%s: Unable to acquire replica: " - "an internal error occurred on the remote replica. " - "Replication is aborting.\n", - agmt_get_long_name(prp->agmt)); - return_value = ACQUIRE_FATAL_ERROR; - break; - case NSDS50_REPL_PERMISSION_DENIED: - /* Not allowed to send updates */ - { - char *repl_binddn = agmt_get_binddn(prp->agmt); - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, - "%s: Unable to acquire replica: permission denied. " - "The bind dn \"%s\" does not have permission to " - "supply replication updates to the replica. 
" - "Will retry later.\n", - agmt_get_long_name(prp->agmt), repl_binddn); - slapi_ch_free((void **)&repl_binddn); - return_value = ACQUIRE_TRANSIENT_ERROR; - break; - } - case NSDS50_REPL_NO_SUCH_REPLICA: - /* There is no such replica on the consumer */ - { - Slapi_DN *repl_root = agmt_get_replarea(prp->agmt); - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, - "%s: Unable to acquire replica: there is no " - "replicated area \"%s\" on the consumer server. " - "Replication is aborting.\n", - agmt_get_long_name(prp->agmt), - slapi_sdn_get_dn(repl_root)); - slapi_sdn_free(&repl_root); - return_value = ACQUIRE_FATAL_ERROR; - break; - } - case NSDS50_REPL_EXCESSIVE_CLOCK_SKEW: - /* Large clock skew between the consumer and the supplier */ - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, - "%s: Unable to acquire replica: " - "Excessive clock skew between the supplier and " - "the consumer. Replication is aborting.\n", - agmt_get_long_name(prp->agmt)); - return_value = ACQUIRE_FATAL_ERROR; - break; - case NSDS50_REPL_DECODING_ERROR: - /* We sent something the replica couldn't understand. */ - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, - "%s: Unable to acquire replica: " - "the consumer was unable to decode the " - "startReplicationRequest extended operation sent by the " - "supplier. Replication is aborting.\n", - agmt_get_long_name(prp->agmt)); - return_value = ACQUIRE_FATAL_ERROR; - break; - case NSDS50_REPL_REPLICA_BUSY: - /* Someone else is updating the replica. Try later. 
*/ - /* if acquire_replica is called for replica - initialization, log REPLICA_BUSY, too */ - if (strcmp(REPL_NSDS50_TOTAL_PROTOCOL_OID, - prot_oid) == 0) - { - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, - "%s: Unable to acquire replica: " - "the replica is currently being updated" - "by another supplier.\n", - agmt_get_long_name(prp->agmt)); - } - else /* REPL_NSDS50_INCREMENTAL_PROTOCOL_OID */ - { - slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, - "%s: Unable to acquire replica: " - "the replica is currently being updated" - "by another supplier. Will try later\n", - agmt_get_long_name(prp->agmt)); - } - return_value = ACQUIRE_REPLICA_BUSY; - break; - case NSDS50_REPL_LEGACY_CONSUMER: - /* remote replica is a legacy consumer */ - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, - "%s: Unable to acquire replica: the replica " - "is supplied by a legacy supplier. " - "Replication is aborting.\n", agmt_get_long_name(prp->agmt)); - return_value = ACQUIRE_FATAL_ERROR; - break; - case NSDS50_REPL_REPLICAID_ERROR: - /* remote replica detected a duplicate ReplicaID */ - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, - "%s: Unable to aquire replica: the replica " - "has the same Replica ID as this one. " - "Replication is aborting.\n", - agmt_get_long_name(prp->agmt)); - return_value = ACQUIRE_FATAL_ERROR; - break; - case NSDS50_REPL_REPLICA_READY: - /* We've acquired the replica. */ - slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, - "%s: Replica was successfully acquired.\n", - agmt_get_long_name(prp->agmt)); - /* Parse the update vector */ - if (NULL != ruv_bervals && NULL != ruv) - { - if (ruv_init_from_bervals(ruv_bervals, ruv) != RUV_SUCCESS) - { - /* Couldn't parse the update vector */ - *ruv = NULL; - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, - "%s: Warning: acquired replica, " - "but could not parse update vector. 
" - "The replica must be reinitialized.\n", - agmt_get_long_name(prp->agmt)); - } - } - - /* Save consumer's RUV in the replication agreement. - It is used by the changelog trimming code */ - if (ruv && *ruv) - agmt_set_consumer_ruv (prp->agmt, *ruv); - - return_value = ACQUIRE_SUCCESS; + is_total = 1; + } + + if (repl_session_plugin_call_post_acquire_cb(prp->agmt, is_total, data_guid, data)) + { + slapi_ch_free_string(&data_guid); + ber_bvfree(data); + data = NULL; + return_value = ACQUIRE_TRANSIENT_ERROR; break; - default: - return_value = ACQUIRE_FATAL_ERROR; } + + slapi_ch_free_string(&data_guid); + ber_bvfree(data); + data = NULL; } - else + + /* We've acquired the replica. */ + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "%s: Replica was successfully acquired.\n", + agmt_get_long_name(prp->agmt)); + /* Parse the update vector */ + if (NULL != ruv_bervals && NULL != ruv) { - /* Couldn't parse the response */ - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, - "%s: Unable to parse the response to the " - "startReplication extended operation. " - "Replication is aborting.\n", - agmt_get_long_name(prp->agmt)); - prp->last_acquire_response_code = NSDS50_REPL_INTERNAL_ERROR; - return_value = ACQUIRE_FATAL_ERROR; + if (ruv_init_from_bervals(ruv_bervals, ruv) != RUV_SUCCESS) + { + /* Couldn't parse the update vector */ + *ruv = NULL; + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "%s: Warning: acquired replica, " + "but could not parse update vector. " + "The replica must be reinitialized.\n", + agmt_get_long_name(prp->agmt)); + } } - } - else - { - int operation, error; - conn_get_error(conn, &operation, &error); - /* Couldn't send the extended operation */ - return_value = ACQUIRE_TRANSIENT_ERROR; /* XXX right return value? */ - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, - "%s: Unable to receive the response for a startReplication " - "extended operation to consumer (%s). Will retry later.\n", - agmt_get_long_name(prp->agmt), - error ? 
ldap_err2string(error) : "unknown error"); + /* Save consumer's RUV in the replication agreement. + It is used by the changelog trimming code */ + if (ruv && *ruv) + agmt_set_consumer_ruv (prp->agmt, *ruv); + + return_value = ACQUIRE_SUCCESS; + break; + default: + return_value = ACQUIRE_FATAL_ERROR; } } else { - /* Couldn't get a current CSN */ + /* Couldn't parse the response */ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, - "%s: Unable to obtain current CSN. " - "Replication is aborting.\n", + "%s: Unable to parse the response to the " + "startReplication extended operation. " + "Replication is aborting.\n", agmt_get_long_name(prp->agmt)); + prp->last_acquire_response_code = NSDS50_REPL_INTERNAL_ERROR; return_value = ACQUIRE_FATAL_ERROR; } } + else + { + int operation, error; + conn_get_error(conn, &operation, &error); + + /* Couldn't send the extended operation */ + return_value = ACQUIRE_TRANSIENT_ERROR; /* XXX right return value? */ + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "%s: Unable to receive the response for a startReplication " + "extended operation to consumer (%s). Will retry later.\n", + agmt_get_long_name(prp->agmt), + error ? ldap_err2string(error) : "unknown error"); + } + } + else + { + /* Couldn't get a current CSN */ + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "%s: Unable to obtain current CSN. " + "Replication is aborting.\n", + agmt_get_long_name(prp->agmt)); + return_value = ACQUIRE_FATAL_ERROR; } } -/* error: */ +error: if (NULL != ruv_bervals) ber_bvecfree(ruv_bervals); if (NULL != replarea_sdn) @@ -497,6 +600,8 @@ release_replica(Private_Repl_Protocol *prp) struct berval **ruv_bervals = NULL; /* Shouldn't actually be returned */ int extop_result; int extop_rc = 0; + char *data_guid = NULL; + struct berval *data = NULL; /* Check the message id's match */ if (sent_message_id != sent_message_id) @@ -509,8 +614,16 @@ release_replica(Private_Repl_Protocol *prp) error ? 
ldap_err2string(error) : "unknown error"); } + /* We need to pass data_guid and data in even though they + * are not used here. We will free them anyway in case they + * are used in the future. */ extop_rc = decode_repl_ext_response(retdata, &extop_result, - (struct berval ***)&ruv_bervals); + (struct berval ***)&ruv_bervals, &data_guid, &data); + + slapi_ch_free_string(&data_guid); + ber_bvfree(data); + data = NULL; + if (0 == extop_rc) { if (NSDS50_REPL_REPLICA_RELEASE_SUCCEEDED == extop_result) diff --git a/ldap/servers/plugins/replication/repl5_tot_protocol.c b/ldap/servers/plugins/replication/repl5_tot_protocol.c index 7bd6e25..8e26f47 100644 --- a/ldap/servers/plugins/replication/repl5_tot_protocol.c +++ b/ldap/servers/plugins/replication/repl5_tot_protocol.c @@ -397,6 +397,7 @@ repl5_tot_run(Private_Repl_Protocol *prp) slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Beginning total update of replica " "\"%s\".\n", agmt_get_long_name(prp->agmt)); + pb = slapi_pblock_new (); /* RMREPL - need to send schema here */ diff --git a/ldap/servers/plugins/replication/repl_extop.c b/ldap/servers/plugins/replication/repl_extop.c index c47ea93..e3ad242 100644 --- a/ldap/servers/plugins/replication/repl_extop.c +++ b/ldap/servers/plugins/replication/repl_extop.c @@ -102,10 +102,12 @@ done: return rc; } +/* The data_guid and data parameters should only be set if we + * are talking with a 9.0 replica. */ static struct berval * -create_NSDS50ReplicationExtopPayload(const char *protocol_oid, +create_ReplicationExtopPayload(const char *protocol_oid, const char *repl_root, char **extra_referrals, CSN *csn, - int send_end) + int send_end, const char *data_guid, const struct berval *data) { struct berval *req_data = NULL; BerElement *tmp_bere = NULL; @@ -209,6 +211,15 @@ create_NSDS50ReplicationExtopPayload(const char *protocol_oid, } } + /* If we have data to send to a 9.0 style replica, set it here. 
*/ + if (data_guid && data) { + if (ber_printf(tmp_bere, "sO", data_guid, data) == -1) + { + rc = LDAP_ENCODING_ERROR; + goto loser; + } + } + if (ber_printf(tmp_bere, "}") == -1) { rc = LDAP_ENCODING_ERROR; @@ -255,14 +266,23 @@ struct berval * NSDS50StartReplicationRequest_new(const char *protocol_oid, const char *repl_root, char **extra_referrals, CSN *csn) { - return(create_NSDS50ReplicationExtopPayload(protocol_oid, - repl_root, extra_referrals, csn, 0)); + return(create_ReplicationExtopPayload(protocol_oid, + repl_root, extra_referrals, csn, 0, 0, 0)); +} + +struct berval * +NSDS90StartReplicationRequest_new(const char *protocol_oid, + const char *repl_root, char **extra_referrals, CSN *csn, + const char *data_guid, const struct berval *data) +{ + return(create_ReplicationExtopPayload(protocol_oid, + repl_root, extra_referrals, csn, 0, data_guid, data)); } struct berval * NSDS50EndReplicationRequest_new(char *repl_root) { - return(create_NSDS50ReplicationExtopPayload(NULL, repl_root, NULL, NULL, 1)); + return(create_ReplicationExtopPayload(NULL, repl_root, NULL, NULL, 1, 0, 0)); } static int @@ -292,14 +312,15 @@ done: } /* - * Decode an NSDS50 Start Replication Request extended + * Decode an NSDS50 or NSDS90 Start Replication Request extended * operation. Returns 0 on success, -1 on decoding error. * The caller is responsible for freeing protocol_oid, - * repl_root, referrals, and csn. + * repl_root, referrals, csn, data_guid, and data. 
*/ static int decode_startrepl_extop(Slapi_PBlock *pb, char **protocol_oid, char **repl_root, - RUV **supplier_ruv, char ***extra_referrals, char **csnstr) + RUV **supplier_ruv, char ***extra_referrals, char **csnstr, + char **data_guid, struct berval **data, int *is90) { char *extop_oid = NULL; struct berval *extop_value = NULL; @@ -307,19 +328,20 @@ decode_startrepl_extop(Slapi_PBlock *pb, char **protocol_oid, char **repl_root, ber_len_t len; int rc = 0; - PR_ASSERT (pb && protocol_oid && repl_root && supplier_ruv && extra_referrals && csnstr); + PR_ASSERT (pb && protocol_oid && repl_root && supplier_ruv && extra_referrals && csnstr && data_guid && data); - *protocol_oid = NULL; - *repl_root = NULL; - *supplier_ruv = NULL; - *extra_referrals = NULL; - *csnstr = NULL; + *protocol_oid = NULL; + *repl_root = NULL; + *supplier_ruv = NULL; + *extra_referrals = NULL; + *csnstr = NULL; slapi_pblock_get(pb, SLAPI_EXT_OP_REQ_OID, &extop_oid); slapi_pblock_get(pb, SLAPI_EXT_OP_REQ_VALUE, &extop_value); if (NULL == extop_oid || - strcmp(extop_oid, REPL_START_NSDS50_REPLICATION_REQUEST_OID) != 0 || + ((strcmp(extop_oid, REPL_START_NSDS50_REPLICATION_REQUEST_OID) != 0) && + (strcmp(extop_oid, REPL_START_NSDS90_REPLICATION_REQUEST_OID) != 0)) || NULL == extop_value) { /* bogus */ @@ -327,6 +349,16 @@ decode_startrepl_extop(Slapi_PBlock *pb, char **protocol_oid, char **repl_root, goto free_and_return; } + /* Set a flag to let the caller know if this is a 9.0 style start extop */ + if (strcmp(extop_oid, REPL_START_NSDS90_REPLICATION_REQUEST_OID) == 0) + { + *is90 = 1; + } + else + { + *is90 = 0; + } + if ((tmp_bere = ber_init(extop_value)) == NULL) { rc = -1; @@ -349,12 +381,12 @@ decode_startrepl_extop(Slapi_PBlock *pb, char **protocol_oid, char **repl_root, goto free_and_return; } - /* get supplier's ruv */ - if (decode_ruv (tmp_bere, supplier_ruv) == -1) - { - rc = -1; - goto free_and_return; - } + /* get supplier's ruv */ + if (decode_ruv (tmp_bere, supplier_ruv) == -1) + { 
+ rc = -1; + goto free_and_return; + } /* Get the optional set of referral URLs */ if (ber_peek_tag(tmp_bere, &len) == LBER_SET) @@ -365,10 +397,30 @@ decode_startrepl_extop(Slapi_PBlock *pb, char **protocol_oid, char **repl_root, goto free_and_return; } } - /* Get the optional CSN */ + /* Get the CSN */ + if (ber_get_stringa(tmp_bere, csnstr) == LBER_ERROR) + { + rc = -1; + goto free_and_return; + } + /* Get the optional replication session callback data. */ if (ber_peek_tag(tmp_bere, &len) == LBER_OCTETSTRING) { - if (ber_get_stringa(tmp_bere, csnstr) == LBER_ERROR) + if (ber_get_stringa(tmp_bere, data_guid) == LBER_ERROR) + { + rc = -1; + goto free_and_return; + } + /* If a data_guid was specified, data must be specified as well. */ + if (ber_peek_tag(tmp_bere, &len) == LBER_OCTETSTRING) + { + if (ber_get_stringal(tmp_bere, data) == LBER_ERROR) + { + rc = -1; + goto free_and_return; + } + } + else { rc = -1; goto free_and_return; @@ -469,16 +521,19 @@ free_and_return: /* - * Decode an NSDS50ReplicationResponse extended response. - * The extended response just contains a sequence that contains: + * Decode an NSDS50ReplicationResponse or NSDS90ReplicationResponse + * extended response. The extended response just contains a sequence + * that contains: * 1) An integer response code * 2) An optional array of bervals representing the consumer * replica's update vector + * 3) An optional data guid and data string if this is a 9.0 + * style response * Returns 0 on success, or -1 if the response could not be parsed. 
*/ int -decode_repl_ext_response(struct berval *data, int *response_code, - struct berval ***ruv_bervals) +decode_repl_ext_response(struct berval *bvdata, int *response_code, + struct berval ***ruv_bervals, char **data_guid, struct berval **data) { BerElement *tmp_bere = NULL; int return_value = 0; @@ -486,7 +541,8 @@ decode_repl_ext_response(struct berval *data, int *response_code, PR_ASSERT(NULL != response_code); PR_ASSERT(NULL != ruv_bervals); - if (NULL == data || NULL == response_code || NULL == ruv_bervals) + if (NULL == bvdata || NULL == response_code || NULL == ruv_bervals || + NULL == data_guid || NULL == data) { return_value = -1; } @@ -495,7 +551,7 @@ decode_repl_ext_response(struct berval *data, int *response_code, ber_len_t len; ber_int_t temp_response_code = 0; *ruv_bervals = NULL; - if ((tmp_bere = ber_init(data)) == NULL) + if ((tmp_bere = ber_init(bvdata)) == NULL) { return_value = -1; } @@ -505,14 +561,24 @@ decode_repl_ext_response(struct berval *data, int *response_code, } else if (ber_peek_tag(tmp_bere, &len) == LBER_SEQUENCE) { - if (ber_scanf(tmp_bere, "{V}}", ruv_bervals) == LBER_ERROR) + if (ber_scanf(tmp_bere, "{V}", ruv_bervals) == LBER_ERROR) + { + return_value = -1; + } + } + /* Check for optional data from replication session callback */ + if (ber_peek_tag(tmp_bere, &len) == LBER_OCTETSTRING) + { + if (ber_scanf(tmp_bere, "aO}", data_guid, data) == LBER_ERROR) { return_value = -1; } - } else if (ber_scanf(tmp_bere, "}") == LBER_ERROR) + } + else if (ber_scanf(tmp_bere, "}") == LBER_ERROR) { return_value = -1; } + *response_code = (int)temp_response_code; } if (0 != return_value) @@ -561,17 +627,20 @@ multimaster_extop_StartNSDS50ReplicationRequest(Slapi_PBlock *pb) Slapi_DN *bind_sdn = NULL; char *bind_dn = NULL; Object *ruv_object = NULL; - RUV *supplier_ruv = NULL; + RUV *supplier_ruv = NULL; PRUint64 connid = 0; int opid = 0; PRBool isInc = PR_FALSE; /* true if incremental update */ char *locking_purl = NULL; /* the supplier 
contacting us */ char *current_purl = NULL; /* the supplier which already has exclusive access */ char locking_session[24]; + char *data_guid = NULL; + struct berval *data = NULL; + int is90 = 0; /* Decode the extended operation */ if (decode_startrepl_extop(pb, &protocol_oid, &repl_root, &supplier_ruv, - &referrals, &replicacsnstr) == -1) + &referrals, &replicacsnstr, &data_guid, &data, &is90) == -1) { response = NSDS50_REPL_DECODING_ERROR; goto send_response; @@ -602,6 +671,20 @@ multimaster_extop_StartNSDS50ReplicationRequest(Slapi_PBlock *pb) /* Verify that we know about this replication protocol OID */ if (strcmp(protocol_oid, REPL_NSDS50_INCREMENTAL_PROTOCOL_OID) == 0) { + if (repl_session_plugin_call_recv_acquire_cb(repl_root, 0 /* is_total == FALSE */, + data_guid, data)) + { + slapi_ch_free_string(&data_guid); + ber_bvfree(data); + data = NULL; + response = NSDS50_REPL_BACKOFF; + goto send_response; + } else { + slapi_ch_free_string(&data_guid); + ber_bvfree(data); + data = NULL; + } + /* Stash info that this is an incremental update session */ connext->repl_protocol_version = REPL_PROTOCOL_50_INCREMENTAL; slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, @@ -611,6 +694,20 @@ multimaster_extop_StartNSDS50ReplicationRequest(Slapi_PBlock *pb) } else if (strcmp(protocol_oid, REPL_NSDS50_TOTAL_PROTOCOL_OID) == 0) { + if (repl_session_plugin_call_recv_acquire_cb(repl_root, 1 /* is_total == TRUE */, + data_guid, data)) + { + slapi_ch_free_string(&data_guid); + ber_bvfree(data); + data = NULL; + response = NSDS50_REPL_DISABLED; + goto send_response; + } else { + slapi_ch_free_string(&data_guid); + ber_bvfree(data); + data = NULL; + } + /* Stash info that this is a total update session */ if (NULL != connext) { @@ -896,12 +993,15 @@ send_response: /* Don't log replica busy as errors - these are almost always not errors - use the replication monitoring tools to determine if a replica is not converging, then look for pathological replica - busy errors by turning on 
the replication log level */ - if (response == NSDS50_REPL_REPLICA_BUSY) { + busy errors by turning on the replication log level. We also + don't want to log replica backoff as an error, as that response + is only used when a replication session hook wants a master to + go into incremental backoff mode. */ + if ((response == NSDS50_REPL_REPLICA_BUSY) || (response == NSDS50_REPL_BACKOFF)) { resp_log_level = SLAPI_LOG_REPL; } - slapi_log_error (resp_log_level, repl_plugin_name, + slapi_log_error (resp_log_level, repl_plugin_name, "conn=%" NSPRIu64 " op=%d replica=\"%s\": " "Unable to acquire replica: error: %s%s\n", connid, opid, @@ -910,7 +1010,20 @@ send_response: /* enable tombstone reap again since the total update failed */ replica_set_tombstone_reap_stop(replica, PR_FALSE); - } + } + + /* Call any registered replica session reply callback. We + * want to reject the updates if the return value is non-0. */ + if (repl_session_plugin_call_reply_acquire_cb(replica ? + slapi_sdn_get_ndn(replica_get_root(replica)) : "", + ((isInc == PR_TRUE) ? 
0 : 1), &data_guid, &data)) + { + slapi_ch_free_string(&data_guid); + ber_bvfree(data); + data = NULL; + response = NSDS50_REPL_BACKOFF; + } + /* Send the response */ if ((resp_bere = der_alloc()) == NULL) { @@ -921,19 +1034,41 @@ send_response: { ber_printf(resp_bere, "{V}", ruv_bervals); } + /* Add extra data from replication session callback if necessary */ + if (is90 && data_guid && data) + { + ber_printf(resp_bere, "sO", data_guid, data); + } + ber_printf(resp_bere, "}"); ber_flatten(resp_bere, &resp_bval); - slapi_pblock_set(pb, SLAPI_EXT_OP_RET_OID, REPL_NSDS50_REPLICATION_RESPONSE_OID); + + if (is90) + { + slapi_pblock_set(pb, SLAPI_EXT_OP_RET_OID, REPL_NSDS90_REPLICATION_RESPONSE_OID); + } + else + { + slapi_pblock_set(pb, SLAPI_EXT_OP_RET_OID, REPL_NSDS50_REPLICATION_RESPONSE_OID); + } + slapi_pblock_set(pb, SLAPI_EXT_OP_RET_VALUE, resp_bval); slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "conn=%" NSPRIu64 " op=%d repl=\"%s\": " - "StartNSDS50ReplicationRequest: response=%d rc=%d\n", + "%s: response=%d rc=%d\n", connid, opid, repl_root, - response, rc); + is90 ? "StartNSDS90ReplicationRequest" : + "StartNSDS50ReplicationRequest", response, rc); slapi_send_ldap_result(pb, LDAP_SUCCESS, NULL, NULL, 0, NULL); return_value = SLAPI_PLUGIN_EXTENDED_SENT_RESULT; + /* Free any data allocated by the replication + * session reply callback. 
*/ + slapi_ch_free_string(&data_guid); + ber_bvfree(data); + data = NULL; + slapi_ch_free_string(&current_purl); /* protocol_oid */ @@ -943,11 +1078,11 @@ send_response: /* repl_root */ slapi_ch_free((void **)&repl_root); - /* supplier's ruv */ - if (supplier_ruv) - { - ruv_destroy (&supplier_ruv); - } + /* supplier's ruv */ + if (supplier_ruv) + { + ruv_destroy (&supplier_ruv); + } /* referrals */ slapi_ch_free((void **)&referrals); diff --git a/ldap/servers/plugins/replication/repl_session_plugin.c b/ldap/servers/plugins/replication/repl_session_plugin.c new file mode 100644 index 0000000..0e5b6ce --- /dev/null +++ b/ldap/servers/plugins/replication/repl_session_plugin.c @@ -0,0 +1,186 @@ +/** BEGIN COPYRIGHT BLOCK + * This Program is free software; you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software + * Foundation; version 2 of the License. + * + * This Program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along with + * this Program; if not, write to the Free Software Foundation, Inc., 59 Temple + * Place, Suite 330, Boston, MA 02111-1307 USA. + * + * In addition, as a special exception, Red Hat, Inc. gives You the additional + * right to link the code of this Program with code not covered under the GNU + * General Public License ("Non-GPL Code") and to distribute linked combinations + * including the two, subject to the limitations in this paragraph. Non-GPL Code + * permitted under this exception must only link to the code of this Program + * through those well defined interfaces identified in the file named EXCEPTION + * found in the source code files (the "Approved Interfaces").
The files of + * Non-GPL Code may instantiate templates or use macros or inline functions from + * the Approved Interfaces without causing the resulting work to be covered by + * the GNU General Public License. Only Red Hat, Inc. may make changes or + * additions to the list of Approved Interfaces. You must obey the GNU General + * Public License in all respects for all of the Program code and other code used + * in conjunction with the Program except the Non-GPL Code covered by this + * exception. If you modify this file, you may extend this exception to your + * version of the file, but you are not obligated to do so. If you do not wish to + * provide this exception without modification, you must delete this exception + * statement from your version and license this file solely under the GPL without + * exception. + * + * + * Copyright (C) 2010 Red Hat, Inc. + * All rights reserved. + * END COPYRIGHT BLOCK **/ + +/* repl_session_plugin.c */ + +#include "repl.h" +#include "repl5.h" +#include "slap.h" +#include "slapi-plugin.h" +#include "repl-session-plugin.h" + +/* an array of function pointers */ +static void **_ReplSessionAPI = NULL; + +void +repl_session_plugin_init() +{ + /* If the function pointer array is null, get the functions. + * We will only grab the api once. 
*/ + if((NULL == _ReplSessionAPI) && + (slapi_apib_get_interface(REPL_SESSION_v1_0_GUID, &_ReplSessionAPI) || + (NULL == _ReplSessionAPI))) { + LDAPDebug1Arg( LDAP_DEBUG_PLUGIN, + "<-- repl_session_plugin_init -- no replication session" + " plugin API registered for GUID [%s] -- end\n", + REPL_SESSION_v1_0_GUID); + } + + return; +} + +void +repl_session_plugin_call_agmt_init_cb(Repl_Agmt *ra) +{ + void *cookie = NULL; + Slapi_DN *replarea = NULL; + repl_session_plugin_agmt_init_cb initfunc = NULL; + + LDAPDebug0Args( LDAP_DEBUG_PLUGIN, "--> repl_session_plugin_call_agmt_init_cb -- begin\n"); + + initfunc = (repl_session_plugin_agmt_init_cb)_ReplSessionAPI[REPL_SESSION_PLUGIN_AGMT_INIT_CB]; + if (initfunc) { + replarea = agmt_get_replarea(ra); + cookie = (*initfunc)(replarea); + slapi_sdn_free(&replarea); + } + + agmt_set_priv(ra, cookie); + + LDAPDebug0Args( LDAP_DEBUG_PLUGIN, "<-- repl_session_plugin_call_agmt_init_cb -- end\n"); + + return; +} + +int +repl_session_plugin_call_pre_acquire_cb(const Repl_Agmt *ra, int is_total, + char **data_guid, struct berval **data) +{ + int rc = 0; + Slapi_DN *replarea = NULL; + + repl_session_plugin_pre_acquire_cb thefunc = + (_ReplSessionAPI && _ReplSessionAPI[REPL_SESSION_PLUGIN_PRE_ACQUIRE_CB]) ? + (repl_session_plugin_pre_acquire_cb)_ReplSessionAPI[REPL_SESSION_PLUGIN_PRE_ACQUIRE_CB] : + NULL; + + if (thefunc) { + replarea = agmt_get_replarea(ra); + rc = (*thefunc)(agmt_get_priv(ra), replarea, is_total, + data_guid, data); + slapi_sdn_free(&replarea); + } + + return rc; +} + +int +repl_session_plugin_call_post_acquire_cb(const Repl_Agmt *ra, int is_total, + const char *data_guid, const struct berval *data) +{ + int rc = 0; + Slapi_DN *replarea = NULL; + + repl_session_plugin_post_acquire_cb thefunc = + (_ReplSessionAPI && _ReplSessionAPI[REPL_SESSION_PLUGIN_POST_ACQUIRE_CB]) ? 
+ (repl_session_plugin_post_acquire_cb)_ReplSessionAPI[REPL_SESSION_PLUGIN_POST_ACQUIRE_CB] : + NULL; + + if (thefunc) { + replarea = agmt_get_replarea(ra); + rc = (*thefunc)(agmt_get_priv(ra), replarea, + is_total, data_guid, data); + slapi_sdn_free(&replarea); + } + + return rc; +} + +int +repl_session_plugin_call_recv_acquire_cb(const char *repl_area, int is_total, + const char *data_guid, const struct berval *data) +{ + int rc = 0; + + repl_session_plugin_recv_acquire_cb thefunc = + (_ReplSessionAPI && _ReplSessionAPI[REPL_SESSION_PLUGIN_RECV_ACQUIRE_CB]) ? + (repl_session_plugin_recv_acquire_cb)_ReplSessionAPI[REPL_SESSION_PLUGIN_RECV_ACQUIRE_CB] : + NULL; + + if (thefunc) { + rc = (*thefunc)(repl_area, is_total, data_guid, data); + } + + return rc; +} + +int +repl_session_plugin_call_reply_acquire_cb(const char *repl_area, int is_total, + char **data_guid, struct berval **data) +{ + int rc = 0; + + repl_session_plugin_reply_acquire_cb thefunc = + (_ReplSessionAPI && _ReplSessionAPI[REPL_SESSION_PLUGIN_REPLY_ACQUIRE_CB]) ? + (repl_session_plugin_reply_acquire_cb)_ReplSessionAPI[REPL_SESSION_PLUGIN_REPLY_ACQUIRE_CB] : + NULL; + + if (thefunc) { + rc = (*thefunc)(repl_area, is_total, data_guid, data); + } + + return rc; +} + +void +repl_session_plugin_call_destroy_agmt_cb(const Repl_Agmt *ra) +{ + Slapi_DN *replarea = NULL; + + repl_session_plugin_destroy_agmt_cb thefunc = + (_ReplSessionAPI && _ReplSessionAPI[REPL_SESSION_PLUGIN_DESTROY_AGMT_CB]) ? 
+ (repl_session_plugin_destroy_agmt_cb)_ReplSessionAPI[REPL_SESSION_PLUGIN_DESTROY_AGMT_CB] : + NULL; + + if (thefunc) { + replarea = agmt_get_replarea(ra); + (*thefunc)(agmt_get_priv(ra), replarea); + slapi_sdn_free(&replarea); + } + + return; +} diff --git a/ldap/servers/plugins/replication/test_repl_session_plugin.c b/ldap/servers/plugins/replication/test_repl_session_plugin.c new file mode 100644 index 0000000..f6a425a --- /dev/null +++ b/ldap/servers/plugins/replication/test_repl_session_plugin.c @@ -0,0 +1,335 @@ +/** BEGIN COPYRIGHT BLOCK + * This Program is free software; you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software + * Foundation; version 2 of the License. + * + * This Program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along with + * this Program; if not, write to the Free Software Foundation, Inc., 59 Temple + * Place, Suite 330, Boston, MA 02111-1307 USA. + * + * In addition, as a special exception, Red Hat, Inc. gives You the additional + * right to link the code of this Program with code not covered under the GNU + * General Public License ("Non-GPL Code") and to distribute linked combinations + * including the two, subject to the limitations in this paragraph. Non-GPL Code + * permitted under this exception must only link to the code of this Program + * through those well defined interfaces identified in the file named EXCEPTION + * found in the source code files (the "Approved Interfaces"). The files of + * Non-GPL Code may instantiate templates or use macros or inline functions from + * the Approved Interfaces without causing the resulting work to be covered by + * the GNU General Public License. 
Only Red Hat, Inc. may make changes or
+ * additions to the list of Approved Interfaces. You must obey the GNU General
+ * Public License in all respects for all of the Program code and other code used
+ * in conjunction with the Program except the Non-GPL Code covered by this
+ * exception. If you modify this file, you may extend this exception to your
+ * version of the file, but you are not obligated to do so. If you do not wish to
+ * provide this exception without modification, you must delete this exception
+ * statement from your version and license this file solely under the GPL without
+ * exception.
+ *
+ *
+ * Copyright (C) 2010 Red Hat, Inc.
+ * All rights reserved.
+ * END COPYRIGHT BLOCK **/
+
+#include "slapi-plugin.h"
+#include "repl-session-plugin.h"
+#include <string.h>
+
+/* GUID under which this plug-in's callback table is registered with and
+ * unregistered from the API broker (slapi_apib_register/unregister) */
+#define REPL_SESSION_v1_0_GUID "210D7559-566B-41C6-9B03-5523BDF30880"
+
+/* subsystem name passed to every slapi_log_error() call in this file */
+static char *test_repl_session_plugin_name = "test_repl_session_api";
+
+/*
+ * Plugin identifiers
+ */
+static Slapi_PluginDesc test_repl_session_pdesc = {
+    "test-repl-session-plugin",
+    "Test Vendor",
+    "1.0",
+    "test replication session plugin"
+};
+
+/* plug-in identity, filled in by test_repl_session_plugin_init() */
+static Slapi_ComponentId *test_repl_session_plugin_id = NULL;
+
+
+/*
+ * Replication Session Callbacks
+ */
+/*
+ * This is called on a master when a replication agreement is
+ * initialized at startup. A cookie can be allocated at this
+ * time which is passed to other callbacks on the master side.
+ */
+static void *
+test_repl_session_plugin_agmt_init_cb(const Slapi_DN *repl_subtree)
+{
+    char *cookie = NULL;
+
+    slapi_log_error(SLAPI_LOG_FATAL, test_repl_session_plugin_name,
+        "test_repl_session_plugin_agmt_init_cb() called for suffix \"%s\".\n",
+        slapi_sdn_get_ndn(repl_subtree));
+
+    /* allocate a string and set as the cookie */
+    cookie = slapi_ch_smprintf("cookie test");
+
+    slapi_log_error(SLAPI_LOG_FATAL, test_repl_session_plugin_name,
+        "test_repl_session_plugin_agmt_init_cb(): Setting cookie: \"%s\".\n",
+        cookie);
+
+    return cookie;
+}
+
+/*
+ * This is called on a master when we are about to acquire a
+ * replica. This callback can allocate some extra data to
+ * be sent to the replica in the start replication request.
+ * This memory will be free'd by the replication plug-in
+ * after it is sent. A guid string must be set that is to
+ * be used by the receiving side to ensure that the data is
+ * from the same replication session plug-in.
+ *
+ * Returning non-0 will abort the replication session. This
+ * results in the master going into incremental backoff mode.
+ */
+static int
+test_repl_session_plugin_pre_acquire_cb(void *cookie, const Slapi_DN *repl_subtree,
+    int is_total, char **data_guid, struct berval **data)
+{
+    int rc = 0;
+
+    slapi_log_error(SLAPI_LOG_FATAL, test_repl_session_plugin_name,
+        "test_repl_session_plugin_pre_acquire_cb() called for suffix \"%s\", "
+        "is_total: \"%s\", cookie: \"%s\".\n", slapi_sdn_get_ndn(repl_subtree),
+        is_total ? "TRUE" : "FALSE", cookie ? (char *)cookie : "NULL");
+
+    /* allocate some data to be sent to the replica */
+    *data_guid = slapi_ch_smprintf("test-guid");
+    *data = (struct berval *)slapi_ch_malloc(sizeof(struct berval));
+    (*data)->bv_val = slapi_ch_smprintf("test-data");
+    /* the terminating NUL is sent along with the string */
+    (*data)->bv_len = strlen((*data)->bv_val) + 1;
+
+    slapi_log_error(SLAPI_LOG_FATAL, test_repl_session_plugin_name,
+        "test_repl_session_plugin_pre_acquire_cb() sending data: guid: \"%s\" data: \"%s\".\n",
+        *data_guid, (*data)->bv_val);
+
+    return rc;
+}
+
+/*
+ * This is called on a replica when we are about to reply to
+ * a start replication request from a master. This callback
+ * can allocate some extra data to be sent to the master in
+ * the start replication response. This memory will be free'd
+ * by the replication plug-in after it is sent. A guid string
+ * must be set that is to be used by the receiving side to ensure
+ * that the data is from the same replication session plug-in.
+ *
+ * Returning non-0 will abort the replication session. This
+ * results in the master going into incremental backoff mode.
+ */
+static int
+test_repl_session_plugin_reply_acquire_cb(const char *repl_subtree, int is_total,
+    char **data_guid, struct berval **data)
+{
+    int rc = 0;
+
+    slapi_log_error(SLAPI_LOG_FATAL, test_repl_session_plugin_name,
+        "test_repl_session_plugin_reply_acquire_cb() called for suffix \"%s\", is_total: \"%s\".\n",
+        repl_subtree, is_total ? "TRUE" : "FALSE");
+
+    /* allocate some data to be sent to the master */
+    *data_guid = slapi_ch_smprintf("test-reply-guid");
+    *data = (struct berval *)slapi_ch_malloc(sizeof(struct berval));
+    (*data)->bv_val = slapi_ch_smprintf("test-reply-data");
+    /* the terminating NUL is sent along with the string */
+    (*data)->bv_len = strlen((*data)->bv_val) + 1;
+
+    slapi_log_error(SLAPI_LOG_FATAL, test_repl_session_plugin_name,
+        "test_repl_session_plugin_reply_acquire_cb() sending data: guid: \"%s\" data: \"%s\".\n",
+        *data_guid, (*data)->bv_val);
+
+    return rc;
+}
+
+/*
+ * This is called on a master when it receives a reply to a
+ * start replication extop that we sent to a replica. Any
+ * extra data sent by a replication session callback on the
+ * replica will be set here as the data parameter. The data_guid
+ * should be checked first to ensure that the sending side is
+ * using the same replication session plug-in before making any
+ * assumptions about the contents of the data parameter. You
+ * should not free data_guid or data. The replication plug-in
+ * will take care of freeing this memory.
+ *
+ * Returning non-0 will abort the replication session. This
+ * results in the master going into incremental backoff mode.
+ */
+static int
+test_repl_session_plugin_post_acquire_cb(void *cookie, const Slapi_DN *repl_subtree, int is_total,
+    const char *data_guid, const struct berval *data)
+{
+    int rc = 0;
+
+    slapi_log_error(SLAPI_LOG_FATAL, test_repl_session_plugin_name,
+        "test_repl_session_plugin_post_acquire_cb() called for suffix \"%s\", "
+        "is_total: \"%s\" cookie: \"%s\".\n", slapi_sdn_get_ndn(repl_subtree),
+        is_total ? "TRUE" : "FALSE", cookie ? (char *)cookie : "NULL");
+
+    /* log any extra data that was sent from the replica */
+    if (data_guid && data) {
+        /*
+         * The payload comes from whatever session plug-in runs on the
+         * peer, so do not assume it is a NUL terminated string: print at
+         * most bv_len bytes and cope with an empty value.
+         */
+        slapi_log_error(SLAPI_LOG_FATAL, test_repl_session_plugin_name,
+            "test_repl_session_plugin_post_acquire_cb() received data: guid: \"%s\" data: \"%.*s\".\n",
+            data_guid, data->bv_val ? (int)data->bv_len : 0,
+            data->bv_val ? data->bv_val : "");
+    }
+
+    return rc;
+}
+
+/*
+ * This is called on a replica when it receives a start replication
+ * extended operation from a master. If the replication session
+ * plug-in on the master sent any extra data, it will be set here
+ * as the data parameter. The data_guid should be checked first to
+ * ensure that the sending side is using the same replication session
+ * plug-in before making any assumptions about the contents of the
+ * data parameter. You should not free data_guid or data. The
+ * replication plug-in will take care of freeing this memory.
+ *
+ * Returning non-0 will abort the replication session. This
+ * results in the master going into incremental backoff mode.
+ */
+static int
+test_repl_session_plugin_recv_acquire_cb(const char *repl_subtree, int is_total,
+    const char *data_guid, const struct berval *data)
+{
+    int rc = 0;
+
+    slapi_log_error(SLAPI_LOG_FATAL, test_repl_session_plugin_name,
+        "test_repl_session_plugin_recv_acquire_cb() called for suffix \"%s\", is_total: \"%s\".\n",
+        repl_subtree, is_total ? "TRUE" : "FALSE");
+
+    /* log any extra data that was sent from the master */
+    if (data_guid && data) {
+        /*
+         * The payload comes from whatever session plug-in runs on the
+         * peer, so do not assume it is a NUL terminated string: print at
+         * most bv_len bytes and cope with an empty value.
+         */
+        slapi_log_error(SLAPI_LOG_FATAL, test_repl_session_plugin_name,
+            "test_repl_session_plugin_recv_acquire_cb() received data: guid: \"%s\" data: \"%.*s\".\n",
+            data_guid, data->bv_val ? (int)data->bv_len : 0,
+            data->bv_val ? data->bv_val : "");
+    }
+
+    return rc;
+}
+
+/*
+ * This is called on a master when a replication agreement is
+ * destroyed. Any cookie allocated when the agreement was initialized
+ * should be free'd here.
+ */
+static void
+test_repl_session_plugin_destroy_cb(void *cookie, const Slapi_DN *repl_subtree)
+{
+    slapi_log_error(SLAPI_LOG_FATAL, test_repl_session_plugin_name,
+        "test_repl_session_plugin_destroy_cb() called for suffix \"%s\".\n",
+        slapi_sdn_get_ndn(repl_subtree));
+
+    /* free cookie */
+    slapi_ch_free_string((char **)&cookie);
+
+    return;
+}
+
+/*
+ * Callback list for registering API
+ */
+static void *test_repl_session_api[] = {
+    NULL, /* reserved for api broker use, must be zero */
+    test_repl_session_plugin_agmt_init_cb,
+    test_repl_session_plugin_pre_acquire_cb,
+    test_repl_session_plugin_reply_acquire_cb,
+    test_repl_session_plugin_post_acquire_cb,
+    test_repl_session_plugin_recv_acquire_cb,
+    test_repl_session_plugin_destroy_cb
+};
+
+/*
+ * Plug-in framework functions
+ */
+/*
+ * Start function: nothing to do beyond tracing the call.
+ * Always returns 0.
+ */
+static int
+test_repl_session_plugin_start(Slapi_PBlock *pb)
+{
+    slapi_log_error(SLAPI_LOG_PLUGIN, test_repl_session_plugin_name,
+        "--> test_repl_session_plugin_start -- begin\n");
+
+    slapi_log_error(SLAPI_LOG_PLUGIN, test_repl_session_plugin_name,
+        "<-- test_repl_session_plugin_start -- end\n");
+    return 0;
+}
+
+/*
+ * Close function: removes the callback table registered by
+ * test_repl_session_plugin_init() from the API broker.
+ * Always returns 0.
+ */
+static int
+test_repl_session_plugin_close(Slapi_PBlock *pb)
+{
+    slapi_log_error(SLAPI_LOG_PLUGIN, test_repl_session_plugin_name,
+        "--> test_repl_session_plugin_close -- begin\n");
+
+    slapi_apib_unregister(REPL_SESSION_v1_0_GUID);
+
+    slapi_log_error(SLAPI_LOG_PLUGIN, test_repl_session_plugin_name,
+        "<-- test_repl_session_plugin_close -- end\n");
+    return 0;
+}
+
+/*
+ * Plug-in init function: sets the plug-in version, start/close functions
+ * and description in the pblock, saves the plug-in identity, and
+ * registers the callback table under REPL_SESSION_v1_0_GUID.
+ *
+ * Returns 0 on success and -1 if any step fails.
+ */
+int test_repl_session_plugin_init(Slapi_PBlock *pb)
+{
+    slapi_log_error(SLAPI_LOG_PLUGIN, test_repl_session_plugin_name,
+        "--> test_repl_session_plugin_init -- begin\n");
+
+    if ( slapi_pblock_set( pb, SLAPI_PLUGIN_VERSION,
+                           SLAPI_PLUGIN_VERSION_01 ) != 0 ||
+         slapi_pblock_set(pb, SLAPI_PLUGIN_START_FN,
+                          (void *) test_repl_session_plugin_start ) != 0 ||
+         slapi_pblock_set(pb, SLAPI_PLUGIN_CLOSE_FN,
+                          (void *) test_repl_session_plugin_close ) != 0 ||
+         slapi_pblock_set( pb, SLAPI_PLUGIN_DESCRIPTION,
+                           (void *)&test_repl_session_pdesc ) != 0 )
+    {
+        slapi_log_error( SLAPI_LOG_FATAL, test_repl_session_plugin_name,
+            "<-- test_repl_session_plugin_init -- failed to register plugin -- end\n");
+        return -1;
+    }
+
+    /* Retrieve and save the plugin identity to later pass to
+       internal operations */
+    if (slapi_pblock_get(pb, SLAPI_PLUGIN_IDENTITY, &test_repl_session_plugin_id) != 0) {
+        slapi_log_error(SLAPI_LOG_FATAL, test_repl_session_plugin_name,
+            "<-- test_repl_session_plugin_init -- failed to retrieve plugin identity -- end\n");
+        return -1;
+    }
+
+    /*
+     * Register the callback table last, so that no failure path above can
+     * return -1 while leaving the table registered with the API broker.
+     */
+    if( slapi_apib_register(REPL_SESSION_v1_0_GUID, test_repl_session_api) ) {
+        slapi_log_error( SLAPI_LOG_FATAL, test_repl_session_plugin_name,
+            "<-- test_repl_session_plugin_init -- failed to register repl_session api -- end\n");
+        return -1;
+    }
+
+    slapi_log_error( SLAPI_LOG_PLUGIN, test_repl_session_plugin_name,
+        "<-- test_repl_session_plugin_init -- end\n");
+    return 0;
+}
+
+/*
+dn: cn=Test Replication Session API,cn=plugins,cn=config
+objectclass: top
+objectclass: nsSlapdPlugin
+objectclass: extensibleObject
+cn: Test Replication Session API
+nsslapd-pluginpath: libtestreplsession-plugin
+nsslapd-plugininitfunc: test_repl_session_plugin_init
+nsslapd-plugintype: preoperation
+nsslapd-pluginenabled: on
+nsslapd-plugin-depends-on-type: database
+nsslapd-plugin-depends-on-named: Multimaster Replication Plugin
+*/
+
-- 
1.6.2.5
-- 389-devel mailing list 389-devel@xxxxxxxxxxxxxxxxxxxxxxx https://admin.fedoraproject.org/mailman/listinfo/389-devel