forked from colinmarc/hdfs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient.go
187 lines (157 loc) · 4.35 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
package hdfs
import (
"io"
"io/ioutil"
"os"
"os/user"
hdfs "github.com/colinmarc/hdfs/protocol/hadoop_hdfs"
"github.com/colinmarc/hdfs/rpc"
)
// A Client represents a connection to an HDFS cluster.
//
// NOTE(review): fetchDefaults reads and writes the defaults field without any
// locking; confirm whether a Client is meant to be shared across goroutines.
type Client struct {
	// namenode is the RPC connection that namenode calls (such as the
	// getServerDefaults call in fetchDefaults) go through; Close closes it.
	namenode *rpc.NamenodeConnection

	// defaults caches the server defaults returned by fetchDefaults. It is
	// nil until fetchDefaults stores a result.
	defaults *hdfs.FsServerDefaultsProto
}
// ClientOptions represents the configurable options for a client.
type ClientOptions struct {
	// Addresses lists the namenode addresses to connect to. When empty,
	// NewClient falls back to the namenodes listed in the system Hadoop
	// configuration.
	Addresses []string

	// Namenode, if non-nil, is used as the client's connection instead of
	// dialing Addresses.
	Namenode *rpc.NamenodeConnection

	// User is the user to connect as. When empty, NewClient uses Username(),
	// i.e. HADOOP_USER_NAME or else the current system user.
	User string
}
// Username reports which user HDFS requests should run as. A non-empty
// HADOOP_USER_NAME environment variable wins; otherwise the name of the
// current operating-system user is returned. An error is returned only when
// the environment variable is unset or empty and the current user cannot be
// looked up.
func Username() (string, error) {
	if fromEnv, ok := os.LookupEnv("HADOOP_USER_NAME"); ok && fromEnv != "" {
		return fromEnv, nil
	}

	me, lookupErr := user.Current()
	if lookupErr != nil {
		return "", lookupErr
	}
	return me.Username, nil
}
// NewClient returns a connected Client for the given options, or an error if
// the client could not be created.
//
// If options.Namenode is non-nil it is used as-is and no other option is
// consulted. Otherwise a new namenode connection is opened: an empty
// options.User falls back to Username(), and empty options.Addresses fall back
// to the namenodes listed in the system Hadoop configuration.
func NewClient(options ClientOptions) (*Client, error) {
	if options.Namenode == nil {
		// User and address resolution happens only when we actually have to
		// dial. A caller-supplied connection must not be rejected just because
		// the local environment has no Hadoop config or resolvable user.
		var err error

		if options.User == "" {
			if options.User, err = Username(); err != nil {
				return nil, err
			}
		}

		// len of a nil slice is 0, so no separate nil check is needed.
		if len(options.Addresses) == 0 {
			if options.Addresses, err = getNameNodeFromConf(); err != nil {
				return nil, err
			}
		}

		options.Namenode, err = rpc.NewNamenodeConnectionWithOptions(
			rpc.NamenodeConnectionOptions{
				Addresses: options.Addresses,
				User:      options.User,
			},
		)
		if err != nil {
			return nil, err
		}
	}

	return &Client{namenode: options.Namenode}, nil
}
// New returns a connected Client, or an error if it can't connect. The client
// runs as the user reported by Username (HADOOP_USER_NAME or the current
// system user). An empty address means the namenode addresses are taken from
// the Hadoop configuration files instead.
func New(address string) (*Client, error) {
	// A nil slice tells NewClient to consult the Hadoop configuration.
	var addrs []string
	if len(address) > 0 {
		addrs = append(addrs, address)
	}
	return NewClient(ClientOptions{Addresses: addrs})
}
// getNameNodeFromConf loads the system Hadoop configuration (LoadHadoopConf
// with an empty path) and returns the namenode addresses it lists. Whenever
// an error is returned, the slice is nil.
func getNameNodeFromConf() ([]string, error) {
	conf := LoadHadoopConf("")

	addrs, err := conf.Namenodes()
	if err != nil {
		// Discard any partial result so callers never see one with an error.
		return nil, err
	}
	return addrs, nil
}
// NewForUser returns a connected Client with the user specified, or an error if
// it can't connect. As with New, an empty address means the namenode addresses
// are read from the Hadoop configuration files.
//
// Deprecated: Use NewClient with ClientOptions instead.
func NewForUser(address string, user string) (*Client, error) {
	options := ClientOptions{User: user}

	// Match New: an empty address defers to the Hadoop configuration rather
	// than producing []string{""}, which would make NewClient try to dial "".
	if address != "" {
		options.Addresses = []string{address}
	}

	return NewClient(options)
}
// NewForConnection returns a Client that uses the given, already-established
// rpc.NamenodeConnection. You can use rpc.WrapNamenodeConnection to wrap your
// own net.Conn.
//
// If namenode is nil, a connection is dialed as NewClient would with empty
// options; since this function cannot report errors, a failure there yields a
// nil *Client.
//
// Deprecated: Use NewClient with ClientOptions instead.
func NewForConnection(namenode *rpc.NamenodeConnection) *Client {
	if namenode == nil {
		// Legacy path kept for compatibility: nothing to wrap, so dial using
		// the environment's defaults and swallow the error as before.
		client, _ := NewClient(ClientOptions{})
		return client
	}

	// Wrap the connection directly instead of going through NewClient, which
	// may try to resolve the local user and Hadoop config first. This
	// signature cannot surface such a failure, and would turn it into a nil
	// Client despite a perfectly usable connection.
	return &Client{namenode: namenode}
}
// ReadFile opens the HDFS file named by filename, reads it until EOF or an
// error, and returns what was read. The file is closed before ReadFile
// returns.
func (c *Client) ReadFile(filename string) ([]byte, error) {
	file, openErr := c.Open(filename)
	if openErr != nil {
		return nil, openErr
	}
	defer file.Close()

	return ioutil.ReadAll(file)
}
// CopyToLocal copies the HDFS file specified by src to the local file at dst.
// If dst already exists, it will be overwritten.
//
// The error from closing dst is returned rather than discarded: closing a
// file that has been written to can report a write failure that was not seen
// earlier (see close(2)). If the copy itself fails, that error is returned
// instead.
func (c *Client) CopyToLocal(src string, dst string) error {
	remote, err := c.Open(src)
	if err != nil {
		return err
	}
	defer remote.Close()

	local, err := os.Create(dst)
	if err != nil {
		return err
	}

	_, err = io.Copy(local, remote)
	if err != nil {
		// The copy error is the more useful one to report; closing here is
		// best-effort cleanup of the partially written file handle.
		local.Close()
		return err
	}

	return local.Close()
}
// CopyToRemote copies the local file specified by src to the HDFS file at dst.
//
// The error from closing the remote file is returned. Closing the writer
// created by c.Create can fail even after io.Copy succeeds, and dropping that
// error would report a failed upload as success. NOTE(review): upstream
// FileWriter.Close documents that it flushes remaining data and waits for
// datanode acknowledgements; confirm the same holds in this fork.
func (c *Client) CopyToRemote(src string, dst string) error {
	local, err := os.Open(src)
	if err != nil {
		return err
	}
	defer local.Close()

	remote, err := c.Create(dst)
	if err != nil {
		return err
	}

	_, err = io.Copy(remote, local)
	if err != nil {
		// Report the copy failure; closing is best-effort cleanup here.
		remote.Close()
		return err
	}

	return remote.Close()
}
// fetchDefaults returns the namenode's server defaults. The first call issues
// a getServerDefaults RPC and stores the result in c.defaults; later calls
// return the stored value. If the RPC fails, nothing is stored and the error
// is returned. A nil result (resp.GetServerDefaults() yielding nil) is not
// treated as cached, so the RPC is issued again on the next call.
//
// NOTE(review): c.defaults is read and written without locking; concurrent
// callers may race — confirm whether Client is used from multiple goroutines.
func (c *Client) fetchDefaults() (*hdfs.FsServerDefaultsProto, error) {
	if c.defaults == nil {
		req := &hdfs.GetServerDefaultsRequestProto{}
		resp := &hdfs.GetServerDefaultsResponseProto{}
		if err := c.namenode.Execute("getServerDefaults", req, resp); err != nil {
			return nil, err
		}
		c.defaults = resp.GetServerDefaults()
	}

	return c.defaults, nil
}
// Close closes the client's underlying namenode connection and returns any
// error produced by doing so.
func (c *Client) Close() error {
	conn := c.namenode
	return conn.Close()
}