ITKeyword,专注技术干货聚合推荐

注册 | 登录

Sqoop导入数据到Hadoop代理执行

yu616568 分享于 2015-04-13

推荐:Hadoop数据传输工具sqoop(四)Java远程调用Sqoop进行数据传输

1、Maven配置 <dependency> <groupId>org.apache.sqoop</groupId> <artifactId>sqoop</artifactId> <version>1.4.5</version></dependency> 2、Jav

2018阿里云全部产品优惠券(新购或升级都可以使用,强烈推荐)
领取地址https://promotion.aliyun.com/ntms/yunparter/invite.html

最近在做执行服务器,它根据用户输入的sqoop命令代理向hadoop提交任务执行,目前需要支持的数据源包括mysql、oracle以及公司自己的分布式数据库DDB,数据导入的目的地可以是HDFS或者hive表。

首先来讨论一下对hive的支持,hive是作为一个支持JDBC的数据库,它的数据分成两部分,元数据和数据,元数据保存在一个本地的数据库,例如嵌入式数据库derby或者mysql,主要是存储一些关于hive的数据库和表定义的一些信息(关于元数据库表需要补充一下,这些表的创建都是hive完成的,不需要我们预先创建,但是在mysql中需要将字符集设置为latin1,否则在hive操作的时候可能出现问题,参见 http://www.cnblogs.com/blueren/archive/2011/06/29/sir_001.html),数据保存在hadoop中,它支持将本地文件导入到hive中,其实hive在导入数据库的时候并不会解析数据库,而是是将文件存储在hadoop中,对于数据的来源,它只支持load这样的全量导入和批量导入的操作,并不支持一条条的insert操作和update操作,对hive的查询是利用hadoop的计算能力将select语句转换成一个或者多个hadoop任务,通过hadoop的计算得到SQL的执行结果。所以对于sqoop来说,导入到hive其实也是将数据导入到HDFS中,只不过hive需要对元数据进行操作,在sqoop中,导入数据到hive执行的操作和导入数据到HDFS大体上流程是差不多的,只不过导入到HDFS中是向hadoop提交一个job执行数据导入到HDFS的某个文件,而导入到hive的时候在将数据放到HDFS之后在生成一个hive的脚本,然后调用hive程序(或者直接调用命令行中的hive命令)使用"-f"参数来执行该文件,例如我将一个mysql中的表pf_sqoop导入到hive中的demo_blog_4表中,当前的hadoop用户是hive用户,该用户是代理intern用户执行的操作,生成的hive的脚本文件内容如下: CREATE TABLE `demo_blog_4` ( `jobid` BIGINT, `product` STRING, `email` STRING, `jobname` STRING, `jobtype` TINYINT, `stype` TINYINT, `dtype` TINYINT, `dir` STRING, `dburl` STRING, `tablename` STRING, `username` STRING, `password` STRING, `hdbname` STRING, `htable` STRING, `params` STRING, `cluster` STRING, `begin_time` BIGINT, `end_time` BIGINT, `duration` BIGINT, `status` TINYINT, `created_at` STRING, `updated_at` STRING) COMMENT 'Imported by sqoop on 2015/04/13 10:16:31' ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' LINES TERMINATED BY '\012' STORED AS TEXTFILE;LOAD DATA INPATH 'hdfs://hp1/user/intern/pf_sqoop' OVERWRITE INTO TABLE `demo_blog_4`;<span style="font-family: Tahoma; background-color: rgb(255, 255, 255);">

</span>

可以看出,我们的数据库表的数据已经导入到HDFS的hdfs://hp1/user/intern/pf_sqoop路径下(这应该是/user/user_name/表名),然后执行创建hive表,再将该文件的数据导入到hive中,这里导入到hive表中是将已经存在hdfs中的文件再导入到hive的数据目录下。sqoop关于hive的参数有以下几个: --create-hive-table

//是否创建新表,覆盖原来的表定义 --hive-overwrite

//是否覆盖hive中已经存在的数据 --hive-import

//标记是否导入数据到hive --hive-home

//hive的主目录,必要的时候需要根据该配置找到hive命令 --hive-table

//执行hive的表名

接下来看一下导入数据到HDFS吧,首先我们使用sqoop来执行mysql和HDFS之间的导入和导出,sqoop对于mysql的支持还是比较好的,实验一下就能够成功了,使用的命令如下: sqoop import --table pf_sqoop --target-dir /user/intern/terry

--delete-target-dir

-m 1 --connect jdbc:mysql://ip:port/database?user=name\&password=passwd --input-fields-terminated-by ',',执行成功之后该目录/user/intern/terry下会创建导入的表数据的文件。--delete-target-dir参数意味着在生成数据之前将目标路径清空,也就意味着会覆盖之前的数据。 然后再用oracle,开始的时候我想执行oracle的driver吧(因为sqoop使用的默认的driver和我之前使用的不一样),于是添加了这样的参数--driver oracle.jdbc.driver.OracleDriver,但是结果出现了如下的错误: Beginning code generationExecuting SQL statement: SELECT t.* FROM test AS t WHERE 1=0Error executing statement: java.sql.SQLException: ORA-00933: SQL command not properly endedjava.sql.SQLException: ORA-00933: SQL command not properly ended

根据错误堆栈查看代码发现执行的操作是sqoop需要查看数据源中该表的表结构,然后根据该结果生成java代码,从日志中可以看到查看表结果的SQL是SELECT t.* FROM test AS t WHERE 1=0,但是使用jdbc对oracle进行测试发现这个SQL是不支持的(不支持表的as),查看代码发现在执行importTable的时候执行了如下的操作: jarFile = codeGenerator.generateORM(options, tableName);

该操作是根据表名生成jar文件,在该函数中调用getColumnTypes函数生成表的字段名和每一个字段的数据类型,该函数中执行如下: 1831

protected Map<String, Integer> getColumnTypes() throws IOException {1832

if (options.getCall() == null) {1833

return connManager.getColumnTypes(tableName, options.getSqlQuery());1834

} else {1835

return connManager.getColumnTypesForProcedure(options.getCall());1836

}1837

}

可以看出,真正执行的操作是在connManager中完成的,那么这个connManager是什么呢?发现在sqoop中有如下一个参数:--connection-manager

className这里指定的是连接管理类,开始以为是dbcp之类的呢,但是我错了,这里指定的是在sqoop内部的数据库操作类,如果不指定这个参数会使用哪个connManager呢?代码如下: 121

public ConnManager getManager(JobData data) throws IOException {122

com.cloudera.sqoop.SqoopOptions options = data.getSqoopOptions();123

String manualDriver = options.getDriverClassName();124

String managerClassName = options.getConnManagerClassName();125

<span style="color:#ff6666;"> //如果在命令中指定了driver但是没有指定connManager那么就使用通用的JDBC的connManager,也会输出warn日志</span>131

if (manualDriver != null && managerClassName == null) {132

LOG.warn("Parameter --driver is set to an explicit driver however"133

+ " appropriate connection manager is not being set (via"134

+ " --connection-manager). Sqoop is going to fall back to "135

+ GenericJdbcManager.class.getCanonicalName() + ". Please specify"136

+ " explicitly which connection manager should be used next time."137

);138

return new GenericJdbcManager(manualDriver, options);139

}140

<span style="color:#ff6666;"> //如果制定了connManager,直接创建该类对象,不存在则报错。</span>142

if (managerClassName != null){143

ConnManager connManager = null;144145

try {146

Class<ConnManager> cls = (Class<ConnManager>)147

Class.forName(managerClassName);148152

if (manualDriver == null) {153

Constructor<ConnManager> constructor =154

cls.getDeclaredConstructor(com.cloudera.sqoop.SqoopOptions.class);155

connManager = constructor.newInstance(options);156

} else {157

Constructor<ConnManager> constructor =158

cls.getDeclaredConstructor(String.class,159

com.cloudera.sqoop.SqoopOptions.class);160

connManager = constructor.newInstance(manualDriver, options);161

}162

} catch (ClassNotFoundException e) {163

LOG.error("Sqoop could not found specified connection manager class "164

+ managerClassName

+ ". Please check that you've specified the "165

+ "class correctly.");166

throw new IOException(e);167

} catch (NoSuchMethodException e) {168

LOG.error("Sqoop wasn't able to create connnection manager properly. "169

+ "Some of the connectors supports explicit --driver and some "170

+ "do not. Please try to either specify --driver or leave it out.");171

throw new IOException(e);172

} catch (Exception e) {173

LOG.error("Problem with bootstrapping connector manager:"174

+ managerClassName);175

LOG.error(e);176

throw new IOException(e);177

}178

return connManager;179

}180<span style="white-space:pre">

</span><span style="color:#ff0000;">//最后,也就是说既没有指定driver也没有指定connManager,就使用系统预定的以connManager的factory创建一个connManager</span>181

// Try all the available manager factories.182

for (ManagerFactory factory : factories) {183

LOG.debug("Trying ManagerFactory: " + factory.getClass().getName());184

ConnManager mgr = factory.accept(data);185

if (null != mgr) {186

LOG.debug("Instantiated ConnManager " + mgr.toString());187

return mgr;188

}189

}190191

throw new IOException("No manager for connect string: "192

+ data.getSqoopOptions().getConnectString());193

}

在创建connManager的时候首先查看参数中的--driver和--connection-manager,如果driver指定了,但是connManager没有存在,那么就创建了默认的jdbc的connManager—GenericJdbcManager,接下来再根据参数指定的connManager创建对象,如果没有指定再从factory中创建一个新的对象,factory一般使用的是默认的,默认的factory如下: 74

public static final String[] DEFAULT_FACTORY_CLASS_NAMES_ARR =75

{OraOopManagerFactory.class.getName(),76

DefaultManagerFactory.class.getName(), };

创建的时候是按照顺序来创建的,第一个factory一般我们用不到, 看一下DefaultManagerFactory是如何创建connManager的: 38

public ConnManager More ...accept(JobData data) {39

SqoopOptions options = data.getSqoopOptions();40 41

String scheme = extractScheme(options);42

if (null == scheme) {43

// We don't know if this is a mysql://, hsql://, etc.44

// Can't do anything with this.45

LOG.warn("Null scheme associated with connect string.");46

return null;47

}48 49

LOG.debug("Trying with scheme: " + scheme);50 51

if (scheme.equals("jdbc:mysql:")) {52

if (options.isDirect()) {53

return new DirectMySQLManager(options);54

} else {55

return new MySQLManager(options);56

}57

} else if (scheme.equals("jdbc:postgresql:")) {58

if (options.isDirect()) {59

return n

推荐:Sqoop导入关系数据库到Hive

Sqoop 是 apache 下用于 RDBMS 和 HDFS 互相导数据的工具。本文以 mysql 数据库为例,实现关系数据库导入到 hdfs 和 hive。 1. 安装 Sqoop 使用 rpm 安装即可。

ew DirectPostgresqlManager(options);60

} else {61

return new PostgresqlManager(options);62

}63

} else if (scheme.startsWith("jdbc:hsqldb:")) {64

return new HsqldbManager(options);65

} else if (scheme.startsWith("jdbc:oracle:")) {66

return new OracleManager(options);67

} else if (scheme.startsWith("jdbc:sqlserver:")) {68

return new SQLServerManager(options);69

} else if (scheme.startsWith("jdbc:db2:")) {70

return new Db2Manager(options);71

} else if (scheme.startsWith("jdbc:netezza:")) {72

if (options.isDirect()) {73

return new DirectNetezzaManager(options);74

} else {75

return new NetezzaManager(options);76

}77

} else if (scheme.startsWith("jdbc:cubrid:")) {78

return new CubridManager(options);79

} else {80

return null;81

}82

}

可以看出他是根据url的开头的字符来判断使用了哪一个数据库已创建对应的connManager,好了,回到之前出现的问题,由于我们在sqoop命令中指定了driver但是没有指定connManager,那么就在getManager函数中直接返回了默认的GenericJdbcManager对象,查看该对象执行的getColumnTypes函数,得到的sql如下: 107

protected String getColNamesQuery(String tableName) {108

// adding where clause to prevent loading a big table109

return "SELECT t.* FROM " + escapeTableName(tableName) + " AS t WHERE 1=0";110

}

也就是oracle执行出错的那么sql语句,那么怎么才能解决这个问题呢,那就不指定driver了,或者同时也指定connection-manager参数,该参数需要设置为org.apache.sqoop.manager.OracleManager,顺便看一下这个connManager中怎么获取查看的sql的: 261

protected String More ...getColNamesQuery(String tableName) {262

// SqlManager uses "tableName AS t" which doesn't work in Oracle.263

String query =

"SELECT t.* FROM " + escapeTableName(tableName)264

+ " t WHERE 1=0";265 266

LOG.debug("Using column names query: " + query);267

return query;268

}

这个sql去掉在from中的AS,再次执行oracle导入到HDFS也就能够成功了,日志中查看执行的sql如下: Beginning code generationTime zone has been set to GMTExecuting SQL statement: SELECT t.* FROM TEST t WHERE 1=0 好了,搞定了mysql和oracle, 不过oracle的数据库名必须使用大写!!这个也需要注意一下。

最后看一下公司自己的数据库DDB吧,因为它是我们公司自己研发的,虽然支持jdbc操作,但是通过jdbc只能执行数据库的SELECT、UPDATE、INSERT、DELETE之类的操作,并不能支持对表的元数据的操作。jdbc的url格式如下:host:port?key=xxx&logdir=xxx,因为它的driver不同于mysql和oracle的driver,那么在执行sqoop命令的时候就必须指定--driver参数,但是connManager并不指定(sqoop中并没有适合ddb的connManager),根据创建connManager的代码它会创建GenericJdbcManager用于数据库的操作,但是执行的时候得到了如下的错误: MServer[new_lighttest_ddb457]监控线程启动完成Error executing statement: java.sql.SQLException: Not supported function!java.sql.SQLException: Not supported function!

at com.netease.backend.db.DBConnection.setTransactionIsolation(DBConnection.java:709)

at org.apache.sqoop.manager.SqlManager.makeConnection(SqlManager.java:883)

at org.apache.sqoop.manager.GenericJdbcManager.getConnection(GenericJdbcManager.java:52)

at org.apache.sqoop.manager.SqlManager.execute(SqlManager.java:736)

at org.apache.sqoop.manager.SqlManager.execute(SqlManager.java:759)

原来在ddb中不支持setTransactionIsolation的操作,该操作是在makeConnection中执行的,它是用来创建一个数据库连接,由于这些代码是在sqoop中写死的,我们又不可能去修改sqoop的代码以满足自己特定的需求,这时候我想起来了指定connManager,但是指定哪个connManager呢?mysql的?肯定不可以啊,想来想去找不到合适的,这时候又想到了,既然所有的connManager都是通过继承的方式实现自己的特定的操作,然后公共的操作由父进程来完成,那么为什么我不能自己写一个connManager呢?它集成自GenericJdbcManager,将GenericJdbcManager中的某些ddb不支持的操作换成其他支持的方式实现不就可以了(相当于覆盖一些GenericJdbcManager的操作,其余的操作还都是调用GenericJdbcManager的方法),说干就干,实现了connManager之后使用参数--connection-manager指定这个connManager类,该类仅仅实现了makeConnection(创建一个数据库连接)、execute(执行一个sql查询)、getPrimaryKey(获取数据库表的主键),因为这些函数在import的时候会被调用,但是ddb不支持其中的某些操作。

实现了上面的类之后再次运行,果然能够获得表结构、生成jar包然后提交hadoop任务,但是该任务执行的过程中出现了如下的错误: Job job_1428578888362_0207 running in uber mode : false map 0% reduce 0%Task Id : attempt_1428578888362_0207_m_000000_0, Status : FAILEDError: java.lang.ClassNotFoundException: com.netease.backend.db.common.exceptions.SQLExceptionWithCauseat java.net.URLClassLoader$1.run(URLClassLoader.java:366)at java.net.URLClassLoader$1.run(URLClassLoader.java:355)at java.security.AccessController.doPrivileged(Native Method)at java.net.URLClassLoader.findClass(URLClassLoader.java:354)at java.lang.ClassLoader.loadClass(ClassLoader.java:425)at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)at java.lang.ClassLoader.loadClass(ClassLoader.java:358)at java.lang.Class.forName0(Native Method)at java.lang.Class.forName(Class.java:191)at org.apache.sqoop.mapreduce.db.DBConfiguration.getConnection(DBConfiguration.java:277)at org.apache.sqoop.mapreduce.db.DBInputFormat.getConnection(DBInputFormat.java:218)at org.apache.sqoop.mapreduce.db.DBInputFormat.setConf(DBInputFormat.java:165)at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:726)at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)at java.security.AccessController.doPrivileged(Native Method)at javax.security.auth.Subject.doAs(Subject.java:415)at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

这里看到hadoop的任务已经提交执行了,但是每一次的attempt都失败,原因是找不到com.netease.backend.db.common.exceptions.SQLExceptionWithCause这个类,原来在hadoop执行的时候是由具体的taskTracker执行的,这里找不到该类应该是taskTracker无法加载它,但是它明明存在与我的classpath中的某一个jar包中(和ddb的driver不是同一个jar),难道在执行的时候不会讲这些jar放到hdfs中吗?

查看sqoop执行import时向hadoop提交任务时候的代码,在真正提交job之前会初始化环境,包括讲一些比较的jar上传到hadoop中,并设置到配置中,代码如下(org.apache.sqoop.mapreduce.ImportJobBase的runImport函数): 251

Job job = createJob(conf);252

try {253

// Set the external jar to use for the job.254

job.getConfiguration().set("mapred.jar", ormJarFile);255

if (options.getMapreduceJobName() != null) {256

job.setJobName(options.getMapreduceJobName());257

}258

<span style="color:#ff6666;">//配置job的一些参数</span>259

propagateOptionsToJob(job);260

configureInputFormat(job, tableName, tableClassName, splitByCol);261

configureOutputFormat(job, tableName, tableClassName);262

configureMapper(job, tableName, tableClassName);263

configureNumTasks(job); //将一些使用的jar文件加入到配置中264

cacheJars(job, getContext().getConnManager());265266

jobSetup(job);267

setJob(job);268

boolean success = runJob(job);269

if (!success) {270

throw new ImportException("Import job failed!");271

}272273

completeImport(job); 将相关的jar包上传并加入到配置是在cacheJars函数中完成的,如下: 145

Configuration conf = job.getConfiguration();146

FileSystem fs = FileSystem.getLocal(conf);147

Set<String> localUrls = new HashSet<String>();148149

addToCache(Jars.getSqoopJarPath(), fs, localUrls);150

if (null != mgr) {151

addToCache(Jars.getDriverClassJar(mgr), fs, localUrls);152

addToCache(Jars.getJarPathForClass(mgr.getClass()), fs, localUrls);153

}

这里看到如果connManager不为空,那么会根据该类中配置的driver找到该driver所在的jar包的路径(getJarPathForClass该函数用来根据某一个类的class对象找到该类所在jar文件的路径),然后将connManager类的jar文件和driver的jar文件加入到localUrls中,在该函数的之后,将这些jar文件加入到配置中: 199

if (localUrls.isEmpty()) {200

return;201

}202203

// Add these to the 'tmpjars' array, which the MR JobSubmitter204

// will upload to HDFS and put in the DistributedCache libjars.205

String tmpjars = conf.get("tmpjars");206

StringBuilder sb = new StringBuilder();207

if (null != tmpjars) {208

sb.append(tmpjars);209

sb.append(",");210

}211

sb.append(StringUtils.arrayToString(localUrls.toArray(new String[0])));212

conf.set("tmpjars", sb.toString());

将这些jar文件加入到hadoop的job的tmpjars配置中会在job提交的时候将这些文件提交到hadoop中,在每一个taskTracker执行的时候会得到这些jar文件,但是从这段代码中可以看到, 这里只能通过Jars.getJarPathForClass函数得到driver的jar文件和connManager的jar文件,但是不能得到他们依赖的其它jar,怎么办呢?我知道在hadoop中可以通过--libjar参数指定第三方的jar文件,但是这里我们又不能修改sqoop提交job的方式,也没有参数用于此类情况。

既然这里只会找到connManager和driver这两个类的jar文件,那么我将所有的依赖的jar文件都放到一个更大的jar文件里面不就可以了吗?这个办法虽然很笨但是足够解决问题,于是我又开始查看如何将一些jar合并成一个jar,我知道jar其实就是zip压缩的文件, 我采用的方法是将这些jar文件都解压到一个目录中,然后再对这个目录中的所有文件进行zip压缩,压缩完成之后将文件名修改成xxx.jar就可以了,实验发现这个方法的确可以合并jar,使用合并之后的jar文件再次启动sqoop的import,成功完成! 从这里我学习到了一些东西: 1、sqoop如果使用的 2、sqoop的import的具体执行流程,以及对不同的关系数据库的支持 3、sqoop是如何提交hadoop任务的 4、hadoop任务依赖第三方库的解决方案(合并成一个大的jar、--libjars参数)

推荐:利用SQOOP将数据从数据库导入到HDFS

基本使用 如下面这个shell脚本: #Oracle的连接字符串,其中包含了Oracle的地址,SID,和端口号 CONNECTURL=jdbc:oracle:thin:@20.135.60.21:1521:DWRAC2 #使用

        最近在做执行服务器,它根据用户输入的sqoop命令代理向hadoop提交任务执行,目前需要支持的数据源包括mysql、oracle以及公司自己的分布式数据库DDB,数据导入的目的地可以是HDFS或者hiv

相关阅读排行


用户评论

游客

相关内容推荐

最新文章

×

×

请激活账号

为了能正常使用评论、编辑功能及以后陆续为用户提供的其他产品,请激活账号。

您的注册邮箱: 修改

重新发送激活邮件 进入我的邮箱

如果您没有收到激活邮件,请注意检查垃圾箱。